add forEach to Pool (#23)
Fixes https://github.com/dart-lang/pool/issues/22
diff --git a/.travis.yml b/.travis.yml
index a9b5166..07e132f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -7,7 +7,7 @@
- test: --platform vm
- test: --platform firefox
- dartfmt
- - dartanalyzer
+ - dartanalyzer: --fatal-infos --fatal-warnings .
# Only building master means that we don't run two builds for each pull request.
branches:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 10727a5..0c3d314 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
-## x.x.x
+## 1.4.0
+
+* Add `forEach` to `Pool` to support efficient async processing of an
+ `Iterable`.
* Throw ArgumentError if poolSize <= 0
diff --git a/lib/pool.dart b/lib/pool.dart
index a97ec75..779300e 100644
--- a/lib/pool.dart
+++ b/lib/pool.dart
@@ -130,6 +130,103 @@
}
}
+ /// Returns a [Stream] containing the result of [action] applied to each
+ /// element of [elements].
+ ///
+ /// While [action] is invoked on each element of [elements] in order,
+ /// it's possible the return [Stream] may have items out-of-order – especially
+ /// if the completion time of [action] varies.
+ ///
+ /// If [action] throws an error the source item along with the error object
+ /// and [StackTrace] are passed to [onError], if it is provided. If [onError]
+ /// returns `true`, the error is added to the returned [Stream], otherwise
+ /// it is ignored.
+ ///
+ /// Errors thrown from iterating [elements] will not be passed to
+ /// [onError]. They will always be added to the returned stream as an error.
+ ///
+ /// Note: all of the resources of the this [Pool] will be used when the
+ /// returned [Stream] is listened to until it is completed or canceled.
+ ///
+ /// Note: if this [Pool] is closed before the returned [Stream] is listened
+ /// to, a [StateError] is thrown.
+ Stream<T> forEach<S, T>(
+ Iterable<S> elements, FutureOr<T> Function(S source) action,
+ {bool Function(S item, Object error, StackTrace stack) onError}) {
+ onError ??= (item, e, s) => true;
+
+ var cancelPending = false;
+
+ Completer resumeCompleter;
+ StreamController<T> controller;
+
+ Iterator<S> iterator;
+
+ Future<void> run(int i) async {
+ while (iterator.moveNext()) {
+ // caching `current` is necessary because there are async breaks
+ // in this code and `iterator` is shared across many workers
+ final current = iterator.current;
+
+ _resetTimer();
+
+ await resumeCompleter?.future;
+
+ if (cancelPending) {
+ break;
+ }
+
+ T value;
+ try {
+ value = await action(current);
+ } catch (e, stack) {
+ if (onError(current, e, stack)) {
+ controller.addError(e, stack);
+ }
+ continue;
+ }
+ controller.add(value);
+ }
+ }
+
+ Future doneFuture;
+
+ void onListen() {
+ assert(iterator == null);
+ iterator = elements.iterator;
+
+ assert(doneFuture == null);
+ doneFuture = Future.wait(
+ Iterable<int>.generate(_maxAllocatedResources)
+ .map((i) => withResource(() => run(i))),
+ eagerError: true)
+ .catchError(controller.addError);
+
+ doneFuture.whenComplete(controller.close);
+ }
+
+ controller = StreamController<T>(
+ sync: true,
+ onListen: onListen,
+ onCancel: () async {
+ assert(!cancelPending);
+ cancelPending = true;
+ await doneFuture;
+ },
+ onPause: () {
+ assert(resumeCompleter == null);
+ resumeCompleter = Completer();
+ },
+ onResume: () {
+ assert(resumeCompleter != null);
+ resumeCompleter.complete();
+ resumeCompleter = null;
+ },
+ );
+
+ return controller.stream;
+ }
+
/// Closes the pool so that no more resources are requested.
///
/// Existing resource requests remain unchanged.
diff --git a/pubspec.yaml b/pubspec.yaml
index a709842..095c385 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: pool
-version: 1.3.7-dev
+version: 1.4.0-dev
description: A class for managing a finite pool of resources.
author: Dart Team <misc@dartlang.org>
diff --git a/test/pool_test.dart b/test/pool_test.dart
index 6c9d0a5..d4d7789 100644
--- a/test/pool_test.dart
+++ b/test/pool_test.dart
@@ -27,7 +27,7 @@
}
expect(pool.request(), doesNotComplete);
- async.elapse(Duration(seconds: 1));
+ async.elapse(const Duration(seconds: 1));
});
});
@@ -42,12 +42,12 @@
// This will only complete once [lastAllocatedResource] is released.
expect(pool.request(), completes);
- Future.delayed(Duration(microseconds: 1)).then((_) {
+ Future.delayed(const Duration(microseconds: 1)).then((_) {
lastAllocatedResource.release();
});
});
- async.elapse(Duration(seconds: 1));
+ async.elapse(const Duration(seconds: 1));
});
});
});
@@ -68,7 +68,7 @@
}
pool.withResource(expectNoAsync());
- async.elapse(Duration(seconds: 1));
+ async.elapse(const Duration(seconds: 1));
});
});
@@ -86,15 +86,15 @@
blockedResourceAllocated = true;
});
- Future.delayed(Duration(microseconds: 1)).then((_) {
+ Future.delayed(const Duration(microseconds: 1)).then((_) {
expect(blockedResourceAllocated, isFalse);
completer.complete();
- return Future.delayed(Duration(microseconds: 1));
+ return Future.delayed(const Duration(microseconds: 1));
}).then((_) {
expect(blockedResourceAllocated, isTrue);
});
- async.elapse(Duration(seconds: 1));
+ async.elapse(const Duration(seconds: 1));
});
});
@@ -109,18 +109,18 @@
group("with a timeout", () {
test("doesn't time out if there are no pending requests", () {
FakeAsync().run((async) {
- var pool = Pool(50, timeout: Duration(seconds: 5));
+ var pool = Pool(50, timeout: const Duration(seconds: 5));
for (var i = 0; i < 50; i++) {
expect(pool.request(), completes);
}
- async.elapse(Duration(seconds: 6));
+ async.elapse(const Duration(seconds: 6));
});
});
test("resets the timer if a resource is returned", () {
FakeAsync().run((async) {
- var pool = Pool(50, timeout: Duration(seconds: 5));
+ var pool = Pool(50, timeout: const Duration(seconds: 5));
for (var i = 0; i < 49; i++) {
expect(pool.request(), completes);
}
@@ -129,41 +129,41 @@
// This will only complete once [lastAllocatedResource] is released.
expect(pool.request(), completes);
- Future.delayed(Duration(seconds: 3)).then((_) {
+ Future.delayed(const Duration(seconds: 3)).then((_) {
lastAllocatedResource.release();
expect(pool.request(), doesNotComplete);
});
});
- async.elapse(Duration(seconds: 6));
+ async.elapse(const Duration(seconds: 6));
});
});
test("resets the timer if a resource is requested", () {
FakeAsync().run((async) {
- var pool = Pool(50, timeout: Duration(seconds: 5));
+ var pool = Pool(50, timeout: const Duration(seconds: 5));
for (var i = 0; i < 50; i++) {
expect(pool.request(), completes);
}
expect(pool.request(), doesNotComplete);
- Future.delayed(Duration(seconds: 3)).then((_) {
+ Future.delayed(const Duration(seconds: 3)).then((_) {
expect(pool.request(), doesNotComplete);
});
- async.elapse(Duration(seconds: 6));
+ async.elapse(const Duration(seconds: 6));
});
});
test("times out if nothing happens", () {
FakeAsync().run((async) {
- var pool = Pool(50, timeout: Duration(seconds: 5));
+ var pool = Pool(50, timeout: const Duration(seconds: 5));
for (var i = 0; i < 50; i++) {
expect(pool.request(), completes);
}
expect(pool.request(), throwsA(const TypeMatcher<TimeoutException>()));
- async.elapse(Duration(seconds: 6));
+ async.elapse(const Duration(seconds: 6));
});
});
});
@@ -437,6 +437,288 @@
});
});
+ group('forEach', () {
+ Pool pool;
+
+ tearDown(() async {
+ await pool.close();
+ });
+
+ const delayedToStringDuration = Duration(milliseconds: 10);
+
+ Future<String> delayedToString(int i) =>
+ Future.delayed(delayedToStringDuration, () => i.toString());
+
+ for (var itemCount in [0, 5]) {
+ for (var poolSize in [1, 5, 6]) {
+ test('poolSize: $poolSize, itemCount: $itemCount', () async {
+ pool = Pool(poolSize);
+
+ var finishedItems = 0;
+
+ await for (var item in pool.forEach(
+ Iterable.generate(itemCount, (i) {
+ expect(i, lessThanOrEqualTo(finishedItems + poolSize),
+ reason: 'the iterator should be called lazily');
+ return i;
+ }),
+ delayedToString)) {
+ expect(int.parse(item), lessThan(itemCount));
+ finishedItems++;
+ }
+
+ expect(finishedItems, itemCount);
+ });
+ }
+ }
+
+ test('pool closed before listen', () async {
+ pool = Pool(2);
+
+ var stream = pool.forEach(Iterable<int>.generate(5), delayedToString);
+
+ await pool.close();
+
+ expect(stream.toList(), throwsStateError);
+ });
+
+ test('completes even if the pool is partially used', () async {
+ pool = Pool(2);
+
+ var resource = await pool.request();
+
+ var stream = pool.forEach(<int>[], delayedToString);
+
+ expect(await stream.length, 0);
+
+ resource.release();
+ });
+
+ test('stream paused longer than timeout', () async {
+ pool = Pool(2, timeout: delayedToStringDuration);
+
+ var resource = await pool.request();
+
+ var stream = pool.forEach<int, int>(
+ Iterable.generate(100, (i) {
+ expect(i, lessThan(20),
+ reason: 'The timeout should happen '
+ 'before the entire iterable is iterated.');
+ return i;
+ }), (i) async {
+ await Future.delayed(Duration(milliseconds: i));
+ return i;
+ });
+
+ await expectLater(
+ stream.toList,
+ throwsA(const TypeMatcher<TimeoutException>().having(
+ (te) => te.message,
+ 'message',
+ contains('Pool deadlock: '
+ 'all resources have been allocated for too long.'))));
+
+ resource.release();
+ });
+
+ group('timing and timeout', () {
+ for (var poolSize in [2, 8, 64]) {
+ for (var otherTaskCount
+ in [0, 1, 7, 63].where((otc) => otc < poolSize)) {
+ test('poolSize: $poolSize, otherTaskCount: $otherTaskCount',
+ () async {
+ final itemCount = 128;
+ pool = Pool(poolSize, timeout: const Duration(milliseconds: 20));
+
+ var otherTasks = await Future.wait(
+ Iterable<int>.generate(otherTaskCount)
+ .map((i) => pool.request()));
+
+ try {
+ var finishedItems = 0;
+
+ var watch = Stopwatch()..start();
+
+ await for (var item in pool.forEach(
+ Iterable.generate(itemCount, (i) {
+ expect(i, lessThanOrEqualTo(finishedItems + poolSize),
+ reason: 'the iterator should be called lazily');
+ return i;
+ }),
+ delayedToString)) {
+ expect(int.parse(item), lessThan(itemCount));
+ finishedItems++;
+ }
+
+ expect(finishedItems, itemCount);
+
+ final expectedElapsed =
+ delayedToStringDuration.inMicroseconds * 3;
+
+ expect((watch.elapsed ~/ itemCount).inMicroseconds,
+ lessThan(expectedElapsed / (poolSize - otherTaskCount)),
+ reason: 'Average time per task should be '
+ 'proportionate to the available pool resources.');
+ } finally {
+ for (var task in otherTasks) {
+ task.release();
+ }
+ }
+ });
+ }
+ }
+ }, testOn: 'vm');
+
+ test('partial iteration', () async {
+ pool = Pool(5);
+ var stream = pool.forEach(Iterable<int>.generate(100), delayedToString);
+ expect(await stream.take(10).toList(), hasLength(10));
+ });
+
+ test('pool close during data with waiting to be done', () async {
+ pool = Pool(5);
+
+ var stream = pool.forEach(Iterable<int>.generate(100), delayedToString);
+
+ var dataCount = 0;
+ var subscription = stream.listen((data) {
+ dataCount++;
+ pool.close();
+ });
+
+ await subscription.asFuture();
+ expect(dataCount, 100);
+ await subscription.cancel();
+ });
+
+ test('pause and resume ', () async {
+ var generatedCount = 0;
+ var dataCount = 0;
+ final poolSize = 5;
+
+ pool = Pool(poolSize);
+
+ var stream = pool.forEach(
+ Iterable<int>.generate(40, (i) {
+ expect(generatedCount, lessThanOrEqualTo(dataCount + 2 * poolSize),
+ reason: 'The iterator should not be called '
+ 'much faster than the data is consumed.');
+ generatedCount++;
+ return i;
+ }),
+ delayedToString);
+
+ // ignore: cancel_subscriptions
+ StreamSubscription subscription;
+
+ subscription = stream.listen(
+ (data) {
+ dataCount++;
+
+ if (int.parse(data) % 3 == 1) {
+ subscription.pause(Future(() async {
+ await Future.delayed(const Duration(milliseconds: 100));
+ }));
+ }
+ },
+ onError: registerException,
+ onDone: expectAsync0(() {
+ expect(dataCount, 40);
+ }),
+ );
+ });
+
+ group('cancel', () {
+ final dataSize = 32;
+ for (var i = 1; i < 5; i++) {
+ test('with pool size $i', () async {
+ pool = Pool(i);
+
+ var stream =
+ pool.forEach(Iterable<int>.generate(dataSize), delayedToString);
+
+ var cancelCompleter = Completer<void>();
+
+ StreamSubscription subscription;
+
+ var eventCount = 0;
+ subscription = stream.listen((data) {
+ eventCount++;
+ if (int.parse(data) == dataSize ~/ 2) {
+ cancelCompleter.complete();
+ }
+ }, onError: registerException);
+
+ await cancelCompleter.future;
+
+ await subscription.cancel();
+
+ expect(eventCount, 1 + dataSize ~/ 2);
+ });
+ }
+ });
+
+ group('errors', () {
+ Future<void> errorInIterator(
+ {bool Function(int item, Object error, StackTrace stack)
+ onError}) async {
+ pool = Pool(20);
+
+ var listFuture = pool
+ .forEach(
+ Iterable.generate(100, (i) {
+ if (i == 50) {
+ throw StateError('error while generating item in iterator');
+ }
+
+ return i;
+ }),
+ delayedToString,
+ onError: onError)
+ .toList();
+
+ await expectLater(() async => listFuture, throwsStateError);
+ }
+
+ test('iteration, no onError', () async {
+ await errorInIterator();
+ });
+ test('iteration, with onError', () async {
+ await errorInIterator(onError: (i, e, s) => false);
+ });
+
+ test('error in action, no onError', () async {
+ pool = Pool(20);
+
+ var listFuture = pool.forEach(Iterable<int>.generate(100), (i) async {
+ await Future.delayed(const Duration(milliseconds: 10));
+ if (i == 10) {
+ throw UnsupportedError('10 is not supported');
+ }
+ return i.toString();
+ }).toList();
+
+ await expectLater(() async => listFuture, throwsUnsupportedError);
+ });
+
+ test('error in action, no onError', () async {
+ pool = Pool(20);
+
+ var list = await pool.forEach(Iterable<int>.generate(100), (i) async {
+ await Future.delayed(const Duration(milliseconds: 10));
+ if (i % 10 == 0) {
+ throw UnsupportedError('Multiples of 10 not supported');
+ }
+ return i.toString();
+ },
+ onError: (item, error, stack) =>
+ error is! UnsupportedError).toList();
+
+ expect(list, hasLength(90));
+ });
+ });
+ });
+
test("throw error when pool limit <= 0", () {
expect(() => Pool(-1), throwsArgumentError);
expect(() => Pool(0), throwsArgumentError);