Add PoolResource.allowRelease(). This allows a resource to indicate that it can be released without forcing it to deallocate immediately. R=rnystrom@google.com Review URL: https://codereview.chromium.org//1205133002.
diff --git a/pkgs/pool/CHANGELOG.md b/pkgs/pool/CHANGELOG.md index 4fa87ca..ec8fd79 100644 --- a/pkgs/pool/CHANGELOG.md +++ b/pkgs/pool/CHANGELOG.md
@@ -1,3 +1,8 @@ +## 1.1.0 + +* Add `PoolResource.allowRelease()`, which allows a resource to indicate that it + can be released without forcing it to deallocate immediately. + ## 1.0.2 * Fixed the homepage.
diff --git a/pkgs/pool/lib/pool.dart b/pkgs/pool/lib/pool.dart index 61482e3..6941229 100644 --- a/pkgs/pool/lib/pool.dart +++ b/pkgs/pool/lib/pool.dart
@@ -24,6 +24,19 @@ /// be completed. final _requestedResources = new Queue<Completer<PoolResource>>(); + /// Callbacks that must be called before additional resources can be + /// allocated. + /// + /// See [PoolResource.allowRelease]. + final _onReleaseCallbacks = new Queue<Function>(); + + /// Completers that will be completed once `onRelease` callbacks are done + /// running. + /// + /// These are kept in a queue to ensure that the earliest request completes + /// first regardless of what order the `onRelease` callbacks complete in. + final _onReleaseCompleters = new Queue<Completer<PoolResource>>(); + /// The maximum number of resources that may be allocated at once. final int _maxAllocatedResources; @@ -59,6 +72,8 @@ if (_allocatedResources < _maxAllocatedResources) { _allocatedResources++; return new Future.value(new PoolResource._(this)); + } else if (_onReleaseCallbacks.isNotEmpty) { + return _runOnRelease(_onReleaseCallbacks.removeFirst()); } else { var completer = new Completer<PoolResource>(); _requestedResources.add(completer); @@ -78,24 +93,53 @@ /// If there are any pending requests, this will fire the oldest one. void _onResourceReleased() { + _resetTimer(); + if (_requestedResources.isEmpty) { _allocatedResources--; - if (_timer != null) { - _timer.cancel(); - _timer = null; - } return; } - _resetTimer(); var pending = _requestedResources.removeFirst(); pending.complete(new PoolResource._(this)); } + /// If there are any pending requests, this will fire the oldest one after + /// running [onRelease]. + void _onResourceReleaseAllowed(onRelease()) { + _resetTimer(); + + if (_requestedResources.isEmpty) { + _onReleaseCallbacks.add( + Zone.current.bindCallback(onRelease, runGuarded: false)); + return; + } + + var pending = _requestedResources.removeFirst(); + pending.complete(_runOnRelease(onRelease)); + } + + /// Runs [onRelease] and returns a Future that completes to a resource once an + /// [onRelease] callback completes. 
+ /// + /// Futures returned by [_runOnRelease] always complete in the order they were + /// created, even if earlier [onRelease] callbacks take longer to run. + Future<PoolResource> _runOnRelease(onRelease()) { + new Future.sync(onRelease).then((value) { + _onReleaseCompleters.removeFirst().complete(new PoolResource._(this)); + }).catchError((error, stackTrace) { + _onReleaseCompleters.removeFirst().completeError(error, stackTrace); + }); + + var completer = new Completer.sync(); + _onReleaseCompleters.add(completer); + return completer.future; + } + /// A resource has been requested, allocated, or released. void _resetTimer() { if (_timer != null) _timer.cancel(); - if (_timeout == null) { + if (_timeout == null || _requestedResources.isEmpty) { _timer = null; } else { _timer = new Timer(_timeout, _onTimeout); @@ -138,5 +182,25 @@ _released = true; _pool._onResourceReleased(); } + + /// Tells the parent [Pool] that the resource associated with this resource is + /// no longer necessary, but should remain allocated until more resources are + /// needed. + /// + /// When [Pool.request] is called and there are no remaining available + /// resources, the [onRelease] callback is called. It should free the + /// resource, and it may return a Future or `null`. Once that completes, the + /// [Pool.request] call will complete to a new [PoolResource]. + /// + /// This is useful when a resource's main function is complete, but it may + /// produce additional information later on. For example, an isolate's task + /// may be complete, but it could still emit asynchronous errors. + void allowRelease(onRelease()) { + if (_released) { + throw new StateError("A PoolResource may only be released once."); + } + _released = true; + _pool._onResourceReleaseAllowed(onRelease); + } }
diff --git a/pkgs/pool/pubspec.yaml b/pkgs/pool/pubspec.yaml index 19aac1e..175c8dc 100644 --- a/pkgs/pool/pubspec.yaml +++ b/pkgs/pool/pubspec.yaml
@@ -1,10 +1,12 @@ name: pool -version: 1.0.2 +version: 1.1.0 author: Dart Team <misc@dartlang.org> description: A class for managing a finite pool of resources. homepage: https://github.com/dart-lang/pool dependencies: stack_trace: ">=0.9.2 <2.0.0" +environment: + sdk: ">=1.9.0 <2.0.0" dev_dependencies: fake_async: ">=0.1.0 <0.2.0" test: ">=0.12.0 <0.13.0"
diff --git a/pkgs/pool/test/pool_test.dart b/pkgs/pool/test/pool_test.dart index b654801..64bc3a6 100644 --- a/pkgs/pool/test/pool_test.dart +++ b/pkgs/pool/test/pool_test.dart
@@ -159,6 +159,119 @@ }); }); }); + + group("allowRelease()", () { + test("runs the callback once the resource limit is exceeded", () async { + var pool = new Pool(50); + var requests = []; + for (var i = 0; i < 49; i++) { + expect(pool.request(), completes); + } + + var resource = await pool.request(); + var onReleaseCalled = false; + resource.allowRelease(() => onReleaseCalled = true); + await new Future.delayed(Duration.ZERO); + expect(onReleaseCalled, isFalse); + + expect(pool.request(), completes); + await new Future.delayed(Duration.ZERO); + expect(onReleaseCalled, isTrue); + }); + + test("runs the callback immediately if there are blocked requests", + () async { + var pool = new Pool(1); + var resource = await pool.request(); + + // This will be blocked until [resource.allowRelease] is called. + expect(pool.request(), completes); + + var onReleaseCalled = false; + resource.allowRelease(() => onReleaseCalled = true); + await new Future.delayed(Duration.ZERO); + expect(onReleaseCalled, isTrue); + }); + + test("blocks the request until the callback completes", () async { + var pool = new Pool(1); + var resource = await pool.request(); + + var requestComplete = false; + pool.request().then((_) => requestComplete = true); + + var completer = new Completer(); + resource.allowRelease(() => completer.future); + await new Future.delayed(Duration.ZERO); + expect(requestComplete, isFalse); + + completer.complete(); + await new Future.delayed(Duration.ZERO); + expect(requestComplete, isTrue); + }); + + test("completes requests in request order regardless of callback order", + () async { + var pool = new Pool(2); + var resource1 = await pool.request(); + var resource2 = await pool.request(); + + var request1Complete = false; + pool.request().then((_) => request1Complete = true); + var request2Complete = false; + pool.request().then((_) => request2Complete = true); + + var onRelease1Called = false; + var completer1 = new Completer(); + resource1.allowRelease(() { + 
onRelease1Called = true; + return completer1.future; + }); + await new Future.delayed(Duration.ZERO); + expect(onRelease1Called, isTrue); + + var onRelease2Called = false; + var completer2 = new Completer(); + resource2.allowRelease(() { + onRelease2Called = true; + return completer2.future; + }); + await new Future.delayed(Duration.ZERO); + expect(onRelease2Called, isTrue); + expect(request1Complete, isFalse); + expect(request2Complete, isFalse); + + // Complete the second resource's onRelease callback first. Even though it + // was triggered by the second blocking request, it should complete the + // first one to preserve ordering. + completer2.complete(); + await new Future.delayed(Duration.ZERO); + expect(request1Complete, isTrue); + expect(request2Complete, isFalse); + + completer1.complete(); + await new Future.delayed(Duration.ZERO); + expect(request1Complete, isTrue); + expect(request2Complete, isTrue); + }); + + test("runs onRequest in the zone it was created", () async { + var pool = new Pool(1); + var resource = await pool.request(); + + var outerZone = Zone.current; + runZoned(() { + var innerZone = Zone.current; + expect(innerZone, isNot(equals(outerZone))); + + resource.allowRelease(expectAsync(() { + expect(Zone.current, equals(innerZone)); + })); + }); + + pool.request(); + }); + }); } /// Returns a function that will cause the test to fail if it's called.