Move pub/barback's Pool class into its own package.
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//399963004
git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/pool@38525 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..5c60afe
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,26 @@
+Copyright 2014, the Dart project authors. All rights reserved.
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+ * Neither the name of Google Inc. nor the names of its
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8987288
--- /dev/null
+++ b/README.md
@@ -0,0 +1,53 @@
+The pool package exposes a `Pool` class which makes it easy to manage a limited
+pool of resources.
+
+The easiest way to use a pool is by calling `withResource`. This runs a callback
+and returns its result, but only once there aren't too many other callbacks
+currently running.
+
+```dart
+// Create a Pool that will only allocate 10 resources at once. After 30 seconds
+// of inactivity with all resources checked out, the pool will throw an error.
+final pool = new Pool(10, timeout: new Duration(seconds: 30));
+
+Future<String> readFile(String path) {
+ // Since the call to [File.readAsString] is within [withResource], no more
+ // than ten files will be open at once.
+ return pool.withResource(() => return new File(path).readAsString());
+}
+```
+
+For more fine-grained control, the user can also explicitly request generic
+`PoolResource` objects that can later be released back into the pool. This is
+what `withResource` does under the covers: requests a resource, then releases it
+once the callback completes.
+
+`Pool` ensures that only a limited number of resources are allocated at once.
+It's the caller's responsibility to ensure that the corresponding physical
+resource is only consumed when a `PoolResource` is allocated.
+
+```dart
+class PooledFile implements RandomAccessFile {
+ final RandomAccessFile _file;
+ final PoolResource _resource;
+
+ static Future<PooledFile> open(String path) {
+ return pool.request().then((resource) {
+ return new File(path).open().then((file) {
+ return new PooledFile._(file, resource);
+ });
+ });
+ }
+
+ PooledFile(this._file, this._resource);
+
+ // ...
+
+ Future<RandomAccessFile> close() {
+ return _file.close.then((_) {
+ _resource.release();
+ return this;
+ });
+ }
+}
+```
diff --git a/lib/pool.dart b/lib/pool.dart
new file mode 100644
index 0000000..36a29c8
--- /dev/null
+++ b/lib/pool.dart
@@ -0,0 +1,139 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library pool;
+
+import 'dart:async';
+import 'dart:collection';
+
+import 'package:stack_trace/stack_trace.dart';
+
+/// Manages an abstract pool of resources with a limit on how many may be in use
+/// at once.
+///
+/// When a resource is needed, the user should call [request]. When the returned
+/// future completes with a [PoolResource], the resource may be allocated. Once
+/// the resource has been released, the user should call [PoolResource.release].
+/// The pool will ensure that only a certain number of [PoolResource]s may be
+/// allocated at once.
+class Pool {
+ /// Completers for requests beyond the first [_maxAllocatedResources].
+ ///
+ /// When an item is released, the next element of [_requestedResources] will
+ /// be completed.
+ final _requestedResources = new Queue<Completer<PoolResource>>();
+
+ /// The maximum number of resources that may be allocated at once.
+ final int _maxAllocatedResources;
+
+ /// The number of resources that are currently allocated.
+ int _allocatedResources = 0;
+
+ /// The timeout timer.
+ ///
+ /// If [_timeout] isn't null, this timer is set as soon as the resource limit
+ /// is reached and is reset every time an resource is released or a new
+ /// resource is requested. If it fires, that indicates that the caller became
+ /// deadlocked, likely due to files waiting for additional files to be read
+ /// before they could be closed.
+ Timer _timer;
+
+ /// The amount of time to wait before timing out the pending resources.
+ Duration _timeout;
+
+ /// Creates a new pool with the given limit on how many resources may be
+ /// allocated at once.
+ ///
+ /// If [timeout] is passed, then if that much time passes without any activity
+ /// all pending [request] futures will throw an exception. This is indented
+ /// to avoid deadlocks.
+ Pool(this._maxAllocatedResources, {Duration timeout})
+ : _timeout = timeout;
+
+ /// Request a [PoolResource].
+ ///
+ /// If the maximum number of resources is already allocated, this will delay
+ /// until one of them is released.
+ Future<PoolResource> request() {
+ if (_allocatedResources < _maxAllocatedResources) {
+ _allocatedResources++;
+ return new Future.value(new PoolResource._(this));
+ } else {
+ var completer = new Completer<PoolResource>();
+ _requestedResources.add(completer);
+ _resetTimer();
+ return completer.future;
+ }
+ }
+
+ /// Requests a resource for the duration of [callback], which may return a
+ /// Future.
+ ///
+ /// The return value of [callback] is piped to the returned Future.
+ Future withResource(callback()) {
+ return request().then((resource) =>
+ Chain.track(new Future.sync(callback)).whenComplete(resource.release));
+ }
+
+ /// If there are any pending requests, this will fire the oldest one.
+ void _onResourceReleased() {
+ if (_requestedResources.isEmpty) {
+ _allocatedResources--;
+ if (_timer != null) {
+ _timer.cancel();
+ _timer = null;
+ }
+ return;
+ }
+
+ _resetTimer();
+ var pending = _requestedResources.removeFirst();
+ pending.complete(new PoolResource._(this));
+ }
+
+ /// A resource has been requested, allocated, or released.
+ void _resetTimer() {
+ if (_timer != null) _timer.cancel();
+ if (_timeout == null) {
+ _timer = null;
+ } else {
+ _timer = new Timer(_timeout, _onTimeout);
+ }
+ }
+
+ /// Handles [_timer] timing out by causing all pending resource completers to
+ /// emit exceptions.
+ void _onTimeout() {
+ for (var completer in _requestedResources) {
+ completer.completeError("Pool deadlock: all resources have been "
+ "allocated for too long.", new Chain.current());
+ }
+ _requestedResources.clear();
+ _timer = null;
+ }
+}
+
+/// A member of a [Pool].
+///
+/// A [PoolResource] is a token that indicates that a resource is allocated.
+/// When the associated resource is released, the user should call [release].
+class PoolResource {
+ final Pool _pool;
+
+ /// Whether [this] has been released yet.
+ bool _released = false;
+
+ PoolResource._(this._pool);
+
+ /// Tells the parent [Pool] that the resource associated with this resource is
+ /// no longer allocated, and that a new [PoolResource] may be allocated.
+ void release() {
+ if (_released) {
+ throw new StateError("A PoolResource may only be released once.");
+ }
+ _released = true;
+ _pool._onResourceReleased();
+ }
+}
+
diff --git a/pubspec.yaml b/pubspec.yaml
new file mode 100644
index 0000000..847a547
--- /dev/null
+++ b/pubspec.yaml
@@ -0,0 +1,7 @@
+name: pool
+version: 1.0.0
+description: A class for managing a finite pool of resources.
+dependencies:
+ stack_trace: ">=0.9.2 <2.0.0"
+dev_dependencies:
+ unittest: ">=0.11.0 <0.12.0"
diff --git a/test/pool_test.dart b/test/pool_test.dart
new file mode 100644
index 0000000..40bcf8a
--- /dev/null
+++ b/test/pool_test.dart
@@ -0,0 +1,137 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:pool/pool.dart';
+import 'package:stack_trace/stack_trace.dart';
+import 'package:unittest/unittest.dart';
+
+void main() {
+ group("request()", () {
+ test("resources can be requested freely up to the limit", () {
+ var pool = new Pool(50);
+ var requests = [];
+ for (var i = 0; i < 50; i++) {
+ expect(pool.request(), completes);
+ }
+ });
+
+ test("resources block past the limit", () {
+ var pool = new Pool(50);
+ var requests = [];
+ for (var i = 0; i < 50; i++) {
+ expect(pool.request(), completes);
+ }
+ expect(pool.request(), doesNotComplete);
+ });
+
+ test("a blocked resource is allocated when another is released", () {
+ var pool = new Pool(50);
+ var requests = [];
+ for (var i = 0; i < 49; i++) {
+ expect(pool.request(), completes);
+ }
+
+ return pool.request().then((lastAllocatedResource) {
+ var blockedResource = pool.request();
+
+ return pumpEventQueue().then((_) {
+ lastAllocatedResource.release();
+ return pumpEventQueue();
+ }).then((_) {
+ expect(blockedResource, completes);
+ });
+ });
+ });
+ });
+
+ group("withResource()", () {
+ test("can be called freely up to the limit", () {
+ var pool = new Pool(50);
+ var requests = [];
+ for (var i = 0; i < 50; i++) {
+ pool.withResource(expectAsync(() => new Completer().future));
+ }
+ });
+
+ test("blocks the callback past the limit", () {
+ var pool = new Pool(50);
+ var requests = [];
+ for (var i = 0; i < 50; i++) {
+ pool.withResource(expectAsync(() => new Completer().future));
+ }
+ pool.withResource(expectNoAsync());
+ });
+
+ test("a blocked resource is allocated when another is released", () {
+ var pool = new Pool(50);
+ var requests = [];
+ for (var i = 0; i < 49; i++) {
+ pool.withResource(expectAsync(() => new Completer().future));
+ }
+
+ var completer = new Completer();
+ var lastAllocatedResource = pool.withResource(() => completer.future);
+ var blockedResourceAllocated = false;
+ var blockedResource = pool.withResource(() {
+ blockedResourceAllocated = true;
+ });
+
+ return pumpEventQueue().then((_) {
+ expect(blockedResourceAllocated, isFalse);
+ completer.complete();
+ return pumpEventQueue();
+ }).then((_) {
+ expect(blockedResourceAllocated, isTrue);
+ });
+ });
+ });
+
+ // TODO(nweiz): test timeouts when seaneagan's fake_async package lands.
+}
+
+/// Returns a [Future] that completes after pumping the event queue [times]
+/// times. By default, this should pump the event queue enough times to allow
+/// any code to run, as long as it's not waiting on some external event.
+Future pumpEventQueue([int times = 20]) {
+ if (times == 0) return new Future.value();
+ // We use a delayed future to allow microtask events to finish. The
+ // Future.value or Future() constructors use scheduleMicrotask themselves and
+ // would therefore not wait for microtask callbacks that are scheduled after
+ // invoking this method.
+ return new Future.delayed(Duration.ZERO, () => pumpEventQueue(times - 1));
+}
+
+/// Returns a function that will cause the test to fail if it's called.
+///
+/// This won't let the test complete until it's confident that the function
+/// won't be called.
+Function expectNoAsync() {
+ // Make sure the test lasts long enough for the function to get called if it's
+ // going to get called.
+ expect(pumpEventQueue(), completes);
+
+ var stack = new Trace.current(1);
+ return () => handleExternalError(
+ new TestFailure("Expected function not to be called."), "",
+ stack);
+}
+
+/// A matcher for Futures that asserts that they don't complete.
+///
+/// This won't let the test complete until it's confident that the function
+/// won't be called.
+Matcher get doesNotComplete => predicate((future) {
+ expect(future, new isInstanceOf<Future>('Future'));
+ // Make sure the test lasts long enough for the function to get called if it's
+ // going to get called.
+ expect(pumpEventQueue(), completes);
+
+ var stack = new Trace.current(1);
+ future.then((_) => handleExternalError(
+ new TestFailure("Expected future not to complete."), "",
+ stack));
+ return true;
+});