Add `combineLatestAll` (dart-lang/stream_transform#67)
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md index 1a2ce66..d50c119 100644 --- a/pkgs/stream_transform/CHANGELOG.md +++ b/pkgs/stream_transform/CHANGELOG.md
@@ -5,6 +5,7 @@ the unlikely situation where `scan` was used to produce a `Stream<Future>` inference may now fail and require explicit generic type arguments. - Add `combineLatest`. +- Add `combineLatestAll`. ## 0.0.15
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md index 7e556cb..5d24caf 100644 --- a/pkgs/stream_transform/README.md +++ b/pkgs/stream_transform/README.md
@@ -24,6 +24,11 @@ Combine the most recent event from two streams through a callback and emit the result. +# combineLatestAll + +Combines the latest events emitted from multiple source streams and yields a +list of the values. + # debounce, debounceBuffer Prevents a source stream from emitting too frequently by dropping or collecting
diff --git a/pkgs/stream_transform/lib/src/combine_latest_all.dart b/pkgs/stream_transform/lib/src/combine_latest_all.dart new file mode 100644 index 0000000..8141a3f --- /dev/null +++ b/pkgs/stream_transform/lib/src/combine_latest_all.dart
@@ -0,0 +1,110 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +/// Combine the latest value emitted from the source stream with the latest +/// values emitted from [others]. +/// +/// [combineLatestAll] subscribes to the source stream and [others] and when +/// any one of the streams emits, the result stream will emit a [List<T>] of +/// the latest values emitted from all streams. +/// +/// The result stream will not emit until all source streams emit at least +/// once. If a source stream emits multiple values before another starts +/// emitting, all but the last value will be lost. +/// +/// The result stream will not close until all source streams have closed. When +/// a source stream closes, the result stream will continue to emit the last +/// value from the closed stream when the other source streams emit until the +/// result stream has closed. If a source stream closes without emitting any +/// value, the result stream will close as well. +/// +/// Errors thrown by any source stream will be forwarded to the result stream. +/// +/// If the source stream is a broadcast stream, the result stream will be as +/// well, regardless of the types of [others]. If a single subscription stream +/// is combined with a broadcast source stream, it may never be canceled. +/// +/// ## Example +/// +/// (Suppose first, second, and third are Stream<String>) +/// final combined = first +/// .transform(combineLatestAll([second, third])) +/// .map((data) => data.join()); +/// +/// first: a----b------------------c--------d---| +/// second: --1---------2-----------------| +/// third: -------&----------%---| +/// combined: -------b1&--b2&---b2%---c2%------d2%-| +/// +StreamTransformer<T, List<T>> combineLatestAll<T>(Iterable<Stream<T>> others) => + _CombineLatestAll<T>(others); + +class _CombineLatestAll<T> extends StreamTransformerBase<T, List<T>> { + final Iterable<Stream<T>> _others; + + _CombineLatestAll(this._others); + + @override + Stream<List<T>> bind(Stream<T> source) { + final controller = source.isBroadcast + ? StreamController<List<T>>.broadcast(sync: true) + : StreamController<List<T>>(sync: true); + + var allStreams = [source]..addAll(_others); + if (source.isBroadcast) { + allStreams = allStreams + .map((s) => s.isBroadcast ? s : s.asBroadcastStream()) + .toList(); + } + + List<StreamSubscription> subscriptions; + + controller.onListen = () { + if (subscriptions != null) return; + + final latestData = List<T>(allStreams.length); + final hasEmitted = <int>{}; + void handleData(int index, T data) { + latestData[index] = data; + hasEmitted.add(index); + if (hasEmitted.length == allStreams.length) { + controller.add(List.from(latestData)); + } + } + + var activeStreamCount = 0; + subscriptions = allStreams.map((stream) { + final index = activeStreamCount; + activeStreamCount++; + return stream.listen((data) => handleData(index, data), + onError: controller.addError, onDone: () { + if (--activeStreamCount <= 0 || !hasEmitted.contains(index)) { + controller.close(); + } + }); + }).toList(); + if (!source.isBroadcast) { + controller.onPause = () { + for (var subscription in subscriptions) { + subscription.pause(); + } + }; + controller.onResume = () { + for (var subscription in subscriptions) { + subscription.resume(); + } + }; + } + controller.onCancel = () { + final toCancel = subscriptions; + subscriptions = null; + if (activeStreamCount <= 0) return null; + return Future.wait(toCancel.map((s) => s.cancel())); + }; + }; + return controller.stream; + } +}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart index 557fc82..b8b15f4 100644 --- a/pkgs/stream_transform/lib/stream_transform.dart +++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -8,6 +8,7 @@ export 'src/buffer.dart'; export 'src/chain_transformers.dart'; export 'src/combine_latest.dart'; +export 'src/combine_latest_all.dart'; export 'src/concat.dart'; export 'src/concurrent_async_map.dart'; export 'src/debounce.dart';
diff --git a/pkgs/stream_transform/test/combine_latest_all_test.dart b/pkgs/stream_transform/test/combine_latest_all_test.dart new file mode 100644 index 0000000..9cf3a91 --- /dev/null +++ b/pkgs/stream_transform/test/combine_latest_all_test.dart
@@ -0,0 +1,166 @@ +import 'dart:async'; +import 'package:test/test.dart'; +import 'package:stream_transform/stream_transform.dart'; + +Future<void> tick() => Future(() {}); + +void main() { + group('combineLatestAll', () { + test('emits latest values', () async { + final first = StreamController<String>(); + final second = StreamController<String>(); + final third = StreamController<String>(); + final combined = first.stream + .transform(combineLatestAll([second.stream, third.stream])) + .map((data) => data.join()); + + // first: a----b------------------c--------d---| + // second: --1---------2-----------------| + // third: -------&----------%---| + // combined: -------b1&--b2&---b2%---c2%------d2%-| + + expect(combined, + emitsInOrder(['b1&', 'b2&', 'b2%', 'c2%', 'd2%', emitsDone])); + + first.add('a'); + await tick(); + second.add('1'); + await tick(); + first.add('b'); + await tick(); + third.add('&'); + await tick(); + second.add('2'); + await tick(); + third.add('%'); + await tick(); + await third.close(); + await tick(); + first.add('c'); + await tick(); + await second.close(); + await tick(); + first.add('d'); + await tick(); + await first.close(); + }); + + test('ends if a Stream closes without ever emitting a value', () async { + final first = StreamController<String>(); + final second = StreamController<String>(); + final combined = + first.stream.transform(combineLatestAll([second.stream])); + + // first: -a------b-------| + // second: -----| + // combined: -----| + + expect(combined, emits(emitsDone)); + + first.add('a'); + await tick(); + await second.close(); + await tick(); + first.add('b'); + }); + + test('forwards errors', () async { + final first = StreamController<String>(); + final second = StreamController<String>(); + final combined = first.stream + .transform(combineLatestAll([second.stream])) + .map((data) => data.join()); + + // first: -a---------| + // second: ----1---# + // combined: ----a1--# + + expect(combined, emitsThrough(emitsError('doh'))); + + first.add('a'); + await tick(); + second.add('1'); + await tick(); + second.addError('doh'); + }); + + test('ends after both streams have ended', () async { + final first = StreamController<String>(); + final second = StreamController<String>(); + + var done = false; + first.stream + .transform(combineLatestAll([second.stream])) + .listen(null, onDone: () => done = true); + + // first: -a---| + // second: --------1--| + // combined: --------a1-| + + first.add('a'); + await tick(); + await first.close(); + await tick(); + + expect(done, isFalse); + + second.add('1'); + await tick(); + await second.close(); + await tick(); + + expect(done, isTrue); + }); + + group('broadcast source', () { + test('can cancel and relisten to broadcast stream', () async { + final first = StreamController<String>.broadcast(); + final second = StreamController<String>.broadcast(); + final combined = first.stream + .transform(combineLatestAll([second.stream])) + .map((data) => data.join()); + + // first: a------b----------------c------d----e---| + // second: --1---------2---3---4------5-| + // combined: --a1---b1---b2--b3--b4-----c5--d5---e5--| + // sub1: ^-----------------! + // sub2: ----------------------^-----------------| + + expect(combined.take(4), emitsInOrder(['a1', 'b1', 'b2', 'b3'])); + + first.add('a'); + await tick(); + second.add('1'); + await tick(); + first.add('b'); + await tick(); + second.add('2'); + await tick(); + second.add('3'); + await tick(); + + // First subscription is canceled here by .take(4) + expect(first.hasListener, isFalse); + expect(second.hasListener, isFalse); + + // This emit is thrown away because there are no subscribers + second.add('4'); + await tick(); + + expect(combined, emitsInOrder(['c5', 'd5', 'e5', emitsDone])); + + first.add('c'); + await tick(); + second.add('5'); + await tick(); + await second.close(); + await tick(); + first.add('d'); + await tick(); + first.add('e'); + await tick(); + await first.close(); + }); + }); + }); +}