Forward errors thrown during `asyncWhere` (dart-lang/stream_transform#49)

- Add a try/catch and a test for the new behavior.
- Update the Doc comment with more details on the behavior.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index ada1e4b..2ad693c 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 0.0.14
+
+- `asyncWhere` will now forward exceptions thrown by the callback through the
+  result Stream.
+
 ## 0.0.13
 
 - `mergeAll` now accepts an `Iterable<Stream>` instead of only `List<Stream>`.
diff --git a/pkgs/stream_transform/lib/src/async_where.dart b/pkgs/stream_transform/lib/src/async_where.dart
index 6a4fc32..734f65f 100644
--- a/pkgs/stream_transform/lib/src/async_where.dart
+++ b/pkgs/stream_transform/lib/src/async_where.dart
@@ -6,13 +6,31 @@
 import 'from_handlers.dart';
 
 /// Like [Stream.where] but allows the [test] to return a [Future].
+///
+/// Events on the result stream will be emitted in the order that [test]
+/// completes which may not match the order of the original stream.
+///
+/// If the source stream is a broadcast stream the result will be as well. When
+/// used with a broadcast stream behavior also differs from [Stream.where] in
+/// that the [test] function is only called once per event, rather than once
+/// per listener per event.
+///
+/// Errors from the source stream are forwarded directly to the result stream.
+/// Errors during the conversion are also forwarded to the result stream.
+///
+/// The result stream will not close until the source stream closes and all
+/// pending [test] calls have finished.
 StreamTransformer<T, T> asyncWhere<T>(FutureOr<bool> test(T element)) {
   var valuesWaiting = 0;
   var sourceDone = false;
   return fromHandlers(handleData: (element, sink) {
     valuesWaiting++;
     () async {
-      if (await test(element)) sink.add(element);
+      try {
+        if (await test(element)) sink.add(element);
+      } catch (e, st) {
+        sink.addError(e, st);
+      }
       valuesWaiting--;
       if (valuesWaiting <= 0 && sourceDone) sink.close();
     }();
diff --git a/pkgs/stream_transform/test/async_where_test.dart b/pkgs/stream_transform/test/async_where_test.dart
index 4cf8af6..9c66506 100644
--- a/pkgs/stream_transform/test/async_where_test.dart
+++ b/pkgs/stream_transform/test/async_where_test.dart
@@ -66,4 +66,20 @@
     expect(firstDone, true);
     expect(secondDone, true);
   });
+
+  test('forwards errors emitted by the test callback', () async {
+    var errors = [];
+    var emitted = [];
+    var values = new Stream.fromIterable([1, 2, 3, 4]);
+    var filtered = values.transform(asyncWhere((e) async {
+      await new Future(() {});
+      if (e.isEven) throw new Exception('$e');
+      return true;
+    }));
+    var done = new Completer();
+    filtered.listen(emitted.add, onError: errors.add, onDone: done.complete);
+    await done.future;
+    expect(emitted, [1, 3]);
+    expect(errors.map((e) => '$e'), ['Exception: 2', 'Exception: 4']);
+  });
 }