Add `combineLatestAll` (dart-lang/stream_transform#67)
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 1a2ce66..d50c119 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -5,6 +5,7 @@
the unlikely situation where `scan` was used to produce a `Stream<Future>`
inference may now fail and require explicit generic type arguments.
- Add `combineLatest`.
+- Add `combineLatestAll`.
## 0.0.15
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index 7e556cb..5d24caf 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -24,6 +24,11 @@
Combine the most recent event from two streams through a callback and emit the
result.
+# combineLatestAll
+
+Combines the latest events emitted from multiple source streams and yields a
+list of the values.
+
# debounce, debounceBuffer
Prevents a source stream from emitting too frequently by dropping or collecting
diff --git a/pkgs/stream_transform/lib/src/combine_latest_all.dart b/pkgs/stream_transform/lib/src/combine_latest_all.dart
new file mode 100644
index 0000000..8141a3f
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/combine_latest_all.dart
@@ -0,0 +1,110 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// Combine the latest value emitted from the source stream with the latest
+/// values emitted from [others].
+///
+/// [combineLatestAll] subscribes to the source stream and [others] and when
+/// any one of the streams emits, the result stream will emit a [List<T>] of
+/// the latest values emitted from all streams.
+///
+/// The result stream will not emit until all source streams emit at least
+/// once. If a source stream emits multiple values before another starts
+/// emitting, all but the last value will be lost.
+///
+/// The result stream will not close until all source streams have closed. When
+/// a source stream closes, the result stream will continue to emit the last
+/// value from the closed stream when the other source streams emit until the
+/// result stream has closed. If a source stream closes without emitting any
+/// value, the result stream will close as well.
+///
+/// Errors thrown by any source stream will be forwarded to the result stream.
+///
+/// If the source stream is a broadcast stream, the result stream will be as
+/// well, regardless of the types of [others]. If a single subscription stream
+/// is combined with a broadcast source stream, it may never be canceled.
+///
+/// ## Example
+///
+/// (Suppose first, second, and third are Stream<String>)
+/// final combined = first
+/// .transform(combineLatestAll([second, third]))
+/// .map((data) => data.join());
+///
+/// first: a----b------------------c--------d---|
+/// second: --1---------2-----------------|
+/// third: -------&----------%---|
+/// combined: -------b1&--b2&---b2%---c2%------d2%-|
+///
+StreamTransformer<T, List<T>> combineLatestAll<T>(Iterable<Stream<T>> others) =>
+ _CombineLatestAll<T>(others);
+
+class _CombineLatestAll<T> extends StreamTransformerBase<T, List<T>> {
+ final Iterable<Stream<T>> _others;
+
+ _CombineLatestAll(this._others);
+
+ @override
+ Stream<List<T>> bind(Stream<T> source) {
+ final controller = source.isBroadcast
+ ? StreamController<List<T>>.broadcast(sync: true)
+ : StreamController<List<T>>(sync: true);
+
+ var allStreams = [source]..addAll(_others);
+ if (source.isBroadcast) {
+ allStreams = allStreams
+ .map((s) => s.isBroadcast ? s : s.asBroadcastStream())
+ .toList();
+ }
+
+ List<StreamSubscription> subscriptions;
+
+ controller.onListen = () {
+ if (subscriptions != null) return;
+
+ final latestData = List<T>(allStreams.length);
+ final hasEmitted = <int>{};
+ void handleData(int index, T data) {
+ latestData[index] = data;
+ hasEmitted.add(index);
+ if (hasEmitted.length == allStreams.length) {
+ controller.add(List.from(latestData));
+ }
+ }
+
+ var activeStreamCount = 0;
+ subscriptions = allStreams.map((stream) {
+ final index = activeStreamCount;
+ activeStreamCount++;
+ return stream.listen((data) => handleData(index, data),
+ onError: controller.addError, onDone: () {
+ if (--activeStreamCount <= 0 || !hasEmitted.contains(index)) {
+ controller.close();
+ }
+ });
+ }).toList();
+ if (!source.isBroadcast) {
+ controller.onPause = () {
+ for (var subscription in subscriptions) {
+ subscription.pause();
+ }
+ };
+ controller.onResume = () {
+ for (var subscription in subscriptions) {
+ subscription.resume();
+ }
+ };
+ }
+ controller.onCancel = () {
+ final toCancel = subscriptions;
+ subscriptions = null;
+ if (activeStreamCount <= 0) return null;
+ return Future.wait(toCancel.map((s) => s.cancel()));
+ };
+ };
+ return controller.stream;
+ }
+}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart
index 557fc82..b8b15f4 100644
--- a/pkgs/stream_transform/lib/stream_transform.dart
+++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -8,6 +8,7 @@
export 'src/buffer.dart';
export 'src/chain_transformers.dart';
export 'src/combine_latest.dart';
+export 'src/combine_latest_all.dart';
export 'src/concat.dart';
export 'src/concurrent_async_map.dart';
export 'src/debounce.dart';
diff --git a/pkgs/stream_transform/test/combine_latest_all_test.dart b/pkgs/stream_transform/test/combine_latest_all_test.dart
new file mode 100644
index 0000000..9cf3a91
--- /dev/null
+++ b/pkgs/stream_transform/test/combine_latest_all_test.dart
@@ -0,0 +1,166 @@
+import 'dart:async';
+import 'package:test/test.dart';
+import 'package:stream_transform/stream_transform.dart';
+
+Future<void> tick() => Future(() {});
+
+void main() {
+ group('combineLatestAll', () {
+ test('emits latest values', () async {
+ final first = StreamController<String>();
+ final second = StreamController<String>();
+ final third = StreamController<String>();
+ final combined = first.stream
+ .transform(combineLatestAll([second.stream, third.stream]))
+ .map((data) => data.join());
+
+ // first: a----b------------------c--------d---|
+ // second: --1---------2-----------------|
+ // third: -------&----------%---|
+ // combined: -------b1&--b2&---b2%---c2%------d2%-|
+
+ expect(combined,
+ emitsInOrder(['b1&', 'b2&', 'b2%', 'c2%', 'd2%', emitsDone]));
+
+ first.add('a');
+ await tick();
+ second.add('1');
+ await tick();
+ first.add('b');
+ await tick();
+ third.add('&');
+ await tick();
+ second.add('2');
+ await tick();
+ third.add('%');
+ await tick();
+ await third.close();
+ await tick();
+ first.add('c');
+ await tick();
+ await second.close();
+ await tick();
+ first.add('d');
+ await tick();
+ await first.close();
+ });
+
+ test('ends if a Stream closes without ever emitting a value', () async {
+ final first = StreamController<String>();
+ final second = StreamController<String>();
+ final combined =
+ first.stream.transform(combineLatestAll([second.stream]));
+
+ // first: -a------b-------|
+ // second: -----|
+ // combined: -----|
+
+ expect(combined, emits(emitsDone));
+
+ first.add('a');
+ await tick();
+ await second.close();
+ await tick();
+ first.add('b');
+ });
+
+ test('forwards errors', () async {
+ final first = StreamController<String>();
+ final second = StreamController<String>();
+ final combined = first.stream
+ .transform(combineLatestAll([second.stream]))
+ .map((data) => data.join());
+
+ // first: -a---------|
+ // second: ----1---#
+ // combined: ----a1--#
+
+ expect(combined, emitsThrough(emitsError('doh')));
+
+ first.add('a');
+ await tick();
+ second.add('1');
+ await tick();
+ second.addError('doh');
+ });
+
+ test('ends after both streams have ended', () async {
+ final first = StreamController<String>();
+ final second = StreamController<String>();
+
+ var done = false;
+ first.stream
+ .transform(combineLatestAll([second.stream]))
+ .listen(null, onDone: () => done = true);
+
+ // first: -a---|
+ // second: --------1--|
+ // combined: --------a1-|
+
+ first.add('a');
+ await tick();
+ await first.close();
+ await tick();
+
+ expect(done, isFalse);
+
+ second.add('1');
+ await tick();
+ await second.close();
+ await tick();
+
+ expect(done, isTrue);
+ });
+
+ group('broadcast source', () {
+ test('can cancel and relisten to broadcast stream', () async {
+ final first = StreamController<String>.broadcast();
+ final second = StreamController<String>.broadcast();
+ final combined = first.stream
+ .transform(combineLatestAll([second.stream]))
+ .map((data) => data.join());
+
+ // first: a------b----------------c------d----e---|
+ // second: --1---------2---3---4------5-|
+ // combined: --a1---b1---b2--b3--b4-----c5--d5---e5--|
+ // sub1: ^-----------------!
+ // sub2: ----------------------^-----------------|
+
+ expect(combined.take(4), emitsInOrder(['a1', 'b1', 'b2', 'b3']));
+
+ first.add('a');
+ await tick();
+ second.add('1');
+ await tick();
+ first.add('b');
+ await tick();
+ second.add('2');
+ await tick();
+ second.add('3');
+ await tick();
+
+ // First subscription is canceled here by .take(4)
+ expect(first.hasListener, isFalse);
+ expect(second.hasListener, isFalse);
+
+ // This emit is thrown away because there are no subscribers
+ second.add('4');
+ await tick();
+
+ expect(combined, emitsInOrder(['c5', 'd5', 'e5', emitsDone]));
+
+ first.add('c');
+ await tick();
+ second.add('5');
+ await tick();
+ await second.close();
+ await tick();
+ first.add('d');
+ await tick();
+ first.add('e');
+ await tick();
+ await first.close();
+ });
+ });
+ });
+}