Add a StreamSplitter class.

This splits a stream into multiple identical streams.

R=lrn@google.com

Review URL: https://codereview.chromium.org//1190333007.
diff --git a/lib/async.dart b/lib/async.dart
index 345b8dc..a7ff26c 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -6,5 +6,6 @@
 
 export "src/future_group.dart";
 export "src/stream_group.dart";
+export "src/stream_splitter.dart";
 export "stream_zip.dart";
 export "result.dart";
diff --git a/lib/src/stream_splitter.dart b/lib/src/stream_splitter.dart
new file mode 100644
index 0000000..addba93
--- /dev/null
+++ b/lib/src/stream_splitter.dart
@@ -0,0 +1,213 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_splitter;
+
+import 'dart:async';
+import 'dart:collection';
+
+import '../result.dart';
+import 'future_group.dart';
+
+/// A class that splits a single source stream into an arbitrary number of
+/// (single-subscription) streams (called "branch") that emit the same events.
+///
+/// Each branch will emit all the same values and errors as the source stream,
+/// regardless of which values have been emitted on other branches. This means
+/// that the splitter stores every event that has been emitted so far, which may
+/// consume a lot of memory. The user can call [close] to indicate that no more
+/// branches will be created, and this memory will be released.
+///
+/// The source stream is only listened to once a branch is created *and listened
+/// to*. It's paused when all branches are paused *or when all branches are
+/// canceled*, and resumed once there's at least one branch that's listening and
+/// unpaused. It's not canceled unless no branches are listening and [close] has
+/// been called.
+class StreamSplitter<T> {
+  /// The wrapped stream.
+  final Stream<T> _stream;
+
+  /// The subscription to [_stream].
+  ///
+  /// This will be `null` until a branch has a listener.
+  StreamSubscription<T> _subscription;
+
+  /// The buffer of events or errors that have already been emitted by
+  /// [_stream].
+  final _buffer = new Queue<Result<T>>();
+
+  /// The controllers for branches that are listening for future events from
+  /// [_stream].
+  ///
+  /// Once a branch is canceled, it's removed from this list. When [_stream] is
+  /// done, all branches are removed.
+  final _controllers = new Set<StreamController<T>>();
+
+  /// A group of futures returned by [close].
+  ///
+  /// This is used to ensure that [close] doesn't complete until all
+  /// [StreamController.close] and [StreamSubscription.cancel] calls complete.
+  final _closeGroup = new FutureGroup();
+
+  /// Whether [_stream] is done emitting events.
+  var _isDone = false;
+
+  /// Whether [close] has been called.
+  var _isClosed = false;
+
+  /// Splits [stream] into [count] identical streams.
+  ///
+  /// [count] defaults to 2. This is the same as creating [count] branches and
+  /// then closing the [StreamSplitter].
+  static List<Stream> splitFrom(Stream stream, [int count]) {
+    if (count == null) count = 2;
+    var splitter = new StreamSplitter(stream);
+    var streams = new List.generate(count, (_) => splitter.split());
+    splitter.close();
+    return streams;
+  }
+
+  StreamSplitter(this._stream);
+
+  /// Returns a single-subscription stream that's a copy of the input stream.
+  ///
+  /// This will throw a [StateError] if [close] has been called.
+  Stream<T> split() {
+    if (_isClosed) {
+      throw new StateError("Can't call split() on a closed StreamSplitter.");
+    }
+
+    var controller;
+    controller = new StreamController<T>(
+        onListen: _onListen,
+        onPause: _onPause,
+        onResume: _onResume,
+        onCancel: () => _onCancel(controller));
+
+    for (var result in _buffer) {
+      result.addTo(controller);
+    }
+
+    if (_isDone) {
+      _closeGroup.add(controller.close());
+    } else {
+      _controllers.add(controller);
+    }
+
+    return controller.stream;
+  }
+
+  /// Indicates that no more branches will be requested via [split].
+  ///
+  /// This clears the internal buffer of events. If there are no branches or all
+  /// branches have been canceled, this cancels the subscription to the input
+  /// stream.
+  ///
+  /// Returns a [Future] that completes once all events have been processed by
+  /// all branches and (if applicable) the subscription to the input stream has
+  /// been canceled.
+  Future close() {
+    if (_isClosed) return _closeGroup.future;
+    _isClosed = true;
+
+    _buffer.clear();
+    if (_controllers.isEmpty) _cancelSubscription();
+
+    return _closeGroup.future;
+  }
+
+  /// Cancel [_subscription] and close [_closeGroup].
+  ///
+  /// This should be called after all the branches' subscriptions have been
+  /// canceled and the splitter has been closed. In that case, we won't use the
+  /// events from [_subscription] any more, since there's nothing to pipe them
+  /// to and no more branches will be created. If [_subscription] is done,
+  /// canceling it will be a no-op.
+  ///
+  /// This may also be called before any branches have been created, in which
+  /// case [_subscription] will be `null`.
+  void _cancelSubscription() {
+    assert(_controllers.isEmpty);
+    assert(_isClosed);
+
+    var future = null;
+    if (_subscription != null) future = _subscription.cancel();
+    if (future != null) _closeGroup.add(future);
+    _closeGroup.close();
+  }
+
+  // StreamController events
+
+  /// Subscribe to [_stream] if we haven't yet done so, and resume the
+  /// subscription if we have.
+  void _onListen() {
+    if (_isDone) return;
+
+    if (_subscription != null) {
+      // Resume the subscription in case it was paused, either because all the
+      // controllers were paused or because the last one was canceled. If it
+      // wasn't paused, this will be a no-op.
+      _subscription.resume();
+    } else {
+      _subscription = _stream.listen(
+          _onData, onError: _onError, onDone: _onDone);
+    }
+  }
+
+  /// Pauses [_subscription] if every controller is paused.
+  void _onPause() {
+    if (!_controllers.every((controller) => controller.isPaused)) return;
+    _subscription.pause();
+  }
+
+  /// Resumes [_subscription].
+  ///
+  /// If [_subscription] wasn't paused, this is a no-op.
+  void _onResume() {
+    _subscription.resume();
+  }
+
+  /// Removes [controller] from [_controllers] and cancels or pauses
+  /// [_subscription] as appropriate.
+  ///
+  /// Since the controller emitting a done event will cause it to register as
+  /// canceled, this is the only way that a controller is ever removed from
+  /// [_controllers].
+  void _onCancel(StreamController controller) {
+    _controllers.remove(controller);
+    if (_controllers.isNotEmpty) return;
+
+    if (_isClosed) {
+      _cancelSubscription();
+    } else {
+      _subscription.pause();
+    }
+  }
+
+  // Stream events
+
+  /// Buffers [data] and passes it to [_controllers].
+  void _onData(T data) {
+    if (!_isClosed) _buffer.add(new Result.value(data));
+    for (var controller in _controllers) {
+      controller.add(data);
+    }
+  }
+
+  /// Buffers [error] and passes it to [_controllers].
+  void _onError(Object error, StackTrace stackTrace) {
+    if (!_isClosed) _buffer.add(new Result.error(error, stackTrace));
+    for (var controller in _controllers) {
+      controller.addError(error, stackTrace);
+    }
+  }
+
+  /// Marks [_controllers] as done.
+  void _onDone() {
+    _isDone = true;
+    for (var controller in _controllers) {
+      _closeGroup.add(controller.close());
+    }
+  }
+}
diff --git a/test/stream_splitter_test.dart b/test/stream_splitter_test.dart
new file mode 100644
index 0000000..7b77baa
--- /dev/null
+++ b/test/stream_splitter_test.dart
@@ -0,0 +1,288 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+main() {
+  var controller;
+  var splitter;
+  setUp(() {
+    controller = new StreamController<int>();
+    splitter = new StreamSplitter<int>(controller.stream);
+  });
+
+  test("a branch that's created before the stream starts to replay it",
+      () async {
+    var events = [];
+    var branch = splitter.split();
+    splitter.close();
+    branch.listen(events.add);
+
+    controller.add(1);
+    await flushMicrotasks();
+    expect(events, equals([1]));
+
+    controller.add(2);
+    await flushMicrotasks();
+    expect(events, equals([1, 2]));
+
+    controller.add(3);
+    await flushMicrotasks();
+    expect(events, equals([1, 2, 3]));
+
+    controller.close();
+  });
+
+  test("a branch replays error events as well as data events", () {
+    var branch = splitter.split();
+    splitter.close();
+
+    controller.add(1);
+    controller.addError("error");
+    controller.add(3);
+    controller.close();
+
+    var count = 0;
+    branch.listen(expectAsync((value) {
+      expect(count, anyOf(0, 2));
+      expect(value, equals(count + 1));
+      count++;
+    }, count: 2), onError: expectAsync((error) {
+      expect(count, equals(1));
+      expect(error, equals("error"));
+      count++;
+    }), onDone: expectAsync(() {
+      expect(count, equals(3));
+    }));
+  });
+
+  test("a branch that's created in the middle of a stream replays it", () async {
+    controller.add(1);
+    controller.add(2);
+    await flushMicrotasks();
+
+    var branch = splitter.split();
+    splitter.close();
+
+    controller.add(3);
+    controller.add(4);
+    controller.close();
+
+    expect(branch.toList(), completion(equals([1, 2, 3, 4])));
+  });
+
+  test("a branch that's created after the stream is finished replays it",
+      () async {
+    controller.add(1);
+    controller.add(2);
+    controller.add(3);
+    controller.close();
+    await flushMicrotasks();
+
+    expect(splitter.split().toList(), completion(equals([1, 2, 3])));
+    splitter.close();
+  });
+
+  test("creates single-subscription branches", () async {
+    var branch = splitter.split();
+    expect(branch.isBroadcast, isFalse);
+    branch.listen(null);
+    expect(() => branch.listen(null), throwsStateError);
+    expect(() => branch.listen(null), throwsStateError);
+  });
+
+  test("creates branches with the correct reified type", () async {
+    var branch = splitter.split();
+    expect(branch, new isInstanceOf<Stream<int>>());
+    expect(branch, isNot(new isInstanceOf<Stream<String>>()));
+  });
+
+  test("multiple branches each replay the stream", () async {
+    var branch1 = splitter.split();
+    controller.add(1);
+    controller.add(2);
+    await flushMicrotasks();
+    
+    var branch2 = splitter.split();
+    controller.add(3);
+    controller.close();
+    await flushMicrotasks();
+    
+    var branch3 = splitter.split();
+    splitter.close();
+
+    expect(branch1.toList(), completion(equals([1, 2, 3])));
+    expect(branch2.toList(), completion(equals([1, 2, 3])));
+    expect(branch3.toList(), completion(equals([1, 2, 3])));
+  });
+
+  test("a branch doesn't close until the source stream closes", () async {
+    var branch = splitter.split();
+    splitter.close();
+
+    var closed = false;
+    branch.last.then((_) => closed = true);
+
+    controller.add(1);
+    controller.add(2);
+    controller.add(3);
+    await flushMicrotasks();
+    expect(closed, isFalse);
+
+    controller.close();
+    await flushMicrotasks();
+    expect(closed, isTrue);
+  });
+
+  test("the source stream isn't listened to until a branch is", () async {
+    expect(controller.hasListener, isFalse);
+
+    var branch = splitter.split();
+    splitter.close();
+    await flushMicrotasks();
+    expect(controller.hasListener, isFalse);
+
+    branch.listen(null);
+    await flushMicrotasks();
+    expect(controller.hasListener, isTrue);
+  });
+
+  test("the source stream is paused when all branches are paused", () async {
+    var branch1 = splitter.split();
+    var branch2 = splitter.split();
+    var branch3 = splitter.split();
+    splitter.close();
+
+    var subscription1 = branch1.listen(null);
+    var subscription2 = branch2.listen(null);
+    var subscription3 = branch3.listen(null);
+
+    subscription1.pause();
+    await flushMicrotasks();
+    expect(controller.isPaused, isFalse);
+
+    subscription2.pause();
+    await flushMicrotasks();
+    expect(controller.isPaused, isFalse);
+
+    subscription3.pause();
+    await flushMicrotasks();
+    expect(controller.isPaused, isTrue);
+
+    subscription2.resume();
+    await flushMicrotasks();
+    expect(controller.isPaused, isFalse);
+  });
+
+  test("the source stream is paused when all branches are canceled", () async {
+    var branch1 = splitter.split();
+    var branch2 = splitter.split();
+    var branch3 = splitter.split();
+
+    var subscription1 = branch1.listen(null);
+    var subscription2 = branch2.listen(null);
+    var subscription3 = branch3.listen(null);
+
+    subscription1.cancel();
+    await flushMicrotasks();
+    expect(controller.isPaused, isFalse);
+
+    subscription2.cancel();
+    await flushMicrotasks();
+    expect(controller.isPaused, isFalse);
+
+    subscription3.cancel();
+    await flushMicrotasks();
+    expect(controller.isPaused, isTrue);
+
+    var branch4 = splitter.split();
+    splitter.close();
+    await flushMicrotasks();
+    expect(controller.isPaused, isTrue);
+
+    branch4.listen(null);
+    await flushMicrotasks();
+    expect(controller.isPaused, isFalse);
+  });
+
+  test("the source stream is canceled when it's closed after all branches have "
+      "been canceled", () async {
+    var branch1 = splitter.split();
+    var branch2 = splitter.split();
+    var branch3 = splitter.split();
+
+    var subscription1 = branch1.listen(null);
+    var subscription2 = branch2.listen(null);
+    var subscription3 = branch3.listen(null);
+
+    subscription1.cancel();
+    await flushMicrotasks();
+    expect(controller.hasListener, isTrue);
+
+    subscription2.cancel();
+    await flushMicrotasks();
+    expect(controller.hasListener, isTrue);
+
+    subscription3.cancel();
+    await flushMicrotasks();
+    expect(controller.hasListener, isTrue);
+
+    splitter.close();
+    expect(controller.hasListener, isFalse);
+  });
+
+  test("the source stream is canceled when all branches are canceled after it "
+      "has been closed", () async {
+    var branch1 = splitter.split();
+    var branch2 = splitter.split();
+    var branch3 = splitter.split();
+    splitter.close();
+
+    var subscription1 = branch1.listen(null);
+    var subscription2 = branch2.listen(null);
+    var subscription3 = branch3.listen(null);
+
+    subscription1.cancel();
+    await flushMicrotasks();
+    expect(controller.hasListener, isTrue);
+
+    subscription2.cancel();
+    await flushMicrotasks();
+    expect(controller.hasListener, isTrue);
+
+    subscription3.cancel();
+    await flushMicrotasks();
+    expect(controller.hasListener, isFalse);
+  });
+
+  test("a splitter that's closed before any branches are added never listens "
+      "to the source stream", () {
+    splitter.close();
+
+    // This would throw an error if the stream had already been listened to.
+    controller.stream.listen(null);
+  });
+
+  test("splitFrom splits a source stream into the designated number of "
+      "branches", () {
+    var branches = StreamSplitter.splitFrom(controller.stream, 5);
+
+    controller.add(1);
+    controller.add(2);
+    controller.add(3);
+    controller.close();
+
+    expect(branches[0].toList(), completion(equals([1, 2, 3])));
+    expect(branches[1].toList(), completion(equals([1, 2, 3])));
+    expect(branches[2].toList(), completion(equals([1, 2, 3])));
+    expect(branches[3].toList(), completion(equals([1, 2, 3])));
+    expect(branches[4].toList(), completion(equals([1, 2, 3])));
+  });
+}
+
+/// Wait for all microtasks to complete.
+Future flushMicrotasks() => new Future.delayed(Duration.ZERO);