Add a StreamGroup class for merging streams.

R=lrn@google.com

Review URL: https://codereview.chromium.org//1178793006.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 125519b..1333fe3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,9 @@
 - Added a `FutureGroup` class for waiting for a group of futures, potentially of
   unknown size, to complete.
 
+- Added a `StreamGroup` class for merging the events of a group of streams,
+  potentially of unknown size.
+
 ## 1.1.1
 
 - Updated SDK version constraint to at least 1.9.0.
diff --git a/lib/async.dart b/lib/async.dart
index 7dbda70..345b8dc 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -5,5 +5,6 @@
 library dart.pkg.async;
 
 export "src/future_group.dart";
+export "src/stream_group.dart";
 export "stream_zip.dart";
 export "result.dart";
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
new file mode 100644
index 0000000..0147787
--- /dev/null
+++ b/lib/src/stream_group.dart
@@ -0,0 +1,259 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_group;
+
+import 'dart:async';
+
+/// A collection of streams whose events are unified and sent through a central
+/// stream.
+///
+/// Both errors and data events are forwarded through [stream]. The streams in
+/// the group won't be listened to until [stream] has a listener. **Note that
+/// this means that events emitted by broadcast streams will be dropped until
+/// [stream] has a listener.**
+///
+/// If the `StreamGroup` is construced using [new StreamGroup], [stream] will be
+/// single-subscription. In this case, if [stream] is paused or canceled, all
+/// streams in the group will likewise be paused or canceled, respectively.
+///
+/// If the `StreamGroup` is construced using [new StreamGroup.broadcast],
+/// [stream] will be a broadcast stream. In this case, the streams in the group
+/// will never be paused and single-subscription streams in the group will never
+/// be canceled. **Note that single-subscription streams in a broadcast group
+/// may drop events if a listener is added and later removed.** Broadcast
+/// streams in the group will be canceled once [stream] has no listeners, and
+/// will be listened to again once [stream] has listeners.
+///
+/// [stream] won't close until [close] is called on the group *and* every stream
+/// in the group closes.
+class StreamGroup<T> implements Sink<Stream<T>> {
+  /// The stream through which all events from streams in the group are emitted.
+  Stream<T> get stream => _controller.stream;
+  StreamController<T> _controller;
+
+  /// Whether the group is closed, meaning that no more streams may be added.
+  var _closed = false;
+
+  /// The current state of the group.
+  ///
+  /// See [_StreamGroupState] for detailed descriptions of each state.
+  var _state = _StreamGroupState.dormant;
+
+  /// Streams that have been added to the group, and their subscriptions if they
+  /// have been subscribed to.
+  ///
+  /// The subscriptions will be null until the group has a listener registered.
+  /// If it's a broadcast group and it goes dormant again, broadcast stream
+  /// subscriptions will be canceled and set to null again. Single-subscriber
+  /// stream subscriptions will be left intact, since they can't be
+  /// re-subscribed.
+  final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>();
+
+  /// Merges the events from [streams] into a single (single-subscriber) stream.
+  ///
+  /// This is equivalent to adding [streams] to a group, closing that group, and
+  /// returning its stream.
+  static Stream merge(Iterable<Stream> streams) {
+    var group = new StreamGroup();
+    streams.forEach(group.add);
+    group.close();
+    return group.stream;
+  }
+
+  /// Creates a new stream group where [stream] is single-subscriber.
+  StreamGroup() {
+    _controller = new StreamController<T>(
+        onListen: _onListen,
+        onPause: _onPause,
+        onResume: _onResume,
+        onCancel: _onCancel,
+        sync: true);
+  }
+
+  /// Creates a new stream group where [stream] is a broadcast stream.
+  StreamGroup.broadcast() {
+    _controller = new StreamController<T>.broadcast(
+        onListen: _onListen,
+        onCancel: _onCancelBroadcast,
+        sync: true);
+  }
+
+  /// Adds [stream] as a member of this group.
+  ///
+  /// Any events from [stream] will be emitted through [this.stream]. If this
+  /// group has a listener, [stream] will be listened to immediately; otherwise
+  /// it will only be listened to once this group gets a listener.
+  ///
+  /// If this is a single-subscription group and its subscription has been
+  /// canceled, [stream] will be canceled as soon as its added. If this returns
+  /// a [Future], it will be returned from [add]. Otherwise, [add] returns
+  /// `null`.
+  ///
+  /// Throws a [StateError] if this group is closed.
+  Future add(Stream<T> stream) {
+    if (_closed) {
+      throw new StateError("Can't add a Stream to a closed StreamGroup.");
+    }
+
+    if (_state == _StreamGroupState.dormant) {
+      _subscriptions.putIfAbsent(stream, () => null);
+    } else if (_state == _StreamGroupState.canceled) {
+      // Listen to the stream and cancel it immediately so that no one else can
+      // listen, for consistency. If the stream has an onCancel listener this
+      // will also fire that, which may help it clean up resources.
+      return stream.listen(null).cancel();
+    } else {
+      _subscriptions.putIfAbsent(stream, () => _listenToStream(stream));
+    }
+
+    return null;
+  }
+
+  /// Removes [stream] as a member of this group.
+  ///
+  /// No further events from [stream] will be emitted through this group. If
+  /// [stream] has been listened to, its subscription will be canceled.
+  ///
+  /// If [stream] has been listened to, this *synchronously* cancels its
+  /// subscription. This means that any events from [stream] that haven't yet
+  /// been emitted through this group will not be.
+  ///
+  /// If [stream]'s subscription is canceled, this returns
+  /// [StreamSubscription.cancel]'s return value. Otherwise, it returns `null`.
+  Future remove(Stream<T> stream) {
+    var subscription = _subscriptions.remove(stream);
+    var future = subscription == null ? null : subscription.cancel();
+    if (_closed && _subscriptions.isEmpty) _controller.close();
+    return future;
+  }
+
+  /// A callback called when [stream] is listened to.
+  ///
+  /// This is called for both single-subscription and broadcast groups.
+  void _onListen() {
+    _state = _StreamGroupState.listening;
+    _subscriptions.forEach((stream, subscription) {
+      // If this is a broadcast group and this isn't the first time it's been
+      // listened to, there may still be some subscriptions to
+      // single-subscription streams.
+      if (subscription != null) return;
+      _subscriptions[stream] = _listenToStream(stream);
+    });
+  }
+
+  /// A callback called when [stream] is paused.
+  void _onPause() {
+    _state = _StreamGroupState.paused;
+    for (var subscription in _subscriptions.values) {
+      subscription.pause();
+    }
+  }
+
+  /// A callback called when [stream] is resumed.
+  void _onResume() {
+    _state = _StreamGroupState.listening;
+    for (var subscription in _subscriptions.values) {
+      subscription.resume();
+    }
+  }
+
+  /// A callback called when [stream] is canceled.
+  ///
+  /// This is only called for single-subscription groups.
+  Future _onCancel() {
+    _state = _StreamGroupState.canceled;
+
+    var futures = _subscriptions.values
+        .map((subscription) => subscription.cancel())
+        .where((future) => future != null)
+        .toList();
+
+    _subscriptions.clear();
+    return futures.isEmpty ? null : Future.wait(futures);
+  }
+
+  /// A callback called when [stream]'s last listener is canceled.
+  ///
+  /// This is only called for broadcast groups.
+  void _onCancelBroadcast() {
+    _state = _StreamGroupState.dormant;
+
+    _subscriptions.forEach((stream, subscription) {
+      // Cancel the broadcast streams, since we can re-listen to those later,
+      // but allow the single-subscription streams to keep firing. Their events
+      // will still be added to [_controller], but then they'll be dropped since
+      // it has no listeners.
+      if (!stream.isBroadcast) return;
+      subscription.cancel();
+      _subscriptions[stream] = null;
+    });
+  }
+
+  /// Starts actively forwarding events from [stream] to [_controller].
+  ///
+  /// This will pause the resulting subscription if [this] is paused.
+  StreamSubscription _listenToStream(Stream stream) {
+    var subscription = stream.listen(
+        _controller.add,
+        onError: _controller.addError,
+        onDone: () => remove(stream));
+    if (_state == _StreamGroupState.paused) subscription.pause();
+    return subscription;
+  }
+
+  /// Closes the group, indicating that no more streams will be added.
+  ///
+  /// If there are no streams in the group, [stream] is closed immediately.
+  /// Otherwise, [stream] will close once all streams in the group close.
+  ///
+  /// Returns a [Future] that completes once [stream] has actually been closed.
+  Future close() {
+    if (_closed) return _controller.done;
+
+    _closed = true;
+    if (_subscriptions.isEmpty) _controller.close();
+
+    return _controller.done;
+  }
+}
+
+/// An enum of possible states of a [StreamGroup].
+class _StreamGroupState {
+  /// The group has no listeners.
+  ///
+  /// New streams added to the group will be listened once the group has a
+  /// listener.
+  static const dormant = const _StreamGroupState("dormant");
+
+  /// The group has one or more listeners and is actively firing events.
+  ///
+  /// New streams added to the group will be immediately listeners.
+  static const listening = const _StreamGroupState("listening");
+
+  /// The group is paused and no more events will be fired until it resumes.
+  ///
+  /// New streams added to the group will be listened to, but then paused. They
+  /// will be resumed once the group itself is resumed.
+  ///
+  /// This state is only used by single-subscriber groups.
+  static const paused = const _StreamGroupState("paused");
+
+  /// The group is canceled and no more events will be fired ever.
+  ///
+  /// New streams added to the group will be listened to, canceled, and
+  /// discarded.
+  ///
+  /// This state is only used by single-subscriber groups.
+  static const canceled = const _StreamGroupState("canceled");
+
+  /// The name of the state.
+  ///
+  /// Used for debugging.
+  final String name;
+
+  const _StreamGroupState(this.name);
+
+  String toString() => name;
+}
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
new file mode 100644
index 0000000..727077c
--- /dev/null
+++ b/test/stream_group_test.dart
@@ -0,0 +1,724 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.test.stream_group_test;
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+main() {
+  group("single-subscription", () {
+    var streamGroup;
+    setUp(() {
+      streamGroup = new StreamGroup<String>();
+    });
+
+    test("buffers events from multiple sources", () async {
+      var controller1 = new StreamController<String>();
+      streamGroup.add(controller1.stream);
+      controller1.add("first");
+      controller1.close();
+
+      var controller2 = new StreamController<String>();
+      streamGroup.add(controller2.stream);
+      controller2.add("second");
+      controller2.close();
+
+      await flushMicrotasks();
+
+      expect(streamGroup.close(), completes);
+
+      expect(streamGroup.stream.toList(),
+          completion(unorderedEquals(["first", "second"])));
+    });
+
+    test("buffers errors from multiple sources", () async {
+      var controller1 = new StreamController<String>();
+      streamGroup.add(controller1.stream);
+      controller1.addError("first");
+      controller1.close();
+
+      var controller2 = new StreamController<String>();
+      streamGroup.add(controller2.stream);
+      controller2.addError("second");
+      controller2.close();
+
+      await flushMicrotasks();
+
+      expect(streamGroup.close(), completes);
+
+      var transformed = streamGroup.stream.transform(
+          new StreamTransformer.fromHandlers(
+              handleError: (error, _, sink) => sink.add("error: $error")));
+      expect(transformed.toList(),
+          completion(equals(["error: first", "error: second"])));
+    });
+
+    test("buffers events and errors together", () async {
+      var controller = new StreamController<String>();
+      streamGroup.add(controller.stream);
+
+      controller.add("first");
+      controller.addError("second");
+      controller.add("third");
+      controller.addError("fourth");
+      controller.addError("fifth");
+      controller.add("sixth");
+      controller.close();
+
+      await flushMicrotasks();
+
+      expect(streamGroup.close(), completes);
+
+      var transformed = streamGroup.stream.transform(
+          new StreamTransformer.fromHandlers(
+              handleData: (data, sink) => sink.add("data: $data"),
+              handleError: (error, _, sink) => sink.add("error: $error")));
+      expect(transformed.toList(), completion(equals([
+        "data: first",
+        "error: second",
+        "data: third",
+        "error: fourth",
+        "error: fifth",
+        "data: sixth"
+      ])));
+    });
+
+    test("emits events once there's a listener", () {
+      var controller = new StreamController<String>();
+      streamGroup.add(controller.stream);
+
+      expect(streamGroup.stream.toList(),
+          completion(equals(["first", "second"])));
+
+      controller.add("first");
+      controller.add("second");
+      controller.close();
+
+      expect(streamGroup.close(), completes);
+    });
+
+    test("doesn't buffer events from a broadcast stream", () async {
+      var controller = new StreamController<String>.broadcast();
+      streamGroup.add(controller.stream);
+
+      controller.add("first");
+      controller.add("second");
+      controller.close();
+
+      await flushMicrotasks();
+
+      expect(streamGroup.close(), completes);
+      expect(streamGroup.stream.toList(), completion(isEmpty));
+    });
+
+    test("when paused, buffers events from a broadcast stream", () async {
+      var controller = new StreamController<String>.broadcast();
+      streamGroup.add(controller.stream);
+
+      var events = [];
+      var subscription = streamGroup.stream.listen(events.add);
+      subscription.pause();
+
+      controller.add("first");
+      controller.add("second");
+      controller.close();
+      await flushMicrotasks();
+
+      subscription.resume();
+      expect(streamGroup.close(), completes);
+      await flushMicrotasks();
+
+      expect(events, equals(["first", "second"]));
+    });
+
+    test("emits events from a broadcast stream once there's a listener", () {
+      var controller = new StreamController<String>.broadcast();
+      streamGroup.add(controller.stream);
+
+      expect(streamGroup.stream.toList(),
+          completion(equals(["first", "second"])));
+
+      controller.add("first");
+      controller.add("second");
+      controller.close();
+
+      expect(streamGroup.close(), completes);
+    });
+
+    test("forwards cancel errors", () async {
+      var subscription = streamGroup.stream.listen(null);
+
+      var controller = new StreamController<String>(
+          onCancel: () => throw "error");
+      streamGroup.add(controller.stream);
+      await flushMicrotasks();
+
+      expect(subscription.cancel(), throwsA("error"));
+    });
+
+    test("forwards a cancel future", () async {
+      var subscription = streamGroup.stream.listen(null);
+
+      var completer = new Completer();
+      var controller = new StreamController<String>(
+          onCancel: () => completer.future);
+      streamGroup.add(controller.stream);
+      await flushMicrotasks();
+
+      var fired = false;
+      subscription.cancel().then((_) => fired = true);
+
+      await flushMicrotasks();
+      expect(fired, isFalse);
+
+      completer.complete();
+      await flushMicrotasks();
+      expect(fired, isTrue);
+    });
+
+    test("add() while active pauses the stream if the group is paused, then "
+        "resumes once the group resumes", () async {
+      var subscription = streamGroup.stream.listen(null);
+      await flushMicrotasks();
+
+      var paused = false;
+      var controller = new StreamController<String>(
+          onPause: () => paused = true,
+          onResume: () => paused = false);
+
+      subscription.pause();
+      await flushMicrotasks();
+
+      streamGroup.add(controller.stream);
+      await flushMicrotasks();
+      expect(paused, isTrue);
+
+      subscription.resume();
+      await flushMicrotasks();
+      expect(paused, isFalse);
+    });
+
+    group("add() while canceled", () {
+      setUp(() async {
+        streamGroup.stream.listen(null).cancel();
+        await flushMicrotasks();
+      });
+
+      test("immediately listens to and cancels the stream", () async {
+        var listened = false;
+        var canceled = false;
+        var controller = new StreamController<String>(onListen: () {
+          listened = true;
+        }, onCancel: expectAsync(() {
+          expect(listened, isTrue);
+          canceled = true;
+        }));
+
+        streamGroup.add(controller.stream);
+        await flushMicrotasks();
+        expect(listened, isTrue);
+        expect(canceled, isTrue);
+      });
+
+      test("forwards cancel errors", () {
+        var controller = new StreamController<String>(
+            onCancel: () => throw "error");
+
+        expect(streamGroup.add(controller.stream), throwsA("error"));
+      });
+
+      test("forwards a cancel future", () async {
+        var completer = new Completer();
+        var controller = new StreamController<String>(
+            onCancel: () => completer.future);
+
+        var fired = false;
+        streamGroup.add(controller.stream).then((_) => fired = true);
+
+        await flushMicrotasks();
+        expect(fired, isFalse);
+
+        completer.complete();
+        await flushMicrotasks();
+        expect(fired, isTrue);
+      });
+    });
+  });
+
+  group("broadcast", () {
+    var streamGroup;
+    setUp(() {
+      streamGroup = new StreamGroup<String>.broadcast();
+    });
+
+    test("buffers events from multiple sources", () async {
+      var controller1 = new StreamController<String>();
+      streamGroup.add(controller1.stream);
+      controller1.add("first");
+      controller1.close();
+
+      var controller2 = new StreamController<String>();
+      streamGroup.add(controller2.stream);
+      controller2.add("second");
+      controller2.close();
+
+      await flushMicrotasks();
+
+      expect(streamGroup.close(), completes);
+
+      expect(streamGroup.stream.toList(),
+          completion(equals(["first", "second"])));
+    });
+
+    test("emits events from multiple sources once there's a listener", () {
+      var controller1 = new StreamController<String>();
+      streamGroup.add(controller1.stream);
+
+      var controller2 = new StreamController<String>();
+      streamGroup.add(controller2.stream);
+
+      expect(streamGroup.stream.toList(),
+          completion(equals(["first", "second"])));
+
+      controller1.add("first");
+      controller2.add("second");
+      controller1.close();
+      controller2.close();
+
+      expect(streamGroup.close(), completes);
+    });
+
+    test("doesn't buffer events once a listener has been added and removed",
+        () async {
+      var controller = new StreamController<String>();
+      streamGroup.add(controller.stream);
+
+      streamGroup.stream.listen(null).cancel();
+      await flushMicrotasks();
+
+      controller.add("first");
+      controller.addError("second");
+      controller.close();
+
+      await flushMicrotasks();
+
+      expect(streamGroup.close(), completes);
+      expect(streamGroup.stream.toList(), completion(isEmpty));
+    });
+
+    test("doesn't buffer events from a broadcast stream", () async {
+      var controller = new StreamController<String>.broadcast();
+      streamGroup.add(controller.stream);
+      controller.add("first");
+      controller.addError("second");
+      controller.close();
+
+      await flushMicrotasks();
+
+      expect(streamGroup.close(), completes);
+      expect(streamGroup.stream.toList(), completion(isEmpty));
+    });
+
+    test("emits events from a broadcast stream once there's a listener", () {
+      var controller = new StreamController<String>.broadcast();
+      streamGroup.add(controller.stream);
+
+      expect(streamGroup.stream.toList(),
+          completion(equals(["first", "second"])));
+
+      controller.add("first");
+      controller.add("second");
+      controller.close();
+
+      expect(streamGroup.close(), completes);
+    });
+
+    test("cancels and re-listens broadcast streams", () async {
+      var subscription = streamGroup.stream.listen(null);
+
+      var controller = new StreamController<String>.broadcast();
+
+      streamGroup.add(controller.stream);
+      await flushMicrotasks();
+      expect(controller.hasListener, isTrue);
+
+      subscription.cancel();
+      await flushMicrotasks();
+      expect(controller.hasListener, isFalse);
+
+      streamGroup.stream.listen(null);
+      await flushMicrotasks();
+      expect(controller.hasListener, isTrue);
+    });
+
+    test("never cancels single-subscription streams", () async {
+      var subscription = streamGroup.stream.listen(null);
+
+      var controller = new StreamController<String>(
+          onCancel: expectAsync(() {}, count: 0));
+
+      streamGroup.add(controller.stream);
+      await flushMicrotasks();
+
+      subscription.cancel();
+      await flushMicrotasks();
+
+      streamGroup.stream.listen(null);
+      await flushMicrotasks();
+    });
+
+    test("drops events from a single-subscription stream while dormant",
+        () async {
+      var events = [];
+      var subscription = streamGroup.stream.listen(events.add);
+
+      var controller = new StreamController<String>();
+      streamGroup.add(controller.stream);
+      await flushMicrotasks();
+
+      controller.add("first");
+      await flushMicrotasks();
+      expect(events, equals(["first"]));
+
+      subscription.cancel();
+      controller.add("second");
+      await flushMicrotasks();
+      expect(events, equals(["first"]));
+
+      streamGroup.stream.listen(events.add);
+      controller.add("third");
+      await flushMicrotasks();
+      expect(events, equals(["first", "third"]));
+    });
+
+    test("a single-subscription stream can be removed while dormant", () async {
+      var controller = new StreamController<String>();
+      streamGroup.add(controller.stream);
+      await flushMicrotasks();
+
+      streamGroup.stream.listen(null).cancel();
+      await flushMicrotasks();
+
+      streamGroup.remove(controller.stream);
+      expect(controller.hasListener, isFalse);
+      await flushMicrotasks();
+
+      expect(streamGroup.stream.toList(), completion(isEmpty));
+      controller.add("first");
+      expect(streamGroup.close(), completes);
+    });
+  });
+
+  group("regardless of type", () {
+    group("single-subscription", () {
+      regardlessOfType(() => new StreamGroup<String>());
+    });
+
+    group("broadcast", () {
+      regardlessOfType(() => new StreamGroup<String>.broadcast());
+    });
+  });
+
+  test("merge() emits events from all components streams", () {
+    var controller1 = new StreamController<String>();
+    var controller2 = new StreamController<String>();
+
+    var merged = StreamGroup.merge([controller1.stream, controller2.stream]);
+
+    controller1.add("first");
+    controller1.close();
+    controller2.add("second");
+    controller2.close();
+
+    expect(merged.toList(), completion(unorderedEquals(["first", "second"])));
+  });
+}
+
+void regardlessOfType(StreamGroup<String> newStreamGroup()) {
+  var streamGroup;
+  setUp(() {
+    streamGroup = newStreamGroup();
+  });
+
+  group("add()", () {
+    group("while dormant", () {
+      test("doesn't listen to the stream until the group is listened to",
+          () async {
+        var controller = new StreamController<String>();
+
+        expect(streamGroup.add(controller.stream), isNull);
+        await flushMicrotasks();
+        expect(controller.hasListener, isFalse);
+
+        streamGroup.stream.listen(null);
+        await flushMicrotasks();
+        expect(controller.hasListener, isTrue);
+      });
+
+      test("is a no-op if the stream is already in the group", () {
+        var controller = new StreamController<String>();
+        streamGroup.add(controller.stream);
+        streamGroup.add(controller.stream);
+        streamGroup.add(controller.stream);
+
+        // If the stream was actually listened to multiple times, this would
+        // throw a StateError.
+        streamGroup.stream.listen(null);
+      });
+    });
+
+    group("while active", () {
+      var subscription;
+      setUp(() async {
+        subscription = streamGroup.stream.listen(null);
+        await flushMicrotasks();
+      });
+
+      test("listens to the stream immediately", () async {
+        var controller = new StreamController<String>();
+
+        expect(streamGroup.add(controller.stream), isNull);
+        await flushMicrotasks();
+        expect(controller.hasListener, isTrue);
+      });
+
+      test("is a no-op if the stream is already in the group", () async {
+        var controller = new StreamController<String>();
+
+        // If the stream were actually listened to more than once, future
+        // calls to [add] would throw [StateError]s.
+        streamGroup.add(controller.stream);
+        streamGroup.add(controller.stream);
+        streamGroup.add(controller.stream);
+      });
+    });
+  });
+
+  group("remove()", () {
+    group("while dormant", () {
+      test("stops emitting events for a stream that's removed", () async {
+        var controller = new StreamController<String>();
+        streamGroup.add(controller.stream);
+
+        expect(streamGroup.stream.toList(), completion(equals(["first"])));
+
+        controller.add("first");
+        await flushMicrotasks();
+        controller.add("second");
+
+        expect(streamGroup.remove(controller.stream), isNull);
+        expect(streamGroup.close(), completes);
+      });
+
+      test("is a no-op for an unknown stream", () {
+        var controller = new StreamController<String>();
+        expect(streamGroup.remove(controller.stream), isNull);
+      });
+
+      test("and closed closes the group when the last stream is removed",
+          () async {
+        var controller1 = new StreamController<String>();
+        var controller2 = new StreamController<String>();
+
+        streamGroup.add(controller1.stream);
+        streamGroup.add(controller2.stream);
+        await flushMicrotasks();
+
+        streamGroup.close();
+
+        streamGroup.remove(controller1.stream);
+        await flushMicrotasks();
+
+        streamGroup.remove(controller2.stream);
+        await flushMicrotasks();
+
+        expect(streamGroup.stream.toList(), completion(isEmpty));
+      });
+    });
+
+    group("while listening", () {
+      test("doesn't emit events from a removed stream", () {
+        var controller = new StreamController<String>();
+        streamGroup.add(controller.stream);
+
+        // The subscription to [controller.stream] is canceled synchronously, so
+        // the first event is dropped even though it was added before the
+        // removal. This is documented in [StreamGroup.remove].
+        expect(streamGroup.stream.toList(), completion(isEmpty));
+
+        controller.add("first");
+        expect(streamGroup.remove(controller.stream), isNull);
+        controller.add("second");
+
+        expect(streamGroup.close(), completes);
+      });
+
+      test("cancels the stream's subscription", () async {
+        var controller = new StreamController<String>();
+        streamGroup.add(controller.stream);
+
+        streamGroup.stream.listen(null);
+        await flushMicrotasks();
+        expect(controller.hasListener, isTrue);
+
+        streamGroup.remove(controller.stream);
+        await flushMicrotasks();
+        expect(controller.hasListener, isFalse);
+      });
+
+      test("forwards cancel errors", () async {
+        var controller = new StreamController<String>(
+            onCancel: () => throw "error");
+        streamGroup.add(controller.stream);
+
+        streamGroup.stream.listen(null);
+        await flushMicrotasks();
+
+        expect(streamGroup.remove(controller.stream), throwsA("error"));
+      });
+
+      test("forwards cancel futures", () async {
+        var completer = new Completer();
+        var controller = new StreamController<String>(
+            onCancel: () => completer.future);
+
+        streamGroup.stream.listen(null);
+        await flushMicrotasks();
+
+        streamGroup.add(controller.stream);
+        await flushMicrotasks();
+
+        var fired = false;
+        streamGroup.remove(controller.stream).then((_) => fired = true);
+
+        await flushMicrotasks();
+        expect(fired, isFalse);
+
+        completer.complete();
+        await flushMicrotasks();
+        expect(fired, isTrue);
+      });
+
+      test("is a no-op for an unknown stream", () async {
+        var controller = new StreamController<String>();
+        streamGroup.stream.listen(null);
+        await flushMicrotasks();
+
+        expect(streamGroup.remove(controller.stream), isNull);
+      });
+
+      test("and closed closes the group when the last stream is removed",
+          () async {
+        var done = false;
+        streamGroup.stream.listen(null, onDone: () => done = true);
+        await flushMicrotasks();
+
+        var controller1 = new StreamController<String>();
+        var controller2 = new StreamController<String>();
+
+        streamGroup.add(controller1.stream);
+        streamGroup.add(controller2.stream);
+        await flushMicrotasks();
+
+        streamGroup.close();
+
+        streamGroup.remove(controller1.stream);
+        await flushMicrotasks();
+        expect(done, isFalse);
+
+        streamGroup.remove(controller2.stream);
+        await flushMicrotasks();
+        expect(done, isTrue);
+      });
+    });
+  });
+
+  group("close()", () {
+    group("while dormant", () {
+      test("if there are no streams, closes the group", () {
+        expect(streamGroup.close(), completes);
+        expect(streamGroup.stream.toList(), completion(isEmpty));
+      });
+
+      test("if there are streams, closes the group once those streams close "
+          "and there's a listener", () async {
+        var controller1 = new StreamController<String>();
+        var controller2 = new StreamController<String>();
+
+        streamGroup.add(controller1.stream);
+        streamGroup.add(controller2.stream);
+        await flushMicrotasks();
+
+        streamGroup.close();
+
+        controller1.close();
+        controller2.close();
+        expect(streamGroup.stream.toList(), completion(isEmpty));
+      });
+    });
+
+    group("while active", () {
+      test("if there are no streams, closes the group", () {
+        expect(streamGroup.stream.toList(), completion(isEmpty));
+        expect(streamGroup.close(), completes);
+      });
+
+      test("if there are streams, closes the group once those streams close",
+          () async {
+        var done = false;
+        streamGroup.stream.listen(null, onDone: () => done = true);
+        await flushMicrotasks();
+
+        var controller1 = new StreamController<String>();
+        var controller2 = new StreamController<String>();
+
+        streamGroup.add(controller1.stream);
+        streamGroup.add(controller2.stream);
+        await flushMicrotasks();
+
+        streamGroup.close();
+        await flushMicrotasks();
+        expect(done, isFalse);
+
+        controller1.close();
+        await flushMicrotasks();
+        expect(done, isFalse);
+
+        controller2.close();
+        await flushMicrotasks();
+        expect(done, isTrue);
+      });
+    });
+
+    test("returns a Future that completes once all events are dispatched",
+        () async {
+      var events = [];
+      streamGroup.stream.listen(events.add);
+
+      var controller = new StreamController<String>();
+      streamGroup.add(controller.stream);
+      await flushMicrotasks();
+
+      // Add a bunch of events. Each of these will get dispatched in a
+      // separate microtask, so we can test that [close] only completes once
+      // all of them have dispatched.
+      controller.add("one");
+      controller.add("two");
+      controller.add("three");
+      controller.add("four");
+      controller.add("five");
+      controller.add("six");
+      controller.close();
+
+      await streamGroup.close();
+      expect(events, equals(["one", "two", "three", "four", "five", "six"]));
+    });
+  });
+}
+
+/// Wait for all microtasks to complete.
+Future flushMicrotasks() => new Future.delayed(Duration.ZERO);