Provide more error-handling customization.

This makes it easier to wrap underlying channels that don't support
errors at all.

R=tjblasi@google.com

Review URL: https://codereview.chromium.org//1669953002 .
diff --git a/lib/src/guarantee_channel.dart b/lib/src/guarantee_channel.dart
index 849cc2f..1047e14 100644
--- a/lib/src/guarantee_channel.dart
+++ b/lib/src/guarantee_channel.dart
@@ -30,8 +30,10 @@
   /// Whether the sink has closed, causing the underlying channel to disconnect.
   bool _disconnected = false;
 
-  GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
-    _sink = new _GuaranteeSink<T>(innerSink, this);
+  GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink,
+      {bool allowSinkErrors: true}) {
+    _sink = new _GuaranteeSink<T>(innerSink, this,
+        allowErrors: allowSinkErrors);
 
     // Enforce the single-subscription guarantee by changing a broadcast stream
     // to single-subscription.
@@ -76,10 +78,13 @@
   /// The [GuaranteeChannel] this belongs to.
   final GuaranteeChannel<T> _channel;
 
-  Future get done => _inner.done;
+  Future get done => _doneCompleter.future;
+  final _doneCompleter = new Completer();
 
-  /// Whether the stream has emitted a done event, causing the underlying
-  /// channel to disconnect.
+  /// Whether connection is disconnected.
+  ///
+  /// This can happen because the stream has emitted a done event, or because
+  /// the user added an error when [_allowErrors] is `false`.
   bool _disconnected = false;
 
   /// Whether the user has called [close].
@@ -96,7 +101,14 @@
   /// Whether we're currently adding a stream with [addStream].
   bool get _inAddStream => _addStreamSubscription != null;
 
-  _GuaranteeSink(this._inner, this._channel);
+  /// Whether errors are passed on to the underlying sink.
+  ///
+  /// If this is `false`, any error passed to the sink is piped to [done] and
+  /// the underlying sink is closed.
+  final bool _allowErrors;
+
+  _GuaranteeSink(this._inner, this._channel, {bool allowErrors: true})
+      : _allowErrors = allowErrors;
 
   void add(T data) {
     if (_closed) throw new StateError("Cannot add event after closing.");
@@ -115,7 +127,20 @@
     }
     if (_disconnected) return;
 
-    _inner.addError(error, stackTrace);
+    if (_allowErrors) {
+      _inner.addError(error, stackTrace);
+      return;
+    }
+
+    _doneCompleter.completeError(error, stackTrace);
+
+    // Treat an error like both the stream and sink disconnecting.
+    _onStreamDisconnected();
+    _channel._onSinkDisconnected();
+
+    // Ignore errors from the inner sink. We're already surfacing one error, and
+    // if the user handles it we don't want them to have another top-level.
+    _inner.close().catchError((_) {});
   }
 
   Future addStream(Stream<T> stream) {
@@ -141,11 +166,15 @@
       throw new StateError("Cannot close sink while adding stream.");
     }
 
+    if (_closed) return done;
     _closed = true;
-    if (_disconnected) return new Future.value();
 
-    _channel._onSinkDisconnected();
-    return _inner.close();
+    if (!_disconnected) {
+      _channel._onSinkDisconnected();
+      _doneCompleter.complete(_inner.close());
+    }
+
+    return done;
   }
 
   /// Called by [GuaranteeChannel] when the stream emits a done event.
@@ -154,8 +183,9 @@
   /// sink should stop forwarding events.
   void _onStreamDisconnected() {
     _disconnected = true;
-    if (!_inAddStream) return;
+    if (!_doneCompleter.isCompleted) _doneCompleter.complete();
 
+    if (!_inAddStream) return;
     _addStreamCompleter.complete(_addStreamSubscription.cancel());
     _addStreamCompleter = null;
     _addStreamSubscription = null;
diff --git a/lib/src/stream_channel_controller.dart b/lib/src/stream_channel_controller.dart
index 1812a0e..ad78323 100644
--- a/lib/src/stream_channel_controller.dart
+++ b/lib/src/stream_channel_controller.dart
@@ -47,12 +47,18 @@
   /// If [sync] is true, events added to either channel's sink are synchronously
   /// dispatched to the other channel's stream. This should only be done if the
   /// source of those events is already asynchronous.
-  StreamChannelController({bool sync: false}) {
+  ///
+  /// If [allowForeignErrors] is `false`, errors are not allowed to be passed to
+  /// the foreign channel's sink. If any are, the connection will close and the
+  /// error will be forwarded to the foreign channel's [Sink.done] future. This
+  /// guarantees that the local stream will never emit errors.
+  StreamChannelController({bool allowForeignErrors: true, bool sync: false}) {
     var localToForeignController = new StreamController<T>(sync: sync);
     var foreignToLocalController = new StreamController<T>(sync: sync);
     _local = new StreamChannel<T>.withGuarantees(
         foreignToLocalController.stream, localToForeignController.sink);
     _foreign = new StreamChannel<T>.withGuarantees(
-        localToForeignController.stream, foreignToLocalController.sink);
+        localToForeignController.stream, foreignToLocalController.sink,
+        allowSinkErrors: allowForeignErrors);
   }
 }
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index 8c4fad5..992f702 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -78,8 +78,13 @@
   /// [StreamChannel] documentation. This makes it somewhat less efficient than
   /// just wrapping a stream and a sink directly, so [new StreamChannel] should
   /// be used when the guarantees are provided natively.
-  factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink) =>
-      new GuaranteeChannel(stream, sink);
+  ///
+  /// If [allowSinkErrors] is `false`, errors are not allowed to be passed to
+  /// [sink]. If any are, the connection will close and the error will be
+  /// forwarded to [Sink.done].
+  factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink,
+          {bool allowSinkErrors: true}) =>
+      new GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);
 
   /// Connects [this] to [other], so that any values emitted by either are sent
   /// directly to the other.
diff --git a/pubspec.yaml b/pubspec.yaml
index f0b830c..52ee521 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: stream_channel
-version: 1.2.0-dev
+version: 1.2.0
 description: An abstraction for two-way communication channels.
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/stream_channel_controller_test.dart b/test/stream_channel_controller_test.dart
index e45f090..cf8a31b 100644
--- a/test/stream_channel_controller_test.dart
+++ b/test/stream_channel_controller_test.dart
@@ -25,6 +25,17 @@
       controller.foreign.sink..add(1)..add(2)..add(3)..close();
       expect(controller.local.stream.toList(), completion(equals([1, 2, 3])));
     });
+
+    test("with allowForeignErrors: false, shuts down the connection if an "
+        "error is added to the foreign channel", () {
+      controller = new StreamChannelController(allowForeignErrors: false);
+
+      controller.foreign.sink.addError("oh no");
+      expect(controller.foreign.sink.done, throwsA("oh no"));
+      expect(controller.foreign.stream.toList(), completion(isEmpty));
+      expect(controller.local.sink.done, completes);
+      expect(controller.local.stream.toList(), completion(isEmpty));
+    });
   });
 
   group("synchronously", () {
diff --git a/test/with_guarantees_test.dart b/test/with_guarantees_test.dart
index 0e95f9d..38afefa 100644
--- a/test/with_guarantees_test.dart
+++ b/test/with_guarantees_test.dart
@@ -107,4 +107,44 @@
     channel.sink.addError("error");
     expect(sinkController.stream.first, throwsA("error"));
   });
+
+  test("Sink.done completes once the stream is done", () {
+    channel.stream.listen(null);
+    expect(channel.sink.done, completes);
+    streamController.close();
+  });
+
+  group("with allowSinkErrors: false", () {
+    setUp(() {
+      streamController = new StreamController();
+      sinkController = new StreamController();
+      channel = new StreamChannel.withGuarantees(
+          streamController.stream, sinkController.sink, allowSinkErrors: false);
+    });
+
+    test("forwards errors to Sink.done but not the stream", () {
+      channel.sink.addError("oh no");
+      expect(channel.sink.done, throwsA("oh no"));
+      sinkController.stream.listen(null,
+          onError: expectAsync((_) {}, count: 0));
+    });
+
+    test("adding an error causes the stream to emit a done event", () {
+      expect(channel.sink.done, throwsA("oh no"));
+
+      streamController.add(1);
+      streamController.add(2);
+      streamController.add(3);
+
+      expect(channel.stream.listen(expectAsync((event) {
+        if (event == 2) channel.sink.addError("oh no");
+      }, count: 2)).asFuture(), completes);
+    });
+
+    test("adding an error closes the inner sink", () {
+      channel.sink.addError("oh no");
+      expect(channel.sink.done, throwsA("oh no"));
+      expect(sinkController.stream.toList(), completion(isEmpty));
+    });
+  });
 }