Add Pool.close().
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//1393193004 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ec8fd79..9f74d07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.2.0
+
+* Add `Pool.close()`, which forbids new resource requests and releases all
+ releasable resources.
+
## 1.1.0
* Add `PoolResource.allowRelease()`, which allows a resource to indicate that it
diff --git a/lib/pool.dart b/lib/pool.dart
index e8ee99c..59b949e 100644
--- a/lib/pool.dart
+++ b/lib/pool.dart
@@ -7,6 +7,7 @@
import 'dart:async';
import 'dart:collection';
+import 'package:async/async.dart';
import 'package:stack_trace/stack_trace.dart';
/// Manages an abstract pool of resources with a limit on how many may be in use
@@ -55,6 +56,15 @@
/// The amount of time to wait before timing out the pending resources.
final Duration _timeout;
+ /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources
+ /// that have been marked releasable.
+ ///
+ /// This is `null` until [close] is called.
+ FutureGroup _closeGroup;
+
+ /// Whether [close] has been called.
+ bool get isClosed => _closeGroup != null;
+
/// Creates a new pool with the given limit on how many resources may be
/// allocated at once.
///
@@ -69,6 +79,10 @@
/// If the maximum number of resources is already allocated, this will delay
/// until one of them is released.
Future<PoolResource> request() {
+ if (isClosed) {
+ throw new StateError("request() may not be called on a closed Pool.");
+ }
+
if (_allocatedResources < _maxAllocatedResources) {
_allocatedResources++;
return new Future.value(new PoolResource._(this));
@@ -87,20 +101,56 @@
///
/// The return value of [callback] is piped to the returned Future.
Future withResource(callback()) {
- return request().then((resource) => new Future.sync(callback).whenComplete(resource.release));
+ if (isClosed) {
+ throw new StateError(
+ "withResource() may not be called on a closed Pool.");
+ }
+
+ // TODO(nweiz): Use async/await when sdk#23497 is fixed.
+ return request().then((resource) {
+ return new Future.sync(callback).whenComplete(resource.release);
+ });
+ }
+
+ /// Closes the pool so that no more resources are requested.
+ ///
+ /// Existing resource requests remain unchanged.
+ ///
+ /// Any resources that are marked as releasable using
+ /// [PoolResource.allowRelease] are released immediately. Once all resources
+ /// have been released and any `onRelease` callbacks have completed, the
+ /// returned future completes successfully. If any `onRelease` callback throws
+ /// an error, the returned future completes with that error.
+ ///
+ /// This may be called more than once; it returns the same [Future] each time.
+ Future close() {
+ if (_closeGroup != null) return _closeGroup.future;
+
+ _resetTimer();
+
+ _closeGroup = new FutureGroup();
+ for (var callback in _onReleaseCallbacks) {
+ _closeGroup.add(new Future.sync(callback));
+ }
+
+ _allocatedResources -= _onReleaseCallbacks.length;
+ _onReleaseCallbacks.clear();
+
+ if (_allocatedResources == 0) _closeGroup.close();
+ return _closeGroup.future;
}
/// If there are any pending requests, this will fire the oldest one.
void _onResourceReleased() {
_resetTimer();
- if (_requestedResources.isEmpty) {
+ if (_requestedResources.isNotEmpty) {
+ var pending = _requestedResources.removeFirst();
+ pending.complete(new PoolResource._(this));
+ } else {
_allocatedResources--;
- return;
+ if (isClosed && _allocatedResources == 0) _closeGroup.close();
}
-
- var pending = _requestedResources.removeFirst();
- pending.complete(new PoolResource._(this));
}
/// If there are any pending requests, this will fire the oldest one after
@@ -108,14 +158,17 @@
void _onResourceReleaseAllowed(onRelease()) {
_resetTimer();
- if (_requestedResources.isEmpty) {
+ if (_requestedResources.isNotEmpty) {
+ var pending = _requestedResources.removeFirst();
+ pending.complete(_runOnRelease(onRelease));
+ } else if (isClosed) {
+ _closeGroup.add(new Future.sync(onRelease));
+ _allocatedResources--;
+ if (_allocatedResources == 0) _closeGroup.close();
+ } else {
_onReleaseCallbacks.add(
Zone.current.bindCallback(onRelease, runGuarded: false));
- return;
}
-
- var pending = _requestedResources.removeFirst();
- pending.complete(_runOnRelease(onRelease));
}
/// Runs [onRelease] and returns a Future that completes to a resource once an
@@ -202,4 +255,3 @@
_pool._onResourceReleaseAllowed(onRelease);
}
}
-
diff --git a/pubspec.yaml b/pubspec.yaml
index cc2d367..0013e9e 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,9 +1,10 @@
name: pool
-version: 1.1.1-dev
+version: 1.2.0
author: Dart Team <misc@dartlang.org>
description: A class for managing a finite pool of resources.
homepage: https://github.com/dart-lang/pool
dependencies:
+ async: "^1.3.0"
stack_trace: ">=0.9.2 <2.0.0"
environment:
sdk: ">=1.9.0 <2.0.0"
diff --git a/test/pool_test.dart b/test/pool_test.dart
index 5dfef2a..65fd00e 100644
--- a/test/pool_test.dart
+++ b/test/pool_test.dart
@@ -271,6 +271,139 @@
pool.request();
});
});
+
+ group("close()", () {
+ test("disallows request() and withResource()", () {
+ var pool = new Pool(1)..close();
+ expect(pool.request, throwsStateError);
+ expect(() => pool.withResource(() {}), throwsStateError);
+ });
+
+ test("pending requests are fulfilled", () async {
+ var pool = new Pool(1);
+ var resource1 = await pool.request();
+ expect(pool.request().then((resource2) {
+ resource2.release();
+ }), completes);
+ expect(pool.close(), completes);
+ resource1.release();
+ });
+
+ test("pending requests are fulfilled with allowRelease", () async {
+ var pool = new Pool(1);
+ var resource1 = await pool.request();
+
+ var completer = new Completer();
+ expect(pool.request().then((resource2) {
+ expect(completer.isCompleted, isTrue);
+ resource2.release();
+ }), completes);
+ expect(pool.close(), completes);
+
+ resource1.allowRelease(() => completer.future);
+ await new Future.delayed(Duration.ZERO);
+
+ completer.complete();
+ });
+
+ test("doesn't complete until all resources are released", () async {
+ var pool = new Pool(2);
+ var resource1 = await pool.request();
+ var resource2 = await pool.request();
+ var resource3Future = pool.request();
+
+ var resource1Released = false;
+ var resource2Released = false;
+ var resource3Released = false;
+ expect(pool.close().then((_) {
+ expect(resource1Released, isTrue);
+ expect(resource2Released, isTrue);
+ expect(resource3Released, isTrue);
+ }), completes);
+
+ resource1Released = true;
+ resource1.release();
+ await new Future.delayed(Duration.ZERO);
+
+ resource2Released = true;
+ resource2.release();
+ await new Future.delayed(Duration.ZERO);
+
+ var resource3 = await resource3Future;
+ resource3Released = true;
+ resource3.release();
+ });
+
+ test("active onReleases complete as usual", () async {
+ var pool = new Pool(1);
+ var resource = await pool.request();
+
+ // Set up an onRelease callback whose completion is controlled by
+ // [completer].
+ var completer = new Completer();
+ resource.allowRelease(() => completer.future);
+ expect(pool.request().then((_) {
+ expect(completer.isCompleted, isTrue);
+ }), completes);
+
+ await new Future.delayed(Duration.ZERO);
+ pool.close();
+
+ await new Future.delayed(Duration.ZERO);
+ completer.complete();
+ });
+
+ test("inactive onReleases fire", () async {
+ var pool = new Pool(2);
+ var resource1 = await pool.request();
+ var resource2 = await pool.request();
+
+ var completer1 = new Completer();
+ resource1.allowRelease(() => completer1.future);
+ var completer2 = new Completer();
+ resource2.allowRelease(() => completer2.future);
+
+ expect(pool.close().then((_) {
+ expect(completer1.isCompleted, isTrue);
+ expect(completer2.isCompleted, isTrue);
+ }), completes);
+
+ await new Future.delayed(Duration.ZERO);
+ completer1.complete();
+
+ await new Future.delayed(Duration.ZERO);
+ completer2.complete();
+ });
+
+ test("new allowReleases fire immediately", () async {
+ var pool = new Pool(1);
+ var resource = await pool.request();
+
+ var completer = new Completer();
+ expect(pool.close().then((_) {
+ expect(completer.isCompleted, isTrue);
+ }), completes);
+
+ await new Future.delayed(Duration.ZERO);
+ resource.allowRelease(() => completer.future);
+
+ await new Future.delayed(Duration.ZERO);
+ completer.complete();
+ });
+
+ test("an onRelease error is piped to the return value", () async {
+ var pool = new Pool(1);
+ var resource = await pool.request();
+
+ var completer = new Completer();
+ resource.allowRelease(() => completer.future);
+
+ expect(pool.close(), throwsA("oh no!"));
+
+ await new Future.delayed(Duration.ZERO);
+ completer.completeError("oh no!");
+ });
+ });
}
/// Returns a function that will cause the test to fail if it's called.