Add Pool.allowRelease().

This allows a resource to indicate that it can be released without forcing it to
deallocate immediately.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//1205133002.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4fa87ca..ec8fd79 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.1.0
+
+* Add `PoolResource.allowRelease()`, which allows a resource to indicate that it
+  can be released without forcing it to deallocate immediately.
+
 ## 1.0.2
 
 * Fixed the homepage.
diff --git a/lib/pool.dart b/lib/pool.dart
index 61482e3..6941229 100644
--- a/lib/pool.dart
+++ b/lib/pool.dart
@@ -24,6 +24,19 @@
   /// be completed.
   final _requestedResources = new Queue<Completer<PoolResource>>();
 
+  /// Callbacks that must be called before additional resources can be
+  /// allocated.
+  ///
+  /// See [PoolResource.allowRelease].
+  final _onReleaseCallbacks = new Queue<Function>();
+
+  /// Completers that will be completed once `onRelease` callbacks are done
+  /// running.
+  ///
+  /// These are kept in a queue to ensure that the earliest request completes
+  /// first regardless of what order the `onRelease` callbacks complete in.
+  final _onReleaseCompleters = new Queue<Completer<PoolResource>>();
+
   /// The maximum number of resources that may be allocated at once.
   final int _maxAllocatedResources;
 
@@ -59,6 +72,8 @@
     if (_allocatedResources < _maxAllocatedResources) {
       _allocatedResources++;
       return new Future.value(new PoolResource._(this));
+    } else if (_onReleaseCallbacks.isNotEmpty) {
+      return _runOnRelease(_onReleaseCallbacks.removeFirst());
     } else {
       var completer = new Completer<PoolResource>();
       _requestedResources.add(completer);
@@ -78,24 +93,53 @@
 
   /// If there are any pending requests, this will fire the oldest one.
   void _onResourceReleased() {
+    _resetTimer();
+
     if (_requestedResources.isEmpty) {
       _allocatedResources--;
-      if (_timer != null) {
-        _timer.cancel();
-        _timer = null;
-      }
       return;
     }
 
-    _resetTimer();
     var pending = _requestedResources.removeFirst();
     pending.complete(new PoolResource._(this));
   }
 
+  /// If there are any pending requests, this will fire the oldest one after
+  /// running [onRelease].
+  void _onResourceReleaseAllowed(onRelease()) {
+    _resetTimer();
+
+    if (_requestedResources.isEmpty) {
+      _onReleaseCallbacks.add(
+          Zone.current.bindCallback(onRelease, runGuarded: false));
+      return;
+    }
+
+    var pending = _requestedResources.removeFirst();
+    pending.complete(_runOnRelease(onRelease));
+  }
+
+  /// Runs [onRelease] and returns a Future that completes to a resource once an
+  /// [onRelease] callback completes.
+  ///
+  /// Futures returned by [_runOnRelease] always complete in the order they were
+  /// created, even if earlier [onRelease] callbacks take longer to run.
+  Future<PoolResource> _runOnRelease(onRelease()) {
+    new Future.sync(onRelease).then((value) {
+      _onReleaseCompleters.removeFirst().complete(new PoolResource._(this));
+    }).catchError((error, stackTrace) {
+      _onReleaseCompleters.removeFirst().completeError(error, stackTrace);
+    });
+
+    var completer = new Completer.sync();
+    _onReleaseCompleters.add(completer);
+    return completer.future;
+  }
+
   /// A resource has been requested, allocated, or released.
   void _resetTimer() {
     if (_timer != null) _timer.cancel();
-    if (_timeout == null) {
+    if (_timeout == null || _requestedResources.isEmpty) {
       _timer = null;
     } else {
       _timer = new Timer(_timeout, _onTimeout);
@@ -138,5 +182,25 @@
     _released = true;
     _pool._onResourceReleased();
   }
+
+  /// Tells the parent [Pool] that the resource associated with this resource is
+  /// no longer necessary, but should remain allocated until more resources are
+  /// needed.
+  ///
+  /// When [Pool.request] is called and there are no remaining available
+  /// resources, the [onRelease] callback is called. It should free the
+  /// resource, and it may return a Future or `null`. Once that completes, the
+  /// [Pool.request] call will complete to a new [PoolResource].
+  ///
+  /// This is useful when a resource's main function is complete, but it may
+  /// produce additional information later on. For example, an isolate's task
+  /// may be complete, but it could still emit asynchronous errors.
+  void allowRelease(onRelease()) {
+    if (_released) {
+      throw new StateError("A PoolResource may only be released once.");
+    }
+    _released = true;
+    _pool._onResourceReleaseAllowed(onRelease);
+  }
 }
 
diff --git a/pubspec.yaml b/pubspec.yaml
index 19aac1e..175c8dc 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,10 +1,12 @@
 name: pool
-version: 1.0.2
+version: 1.1.0
 author: Dart Team <misc@dartlang.org>
 description: A class for managing a finite pool of resources.
 homepage: https://github.com/dart-lang/pool
 dependencies:
   stack_trace: ">=0.9.2 <2.0.0"
+environment:
+  sdk: ">=1.9.0 <2.0.0"
 dev_dependencies:
   fake_async: ">=0.1.0 <0.2.0"
   test: ">=0.12.0 <0.13.0"
diff --git a/test/pool_test.dart b/test/pool_test.dart
index b654801..64bc3a6 100644
--- a/test/pool_test.dart
+++ b/test/pool_test.dart
@@ -159,6 +159,119 @@
       });
     });
   });
+
+  group("allowRelease()", () {
+    test("runs the callback once the resource limit is exceeded", () async {
+      var pool = new Pool(50);
+      var requests = [];
+      for (var i = 0; i < 49; i++) {
+        expect(pool.request(), completes);
+      }
+
+      var resource = await pool.request();
+      var onReleaseCalled = false;
+      resource.allowRelease(() => onReleaseCalled = true);
+      await new Future.delayed(Duration.ZERO);
+      expect(onReleaseCalled, isFalse);
+
+      expect(pool.request(), completes);
+      await new Future.delayed(Duration.ZERO);
+      expect(onReleaseCalled, isTrue);
+    });
+
+    test("runs the callback immediately if there are blocked requests",
+        () async {
+      var pool = new Pool(1);
+      var resource = await pool.request();
+
+      // This will be blocked until [resource.allowRelease] is called.
+      expect(pool.request(), completes);
+
+      var onReleaseCalled = false;
+      resource.allowRelease(() => onReleaseCalled = true);
+      await new Future.delayed(Duration.ZERO);
+      expect(onReleaseCalled, isTrue);
+    });
+
+    test("blocks the request until the callback completes", () async {
+      var pool = new Pool(1);
+      var resource = await pool.request();
+
+      var requestComplete = false;
+      pool.request().then((_) => requestComplete = true);
+
+      var completer = new Completer();
+      resource.allowRelease(() => completer.future);
+      await new Future.delayed(Duration.ZERO);
+      expect(requestComplete, isFalse);
+
+      completer.complete();
+      await new Future.delayed(Duration.ZERO);
+      expect(requestComplete, isTrue);
+    });
+
+    test("completes requests in request order regardless of callback order",
+        () async {
+      var pool = new Pool(2);
+      var resource1 = await pool.request();
+      var resource2 = await pool.request();
+
+      var request1Complete = false;
+      pool.request().then((_) => request1Complete = true);
+      var request2Complete = false;
+      pool.request().then((_) => request2Complete = true);
+
+      var onRelease1Called = false;
+      var completer1 = new Completer();
+      resource1.allowRelease(() {
+        onRelease1Called = true;
+        return completer1.future;
+      });
+      await new Future.delayed(Duration.ZERO);
+      expect(onRelease1Called, isTrue);
+
+      var onRelease2Called = false;
+      var completer2 = new Completer();
+      resource2.allowRelease(() {
+        onRelease2Called = true;
+        return completer2.future;
+      });
+      await new Future.delayed(Duration.ZERO);
+      expect(onRelease2Called, isTrue);
+      expect(request1Complete, isFalse);
+      expect(request2Complete, isFalse);
+
+      // Complete the second resource's onRelease callback first. Even though it
+      // was triggered by the second blocking request, it should complete the
+      // first one to preserve ordering.
+      completer2.complete();
+      await new Future.delayed(Duration.ZERO);
+      expect(request1Complete, isTrue);
+      expect(request2Complete, isFalse);
+
+      completer1.complete();
+      await new Future.delayed(Duration.ZERO);
+      expect(request1Complete, isTrue);
+      expect(request2Complete, isTrue);
+    });
+
+    test("runs onRequest in the zone it was created", () async {
+      var pool = new Pool(1);
+      var resource = await pool.request();
+
+      var outerZone = Zone.current;
+      runZoned(() {
+        var innerZone = Zone.current;
+        expect(innerZone, isNot(equals(outerZone)));
+
+        resource.allowRelease(expectAsync(() {
+          expect(Zone.current, equals(innerZone));
+        }));
+      });
+
+      pool.request();
+    });
+  });
 }
 
 /// Returns a function that will cause the test to fail if it's called.