Allow a `FutureOr` return in `scan` (dart-lang/stream_transform#64)

Closes dart-lang/stream_transform#62

Adds async support in `scan` instead of a new `asyncScan` since the
latter would be able to serve both use cases so it would not be clear
why both exist.

Technically this is breaking but it's such an extreme edge case that it
is very unlikely to come up in practice - and the breakage is detected
statically. Due to the way `FuturOr<Future>` interacts with inference
there are some cases where the generic types may need to be explicitly
written. This is an unlikely scenario where the transformer emitted a
stream of futures.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index e26b1f1..371889a 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 0.0.16
+
+- Allow a `combine` callback to return a `FutureOr<T>` in `scan`. There are no
+  behavior changes for synchronous callbacks. **Potential breaking change** In
+  the unlikely situation where `scan` was used to produce a `Stream<Future>`
+  inference may now fail and require explicit generic type arguments.
+
 ## 0.0.15
 
 - Add `whereType`.
diff --git a/pkgs/stream_transform/lib/src/scan.dart b/pkgs/stream_transform/lib/src/scan.dart
index 0936e13..b2f76ef 100644
--- a/pkgs/stream_transform/lib/src/scan.dart
+++ b/pkgs/stream_transform/lib/src/scan.dart
@@ -6,9 +6,21 @@
 
 /// Scan is like fold, but instead of producing a single value it yields
 /// each intermediate accumulation.
+///
+/// If [combine] returns a Future it will not be called again for subsequent
+/// events from the source until it completes, therefor the combine callback is
+/// always called for elements in order, and the result stream always maintains
+/// the same order as the original.
 StreamTransformer<S, T> scan<S, T>(
-        T initialValue, T combine(T previousValue, S element)) =>
-    StreamTransformer.fromBind((stream) {
+        T initialValue, FutureOr<T> combine(T previousValue, S element)) =>
+    StreamTransformer.fromBind((source) {
       var accumulated = initialValue;
-      return stream.map((value) => accumulated = combine(accumulated, value));
+      return source.asyncMap((value) {
+        var result = combine(accumulated, value);
+        if (result is Future<T>) {
+          return result.then((r) => accumulated = r);
+        } else {
+          return accumulated = result as T;
+        }
+      });
     });
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 722cecd..c0768e9 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@
 description: A collection of utilities to transform and manipulate streams.
 author: Dart Team <misc@dartlang.org>
 homepage: https://www.github.com/dart-lang/stream_transform
-version: 0.0.15
+version: 0.0.16
 
 environment:
   sdk: ">=2.1.0 <3.0.0"
diff --git a/pkgs/stream_transform/test/scan_test.dart b/pkgs/stream_transform/test/scan_test.dart
index db95885..71472a6 100644
--- a/pkgs/stream_transform/test/scan_test.dart
+++ b/pkgs/stream_transform/test/scan_test.dart
@@ -12,18 +12,97 @@
   group('Scan', () {
     test('produces intermediate values', () async {
       var source = Stream.fromIterable([1, 2, 3, 4]);
-      var sum = (int x, int y) => x + y;
+      int sum(int x, int y) => x + y;
       var result = await source.transform(scan(0, sum)).toList();
 
       expect(result, [1, 3, 6, 10]);
     });
 
-    test('can create a broadcast stream', () async {
+    test('can create a broadcast stream', () {
       var source = StreamController.broadcast();
 
       var transformed = source.stream.transform(scan(null, null));
 
       expect(transformed.isBroadcast, true);
     });
+
+    test('forwards errors from source', () async {
+      var source = StreamController<int>();
+
+      int sum(int x, int y) => x + y;
+
+      var errors = [];
+
+      source.stream.transform((scan(0, sum))).listen(null, onError: errors.add);
+
+      source.addError(StateError('fail'));
+      await Future(() {});
+
+      expect(errors, [isStateError]);
+    });
+
+    group('with async combine', () {
+      test('returns a Stream of non-futures', () async {
+        var source = Stream.fromIterable([1, 2, 3, 4]);
+        Future<int> sum(int x, int y) async => x + y;
+        var result = await source.transform(scan(0, sum)).toList();
+
+        expect(result, [1, 3, 6, 10]);
+      });
+
+      test('can return a Stream of futures when specified', () async {
+        var source = Stream.fromIterable([1, 2]);
+        Future<int> sum(Future<int> x, int y) async => (await x) + y;
+        var result = await source
+            .transform(scan<int, Future<int>>(Future.value(0), sum))
+            .toList();
+
+        expect(result, [TypeMatcher<Future>(), TypeMatcher<Future>()]);
+        expect(await Future.wait(result), [1, 3]);
+      });
+
+      test('does not call for subsequent values while waiting', () async {
+        var source = StreamController<int>();
+
+        var calledWith = <int>[];
+        var block = Completer<void>();
+        Future<int> combine(int x, int y) async {
+          calledWith.add(y);
+          await block.future;
+          return x + y;
+        }
+
+        var results = <int>[];
+
+        source.stream.transform(scan(0, combine)).forEach(results.add);
+
+        source..add(1)..add(2);
+        await Future(() {});
+        expect(calledWith, [1]);
+        expect(results, isEmpty);
+
+        block.complete();
+        await Future(() {});
+        expect(calledWith, [1, 2]);
+        expect(results, [1, 3]);
+      });
+
+      test('forwards async errors', () async {
+        var source = StreamController<int>();
+
+        Future<int> combine(int x, int y) async => throw StateError('fail');
+
+        var errors = [];
+
+        source.stream
+            .transform((scan(0, combine)))
+            .listen(null, onError: errors.add);
+
+        source.add(1);
+        await Future(() {});
+
+        expect(errors, [isStateError]);
+      });
+    });
   });
 }