Add `combineLatestAll` (dart-lang/stream_transform#67)

diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 1a2ce66..d50c119 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -5,6 +5,7 @@
   the unlikely situation where `scan` was used to produce a `Stream<Future>`
   inference may now fail and require explicit generic type arguments.
 - Add `combineLatest`.
+- Add `combineLatestAll`.
 
 ## 0.0.15
 
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index 7e556cb..5d24caf 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -24,6 +24,11 @@
 Combine the most recent event from two streams through a callback and emit the
 result.
 
+# combineLatestAll
+
+Combines the latest events emitted from multiple source streams and yields a
+list of the values.
+
 # debounce, debounceBuffer
 
 Prevents a source stream from emitting too frequently by dropping or collecting
diff --git a/pkgs/stream_transform/lib/src/combine_latest_all.dart b/pkgs/stream_transform/lib/src/combine_latest_all.dart
new file mode 100644
index 0000000..8141a3f
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/combine_latest_all.dart
@@ -0,0 +1,110 @@
+// Copyright (c) 2019, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// Combine the latest value emitted from the source stream with the latest
+/// values emitted from [others].
+///
+/// [combineLatestAll] subscribes to the source stream and [others] and when
+/// any one of the streams emits, the result stream will emit a [List<T>] of
+/// the latest values emitted from all streams.
+///
+/// The result stream will not emit until all source streams emit at least
+/// once. If a source stream emits multiple values before another starts
+/// emitting, all but the last value will be lost.
+///
+/// The result stream will not close until all source streams have closed. When
+/// a source stream closes, the result stream will continue to emit the last
+/// value from the closed stream when the other source streams emit until the
+/// result stream has closed. If a source stream closes without emitting any
+/// value, the result stream will close as well.
+///
+/// Errors thrown by any source stream will be forwarded to the result stream.
+///
+/// If the source stream is a broadcast stream, the result stream will be as
+/// well, regardless of the types of [others]. If a single subscription stream
+/// is combined with a broadcast source stream, it may never be canceled.
+///
+/// ## Example
+///
+/// (Suppose first, second, and third are Stream<String>)
+/// final combined = first
+///     .transform(combineLatestAll([second, third]))
+///     .map((data) => data.join());
+///
+/// first:    a----b------------------c--------d---|
+/// second:   --1---------2-----------------|
+/// third:    -------&----------%---|
+/// combined: -------b1&--b2&---b2%---c2%------d2%-|
+///
+StreamTransformer<T, List<T>> combineLatestAll<T>(Iterable<Stream<T>> others) =>
+    _CombineLatestAll<T>(others);
+
+class _CombineLatestAll<T> extends StreamTransformerBase<T, List<T>> {
+  final Iterable<Stream<T>> _others;
+
+  _CombineLatestAll(this._others);
+
+  @override
+  Stream<List<T>> bind(Stream<T> source) {
+    final controller = source.isBroadcast
+        ? StreamController<List<T>>.broadcast(sync: true)
+        : StreamController<List<T>>(sync: true);
+
+    var allStreams = [source]..addAll(_others);
+    if (source.isBroadcast) {
+      allStreams = allStreams
+          .map((s) => s.isBroadcast ? s : s.asBroadcastStream())
+          .toList();
+    }
+
+    List<StreamSubscription> subscriptions;
+
+    controller.onListen = () {
+      if (subscriptions != null) return;
+
+      final latestData = List<T>(allStreams.length);
+      final hasEmitted = <int>{};
+      void handleData(int index, T data) {
+        latestData[index] = data;
+        hasEmitted.add(index);
+        if (hasEmitted.length == allStreams.length) {
+          controller.add(List.from(latestData));
+        }
+      }
+
+      var activeStreamCount = 0;
+      subscriptions = allStreams.map((stream) {
+        final index = activeStreamCount;
+        activeStreamCount++;
+        return stream.listen((data) => handleData(index, data),
+            onError: controller.addError, onDone: () {
+          if (--activeStreamCount <= 0 || !hasEmitted.contains(index)) {
+            controller.close();
+          }
+        });
+      }).toList();
+      if (!source.isBroadcast) {
+        controller.onPause = () {
+          for (var subscription in subscriptions) {
+            subscription.pause();
+          }
+        };
+        controller.onResume = () {
+          for (var subscription in subscriptions) {
+            subscription.resume();
+          }
+        };
+      }
+      controller.onCancel = () {
+        final toCancel = subscriptions;
+        subscriptions = null;
+        if (activeStreamCount <= 0) return null;
+        return Future.wait(toCancel.map((s) => s.cancel()));
+      };
+    };
+    return controller.stream;
+  }
+}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart
index 557fc82..b8b15f4 100644
--- a/pkgs/stream_transform/lib/stream_transform.dart
+++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -8,6 +8,7 @@
 export 'src/buffer.dart';
 export 'src/chain_transformers.dart';
 export 'src/combine_latest.dart';
+export 'src/combine_latest_all.dart';
 export 'src/concat.dart';
 export 'src/concurrent_async_map.dart';
 export 'src/debounce.dart';
diff --git a/pkgs/stream_transform/test/combine_latest_all_test.dart b/pkgs/stream_transform/test/combine_latest_all_test.dart
new file mode 100644
index 0000000..9cf3a91
--- /dev/null
+++ b/pkgs/stream_transform/test/combine_latest_all_test.dart
@@ -0,0 +1,166 @@
+import 'dart:async';
+import 'package:test/test.dart';
+import 'package:stream_transform/stream_transform.dart';
+
+Future<void> tick() => Future(() {});
+
+void main() {
+  group('combineLatestAll', () {
+    test('emits latest values', () async {
+      final first = StreamController<String>();
+      final second = StreamController<String>();
+      final third = StreamController<String>();
+      final combined = first.stream
+          .transform(combineLatestAll([second.stream, third.stream]))
+          .map((data) => data.join());
+
+      // first:    a----b------------------c--------d---|
+      // second:   --1---------2-----------------|
+      // third:    -------&----------%---|
+      // combined: -------b1&--b2&---b2%---c2%------d2%-|
+
+      expect(combined,
+          emitsInOrder(['b1&', 'b2&', 'b2%', 'c2%', 'd2%', emitsDone]));
+
+      first.add('a');
+      await tick();
+      second.add('1');
+      await tick();
+      first.add('b');
+      await tick();
+      third.add('&');
+      await tick();
+      second.add('2');
+      await tick();
+      third.add('%');
+      await tick();
+      await third.close();
+      await tick();
+      first.add('c');
+      await tick();
+      await second.close();
+      await tick();
+      first.add('d');
+      await tick();
+      await first.close();
+    });
+
+    test('ends if a Stream closes without ever emitting a value', () async {
+      final first = StreamController<String>();
+      final second = StreamController<String>();
+      final combined =
+          first.stream.transform(combineLatestAll([second.stream]));
+
+      // first:    -a------b-------|
+      // second:   -----|
+      // combined: -----|
+
+      expect(combined, emits(emitsDone));
+
+      first.add('a');
+      await tick();
+      await second.close();
+      await tick();
+      first.add('b');
+    });
+
+    test('forwards errors', () async {
+      final first = StreamController<String>();
+      final second = StreamController<String>();
+      final combined = first.stream
+          .transform(combineLatestAll([second.stream]))
+          .map((data) => data.join());
+
+      // first:    -a---------|
+      // second:   ----1---#
+      // combined: ----a1--#
+
+      expect(combined, emitsThrough(emitsError('doh')));
+
+      first.add('a');
+      await tick();
+      second.add('1');
+      await tick();
+      second.addError('doh');
+    });
+
+    test('ends after both streams have ended', () async {
+      final first = StreamController<String>();
+      final second = StreamController<String>();
+
+      var done = false;
+      first.stream
+          .transform(combineLatestAll([second.stream]))
+          .listen(null, onDone: () => done = true);
+
+      // first:    -a---|
+      // second:   --------1--|
+      // combined: --------a1-|
+
+      first.add('a');
+      await tick();
+      await first.close();
+      await tick();
+
+      expect(done, isFalse);
+
+      second.add('1');
+      await tick();
+      await second.close();
+      await tick();
+
+      expect(done, isTrue);
+    });
+
+    group('broadcast source', () {
+      test('can cancel and relisten to broadcast stream', () async {
+        final first = StreamController<String>.broadcast();
+        final second = StreamController<String>.broadcast();
+        final combined = first.stream
+            .transform(combineLatestAll([second.stream]))
+            .map((data) => data.join());
+
+        // first:    a------b----------------c------d----e---|
+        // second:   --1---------2---3---4------5-|
+        // combined: --a1---b1---b2--b3--b4-----c5--d5---e5--|
+        // sub1:     ^-----------------!
+        // sub2:     ----------------------^-----------------|
+
+        expect(combined.take(4), emitsInOrder(['a1', 'b1', 'b2', 'b3']));
+
+        first.add('a');
+        await tick();
+        second.add('1');
+        await tick();
+        first.add('b');
+        await tick();
+        second.add('2');
+        await tick();
+        second.add('3');
+        await tick();
+
+        // First subscription is canceled here by .take(4)
+        expect(first.hasListener, isFalse);
+        expect(second.hasListener, isFalse);
+
+        // This emit is thrown away because there are no subscribers
+        second.add('4');
+        await tick();
+
+        expect(combined, emitsInOrder(['c5', 'd5', 'e5', emitsDone]));
+
+        first.add('c');
+        await tick();
+        second.add('5');
+        await tick();
+        await second.close();
+        await tick();
+        first.add('d');
+        await tick();
+        first.add('e');
+        await tick();
+        await first.close();
+      });
+    });
+  });
+}