Add ClosedStreamSink, and *Completer.setError, and StreamSinkCompleter.fromFuture.

R=lrn@google.com

Review URL: https://codereview.chromium.org//1615253002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index af87e24..15570d7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,12 @@
 - Added `StreamSinkCompleter`, for creating a `StreamSink` now and providing its
   destination later as another sink.
 
+- Added `StreamCompleter.setError`, a shortcut for emitting a single error event
+  on the resulting stream.
+
+- Added `NullStreamSink`, an implementation of `StreamSink` that discards all
+  events.
+
 ## 1.7.0
 
 - Added `SingleSubscriptionTransformer`, a `StreamTransformer` that converts a
diff --git a/lib/async.dart b/lib/async.dart
index ef59b73..ef1ac04 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -15,6 +15,7 @@
 export "src/delegate/stream_subscription.dart";
 export "src/future_group.dart";
 export "src/lazy_stream.dart";
+export "src/null_stream_sink.dart";
 export "src/restartable_timer.dart";
 export "src/result_future.dart";
 export "src/single_subscription_transformer.dart";
diff --git a/lib/src/null_stream_sink.dart b/lib/src/null_stream_sink.dart
new file mode 100644
index 0000000..aa85924
--- /dev/null
+++ b/lib/src/null_stream_sink.dart
@@ -0,0 +1,90 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.null_stream_sink;
+
+import 'dart:async';
+
+/// A [StreamSink] that discards all events.
+///
+/// The sink silently drops events until [close] is called, at which point it
+/// throws [StateError]s when events are added. This is the same behavior as a
+/// sink whose remote end has closed, such as when a [WebSocket] connection has
+/// been closed.
+///
+/// This can be used when a sink is needed but no events are actually intended
+/// to be added. The [new NullStreamSink.error] constructor can be used to
+/// represent errors when creating a sink, since [StreamSink.done] exposes sink
+/// errors. For example:
+///
+/// ```dart
+/// StreamSink<List<int>> openForWrite(String filename) {
+///   try {
+///     return new RandomAccessSink(new File(filename).openSync());
+///   } on IOException catch (error, stackTrace) {
+///     return new NullStreamSink.error(error, stackTrace);
+///   }
+/// }
+/// ```
+class NullStreamSink<T> implements StreamSink<T> {
+  final Future done;
+
+  /// Whether the sink has been closed.
+  var _closed = false;
+
+  /// Whether an [addStream] call is pending.
+  ///
+  /// We don't actually add any events from streams, but it does return the
+  /// [StreamSubscription.cancel] future so to be [StreamSink]-complaint we
+  /// reject events until that completes.
+  var _addingStream = false;
+
+  /// Creates a null sink.
+  ///
+  /// If [done] is passed, it's used as the [Sink.done] future. Otherwise, a
+  /// completed future is used.
+  NullStreamSink({Future done}) : done = done ?? new Future.value();
+
+  /// Creates a null sink whose [done] future emits [error].
+  ///
+  /// Note that this error will not be considered uncaught.
+  NullStreamSink.error(error, [StackTrace stackTrace])
+      : done = new Future.error(error, stackTrace)
+          // Don't top-level the error. This gives the user a change to call
+          // [close] or [done], and matches the behavior of a remote endpoint
+          // experiencing an error.
+          ..catchError((_) {});
+
+  void add(T data) {
+    _checkEventAllowed();
+  }
+
+  void addError(error, [StackTrace stackTrace]) {
+    _checkEventAllowed();
+  }
+
+  Future addStream(Stream<T> stream) {
+    _checkEventAllowed();
+
+    _addingStream = true;
+    var future = stream.listen(null).cancel() ?? new Future.value();
+    return future.whenComplete(() {
+      _addingStream = false;
+    });
+  }
+
+  /// Throws a [StateError] if [close] has been called or an [addStream] call is
+  /// pending.
+  void _checkEventAllowed() {
+    if (_closed) throw new StateError("Cannot add to a closed sink.");
+    if (_addingStream) {
+      throw new StateError("Cannot add to a sink while adding a stream.");
+    }
+  }
+
+  Future close() {
+    _closed = true;
+    return done;
+  }
+}
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart
index c343e6e..a6260dc 100644
--- a/lib/src/stream_completer.dart
+++ b/lib/src/stream_completer.dart
@@ -39,9 +39,7 @@
   static Stream fromFuture(Future<Stream> streamFuture) {
     var completer = new StreamCompleter();
     streamFuture.then(completer.setSourceStream,
-                      onError: (e, s) {
-                        completer.setSourceStream(streamFuture.asStream());
-                      });
+        onError: completer.setError);
     return completer.stream;
   }
 
@@ -76,8 +74,8 @@
   /// it is immediately listened to, and its events are forwarded to the
   /// existing subscription.
   ///
-  /// Either [setSourceStream] or [setEmpty] may be called at most once.
-  /// Trying to call either of them again will fail.
+  /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
+  /// most once. Trying to call any of them again will fail.
   void setSourceStream(Stream<T> sourceStream) {
     if (_stream._isSourceStreamSet) {
       throw new StateError("Source stream already set");
@@ -87,14 +85,24 @@
 
   /// Equivalent to setting an empty stream using [setSourceStream].
   ///
-  /// Either [setSourceStream] or [setEmpty] may be called at most once.
-  /// Trying to call either of them again will fail.
+  /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
+  /// most once. Trying to call any of them again will fail.
   void setEmpty() {
     if (_stream._isSourceStreamSet) {
       throw new StateError("Source stream already set");
     }
     _stream._setEmpty();
   }
+
+  /// Completes this to a stream that emits [error] and then closes.
+  ///
+  /// This is useful when the process of creating the data for the stream fails.
+  ///
+  /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
+  /// most once. Trying to call any of them again will fail.
+  void setError(error, [StackTrace stackTrace]) {
+    setSourceStream(new Stream.fromFuture(new Future.error(error, stackTrace)));
+  }
 }
 
 /// Stream completed by [StreamCompleter].
diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart
index 10caa06..0b6dc46 100644
--- a/lib/src/stream_sink_completer.dart
+++ b/lib/src/stream_sink_completer.dart
@@ -6,6 +6,8 @@
 
 import 'dart:async';
 
+import 'null_stream_sink.dart';
+
 /// A [sink] where the destination is provided later.
 ///
 /// The [sink] is a normal sink that you can add events to to immediately, but
@@ -29,6 +31,20 @@
   /// Returns [sink] typed as a [_CompleterSink].
   _CompleterSink<T> get _sink => sink;
 
+  /// Convert a `Future<StreamSink>` to a `StreamSink`.
+  ///
+  /// This creates a sink using a sink completer, and sets the destination sink
+  /// to the result of the future when the future completes.
+  ///
+  /// If the future completes with an error, the returned sink will instead
+  /// be closed. Its [Sink.done] future will contain the error.
+  static StreamSink fromFuture(Future<StreamSink> sinkFuture) {
+    var completer = new StreamSinkCompleter();
+    sinkFuture.then(completer.setDestinationSink,
+        onError: completer.setError);
+    return completer.sink;
+  }
+
   /// Sets a sink as the destination for events from the [StreamSinkCompleter]'s
   /// [sink].
   ///
@@ -41,12 +57,25 @@
   /// buffered until the destination is available.
   ///
   /// A destination sink may be set at most once.
+  ///
+  /// Either of [setDestinationSink] or [setError] may be called at most once.
+  /// Trying to call either of them again will fail.
   void setDestinationSink(StreamSink<T> destinationSink) {
     if (_sink._destinationSink != null) {
       throw new StateError("Destination sink already set");
     }
     _sink._setDestinationSink(destinationSink);
   }
+
+  /// Completes this to a closed sink whose [Sink.done] future emits [error].
+  ///
+  /// This is useful when the process of loading the sink fails.
+  ///
+  /// Either of [setDestinationSink] or [setError] may be called at most once.
+  /// Trying to call either of them again will fail.
+  void setError(error, [StackTrace stackTrace]) {
+    setDestinationSink(new NullStreamSink.error(error, stackTrace));
+  }
 }
 
 /// [StreamSink] completed by [StreamSinkCompleter].
diff --git a/pubspec.yaml b/pubspec.yaml
index ccfa970..7560836 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 1.8.0-dev
+version: 1.8.0
 author: Dart Team <misc@dartlang.org>
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
diff --git a/test/null_stream_sink_test.dart b/test/null_stream_sink_test.dart
new file mode 100644
index 0000000..244f99c
--- /dev/null
+++ b/test/null_stream_sink_test.dart
@@ -0,0 +1,113 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import "dart:async";
+
+import "package:async/async.dart";
+import "package:test/test.dart";
+
+import "utils.dart";
+
+void main() {
+  group("constructors", () {
+    test("done defaults to a completed future", () {
+      var sink = new NullStreamSink();
+      expect(sink.done, completes);
+    });
+
+    test("a custom future may be passed to done", () async {
+      var completer = new Completer();
+      var sink = new NullStreamSink(done: completer.future);
+
+      var doneFired = false;
+      sink.done.then((_) {
+        doneFired = true;
+      });
+      await flushMicrotasks();
+      expect(doneFired, isFalse);
+
+      completer.complete();
+      await flushMicrotasks();
+      expect(doneFired, isTrue);
+    });
+
+    test("NullStreamSink.error passes an error to done", () {
+      var sink = new NullStreamSink.error("oh no");
+      expect(sink.done, throwsA("oh no"));
+    });
+  });
+
+  group("events", () {
+    test("are silently dropped before close", () {
+      var sink = new NullStreamSink();
+      sink.add(1);
+      sink.addError("oh no");
+    });
+
+    test("throw StateErrors after close", () {
+      var sink = new NullStreamSink();
+      expect(sink.close(), completes);
+
+      expect(() => sink.add(1), throwsStateError);
+      expect(() => sink.addError("oh no"), throwsStateError);
+      expect(() => sink.addStream(new Stream.empty()), throwsStateError);
+    });
+
+    group("addStream", () {
+      test("listens to the stream then cancels immediately", () async {
+        var sink = new NullStreamSink();
+        var canceled = false;
+        var controller = new StreamController(onCancel: () {
+          canceled = true;
+        });
+
+        expect(sink.addStream(controller.stream), completes);
+        await flushMicrotasks();
+        expect(canceled, isTrue);
+      });
+
+      test("returns the cancel future", () async {
+        var completer = new Completer();
+        var sink = new NullStreamSink();
+        var controller = new StreamController(onCancel: () => completer.future);
+
+        var addStreamFired = false;
+        sink.addStream(controller.stream).then((_) {
+          addStreamFired = true;
+        });
+        await flushMicrotasks();
+        expect(addStreamFired, isFalse);
+
+        completer.complete();
+        await flushMicrotasks();
+        expect(addStreamFired, isTrue);
+      });
+
+      test("pipes errors from the cancel future through addStream", () async {
+        var sink = new NullStreamSink();
+        var controller = new StreamController(onCancel: () => throw "oh no");
+        expect(sink.addStream(controller.stream), throwsA("oh no"));
+      });
+
+      test("causes events to throw StateErrors until the future completes",
+          () async {
+        var sink = new NullStreamSink();
+        var future = sink.addStream(new Stream.empty());
+        expect(() => sink.add(1), throwsStateError);
+        expect(() => sink.addError("oh no"), throwsStateError);
+        expect(() => sink.addStream(new Stream.empty()), throwsStateError);
+
+        await future;
+        sink.add(1);
+        sink.addError("oh no");
+        expect(sink.addStream(new Stream.empty()), completes);
+      });
+    });
+  });
+
+  test("close returns the done future", () {
+    var sink = new NullStreamSink.error("oh no");
+    expect(sink.close(), throwsA("oh no"));
+  });
+}
diff --git a/test/stream_completer_test.dart b/test/stream_completer_test.dart
index cd3ceb9..76d81d1 100644
--- a/test/stream_completer_test.dart
+++ b/test/stream_completer_test.dart
@@ -337,6 +337,34 @@
     });
     expect(controller.hasListener, isFalse);
   });
+
+  group("setError()", () {
+    test("produces a stream that emits a single error", () {
+      var completer = new StreamCompleter();
+      completer.stream.listen(
+          unreachable("data"),
+          onError: expectAsync((error, stackTrace) {
+            expect(error, equals("oh no"));
+          }),
+          onDone: expectAsync(() {}));
+
+      completer.setError("oh no");
+    });
+
+    test("produces a stream that emits a single error on a later listen",
+        () async {
+      var completer = new StreamCompleter();
+      completer.setError("oh no");
+      await flushMicrotasks();
+
+      completer.stream.listen(
+          unreachable("data"),
+          onError: expectAsync((error, stackTrace) {
+            expect(error, equals("oh no"));
+          }),
+          onDone: expectAsync(() {}));
+    });
+  });
 }
 
 Stream<int> createStream() async* {
diff --git a/test/stream_sink_completer_test.dart b/test/stream_sink_completer_test.dart
index 3892d26..82393fc 100644
--- a/test/stream_sink_completer_test.dart
+++ b/test/stream_sink_completer_test.dart
@@ -245,6 +245,48 @@
     expect(completer.sink.close(), completes);
   });
 
+  group("fromFuture()", () {
+    test("with a successful completion", () async {
+      var futureCompleter = new Completer();
+      var sink = StreamSinkCompleter.fromFuture(futureCompleter.future);
+      sink.add(1);
+      sink.add(2);
+      sink.add(3);
+      sink.close();
+
+      var testSink = new TestSink();
+      futureCompleter.complete(testSink);
+      await testSink.done;
+
+      expect(testSink.results[0].asValue.value, equals(1));
+      expect(testSink.results[1].asValue.value, equals(2));
+      expect(testSink.results[2].asValue.value, equals(3));
+    });
+
+    test("with an error", () async {
+      var futureCompleter = new Completer();
+      var sink = StreamSinkCompleter.fromFuture(futureCompleter.future);
+      expect(sink.done, throwsA("oh no"));
+      futureCompleter.completeError("oh no");
+    });
+  });
+
+  group("setError()", () {
+    test("produces a closed sink with the error", () {
+      completer.setError("oh no");
+      expect(completer.sink.done, throwsA("oh no"));
+      expect(completer.sink.close(), throwsA("oh no"));
+    });
+
+    test("produces an error even if done was accessed earlier", () async {
+      expect(completer.sink.done, throwsA("oh no"));
+      expect(completer.sink.close(), throwsA("oh no"));
+      await flushMicrotasks();
+
+      completer.setError("oh no");
+    });
+  });
+
   test("doesn't allow the destination sink to be set multiple times", () {
     completer.setDestinationSink(new TestSink());
     expect(() => completer.setDestinationSink(new TestSink()),