Move pub/barback's Pool class into its own package.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//399963004

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/pool@38525 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..5c60afe
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,26 @@
+Copyright 2014, the Dart project authors. All rights reserved.
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+      copyright notice, this list of conditions and the following
+      disclaimer in the documentation and/or other materials provided
+      with the distribution.
+    * Neither the name of Google Inc. nor the names of its
+      contributors may be used to endorse or promote products derived
+      from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8987288
--- /dev/null
+++ b/README.md
@@ -0,0 +1,53 @@
+The pool package exposes a `Pool` class which makes it easy to manage a limited
+pool of resources.
+
+The easiest way to use a pool is by calling `withResource`. This runs a callback
+and returns its result, but only once there aren't too many other callbacks
+currently running.
+
+```dart
+// Create a Pool that will only allocate 10 resources at once. After 30 seconds
+// of inactivity with all resources checked out, the pool will throw an error.
+final pool = new Pool(10, timeout: new Duration(seconds: 30));
+
+Future<String> readFile(String path) {
+  // Since the call to [File.readAsString] is within [withResource], no more
+  // than ten files will be open at once.
+  return pool.withResource(() => return new File(path).readAsString());
+}
+```
+
+For more fine-grained control, the user can also explicitly request generic
+`PoolResource` objects that can later be released back into the pool. This is
+what `withResource` does under the covers: requests a resource, then releases it
+once the callback completes.
+
+`Pool` ensures that only a limited number of resources are allocated at once.
+It's the caller's responsibility to ensure that the corresponding physical
+resource is only consumed when a `PoolResource` is allocated.
+
+```dart
+class PooledFile implements RandomAccessFile {
+  final RandomAccessFile _file;
+  final PoolResource _resource;
+
+  static Future<PooledFile> open(String path) {
+    return pool.request().then((resource) {
+      return new File(path).open().then((file) {
+        return new PooledFile._(file, resource);
+      });
+    });
+  }
+
+  PooledFile(this._file, this._resource);
+
+  // ...
+
+  Future<RandomAccessFile> close() {
+    return _file.close.then((_) {
+      _resource.release();
+      return this;
+    });
+  }
+}
+```
diff --git a/lib/pool.dart b/lib/pool.dart
new file mode 100644
index 0000000..36a29c8
--- /dev/null
+++ b/lib/pool.dart
@@ -0,0 +1,139 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library pool;
+
+import 'dart:async';
+import 'dart:collection';
+
+import 'package:stack_trace/stack_trace.dart';
+
+/// Manages an abstract pool of resources with a limit on how many may be in use
+/// at once.
+///
+/// When a resource is needed, the user should call [request]. When the returned
+/// future completes with a [PoolResource], the resource may be allocated. Once
+/// the resource has been released, the user should call [PoolResource.release].
+/// The pool will ensure that only a certain number of [PoolResource]s may be
+/// allocated at once.
+class Pool {
+  /// Completers for requests beyond the first [_maxAllocatedResources].
+  ///
+  /// When an item is released, the next element of [_requestedResources] will
+  /// be completed.
+  final _requestedResources = new Queue<Completer<PoolResource>>();
+
+  /// The maximum number of resources that may be allocated at once.
+  final int _maxAllocatedResources;
+
+  /// The number of resources that are currently allocated.
+  int _allocatedResources = 0;
+
+  /// The timeout timer.
+  ///
+  /// If [_timeout] isn't null, this timer is set as soon as the resource limit
+  /// is reached and is reset every time an resource is released or a new
+  /// resource is requested. If it fires, that indicates that the caller became
+  /// deadlocked, likely due to files waiting for additional files to be read
+  /// before they could be closed.
+  Timer _timer;
+
+  /// The amount of time to wait before timing out the pending resources.
+  Duration _timeout;
+
+  /// Creates a new pool with the given limit on how many resources may be
+  /// allocated at once.
+  ///
+  /// If [timeout] is passed, then if that much time passes without any activity
+  /// all pending [request] futures will throw an exception. This is indented
+  /// to avoid deadlocks.
+  Pool(this._maxAllocatedResources, {Duration timeout})
+      : _timeout = timeout;
+
+  /// Request a [PoolResource].
+  ///
+  /// If the maximum number of resources is already allocated, this will delay
+  /// until one of them is released.
+  Future<PoolResource> request() {
+    if (_allocatedResources < _maxAllocatedResources) {
+      _allocatedResources++;
+      return new Future.value(new PoolResource._(this));
+    } else {
+      var completer = new Completer<PoolResource>();
+      _requestedResources.add(completer);
+      _resetTimer();
+      return completer.future;
+    }
+  }
+
+  /// Requests a resource for the duration of [callback], which may return a
+  /// Future.
+  ///
+  /// The return value of [callback] is piped to the returned Future.
+  Future withResource(callback()) {
+    return request().then((resource) =>
+        Chain.track(new Future.sync(callback)).whenComplete(resource.release));
+  }
+
+  /// If there are any pending requests, this will fire the oldest one.
+  void _onResourceReleased() {
+    if (_requestedResources.isEmpty) {
+      _allocatedResources--;
+      if (_timer != null) {
+        _timer.cancel();
+        _timer = null;
+      }
+      return;
+    }
+
+    _resetTimer();
+    var pending = _requestedResources.removeFirst();
+    pending.complete(new PoolResource._(this));
+  }
+
+  /// A resource has been requested, allocated, or released.
+  void _resetTimer() {
+    if (_timer != null) _timer.cancel();
+    if (_timeout == null) {
+      _timer = null;
+    } else {
+      _timer = new Timer(_timeout, _onTimeout);
+    }
+  }
+
+  /// Handles [_timer] timing out by causing all pending resource completers to
+  /// emit exceptions.
+  void _onTimeout() {
+    for (var completer in _requestedResources) {
+      completer.completeError("Pool deadlock: all resources have been "
+          "allocated for too long.", new Chain.current());
+    }
+    _requestedResources.clear();
+    _timer = null;
+  }
+}
+
+/// A member of a [Pool].
+///
+/// A [PoolResource] is a token that indicates that a resource is allocated.
+/// When the associated resource is released, the user should call [release].
+class PoolResource {
+  final Pool _pool;
+
+  /// Whether [this] has been released yet.
+  bool _released = false;
+
+  PoolResource._(this._pool);
+
+  /// Tells the parent [Pool] that the resource associated with this resource is
+  /// no longer allocated, and that a new [PoolResource] may be allocated.
+  void release() {
+    if (_released) {
+      throw new StateError("A PoolResource may only be released once.");
+    }
+    _released = true;
+    _pool._onResourceReleased();
+  }
+}
+
diff --git a/pubspec.yaml b/pubspec.yaml
new file mode 100644
index 0000000..847a547
--- /dev/null
+++ b/pubspec.yaml
@@ -0,0 +1,7 @@
+name: pool
+version: 1.0.0
+description: A class for managing a finite pool of resources.
+dependencies:
+  stack_trace: ">=0.9.2 <2.0.0"
+dev_dependencies:
+  unittest: ">=0.11.0 <0.12.0"
diff --git a/test/pool_test.dart b/test/pool_test.dart
new file mode 100644
index 0000000..40bcf8a
--- /dev/null
+++ b/test/pool_test.dart
@@ -0,0 +1,137 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:pool/pool.dart';
+import 'package:stack_trace/stack_trace.dart';
+import 'package:unittest/unittest.dart';
+
+void main() {
+  group("request()", () {
+    test("resources can be requested freely up to the limit", () {
+      var pool = new Pool(50);
+      var requests = [];
+      for (var i = 0; i < 50; i++) {
+        expect(pool.request(), completes);
+      }
+    });
+
+    test("resources block past the limit", () {
+      var pool = new Pool(50);
+      var requests = [];
+      for (var i = 0; i < 50; i++) {
+        expect(pool.request(), completes);
+      }
+      expect(pool.request(), doesNotComplete);
+    });
+
+    test("a blocked resource is allocated when another is released", () {
+      var pool = new Pool(50);
+      var requests = [];
+      for (var i = 0; i < 49; i++) {
+        expect(pool.request(), completes);
+      }
+
+      return pool.request().then((lastAllocatedResource) {
+        var blockedResource = pool.request();
+
+        return pumpEventQueue().then((_) {
+          lastAllocatedResource.release();
+          return pumpEventQueue();
+        }).then((_) {
+          expect(blockedResource, completes);
+        });
+      });
+    });
+  });
+
+  group("withResource()", () {
+    test("can be called freely up to the limit", () {
+      var pool = new Pool(50);
+      var requests = [];
+      for (var i = 0; i < 50; i++) {
+        pool.withResource(expectAsync(() => new Completer().future));
+      }
+    });
+
+    test("blocks the callback past the limit", () {
+      var pool = new Pool(50);
+      var requests = [];
+      for (var i = 0; i < 50; i++) {
+        pool.withResource(expectAsync(() => new Completer().future));
+      }
+      pool.withResource(expectNoAsync());
+    });
+
+    test("a blocked resource is allocated when another is released", () {
+      var pool = new Pool(50);
+      var requests = [];
+      for (var i = 0; i < 49; i++) {
+        pool.withResource(expectAsync(() => new Completer().future));
+      }
+
+      var completer = new Completer();
+      var lastAllocatedResource = pool.withResource(() => completer.future);
+      var blockedResourceAllocated = false;
+      var blockedResource = pool.withResource(() {
+        blockedResourceAllocated = true;
+      });
+
+      return pumpEventQueue().then((_) {
+        expect(blockedResourceAllocated, isFalse);
+        completer.complete();
+        return pumpEventQueue();
+      }).then((_) {
+        expect(blockedResourceAllocated, isTrue);
+      });
+    });
+  });
+
+  // TODO(nweiz): test timeouts when seaneagan's fake_async package lands.
+}
+
+/// Returns a [Future] that completes after pumping the event queue [times]
+/// times. By default, this should pump the event queue enough times to allow
+/// any code to run, as long as it's not waiting on some external event.
+Future pumpEventQueue([int times = 20]) {
+  if (times == 0) return new Future.value();
+  // We use a delayed future to allow microtask events to finish. The
+  // Future.value or Future() constructors use scheduleMicrotask themselves and
+  // would therefore not wait for microtask callbacks that are scheduled after
+  // invoking this method.
+  return new Future.delayed(Duration.ZERO, () => pumpEventQueue(times - 1));
+}
+
+/// Returns a function that will cause the test to fail if it's called.
+///
+/// This won't let the test complete until it's confident that the function
+/// won't be called.
+Function expectNoAsync() {
+  // Make sure the test lasts long enough for the function to get called if it's
+  // going to get called.
+  expect(pumpEventQueue(), completes);
+
+  var stack = new Trace.current(1);
+  return () => handleExternalError(
+      new TestFailure("Expected function not to be called."), "",
+      stack);
+}
+
+/// A matcher for Futures that asserts that they don't complete.
+///
+/// This won't let the test complete until it's confident that the function
+/// won't be called.
+Matcher get doesNotComplete => predicate((future) {
+  expect(future, new isInstanceOf<Future>('Future'));
+  // Make sure the test lasts long enough for the function to get called if it's
+  // going to get called.
+  expect(pumpEventQueue(), completes);
+
+  var stack = new Trace.current(1);
+  future.then((_) => handleExternalError(
+      new TestFailure("Expected future not to complete."), "",
+      stack));
+  return true;
+});