Add a StreamSinkCompleter class.

R=lrn@google.com

Review URL: https://codereview.chromium.org//1616543002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d7142da..b53d755 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.8.0
+
+- Added `StreamSinkCompleter`, for creating a `StreamSink` now and providing its
+  destination later as another sink.
+
 ## 1.7.0
 
 - Added `SingleSubscriptionTransformer`, a `StreamTransformer` that converts a
diff --git a/lib/async.dart b/lib/async.dart
index ab630dd..ef59b73 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -21,6 +21,7 @@
 export "src/stream_completer.dart";
 export "src/stream_group.dart";
 export "src/stream_queue.dart";
+export "src/stream_sink_completer.dart";
 export "src/stream_sink_transformer.dart";
 export "src/stream_splitter.dart";
 export "src/subscription_stream.dart";
diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart
new file mode 100644
index 0000000..10caa06
--- /dev/null
+++ b/lib/src/stream_sink_completer.dart
@@ -0,0 +1,151 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_sink_completer;
+
+import 'dart:async';
+
+/// A [sink] where the destination is provided later.
+///
+/// The [sink] is a normal sink that you can add events to to immediately, but
+/// until [setDestinationSink] is called, the events will be buffered.
+///
+/// The same effect can be achieved by using a [StreamController] and adding it
+/// to the sink using [Sink.addStream] when the destination sink is ready. This
+/// class attempts to shortcut some of the overhead when possible. For example,
+/// if the [sink] only has events added after the destination sink has been set,
+/// those events are added directly to the sink.
+class StreamSinkCompleter<T> {
+  /// The sink for this completer.
+  ///
+  /// When a destination sink is provided, events that have been passed to the
+  /// sink will be forwarded to the destination.
+  ///
+  /// Events can be added to the sink either before or after a destination sink
+  /// is set.
+  final StreamSink<T> sink = new _CompleterSink<T>();
+
+  /// Returns [sink] typed as a [_CompleterSink].
+  _CompleterSink<T> get _sink => sink;
+
+  /// Sets a sink as the destination for events from the [StreamSinkCompleter]'s
+  /// [sink].
+  ///
+  /// The completer's [sink] will act exactly as [destinationSink].
+  ///
+  /// If the destination sink is set before events are added to [sink], further
+  /// events are forwarded directly to [destinationSink].
+  ///
+  /// If events are added to [sink] before setting the destination sink, they're
+  /// buffered until the destination is available.
+  ///
+  /// A destination sink may be set at most once.
+  void setDestinationSink(StreamSink<T> destinationSink) {
+    if (_sink._destinationSink != null) {
+      throw new StateError("Destination sink already set");
+    }
+    _sink._setDestinationSink(destinationSink);
+  }
+}
+
+/// [StreamSink] completed by [StreamSinkCompleter].
+class _CompleterSink<T> implements StreamSink<T> {
+  /// Controller for an intermediate sink.
+  ///
+  /// Created if the user adds events to this sink before the destination sink
+  /// is set.
+  StreamController<T> _controller;
+
+  /// Completer for [done].
+  ///
+  /// Created if the user requests the [done] future before the destination sink
+  /// is set.
+  Completer _doneCompleter;
+
+  /// Destination sink for the events added to this sink.
+  ///
+  /// Set when [StreamSinkCompleter.setDestinationSink] is called.
+  StreamSink<T> _destinationSink;
+
+  /// Whether events should be sent directly to [_destinationSink], as opposed
+  /// to going through [_controller].
+  bool get _canSendDirectly => _controller == null && _destinationSink != null;
+
+  Future get done {
+    if (_doneCompleter != null) return _doneCompleter.future;
+    if (_destinationSink == null) {
+      _doneCompleter = new Completer.sync();
+      return _doneCompleter.future;
+    }
+    return _destinationSink.done;
+  }
+
+  void add(T event) {
+    if (_canSendDirectly) {
+      _destinationSink.add(event);
+    } else {
+      _ensureController();
+      _controller.add(event);
+    }
+  }
+
+  void addError(error, [StackTrace stackTrace]) {
+    if (_canSendDirectly) {
+      _destinationSink.addError(error, stackTrace);
+    } else {
+      _ensureController();
+      _controller.addError(error, stackTrace);
+    }
+  }
+
+  Future addStream(Stream<T> stream) {
+    if (_canSendDirectly) return _destinationSink.addStream(stream);
+
+    _ensureController();
+    return _controller.addStream(stream, cancelOnError: false);
+  }
+
+  Future close() {
+    if (_canSendDirectly) {
+      _destinationSink.close();
+    } else {
+      _ensureController();
+      _controller.close();
+    }
+    return done;
+  }
+
+  /// Create [_controller] if it doesn't yet exist.
+  void _ensureController() {
+    if (_controller == null) _controller = new StreamController(sync: true);
+  }
+
+  /// Sets the destination sink to which events from this sink will be provided.
+  ///
+  /// If set before the user adds events, events will be added directly to the
+  /// destination sink. If the user adds events earlier, an intermediate sink is
+  /// created using a stream controller, and the destination sink is linked to
+  /// it later.
+  void _setDestinationSink(StreamSink<T> sink) {
+    assert(_destinationSink == null);
+    _destinationSink = sink;
+
+    // If the user has already added data, it's buffered in the controller, so
+    // we add it to the sink.
+    if (_controller != null) {
+      // Catch any error that may come from [addStream] or [sink.close]. They'll
+      // be reported through [done] anyway.
+      sink
+          .addStream(_controller.stream)
+          .whenComplete(sink.close)
+          .catchError((_) {});
+    }
+
+    // If the user has already asked when the sink is done, connect the sink's
+    // done callback to that completer.
+    if (_doneCompleter != null) {
+      _doneCompleter.complete(sink.done);
+    }
+  }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 423f3ce..ccfa970 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 1.7.0
+version: 1.8.0-dev
 author: Dart Team <misc@dartlang.org>
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
diff --git a/test/stream_sink_completer_test.dart b/test/stream_sink_completer_test.dart
new file mode 100644
index 0000000..3892d26
--- /dev/null
+++ b/test/stream_sink_completer_test.dart
@@ -0,0 +1,255 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+
+import "package:async/async.dart";
+import "package:test/test.dart";
+
+import "utils.dart";
+
+main() {
+  var completer;
+  setUp(() {
+    completer = new StreamSinkCompleter();
+  });
+
+  group("when a stream is linked before events are added", () {
+    test("data events are forwarded", () {
+      var sink = new TestSink();
+      completer.setDestinationSink(sink);
+      completer.sink..add(1)..add(2)..add(3)..add(4);
+
+      expect(sink.results[0].asValue.value, equals(1));
+      expect(sink.results[1].asValue.value, equals(2));
+      expect(sink.results[2].asValue.value, equals(3));
+      expect(sink.results[3].asValue.value, equals(4));
+    });
+
+    test("error events are forwarded", () {
+      var sink = new TestSink();
+      completer.setDestinationSink(sink);
+      completer.sink..addError("oh no")..addError("that's bad");
+
+      expect(sink.results[0].asError.error, equals("oh no"));
+      expect(sink.results[1].asError.error, equals("that's bad"));
+    });
+
+    test("addStream is forwarded", () async {
+      var sink = new TestSink();
+      completer.setDestinationSink(sink);
+
+      var controller = new StreamController();
+      completer.sink.addStream(controller.stream);
+
+      controller.add(1);
+      controller.addError("oh no");
+      controller.add(2);
+      controller.addError("that's bad");
+      await flushMicrotasks();
+
+      expect(sink.results[0].asValue.value, equals(1));
+      expect(sink.results[1].asError.error, equals("oh no"));
+      expect(sink.results[2].asValue.value, equals(2));
+      expect(sink.results[3].asError.error, equals("that's bad"));
+      expect(sink.isClosed, isFalse);
+
+      controller.close();
+      await flushMicrotasks();
+      expect(sink.isClosed, isFalse);
+    });
+
+    test("close() is forwarded", () {
+      var sink = new TestSink();
+      completer.setDestinationSink(sink);
+      completer.sink.close();
+      expect(sink.isClosed, isTrue);
+    });
+
+    test("the future from the inner close() is returned", () async {
+      var closeCompleter = new Completer();
+      var sink = new TestSink(onDone: () => closeCompleter.future);
+      completer.setDestinationSink(sink);
+
+      var closeCompleted = false;
+      completer.sink.close().then(expectAsync((_) {
+        closeCompleted = true;
+      }));
+
+      await flushMicrotasks();
+      expect(closeCompleted, isFalse);
+
+      closeCompleter.complete();
+      await flushMicrotasks();
+      expect(closeCompleted, isTrue);
+    });
+
+    test("errors are forwarded from the inner close()", () {
+      var sink = new TestSink(onDone: () => throw "oh no");
+      completer.setDestinationSink(sink);
+      expect(completer.sink.done, throwsA("oh no"));
+      expect(completer.sink.close(), throwsA("oh no"));
+    });
+
+    test("errors aren't top-leveled if only close() is listened to", () async {
+      var sink = new TestSink(onDone: () => throw "oh no");
+      completer.setDestinationSink(sink);
+      expect(completer.sink.close(), throwsA("oh no"));
+
+      // Give the event loop a chance to top-level errors if it's going to.
+      await flushMicrotasks();
+    });
+
+    test("errors aren't top-leveled if only done is listened to", () async {
+      var sink = new TestSink(onDone: () => throw "oh no");
+      completer.setDestinationSink(sink);
+      completer.sink.close();
+      expect(completer.sink.done, throwsA("oh no"));
+
+      // Give the event loop a chance to top-level errors if it's going to.
+      await flushMicrotasks();
+    });
+  });
+
+  group("when a stream is linked after events are added", () {
+    test("data events are forwarded", () async {
+      completer.sink..add(1)..add(2)..add(3)..add(4);
+      await flushMicrotasks();
+
+      var sink = new TestSink();
+      completer.setDestinationSink(sink);
+      await flushMicrotasks();
+
+      expect(sink.results[0].asValue.value, equals(1));
+      expect(sink.results[1].asValue.value, equals(2));
+      expect(sink.results[2].asValue.value, equals(3));
+      expect(sink.results[3].asValue.value, equals(4));
+    });
+
+    test("error events are forwarded", () async {
+      completer.sink..addError("oh no")..addError("that's bad");
+      await flushMicrotasks();
+
+      var sink = new TestSink();
+      completer.setDestinationSink(sink);
+      await flushMicrotasks();
+
+      expect(sink.results[0].asError.error, equals("oh no"));
+      expect(sink.results[1].asError.error, equals("that's bad"));
+    });
+
+    test("addStream is forwarded", () async {
+      var controller = new StreamController();
+      completer.sink.addStream(controller.stream);
+
+      controller.add(1);
+      controller.addError("oh no");
+      controller.add(2);
+      controller.addError("that's bad");
+      controller.close();
+      await flushMicrotasks();
+
+      var sink = new TestSink();
+      completer.setDestinationSink(sink);
+      await flushMicrotasks();
+
+      expect(sink.results[0].asValue.value, equals(1));
+      expect(sink.results[1].asError.error, equals("oh no"));
+      expect(sink.results[2].asValue.value, equals(2));
+      expect(sink.results[3].asError.error, equals("that's bad"));
+      expect(sink.isClosed, isFalse);
+    });
+
+    test("close() is forwarded", () async {
+      completer.sink.close();
+      await flushMicrotasks();
+
+      var sink = new TestSink();
+      completer.setDestinationSink(sink);
+      await flushMicrotasks();
+
+      expect(sink.isClosed, isTrue);
+    });
+
+    test("the future from the inner close() is returned", () async {
+      var closeCompleted = false;
+      completer.sink.close().then(expectAsync((_) {
+        closeCompleted = true;
+      }));
+      await flushMicrotasks();
+
+      var closeCompleter = new Completer();
+      var sink = new TestSink(onDone: () => closeCompleter.future);
+      completer.setDestinationSink(sink);
+      await flushMicrotasks();
+      expect(closeCompleted, isFalse);
+
+      closeCompleter.complete();
+      await flushMicrotasks();
+      expect(closeCompleted, isTrue);
+    });
+
+    test("errors are forwarded from the inner close()", () async {
+      expect(completer.sink.done, throwsA("oh no"));
+      expect(completer.sink.close(), throwsA("oh no"));
+      await flushMicrotasks();
+
+      var sink = new TestSink(onDone: () => throw "oh no");
+      completer.setDestinationSink(sink);
+    });
+
+    test("errors aren't top-leveled if only close() is listened to", () async {
+      expect(completer.sink.close(), throwsA("oh no"));
+      await flushMicrotasks();
+
+      var sink = new TestSink(onDone: () => throw "oh no");
+      completer.setDestinationSink(sink);
+
+      // Give the event loop a chance to top-level errors if it's going to.
+      await flushMicrotasks();
+    });
+
+    test("errors aren't top-leveled if only done is listened to", () async {
+      completer.sink.close();
+      expect(completer.sink.done, throwsA("oh no"));
+      await flushMicrotasks();
+
+      var sink = new TestSink(onDone: () => throw "oh no");
+      completer.setDestinationSink(sink);
+
+      // Give the event loop a chance to top-level errors if it's going to.
+      await flushMicrotasks();
+    });
+  });
+
+  test("the sink is closed, the destination is set, then done is read",
+      () async {
+    expect(completer.sink.close(), completes);
+    await flushMicrotasks();
+
+    completer.setDestinationSink(new TestSink());
+    await flushMicrotasks();
+
+    expect(completer.sink.done, completes);
+  });
+
+  test("done is read, the destination is set, then the sink is closed",
+      () async {
+    expect(completer.sink.done, completes);
+    await flushMicrotasks();
+
+    completer.setDestinationSink(new TestSink());
+    await flushMicrotasks();
+
+    expect(completer.sink.close(), completes);
+  });
+
+  test("doesn't allow the destination sink to be set multiple times", () {
+    completer.setDestinationSink(new TestSink());
+    expect(() => completer.setDestinationSink(new TestSink()),
+        throwsStateError);
+    expect(() => completer.setDestinationSink(new TestSink()),
+        throwsStateError);
+  });
+}
diff --git a/test/utils.dart b/test/utils.dart
index 0dc47fa..445b9fc 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -6,6 +6,8 @@
 library async.test.util;
 
 import "dart:async";
+
+import "package:async/async.dart";
 import "package:test/test.dart";
 
 /// A zero-millisecond timer should wait until after all microtasks.
@@ -40,3 +42,46 @@
   Future addStream(Stream<T> stream) async {}
   Future close() => completer.future;
 }
+
+/// A [StreamSink] that collects all events added to it as results.
+///
+/// This is used for testing code that interacts with sinks.
+class TestSink<T> implements StreamSink<T> {
+  /// The results corresponding to events that have been added to the sink.
+  final results = <Result<T>>[];
+
+  /// Whether [close] has been called.
+  bool get isClosed => _isClosed;
+  var _isClosed = false;
+
+  Future get done => _doneCompleter.future;
+  final _doneCompleter = new Completer();
+
+  final Function _onDone;
+
+  /// Creates a new sink.
+  ///
+  /// If [onDone] is passed, it's called when the user calls [close]. Its result
+  /// is piped to the [done] future.
+  TestSink({onDone()}) : _onDone = onDone ?? (() {});
+
+  void add(T event) {
+    results.add(new Result<T>.value(event));
+  }
+
+  void addError(error, [StackTrace stackTrace]) {
+    results.add(new Result<T>.error(error, stackTrace));
+  }
+
+  Future addStream(Stream<T> stream) {
+    var completer = new Completer.sync();
+    stream.listen(add, onError: addError, onDone: completer.complete);
+    return completer.future;
+  }
+
+  Future close() {
+    _isClosed = true;
+    _doneCompleter.complete(new Future.microtask(_onDone));
+    return done;
+  }
+}