Forward errors thrown during `asyncWhere` (dart-lang/stream_transform#49)
- Add a try/catch and a test for the new behavior.
- Update the Doc comment with more details on the behavior.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index ada1e4b..2ad693c 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 0.0.14
+
+- `asyncWhere` will now forward exceptions thrown by the callback through the
+ result Stream.
+
## 0.0.13
- `mergeAll` now accepts an `Iterable<Stream>` instead of only `List<Stream>`.
diff --git a/pkgs/stream_transform/lib/src/async_where.dart b/pkgs/stream_transform/lib/src/async_where.dart
index 6a4fc32..734f65f 100644
--- a/pkgs/stream_transform/lib/src/async_where.dart
+++ b/pkgs/stream_transform/lib/src/async_where.dart
@@ -6,13 +6,31 @@
import 'from_handlers.dart';
/// Like [Stream.where] but allows the [test] to return a [Future].
+///
+/// Events on the result stream will be emitted in the order that [test]
+/// completes which may not match the order of the original stream.
+///
+/// If the source stream is a broadcast stream the result will be as well. When
+/// used with a broadcast stream behavior also differs from [Stream.where] in
+/// that the [test] function is only called once per event, rather than once
+/// per listener per event.
+///
+/// Errors from the source stream are forwarded directly to the result stream.
+/// Errors during the conversion are also forwarded to the result stream.
+///
+/// The result stream will not close until the source stream closes and all
+/// pending [test] calls have finished.
StreamTransformer<T, T> asyncWhere<T>(FutureOr<bool> test(T element)) {
var valuesWaiting = 0;
var sourceDone = false;
return fromHandlers(handleData: (element, sink) {
valuesWaiting++;
() async {
- if (await test(element)) sink.add(element);
+ try {
+ if (await test(element)) sink.add(element);
+ } catch (e, st) {
+ sink.addError(e, st);
+ }
valuesWaiting--;
if (valuesWaiting <= 0 && sourceDone) sink.close();
}();
diff --git a/pkgs/stream_transform/test/async_where_test.dart b/pkgs/stream_transform/test/async_where_test.dart
index 4cf8af6..9c66506 100644
--- a/pkgs/stream_transform/test/async_where_test.dart
+++ b/pkgs/stream_transform/test/async_where_test.dart
@@ -66,4 +66,20 @@
expect(firstDone, true);
expect(secondDone, true);
});
+
+ test('forwards errors emitted by the test callback', () async {
+ var errors = [];
+ var emitted = [];
+ var values = new Stream.fromIterable([1, 2, 3, 4]);
+ var filtered = values.transform(asyncWhere((e) async {
+ await new Future(() {});
+ if (e.isEven) throw new Exception('$e');
+ return true;
+ }));
+ var done = new Completer();
+ filtered.listen(emitted.add, onError: errors.add, onDone: done.complete);
+ await done.future;
+ expect(emitted, [1, 3]);
+ expect(errors.map((e) => '$e'), ['Exception: 2', 'Exception: 4']);
+ });
}