Make IsolateChannel use StreamChannelController.

This exposed some lingering bugs in GuaranteeChannel as well.

R=tjblasi@google.com

Review URL: https://codereview.chromium.org//1671763002 .
diff --git a/lib/src/guarantee_channel.dart b/lib/src/guarantee_channel.dart
index 1047e14..a874799 100644
--- a/lib/src/guarantee_channel.dart
+++ b/lib/src/guarantee_channel.dart
@@ -127,6 +127,14 @@
     }
     if (_disconnected) return;
 
+    _addError(error, stackTrace);
+  }
+
+  /// Like [addError], but doesn't check to ensure that an error can be added.
+  ///
+  /// This is called from [addStream], so it shouldn't fail if a stream is being
+  /// added.
+  void _addError(error, [StackTrace stackTrace]) {
     if (_allowErrors) {
       _inner.addError(error, stackTrace);
       return;
@@ -153,7 +161,7 @@
     _addStreamCompleter = new Completer.sync();
     _addStreamSubscription = stream.listen(
         _inner.add,
-        onError: _inner.addError,
+        onError: _addError,
         onDone: _addStreamCompleter.complete);
     return _addStreamCompleter.future.then((_) {
       _addStreamCompleter = null;
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart
index a466d87..3466d19 100644
--- a/lib/src/isolate_channel.dart
+++ b/lib/src/isolate_channel.dart
@@ -9,7 +9,6 @@
 import 'package:stack_trace/stack_trace.dart';
 
 import '../stream_channel.dart';
-import 'isolate_channel/send_port_sink.dart';
 
 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
 /// presumably with another isolate.
@@ -54,10 +53,13 @@
     var subscription;
     subscription = receivePort.listen((message) {
       if (message is SendPort) {
-        streamCompleter.setSourceStream(
-            new SubscriptionStream<T>(subscription));
-        sinkCompleter.setDestinationSink(
-            new SendPortSink<T>(receivePort, message));
+        var controller = new StreamChannelController(allowForeignErrors: false);
+        new SubscriptionStream<T>(subscription).pipe(controller.local.sink);
+        controller.local.stream.listen(message.send,
+            onDone: receivePort.close);
+
+        streamCompleter.setSourceStream(controller.foreign.stream);
+        sinkCompleter.setDestinationSink(controller.foreign.sink);
         return;
       }
 
@@ -88,9 +90,13 @@
 
   /// Creates a stream channel that receives messages from [receivePort] and
   /// sends them over [sendPort].
-  IsolateChannel(ReceivePort receivePort, SendPort sendPort)
-      : stream = new StreamView<T>(receivePort),
-        sink = new SendPortSink<T>(receivePort, sendPort);
+  factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) {
+    var controller = new StreamChannelController(allowForeignErrors: false);
+    receivePort.pipe(controller.local.sink);
+    controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
+    return new IsolateChannel._(
+        controller.foreign.stream, controller.foreign.sink);
+  }
 
   IsolateChannel._(this.stream, this.sink);
 }
diff --git a/lib/src/isolate_channel/send_port_sink.dart b/lib/src/isolate_channel/send_port_sink.dart
deleted file mode 100644
index d98f1da..0000000
--- a/lib/src/isolate_channel/send_port_sink.dart
+++ /dev/null
@@ -1,111 +0,0 @@
-// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'dart:async';
-import 'dart:isolate';
-
-/// The sink for [IsolateChannel].
-///
-/// [SendPort] doesn't natively implement any sink API, so this adds that API as
-/// a wrapper. Closing this just closes the [ReceivePort].
-class SendPortSink<T> implements StreamSink<T> {
-  /// The port that produces incoming messages.
-  ///
-  /// This is wrapped in a [StreamView] to produce [stream].
-  final ReceivePort _receivePort;
-
-  /// The port that sends outgoing messages.
-  final SendPort _sendPort;
-
-  Future get done => _doneCompleter.future;
-  final _doneCompleter = new Completer();
-
-  /// Whether [done] has been completed.
-  ///
-  /// This is distinct from [_closed] because [done] can complete with an error
-  /// without the user explicitly calling [close].
-  bool get _isDone => _doneCompleter.isCompleted;
-
-  /// Whether the user has called [close].
-  bool _closed = false;
-
-  /// Whether we're currently adding a stream with [addStream].
-  bool _inAddStream = false;
-
-  SendPortSink(this._receivePort, this._sendPort);
-
-  void add(T data) {
-    if (_closed) throw new StateError("Cannot add event after closing.");
-    if (_inAddStream) {
-      throw new StateError("Cannot add event while adding stream.");
-    }
-    if (_isDone) return;
-
-    _add(data);
-  }
-
-  /// A helper for [add] that doesn't check for [StateError]s.
-  ///
-  /// This is called from [addStream], so it shouldn't check [_inAddStream].
-  void _add(T data) {
-    _sendPort.send(data);
-  }
-
-  void addError(error, [StackTrace stackTrace]) {
-    if (_closed) throw new StateError("Cannot add event after closing.");
-    if (_inAddStream) {
-      throw new StateError("Cannot add event while adding stream.");
-    }
-
-    _close(error, stackTrace);
-  }
-
-  Future close() {
-    if (_inAddStream) {
-      throw new StateError("Cannot close sink while adding stream.");
-    }
-
-    _closed = true;
-    return _close();
-  }
-
-  /// A helper for [close] that doesn't check for [StateError]s.
-  ///
-  /// This is called from [addStream], so it shouldn't check [_inAddStream]. It
-  /// also forwards [error] and [stackTrace] to [done] if they're passed.
-  Future _close([error, StackTrace stackTrace]) {
-    if (_isDone) return done;
-
-    _receivePort.close();
-
-    if (error != null) {
-      _doneCompleter.completeError(error, stackTrace);
-    } else {
-      _doneCompleter.complete();
-    }
-
-    return done;
-  }
-
-  Future addStream(Stream<T> stream) {
-    if (_closed) throw new StateError("Cannot add stream after closing.");
-    if (_inAddStream) {
-      throw new StateError("Cannot add stream while adding stream.");
-    }
-    if (_isDone) return new Future.value();
-
-    _inAddStream = true;
-    var completer = new Completer.sync();
-    stream.listen(_add,
-        onError: (error, stackTrace) {
-          _close(error, stackTrace);
-          completer.complete();
-        },
-        onDone: completer.complete,
-        cancelOnError: true);
-    return completer.future.then((_) {
-      _inAddStream = false;
-    });
-  }
-}
diff --git a/lib/src/stream_channel_controller.dart b/lib/src/stream_channel_controller.dart
index ad78323..45b2865 100644
--- a/lib/src/stream_channel_controller.dart
+++ b/lib/src/stream_channel_controller.dart
@@ -15,13 +15,13 @@
 ///
 /// ```dart
 /// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) {
-///   var controller = new StreamChannelController();
+///   var controller = new StreamChannelController(allowForeignErrors: false);
 ///
 ///   // Pipe all events from the receive port into the local sink...
 ///   receivePort.pipe(controller.local.sink);
 ///
 ///   // ...and all events from the local stream into the send port.
-///   controller.local.listen(sendPort.add, onDone: receivePort.close);
+///   controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
 ///
 ///   // Then return the foreign controller for your users to use.
 ///   return controller.foreign;
diff --git a/test/with_guarantees_test.dart b/test/with_guarantees_test.dart
index 38afefa..409b28f 100644
--- a/test/with_guarantees_test.dart
+++ b/test/with_guarantees_test.dart
@@ -114,6 +114,29 @@
     streamController.close();
   });
 
+  test("events can't be added to an explicitly-closed sink", () {
+    sinkController.stream.listen(null); // Work around sdk#19095.
+
+    expect(channel.sink.close(), completes);
+    expect(() => channel.sink.add(1), throwsStateError);
+    expect(() => channel.sink.addError("oh no"), throwsStateError);
+    expect(() => channel.sink.addStream(new Stream.fromIterable([])),
+        throwsStateError);
+  });
+
+  test("events can't be added while a stream is being added", () {
+    var controller = new StreamController();
+    channel.sink.addStream(controller.stream);
+
+    expect(() => channel.sink.add(1), throwsStateError);
+    expect(() => channel.sink.addError("oh no"), throwsStateError);
+    expect(() => channel.sink.addStream(new Stream.fromIterable([])),
+        throwsStateError);
+    expect(() => channel.sink.close(), throwsStateError);
+
+    controller.close();
+  });
+
   group("with allowSinkErrors: false", () {
     setUp(() {
       streamController = new StreamController();
@@ -146,5 +169,25 @@
       expect(channel.sink.done, throwsA("oh no"));
       expect(sinkController.stream.toList(), completion(isEmpty));
     });
+
+    test("adding an error via addStream causes the stream to emit a done "
+        "event", () async {
+      var canceled = false;
+      var controller = new StreamController(onCancel: () {
+        canceled = true;
+      });
+
+      // This future shouldn't get the error, because it's sent to [Sink.done].
+      expect(channel.sink.addStream(controller.stream), completes);
+
+      controller.addError("oh no");
+      expect(channel.sink.done, throwsA("oh no"));
+      await pumpEventQueue();
+      expect(canceled, isTrue);
+
+      // Even though the sink is closed, this shouldn't throw an error because
+      // the user didn't explicitly close it.
+      channel.sink.add(1);
+    });
   });
 }