Add IsolateChannel.connect* constructors.

These constructors use a lightweight protocol to establish a two-way
connection over a previously-one-way connection.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//1638183002 .
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart
index 46375c9..a466d87 100644
--- a/lib/src/isolate_channel.dart
+++ b/lib/src/isolate_channel.dart
@@ -5,7 +5,11 @@
 import 'dart:async';
 import 'dart:isolate';
 
+import 'package:async/async.dart';
+import 'package:stack_trace/stack_trace.dart';
+
 import '../stream_channel.dart';
+import 'isolate_channel/send_port_sink.dart';
 
 /// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
 /// presumably with another isolate.
@@ -22,125 +26,71 @@
 /// ensure that they always close the [sink] of every [IsolateChannel] they use
 /// to avoid leaving dangling [ReceivePort]s.
 class IsolateChannel<T> extends StreamChannelMixin<T> {
-  /// The port that produces incoming messages.
+  final Stream<T> stream;
+  final StreamSink<T> sink;
+
+  /// Connects to a remote channel that was created with
+  /// [IsolateChannel.connectSend].
   ///
-  /// This is wrapped in a [StreamView] to produce [stream].
-  final ReceivePort _receivePort;
+  /// These constructors establish a connection using only a single
+  /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the
+  /// connect constructors.
+  ///
+  /// The connection protocol is guaranteed to remain compatible across versions
+  /// at least until the next major version release. If the protocol is
+  /// violated, the resulting channel will emit a single value on its stream and
+  /// then close.
+  factory IsolateChannel.connectReceive(ReceivePort receivePort) {
+    // We can't use a [StreamChannelCompleter] here because we need the return
+    // value to be an [IsolateChannel].
+    var streamCompleter = new StreamCompleter<T>();
+    var sinkCompleter = new StreamSinkCompleter<T>();
+    var channel = new IsolateChannel._(
+        streamCompleter.stream, sinkCompleter.sink);
 
-  /// The port that sends outgoing messages.
-  final SendPort _sendPort;
+    // The first message across the ReceivePort should be a SendPort pointing to
+    // the remote end. If it's not, we'll make the stream emit an error
+    // complaining.
+    var subscription;
+    subscription = receivePort.listen((message) {
+      if (message is SendPort) {
+        streamCompleter.setSourceStream(
+            new SubscriptionStream<T>(subscription));
+        sinkCompleter.setDestinationSink(
+            new SendPortSink<T>(receivePort, message));
+        return;
+      }
 
-  Stream<T> get stream => _stream;
-  final Stream<T> _stream;
+      streamCompleter.setError(
+          new StateError('Unexpected Isolate response "$message".'),
+          new Trace.current());
+      sinkCompleter.setDestinationSink(new NullStreamSink<T>());
+      subscription.cancel();
+    });
 
-  StreamSink<T> get sink => _sink;
-  _SendPortSink<T> _sink;
+    return channel;
+  }
+
+  /// Connects to a remote channel that was created with
+  /// [IsolateChannel.connectReceive].
+  ///
+  /// These constructors establish a connection using only a single
+  /// [SendPort]/[ReceivePort] pair, as long as each side uses one of the
+  /// connect constructors.
+  ///
+  /// The connection protocol is guaranteed to remain compatible across versions
+  /// at least until the next major version release.
+  factory IsolateChannel.connectSend(SendPort sendPort) {
+    var receivePort = new ReceivePort();
+    sendPort.send(receivePort.sendPort);
+    return new IsolateChannel(receivePort, sendPort);
+  }
 
   /// Creates a stream channel that receives messages from [receivePort] and
   /// sends them over [sendPort].
-  IsolateChannel(ReceivePort receivePort, this._sendPort)
-      : _receivePort = receivePort,
-        _stream = new StreamView<T>(receivePort) {
-    _sink = new _SendPortSink<T>(this);
-  }
-}
+  IsolateChannel(ReceivePort receivePort, SendPort sendPort)
+      : stream = new StreamView<T>(receivePort),
+        sink = new SendPortSink<T>(receivePort, sendPort);
 
-/// The sink for [IsolateChannel].
-///
-/// [SendPort] doesn't natively implement any sink API, so this adds that API as
-/// a wrapper. Closing this just closes the [ReceivePort].
-class _SendPortSink<T> implements StreamSink<T> {
-  /// The channel that this sink is for.
-  final IsolateChannel _channel;
-
-  Future get done => _doneCompleter.future;
-  final _doneCompleter = new Completer();
-
-  /// Whether [done] has been completed.
-  ///
-  /// This is distinct from [_closed] because [done] can complete with an error
-  /// without the user explicitly calling [close].
-  bool get _isDone => _doneCompleter.isCompleted;
-
-  /// Whether the user has called [close].
-  bool _closed = false;
-
-  /// Whether we're currently adding a stream with [addStream].
-  bool _inAddStream = false;
-
-  _SendPortSink(this._channel);
-
-  void add(T data) {
-    if (_closed) throw new StateError("Cannot add event after closing.");
-    if (_inAddStream) {
-      throw new StateError("Cannot add event while adding stream.");
-    }
-    if (_isDone) return;
-
-    _add(data);
-  }
-
-  /// A helper for [add] that doesn't check for [StateError]s.
-  ///
-  /// This is called from [addStream], so it shouldn't check [_inAddStream].
-  void _add(T data) {
-    _channel._sendPort.send(data);
-  }
-
-  void addError(error, [StackTrace stackTrace]) {
-    if (_closed) throw new StateError("Cannot add event after closing.");
-    if (_inAddStream) {
-      throw new StateError("Cannot add event while adding stream.");
-    }
-
-    _close(error, stackTrace);
-  }
-
-  Future close() {
-    if (_inAddStream) {
-      throw new StateError("Cannot close sink while adding stream.");
-    }
-
-    _closed = true;
-    return _close();
-  }
-
-  /// A helper for [close] that doesn't check for [StateError]s.
-  ///
-  /// This is called from [addStream], so it shouldn't check [_inAddStream]. It
-  /// also forwards [error] and [stackTrace] to [done] if they're passed.
-  Future _close([error, StackTrace stackTrace]) {
-    if (_isDone) return done;
-
-    _channel._receivePort.close();
-
-    if (error != null) {
-      _doneCompleter.completeError(error, stackTrace);
-    } else {
-      _doneCompleter.complete();
-    }
-
-    return done;
-  }
-
-  Future addStream(Stream<T> stream) {
-    if (_closed) throw new StateError("Cannot add stream after closing.");
-    if (_inAddStream) {
-      throw new StateError("Cannot add stream while adding stream.");
-    }
-    if (_isDone) return new Future.value();
-
-    _inAddStream = true;
-    var completer = new Completer.sync();
-    stream.listen(_add,
-        onError: (error, stackTrace) {
-          _close(error, stackTrace);
-          completer.complete();
-        },
-        onDone: completer.complete,
-        cancelOnError: true);
-    return completer.future.then((_) {
-      _inAddStream = false;
-    });
-  }
+  IsolateChannel._(this.stream, this.sink);
 }
diff --git a/lib/src/isolate_channel/send_port_sink.dart b/lib/src/isolate_channel/send_port_sink.dart
new file mode 100644
index 0000000..d98f1da
--- /dev/null
+++ b/lib/src/isolate_channel/send_port_sink.dart
@@ -0,0 +1,111 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+/// The sink for [IsolateChannel].
+///
+/// [SendPort] doesn't natively implement any sink API, so this adds that API as
+/// a wrapper. Closing this just closes the [ReceivePort].
+class SendPortSink<T> implements StreamSink<T> {
+  /// The port that produces incoming messages.
+  ///
+  /// This is wrapped in a [StreamView] to produce [stream].
+  final ReceivePort _receivePort;
+
+  /// The port that sends outgoing messages.
+  final SendPort _sendPort;
+
+  Future get done => _doneCompleter.future;
+  final _doneCompleter = new Completer();
+
+  /// Whether [done] has been completed.
+  ///
+  /// This is distinct from [_closed] because [done] can complete with an error
+  /// without the user explicitly calling [close].
+  bool get _isDone => _doneCompleter.isCompleted;
+
+  /// Whether the user has called [close].
+  bool _closed = false;
+
+  /// Whether we're currently adding a stream with [addStream].
+  bool _inAddStream = false;
+
+  SendPortSink(this._receivePort, this._sendPort);
+
+  void add(T data) {
+    if (_closed) throw new StateError("Cannot add event after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add event while adding stream.");
+    }
+    if (_isDone) return;
+
+    _add(data);
+  }
+
+  /// A helper for [add] that doesn't check for [StateError]s.
+  ///
+  /// This is called from [addStream], so it shouldn't check [_inAddStream].
+  void _add(T data) {
+    _sendPort.send(data);
+  }
+
+  void addError(error, [StackTrace stackTrace]) {
+    if (_closed) throw new StateError("Cannot add event after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add event while adding stream.");
+    }
+
+    _close(error, stackTrace);
+  }
+
+  Future close() {
+    if (_inAddStream) {
+      throw new StateError("Cannot close sink while adding stream.");
+    }
+
+    _closed = true;
+    return _close();
+  }
+
+  /// A helper for [close] that doesn't check for [StateError]s.
+  ///
+  /// This is called from [addStream], so it shouldn't check [_inAddStream]. It
+  /// also forwards [error] and [stackTrace] to [done] if they're passed.
+  Future _close([error, StackTrace stackTrace]) {
+    if (_isDone) return done;
+
+    _receivePort.close();
+
+    if (error != null) {
+      _doneCompleter.completeError(error, stackTrace);
+    } else {
+      _doneCompleter.complete();
+    }
+
+    return done;
+  }
+
+  Future addStream(Stream<T> stream) {
+    if (_closed) throw new StateError("Cannot add stream after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add stream while adding stream.");
+    }
+    if (_isDone) return new Future.value();
+
+    _inAddStream = true;
+    var completer = new Completer.sync();
+    stream.listen(_add,
+        onError: (error, stackTrace) {
+          _close(error, stackTrace);
+          completer.complete();
+        },
+        onDone: completer.complete,
+        cancelOnError: true);
+    return completer.future.then((_) {
+      _inAddStream = false;
+    });
+  }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 97057ab..a8e26cd 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -7,5 +7,6 @@
   sdk: '>=1.8.0 <2.0.0'
 dependencies:
   async: '^1.8.0'
+  stack_trace: '^1.0.0'
 dev_dependencies:
   test: '^0.12.0'
diff --git a/test/isolate_channel_test.dart b/test/isolate_channel_test.dart
index 9e4fddc..fa4d8d5 100644
--- a/test/isolate_channel_test.dart
+++ b/test/isolate_channel_test.dart
@@ -123,4 +123,39 @@
       channel.sink.add(1);
     });
   });
+
+  group("connect constructors", () {
+    var connectPort;
+    setUp(() {
+      connectPort = new ReceivePort();
+    });
+
+    tearDown(() {
+      connectPort.close();
+    });
+
+    test("create a connected pair of channels", () {
+      var channel1 = new IsolateChannel.connectReceive(connectPort);
+      var channel2 = new IsolateChannel.connectSend(connectPort.sendPort);
+
+      channel1.sink.add(1);
+      channel1.sink.add(2);
+      channel1.sink.add(3);
+      expect(channel2.stream.take(3).toList(), completion(equals([1, 2, 3])));
+
+      channel2.sink.add(4);
+      channel2.sink.add(5);
+      channel2.sink.add(6);
+      expect(channel1.stream.take(3).toList(), completion(equals([4, 5, 6])));
+    });
+
+    test("the receiving channel produces an error if it gets the wrong message",
+        () {
+      var connectedChannel = new IsolateChannel.connectReceive(connectPort);
+      connectPort.sendPort.send("wrong value");
+
+      expect(connectedChannel.stream.toList(), throwsStateError);
+      expect(connectedChannel.sink.done, completes);
+    });
+  });
 }