Add PoolResource.allowRelease().
This allows a resource to indicate that it can be released without forcing it to
deallocate immediately.
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//1205133002.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4fa87ca..ec8fd79 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.1.0
+
+* Add `PoolResource.allowRelease()`, which allows a resource to indicate that it
+ can be released without forcing it to deallocate immediately.
+
## 1.0.2
* Fixed the homepage.
diff --git a/lib/pool.dart b/lib/pool.dart
index 61482e3..6941229 100644
--- a/lib/pool.dart
+++ b/lib/pool.dart
@@ -24,6 +24,19 @@
/// be completed.
final _requestedResources = new Queue<Completer<PoolResource>>();
+ /// Callbacks that must be called before additional resources can be
+ /// allocated.
+ ///
+ /// See [PoolResource.allowRelease].
+ final _onReleaseCallbacks = new Queue<Function>();
+
+ /// Completers that will be completed once `onRelease` callbacks are done
+ /// running.
+ ///
+ /// These are kept in a queue to ensure that the earliest request completes
+ /// first regardless of what order the `onRelease` callbacks complete in.
+ final _onReleaseCompleters = new Queue<Completer<PoolResource>>();
+
/// The maximum number of resources that may be allocated at once.
final int _maxAllocatedResources;
@@ -59,6 +72,8 @@
if (_allocatedResources < _maxAllocatedResources) {
_allocatedResources++;
return new Future.value(new PoolResource._(this));
+ } else if (_onReleaseCallbacks.isNotEmpty) {
+ return _runOnRelease(_onReleaseCallbacks.removeFirst());
} else {
var completer = new Completer<PoolResource>();
_requestedResources.add(completer);
@@ -78,24 +93,53 @@
/// If there are any pending requests, this will fire the oldest one.
void _onResourceReleased() {
+ _resetTimer();
+
if (_requestedResources.isEmpty) {
_allocatedResources--;
- if (_timer != null) {
- _timer.cancel();
- _timer = null;
- }
return;
}
- _resetTimer();
var pending = _requestedResources.removeFirst();
pending.complete(new PoolResource._(this));
}
+ /// Fires the oldest pending request, if any, after running [onRelease];
+ /// otherwise queues [onRelease] until a later [request] needs a resource.
+ void _onResourceReleaseAllowed(onRelease()) {
+ _resetTimer();
+
+ if (_requestedResources.isEmpty) {
+ _onReleaseCallbacks.add(
+ Zone.current.bindCallback(onRelease, runGuarded: false));
+ return;
+ }
+
+ var pending = _requestedResources.removeFirst();
+ pending.complete(_runOnRelease(onRelease));
+ }
+
+ /// Runs [onRelease] and returns a Future that completes to a resource once an
+ /// [onRelease] callback completes.
+ ///
+ /// Futures returned by [_runOnRelease] always complete in the order they were
+ /// created, even if earlier [onRelease] callbacks take longer to run.
+ Future<PoolResource> _runOnRelease(onRelease()) {
+ new Future.sync(onRelease).then((value) {
+ _onReleaseCompleters.removeFirst().complete(new PoolResource._(this));
+ }).catchError((error, stackTrace) {
+ _onReleaseCompleters.removeFirst().completeError(error, stackTrace);
+ });
+
+ var completer = new Completer.sync();
+ _onReleaseCompleters.add(completer);
+ return completer.future;
+ }
+
/// A resource has been requested, allocated, or released.
void _resetTimer() {
if (_timer != null) _timer.cancel();
- if (_timeout == null) {
+ if (_timeout == null || _requestedResources.isEmpty) {
_timer = null;
} else {
_timer = new Timer(_timeout, _onTimeout);
@@ -138,5 +182,25 @@
_released = true;
_pool._onResourceReleased();
}
+
+ /// Tells the parent [Pool] that the resource associated with this object is
+ /// no longer necessary, but should remain allocated until more resources are
+ /// needed.
+ ///
+ /// When [Pool.request] is called and there are no remaining available
+ /// resources, the [onRelease] callback is called. It should free the
+ /// resource, and it may return a Future or `null`. Once that completes, the
+ /// [Pool.request] call will complete to a new [PoolResource].
+ ///
+ /// This is useful when a resource's main function is complete, but it may
+ /// produce additional information later on. For example, an isolate's task
+ /// may be complete, but it could still emit asynchronous errors.
+ void allowRelease(onRelease()) {
+ if (_released) {
+ throw new StateError("A PoolResource may only be released once.");
+ }
+ _released = true;
+ _pool._onResourceReleaseAllowed(onRelease);
+ }
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 19aac1e..175c8dc 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,10 +1,12 @@
name: pool
-version: 1.0.2
+version: 1.1.0
author: Dart Team <misc@dartlang.org>
description: A class for managing a finite pool of resources.
homepage: https://github.com/dart-lang/pool
dependencies:
stack_trace: ">=0.9.2 <2.0.0"
+environment:
+ sdk: ">=1.9.0 <2.0.0"
dev_dependencies:
fake_async: ">=0.1.0 <0.2.0"
test: ">=0.12.0 <0.13.0"
diff --git a/test/pool_test.dart b/test/pool_test.dart
index b654801..64bc3a6 100644
--- a/test/pool_test.dart
+++ b/test/pool_test.dart
@@ -159,6 +159,119 @@
});
});
});
+
+ group("allowRelease()", () {
+ test("runs the callback once the resource limit is exceeded", () async {
+ var pool = new Pool(50);
+ var requests = [];
+ for (var i = 0; i < 49; i++) {
+ expect(pool.request(), completes);
+ }
+
+ var resource = await pool.request();
+ var onReleaseCalled = false;
+ resource.allowRelease(() => onReleaseCalled = true);
+ await new Future.delayed(Duration.ZERO);
+ expect(onReleaseCalled, isFalse);
+
+ expect(pool.request(), completes);
+ await new Future.delayed(Duration.ZERO);
+ expect(onReleaseCalled, isTrue);
+ });
+
+ test("runs the callback immediately if there are blocked requests",
+ () async {
+ var pool = new Pool(1);
+ var resource = await pool.request();
+
+ // This will be blocked until [resource.allowRelease] is called.
+ expect(pool.request(), completes);
+
+ var onReleaseCalled = false;
+ resource.allowRelease(() => onReleaseCalled = true);
+ await new Future.delayed(Duration.ZERO);
+ expect(onReleaseCalled, isTrue);
+ });
+
+ test("blocks the request until the callback completes", () async {
+ var pool = new Pool(1);
+ var resource = await pool.request();
+
+ var requestComplete = false;
+ pool.request().then((_) => requestComplete = true);
+
+ var completer = new Completer();
+ resource.allowRelease(() => completer.future);
+ await new Future.delayed(Duration.ZERO);
+ expect(requestComplete, isFalse);
+
+ completer.complete();
+ await new Future.delayed(Duration.ZERO);
+ expect(requestComplete, isTrue);
+ });
+
+ test("completes requests in request order regardless of callback order",
+ () async {
+ var pool = new Pool(2);
+ var resource1 = await pool.request();
+ var resource2 = await pool.request();
+
+ var request1Complete = false;
+ pool.request().then((_) => request1Complete = true);
+ var request2Complete = false;
+ pool.request().then((_) => request2Complete = true);
+
+ var onRelease1Called = false;
+ var completer1 = new Completer();
+ resource1.allowRelease(() {
+ onRelease1Called = true;
+ return completer1.future;
+ });
+ await new Future.delayed(Duration.ZERO);
+ expect(onRelease1Called, isTrue);
+
+ var onRelease2Called = false;
+ var completer2 = new Completer();
+ resource2.allowRelease(() {
+ onRelease2Called = true;
+ return completer2.future;
+ });
+ await new Future.delayed(Duration.ZERO);
+ expect(onRelease2Called, isTrue);
+ expect(request1Complete, isFalse);
+ expect(request2Complete, isFalse);
+
+ // Complete the second resource's onRelease callback first. Even though it
+ // was triggered by the second blocking request, it should complete the
+ // first one to preserve ordering.
+ completer2.complete();
+ await new Future.delayed(Duration.ZERO);
+ expect(request1Complete, isTrue);
+ expect(request2Complete, isFalse);
+
+ completer1.complete();
+ await new Future.delayed(Duration.ZERO);
+ expect(request1Complete, isTrue);
+ expect(request2Complete, isTrue);
+ });
+
+ test("runs onRequest in the zone it was created", () async {
+ var pool = new Pool(1);
+ var resource = await pool.request();
+
+ var outerZone = Zone.current;
+ runZoned(() {
+ var innerZone = Zone.current;
+ expect(innerZone, isNot(equals(outerZone)));
+
+ resource.allowRelease(expectAsync(() {
+ expect(Zone.current, equals(innerZone));
+ }));
+ });
+
+ pool.request();
+ });
+ });
}
/// Returns a function that will cause the test to fail if it's called.