Add a StreamSplitter class.
This splits a stream into multiple identical streams.
R=lrn@google.com
Review URL: https://codereview.chromium.org//1190333007.
diff --git a/lib/async.dart b/lib/async.dart
index 345b8dc..a7ff26c 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -6,5 +6,6 @@
export "src/future_group.dart";
export "src/stream_group.dart";
+export "src/stream_splitter.dart";
export "stream_zip.dart";
export "result.dart";
diff --git a/lib/src/stream_splitter.dart b/lib/src/stream_splitter.dart
new file mode 100644
index 0000000..addba93
--- /dev/null
+++ b/lib/src/stream_splitter.dart
@@ -0,0 +1,213 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_splitter;
+
+import 'dart:async';
+import 'dart:collection';
+
+import '../result.dart';
+import 'future_group.dart';
+
+/// A class that splits a single source stream into an arbitrary number of
+/// (single-subscription) streams (called "branch") that emit the same events.
+///
+/// Each branch will emit all the same values and errors as the source stream,
+/// regardless of which values have been emitted on other branches. This means
+/// that the splitter stores every event that has been emitted so far, which may
+/// consume a lot of memory. The user can call [close] to indicate that no more
+/// branches will be created, and this memory will be released.
+///
+/// The source stream is only listened to once a branch is created *and listened
+/// to*. It's paused when all branches are paused *or when all branches are
+/// canceled*, and resumed once there's at least one branch that's listening and
+/// unpaused. It's not canceled unless no branches are listening and [close] has
+/// been called.
+class StreamSplitter<T> {
+ /// The wrapped stream.
+ final Stream<T> _stream;
+
+ /// The subscription to [_stream].
+ ///
+ /// This will be `null` until a branch has a listener.
+ StreamSubscription<T> _subscription;
+
+ /// The buffer of events or errors that have already been emitted by
+ /// [_stream].
+ final _buffer = new Queue<Result<T>>();
+
+ /// The controllers for branches that are listening for future events from
+ /// [_stream].
+ ///
+ /// Once a branch is canceled, it's removed from this list. When [_stream] is
+ /// done, all branches are removed.
+ final _controllers = new Set<StreamController<T>>();
+
+ /// A group of futures returned by [close].
+ ///
+ /// This is used to ensure that [close] doesn't complete until all
+ /// [StreamController.close] and [StreamSubscription.cancel] calls complete.
+ final _closeGroup = new FutureGroup();
+
+ /// Whether [_stream] is done emitting events.
+ var _isDone = false;
+
+ /// Whether [close] has been called.
+ var _isClosed = false;
+
+ /// Splits [stream] into [count] identical streams.
+ ///
+ /// [count] defaults to 2. This is the same as creating [count] branches and
+ /// then closing the [StreamSplitter].
+ static List<Stream> splitFrom(Stream stream, [int count]) {
+ if (count == null) count = 2;
+ var splitter = new StreamSplitter(stream);
+ var streams = new List.generate(count, (_) => splitter.split());
+ splitter.close();
+ return streams;
+ }
+
+ StreamSplitter(this._stream);
+
+ /// Returns a single-subscription stream that's a copy of the input stream.
+ ///
+ /// This will throw a [StateError] if [close] has been called.
+ Stream<T> split() {
+ if (_isClosed) {
+ throw new StateError("Can't call split() on a closed StreamSplitter.");
+ }
+
+ var controller;
+ controller = new StreamController<T>(
+ onListen: _onListen,
+ onPause: _onPause,
+ onResume: _onResume,
+ onCancel: () => _onCancel(controller));
+
+ for (var result in _buffer) {
+ result.addTo(controller);
+ }
+
+ if (_isDone) {
+ _closeGroup.add(controller.close());
+ } else {
+ _controllers.add(controller);
+ }
+
+ return controller.stream;
+ }
+
+ /// Indicates that no more branches will be requested via [split].
+ ///
+ /// This clears the internal buffer of events. If there are no branches or all
+ /// branches have been canceled, this cancels the subscription to the input
+ /// stream.
+ ///
+ /// Returns a [Future] that completes once all events have been processed by
+ /// all branches and (if applicable) the subscription to the input stream has
+ /// been canceled.
+ Future close() {
+ if (_isClosed) return _closeGroup.future;
+ _isClosed = true;
+
+ _buffer.clear();
+ if (_controllers.isEmpty) _cancelSubscription();
+
+ return _closeGroup.future;
+ }
+
+ /// Cancel [_subscription] and close [_closeGroup].
+ ///
+ /// This should be called after all the branches' subscriptions have been
+ /// canceled and the splitter has been closed. In that case, we won't use the
+ /// events from [_subscription] any more, since there's nothing to pipe them
+ /// to and no more branches will be created. If [_subscription] is done,
+ /// canceling it will be a no-op.
+ ///
+ /// This may also be called before any branches have been created, in which
+ /// case [_subscription] will be `null`.
+ void _cancelSubscription() {
+ assert(_controllers.isEmpty);
+ assert(_isClosed);
+
+ var future = null;
+ if (_subscription != null) future = _subscription.cancel();
+ if (future != null) _closeGroup.add(future);
+ _closeGroup.close();
+ }
+
+ // StreamController events
+
+ /// Subscribe to [_stream] if we haven't yet done so, and resume the
+ /// subscription if we have.
+ void _onListen() {
+ if (_isDone) return;
+
+ if (_subscription != null) {
+ // Resume the subscription in case it was paused, either because all the
+ // controllers were paused or because the last one was canceled. If it
+ // wasn't paused, this will be a no-op.
+ _subscription.resume();
+ } else {
+ _subscription = _stream.listen(
+ _onData, onError: _onError, onDone: _onDone);
+ }
+ }
+
+ /// Pauses [_subscription] if every controller is paused.
+ void _onPause() {
+ if (!_controllers.every((controller) => controller.isPaused)) return;
+ _subscription.pause();
+ }
+
+ /// Resumes [_subscription].
+ ///
+ /// If [_subscription] wasn't paused, this is a no-op.
+ void _onResume() {
+ _subscription.resume();
+ }
+
+ /// Removes [controller] from [_controllers] and cancels or pauses
+ /// [_subscription] as appropriate.
+ ///
+ /// Since the controller emitting a done event will cause it to register as
+ /// canceled, this is the only way that a controller is ever removed from
+ /// [_controllers].
+ void _onCancel(StreamController controller) {
+ _controllers.remove(controller);
+ if (_controllers.isNotEmpty) return;
+
+ if (_isClosed) {
+ _cancelSubscription();
+ } else {
+ _subscription.pause();
+ }
+ }
+
+ // Stream events
+
+ /// Buffers [data] and passes it to [_controllers].
+ void _onData(T data) {
+ if (!_isClosed) _buffer.add(new Result.value(data));
+ for (var controller in _controllers) {
+ controller.add(data);
+ }
+ }
+
+ /// Buffers [error] and passes it to [_controllers].
+ void _onError(Object error, StackTrace stackTrace) {
+ if (!_isClosed) _buffer.add(new Result.error(error, stackTrace));
+ for (var controller in _controllers) {
+ controller.addError(error, stackTrace);
+ }
+ }
+
+ /// Marks [_controllers] as done.
+ void _onDone() {
+ _isDone = true;
+ for (var controller in _controllers) {
+ _closeGroup.add(controller.close());
+ }
+ }
+}
diff --git a/test/stream_splitter_test.dart b/test/stream_splitter_test.dart
new file mode 100644
index 0000000..7b77baa
--- /dev/null
+++ b/test/stream_splitter_test.dart
@@ -0,0 +1,288 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+main() {
+ var controller;
+ var splitter;
+ setUp(() {
+ controller = new StreamController<int>();
+ splitter = new StreamSplitter<int>(controller.stream);
+ });
+
+ test("a branch that's created before the stream starts to replay it",
+ () async {
+ var events = [];
+ var branch = splitter.split();
+ splitter.close();
+ branch.listen(events.add);
+
+ controller.add(1);
+ await flushMicrotasks();
+ expect(events, equals([1]));
+
+ controller.add(2);
+ await flushMicrotasks();
+ expect(events, equals([1, 2]));
+
+ controller.add(3);
+ await flushMicrotasks();
+ expect(events, equals([1, 2, 3]));
+
+ controller.close();
+ });
+
+ test("a branch replays error events as well as data events", () {
+ var branch = splitter.split();
+ splitter.close();
+
+ controller.add(1);
+ controller.addError("error");
+ controller.add(3);
+ controller.close();
+
+ var count = 0;
+ branch.listen(expectAsync((value) {
+ expect(count, anyOf(0, 2));
+ expect(value, equals(count + 1));
+ count++;
+ }, count: 2), onError: expectAsync((error) {
+ expect(count, equals(1));
+ expect(error, equals("error"));
+ count++;
+ }), onDone: expectAsync(() {
+ expect(count, equals(3));
+ }));
+ });
+
+ test("a branch that's created in the middle of a stream replays it", () async {
+ controller.add(1);
+ controller.add(2);
+ await flushMicrotasks();
+
+ var branch = splitter.split();
+ splitter.close();
+
+ controller.add(3);
+ controller.add(4);
+ controller.close();
+
+ expect(branch.toList(), completion(equals([1, 2, 3, 4])));
+ });
+
+ test("a branch that's created after the stream is finished replays it",
+ () async {
+ controller.add(1);
+ controller.add(2);
+ controller.add(3);
+ controller.close();
+ await flushMicrotasks();
+
+ expect(splitter.split().toList(), completion(equals([1, 2, 3])));
+ splitter.close();
+ });
+
+ test("creates single-subscription branches", () async {
+ var branch = splitter.split();
+ expect(branch.isBroadcast, isFalse);
+ branch.listen(null);
+ expect(() => branch.listen(null), throwsStateError);
+ expect(() => branch.listen(null), throwsStateError);
+ });
+
+ test("creates branches with the correct reified type", () async {
+ var branch = splitter.split();
+ expect(branch, new isInstanceOf<Stream<int>>());
+ expect(branch, isNot(new isInstanceOf<Stream<String>>()));
+ });
+
+ test("multiple branches each replay the stream", () async {
+ var branch1 = splitter.split();
+ controller.add(1);
+ controller.add(2);
+ await flushMicrotasks();
+
+ var branch2 = splitter.split();
+ controller.add(3);
+ controller.close();
+ await flushMicrotasks();
+
+ var branch3 = splitter.split();
+ splitter.close();
+
+ expect(branch1.toList(), completion(equals([1, 2, 3])));
+ expect(branch2.toList(), completion(equals([1, 2, 3])));
+ expect(branch3.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("a branch doesn't close until the source stream closes", () async {
+ var branch = splitter.split();
+ splitter.close();
+
+ var closed = false;
+ branch.last.then((_) => closed = true);
+
+ controller.add(1);
+ controller.add(2);
+ controller.add(3);
+ await flushMicrotasks();
+ expect(closed, isFalse);
+
+ controller.close();
+ await flushMicrotasks();
+ expect(closed, isTrue);
+ });
+
+ test("the source stream isn't listened to until a branch is", () async {
+ expect(controller.hasListener, isFalse);
+
+ var branch = splitter.split();
+ splitter.close();
+ await flushMicrotasks();
+ expect(controller.hasListener, isFalse);
+
+ branch.listen(null);
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+ });
+
+ test("the source stream is paused when all branches are paused", () async {
+ var branch1 = splitter.split();
+ var branch2 = splitter.split();
+ var branch3 = splitter.split();
+ splitter.close();
+
+ var subscription1 = branch1.listen(null);
+ var subscription2 = branch2.listen(null);
+ var subscription3 = branch3.listen(null);
+
+ subscription1.pause();
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+
+ subscription2.pause();
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+
+ subscription3.pause();
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+
+ subscription2.resume();
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+ });
+
+ test("the source stream is paused when all branches are canceled", () async {
+ var branch1 = splitter.split();
+ var branch2 = splitter.split();
+ var branch3 = splitter.split();
+
+ var subscription1 = branch1.listen(null);
+ var subscription2 = branch2.listen(null);
+ var subscription3 = branch3.listen(null);
+
+ subscription1.cancel();
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+
+ subscription2.cancel();
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+
+ subscription3.cancel();
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+
+ var branch4 = splitter.split();
+ splitter.close();
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+
+ branch4.listen(null);
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+ });
+
+ test("the source stream is canceled when it's closed after all branches have "
+ "been canceled", () async {
+ var branch1 = splitter.split();
+ var branch2 = splitter.split();
+ var branch3 = splitter.split();
+
+ var subscription1 = branch1.listen(null);
+ var subscription2 = branch2.listen(null);
+ var subscription3 = branch3.listen(null);
+
+ subscription1.cancel();
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ subscription2.cancel();
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ subscription3.cancel();
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ splitter.close();
+ expect(controller.hasListener, isFalse);
+ });
+
+ test("the source stream is canceled when all branches are canceled after it "
+ "has been closed", () async {
+ var branch1 = splitter.split();
+ var branch2 = splitter.split();
+ var branch3 = splitter.split();
+ splitter.close();
+
+ var subscription1 = branch1.listen(null);
+ var subscription2 = branch2.listen(null);
+ var subscription3 = branch3.listen(null);
+
+ subscription1.cancel();
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ subscription2.cancel();
+ await flushMicrotasks();
+ expect(controller.hasListener, isTrue);
+
+ subscription3.cancel();
+ await flushMicrotasks();
+ expect(controller.hasListener, isFalse);
+ });
+
+ test("a splitter that's closed before any branches are added never listens "
+ "to the source stream", () {
+ splitter.close();
+
+ // This would throw an error if the stream had already been listened to.
+ controller.stream.listen(null);
+ });
+
+ test("splitFrom splits a source stream into the designated number of "
+ "branches", () {
+ var branches = StreamSplitter.splitFrom(controller.stream, 5);
+
+ controller.add(1);
+ controller.add(2);
+ controller.add(3);
+ controller.close();
+
+ expect(branches[0].toList(), completion(equals([1, 2, 3])));
+ expect(branches[1].toList(), completion(equals([1, 2, 3])));
+ expect(branches[2].toList(), completion(equals([1, 2, 3])));
+ expect(branches[3].toList(), completion(equals([1, 2, 3])));
+ expect(branches[4].toList(), completion(equals([1, 2, 3])));
+ });
+}
+
+/// Wait for all microtasks to complete.
+Future flushMicrotasks() => new Future.delayed(Duration.ZERO);