Merge pull request #1259 from dart-lang/merge-pool-package

Merge `package:pool`
diff --git a/.github/ISSUE_TEMPLATE/pool.md b/.github/ISSUE_TEMPLATE/pool.md
new file mode 100644
index 0000000..7af32c4
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/pool.md
@@ -0,0 +1,5 @@
+---
+name: "package:pool"
+about: "Create a bug or file a feature request against package:pool."
+labels: "package:pool"
+---
\ No newline at end of file
diff --git a/.github/labeler.yml b/.github/labeler.yml
index 39d78aa..c4d658f 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -84,6 +84,10 @@
   - changed-files:
     - any-glob-to-any-file: 'pkgs/package_config/**'
 
+'package:pool':
+  - changed-files:
+    - any-glob-to-any-file: 'pkgs/pool/**'
+
 'package:source_map_stack_trace':
   - changed-files:
     - any-glob-to-any-file: 'pkgs/source_map_stack_trace/**'
diff --git a/.github/workflows/pool.yaml b/.github/workflows/pool.yaml
new file mode 100644
index 0000000..6d64062
--- /dev/null
+++ b/.github/workflows/pool.yaml
@@ -0,0 +1,78 @@
+name: package:pool
+
+on:
+  # Run on PRs and pushes to the default branch.
+  push:
+    branches: [ main ]
+    paths:
+      - '.github/workflows/pool.yaml'
+      - 'pkgs/pool/**'
+  pull_request:
+    branches: [ main ]
+    paths:
+      - '.github/workflows/pool.yaml'
+      - 'pkgs/pool/**'
+  schedule:
+    - cron: "0 0 * * 0"
+
+env:
+  PUB_ENVIRONMENT: bot.github
+
+
+defaults:
+  run:
+    working-directory: pkgs/pool/
+
+jobs:
+  # Check code formatting and static analysis on a single OS (linux)
+  # against Dart dev.
+  analyze:
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        sdk: [dev]
+    steps:
+      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
+      - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94
+        with:
+          sdk: ${{ matrix.sdk }}
+      - id: install
+        name: Install dependencies
+        run: dart pub get
+      - name: Check formatting
+        run: dart format --output=none --set-exit-if-changed .
+        if: always() && steps.install.outcome == 'success'
+      - name: Analyze code
+        run: dart analyze --fatal-infos
+        if: always() && steps.install.outcome == 'success'
+
+  # Run tests on a matrix consisting of two dimensions:
+  # 1. OS: ubuntu-latest, (macos-latest, windows-latest)
+  # 2. SDK: 3.4 (the minimum supported release), dev
+  test:
+    needs: analyze
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        # Add macos-latest and/or windows-latest if relevant for this package.
+        os: [ubuntu-latest]
+        sdk: ['3.4', dev]  # Quoted so the version is a string, not a float.
+    steps:
+      - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
+      - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94
+        with:
+          sdk: ${{ matrix.sdk }}
+      - id: install
+        name: Install dependencies
+        run: dart pub get
+      - name: Run VM tests
+        run: dart test --platform vm
+        if: always() && steps.install.outcome == 'success'
+      - name: Run Chrome tests
+        run: dart test --platform chrome
+        if: always() && steps.install.outcome == 'success'
+      - name: Run Chrome tests - wasm
+        run: dart test --platform chrome -c dart2wasm
+        if: always() && steps.install.outcome == 'success' && matrix.sdk == 'dev'
diff --git a/README.md b/README.md
index 0acab2c..0b97b21 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,7 @@
 | [mime](pkgs/mime/) | Utilities for handling media (MIME) types, including determining a type from a file extension and file contents. | [![package issues](https://img.shields.io/badge/package:mime-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Amime) | [![pub package](https://img.shields.io/pub/v/mime.svg)](https://pub.dev/packages/mime) |
 | [oauth2](pkgs/oauth2/) | A client library for authenticating with a remote service via OAuth2 on behalf of a user, and making authorized HTTP requests with the user's OAuth2 credentials. | [![package issues](https://img.shields.io/badge/package:oauth2-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aoauth2) | [![pub package](https://img.shields.io/pub/v/oauth2.svg)](https://pub.dev/packages/oauth2) |
 | [package_config](pkgs/package_config/) | Support for reading and writing Dart Package Configuration files. | [![package issues](https://img.shields.io/badge/package:package_config-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Apackage_config) | [![pub package](https://img.shields.io/pub/v/package_config.svg)](https://pub.dev/packages/package_config) |
+| [pool](pkgs/pool/) | Manage a finite pool of resources. Useful for controlling concurrent file system or network requests. | [![package issues](https://img.shields.io/badge/package:pool-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Apool) | [![pub package](https://img.shields.io/pub/v/pool.svg)](https://pub.dev/packages/pool) |
 | [source_map_stack_trace](pkgs/source_map_stack_trace/) | A package for applying source maps to stack traces. | [![package issues](https://img.shields.io/badge/package:source_map_stack_trace-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asource_map_stack_trace) | [![pub package](https://img.shields.io/pub/v/source_map_stack_trace.svg)](https://pub.dev/packages/source_map_stack_trace) |
 | [unified_analytics](pkgs/unified_analytics/) | A package for logging analytics for all Dart and Flutter related tooling to Google Analytics. | [![package issues](https://img.shields.io/badge/package:unified_analytics-4774bc)](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aunified_analytics) | [![pub package](https://img.shields.io/pub/v/unified_analytics.svg)](https://pub.dev/packages/unified_analytics) |
 
diff --git a/pkgs/pool/.gitignore b/pkgs/pool/.gitignore
new file mode 100644
index 0000000..e450c83
--- /dev/null
+++ b/pkgs/pool/.gitignore
@@ -0,0 +1,5 @@
+# Don’t commit the following directories created by pub.
+.dart_tool/
+.packages
+.pub/
+pubspec.lock
diff --git a/pkgs/pool/CHANGELOG.md b/pkgs/pool/CHANGELOG.md
new file mode 100644
index 0000000..56424fc
--- /dev/null
+++ b/pkgs/pool/CHANGELOG.md
@@ -0,0 +1,105 @@
+## 1.5.2-wip
+
+* Require Dart 3.4.
+* Move to `dart-lang/tools` monorepo.
+
+## 1.5.1
+
+* Populate the pubspec `repository` field.
+
+## 1.5.0
+
+* Stable release for null safety.
+
+## 1.5.0-nullsafety.3
+
+* Update SDK constraints to `>=2.12.0-0 <3.0.0` based on beta release
+  guidelines.
+
+## 1.5.0-nullsafety.2
+
+* Allow prerelease versions of the 2.12 sdk.
+
+## 1.5.0-nullsafety.1
+
+* Allow 2.10 stable and 2.11.0 dev SDK versions.
+
+## 1.5.0-nullsafety
+
+* Migrate to null safety.
+* `forEach`: Avoid `await null` if the `Stream` is not paused.
+  Improves trivial benchmark by 40%.
+
+## 1.4.0
+
+* Add `forEach` to `Pool` to support efficient async processing of an
+  `Iterable`.
+
+* Throw ArgumentError if poolSize <= 0
+
+## 1.3.6
+
+* Set max SDK version to `<3.0.0`, and adjust other dependencies.
+
+## 1.3.5
+
+- Updated SDK version to 2.0.0-dev.17.0
+
+## 1.3.4
+
+* Modify code to eliminate Future flattening.
+
+## 1.3.3
+
+* Declare support for `async` 2.0.0.
+
+## 1.3.2
+
+* Update to make the code work with strong-mode clean Zone API.
+
+* Required minimum SDK of 1.23.0.
+
+## 1.3.1
+
+* Fix the type annotation of `Pool.withResource()` to indicate that it takes
+  `() -> FutureOr<T>`.
+
+## 1.3.0
+
+* Add a `Pool.done` getter that returns the same future returned by
+  `Pool.close()`.
+
+## 1.2.4
+
+* Fix a strong-mode error.
+
+## 1.2.3
+
+* Fix a bug in which `Pool.withResource()` could throw a `StateError` when
+  called immediately before closing the pool.
+
+## 1.2.2
+
+* Fix strong mode warnings and add generic method annotations.
+
+## 1.2.1
+
+* Internal changes only.
+
+## 1.2.0
+
+* Add `Pool.close()`, which forbids new resource requests and releases all
+  releasable resources.
+
+## 1.1.0
+
+* Add `PoolResource.allowRelease()`, which allows a resource to indicate that it
+  can be released without forcing it to deallocate immediately.
+
+## 1.0.2
+
+* Fixed the homepage.
+
+## 1.0.1
+
+* A `TimeoutException` is now correctly thrown if the pool detects a deadlock.
diff --git a/pkgs/pool/LICENSE b/pkgs/pool/LICENSE
new file mode 100644
index 0000000..000cd7b
--- /dev/null
+++ b/pkgs/pool/LICENSE
@@ -0,0 +1,27 @@
+Copyright 2014, the Dart project authors. 
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+      copyright notice, this list of conditions and the following
+      disclaimer in the documentation and/or other materials provided
+      with the distribution.
+    * Neither the name of Google LLC nor the names of its
+      contributors may be used to endorse or promote products derived
+      from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/pkgs/pool/README.md b/pkgs/pool/README.md
new file mode 100644
index 0000000..461e872
--- /dev/null
+++ b/pkgs/pool/README.md
@@ -0,0 +1,57 @@
+[![Build Status](https://github.com/dart-lang/tools/actions/workflows/pool.yaml/badge.svg)](https://github.com/dart-lang/tools/actions/workflows/pool.yaml)
+[![pub package](https://img.shields.io/pub/v/pool.svg)](https://pub.dev/packages/pool)
+[![package publisher](https://img.shields.io/pub/publisher/pool.svg)](https://pub.dev/packages/pool/publisher)
+
+The pool package exposes a `Pool` class which makes it easy to manage a limited
+pool of resources.
+
+The easiest way to use a pool is by calling `withResource`. This runs a callback
+and returns its result, but only once there aren't too many other callbacks
+currently running.
+
+```dart
+// Create a Pool that will only allocate 10 resources at once. After 30 seconds
+// of inactivity with all resources checked out, the pool will throw an error.
+final pool = new Pool(10, timeout: new Duration(seconds: 30));
+
+Future<String> readFile(String path) {
+  // Since the call to [File.readAsString] is within [withResource], no more
+  // than ten files will be open at once.
+  return pool.withResource(() => new File(path).readAsString());
+}
+```
+
+For more fine-grained control, the user can also explicitly request generic
+`PoolResource` objects that can later be released back into the pool. This is
+what `withResource` does under the covers: requests a resource, then releases it
+once the callback completes.
+
+`Pool` ensures that only a limited number of resources are allocated at once.
+It's the caller's responsibility to ensure that the corresponding physical
+resource is only consumed when a `PoolResource` is allocated.
+
+```dart
+class PooledFile implements RandomAccessFile {
+  final RandomAccessFile _file;
+  final PoolResource _resource;
+
+  static Future<PooledFile> open(String path) {
+    return pool.request().then((resource) {
+      return new File(path).open().then((file) {
+        return new PooledFile._(file, resource);
+      });
+    });
+  }
+
+  PooledFile._(this._file, this._resource);
+
+  // ...
+
+  Future<RandomAccessFile> close() {
+    return _file.close().then((_) {
+      _resource.release();
+      return this;
+    });
+  }
+}
+```
diff --git a/pkgs/pool/analysis_options.yaml b/pkgs/pool/analysis_options.yaml
new file mode 100644
index 0000000..44cda4d
--- /dev/null
+++ b/pkgs/pool/analysis_options.yaml
@@ -0,0 +1,5 @@
+include: package:dart_flutter_team_lints/analysis_options.yaml
+
+analyzer:
+  language:
+    strict-casts: true
diff --git a/pkgs/pool/benchmark/for_each_benchmark.dart b/pkgs/pool/benchmark/for_each_benchmark.dart
new file mode 100644
index 0000000..0cd2543
--- /dev/null
+++ b/pkgs/pool/benchmark/for_each_benchmark.dart
@@ -0,0 +1,55 @@
+// Copyright (c) 2024, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:pool/pool.dart';
+
+void main(List<String> args) async {
+  var poolSize = args.isEmpty ? 5 : int.parse(args.first);
+  print('Pool size: $poolSize');
+
+  final pool = Pool(poolSize);
+  final watch = Stopwatch()..start();
+  final start = DateTime.now();
+
+  DateTime? lastLog;
+  Duration? fastest;
+  late int fastestIteration;
+  var i = 1;
+
+  void log(bool force) {
+    var now = DateTime.now();
+    if (force ||
+        lastLog == null ||
+        now.difference(lastLog!) > const Duration(seconds: 1)) {
+      lastLog = now;
+      print([
+        now.difference(start),
+        i.toString().padLeft(10),
+        fastestIteration.toString().padLeft(7),
+        fastest!.inMicroseconds.toString().padLeft(9)
+      ].join('   '));
+    }
+  }
+
+  print(['Elapsed       ', 'Iterations', 'Fastest', 'Time (us)'].join('   '));
+
+  for (;; i++) {
+    watch.reset();
+
+    var sum = await pool
+        .forEach<int, int>(Iterable<int>.generate(100000), (i) => i)
+        .reduce((a, b) => a + b);
+
+    assert(sum == 4999950000, 'was $sum');
+
+    var elapsed = watch.elapsed;
+    if (fastest == null || fastest > elapsed) {
+      fastest = elapsed;
+      fastestIteration = i;
+      log(true);
+    } else {
+      log(false);
+    }
+  }
+}
diff --git a/pkgs/pool/lib/pool.dart b/pkgs/pool/lib/pool.dart
new file mode 100644
index 0000000..70e9df1
--- /dev/null
+++ b/pkgs/pool/lib/pool.dart
@@ -0,0 +1,380 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+
+import 'package:async/async.dart';
+import 'package:stack_trace/stack_trace.dart';
+
+/// Manages an abstract pool of resources with a limit on how many may be in use
+/// at once.
+///
+/// When a resource is needed, the user should call [request]. When the returned
+/// future completes with a [PoolResource], the resource may be allocated. Once
+/// the resource has been released, the user should call [PoolResource.release].
+/// The pool will ensure that only a certain number of [PoolResource]s may be
+/// allocated at once.
+class Pool {
+  /// Completers for requests beyond the first [_maxAllocatedResources].
+  ///
+  /// When an item is released, the next element of [_requestedResources] will
+  /// be completed.
+  final _requestedResources = Queue<Completer<PoolResource>>();
+
+  /// Callbacks that must be called before additional resources can be
+  /// allocated.
+  ///
+  /// See [PoolResource.allowRelease].
+  final _onReleaseCallbacks = Queue<void Function()>();
+
+  /// Completers that will be completed once `onRelease` callbacks are done
+  /// running.
+  ///
+  /// These are kept in a queue to ensure that the earliest request completes
+  /// first regardless of what order the `onRelease` callbacks complete in.
+  final _onReleaseCompleters = Queue<Completer<PoolResource>>();
+
+  /// The maximum number of resources that may be allocated at once.
+  final int _maxAllocatedResources;
+
+  /// The number of resources that are currently allocated.
+  int _allocatedResources = 0;
+
+  /// The timeout timer.
+  ///
+  /// This timer is canceled as long as the pool is below the resource limit.
+  /// It's reset once the resource limit is reached and again every time a
+  /// resource is released or a new resource is requested. If it fires, that
+  /// indicates that the caller became deadlocked, likely due to files waiting
+  /// for additional files to be read before they could be closed.
+  ///
+  /// This is `null` if this pool shouldn't time out.
+  RestartableTimer? _timer;
+
+  /// The amount of time to wait before timing out the pending resources.
+  final Duration? _timeout;
+
+  /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources
+  /// that have been marked releasable.
+  ///
+  /// This is `null` until [close] is called.
+  FutureGroup? _closeGroup;
+
+  /// Whether [close] has been called.
+  bool get isClosed => _closeMemo.hasRun;
+
+  /// A future that completes once the pool is closed and all its outstanding
+  /// resources have been released.
+  ///
+  /// If any [PoolResource.allowRelease] callback throws an exception after the
+  /// pool is closed, this completes with that exception.
+  Future get done => _closeMemo.future;
+
+  /// Creates a new pool with the given limit on how many resources may be
+  /// allocated at once.
+  ///
+  /// If [timeout] is passed, then if that much time passes without any activity
+  /// all pending [request] futures will throw a [TimeoutException]. This is
+  /// intended to avoid deadlocks.
+  Pool(this._maxAllocatedResources, {Duration? timeout}) : _timeout = timeout {
+    if (_maxAllocatedResources <= 0) {
+      throw ArgumentError.value(_maxAllocatedResources, 'maxAllocatedResources',
+          'Must be greater than zero.');
+    }
+
+    if (timeout != null) {
+      // Start the timer canceled since we only want to start counting down once
+      // we've run out of available resources.
+      _timer = RestartableTimer(timeout, _onTimeout)..cancel();
+    }
+  }
+
+  /// Request a [PoolResource].
+  ///
+  /// If the maximum number of resources is already allocated, this will delay
+  /// until one of them is released.
+  Future<PoolResource> request() {
+    if (isClosed) {
+      throw StateError('request() may not be called on a closed Pool.');
+    }
+
+    if (_allocatedResources < _maxAllocatedResources) {
+      _allocatedResources++;
+      return Future.value(PoolResource._(this));
+    } else if (_onReleaseCallbacks.isNotEmpty) {
+      return _runOnRelease(_onReleaseCallbacks.removeFirst());
+    } else {
+      var completer = Completer<PoolResource>();
+      _requestedResources.add(completer);
+      _resetTimer();
+      return completer.future;
+    }
+  }
+
+  /// Requests a resource for the duration of [callback], which may return a
+  /// Future.
+  ///
+  /// The return value of [callback] is piped to the returned Future.
+  Future<T> withResource<T>(FutureOr<T> Function() callback) async {
+    if (isClosed) {
+      throw StateError('withResource() may not be called on a closed Pool.');
+    }
+
+    var resource = await request();
+    try {
+      return await callback();
+    } finally {
+      resource.release();
+    }
+  }
+
+  /// Returns a [Stream] containing the result of [action] applied to each
+  /// element of [elements].
+  ///
+  /// While [action] is invoked on each element of [elements] in order,
+  /// it's possible the returned [Stream] may have items out-of-order –
+  /// especially if the completion time of [action] varies.
+  ///
+  /// If [action] throws an error the source item along with the error object
+  /// and [StackTrace] are passed to [onError], if it is provided. If [onError]
+  /// returns `true`, the error is added to the returned [Stream], otherwise
+  /// it is ignored.
+  ///
+  /// Errors thrown from iterating [elements] will not be passed to
+  /// [onError]. They will always be added to the returned stream as an error.
+  ///
+  /// Note: all of the resources of this [Pool] will be used when the
+  /// returned [Stream] is listened to until it is completed or canceled.
+  ///
+  /// Note: if this [Pool] is closed before the returned [Stream] is listened
+  /// to, a [StateError] is thrown.
+  Stream<T> forEach<S, T>(
+      Iterable<S> elements, FutureOr<T> Function(S source) action,
+      {bool Function(S item, Object error, StackTrace stack)? onError}) {
+    onError ??= (item, e, s) => true;
+
+    var cancelPending = false;
+
+    Completer? resumeCompleter;
+    late StreamController<T> controller;
+
+    late Iterator<S> iterator;
+
+    Future<void> run(int _) async {
+      while (iterator.moveNext()) {
+        // caching `current` is necessary because there are async breaks
+        // in this code and `iterator` is shared across many workers
+        final current = iterator.current;
+
+        _resetTimer();
+
+        if (resumeCompleter != null) {
+          await resumeCompleter!.future;
+        }
+
+        if (cancelPending) {
+          break;
+        }
+
+        T value;
+        try {
+          value = await action(current);
+        } catch (e, stack) {
+          if (onError!(current, e, stack)) {
+            controller.addError(e, stack);
+          }
+          continue;
+        }
+        controller.add(value);
+      }
+    }
+
+    Future<void>? doneFuture;
+
+    void onListen() {
+      iterator = elements.iterator;
+
+      assert(doneFuture == null);
+      var futures = Iterable<Future<void>>.generate(
+          _maxAllocatedResources, (i) => withResource(() => run(i)));
+      doneFuture = Future.wait(futures, eagerError: true)
+          .then<void>((_) {})
+          .catchError(controller.addError);
+
+      doneFuture!.whenComplete(controller.close);
+    }
+
+    controller = StreamController<T>(
+      sync: true,
+      onListen: onListen,
+      onCancel: () async {
+        assert(!cancelPending);
+        cancelPending = true;
+        await doneFuture;
+      },
+      onPause: () {
+        assert(resumeCompleter == null);
+        resumeCompleter = Completer<void>();
+      },
+      onResume: () {
+        assert(resumeCompleter != null);
+        resumeCompleter!.complete();
+        resumeCompleter = null;
+      },
+    );
+
+    return controller.stream;
+  }
+
+  /// Closes the pool so that no more resources are requested.
+  ///
+  /// Existing resource requests remain unchanged.
+  ///
+  /// Any resources that are marked as releasable using
+  /// [PoolResource.allowRelease] are released immediately. Once all resources
+  /// have been released and any `onRelease` callbacks have completed, the
+  /// returned future completes successfully. If any `onRelease` callback throws
+  /// an error, the returned future completes with that error.
+  ///
+  /// This may be called more than once; it returns the same [Future] each time.
+  Future close() => _closeMemo.runOnce(_close);
+
+  Future<void> _close() {
+    if (_closeGroup != null) return _closeGroup!.future;
+
+    _resetTimer();
+
+    _closeGroup = FutureGroup();
+    for (var callback in _onReleaseCallbacks) {
+      _closeGroup!.add(Future.sync(callback));
+    }
+
+    _allocatedResources -= _onReleaseCallbacks.length;
+    _onReleaseCallbacks.clear();
+
+    if (_allocatedResources == 0) _closeGroup!.close();
+    return _closeGroup!.future;
+  }
+
+  final _closeMemo = AsyncMemoizer<void>();
+
+  /// If there are any pending requests, this will fire the oldest one.
+  void _onResourceReleased() {
+    _resetTimer();
+
+    if (_requestedResources.isNotEmpty) {
+      var pending = _requestedResources.removeFirst();
+      pending.complete(PoolResource._(this));
+    } else {
+      _allocatedResources--;
+      if (isClosed && _allocatedResources == 0) _closeGroup!.close();
+    }
+  }
+
+  /// If there are any pending requests, this will fire the oldest one after
+  /// running [onRelease].
+  void _onResourceReleaseAllowed(void Function() onRelease) {
+    _resetTimer();
+
+    if (_requestedResources.isNotEmpty) {
+      var pending = _requestedResources.removeFirst();
+      pending.complete(_runOnRelease(onRelease));
+    } else if (isClosed) {
+      _closeGroup!.add(Future.sync(onRelease));
+      _allocatedResources--;
+      if (_allocatedResources == 0) _closeGroup!.close();
+    } else {
+      var zone = Zone.current;
+      var registered = zone.registerCallback(onRelease);
+      _onReleaseCallbacks.add(() => zone.run(registered));
+    }
+  }
+
+  /// Runs [onRelease] and returns a Future that completes to a resource once an
+  /// [onRelease] callback completes.
+  ///
+  /// Futures returned by [_runOnRelease] always complete in the order they were
+  /// created, even if earlier [onRelease] callbacks take longer to run.
+  Future<PoolResource> _runOnRelease(void Function() onRelease) {
+    Future.sync(onRelease).then((value) {
+      _onReleaseCompleters.removeFirst().complete(PoolResource._(this));
+    }).catchError((Object error, StackTrace stackTrace) {
+      _onReleaseCompleters.removeFirst().completeError(error, stackTrace);
+    });
+
+    var completer = Completer<PoolResource>.sync();
+    _onReleaseCompleters.add(completer);
+    return completer.future;
+  }
+
+  /// Restarts the timeout countdown while requests wait; otherwise stops it.
+  void _resetTimer() {
+    final timer = _timer;
+    if (timer == null) return;
+    if (_requestedResources.isNotEmpty) {
+      timer.reset();
+    } else {
+      timer.cancel();
+    }
+  }
+
+  /// Handles [_timer] firing: fails every pending request with a
+  /// [TimeoutException] and then drops the timer.
+  void _onTimeout() {
+    // Each waiter gets its own exception and stack chain.
+    while (_requestedResources.isNotEmpty) {
+      final waiting = _requestedResources.removeFirst();
+      final error = TimeoutException(
+          'Pool deadlock: all resources have been '
+          'allocated for too long.',
+          _timeout);
+      waiting.completeError(error, Chain.current());
+    }
+    _timer = null;
+  }
+}
+
+/// A member of a [Pool].
+///
+/// A [PoolResource] is a token that indicates that a resource is allocated.
+/// When the associated resource is released, the user should call [release].
+class PoolResource {
+  final Pool _pool;
+
+  /// Whether `this` has been released yet.
+  bool _released = false;
+
+  PoolResource._(this._pool);
+
+  /// Tells the parent [Pool] that the resource associated with this resource is
+  /// no longer allocated, and that a new [PoolResource] may be allocated.
+  void release() {
+    if (_released) {
+      throw StateError('A PoolResource may only be released once.');
+    }
+    _released = true;
+    _pool._onResourceReleased();
+  }
+
+  /// Tells the parent [Pool] that the resource associated with this resource is
+  /// no longer necessary, but should remain allocated until more resources are
+  /// needed.
+  ///
+  /// When [Pool.request] is called and there are no remaining available
+  /// resources, the [onRelease] callback is called. It should free the
+  /// resource, and it may return a Future or `null`. Once that completes, the
+  /// [Pool.request] call will complete to a new [PoolResource].
+  ///
+  /// This is useful when a resource's main function is complete, but it may
+  /// produce additional information later on. For example, an isolate's task
+  /// may be complete, but it could still emit asynchronous errors.
+  void allowRelease(FutureOr<void> Function() onRelease) {
+    if (_released) {
+      throw StateError('A PoolResource may only be released once.');
+    }
+    _released = true;
+    _pool._onResourceReleaseAllowed(onRelease);
+  }
+}
diff --git a/pkgs/pool/pubspec.yaml b/pkgs/pool/pubspec.yaml
new file mode 100644
index 0000000..a205b74
--- /dev/null
+++ b/pkgs/pool/pubspec.yaml
@@ -0,0 +1,18 @@
+name: pool
+version: 1.5.2-wip
+description: >-
+  Manage a finite pool of resources.
+  Useful for controlling concurrent file system or network requests.
+repository: https://github.com/dart-lang/tools/tree/main/pkgs/pool
+
+environment:
+  sdk: ^3.4.0
+
+dependencies:
+  async: ^2.5.0
+  stack_trace: ^1.10.0
+
+dev_dependencies:
+  dart_flutter_team_lints: ^3.0.0
+  fake_async: ^1.2.0
+  test: ^1.16.6
diff --git a/pkgs/pool/test/pool_test.dart b/pkgs/pool/test/pool_test.dart
new file mode 100644
index 0000000..6334a8a
--- /dev/null
+++ b/pkgs/pool/test/pool_test.dart
@@ -0,0 +1,745 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:fake_async/fake_async.dart';
+import 'package:pool/pool.dart';
+import 'package:stack_trace/stack_trace.dart';
+import 'package:test/test.dart';
+
+void main() {
+  group('request()', () {
+    test('resources can be requested freely up to the limit', () {
+      var pool = Pool(50);
+      for (var i = 0; i < 50; i++) {
+        expect(pool.request(), completes);
+      }
+    });
+
+    test('resources block past the limit', () {
+      FakeAsync().run((async) {
+        var pool = Pool(50);
+        for (var i = 0; i < 50; i++) {
+          expect(pool.request(), completes);
+        }
+        expect(pool.request(), doesNotComplete);
+
+        async.elapse(const Duration(seconds: 1));
+      });
+    });
+
+    test('a blocked resource is allocated when another is released', () {
+      FakeAsync().run((async) {
+        var pool = Pool(50);
+        for (var i = 0; i < 49; i++) {
+          expect(pool.request(), completes);
+        }
+
+        pool.request().then((lastAllocatedResource) {
+          // This will only complete once [lastAllocatedResource] is released.
+          expect(pool.request(), completes);
+
+          Future<void>.delayed(const Duration(microseconds: 1)).then((_) {
+            lastAllocatedResource.release();
+          });
+        });
+
+        async.elapse(const Duration(seconds: 1));
+      });
+    });
+  });
+
+  group('withResource()', () {
+    test('can be called freely up to the limit', () {
+      var pool = Pool(50);
+      for (var i = 0; i < 50; i++) {
+        pool.withResource(expectAsync0(() => Completer<void>().future));
+      }
+    });
+
+    test('blocks the callback past the limit', () {
+      FakeAsync().run((async) {
+        var pool = Pool(50);
+        for (var i = 0; i < 50; i++) {
+          pool.withResource(expectAsync0(() => Completer<void>().future));
+        }
+        pool.withResource(expectNoAsync());
+
+        async.elapse(const Duration(seconds: 1));
+      });
+    });
+
+    test('a blocked resource is allocated when another is released', () {
+      FakeAsync().run((async) {
+        var pool = Pool(50);
+        for (var i = 0; i < 49; i++) {
+          pool.withResource(expectAsync0(() => Completer<void>().future));
+        }
+
+        var completer = Completer<void>();
+        pool.withResource(() => completer.future);
+        var blockedResourceAllocated = false;
+        pool.withResource(() {
+          blockedResourceAllocated = true;
+        });
+
+        Future<void>.delayed(const Duration(microseconds: 1)).then((_) {
+          expect(blockedResourceAllocated, isFalse);
+          completer.complete();
+          return Future<void>.delayed(const Duration(microseconds: 1));
+        }).then((_) {
+          expect(blockedResourceAllocated, isTrue);
+        });
+
+        async.elapse(const Duration(seconds: 1));
+      });
+    });
+
+    // Regression test for #3.
+    test('can be called immediately before close()', () async {
+      var pool = Pool(1);
+      unawaited(pool.withResource(expectAsync0(() {})));
+      await pool.close();
+    });
+  });
+
+  group('with a timeout', () {
+    test("doesn't time out if there are no pending requests", () {
+      FakeAsync().run((async) {
+        var pool = Pool(50, timeout: const Duration(seconds: 5));
+        for (var i = 0; i < 50; i++) {
+          expect(pool.request(), completes);
+        }
+
+        async.elapse(const Duration(seconds: 6));
+      });
+    });
+
+    test('resets the timer if a resource is returned', () {
+      FakeAsync().run((async) {
+        var pool = Pool(50, timeout: const Duration(seconds: 5));
+        for (var i = 0; i < 49; i++) {
+          expect(pool.request(), completes);
+        }
+
+        pool.request().then((lastAllocatedResource) {
+          // This will only complete once [lastAllocatedResource] is released.
+          expect(pool.request(), completes);
+
+          Future<void>.delayed(const Duration(seconds: 3)).then((_) {
+            lastAllocatedResource.release();
+            expect(pool.request(), doesNotComplete);
+          });
+        });
+
+        async.elapse(const Duration(seconds: 6));
+      });
+    });
+
+    test('resets the timer if a resource is requested', () {
+      FakeAsync().run((async) {
+        var pool = Pool(50, timeout: const Duration(seconds: 5));
+        for (var i = 0; i < 50; i++) {
+          expect(pool.request(), completes);
+        }
+        expect(pool.request(), doesNotComplete);
+
+        Future<void>.delayed(const Duration(seconds: 3)).then((_) {
+          expect(pool.request(), doesNotComplete);
+        });
+
+        async.elapse(const Duration(seconds: 6));
+      });
+    });
+
+    test('times out if nothing happens', () {
+      FakeAsync().run((async) {
+        var pool = Pool(50, timeout: const Duration(seconds: 5));
+        for (var i = 0; i < 50; i++) {
+          expect(pool.request(), completes);
+        }
+        expect(pool.request(), throwsA(const TypeMatcher<TimeoutException>()));
+
+        async.elapse(const Duration(seconds: 6));
+      });
+    });
+  });
+
+  group('allowRelease()', () {
+    test('runs the callback once the resource limit is exceeded', () async {
+      var pool = Pool(50);
+      for (var i = 0; i < 49; i++) {
+        expect(pool.request(), completes);
+      }
+
+      var resource = await pool.request();
+      var onReleaseCalled = false;
+      resource.allowRelease(() => onReleaseCalled = true);
+      await Future<void>.delayed(Duration.zero);
+      expect(onReleaseCalled, isFalse);
+
+      expect(pool.request(), completes);
+      await Future<void>.delayed(Duration.zero);
+      expect(onReleaseCalled, isTrue);
+    });
+
+    test('runs the callback immediately if there are blocked requests',
+        () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+
+      // This will be blocked until [resource.allowRelease] is called.
+      expect(pool.request(), completes);
+
+      var onReleaseCalled = false;
+      resource.allowRelease(() => onReleaseCalled = true);
+      await Future<void>.delayed(Duration.zero);
+      expect(onReleaseCalled, isTrue);
+    });
+
+    test('blocks the request until the callback completes', () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+
+      var requestComplete = false;
+      unawaited(pool.request().then((_) => requestComplete = true));
+
+      var completer = Completer<void>();
+      resource.allowRelease(() => completer.future);
+      await Future<void>.delayed(Duration.zero);
+      expect(requestComplete, isFalse);
+
+      completer.complete();
+      await Future<void>.delayed(Duration.zero);
+      expect(requestComplete, isTrue);
+    });
+
+    test('completes requests in request order regardless of callback order',
+        () async {
+      var pool = Pool(2);
+      var resource1 = await pool.request();
+      var resource2 = await pool.request();
+
+      var request1Complete = false;
+      unawaited(pool.request().then((_) => request1Complete = true));
+      var request2Complete = false;
+      unawaited(pool.request().then((_) => request2Complete = true));
+
+      var onRelease1Called = false;
+      var completer1 = Completer<void>();
+      resource1.allowRelease(() {
+        onRelease1Called = true;
+        return completer1.future;
+      });
+      await Future<void>.delayed(Duration.zero);
+      expect(onRelease1Called, isTrue);
+
+      var onRelease2Called = false;
+      var completer2 = Completer<void>();
+      resource2.allowRelease(() {
+        onRelease2Called = true;
+        return completer2.future;
+      });
+      await Future<void>.delayed(Duration.zero);
+      expect(onRelease2Called, isTrue);
+      expect(request1Complete, isFalse);
+      expect(request2Complete, isFalse);
+
+      // Complete the second resource's onRelease callback first. Even though it
+      // was triggered by the second blocking request, it should complete the
+      // first one to preserve ordering.
+      completer2.complete();
+      await Future<void>.delayed(Duration.zero);
+      expect(request1Complete, isTrue);
+      expect(request2Complete, isFalse);
+
+      completer1.complete();
+      await Future<void>.delayed(Duration.zero);
+      expect(request1Complete, isTrue);
+      expect(request2Complete, isTrue);
+    });
+
+    test('runs onRelease in the zone it was created', () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+
+      var outerZone = Zone.current;
+      runZoned(() {
+        var innerZone = Zone.current;
+        expect(innerZone, isNot(equals(outerZone)));
+
+        resource.allowRelease(expectAsync0(() {
+          expect(Zone.current, equals(innerZone));
+        }));
+      });
+
+      await pool.request();
+    });
+  });
+
+  test("done doesn't complete without close", () async {
+    var pool = Pool(1);
+    unawaited(pool.done.then(expectAsync1((_) {}, count: 0)));
+
+    var resource = await pool.request();
+    resource.release();
+
+    await Future<void>.delayed(Duration.zero);
+  });
+
+  group('close()', () {
+    test('disallows request() and withResource()', () {
+      var pool = Pool(1)..close();
+      expect(pool.request, throwsStateError);
+      expect(() => pool.withResource(() {}), throwsStateError);
+    });
+
+    test('pending requests are fulfilled', () async {
+      var pool = Pool(1);
+      var resource1 = await pool.request();
+      expect(
+          pool.request().then((resource2) {
+            resource2.release();
+          }),
+          completes);
+      expect(pool.done, completes);
+      expect(pool.close(), completes);
+      resource1.release();
+    });
+
+    test('pending requests are fulfilled with allowRelease', () async {
+      var pool = Pool(1);
+      var resource1 = await pool.request();
+
+      var completer = Completer<void>();
+      expect(
+          pool.request().then((resource2) {
+            expect(completer.isCompleted, isTrue);
+            resource2.release();
+          }),
+          completes);
+      expect(pool.close(), completes);
+
+      resource1.allowRelease(() => completer.future);
+      await Future<void>.delayed(Duration.zero);
+
+      completer.complete();
+    });
+
+    test("doesn't complete until all resources are released", () async {
+      var pool = Pool(2);
+      var resource1 = await pool.request();
+      var resource2 = await pool.request();
+      var resource3Future = pool.request();
+
+      var resource1Released = false;
+      var resource2Released = false;
+      var resource3Released = false;
+      expect(
+          pool.close().then((_) {
+            expect(resource1Released, isTrue);
+            expect(resource2Released, isTrue);
+            expect(resource3Released, isTrue);
+          }),
+          completes);
+
+      resource1Released = true;
+      resource1.release();
+      await Future<void>.delayed(Duration.zero);
+
+      resource2Released = true;
+      resource2.release();
+      await Future<void>.delayed(Duration.zero);
+
+      var resource3 = await resource3Future;
+      resource3Released = true;
+      resource3.release();
+    });
+
+    test('active onReleases complete as usual', () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+
+      // Set up an onRelease callback whose completion is controlled by
+      // [completer].
+      var completer = Completer<void>();
+      resource.allowRelease(() => completer.future);
+      expect(
+          pool.request().then((_) {
+            expect(completer.isCompleted, isTrue);
+          }),
+          completes);
+
+      await Future<void>.delayed(Duration.zero);
+      unawaited(pool.close());
+
+      await Future<void>.delayed(Duration.zero);
+      completer.complete();
+    });
+
+    test('inactive onReleases fire', () async {
+      var pool = Pool(2);
+      var resource1 = await pool.request();
+      var resource2 = await pool.request();
+
+      var completer1 = Completer<void>();
+      resource1.allowRelease(() => completer1.future);
+      var completer2 = Completer<void>();
+      resource2.allowRelease(() => completer2.future);
+
+      expect(
+          pool.close().then((_) {
+            expect(completer1.isCompleted, isTrue);
+            expect(completer2.isCompleted, isTrue);
+          }),
+          completes);
+
+      await Future<void>.delayed(Duration.zero);
+      completer1.complete();
+
+      await Future<void>.delayed(Duration.zero);
+      completer2.complete();
+    });
+
+    test('new allowReleases fire immediately', () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+
+      var completer = Completer<void>();
+      expect(
+          pool.close().then((_) {
+            expect(completer.isCompleted, isTrue);
+          }),
+          completes);
+
+      await Future<void>.delayed(Duration.zero);
+      resource.allowRelease(() => completer.future);
+
+      await Future<void>.delayed(Duration.zero);
+      completer.complete();
+    });
+
+    test('an onRelease error is piped to the return value', () async {
+      var pool = Pool(1);
+      var resource = await pool.request();
+
+      var completer = Completer<void>();
+      resource.allowRelease(() => completer.future);
+
+      expect(pool.done, throwsA('oh no!'));
+      expect(pool.close(), throwsA('oh no!'));
+
+      await Future<void>.delayed(Duration.zero);
+      completer.completeError('oh no!');
+    });
+  });
+
+  group('forEach', () {
+    late Pool pool;
+
+    tearDown(() async {
+      await pool.close();
+    });
+
+    const delayedToStringDuration = Duration(milliseconds: 10);
+
+    Future<String> delayedToString(int i) =>
+        Future<String>.delayed(delayedToStringDuration, () => i.toString());
+
+    for (var itemCount in [0, 5]) {
+      for (var poolSize in [1, 5, 6]) {
+        test('poolSize: $poolSize, itemCount: $itemCount', () async {
+          pool = Pool(poolSize);
+
+          var finishedItems = 0;
+
+          await for (var item in pool.forEach(
+              Iterable.generate(itemCount, (i) {
+                expect(i, lessThanOrEqualTo(finishedItems + poolSize),
+                    reason: 'the iterator should be called lazily');
+                return i;
+              }),
+              delayedToString)) {
+            expect(int.parse(item), lessThan(itemCount));
+            finishedItems++;
+          }
+
+          expect(finishedItems, itemCount);
+        });
+      }
+    }
+
+    test('pool closed before listen', () async {
+      pool = Pool(2);
+
+      var stream = pool.forEach(Iterable<int>.generate(5), delayedToString);
+
+      await pool.close();
+
+      expect(stream.toList(), throwsStateError);
+    });
+
+    test('completes even if the pool is partially used', () async {
+      pool = Pool(2);
+
+      var resource = await pool.request();
+
+      var stream = pool.forEach(<int>[], delayedToString);
+
+      expect(await stream.length, 0);
+
+      resource.release();
+    });
+
+    test('stream paused longer than timeout', () async {
+      pool = Pool(2, timeout: delayedToStringDuration);
+
+      var resource = await pool.request();
+
+      var stream = pool.forEach<int, int>(
+          Iterable.generate(100, (i) {
+            expect(i, lessThan(20),
+                reason: 'The timeout should happen '
+                    'before the entire iterable is iterated.');
+            return i;
+          }), (i) async {
+        await Future<void>.delayed(Duration(milliseconds: i));
+        return i;
+      });
+
+      await expectLater(
+          stream.toList,
+          throwsA(const TypeMatcher<TimeoutException>().having(
+              (te) => te.message,
+              'message',
+              contains('Pool deadlock: '
+                  'all resources have been allocated for too long.'))));
+
+      resource.release();
+    });
+
+    group('timing and timeout', () {
+      for (var poolSize in [2, 8, 64]) {
+        for (var otherTaskCount
+            in [0, 1, 7, 63].where((otc) => otc < poolSize)) {
+          test('poolSize: $poolSize, otherTaskCount: $otherTaskCount',
+              () async {
+            final itemCount = 128;
+            pool = Pool(poolSize, timeout: const Duration(milliseconds: 20));
+
+            var otherTasks = await Future.wait(
+                Iterable<int>.generate(otherTaskCount)
+                    .map((i) => pool.request()));
+
+            try {
+              var finishedItems = 0;
+
+              var watch = Stopwatch()..start();
+
+              await for (var item in pool.forEach(
+                  Iterable.generate(itemCount, (i) {
+                    expect(i, lessThanOrEqualTo(finishedItems + poolSize),
+                        reason: 'the iterator should be called lazily');
+                    return i;
+                  }),
+                  delayedToString)) {
+                expect(int.parse(item), lessThan(itemCount));
+                finishedItems++;
+              }
+
+              expect(finishedItems, itemCount);
+
+              final expectedElapsed =
+                  delayedToStringDuration.inMicroseconds * 4;
+
+              expect((watch.elapsed ~/ itemCount).inMicroseconds,
+                  lessThan(expectedElapsed / (poolSize - otherTaskCount)),
+                  reason: 'Average time per task should be '
+                      'proportionate to the available pool resources.');
+            } finally {
+              for (var task in otherTasks) {
+                task.release();
+              }
+            }
+          });
+        }
+      }
+    }, testOn: 'vm');
+
+    test('partial iteration', () async {
+      pool = Pool(5);
+      var stream = pool.forEach(Iterable<int>.generate(100), delayedToString);
+      expect(await stream.take(10).toList(), hasLength(10));
+    });
+
+    test('pool close during data with waiting to be done', () async {
+      pool = Pool(5);
+
+      var stream = pool.forEach(Iterable<int>.generate(100), delayedToString);
+
+      var dataCount = 0;
+      var subscription = stream.listen((data) {
+        dataCount++;
+        pool.close();
+      });
+
+      await subscription.asFuture<void>();
+      expect(dataCount, 100);
+      await subscription.cancel();
+    });
+
+    test('pause and resume', () async {
+      var generatedCount = 0;
+      var dataCount = 0;
+      final poolSize = 5;
+
+      pool = Pool(poolSize);
+
+      var stream = pool.forEach(
+          Iterable<int>.generate(40, (i) {
+            expect(generatedCount, lessThanOrEqualTo(dataCount + 2 * poolSize),
+                reason: 'The iterator should not be called '
+                    'much faster than the data is consumed.');
+            generatedCount++;
+            return i;
+          }),
+          delayedToString);
+
+      // ignore: cancel_subscriptions
+      late StreamSubscription subscription;
+
+      subscription = stream.listen(
+        (data) {
+          dataCount++;
+
+          if (int.parse(data) % 3 == 1) {
+            subscription.pause(Future(() async {
+              await Future<void>.delayed(const Duration(milliseconds: 100));
+            }));
+          }
+        },
+        onError: registerException,
+        onDone: expectAsync0(() {
+          expect(dataCount, 40);
+        }),
+      );
+    });
+
+    group('cancel', () {
+      final dataSize = 32;
+      for (var i = 1; i < 5; i++) {
+        test('with pool size $i', () async {
+          pool = Pool(i);
+
+          var stream =
+              pool.forEach(Iterable<int>.generate(dataSize), delayedToString);
+
+          var cancelCompleter = Completer<void>();
+
+          StreamSubscription subscription;
+
+          var eventCount = 0;
+          subscription = stream.listen((data) {
+            eventCount++;
+            if (int.parse(data) == dataSize ~/ 2) {
+              cancelCompleter.complete();
+            }
+          }, onError: registerException);
+
+          await cancelCompleter.future;
+
+          await subscription.cancel();
+
+          expect(eventCount, 1 + dataSize ~/ 2);
+        });
+      }
+    });
+
+    group('errors', () {
+      Future<void> errorInIterator({
+        bool Function(int item, Object error, StackTrace stack)? onError,
+      }) async {
+        pool = Pool(20);
+
+        var listFuture = pool
+            .forEach(
+                Iterable.generate(100, (i) {
+                  if (i == 50) {
+                    throw StateError('error while generating item in iterator');
+                  }
+
+                  return i;
+                }),
+                delayedToString,
+                onError: onError)
+            .toList();
+
+        await expectLater(() async => listFuture, throwsStateError);
+      }
+
+      test('iteration, no onError', () async {
+        await errorInIterator();
+      });
+      test('iteration, with onError', () async {
+        await errorInIterator(onError: (i, e, s) => false);
+      });
+
+      test('error in action, no onError', () async {
+        pool = Pool(20);
+
+        var listFuture = pool.forEach(Iterable<int>.generate(100), (i) async {
+          await Future<void>.delayed(const Duration(milliseconds: 10));
+          if (i == 10) {
+            throw UnsupportedError('10 is not supported');
+          }
+          return i.toString();
+        }).toList();
+
+        await expectLater(() async => listFuture, throwsUnsupportedError);
+      });
+
+      test('error in action, with onError', () async {
+        pool = Pool(20);
+
+        var list = await pool.forEach(Iterable<int>.generate(100),
+            (int i) async {
+          await Future<void>.delayed(const Duration(milliseconds: 10));
+          if (i % 10 == 0) {
+            throw UnsupportedError('Multiples of 10 not supported');
+          }
+          return i.toString();
+        },
+            onError: (item, error, stack) =>
+                error is! UnsupportedError).toList();
+
+        expect(list, hasLength(90));
+      });
+    });
+  });
+
+  test('throw error when pool limit <= 0', () {
+    expect(() => Pool(-1), throwsArgumentError);
+    expect(() => Pool(0), throwsArgumentError);
+  });
+}
+
+/// Builds a callback that makes the test fail if it is ever invoked.
+///
+/// This should only be called within a [FakeAsync.run] zone.
+void Function() expectNoAsync() {
+  final callerTrace = Trace.current(1);
+  return () => registerException(
+      TestFailure('Expected function not to be called.'), callerTrace);
+}
+
+/// A matcher for Futures that fails the test if one completes with a value.
+///
+/// This should only be used within a [FakeAsync.run] zone.
+Matcher get doesNotComplete => predicate((Future pending) {
+      final callerTrace = Trace.current(1);
+      pending.then((_) => registerException(
+          TestFailure('Expected future not to complete.'), callerTrace));
+      return true;
+    });