Add asyncMapBuffer (dart-lang/stream_transform#39)

In one use case for `buffer` the `trigger` depended on the output from
the resulting stream so the handling was awkward and used a manual
StreamController. The intention was to avoid re-entry into a block of
work, and asyncMapBuffer lets us express that intention directly.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 9c3e057..056eebd 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 0.0.9
+
+- Add `asyncMapBuffer`.
+
 ## 0.0.8
 
 - Add `takeUntil`.
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index 261972e..b504e6c 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -1,5 +1,10 @@
 Utility methods to create `StreamTransfomer` instances to manipulate Streams.
 
+# asyncMapBuffer
+
+Like `asyncMap` but events are buffered until previous events have been
+processed.
+
 # asyncWhere
 
 Like `where` but allows an asynchronous predicate.
diff --git a/pkgs/stream_transform/lib/src/async_map_buffer.dart b/pkgs/stream_transform/lib/src/async_map_buffer.dart
new file mode 100644
index 0000000..315e77a
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/async_map_buffer.dart
@@ -0,0 +1,55 @@
+// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'bind.dart';
+import 'buffer.dart';
+import 'from_handlers.dart';
+
+/// Like [Stream.asyncMap] but events are buffered until previous events have
+/// been processed by [convert].
+///
+/// If the source stream is a broadcast stream the result will be as well. When
+/// used with a broadcast stream behavior also differs from [Stream.asyncMap] in
+/// that the [convert] function is only called once per event, rather than once
+/// per listener per event.
+///
+/// The first event from the source stream is always passed to [convert] as a
+/// List with a single element. After that events are buffered until the
+/// previous Future returned from [convert] has fired.
+///
+/// Errors from the source stream are forwarded directly to the result stream.
+/// Errors during the conversion are also forwarded to the result stream and are
+/// considered completing work so the next values are let through.
+///
+/// The result stream will not close until the source stream closes and all
+/// pending conversions have finished.
+StreamTransformer<S, T> asyncMapBuffer<S, T>(
+    Future<T> convert(List<S> collected)) {
+  var workFinished = new StreamController<Null>();
+  // Let the first event through.
+  workFinished.add(null);
+  return fromBind((values) => values
+      .transform(buffer(workFinished.stream))
+      .transform(_asyncMapThen(convert, workFinished.add)));
+}
+
+/// Like [Stream.asyncMap] but the [convert] is only called once per event,
+/// rather than once per listener, and [then] is called after completing the
+/// work.
+StreamTransformer<S, T> _asyncMapThen<S, T>(
+    Future<T> convert(S event), void then(_)) {
+  Future pendingEvent;
+  return fromHandlers(handleData: (event, sink) {
+    pendingEvent =
+        convert(event).then(sink.add).catchError(sink.addError).then(then);
+  }, handleDone: (sink) {
+    if (pendingEvent != null) {
+      pendingEvent.then((_) => sink.close());
+    } else {
+      sink.close();
+    }
+  });
+}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart
index 55cefa3..dced908 100644
--- a/pkgs/stream_transform/lib/stream_transform.dart
+++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -2,6 +2,7 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
+export 'src/async_map_buffer.dart';
 export 'src/async_where.dart';
 export 'src/audit.dart';
 export 'src/buffer.dart';
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 1dd2f27..c9bdfac 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@
 description: A collection of utilities to transform and manipulate streams.
 author: Dart Team <misc@dartlang.org>
 homepage: https://www.github.com/dart-lang/stream_transform
-version: 0.0.8
+version: 0.0.9
 
 environment:
   sdk: ">=1.22.0 <2.0.0"
diff --git a/pkgs/stream_transform/test/async_map_buffer_test.dart b/pkgs/stream_transform/test/async_map_buffer_test.dart
new file mode 100644
index 0000000..f8b73a7
--- /dev/null
+++ b/pkgs/stream_transform/test/async_map_buffer_test.dart
@@ -0,0 +1,207 @@
+// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+  var streamTypes = {
+    'single subscription': () => new StreamController(),
+    'broadcast': () => new StreamController.broadcast()
+  };
+  StreamController values;
+  List emittedValues;
+  bool valuesCanceled;
+  bool isDone;
+  List errors;
+  Stream transformed;
+  StreamSubscription subscription;
+
+  Completer finishWork;
+  List workArgument;
+
+  /// Represents the async `convert` function and asserts that is is only called
+  /// after the previous iteration has completed.
+  Future work(List values) {
+    expect(finishWork, isNull,
+        reason: 'See $values befor previous work is complete');
+    workArgument = values;
+    finishWork = new Completer();
+    finishWork.future.then((_) {
+      workArgument = null;
+      finishWork = null;
+    }).catchError((_) {
+      workArgument = null;
+      finishWork = null;
+    });
+    return finishWork.future;
+  }
+
+  for (var streamType in streamTypes.keys) {
+    group('asyncMapBuffer for stream type: [$streamType]', () {
+      setUp(() {
+        valuesCanceled = false;
+        values = streamTypes[streamType]()
+          ..onCancel = () {
+            valuesCanceled = true;
+          };
+        emittedValues = [];
+        errors = [];
+        isDone = false;
+        finishWork = null;
+        workArgument = null;
+        transformed = values.stream.transform(asyncMapBuffer(work));
+        subscription = transformed
+            .listen(emittedValues.add, onError: errors.add, onDone: () {
+          isDone = true;
+        });
+      });
+
+      test('does not emit before work finishes', () async {
+        values.add(1);
+        await new Future(() {});
+        expect(emittedValues, isEmpty);
+        expect(workArgument, [1]);
+        finishWork.complete(workArgument);
+        await new Future(() {});
+        expect(emittedValues, [
+          [1]
+        ]);
+      });
+
+      test('buffers values while work is ongoing', () async {
+        values.add(1);
+        await new Future(() {});
+        values.add(2);
+        values.add(3);
+        await new Future(() {});
+        finishWork.complete();
+        await new Future(() {});
+        expect(workArgument, [2, 3]);
+      });
+
+      test('forwards errors without waiting for work', () async {
+        values.add(1);
+        await new Future(() {});
+        values.addError('error');
+        await new Future(() {});
+        expect(errors, ['error']);
+      });
+
+      test('forwards errors which occur during the work', () async {
+        values.add(1);
+        await new Future(() {});
+        finishWork.completeError('error');
+        await new Future(() {});
+        expect(errors, ['error']);
+      });
+
+      test('can continue handling events after an error', () async {
+        values.add(1);
+        await new Future(() {});
+        finishWork.completeError('error');
+        values.add(2);
+        await new Future(() {});
+        expect(workArgument, [2]);
+        finishWork.completeError('another');
+        await new Future(() {});
+        expect(errors, ['error', 'another']);
+      });
+
+      test('does not start next work early due to an error in values',
+          () async {
+        values.add(1);
+        await new Future(() {});
+        values.addError('error');
+        values.add(2);
+        await new Future(() {});
+        expect(errors, ['error']);
+        // [work] will assert that the second iteration is not called because
+        // the first has not completed.
+      });
+
+      test('cancels value subscription when output canceled', () async {
+        expect(valuesCanceled, false);
+        await subscription.cancel();
+        expect(valuesCanceled, true);
+      });
+
+      test('closes when values end if no work is pending', () async {
+        expect(isDone, false);
+        await values.close();
+        await new Future(() {});
+        expect(isDone, true);
+      });
+
+      test('waits for pending work when values close', () async {
+        values.add(1);
+        await new Future(() {});
+        expect(isDone, false);
+        values.add(2);
+        await values.close();
+        expect(isDone, false);
+        finishWork.complete(null);
+        await new Future(() {});
+        // Still a pending value
+        expect(isDone, false);
+        finishWork.complete(null);
+        await new Future(() {});
+        expect(isDone, true);
+      });
+
+      test('forwards errors from values', () async {
+        values.addError('error');
+        await new Future(() {});
+        expect(errors, ['error']);
+      });
+
+      if (streamType == 'broadcast') {
+        test('multiple listeners all get values', () async {
+          var otherValues = [];
+          transformed.listen(otherValues.add);
+          values.add(1);
+          await new Future(() {});
+          finishWork.complete('result');
+          await new Future(() {});
+          expect(emittedValues, ['result']);
+          expect(otherValues, ['result']);
+        });
+
+        test('multiple listeners get done when values end', () async {
+          var otherDone = false;
+          transformed.listen(null, onDone: () => otherDone = true);
+          values.add(1);
+          await new Future(() {});
+          await values.close();
+          expect(isDone, false);
+          expect(otherDone, false);
+          finishWork.complete();
+          await new Future(() {});
+          expect(isDone, true);
+          expect(otherDone, true);
+        });
+
+        test('can cancel and relisten', () async {
+          values.add(1);
+          await new Future(() {});
+          finishWork.complete('first');
+          await new Future(() {});
+          await subscription.cancel();
+          values.add(2);
+          await new Future(() {});
+          subscription = transformed.listen(emittedValues.add);
+          values.add(3);
+          await new Future(() {});
+          expect(workArgument, [3]);
+          finishWork.complete('second');
+          await new Future(() {});
+          expect(emittedValues, ['first', 'second']);
+        });
+      }
+    });
+  }
+}