add forEach to Pool (#23)

Fixes https://github.com/dart-lang/pool/issues/22
diff --git a/.travis.yml b/.travis.yml
index a9b5166..07e132f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -7,7 +7,7 @@
   - test: --platform vm
   - test: --platform firefox
   - dartfmt
-  - dartanalyzer
+  - dartanalyzer: --fatal-infos --fatal-warnings .
 
 # Only building master means that we don't run two builds for each pull request.
 branches:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 10727a5..0c3d314 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
-## x.x.x
+## 1.4.0
+
+* Add `forEach` to `Pool` to support efficient async processing of an
+  `Iterable`.
 
 * Throw ArgumentError if poolSize <= 0
 
diff --git a/lib/pool.dart b/lib/pool.dart
index a97ec75..779300e 100644
--- a/lib/pool.dart
+++ b/lib/pool.dart
@@ -130,6 +130,103 @@
     }
   }
 
+  /// Returns a [Stream] containing the result of [action] applied to each
+  /// element of [elements].
+  ///
+  /// While [action] is invoked on each element of [elements] in order,
+  /// it's possible the return [Stream] may have items out-of-order – especially
+  /// if the completion time of [action] varies.
+  ///
+  /// If [action] throws an error the source item along with the error object
+  /// and [StackTrace] are passed to [onError], if it is provided. If [onError]
+  /// returns `true`, the error is added to the returned [Stream], otherwise
+  /// it is ignored.
+  ///
+  /// Errors thrown from iterating [elements] will not be passed to
+  /// [onError]. They will always be added to the returned stream as an error.
+  ///
+  /// Note: all of the resources of the this [Pool] will be used when the
+  /// returned [Stream] is listened to until it is completed or canceled.
+  ///
+  /// Note: if this [Pool] is closed before the returned [Stream] is listened
+  /// to, a [StateError] is thrown.
+  Stream<T> forEach<S, T>(
+      Iterable<S> elements, FutureOr<T> Function(S source) action,
+      {bool Function(S item, Object error, StackTrace stack) onError}) {
+    onError ??= (item, e, s) => true;
+
+    var cancelPending = false;
+
+    Completer resumeCompleter;
+    StreamController<T> controller;
+
+    Iterator<S> iterator;
+
+    Future<void> run(int i) async {
+      while (iterator.moveNext()) {
+        // caching `current` is necessary because there are async breaks
+        // in this code and `iterator` is shared across many workers
+        final current = iterator.current;
+
+        _resetTimer();
+
+        await resumeCompleter?.future;
+
+        if (cancelPending) {
+          break;
+        }
+
+        T value;
+        try {
+          value = await action(current);
+        } catch (e, stack) {
+          if (onError(current, e, stack)) {
+            controller.addError(e, stack);
+          }
+          continue;
+        }
+        controller.add(value);
+      }
+    }
+
+    Future doneFuture;
+
+    void onListen() {
+      assert(iterator == null);
+      iterator = elements.iterator;
+
+      assert(doneFuture == null);
+      doneFuture = Future.wait(
+              Iterable<int>.generate(_maxAllocatedResources)
+                  .map((i) => withResource(() => run(i))),
+              eagerError: true)
+          .catchError(controller.addError);
+
+      doneFuture.whenComplete(controller.close);
+    }
+
+    controller = StreamController<T>(
+      sync: true,
+      onListen: onListen,
+      onCancel: () async {
+        assert(!cancelPending);
+        cancelPending = true;
+        await doneFuture;
+      },
+      onPause: () {
+        assert(resumeCompleter == null);
+        resumeCompleter = Completer();
+      },
+      onResume: () {
+        assert(resumeCompleter != null);
+        resumeCompleter.complete();
+        resumeCompleter = null;
+      },
+    );
+
+    return controller.stream;
+  }
+
   /// Closes the pool so that no more resources are requested.
   ///
   /// Existing resource requests remain unchanged.
diff --git a/pubspec.yaml b/pubspec.yaml
index a709842..095c385 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: pool
-version: 1.3.7-dev
+version: 1.4.0-dev
 
 description: A class for managing a finite pool of resources.
 author: Dart Team <misc@dartlang.org>
diff --git a/test/pool_test.dart b/test/pool_test.dart
index 6c9d0a5..d4d7789 100644
--- a/test/pool_test.dart
+++ b/test/pool_test.dart
@@ -27,7 +27,7 @@
         }
         expect(pool.request(), doesNotComplete);
 
-        async.elapse(Duration(seconds: 1));
+        async.elapse(const Duration(seconds: 1));
       });
     });
 
@@ -42,12 +42,12 @@
           // This will only complete once [lastAllocatedResource] is released.
           expect(pool.request(), completes);
 
-          Future.delayed(Duration(microseconds: 1)).then((_) {
+          Future.delayed(const Duration(microseconds: 1)).then((_) {
             lastAllocatedResource.release();
           });
         });
 
-        async.elapse(Duration(seconds: 1));
+        async.elapse(const Duration(seconds: 1));
       });
     });
   });
@@ -68,7 +68,7 @@
         }
         pool.withResource(expectNoAsync());
 
-        async.elapse(Duration(seconds: 1));
+        async.elapse(const Duration(seconds: 1));
       });
     });
 
@@ -86,15 +86,15 @@
           blockedResourceAllocated = true;
         });
 
-        Future.delayed(Duration(microseconds: 1)).then((_) {
+        Future.delayed(const Duration(microseconds: 1)).then((_) {
           expect(blockedResourceAllocated, isFalse);
           completer.complete();
-          return Future.delayed(Duration(microseconds: 1));
+          return Future.delayed(const Duration(microseconds: 1));
         }).then((_) {
           expect(blockedResourceAllocated, isTrue);
         });
 
-        async.elapse(Duration(seconds: 1));
+        async.elapse(const Duration(seconds: 1));
       });
     });
 
@@ -109,18 +109,18 @@
   group("with a timeout", () {
     test("doesn't time out if there are no pending requests", () {
       FakeAsync().run((async) {
-        var pool = Pool(50, timeout: Duration(seconds: 5));
+        var pool = Pool(50, timeout: const Duration(seconds: 5));
         for (var i = 0; i < 50; i++) {
           expect(pool.request(), completes);
         }
 
-        async.elapse(Duration(seconds: 6));
+        async.elapse(const Duration(seconds: 6));
       });
     });
 
     test("resets the timer if a resource is returned", () {
       FakeAsync().run((async) {
-        var pool = Pool(50, timeout: Duration(seconds: 5));
+        var pool = Pool(50, timeout: const Duration(seconds: 5));
         for (var i = 0; i < 49; i++) {
           expect(pool.request(), completes);
         }
@@ -129,41 +129,41 @@
           // This will only complete once [lastAllocatedResource] is released.
           expect(pool.request(), completes);
 
-          Future.delayed(Duration(seconds: 3)).then((_) {
+          Future.delayed(const Duration(seconds: 3)).then((_) {
             lastAllocatedResource.release();
             expect(pool.request(), doesNotComplete);
           });
         });
 
-        async.elapse(Duration(seconds: 6));
+        async.elapse(const Duration(seconds: 6));
       });
     });
 
     test("resets the timer if a resource is requested", () {
       FakeAsync().run((async) {
-        var pool = Pool(50, timeout: Duration(seconds: 5));
+        var pool = Pool(50, timeout: const Duration(seconds: 5));
         for (var i = 0; i < 50; i++) {
           expect(pool.request(), completes);
         }
         expect(pool.request(), doesNotComplete);
 
-        Future.delayed(Duration(seconds: 3)).then((_) {
+        Future.delayed(const Duration(seconds: 3)).then((_) {
           expect(pool.request(), doesNotComplete);
         });
 
-        async.elapse(Duration(seconds: 6));
+        async.elapse(const Duration(seconds: 6));
       });
     });
 
     test("times out if nothing happens", () {
       FakeAsync().run((async) {
-        var pool = Pool(50, timeout: Duration(seconds: 5));
+        var pool = Pool(50, timeout: const Duration(seconds: 5));
         for (var i = 0; i < 50; i++) {
           expect(pool.request(), completes);
         }
         expect(pool.request(), throwsA(const TypeMatcher<TimeoutException>()));
 
-        async.elapse(Duration(seconds: 6));
+        async.elapse(const Duration(seconds: 6));
       });
     });
   });
@@ -437,6 +437,288 @@
     });
   });
 
+  group('forEach', () {
+    Pool pool;
+
+    tearDown(() async {
+      await pool.close();
+    });
+
+    const delayedToStringDuration = Duration(milliseconds: 10);
+
+    Future<String> delayedToString(int i) =>
+        Future.delayed(delayedToStringDuration, () => i.toString());
+
+    for (var itemCount in [0, 5]) {
+      for (var poolSize in [1, 5, 6]) {
+        test('poolSize: $poolSize, itemCount: $itemCount', () async {
+          pool = Pool(poolSize);
+
+          var finishedItems = 0;
+
+          await for (var item in pool.forEach(
+              Iterable.generate(itemCount, (i) {
+                expect(i, lessThanOrEqualTo(finishedItems + poolSize),
+                    reason: 'the iterator should be called lazily');
+                return i;
+              }),
+              delayedToString)) {
+            expect(int.parse(item), lessThan(itemCount));
+            finishedItems++;
+          }
+
+          expect(finishedItems, itemCount);
+        });
+      }
+    }
+
+    test('pool closed before listen', () async {
+      pool = Pool(2);
+
+      var stream = pool.forEach(Iterable<int>.generate(5), delayedToString);
+
+      await pool.close();
+
+      expect(stream.toList(), throwsStateError);
+    });
+
+    test('completes even if the pool is partially used', () async {
+      pool = Pool(2);
+
+      var resource = await pool.request();
+
+      var stream = pool.forEach(<int>[], delayedToString);
+
+      expect(await stream.length, 0);
+
+      resource.release();
+    });
+
+    test('stream paused longer than timeout', () async {
+      pool = Pool(2, timeout: delayedToStringDuration);
+
+      var resource = await pool.request();
+
+      var stream = pool.forEach<int, int>(
+          Iterable.generate(100, (i) {
+            expect(i, lessThan(20),
+                reason: 'The timeout should happen '
+                    'before the entire iterable is iterated.');
+            return i;
+          }), (i) async {
+        await Future.delayed(Duration(milliseconds: i));
+        return i;
+      });
+
+      await expectLater(
+          stream.toList,
+          throwsA(const TypeMatcher<TimeoutException>().having(
+              (te) => te.message,
+              'message',
+              contains('Pool deadlock: '
+                  'all resources have been allocated for too long.'))));
+
+      resource.release();
+    });
+
+    group('timing and timeout', () {
+      for (var poolSize in [2, 8, 64]) {
+        for (var otherTaskCount
+            in [0, 1, 7, 63].where((otc) => otc < poolSize)) {
+          test('poolSize: $poolSize, otherTaskCount: $otherTaskCount',
+              () async {
+            final itemCount = 128;
+            pool = Pool(poolSize, timeout: const Duration(milliseconds: 20));
+
+            var otherTasks = await Future.wait(
+                Iterable<int>.generate(otherTaskCount)
+                    .map((i) => pool.request()));
+
+            try {
+              var finishedItems = 0;
+
+              var watch = Stopwatch()..start();
+
+              await for (var item in pool.forEach(
+                  Iterable.generate(itemCount, (i) {
+                    expect(i, lessThanOrEqualTo(finishedItems + poolSize),
+                        reason: 'the iterator should be called lazily');
+                    return i;
+                  }),
+                  delayedToString)) {
+                expect(int.parse(item), lessThan(itemCount));
+                finishedItems++;
+              }
+
+              expect(finishedItems, itemCount);
+
+              final expectedElapsed =
+                  delayedToStringDuration.inMicroseconds * 3;
+
+              expect((watch.elapsed ~/ itemCount).inMicroseconds,
+                  lessThan(expectedElapsed / (poolSize - otherTaskCount)),
+                  reason: 'Average time per task should be '
+                      'proportionate to the available pool resources.');
+            } finally {
+              for (var task in otherTasks) {
+                task.release();
+              }
+            }
+          });
+        }
+      }
+    }, testOn: 'vm');
+
+    test('partial iteration', () async {
+      pool = Pool(5);
+      var stream = pool.forEach(Iterable<int>.generate(100), delayedToString);
+      expect(await stream.take(10).toList(), hasLength(10));
+    });
+
+    test('pool close during data with waiting to be done', () async {
+      pool = Pool(5);
+
+      var stream = pool.forEach(Iterable<int>.generate(100), delayedToString);
+
+      var dataCount = 0;
+      var subscription = stream.listen((data) {
+        dataCount++;
+        pool.close();
+      });
+
+      await subscription.asFuture();
+      expect(dataCount, 100);
+      await subscription.cancel();
+    });
+
+    test('pause and resume ', () async {
+      var generatedCount = 0;
+      var dataCount = 0;
+      final poolSize = 5;
+
+      pool = Pool(poolSize);
+
+      var stream = pool.forEach(
+          Iterable<int>.generate(40, (i) {
+            expect(generatedCount, lessThanOrEqualTo(dataCount + 2 * poolSize),
+                reason: 'The iterator should not be called '
+                    'much faster than the data is consumed.');
+            generatedCount++;
+            return i;
+          }),
+          delayedToString);
+
+      // ignore: cancel_subscriptions
+      StreamSubscription subscription;
+
+      subscription = stream.listen(
+        (data) {
+          dataCount++;
+
+          if (int.parse(data) % 3 == 1) {
+            subscription.pause(Future(() async {
+              await Future.delayed(const Duration(milliseconds: 100));
+            }));
+          }
+        },
+        onError: registerException,
+        onDone: expectAsync0(() {
+          expect(dataCount, 40);
+        }),
+      );
+    });
+
+    group('cancel', () {
+      final dataSize = 32;
+      for (var i = 1; i < 5; i++) {
+        test('with pool size $i', () async {
+          pool = Pool(i);
+
+          var stream =
+              pool.forEach(Iterable<int>.generate(dataSize), delayedToString);
+
+          var cancelCompleter = Completer<void>();
+
+          StreamSubscription subscription;
+
+          var eventCount = 0;
+          subscription = stream.listen((data) {
+            eventCount++;
+            if (int.parse(data) == dataSize ~/ 2) {
+              cancelCompleter.complete();
+            }
+          }, onError: registerException);
+
+          await cancelCompleter.future;
+
+          await subscription.cancel();
+
+          expect(eventCount, 1 + dataSize ~/ 2);
+        });
+      }
+    });
+
+    group('errors', () {
+      Future<void> errorInIterator(
+          {bool Function(int item, Object error, StackTrace stack)
+              onError}) async {
+        pool = Pool(20);
+
+        var listFuture = pool
+            .forEach(
+                Iterable.generate(100, (i) {
+                  if (i == 50) {
+                    throw StateError('error while generating item in iterator');
+                  }
+
+                  return i;
+                }),
+                delayedToString,
+                onError: onError)
+            .toList();
+
+        await expectLater(() async => listFuture, throwsStateError);
+      }
+
+      test('iteration, no onError', () async {
+        await errorInIterator();
+      });
+      test('iteration, with onError', () async {
+        await errorInIterator(onError: (i, e, s) => false);
+      });
+
+      test('error in action, no onError', () async {
+        pool = Pool(20);
+
+        var listFuture = pool.forEach(Iterable<int>.generate(100), (i) async {
+          await Future.delayed(const Duration(milliseconds: 10));
+          if (i == 10) {
+            throw UnsupportedError('10 is not supported');
+          }
+          return i.toString();
+        }).toList();
+
+        await expectLater(() async => listFuture, throwsUnsupportedError);
+      });
+
+      test('error in action, no onError', () async {
+        pool = Pool(20);
+
+        var list = await pool.forEach(Iterable<int>.generate(100), (i) async {
+          await Future.delayed(const Duration(milliseconds: 10));
+          if (i % 10 == 0) {
+            throw UnsupportedError('Multiples of 10 not supported');
+          }
+          return i.toString();
+        },
+            onError: (item, error, stack) =>
+                error is! UnsupportedError).toList();
+
+        expect(list, hasLength(90));
+      });
+    });
+  });
+
   test("throw error when pool limit <= 0", () {
     expect(() => Pool(-1), throwsArgumentError);
     expect(() => Pool(0), throwsArgumentError);