Add asyncMapBuffer (dart-lang/stream_transform#39) In one use case for `buffer` the `trigger` depended on the output from the resulting stream so the handling was awkward and used a manual StreamController. The intention was to avoid re-entry into a block of work, and asyncMapBuffer lets us express that intention directly.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md index 9c3e057..056eebd 100644 --- a/pkgs/stream_transform/CHANGELOG.md +++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,7 @@ +## 0.0.9 + +- Add `asyncMapBuffer`. + ## 0.0.8 - Add `takeUntil`.
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md index 261972e..b504e6c 100644 --- a/pkgs/stream_transform/README.md +++ b/pkgs/stream_transform/README.md
@@ -1,5 +1,10 @@ Utility methods to create `StreamTransfomer` instances to manipulate Streams. +# asyncMapBuffer + +Like `asyncMap` but events are buffered until previous events have been +processed. + # asyncWhere Like `where` but allows an asynchronous predicate.
diff --git a/pkgs/stream_transform/lib/src/async_map_buffer.dart b/pkgs/stream_transform/lib/src/async_map_buffer.dart new file mode 100644 index 0000000..315e77a --- /dev/null +++ b/pkgs/stream_transform/lib/src/async_map_buffer.dart
@@ -0,0 +1,55 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'bind.dart'; +import 'buffer.dart'; +import 'from_handlers.dart'; + +/// Like [Stream.asyncMap] but events are buffered until previous events have +/// been processed by [convert]. +/// +/// If the source stream is a broadcast stream the result will be as well. When +/// used with a broadcast stream behavior also differs from [Stream.asyncMap] in +/// that the [convert] function is only called once per event, rather than once +/// per listener per event. +/// +/// The first event from the source stream is always passed to [convert] as a +/// List with a single element. After that events are buffered until the +/// previous Future returned from [convert] has fired. +/// +/// Errors from the source stream are forwarded directly to the result stream. +/// Errors during the conversion are also forwarded to the result stream and are +/// considered completing work so the next values are let through. +/// +/// The result stream will not close until the source stream closes and all +/// pending conversions have finished. +StreamTransformer<S, T> asyncMapBuffer<S, T>( + Future<T> convert(List<S> collected)) { + var workFinished = new StreamController<Null>(); + // Let the first event through. + workFinished.add(null); + return fromBind((values) => values + .transform(buffer(workFinished.stream)) + .transform(_asyncMapThen(convert, workFinished.add))); +} + +/// Like [Stream.asyncMap] but the [convert] is only called once per event, +/// rather than once per listener, and [then] is called after completing the +/// work. +StreamTransformer<S, T> _asyncMapThen<S, T>( + Future<T> convert(S event), void then(_)) { + Future pendingEvent; + return fromHandlers(handleData: (event, sink) { + pendingEvent = + convert(event).then(sink.add).catchError(sink.addError).then(then); + }, handleDone: (sink) { + if (pendingEvent != null) { + pendingEvent.then((_) => sink.close()); + } else { + sink.close(); + } + }); +}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart index 55cefa3..dced908 100644 --- a/pkgs/stream_transform/lib/stream_transform.dart +++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -2,6 +2,7 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +export 'src/async_map_buffer.dart'; export 'src/async_where.dart'; export 'src/audit.dart'; export 'src/buffer.dart';
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml index 1dd2f27..c9bdfac 100644 --- a/pkgs/stream_transform/pubspec.yaml +++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@ description: A collection of utilities to transform and manipulate streams. author: Dart Team <misc@dartlang.org> homepage: https://www.github.com/dart-lang/stream_transform -version: 0.0.8 +version: 0.0.9 environment: sdk: ">=1.22.0 <2.0.0"
diff --git a/pkgs/stream_transform/test/async_map_buffer_test.dart b/pkgs/stream_transform/test/async_map_buffer_test.dart new file mode 100644 index 0000000..f8b73a7 --- /dev/null +++ b/pkgs/stream_transform/test/async_map_buffer_test.dart
@@ -0,0 +1,207 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:test/test.dart'; + +import 'package:stream_transform/stream_transform.dart'; + +void main() { + var streamTypes = { + 'single subscription': () => new StreamController(), + 'broadcast': () => new StreamController.broadcast() + }; + StreamController values; + List emittedValues; + bool valuesCanceled; + bool isDone; + List errors; + Stream transformed; + StreamSubscription subscription; + + Completer finishWork; + List workArgument; + + /// Represents the async `convert` function and asserts that is is only called + /// after the previous iteration has completed. + Future work(List values) { + expect(finishWork, isNull, + reason: 'See $values befor previous work is complete'); + workArgument = values; + finishWork = new Completer(); + finishWork.future.then((_) { + workArgument = null; + finishWork = null; + }).catchError((_) { + workArgument = null; + finishWork = null; + }); + return finishWork.future; + } + + for (var streamType in streamTypes.keys) { + group('asyncMapBuffer for stream type: [$streamType]', () { + setUp(() { + valuesCanceled = false; + values = streamTypes[streamType]() + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + errors = []; + isDone = false; + finishWork = null; + workArgument = null; + transformed = values.stream.transform(asyncMapBuffer(work)); + subscription = transformed + .listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + }); + + test('does not emit before work finishes', () async { + values.add(1); + await new Future(() {}); + expect(emittedValues, isEmpty); + expect(workArgument, [1]); + finishWork.complete(workArgument); + await new Future(() {}); + expect(emittedValues, [ + [1] + ]); + }); + + test('buffers values while work is ongoing', () async { + values.add(1); + await new Future(() {}); + values.add(2); + values.add(3); + await new Future(() {}); + finishWork.complete(); + await new Future(() {}); + expect(workArgument, [2, 3]); + }); + + test('forwards errors without waiting for work', () async { + values.add(1); + await new Future(() {}); + values.addError('error'); + await new Future(() {}); + expect(errors, ['error']); + }); + + test('forwards errors which occur during the work', () async { + values.add(1); + await new Future(() {}); + finishWork.completeError('error'); + await new Future(() {}); + expect(errors, ['error']); + }); + + test('can continue handling events after an error', () async { + values.add(1); + await new Future(() {}); + finishWork.completeError('error'); + values.add(2); + await new Future(() {}); + expect(workArgument, [2]); + finishWork.completeError('another'); + await new Future(() {}); + expect(errors, ['error', 'another']); + }); + + test('does not start next work early due to an error in values', + () async { + values.add(1); + await new Future(() {}); + values.addError('error'); + values.add(2); + await new Future(() {}); + expect(errors, ['error']); + // [work] will assert that the second iteration is not called because + // the first has not completed. + }); + + test('cancels value subscription when output canceled', () async { + expect(valuesCanceled, false); + await subscription.cancel(); + expect(valuesCanceled, true); + }); + + test('closes when values end if no work is pending', () async { + expect(isDone, false); + await values.close(); + await new Future(() {}); + expect(isDone, true); + }); + + test('waits for pending work when values close', () async { + values.add(1); + await new Future(() {}); + expect(isDone, false); + values.add(2); + await values.close(); + expect(isDone, false); + finishWork.complete(null); + await new Future(() {}); + // Still a pending value + expect(isDone, false); + finishWork.complete(null); + await new Future(() {}); + expect(isDone, true); + }); + + test('forwards errors from values', () async { + values.addError('error'); + await new Future(() {}); + expect(errors, ['error']); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get values', () async { + var otherValues = []; + transformed.listen(otherValues.add); + values.add(1); + await new Future(() {}); + finishWork.complete('result'); + await new Future(() {}); + expect(emittedValues, ['result']); + expect(otherValues, ['result']); + }); + + test('multiple listeners get done when values end', () async { + var otherDone = false; + transformed.listen(null, onDone: () => otherDone = true); + values.add(1); + await new Future(() {}); + await values.close(); + expect(isDone, false); + expect(otherDone, false); + finishWork.complete(); + await new Future(() {}); + expect(isDone, true); + expect(otherDone, true); + }); + + test('can cancel and relisten', () async { + values.add(1); + await new Future(() {}); + finishWork.complete('first'); + await new Future(() {}); + await subscription.cancel(); + values.add(2); + await new Future(() {}); + subscription = transformed.listen(emittedValues.add); + values.add(3); + await new Future(() {}); + expect(workArgument, [3]); + finishWork.complete('second'); + await new Future(() {}); + expect(emittedValues, ['first', 'second']); + }); + } + }); + } +}