Add a StreamSinkCompleter class.
R=lrn@google.com
Review URL: https://codereview.chromium.org//1616543002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d7142da..b53d755 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.8.0
+
+- Added `StreamSinkCompleter`, for creating a `StreamSink` now and providing its
+ destination later as another sink.
+
## 1.7.0
- Added `SingleSubscriptionTransformer`, a `StreamTransformer` that converts a
diff --git a/lib/async.dart b/lib/async.dart
index ab630dd..ef59b73 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -21,6 +21,7 @@
export "src/stream_completer.dart";
export "src/stream_group.dart";
export "src/stream_queue.dart";
+export "src/stream_sink_completer.dart";
export "src/stream_sink_transformer.dart";
export "src/stream_splitter.dart";
export "src/subscription_stream.dart";
diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart
new file mode 100644
index 0000000..10caa06
--- /dev/null
+++ b/lib/src/stream_sink_completer.dart
@@ -0,0 +1,151 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_sink_completer;
+
+import 'dart:async';
+
+/// A [sink] where the destination is provided later.
+///
+/// The [sink] is a normal sink that you can add events to to immediately, but
+/// until [setDestinationSink] is called, the events will be buffered.
+///
+/// The same effect can be achieved by using a [StreamController] and adding it
+/// to the sink using [Sink.addStream] when the destination sink is ready. This
+/// class attempts to shortcut some of the overhead when possible. For example,
+/// if the [sink] only has events added after the destination sink has been set,
+/// those events are added directly to the sink.
+class StreamSinkCompleter<T> {
+ /// The sink for this completer.
+ ///
+ /// When a destination sink is provided, events that have been passed to the
+ /// sink will be forwarded to the destination.
+ ///
+ /// Events can be added to the sink either before or after a destination sink
+ /// is set.
+ final StreamSink<T> sink = new _CompleterSink<T>();
+
+ /// Returns [sink] typed as a [_CompleterSink].
+ _CompleterSink<T> get _sink => sink;
+
+ /// Sets a sink as the destination for events from the [StreamSinkCompleter]'s
+ /// [sink].
+ ///
+ /// The completer's [sink] will act exactly as [destinationSink].
+ ///
+ /// If the destination sink is set before events are added to [sink], further
+ /// events are forwarded directly to [destinationSink].
+ ///
+ /// If events are added to [sink] before setting the destination sink, they're
+ /// buffered until the destination is available.
+ ///
+ /// A destination sink may be set at most once.
+ void setDestinationSink(StreamSink<T> destinationSink) {
+ if (_sink._destinationSink != null) {
+ throw new StateError("Destination sink already set");
+ }
+ _sink._setDestinationSink(destinationSink);
+ }
+}
+
+/// [StreamSink] completed by [StreamSinkCompleter].
+class _CompleterSink<T> implements StreamSink<T> {
+ /// Controller for an intermediate sink.
+ ///
+ /// Created if the user adds events to this sink before the destination sink
+ /// is set.
+ StreamController<T> _controller;
+
+ /// Completer for [done].
+ ///
+ /// Created if the user requests the [done] future before the destination sink
+ /// is set.
+ Completer _doneCompleter;
+
+ /// Destination sink for the events added to this sink.
+ ///
+ /// Set when [StreamSinkCompleter.setDestinationSink] is called.
+ StreamSink<T> _destinationSink;
+
+ /// Whether events should be sent directly to [_destinationSink], as opposed
+ /// to going through [_controller].
+ bool get _canSendDirectly => _controller == null && _destinationSink != null;
+
+ Future get done {
+ if (_doneCompleter != null) return _doneCompleter.future;
+ if (_destinationSink == null) {
+ _doneCompleter = new Completer.sync();
+ return _doneCompleter.future;
+ }
+ return _destinationSink.done;
+ }
+
+ void add(T event) {
+ if (_canSendDirectly) {
+ _destinationSink.add(event);
+ } else {
+ _ensureController();
+ _controller.add(event);
+ }
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ if (_canSendDirectly) {
+ _destinationSink.addError(error, stackTrace);
+ } else {
+ _ensureController();
+ _controller.addError(error, stackTrace);
+ }
+ }
+
+ Future addStream(Stream<T> stream) {
+ if (_canSendDirectly) return _destinationSink.addStream(stream);
+
+ _ensureController();
+ return _controller.addStream(stream, cancelOnError: false);
+ }
+
+ Future close() {
+ if (_canSendDirectly) {
+ _destinationSink.close();
+ } else {
+ _ensureController();
+ _controller.close();
+ }
+ return done;
+ }
+
+ /// Create [_controller] if it doesn't yet exist.
+ void _ensureController() {
+ if (_controller == null) _controller = new StreamController(sync: true);
+ }
+
+ /// Sets the destination sink to which events from this sink will be provided.
+ ///
+ /// If set before the user adds events, events will be added directly to the
+ /// destination sink. If the user adds events earlier, an intermediate sink is
+ /// created using a stream controller, and the destination sink is linked to
+ /// it later.
+ void _setDestinationSink(StreamSink<T> sink) {
+ assert(_destinationSink == null);
+ _destinationSink = sink;
+
+ // If the user has already added data, it's buffered in the controller, so
+ // we add it to the sink.
+ if (_controller != null) {
+ // Catch any error that may come from [addStream] or [sink.close]. They'll
+ // be reported through [done] anyway.
+ sink
+ .addStream(_controller.stream)
+ .whenComplete(sink.close)
+ .catchError((_) {});
+ }
+
+ // If the user has already asked when the sink is done, connect the sink's
+ // done callback to that completer.
+ if (_doneCompleter != null) {
+ _doneCompleter.complete(sink.done);
+ }
+ }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 423f3ce..ccfa970 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 1.7.0
+version: 1.8.0-dev
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
diff --git a/test/stream_sink_completer_test.dart b/test/stream_sink_completer_test.dart
new file mode 100644
index 0000000..3892d26
--- /dev/null
+++ b/test/stream_sink_completer_test.dart
@@ -0,0 +1,255 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+
+import "package:async/async.dart";
+import "package:test/test.dart";
+
+import "utils.dart";
+
+main() {
+ var completer;
+ setUp(() {
+ completer = new StreamSinkCompleter();
+ });
+
+ group("when a stream is linked before events are added", () {
+ test("data events are forwarded", () {
+ var sink = new TestSink();
+ completer.setDestinationSink(sink);
+ completer.sink..add(1)..add(2)..add(3)..add(4);
+
+ expect(sink.results[0].asValue.value, equals(1));
+ expect(sink.results[1].asValue.value, equals(2));
+ expect(sink.results[2].asValue.value, equals(3));
+ expect(sink.results[3].asValue.value, equals(4));
+ });
+
+ test("error events are forwarded", () {
+ var sink = new TestSink();
+ completer.setDestinationSink(sink);
+ completer.sink..addError("oh no")..addError("that's bad");
+
+ expect(sink.results[0].asError.error, equals("oh no"));
+ expect(sink.results[1].asError.error, equals("that's bad"));
+ });
+
+ test("addStream is forwarded", () async {
+ var sink = new TestSink();
+ completer.setDestinationSink(sink);
+
+ var controller = new StreamController();
+ completer.sink.addStream(controller.stream);
+
+ controller.add(1);
+ controller.addError("oh no");
+ controller.add(2);
+ controller.addError("that's bad");
+ await flushMicrotasks();
+
+ expect(sink.results[0].asValue.value, equals(1));
+ expect(sink.results[1].asError.error, equals("oh no"));
+ expect(sink.results[2].asValue.value, equals(2));
+ expect(sink.results[3].asError.error, equals("that's bad"));
+ expect(sink.isClosed, isFalse);
+
+ controller.close();
+ await flushMicrotasks();
+ expect(sink.isClosed, isFalse);
+ });
+
+ test("close() is forwarded", () {
+ var sink = new TestSink();
+ completer.setDestinationSink(sink);
+ completer.sink.close();
+ expect(sink.isClosed, isTrue);
+ });
+
+ test("the future from the inner close() is returned", () async {
+ var closeCompleter = new Completer();
+ var sink = new TestSink(onDone: () => closeCompleter.future);
+ completer.setDestinationSink(sink);
+
+ var closeCompleted = false;
+ completer.sink.close().then(expectAsync((_) {
+ closeCompleted = true;
+ }));
+
+ await flushMicrotasks();
+ expect(closeCompleted, isFalse);
+
+ closeCompleter.complete();
+ await flushMicrotasks();
+ expect(closeCompleted, isTrue);
+ });
+
+ test("errors are forwarded from the inner close()", () {
+ var sink = new TestSink(onDone: () => throw "oh no");
+ completer.setDestinationSink(sink);
+ expect(completer.sink.done, throwsA("oh no"));
+ expect(completer.sink.close(), throwsA("oh no"));
+ });
+
+ test("errors aren't top-leveled if only close() is listened to", () async {
+ var sink = new TestSink(onDone: () => throw "oh no");
+ completer.setDestinationSink(sink);
+ expect(completer.sink.close(), throwsA("oh no"));
+
+ // Give the event loop a chance to top-level errors if it's going to.
+ await flushMicrotasks();
+ });
+
+ test("errors aren't top-leveled if only done is listened to", () async {
+ var sink = new TestSink(onDone: () => throw "oh no");
+ completer.setDestinationSink(sink);
+ completer.sink.close();
+ expect(completer.sink.done, throwsA("oh no"));
+
+ // Give the event loop a chance to top-level errors if it's going to.
+ await flushMicrotasks();
+ });
+ });
+
+ group("when a stream is linked after events are added", () {
+ test("data events are forwarded", () async {
+ completer.sink..add(1)..add(2)..add(3)..add(4);
+ await flushMicrotasks();
+
+ var sink = new TestSink();
+ completer.setDestinationSink(sink);
+ await flushMicrotasks();
+
+ expect(sink.results[0].asValue.value, equals(1));
+ expect(sink.results[1].asValue.value, equals(2));
+ expect(sink.results[2].asValue.value, equals(3));
+ expect(sink.results[3].asValue.value, equals(4));
+ });
+
+ test("error events are forwarded", () async {
+ completer.sink..addError("oh no")..addError("that's bad");
+ await flushMicrotasks();
+
+ var sink = new TestSink();
+ completer.setDestinationSink(sink);
+ await flushMicrotasks();
+
+ expect(sink.results[0].asError.error, equals("oh no"));
+ expect(sink.results[1].asError.error, equals("that's bad"));
+ });
+
+ test("addStream is forwarded", () async {
+ var controller = new StreamController();
+ completer.sink.addStream(controller.stream);
+
+ controller.add(1);
+ controller.addError("oh no");
+ controller.add(2);
+ controller.addError("that's bad");
+ controller.close();
+ await flushMicrotasks();
+
+ var sink = new TestSink();
+ completer.setDestinationSink(sink);
+ await flushMicrotasks();
+
+ expect(sink.results[0].asValue.value, equals(1));
+ expect(sink.results[1].asError.error, equals("oh no"));
+ expect(sink.results[2].asValue.value, equals(2));
+ expect(sink.results[3].asError.error, equals("that's bad"));
+ expect(sink.isClosed, isFalse);
+ });
+
+ test("close() is forwarded", () async {
+ completer.sink.close();
+ await flushMicrotasks();
+
+ var sink = new TestSink();
+ completer.setDestinationSink(sink);
+ await flushMicrotasks();
+
+ expect(sink.isClosed, isTrue);
+ });
+
+ test("the future from the inner close() is returned", () async {
+ var closeCompleted = false;
+ completer.sink.close().then(expectAsync((_) {
+ closeCompleted = true;
+ }));
+ await flushMicrotasks();
+
+ var closeCompleter = new Completer();
+ var sink = new TestSink(onDone: () => closeCompleter.future);
+ completer.setDestinationSink(sink);
+ await flushMicrotasks();
+ expect(closeCompleted, isFalse);
+
+ closeCompleter.complete();
+ await flushMicrotasks();
+ expect(closeCompleted, isTrue);
+ });
+
+ test("errors are forwarded from the inner close()", () async {
+ expect(completer.sink.done, throwsA("oh no"));
+ expect(completer.sink.close(), throwsA("oh no"));
+ await flushMicrotasks();
+
+ var sink = new TestSink(onDone: () => throw "oh no");
+ completer.setDestinationSink(sink);
+ });
+
+ test("errors aren't top-leveled if only close() is listened to", () async {
+ expect(completer.sink.close(), throwsA("oh no"));
+ await flushMicrotasks();
+
+ var sink = new TestSink(onDone: () => throw "oh no");
+ completer.setDestinationSink(sink);
+
+ // Give the event loop a chance to top-level errors if it's going to.
+ await flushMicrotasks();
+ });
+
+ test("errors aren't top-leveled if only done is listened to", () async {
+ completer.sink.close();
+ expect(completer.sink.done, throwsA("oh no"));
+ await flushMicrotasks();
+
+ var sink = new TestSink(onDone: () => throw "oh no");
+ completer.setDestinationSink(sink);
+
+ // Give the event loop a chance to top-level errors if it's going to.
+ await flushMicrotasks();
+ });
+ });
+
+ test("the sink is closed, the destination is set, then done is read",
+ () async {
+ expect(completer.sink.close(), completes);
+ await flushMicrotasks();
+
+ completer.setDestinationSink(new TestSink());
+ await flushMicrotasks();
+
+ expect(completer.sink.done, completes);
+ });
+
+ test("done is read, the destination is set, then the sink is closed",
+ () async {
+ expect(completer.sink.done, completes);
+ await flushMicrotasks();
+
+ completer.setDestinationSink(new TestSink());
+ await flushMicrotasks();
+
+ expect(completer.sink.close(), completes);
+ });
+
+ test("doesn't allow the destination sink to be set multiple times", () {
+ completer.setDestinationSink(new TestSink());
+ expect(() => completer.setDestinationSink(new TestSink()),
+ throwsStateError);
+ expect(() => completer.setDestinationSink(new TestSink()),
+ throwsStateError);
+ });
+}
diff --git a/test/utils.dart b/test/utils.dart
index 0dc47fa..445b9fc 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -6,6 +6,8 @@
library async.test.util;
import "dart:async";
+
+import "package:async/async.dart";
import "package:test/test.dart";
/// A zero-millisecond timer should wait until after all microtasks.
@@ -40,3 +42,46 @@
Future addStream(Stream<T> stream) async {}
Future close() => completer.future;
}
+
+/// A [StreamSink] that collects all events added to it as results.
+///
+/// This is used for testing code that interacts with sinks.
+class TestSink<T> implements StreamSink<T> {
+ /// The results corresponding to events that have been added to the sink.
+ final results = <Result<T>>[];
+
+ /// Whether [close] has been called.
+ bool get isClosed => _isClosed;
+ var _isClosed = false;
+
+ Future get done => _doneCompleter.future;
+ final _doneCompleter = new Completer();
+
+ final Function _onDone;
+
+ /// Creates a new sink.
+ ///
+ /// If [onDone] is passed, it's called when the user calls [close]. Its result
+ /// is piped to the [done] future.
+ TestSink({onDone()}) : _onDone = onDone ?? (() {});
+
+ void add(T event) {
+ results.add(new Result<T>.value(event));
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ results.add(new Result<T>.error(error, stackTrace));
+ }
+
+ Future addStream(Stream<T> stream) {
+ var completer = new Completer.sync();
+ stream.listen(add, onError: addError, onDone: completer.complete);
+ return completer.future;
+ }
+
+ Future close() {
+ _isClosed = true;
+ _doneCompleter.complete(new Future.microtask(_onDone));
+ return done;
+ }
+}