Add asyncMapBuffer (dart-lang/stream_transform#39)
In one use case for `buffer` the `trigger` depended on the output from
the resulting stream so the handling was awkward and used a manual
StreamController. The intention was to avoid re-entry into a block of
work, and asyncMapBuffer lets us express that intention directly.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 9c3e057..056eebd 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 0.0.9
+
+- Add `asyncMapBuffer`.
+
## 0.0.8
- Add `takeUntil`.
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index 261972e..b504e6c 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -1,5 +1,10 @@
Utility methods to create `StreamTransfomer` instances to manipulate Streams.
+# asyncMapBuffer
+
+Like `asyncMap` but events are buffered until previous events have been
+processed.
+
# asyncWhere
Like `where` but allows an asynchronous predicate.
diff --git a/pkgs/stream_transform/lib/src/async_map_buffer.dart b/pkgs/stream_transform/lib/src/async_map_buffer.dart
new file mode 100644
index 0000000..315e77a
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/async_map_buffer.dart
@@ -0,0 +1,55 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'bind.dart';
+import 'buffer.dart';
+import 'from_handlers.dart';
+
+/// Like [Stream.asyncMap] but events are buffered until previous events have
+/// been processed by [convert].
+///
+/// If the source stream is a broadcast stream the result will be as well. When
+/// used with a broadcast stream behavior also differs from [Stream.asyncMap] in
+/// that the [convert] function is only called once per event, rather than once
+/// per listener per event.
+///
+/// The first event from the source stream is always passed to [convert] as a
+/// List with a single element. After that events are buffered until the
+/// previous Future returned from [convert] has fired.
+///
+/// Errors from the source stream are forwarded directly to the result stream.
+/// Errors during the conversion are also forwarded to the result stream and are
+/// considered completing work so the next values are let through.
+///
+/// The result stream will not close until the source stream closes and all
+/// pending conversions have finished.
+StreamTransformer<S, T> asyncMapBuffer<S, T>(
+ Future<T> convert(List<S> collected)) {
+ var workFinished = new StreamController<Null>();
+ // Let the first event through.
+ workFinished.add(null);
+ return fromBind((values) => values
+ .transform(buffer(workFinished.stream))
+ .transform(_asyncMapThen(convert, workFinished.add)));
+}
+
+/// Like [Stream.asyncMap] but the [convert] is only called once per event,
+/// rather than once per listener, and [then] is called after completing the
+/// work.
+StreamTransformer<S, T> _asyncMapThen<S, T>(
+ Future<T> convert(S event), void then(_)) {
+ Future pendingEvent;
+ return fromHandlers(handleData: (event, sink) {
+ pendingEvent =
+ convert(event).then(sink.add).catchError(sink.addError).then(then);
+ }, handleDone: (sink) {
+ if (pendingEvent != null) {
+ pendingEvent.then((_) => sink.close());
+ } else {
+ sink.close();
+ }
+ });
+}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart
index 55cefa3..dced908 100644
--- a/pkgs/stream_transform/lib/stream_transform.dart
+++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -2,6 +2,7 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
+export 'src/async_map_buffer.dart';
export 'src/async_where.dart';
export 'src/audit.dart';
export 'src/buffer.dart';
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 1dd2f27..c9bdfac 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@
description: A collection of utilities to transform and manipulate streams.
author: Dart Team <misc@dartlang.org>
homepage: https://www.github.com/dart-lang/stream_transform
-version: 0.0.8
+version: 0.0.9
environment:
sdk: ">=1.22.0 <2.0.0"
diff --git a/pkgs/stream_transform/test/async_map_buffer_test.dart b/pkgs/stream_transform/test/async_map_buffer_test.dart
new file mode 100644
index 0000000..f8b73a7
--- /dev/null
+++ b/pkgs/stream_transform/test/async_map_buffer_test.dart
@@ -0,0 +1,207 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+ var streamTypes = {
+ 'single subscription': () => new StreamController(),
+ 'broadcast': () => new StreamController.broadcast()
+ };
+ StreamController values;
+ List emittedValues;
+ bool valuesCanceled;
+ bool isDone;
+ List errors;
+ Stream transformed;
+ StreamSubscription subscription;
+
+ Completer finishWork;
+ List workArgument;
+
+ /// Represents the async `convert` function and asserts that is is only called
+ /// after the previous iteration has completed.
+ Future work(List values) {
+ expect(finishWork, isNull,
+ reason: 'See $values befor previous work is complete');
+ workArgument = values;
+ finishWork = new Completer();
+ finishWork.future.then((_) {
+ workArgument = null;
+ finishWork = null;
+ }).catchError((_) {
+ workArgument = null;
+ finishWork = null;
+ });
+ return finishWork.future;
+ }
+
+ for (var streamType in streamTypes.keys) {
+ group('asyncMapBuffer for stream type: [$streamType]', () {
+ setUp(() {
+ valuesCanceled = false;
+ values = streamTypes[streamType]()
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ finishWork = null;
+ workArgument = null;
+ transformed = values.stream.transform(asyncMapBuffer(work));
+ subscription = transformed
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ });
+
+ test('does not emit before work finishes', () async {
+ values.add(1);
+ await new Future(() {});
+ expect(emittedValues, isEmpty);
+ expect(workArgument, [1]);
+ finishWork.complete(workArgument);
+ await new Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ });
+
+ test('buffers values while work is ongoing', () async {
+ values.add(1);
+ await new Future(() {});
+ values.add(2);
+ values.add(3);
+ await new Future(() {});
+ finishWork.complete();
+ await new Future(() {});
+ expect(workArgument, [2, 3]);
+ });
+
+ test('forwards errors without waiting for work', () async {
+ values.add(1);
+ await new Future(() {});
+ values.addError('error');
+ await new Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('forwards errors which occur during the work', () async {
+ values.add(1);
+ await new Future(() {});
+ finishWork.completeError('error');
+ await new Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('can continue handling events after an error', () async {
+ values.add(1);
+ await new Future(() {});
+ finishWork.completeError('error');
+ values.add(2);
+ await new Future(() {});
+ expect(workArgument, [2]);
+ finishWork.completeError('another');
+ await new Future(() {});
+ expect(errors, ['error', 'another']);
+ });
+
+ test('does not start next work early due to an error in values',
+ () async {
+ values.add(1);
+ await new Future(() {});
+ values.addError('error');
+ values.add(2);
+ await new Future(() {});
+ expect(errors, ['error']);
+ // [work] will assert that the second iteration is not called because
+ // the first has not completed.
+ });
+
+ test('cancels value subscription when output canceled', () async {
+ expect(valuesCanceled, false);
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('closes when values end if no work is pending', () async {
+ expect(isDone, false);
+ await values.close();
+ await new Future(() {});
+ expect(isDone, true);
+ });
+
+ test('waits for pending work when values close', () async {
+ values.add(1);
+ await new Future(() {});
+ expect(isDone, false);
+ values.add(2);
+ await values.close();
+ expect(isDone, false);
+ finishWork.complete(null);
+ await new Future(() {});
+ // Still a pending value
+ expect(isDone, false);
+ finishWork.complete(null);
+ await new Future(() {});
+ expect(isDone, true);
+ });
+
+ test('forwards errors from values', () async {
+ values.addError('error');
+ await new Future(() {});
+ expect(errors, ['error']);
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () async {
+ var otherValues = [];
+ transformed.listen(otherValues.add);
+ values.add(1);
+ await new Future(() {});
+ finishWork.complete('result');
+ await new Future(() {});
+ expect(emittedValues, ['result']);
+ expect(otherValues, ['result']);
+ });
+
+ test('multiple listeners get done when values end', () async {
+ var otherDone = false;
+ transformed.listen(null, onDone: () => otherDone = true);
+ values.add(1);
+ await new Future(() {});
+ await values.close();
+ expect(isDone, false);
+ expect(otherDone, false);
+ finishWork.complete();
+ await new Future(() {});
+ expect(isDone, true);
+ expect(otherDone, true);
+ });
+
+ test('can cancel and relisten', () async {
+ values.add(1);
+ await new Future(() {});
+ finishWork.complete('first');
+ await new Future(() {});
+ await subscription.cancel();
+ values.add(2);
+ await new Future(() {});
+ subscription = transformed.listen(emittedValues.add);
+ values.add(3);
+ await new Future(() {});
+ expect(workArgument, [3]);
+ finishWork.complete('second');
+ await new Future(() {});
+ expect(emittedValues, ['first', 'second']);
+ });
+ }
+ });
+ }
+}