Add Pool.close().

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//1393193004 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ec8fd79..9f74d07 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.2.0
+
+* Add `Pool.close()`, which forbids new resource requests and releases all
+  releasable resources.
+
 ## 1.1.0
 
 * Add `PoolResource.allowRelease()`, which allows a resource to indicate that it
diff --git a/lib/pool.dart b/lib/pool.dart
index e8ee99c..59b949e 100644
--- a/lib/pool.dart
+++ b/lib/pool.dart
@@ -7,6 +7,7 @@
 import 'dart:async';
 import 'dart:collection';
 
+import 'package:async/async.dart';
 import 'package:stack_trace/stack_trace.dart';
 
 /// Manages an abstract pool of resources with a limit on how many may be in use
@@ -55,6 +56,15 @@
   /// The amount of time to wait before timing out the pending resources.
   final Duration _timeout;
 
+  /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources
+  /// that have been marked releasable.
+  ///
+  /// This is `null` until [close] is called.
+  FutureGroup _closeGroup;
+
+  /// Whether [close] has been called.
+  bool get isClosed => _closeGroup != null;
+
   /// Creates a new pool with the given limit on how many resources may be
   /// allocated at once.
   ///
@@ -69,6 +79,10 @@
   /// If the maximum number of resources is already allocated, this will delay
   /// until one of them is released.
   Future<PoolResource> request() {
+    if (isClosed) {
+      throw new StateError("request() may not be called on a closed Pool.");
+    }
+
     if (_allocatedResources < _maxAllocatedResources) {
       _allocatedResources++;
       return new Future.value(new PoolResource._(this));
@@ -87,20 +101,56 @@
   ///
   /// The return value of [callback] is piped to the returned Future.
   Future withResource(callback()) {
-    return request().then((resource) => new Future.sync(callback).whenComplete(resource.release));
+    if (isClosed) {
+      throw new StateError(
+          "withResource() may not be called on a closed Pool.");
+    }
+
+    // TODO(nweiz): Use async/await when sdk#23497 is fixed.
+    return request().then((resource) {
+      return new Future.sync(callback).whenComplete(resource.release);
+    });
+  }
+
+  /// Closes the pool so that no more resources are requested.
+  ///
+  /// Existing resource requests remain unchanged.
+  ///
+  /// Any resources that are marked as releasable using
+  /// [PoolResource.allowRelease] are released immediately. Once all resources
+  /// have been released and any `onRelease` callbacks have completed, the
+  /// returned future completes successfully. If any `onRelease` callback throws
+  /// an error, the returned future completes with that error.
+  ///
+  /// This may be called more than once; it returns the same [Future] each time.
+  Future close() {
+    if (_closeGroup != null) return _closeGroup.future;
+
+    _resetTimer();
+
+    _closeGroup = new FutureGroup();
+    for (var callback in _onReleaseCallbacks) {
+      _closeGroup.add(new Future.sync(callback));
+    }
+
+    _allocatedResources -= _onReleaseCallbacks.length;
+    _onReleaseCallbacks.clear();
+
+    if (_allocatedResources == 0) _closeGroup.close();
+    return _closeGroup.future;
   }
 
   /// If there are any pending requests, this will fire the oldest one.
   void _onResourceReleased() {
     _resetTimer();
 
-    if (_requestedResources.isEmpty) {
+    if (_requestedResources.isNotEmpty) {
+      var pending = _requestedResources.removeFirst();
+      pending.complete(new PoolResource._(this));
+    } else {
       _allocatedResources--;
-      return;
+      if (isClosed && _allocatedResources == 0) _closeGroup.close();
     }
-
-    var pending = _requestedResources.removeFirst();
-    pending.complete(new PoolResource._(this));
   }
 
   /// If there are any pending requests, this will fire the oldest one after
@@ -108,14 +158,17 @@
   void _onResourceReleaseAllowed(onRelease()) {
     _resetTimer();
 
-    if (_requestedResources.isEmpty) {
+    if (_requestedResources.isNotEmpty) {
+      var pending = _requestedResources.removeFirst();
+      pending.complete(_runOnRelease(onRelease));
+    } else if (isClosed) {
+      _closeGroup.add(new Future.sync(onRelease));
+      _allocatedResources--;
+      if (_allocatedResources == 0) _closeGroup.close();
+    } else {
       _onReleaseCallbacks.add(
           Zone.current.bindCallback(onRelease, runGuarded: false));
-      return;
     }
-
-    var pending = _requestedResources.removeFirst();
-    pending.complete(_runOnRelease(onRelease));
   }
 
   /// Runs [onRelease] and returns a Future that completes to a resource once an
@@ -202,4 +255,3 @@
     _pool._onResourceReleaseAllowed(onRelease);
   }
 }
-
diff --git a/pubspec.yaml b/pubspec.yaml
index cc2d367..0013e9e 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,9 +1,10 @@
 name: pool
-version: 1.1.1-dev
+version: 1.2.0
 author: Dart Team <misc@dartlang.org>
 description: A class for managing a finite pool of resources.
 homepage: https://github.com/dart-lang/pool
 dependencies:
+  async: "^1.3.0"
   stack_trace: ">=0.9.2 <2.0.0"
 environment:
   sdk: ">=1.9.0 <2.0.0"
diff --git a/test/pool_test.dart b/test/pool_test.dart
index 5dfef2a..65fd00e 100644
--- a/test/pool_test.dart
+++ b/test/pool_test.dart
@@ -271,6 +271,139 @@
       pool.request();
     });
   });
+
+  group("close()", () {
+    test("disallows request() and withResource()", () {
+      var pool = new Pool(1)..close();
+      expect(pool.request, throwsStateError);
+      expect(() => pool.withResource(() {}), throwsStateError);
+    });
+
+    test("pending requests are fulfilled", () async {
+      var pool = new Pool(1);
+      var resource1 = await pool.request();
+      expect(pool.request().then((resource2) {
+        resource2.release();
+      }), completes);
+      expect(pool.close(), completes);
+      resource1.release();
+    });
+
+    test("pending requests are fulfilled with allowRelease", () async {
+      var pool = new Pool(1);
+      var resource1 = await pool.request();
+
+      var completer = new Completer();
+      expect(pool.request().then((resource2) {
+        expect(completer.isCompleted, isTrue);
+        resource2.release();
+      }), completes);
+      expect(pool.close(), completes);
+
+      resource1.allowRelease(() => completer.future);
+      await new Future.delayed(Duration.ZERO);
+
+      completer.complete();
+    });
+
+    test("doesn't complete until all resources are released", () async {
+      var pool = new Pool(2);
+      var resource1 = await pool.request();
+      var resource2 = await pool.request();
+      var resource3Future = pool.request();
+
+      var resource1Released = false;
+      var resource2Released = false;
+      var resource3Released = false;
+      expect(pool.close().then((_) {
+        expect(resource1Released, isTrue);
+        expect(resource2Released, isTrue);
+        expect(resource3Released, isTrue);
+      }), completes);
+
+      resource1Released = true;
+      resource1.release();
+      await new Future.delayed(Duration.ZERO);
+
+      resource2Released = true;
+      resource2.release();
+      await new Future.delayed(Duration.ZERO);
+
+      var resource3 = await resource3Future;
+      resource3Released = true;
+      resource3.release();
+    });
+
+    test("active onReleases complete as usual", () async {
+      var pool = new Pool(1);
+      var resource = await pool.request();
+
+      // Set up an onRelease callback whose completion is controlled by
+      // [completer].
+      var completer = new Completer();
+      resource.allowRelease(() => completer.future);
+      expect(pool.request().then((_) {
+        expect(completer.isCompleted, isTrue);
+      }), completes);
+
+      await new Future.delayed(Duration.ZERO);
+      pool.close();
+
+      await new Future.delayed(Duration.ZERO);
+      completer.complete();
+    });
+
+    test("inactive onReleases fire", () async {
+      var pool = new Pool(2);
+      var resource1 = await pool.request();
+      var resource2 = await pool.request();
+
+      var completer1 = new Completer();
+      resource1.allowRelease(() => completer1.future);
+      var completer2 = new Completer();
+      resource2.allowRelease(() => completer2.future);
+
+      expect(pool.close().then((_) {
+        expect(completer1.isCompleted, isTrue);
+        expect(completer2.isCompleted, isTrue);
+      }), completes);
+
+      await new Future.delayed(Duration.ZERO);
+      completer1.complete();
+
+      await new Future.delayed(Duration.ZERO);
+      completer2.complete();
+    });
+
+    test("new allowReleases fire immediately", () async {
+      var pool = new Pool(1);
+      var resource = await pool.request();
+
+      var completer = new Completer();
+      expect(pool.close().then((_) {
+        expect(completer.isCompleted, isTrue);
+      }), completes);
+
+      await new Future.delayed(Duration.ZERO);
+      resource.allowRelease(() => completer.future);
+
+      await new Future.delayed(Duration.ZERO);
+      completer.complete();
+    });
+
+    test("an onRelease error is piped to the return value", () async {
+      var pool = new Pool(1);
+      var resource = await pool.request();
+
+      var completer = new Completer();
+      resource.allowRelease(() => completer.future);
+
+      expect(pool.close(), throwsA("oh no!"));
+
+      await new Future.delayed(Duration.ZERO);
+      completer.completeError("oh no!");
+    });
+  });
 }
 
 /// Returns a function that will cause the test to fail if it's called.