Add concurrentAsyncExpand (dart-lang/stream_transform#90)

Closes dart-lang/stream_transform#77

Added with the `Merge` extension, since the behavior is similar to
merging streams that are created lazily.

Adds a private `StreamTransformer` to merge a `Stream<Stream<T>>`. This
could be exposed separately as an extension on that type later.

In order to allow cancelling and then re-listening on broadcast streams,
an sub stream which is single subscription must be converted to
broadcast. This can be risky since subscriptions can leak and cause the
inner streams to never be canceled.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 99d2786..686f919 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.1.0
+
+-   Add `concurrentAsyncExpand` to interleave events emitted by multiple sub
+    streams created by a callback.
+
 ## 1.0.0
 
 -   Remove the top level methods and retain the extensions only.
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index 785adf1..56d114a 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -41,7 +41,7 @@
 
 Appends the values of a stream after another stream finishes.
 
-## merge, mergeAll
+## merge, mergeAll, concurrentAsyncExpand
 
 Interleaves events from multiple streams into a single stream.
 
diff --git a/pkgs/stream_transform/lib/src/merge.dart b/pkgs/stream_transform/lib/src/merge.dart
index bbcf759..5d2c864 100644
--- a/pkgs/stream_transform/lib/src/merge.dart
+++ b/pkgs/stream_transform/lib/src/merge.dart
@@ -54,6 +54,30 @@
   /// events emitted by that stream before the result stream has a subscriber
   /// will be discarded.
   Stream<T> mergeAll(Iterable<Stream<T>> others) => transform(_Merge(others));
+
+  /// Like [asyncExpand] but the [convert] callback may be called for an element
+  /// before the Stream emitted by the previous element has closed.
+  ///
+  /// Events on the result stream will be emitted in the order they are emitted
+  /// by the sub streams, which may not match the order of the original stream.
+  ///
+  /// Errors from [convert], the source stream, or any of the sub streams are
+  /// forwarded to the result stream.
+  ///
+  /// The result stream will not close until the source stream closes and all
+  /// sub streams have closed.
+  ///
+  /// If the source stream is a broadcast stream the result will be as well,
+  /// regardless of the types of streams created by [convert]. In this case,
+  /// some care should be taken:
+  /// -  If [convert] returns a single subscription stream it may be listened to
+  /// and never canceled.
+  /// -  For any period of time where there are no listeners on the result
+  /// stream, any sub streams from previously emitted events will be ignored,
+  /// regardless of whether they emit further events after a listener is added
+  /// back.
+  Stream<S> concurrentAsyncExpand<S>(Stream<S> Function(T) convert) =>
+      map(convert).transform(_MergeExpanded());
 }
 
 class _Merge<T> extends StreamTransformerBase<T, T> {
@@ -109,3 +133,52 @@
     return controller.stream;
   }
 }
+
+class _MergeExpanded<T> extends StreamTransformerBase<Stream<T>, T> {
+  @override
+  Stream<T> bind(Stream<Stream<T>> streams) {
+    final controller = streams.isBroadcast
+        ? StreamController<T>.broadcast(sync: true)
+        : StreamController<T>(sync: true);
+
+    controller.onListen = () {
+      final subscriptions = <StreamSubscription<dynamic>>[];
+      final outerSubscription = streams.listen((inner) {
+        if (streams.isBroadcast && !inner.isBroadcast) {
+          inner = inner.asBroadcastStream();
+        }
+        final subscription =
+            inner.listen(controller.add, onError: controller.addError);
+        subscription.onDone(() {
+          assert(subscriptions.contains(subscription));
+          subscriptions.remove(subscription);
+          if (subscriptions.isEmpty) controller.close();
+        });
+        subscriptions.add(subscription);
+      }, onError: controller.addError);
+      outerSubscription.onDone(() {
+        assert(subscriptions.contains(outerSubscription));
+        subscriptions.remove(outerSubscription);
+        if (subscriptions.isEmpty) controller.close();
+      });
+      subscriptions.add(outerSubscription);
+      if (!streams.isBroadcast) {
+        controller
+          ..onPause = () {
+            for (final subscription in subscriptions) {
+              subscription.pause();
+            }
+          }
+          ..onResume = () {
+            for (final subscription in subscriptions) {
+              subscription.resume();
+            }
+          };
+      }
+      controller.onCancel = () {
+        return Future.wait(subscriptions.map((s) => s.cancel()));
+      };
+    };
+    return controller.stream;
+  }
+}
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 5442357..2d2f59a 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@
 description: A collection of utilities to transform and manipulate streams.
 author: Dart Team <misc@dartlang.org>
 homepage: https://www.github.com/dart-lang/stream_transform
-version: 1.0.0
+version: 1.1.0-dev
 
 environment:
   sdk: ">=2.6.0 <3.0.0"
diff --git a/pkgs/stream_transform/test/concurrent_async_expand_test.dart b/pkgs/stream_transform/test/concurrent_async_expand_test.dart
new file mode 100644
index 0000000..f843c6f
--- /dev/null
+++ b/pkgs/stream_transform/test/concurrent_async_expand_test.dart
@@ -0,0 +1,187 @@
+// Copyright (c) 2019, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+  test('forwards errors from the convert callback', () async {
+    var errors = <String>[];
+    var source = Stream.fromIterable([1, 2, 3]);
+    source.concurrentAsyncExpand((i) {
+      // ignore: only_throw_errors
+      throw 'Error: $i';
+    }).listen((_) {}, onError: errors.add);
+    await Future<void>(() {});
+    expect(errors, ['Error: 1', 'Error: 2', 'Error: 3']);
+  });
+
+  for (var outerType in streamTypes) {
+    for (var innerType in streamTypes) {
+      group('concurrentAsyncExpand $outerType to $innerType', () {
+        StreamController<int> outerController;
+        bool outerCanceled;
+        List<StreamController<String>> innerControllers;
+        List<bool> innerCanceled;
+        List<String> emittedValues;
+        bool isDone;
+        List<String> errors;
+        Stream<String> transformed;
+        StreamSubscription<String> subscription;
+
+        setUp(() {
+          outerController = createController(outerType)
+            ..onCancel = () {
+              outerCanceled = true;
+            };
+          outerCanceled = false;
+          innerControllers = [];
+          innerCanceled = [];
+          emittedValues = [];
+          errors = [];
+          isDone = false;
+          transformed = outerController.stream.concurrentAsyncExpand((i) {
+            var index = innerControllers.length;
+            innerCanceled.add(false);
+            innerControllers.add(createController<String>(innerType)
+              ..onCancel = () {
+                innerCanceled[index] = true;
+              });
+            return innerControllers.last.stream;
+          });
+          subscription = transformed
+              .listen(emittedValues.add, onError: errors.add, onDone: () {
+            isDone = true;
+          });
+        });
+
+        test('interleaves events from sub streams', () async {
+          outerController..add(1)..add(2);
+          await Future<void>(() {});
+          expect(emittedValues, isEmpty);
+          expect(innerControllers, hasLength(2));
+          innerControllers[0].add('First');
+          innerControllers[1].add('Second');
+          innerControllers[0].add('First again');
+          await Future<void>(() {});
+          expect(emittedValues, ['First', 'Second', 'First again']);
+        });
+
+        test('forwards errors from outer stream', () async {
+          outerController.addError('Error');
+          await Future<void>(() {});
+          expect(errors, ['Error']);
+        });
+
+        test('forwards errors from inner streams', () async {
+          outerController..add(1)..add(2);
+          await Future<void>(() {});
+          innerControllers[0].addError('Error 1');
+          innerControllers[1].addError('Error 2');
+          await Future<void>(() {});
+          expect(errors, ['Error 1', 'Error 2']);
+        });
+
+        test('can continue handling events after an error in outer stream',
+            () async {
+          outerController
+            ..addError('Error')
+            ..add(1);
+          await Future<void>(() {});
+          innerControllers[0].add('First');
+          await Future<void>(() {});
+          expect(emittedValues, ['First']);
+          expect(errors, ['Error']);
+        });
+
+        test('cancels outer subscription if output canceled', () async {
+          await subscription.cancel();
+          expect(outerCanceled, true);
+        });
+
+        if (outerType != 'broadcast' || innerType != 'single subscription') {
+          // A single subscription inner stream in a broadcast outer stream is
+          // not canceled.
+          test('cancels inner subscriptions if output canceled', () async {
+            outerController..add(1)..add(2);
+            await Future<void>(() {});
+            await subscription.cancel();
+            expect(innerCanceled, [true, true]);
+          });
+        }
+
+        test('stays open if any inner stream is still open', () async {
+          outerController.add(1);
+          await outerController.close();
+          await Future<void>(() {});
+          expect(isDone, false);
+        });
+
+        test('stays open if outer stream is still open', () async {
+          outerController.add(1);
+          await Future<void>(() {});
+          await innerControllers[0].close();
+          await Future<void>(() {});
+          expect(isDone, false);
+        });
+
+        test('closes after all inner streams and outer stream close', () async {
+          outerController.add(1);
+          await Future<void>(() {});
+          await innerControllers[0].close();
+          await outerController.close();
+          await Future<void>(() {});
+          expect(isDone, true);
+        });
+
+        if (outerType == 'broadcast') {
+          test('multiple listerns all get values', () async {
+            var otherValues = <String>[];
+            transformed.listen(otherValues.add);
+            outerController.add(1);
+            await Future<void>(() {});
+            innerControllers[0].add('First');
+            await Future<void>(() {});
+            expect(emittedValues, ['First']);
+            expect(otherValues, ['First']);
+          });
+
+          test('multiple listeners get closed', () async {
+            var otherDone = false;
+            transformed.listen(null, onDone: () => otherDone = true);
+            outerController.add(1);
+            await Future<void>(() {});
+            await innerControllers[0].close();
+            await outerController.close();
+            await Future<void>(() {});
+            expect(isDone, true);
+            expect(otherDone, true);
+          });
+
+          test('can cancel and relisten', () async {
+            outerController..add(1)..add(2);
+            await Future(() {});
+            innerControllers[0].add('First');
+            innerControllers[1].add('Second');
+            await Future(() {});
+            await subscription.cancel();
+            innerControllers[0].add('Ignored');
+            await Future(() {});
+            subscription = transformed.listen(emittedValues.add);
+            innerControllers[0].add('Also ignored');
+            outerController.add(3);
+            await Future(() {});
+            innerControllers[2].add('More');
+            await Future(() {});
+            expect(emittedValues, ['First', 'Second', 'More']);
+          });
+        }
+      });
+    }
+  }
+}