Merge pull request #1259 from dart-lang/merge-pool-package
Merge `package:pool`
diff --git a/.github/ISSUE_TEMPLATE/pool.md b/.github/ISSUE_TEMPLATE/pool.md
new file mode 100644
index 0000000..7af32c4
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/pool.md
@@ -0,0 +1,5 @@
+---
+name: "package:pool"
+about: "Create a bug or file a feature request against package:pool."
+labels: "package:pool"
+---
\ No newline at end of file
diff --git a/.github/labeler.yml b/.github/labeler.yml
index 39d78aa..c4d658f 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -84,6 +84,10 @@
- changed-files:
- any-glob-to-any-file: 'pkgs/package_config/**'
+'package:pool':
+ - changed-files:
+ - any-glob-to-any-file: 'pkgs/pool/**'
+
'package:source_map_stack_trace':
- changed-files:
- any-glob-to-any-file: 'pkgs/source_map_stack_trace/**'
diff --git a/.github/workflows/pool.yaml b/.github/workflows/pool.yaml
new file mode 100644
index 0000000..6d64062
--- /dev/null
+++ b/.github/workflows/pool.yaml
@@ -0,0 +1,78 @@
+name: package:pool
+
+on:
+  # Run on PRs and pushes to the default branch, plus weekly on a schedule.
+  push:
+    branches: [ main ]
+    paths:
+      - '.github/workflows/pool.yaml'
+      - 'pkgs/pool/**'
+  pull_request:
+    branches: [ main ]
+    paths:
+      - '.github/workflows/pool.yaml'
+      - 'pkgs/pool/**'
+  schedule:
+    - cron: "0 0 * * 0"
+
+env:
+  PUB_ENVIRONMENT: bot.github
+
+
+defaults:
+  run:
+    working-directory: pkgs/pool/
+
+jobs:
+  # Check code formatting and static analysis on a single OS (linux)
+  # against Dart dev.
+  analyze:
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        sdk: [dev]
+    steps:
+      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
+      - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94
+        with:
+          sdk: ${{ matrix.sdk }}
+      - id: install
+        name: Install dependencies
+        run: dart pub get
+      - name: Check formatting
+        run: dart format --output=none --set-exit-if-changed .
+        if: always() && steps.install.outcome == 'success'
+      - name: Analyze code
+        run: dart analyze --fatal-infos
+        if: always() && steps.install.outcome == 'success'
+
+  # Run tests on a matrix consisting of two dimensions:
+  # 1. OS: ubuntu-latest, (macos-latest, windows-latest)
+  # 2. SDK: 3.4 (the pubspec lower bound) and dev
+  test:
+    needs: analyze
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        # Add macos-latest and/or windows-latest if relevant for this package.
+        os: [ubuntu-latest]
+        sdk: ['3.4', dev]  # Quoted: a bare 3.10 would be read as float 3.1.
+    steps:
+      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
+      - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94
+        with:
+          sdk: ${{ matrix.sdk }}
+      - id: install
+        name: Install dependencies
+        run: dart pub get
+      - name: Run VM tests
+        run: dart test --platform vm
+        if: always() && steps.install.outcome == 'success'
+      - name: Run Chrome tests
+        run: dart test --platform chrome
+        if: always() && steps.install.outcome == 'success'
+      - name: Run Chrome tests - wasm
+        run: dart test --platform chrome -c dart2wasm
+        if: always() && steps.install.outcome == 'success' && matrix.sdk == 'dev'
diff --git a/README.md b/README.md
index 0acab2c..0b97b21 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,7 @@
| [mime](pkgs/mime/) | Utilities for handling media (MIME) types, including determining a type from a file extension and file contents. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Amime) | [](https://pub.dev/packages/mime) |
| [oauth2](pkgs/oauth2/) | A client library for authenticating with a remote service via OAuth2 on behalf of a user, and making authorized HTTP requests with the user's OAuth2 credentials. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aoauth2) | [](https://pub.dev/packages/oauth2) |
| [package_config](pkgs/package_config/) | Support for reading and writing Dart Package Configuration files. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Apackage_config) | [](https://pub.dev/packages/package_config) |
+| [pool](pkgs/pool/) | Manage a finite pool of resources. Useful for controlling concurrent file system or network requests. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Apool) | [](https://pub.dev/packages/pool) |
| [source_map_stack_trace](pkgs/source_map_stack_trace/) | A package for applying source maps to stack traces. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asource_map_stack_trace) | [](https://pub.dev/packages/source_map_stack_trace) |
| [unified_analytics](pkgs/unified_analytics/) | A package for logging analytics for all Dart and Flutter related tooling to Google Analytics. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aunified_analytics) | [](https://pub.dev/packages/unified_analytics) |
diff --git a/pkgs/pool/.gitignore b/pkgs/pool/.gitignore
new file mode 100644
index 0000000..e450c83
--- /dev/null
+++ b/pkgs/pool/.gitignore
@@ -0,0 +1,5 @@
+# Don’t commit the following directories created by pub.
+.dart_tool/
+.packages
+.pub/
+pubspec.lock
diff --git a/pkgs/pool/CHANGELOG.md b/pkgs/pool/CHANGELOG.md
new file mode 100644
index 0000000..56424fc
--- /dev/null
+++ b/pkgs/pool/CHANGELOG.md
@@ -0,0 +1,105 @@
+## 1.5.2-wip
+
+* Require Dart 3.4.
+* Move to `dart-lang/tools` monorepo.
+
+## 1.5.1
+
+* Populate the pubspec `repository` field.
+
+## 1.5.0
+
+* Stable release for null safety.
+
+## 1.5.0-nullsafety.3
+
+* Update SDK constraints to `>=2.12.0-0 <3.0.0` based on beta release
+ guidelines.
+
+## 1.5.0-nullsafety.2
+
+* Allow prerelease versions of the 2.12 sdk.
+
+## 1.5.0-nullsafety.1
+
+* Allow 2.10 stable and 2.11.0 dev SDK versions.
+
+## 1.5.0-nullsafety
+
+* Migrate to null safety.
+* `forEach`: Avoid `await null` if the `Stream` is not paused.
+ Improves trivial benchmark by 40%.
+
+## 1.4.0
+
+* Add `forEach` to `Pool` to support efficient async processing of an
+ `Iterable`.
+
+* Throw ArgumentError if poolSize <= 0
+
+## 1.3.6
+
+* Set max SDK version to `<3.0.0`, and adjust other dependencies.
+
+## 1.3.5
+
+- Updated SDK version to 2.0.0-dev.17.0
+
+## 1.3.4
+
+* Modify code to eliminate Future flattening.
+
+## 1.3.3
+
+* Declare support for `async` 2.0.0.
+
+## 1.3.2
+
+* Update to make the code work with strong-mode clean Zone API.
+
+* Required minimum SDK of 1.23.0.
+
+## 1.3.1
+
+* Fix the type annotation of `Pool.withResource()` to indicate that it takes
+ `() -> FutureOr<T>`.
+
+## 1.3.0
+
+* Add a `Pool.done` getter that returns the same future returned by
+ `Pool.close()`.
+
+## 1.2.4
+
+* Fix a strong-mode error.
+
+## 1.2.3
+
+* Fix a bug in which `Pool.withResource()` could throw a `StateError` when
+ called immediately before closing the pool.
+
+## 1.2.2
+
+* Fix strong mode warnings and add generic method annotations.
+
+## 1.2.1
+
+* Internal changes only.
+
+## 1.2.0
+
+* Add `Pool.close()`, which forbids new resource requests and releases all
+ releasable resources.
+
+## 1.1.0
+
+* Add `PoolResource.allowRelease()`, which allows a resource to indicate that it
+ can be released without forcing it to deallocate immediately.
+
+## 1.0.2
+
+* Fixed the homepage.
+
+## 1.0.1
+
+* A `TimeoutException` is now correctly thrown if the pool detects a deadlock.
diff --git a/pkgs/pool/LICENSE b/pkgs/pool/LICENSE
new file mode 100644
index 0000000..000cd7b
--- /dev/null
+++ b/pkgs/pool/LICENSE
@@ -0,0 +1,27 @@
+Copyright 2014, the Dart project authors.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+ * Neither the name of Google LLC nor the names of its
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/pkgs/pool/README.md b/pkgs/pool/README.md
new file mode 100644
index 0000000..461e872
--- /dev/null
+++ b/pkgs/pool/README.md
@@ -0,0 +1,57 @@
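+[![Build Status](https://github.com/dart-lang/tools/actions/workflows/pool.yaml/badge.svg)](https://github.com/dart-lang/tools/actions/workflows/pool.yaml)
+[![pub package](https://img.shields.io/pub/v/pool.svg)](https://pub.dev/packages/pool)
+[![package publisher](https://img.shields.io/pub/publisher/pool.svg)](https://pub.dev/packages/pool/publisher)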
+
+The pool package exposes a `Pool` class which makes it easy to manage a limited
+pool of resources.
+
+The easiest way to use a pool is by calling `withResource`. This runs a callback
+and returns its result, but only once there aren't too many other callbacks
+currently running.
+
+```dart
+// Create a Pool that will only allocate 10 resources at once. After 30 seconds
+// of inactivity with all resources checked out, the pool will throw an error.
+final pool = Pool(10, timeout: Duration(seconds: 30));
+
+Future<String> readFile(String path) {
+  // Since the call to [File.readAsString] is within [withResource], no more
+  // than ten files will be open at once.
+  return pool.withResource(() => File(path).readAsString());
+}
+```
+
+For more fine-grained control, the user can also explicitly request generic
+`PoolResource` objects that can later be released back into the pool. This is
+what `withResource` does under the covers: requests a resource, then releases it
+once the callback completes.
+
+`Pool` ensures that only a limited number of resources are allocated at once.
+It's the caller's responsibility to ensure that the corresponding physical
+resource is only consumed when a `PoolResource` is allocated.
+
+```dart
+class PooledFile implements RandomAccessFile {
+  final RandomAccessFile _file;
+  final PoolResource _resource; // Held for as long as [_file] is open.
+
+  static Future<PooledFile> open(String path) {
+    return pool.request().then((resource) { // Wait for a free slot first.
+      return File(path).open().then((file) {
+        return PooledFile._(file, resource);
+      });
+    });
+  }
+
+  PooledFile._(this._file, this._resource); // Only reachable via [open].
+
+  // ...
+
+  Future<RandomAccessFile> close() {
+    return _file.close().then((_) {
+      _resource.release(); // Free the slot only after the file is closed.
+      return this;
+    });
+  }
+}
+```
diff --git a/pkgs/pool/analysis_options.yaml b/pkgs/pool/analysis_options.yaml
new file mode 100644
index 0000000..44cda4d
--- /dev/null
+++ b/pkgs/pool/analysis_options.yaml
@@ -0,0 +1,5 @@
+include: package:dart_flutter_team_lints/analysis_options.yaml
+
+analyzer:
+ language:
+ strict-casts: true
diff --git a/pkgs/pool/benchmark/for_each_benchmark.dart b/pkgs/pool/benchmark/for_each_benchmark.dart
new file mode 100644
index 0000000..0cd2543
--- /dev/null
+++ b/pkgs/pool/benchmark/for_each_benchmark.dart
@@ -0,0 +1,55 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:pool/pool.dart';
+
+/// Benchmarks [Pool.forEach] in an endless loop, logging the fastest run.
+void main(List<String> args) async {
+  final poolSize = args.isNotEmpty ? int.parse(args.first) : 5;
+  print('Pool size: $poolSize');
+
+  final pool = Pool(poolSize);
+  final stopwatch = Stopwatch()..start();
+  final startedAt = DateTime.now();
+
+  DateTime? lastReport;
+  Duration? best;
+  late int bestIteration;
+  var iteration = 1;
+
+  // Prints one progress row, at most once per second unless [force] is set.
+  void report(bool force) {
+    final now = DateTime.now();
+    final previous = lastReport;
+    final stale = previous == null ||
+        now.difference(previous) > const Duration(seconds: 1);
+    if (!force && !stale) return;
+    lastReport = now;
+    final row = [
+      now.difference(startedAt),
+      iteration.toString().padLeft(10),
+      bestIteration.toString().padLeft(7),
+      best!.inMicroseconds.toString().padLeft(9)
+    ];
+    print(row.join(' '));
+  }
+
+  print(['Elapsed ', 'Iterations', 'Fastest', 'Time (us)'].join(' '));
+
+  while (true) {
+    stopwatch.reset();
+    final total = await pool
+        .forEach<int, int>(Iterable<int>.generate(100000), (i) => i)
+        .reduce((a, b) => a + b);
+    assert(total == 4999950000, 'was $total');
+    final elapsed = stopwatch.elapsed;
+    // A new record always forces a row; otherwise rows are rate-limited.
+    final isRecord = best == null || best > elapsed;
+    if (isRecord) {
+      best = elapsed;
+      bestIteration = iteration;
+    }
+    report(isRecord);
+    iteration++;
+  }
+}
diff --git a/pkgs/pool/lib/pool.dart b/pkgs/pool/lib/pool.dart
new file mode 100644
index 0000000..70e9df1
--- /dev/null
+++ b/pkgs/pool/lib/pool.dart
@@ -0,0 +1,380 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+
+import 'package:async/async.dart';
+import 'package:stack_trace/stack_trace.dart';
+
+/// Manages an abstract pool of resources with a limit on how many may be in use
+/// at once.
+///
+/// When a resource is needed, the user should call [request]. When the returned
+/// future completes with a [PoolResource], the resource may be allocated. Once
+/// the resource has been released, the user should call [PoolResource.release].
+/// The pool will ensure that only a certain number of [PoolResource]s may be
+/// allocated at once.
+class Pool {
+ /// Completers for requests beyond the first [_maxAllocatedResources].
+ ///
+ /// When an item is released, the next element of [_requestedResources] will
+ /// be completed.
+ final _requestedResources = Queue<Completer<PoolResource>>();
+
+ /// Callbacks that must be called before additional resources can be
+ /// allocated.
+ ///
+ /// See [PoolResource.allowRelease].
+ final _onReleaseCallbacks = Queue<void Function()>();
+
+ /// Completers that will be completed once `onRelease` callbacks are done
+ /// running.
+ ///
+ /// These are kept in a queue to ensure that the earliest request completes
+ /// first regardless of what order the `onRelease` callbacks complete in.
+ final _onReleaseCompleters = Queue<Completer<PoolResource>>();
+
+ /// The maximum number of resources that may be allocated at once.
+ final int _maxAllocatedResources;
+
+ /// The number of resources that are currently allocated.
+ int _allocatedResources = 0;
+
+ /// The timeout timer.
+ ///
+ /// This timer is canceled as long as the pool is below the resource limit.
+ /// It's reset once the resource limit is reached and again every time a
+ /// resource is released or a new resource is requested. If it fires, that
+ /// indicates that the caller became deadlocked, likely due to files waiting
+ /// for additional files to be read before they could be closed.
+ ///
+ /// This is `null` if this pool shouldn't time out.
+ RestartableTimer? _timer;
+
+ /// The amount of time to wait before timing out the pending resources.
+ final Duration? _timeout;
+
+ /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources
+ /// that have been marked releasable.
+ ///
+ /// This is `null` until [close] is called.
+ FutureGroup? _closeGroup;
+
+ /// Whether [close] has been called.
+ bool get isClosed => _closeMemo.hasRun;
+
+ /// A future that completes once the pool is closed and all its outstanding
+ /// resources have been released.
+ ///
+ /// If any [PoolResource.allowRelease] callback throws an exception after the
+ /// pool is closed, this completes with that exception.
+ Future get done => _closeMemo.future;
+
+ /// Creates a new pool with the given limit on how many resources may be
+ /// allocated at once.
+ ///
+ /// If [timeout] is passed, then if that much time passes without any activity
+ /// all pending [request] futures will throw a [TimeoutException]. This is
+ /// intended to avoid deadlocks.
+ Pool(this._maxAllocatedResources, {Duration? timeout}) : _timeout = timeout {
+ if (_maxAllocatedResources <= 0) {
+ throw ArgumentError.value(_maxAllocatedResources, 'maxAllocatedResources',
+ 'Must be greater than zero.');
+ }
+
+ if (timeout != null) {
+ // Start the timer canceled since we only want to start counting down once
+ // we've run out of available resources.
+ _timer = RestartableTimer(timeout, _onTimeout)..cancel();
+ }
+ }
+
+ /// Request a [PoolResource].
+ ///
+ /// If the maximum number of resources is already allocated, this will delay
+ /// until one of them is released.
+ Future<PoolResource> request() {
+ if (isClosed) {
+ throw StateError('request() may not be called on a closed Pool.');
+ }
+
+ if (_allocatedResources < _maxAllocatedResources) {
+ _allocatedResources++;
+ return Future.value(PoolResource._(this));
+ } else if (_onReleaseCallbacks.isNotEmpty) {
+ return _runOnRelease(_onReleaseCallbacks.removeFirst());
+ } else {
+ var completer = Completer<PoolResource>();
+ _requestedResources.add(completer);
+ _resetTimer();
+ return completer.future;
+ }
+ }
+
+ /// Requests a resource for the duration of [callback], which may return a
+ /// Future.
+ ///
+ /// The return value of [callback] is piped to the returned Future.
+ Future<T> withResource<T>(FutureOr<T> Function() callback) async {
+ if (isClosed) {
+ throw StateError('withResource() may not be called on a closed Pool.');
+ }
+
+ var resource = await request();
+ try {
+ return await callback();
+ } finally {
+ resource.release();
+ }
+ }
+
+ /// Returns a [Stream] containing the result of [action] applied to each
+ /// element of [elements].
+ ///
+ /// While [action] is invoked on each element of [elements] in order,
+ /// it's possible the return [Stream] may have items out-of-order – especially
+ /// if the completion time of [action] varies.
+ ///
+ /// If [action] throws an error the source item along with the error object
+ /// and [StackTrace] are passed to [onError], if it is provided. If [onError]
+ /// returns `true`, the error is added to the returned [Stream], otherwise
+ /// it is ignored.
+ ///
+ /// Errors thrown from iterating [elements] will not be passed to
+ /// [onError]. They will always be added to the returned stream as an error.
+ ///
+ /// Note: all of the resources of this [Pool] will be used when the
+ /// returned [Stream] is listened to until it is completed or canceled.
+ ///
+ /// Note: if this [Pool] is closed before the returned [Stream] is listened
+ /// to, a [StateError] is thrown.
+ Stream<T> forEach<S, T>(
+ Iterable<S> elements, FutureOr<T> Function(S source) action,
+ {bool Function(S item, Object error, StackTrace stack)? onError}) {
+ onError ??= (item, e, s) => true;
+
+ var cancelPending = false;
+
+ Completer? resumeCompleter;
+ late StreamController<T> controller;
+
+ late Iterator<S> iterator;
+
+ Future<void> run(int _) async {
+ while (iterator.moveNext()) {
+ // caching `current` is necessary because there are async breaks
+ // in this code and `iterator` is shared across many workers
+ final current = iterator.current;
+
+ _resetTimer();
+
+ if (resumeCompleter != null) {
+ await resumeCompleter!.future;
+ }
+
+ if (cancelPending) {
+ break;
+ }
+
+ T value;
+ try {
+ value = await action(current);
+ } catch (e, stack) {
+ if (onError!(current, e, stack)) {
+ controller.addError(e, stack);
+ }
+ continue;
+ }
+ controller.add(value);
+ }
+ }
+
+ Future<void>? doneFuture;
+
+ void onListen() {
+ iterator = elements.iterator;
+
+ assert(doneFuture == null);
+ var futures = Iterable<Future<void>>.generate(
+ _maxAllocatedResources, (i) => withResource(() => run(i)));
+ doneFuture = Future.wait(futures, eagerError: true)
+ .then<void>((_) {})
+ .catchError(controller.addError);
+
+ doneFuture!.whenComplete(controller.close);
+ }
+
+ controller = StreamController<T>(
+ sync: true,
+ onListen: onListen,
+ onCancel: () async {
+ assert(!cancelPending);
+ cancelPending = true;
+ await doneFuture; // NOTE(review): may hang if canceled while paused?
+ },
+ onPause: () {
+ assert(resumeCompleter == null);
+ resumeCompleter = Completer<void>();
+ },
+ onResume: () {
+ assert(resumeCompleter != null);
+ resumeCompleter!.complete();
+ resumeCompleter = null;
+ },
+ );
+
+ return controller.stream;
+ }
+
+ /// Closes the pool so that no more resources are requested.
+ ///
+ /// Existing resource requests remain unchanged.
+ ///
+ /// Any resources that are marked as releasable using
+ /// [PoolResource.allowRelease] are released immediately. Once all resources
+ /// have been released and any `onRelease` callbacks have completed, the
+ /// returned future completes successfully. If any `onRelease` callback throws
+ /// an error, the returned future completes with that error.
+ ///
+ /// This may be called more than once; it returns the same [Future] each time.
+ Future close() => _closeMemo.runOnce(_close);
+
+ Future<void> _close() {
+ if (_closeGroup != null) return _closeGroup!.future;
+
+ _resetTimer();
+
+ _closeGroup = FutureGroup();
+ for (var callback in _onReleaseCallbacks) {
+ _closeGroup!.add(Future.sync(callback));
+ }
+
+ _allocatedResources -= _onReleaseCallbacks.length;
+ _onReleaseCallbacks.clear();
+
+ if (_allocatedResources == 0) _closeGroup!.close();
+ return _closeGroup!.future;
+ }
+
+ final _closeMemo = AsyncMemoizer<void>();
+
+ /// If there are any pending requests, this will fire the oldest one.
+ void _onResourceReleased() {
+ _resetTimer();
+
+ if (_requestedResources.isNotEmpty) {
+ var pending = _requestedResources.removeFirst();
+ pending.complete(PoolResource._(this));
+ } else {
+ _allocatedResources--;
+ if (isClosed && _allocatedResources == 0) _closeGroup!.close();
+ }
+ }
+
+ /// If there are any pending requests, this will fire the oldest one after
+ /// running [onRelease].
+ void _onResourceReleaseAllowed(void Function() onRelease) {
+ _resetTimer();
+
+ if (_requestedResources.isNotEmpty) {
+ var pending = _requestedResources.removeFirst();
+ pending.complete(_runOnRelease(onRelease));
+ } else if (isClosed) {
+ _closeGroup!.add(Future.sync(onRelease));
+ _allocatedResources--;
+ if (_allocatedResources == 0) _closeGroup!.close();
+ } else {
+ var zone = Zone.current;
+ var registered = zone.registerCallback(onRelease);
+ _onReleaseCallbacks.add(() => zone.run(registered));
+ }
+ }
+
+ /// Runs [onRelease] and returns a Future that completes to a resource once an
+ /// [onRelease] callback completes.
+ ///
+ /// Futures returned by [_runOnRelease] always complete in the order they were
+ /// created, even if earlier [onRelease] callbacks take longer to run.
+ Future<PoolResource> _runOnRelease(void Function() onRelease) {
+ Future.sync(onRelease).then((value) {
+ _onReleaseCompleters.removeFirst().complete(PoolResource._(this));
+ }).catchError((Object error, StackTrace stackTrace) {
+ _onReleaseCompleters.removeFirst().completeError(error, stackTrace);
+ });
+
+ var completer = Completer<PoolResource>.sync();
+ _onReleaseCompleters.add(completer);
+ return completer.future;
+ }
+
+ /// A resource has been requested, allocated, or released.
+ void _resetTimer() {
+ if (_timer == null) return;
+
+ if (_requestedResources.isEmpty) {
+ _timer!.cancel();
+ } else {
+ _timer!.reset();
+ }
+ }
+
+ /// Handles [_timer] timing out by causing all pending resource completers to
+ /// emit exceptions.
+ void _onTimeout() {
+ for (var completer in _requestedResources) {
+ completer.completeError(
+ TimeoutException(
+ 'Pool deadlock: all resources have been '
+ 'allocated for too long.',
+ _timeout),
+ Chain.current());
+ }
+ _requestedResources.clear();
+ _timer = null; // [_resetTimer] is a no-op from here on.
+ }
+}
+
+/// A token handed out by a [Pool] to represent one allocated resource.
+///
+/// Holding a [PoolResource] means a slot in the pool is in use. Call [release]
+/// (or [allowRelease]) exactly once when the underlying resource is done.
+class PoolResource {
+  /// The pool that issued this token.
+  final Pool _pool;
+
+  /// Set once [release] or [allowRelease] has been called.
+  bool _released = false;
+
+  PoolResource._(this._pool);
+
+  /// Returns this resource's slot to the parent [Pool] so that another
+  /// [PoolResource] may be allocated.
+  ///
+  /// Throws a [StateError] if this resource was already released.
+  void release() {
+    _markReleased();
+    _pool._onResourceReleased();
+  }
+
+  /// Marks the resource as no longer needed while leaving it allocated until
+  /// the parent [Pool] actually needs the slot.
+  ///
+  /// The pool runs [onRelease] before handing the slot to another request, or
+  /// when the pool is closed. It should free the resource and may return a
+  /// Future. Useful when the main work is done but late output (for example
+  /// asynchronous errors from an isolate) may still arrive.
+  void allowRelease(FutureOr<void> Function() onRelease) {
+    _markReleased();
+    _pool._onResourceReleaseAllowed(onRelease);
+  }
+
+  /// Flags this token as released, throwing if that already happened.
+  void _markReleased() {
+    if (_released) {
+      throw StateError('A PoolResource may only be released once.');
+    }
+    _released = true;
+  }
+}
diff --git a/pkgs/pool/pubspec.yaml b/pkgs/pool/pubspec.yaml
new file mode 100644
index 0000000..a205b74
--- /dev/null
+++ b/pkgs/pool/pubspec.yaml
@@ -0,0 +1,18 @@
+name: pool
+version: 1.5.2-wip
+description: >-
+ Manage a finite pool of resources.
+ Useful for controlling concurrent file system or network requests.
+repository: https://github.com/dart-lang/tools/tree/main/pkgs/pool
+
+environment:
+ sdk: ^3.4.0
+
+dependencies:
+ async: ^2.5.0
+ stack_trace: ^1.10.0
+
+dev_dependencies:
+ dart_flutter_team_lints: ^3.0.0
+ fake_async: ^1.2.0
+ test: ^1.16.6
diff --git a/pkgs/pool/test/pool_test.dart b/pkgs/pool/test/pool_test.dart
new file mode 100644
index 0000000..6334a8a
--- /dev/null
+++ b/pkgs/pool/test/pool_test.dart
@@ -0,0 +1,745 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:fake_async/fake_async.dart';
+import 'package:pool/pool.dart';
+import 'package:stack_trace/stack_trace.dart';
+import 'package:test/test.dart';
+
+void main() {
+ group('request()', () {
+ test('resources can be requested freely up to the limit', () {
+ var pool = Pool(50);
+ for (var i = 0; i < 50; i++) {
+ expect(pool.request(), completes);
+ }
+ });
+
+ test('resources block past the limit', () {
+ FakeAsync().run((async) {
+ var pool = Pool(50);
+ for (var i = 0; i < 50; i++) {
+ expect(pool.request(), completes);
+ }
+ expect(pool.request(), doesNotComplete);
+
+ async.elapse(const Duration(seconds: 1));
+ });
+ });
+
+ test('a blocked resource is allocated when another is released', () {
+ FakeAsync().run((async) {
+ var pool = Pool(50);
+ for (var i = 0; i < 49; i++) {
+ expect(pool.request(), completes);
+ }
+
+ pool.request().then((lastAllocatedResource) {
+ // This will only complete once [lastAllocatedResource] is released.
+ expect(pool.request(), completes);
+
+ Future<void>.delayed(const Duration(microseconds: 1)).then((_) {
+ lastAllocatedResource.release();
+ });
+ });
+
+ async.elapse(const Duration(seconds: 1));
+ });
+ });
+ });
+
+ group('withResource()', () {
+ test('can be called freely up to the limit', () {
+ var pool = Pool(50);
+ for (var i = 0; i < 50; i++) {
+ pool.withResource(expectAsync0(() => Completer<void>().future));
+ }
+ });
+
+ test('blocks the callback past the limit', () {
+ FakeAsync().run((async) {
+ var pool = Pool(50);
+ for (var i = 0; i < 50; i++) {
+ pool.withResource(expectAsync0(() => Completer<void>().future));
+ }
+ pool.withResource(expectNoAsync());
+
+ async.elapse(const Duration(seconds: 1));
+ });
+ });
+
+ test('a blocked resource is allocated when another is released', () {
+ FakeAsync().run((async) {
+ var pool = Pool(50);
+ for (var i = 0; i < 49; i++) {
+ pool.withResource(expectAsync0(() => Completer<void>().future));
+ }
+
+ var completer = Completer<void>();
+ pool.withResource(() => completer.future);
+ var blockedResourceAllocated = false;
+ pool.withResource(() {
+ blockedResourceAllocated = true;
+ });
+
+ Future<void>.delayed(const Duration(microseconds: 1)).then((_) {
+ expect(blockedResourceAllocated, isFalse);
+ completer.complete();
+ return Future<void>.delayed(const Duration(microseconds: 1));
+ }).then((_) {
+ expect(blockedResourceAllocated, isTrue);
+ });
+
+ async.elapse(const Duration(seconds: 1));
+ });
+ });
+
+ // Regression test for https://github.com/dart-lang/pool/issues/3.
+ test('can be called immediately before close()', () async {
+ var pool = Pool(1);
+ unawaited(pool.withResource(expectAsync0(() {})));
+ await pool.close();
+ });
+ });
+
+ group('with a timeout', () {
+ test("doesn't time out if there are no pending requests", () {
+ FakeAsync().run((async) {
+ var pool = Pool(50, timeout: const Duration(seconds: 5));
+ for (var i = 0; i < 50; i++) {
+ expect(pool.request(), completes);
+ }
+
+ async.elapse(const Duration(seconds: 6));
+ });
+ });
+
+ test('resets the timer if a resource is returned', () {
+ FakeAsync().run((async) {
+ var pool = Pool(50, timeout: const Duration(seconds: 5));
+ for (var i = 0; i < 49; i++) {
+ expect(pool.request(), completes);
+ }
+
+ pool.request().then((lastAllocatedResource) {
+ // This will only complete once [lastAllocatedResource] is released.
+ expect(pool.request(), completes);
+
+ Future<void>.delayed(const Duration(seconds: 3)).then((_) {
+ lastAllocatedResource.release();
+ expect(pool.request(), doesNotComplete);
+ });
+ });
+
+ async.elapse(const Duration(seconds: 6));
+ });
+ });
+
+ test('resets the timer if a resource is requested', () {
+ FakeAsync().run((async) {
+ var pool = Pool(50, timeout: const Duration(seconds: 5));
+ for (var i = 0; i < 50; i++) {
+ expect(pool.request(), completes);
+ }
+ expect(pool.request(), doesNotComplete);
+
+ Future<void>.delayed(const Duration(seconds: 3)).then((_) {
+ expect(pool.request(), doesNotComplete);
+ });
+
+ async.elapse(const Duration(seconds: 6));
+ });
+ });
+
+ test('times out if nothing happens', () {
+ FakeAsync().run((async) {
+ var pool = Pool(50, timeout: const Duration(seconds: 5));
+ for (var i = 0; i < 50; i++) {
+ expect(pool.request(), completes);
+ }
+ expect(pool.request(), throwsA(const TypeMatcher<TimeoutException>()));
+
+ async.elapse(const Duration(seconds: 6));
+ });
+ });
+ });
+
+ group('allowRelease()', () {
+ test('runs the callback once the resource limit is exceeded', () async {
+ var pool = Pool(50);
+ for (var i = 0; i < 49; i++) {
+ expect(pool.request(), completes);
+ }
+
+ var resource = await pool.request();
+ var onReleaseCalled = false;
+ resource.allowRelease(() => onReleaseCalled = true);
+ await Future<void>.delayed(Duration.zero);
+ expect(onReleaseCalled, isFalse);
+
+ expect(pool.request(), completes);
+ await Future<void>.delayed(Duration.zero);
+ expect(onReleaseCalled, isTrue);
+ });
+
+ test('runs the callback immediately if there are blocked requests',
+ () async {
+ var pool = Pool(1);
+ var resource = await pool.request();
+
+ // This will be blocked until [resource.allowRelease] is called.
+ expect(pool.request(), completes);
+
+ var onReleaseCalled = false;
+ resource.allowRelease(() => onReleaseCalled = true);
+ await Future<void>.delayed(Duration.zero);
+ expect(onReleaseCalled, isTrue);
+ });
+
+ test('blocks the request until the callback completes', () async {
+ var pool = Pool(1);
+ var resource = await pool.request();
+
+ var requestComplete = false;
+ unawaited(pool.request().then((_) => requestComplete = true));
+
+ var completer = Completer<void>();
+ resource.allowRelease(() => completer.future);
+ await Future<void>.delayed(Duration.zero);
+ expect(requestComplete, isFalse);
+
+ completer.complete();
+ await Future<void>.delayed(Duration.zero);
+ expect(requestComplete, isTrue);
+ });
+
+ test('completes requests in request order regardless of callback order',
+ () async {
+ var pool = Pool(2);
+ var resource1 = await pool.request();
+ var resource2 = await pool.request();
+
+ var request1Complete = false;
+ unawaited(pool.request().then((_) => request1Complete = true));
+ var request2Complete = false;
+ unawaited(pool.request().then((_) => request2Complete = true));
+
+ var onRelease1Called = false;
+ var completer1 = Completer<void>();
+ resource1.allowRelease(() {
+ onRelease1Called = true;
+ return completer1.future;
+ });
+ await Future<void>.delayed(Duration.zero);
+ expect(onRelease1Called, isTrue);
+
+ var onRelease2Called = false;
+ var completer2 = Completer<void>();
+ resource2.allowRelease(() {
+ onRelease2Called = true;
+ return completer2.future;
+ });
+ await Future<void>.delayed(Duration.zero);
+ expect(onRelease2Called, isTrue);
+ expect(request1Complete, isFalse);
+ expect(request2Complete, isFalse);
+
+ // Complete the second resource's onRelease callback first. Even though it
+ // was triggered by the second blocking request, it should complete the
+ // first one to preserve ordering.
+ completer2.complete();
+ await Future<void>.delayed(Duration.zero);
+ expect(request1Complete, isTrue);
+ expect(request2Complete, isFalse);
+
+ completer1.complete();
+ await Future<void>.delayed(Duration.zero);
+ expect(request1Complete, isTrue);
+ expect(request2Complete, isTrue);
+ });
+
+ test('runs onRelease in the zone it was created', () async {
+ var pool = Pool(1);
+ var resource = await pool.request();
+
+ var outerZone = Zone.current;
+ runZoned(() {
+ var innerZone = Zone.current;
+ expect(innerZone, isNot(equals(outerZone)));
+
+ resource.allowRelease(expectAsync0(() {
+ expect(Zone.current, equals(innerZone)); // Not the zone of request().
+ }));
+ });
+
+ await pool.request(); // Exceeds the limit of 1, which triggers onRelease.
+ });
+ });
+
+ test("done doesn't complete without close", () async {
+ var pool = Pool(1);
+ unawaited(pool.done.then(expectAsync1((_) {}, count: 0)));
+
+ var resource = await pool.request();
+ resource.release();
+
+ await Future<void>.delayed(Duration.zero);
+ });
+
+ group('close()', () {
+ test('disallows request() and withResource()', () {
+ var pool = Pool(1)..close();
+ expect(pool.request, throwsStateError);
+ expect(() => pool.withResource(() {}), throwsStateError);
+ });
+
+ test('pending requests are fulfilled', () async {
+ var pool = Pool(1);
+ var resource1 = await pool.request();
+ expect(
+ pool.request().then((resource2) {
+ resource2.release();
+ }),
+ completes);
+ expect(pool.done, completes);
+ expect(pool.close(), completes);
+ resource1.release();
+ });
+
+ test('pending requests are fulfilled with allowRelease', () async {
+ var pool = Pool(1);
+ var resource1 = await pool.request();
+
+ var completer = Completer<void>();
+ expect(
+ pool.request().then((resource2) {
+ expect(completer.isCompleted, isTrue);
+ resource2.release();
+ }),
+ completes);
+ expect(pool.close(), completes);
+
+ resource1.allowRelease(() => completer.future);
+ await Future<void>.delayed(Duration.zero);
+
+ completer.complete();
+ });
+
+ test("doesn't complete until all resources are released", () async {
+ var pool = Pool(2);
+ var resource1 = await pool.request();
+ var resource2 = await pool.request();
+ var resource3Future = pool.request();
+
+ var resource1Released = false;
+ var resource2Released = false;
+ var resource3Released = false;
+ expect(
+ pool.close().then((_) {
+ expect(resource1Released, isTrue);
+ expect(resource2Released, isTrue);
+ expect(resource3Released, isTrue);
+ }),
+ completes);
+
+ resource1Released = true;
+ resource1.release();
+ await Future<void>.delayed(Duration.zero);
+
+ resource2Released = true;
+ resource2.release();
+ await Future<void>.delayed(Duration.zero);
+
+ var resource3 = await resource3Future;
+ resource3Released = true;
+ resource3.release();
+ });
+
+ test('active onReleases complete as usual', () async {
+ var pool = Pool(1);
+ var resource = await pool.request();
+
+ // Set up an onRelease callback whose completion is controlled by
+ // [completer].
+ var completer = Completer<void>();
+ resource.allowRelease(() => completer.future);
+ expect(
+ pool.request().then((_) {
+ expect(completer.isCompleted, isTrue);
+ }),
+ completes);
+
+ await Future<void>.delayed(Duration.zero);
+ unawaited(pool.close());
+
+ await Future<void>.delayed(Duration.zero);
+ completer.complete();
+ });
+
+ test('inactive onReleases fire', () async {
+ var pool = Pool(2);
+ var resource1 = await pool.request();
+ var resource2 = await pool.request();
+
+ var completer1 = Completer<void>();
+ resource1.allowRelease(() => completer1.future);
+ var completer2 = Completer<void>();
+ resource2.allowRelease(() => completer2.future);
+
+ expect(
+ pool.close().then((_) {
+ expect(completer1.isCompleted, isTrue);
+ expect(completer2.isCompleted, isTrue);
+ }),
+ completes);
+
+ await Future<void>.delayed(Duration.zero);
+ completer1.complete();
+
+ await Future<void>.delayed(Duration.zero);
+ completer2.complete();
+ });
+
+ test('new allowReleases fire immediately', () async {
+ var pool = Pool(1);
+ var resource = await pool.request();
+
+ var completer = Completer<void>();
+ expect(
+ pool.close().then((_) {
+ expect(completer.isCompleted, isTrue);
+ }),
+ completes);
+
+ await Future<void>.delayed(Duration.zero);
+ resource.allowRelease(() => completer.future);
+
+ await Future<void>.delayed(Duration.zero);
+ completer.complete();
+ });
+
+ test('an onRelease error is piped to the return value', () async {
+ var pool = Pool(1);
+ var resource = await pool.request();
+
+ var completer = Completer<void>();
+ resource.allowRelease(() => completer.future);
+
+ expect(pool.done, throwsA('oh no!'));
+ expect(pool.close(), throwsA('oh no!'));
+
+ await Future<void>.delayed(Duration.zero);
+ completer.completeError('oh no!');
+ });
+ });
+
+ group('forEach', () {
+ late Pool pool;
+
+ tearDown(() async {
+ await pool.close();
+ });
+
+ const delayedToStringDuration = Duration(milliseconds: 10);
+
+ Future<String> delayedToString(int i) =>
+ Future<String>.delayed(delayedToStringDuration, () => i.toString());
+
+ for (var itemCount in [0, 5]) {
+ for (var poolSize in [1, 5, 6]) {
+ test('poolSize: $poolSize, itemCount: $itemCount', () async {
+ pool = Pool(poolSize);
+
+ var finishedItems = 0;
+
+ await for (var item in pool.forEach(
+ Iterable.generate(itemCount, (i) {
+ expect(i, lessThanOrEqualTo(finishedItems + poolSize),
+ reason: 'the iterator should be called lazily');
+ return i;
+ }),
+ delayedToString)) {
+ expect(int.parse(item), lessThan(itemCount));
+ finishedItems++;
+ }
+
+ expect(finishedItems, itemCount);
+ });
+ }
+ }
+
+ test('pool closed before listen', () async {
+ pool = Pool(2);
+
+ var stream = pool.forEach(Iterable<int>.generate(5), delayedToString);
+
+ await pool.close();
+
+ expect(stream.toList(), throwsStateError);
+ });
+
+ test('completes even if the pool is partially used', () async {
+ pool = Pool(2);
+
+ var resource = await pool.request();
+
+ var stream = pool.forEach(<int>[], delayedToString);
+
+ expect(await stream.length, 0);
+
+ resource.release();
+ });
+
+ test('stream paused longer than timeout', () async {
+ pool = Pool(2, timeout: delayedToStringDuration);
+
+ var resource = await pool.request();
+
+ var stream = pool.forEach<int, int>(
+ Iterable.generate(100, (i) {
+ expect(i, lessThan(20),
+ reason: 'The timeout should happen '
+ 'before the entire iterable is iterated.');
+ return i;
+ }), (i) async {
+ await Future<void>.delayed(Duration(milliseconds: i));
+ return i;
+ });
+
+ await expectLater(
+ stream.toList,
+ throwsA(const TypeMatcher<TimeoutException>().having(
+ (te) => te.message,
+ 'message',
+ contains('Pool deadlock: '
+ 'all resources have been allocated for too long.'))));
+
+ resource.release();
+ });
+
+ group('timing and timeout', () {
+ for (var poolSize in [2, 8, 64]) {
+ for (var otherTaskCount
+ in [0, 1, 7, 63].where((otc) => otc < poolSize)) {
+ test('poolSize: $poolSize, otherTaskCount: $otherTaskCount',
+ () async {
+ final itemCount = 128;
+ pool = Pool(poolSize, timeout: const Duration(milliseconds: 20));
+
+ var otherTasks = await Future.wait(
+ Iterable<int>.generate(otherTaskCount)
+ .map((i) => pool.request()));
+
+ try {
+ var finishedItems = 0;
+
+ var watch = Stopwatch()..start();
+
+ await for (var item in pool.forEach(
+ Iterable.generate(itemCount, (i) {
+ expect(i, lessThanOrEqualTo(finishedItems + poolSize),
+ reason: 'the iterator should be called lazily');
+ return i;
+ }),
+ delayedToString)) {
+ expect(int.parse(item), lessThan(itemCount));
+ finishedItems++;
+ }
+
+ expect(finishedItems, itemCount);
+
+ final expectedElapsed =
+ delayedToStringDuration.inMicroseconds * 4;
+
+ expect((watch.elapsed ~/ itemCount).inMicroseconds,
+ lessThan(expectedElapsed / (poolSize - otherTaskCount)),
+ reason: 'Average time per task should be '
+ 'proportionate to the available pool resources.');
+ } finally {
+ for (var task in otherTasks) {
+ task.release();
+ }
+ }
+ });
+ }
+ }
+ }, testOn: 'vm');
+
+ test('partial iteration', () async {
+ pool = Pool(5);
+ var stream = pool.forEach(Iterable<int>.generate(100), delayedToString);
+ expect(await stream.take(10).toList(), hasLength(10));
+ });
+
+ test('pool close during data with waiting to be done', () async {
+ pool = Pool(5);
+
+ var stream = pool.forEach(Iterable<int>.generate(100), delayedToString);
+
+ var dataCount = 0;
+ var subscription = stream.listen((data) {
+ dataCount++;
+ pool.close();
+ });
+
+ await subscription.asFuture<void>();
+ expect(dataCount, 100);
+ await subscription.cancel();
+ });
+
+ test('pause and resume', () async {
+ var generatedCount = 0;
+ var dataCount = 0;
+ final poolSize = 5;
+
+ pool = Pool(poolSize);
+
+ var stream = pool.forEach(
+ Iterable<int>.generate(40, (i) {
+ expect(generatedCount, lessThanOrEqualTo(dataCount + 2 * poolSize),
+ reason: 'The iterator should not be called '
+ 'much faster than the data is consumed.');
+ generatedCount++;
+ return i;
+ }),
+ delayedToString);
+
+ // ignore: cancel_subscriptions
+ late StreamSubscription subscription;
+
+ subscription = stream.listen(
+ (data) {
+ dataCount++;
+
+ if (int.parse(data) % 3 == 1) { // Pause on items 1, 4, 7, ...
+ subscription.pause(Future(() async {
+ await Future<void>.delayed(const Duration(milliseconds: 100));
+ }));
+ }
+ },
+ onError: registerException,
+ onDone: expectAsync0(() {
+ expect(dataCount, 40);
+ }),
+ );
+ });
+
+ group('cancel', () {
+ final dataSize = 32;
+ for (var i = 1; i < 5; i++) {
+ test('with pool size $i', () async {
+ pool = Pool(i);
+
+ var stream =
+ pool.forEach(Iterable<int>.generate(dataSize), delayedToString);
+
+ var cancelCompleter = Completer<void>();
+
+ StreamSubscription subscription;
+
+ var eventCount = 0;
+ subscription = stream.listen((data) {
+ eventCount++;
+ if (int.parse(data) == dataSize ~/ 2) {
+ cancelCompleter.complete();
+ }
+ }, onError: registerException);
+
+ await cancelCompleter.future;
+
+ await subscription.cancel();
+
+ expect(eventCount, 1 + dataSize ~/ 2);
+ });
+ }
+ });
+
+ group('errors', () {
+ Future<void> errorInIterator({
+ bool Function(int item, Object error, StackTrace stack)? onError,
+ }) async {
+ pool = Pool(20);
+
+ var listFuture = pool
+ .forEach(
+ Iterable.generate(100, (i) {
+ if (i == 50) {
+ throw StateError('error while generating item in iterator');
+ }
+
+ return i;
+ }),
+ delayedToString,
+ onError: onError)
+ .toList();
+
+ await expectLater(() async => listFuture, throwsStateError);
+ }
+
+ test('iteration, no onError', () async {
+ await errorInIterator();
+ });
+ test('iteration, with onError', () async {
+ await errorInIterator(onError: (i, e, s) => false);
+ });
+
+ test('error in action, no onError', () async {
+ pool = Pool(20);
+
+ var listFuture = pool.forEach(Iterable<int>.generate(100), (i) async {
+ await Future<void>.delayed(const Duration(milliseconds: 10));
+ if (i == 10) {
+ throw UnsupportedError('10 is not supported');
+ }
+ return i.toString();
+ }).toList();
+
+ await expectLater(() async => listFuture, throwsUnsupportedError);
+ });
+
+ test('error in action, with onError', () async {
+ pool = Pool(20);
+
+ var list = await pool.forEach(Iterable<int>.generate(100),
+ (int i) async {
+ await Future<void>.delayed(const Duration(milliseconds: 10));
+ if (i % 10 == 0) {
+ throw UnsupportedError('Multiples of 10 not supported');
+ }
+ return i.toString();
+ },
+ onError: (item, error, stack) =>
+ error is! UnsupportedError).toList();
+
+ expect(list, hasLength(90)); // The 10 multiples of 10 are dropped.
+ });
+ });
+ });
+
+ test('throw error when pool limit <= 0', () {
+ expect(() => Pool(-1), throwsArgumentError);
+ expect(() => Pool(0), throwsArgumentError);
+ });
+}
+
+/// Builds a callback that registers a [TestFailure] whenever it is invoked.
+///
+/// Only use this from inside a [FakeAsync.run] zone.
+void Function() expectNoAsync() {
+ final callSite = Trace.current(1);
+ final failure = TestFailure('Expected function not to be called.');
+ return () => registerException(failure, callSite);
+}
+
+/// Matches any [Future], failing the test if it later completes with a value.
+///
+/// Only use this from inside a [FakeAsync.run] zone.
+Matcher get doesNotComplete => predicate((Future pending) {
+ final matchSite = Trace.current(1);
+ final failure = TestFailure('Expected future not to complete.');
+ pending.then((_) => registerException(failure, matchSite));
+ return true;
+ });