Add an IsolateChannel class.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//1635873002 .
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart
new file mode 100644
index 0000000..c664543
--- /dev/null
+++ b/lib/src/isolate_channel.dart
@@ -0,0 +1,146 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
+/// presumably with another isolate.
+///
+/// The remote endpoint doesn't necessarily need to be running an
+/// [IsolateChannel]. This can be used with any two ports, although the
+/// [StreamChannel] semantics mean that this class will treat them as being
+/// paired (for example, closing the [sink] will cause the [stream] to stop
+/// emitting events).
+///
+/// The underlying isolate ports have no notion of closing connections. This
+/// means that [stream] won't close unless [sink] is closed, and that closing
+/// [sink] won't cause the remote endpoint to close. Users should take care to
+/// ensure that they always close the [sink] of every [IsolateChannel] they use
+/// to avoid leaving dangling [ReceivePort]s.
+class IsolateChannel<T> extends StreamChannelMixin<T> {
+  /// The port that produces incoming messages.
+  ///
+  /// This is wrapped in a [StreamView] to produce [stream].
+  final ReceivePort _receivePort;
+
+  /// The port that sends outgoing messages.
+  final SendPort _sendPort;
+
+  Stream<T> get stream => _stream;
+  final Stream<T> _stream;
+
+  StreamSink<T> get sink => _sink;
+  _SendPortSink<T> _sink;
+
+  /// Creates a stream channel that receives messages from [receivePort] and
+  /// sends them over [sendPort].
+  IsolateChannel(ReceivePort receivePort, this._sendPort)
+      : _receivePort = receivePort,
+        _stream = new StreamView<T>(receivePort) {
+    _sink = new _SendPortSink<T>(this);
+  }
+}
+
+/// The sink for [IsolateChannel].
+///
+/// [SendPort] doesn't natively implement any sink API, so this adds that API as
+/// a wrapper. Closing this just closes the [ReceivePort].
+class _SendPortSink<T> implements StreamSink<T> {
+  /// The channel that this sink is for.
+  final IsolateChannel _channel;
+
+  Future get done => _doneCompleter.future;
+  final _doneCompleter = new Completer();
+
+  /// Whether [done] has been completed.
+  ///
+  /// This is distinct from [_closed] because [done] can complete with an error
+  /// without the user explicitly calling [close].
+  bool get _isDone => _doneCompleter.isCompleted;
+
+  /// Whether the user has called [close].
+  bool _closed = false;
+
+  /// Whether we're currently adding a stream with [addStream].
+  bool _inAddStream = false;
+
+  _SendPortSink(this._channel);
+
+  void add(T data) {
+    if (_closed) throw new StateError("Cannot add event after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add event while adding stream.");
+    }
+    if (_isDone) return;
+
+    _add(data);
+  }
+
+  /// A helper for [add] that doesn't check for [StateError]s.
+  ///
+  /// This is called from [addStream], so it shouldn't check [_inAddStream].
+  void _add(T data) {
+    _channel._sendPort.send(data);
+  }
+
+  void addError(error, [StackTrace stackTrace]) {
+    if (_closed) throw new StateError("Cannot add event after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add event while adding stream.");
+    }
+
+    _close(error, stackTrace);
+  }
+
+  Future close() {
+    if (_inAddStream) {
+      throw new StateError("Cannot close sink while adding stream.");
+    }
+
+    _closed = true;
+    return _close();
+  }
+
+  /// A helper for [close] that doesn't check for [StateError]s.
+  ///
+  /// This is called from [addStream], so it shouldn't check [_inAddStream]. It
+  /// also forwards [error] and [stackTrace] to [done] if they're passed.
+  Future _close([error, StackTrace stackTrace]) {
+    if (_isDone) return done;
+
+    _channel._receivePort.close();
+
+    if (error != null) {
+      _doneCompleter.completeError(error, stackTrace);
+    } else {
+      _doneCompleter.complete();
+    }
+
+    return done;
+  }
+
+  Future addStream(Stream<T> stream) {
+    if (_closed) throw new StateError("Cannot add stream after closing.");
+    if (_inAddStream) {
+      throw new StateError("Cannot add stream while adding stream.");
+    }
+    if (_isDone) return;
+
+    _inAddStream = true;
+    var completer = new Completer.sync();
+    stream.listen(_add,
+        onError: (error, stackTrace) {
+          _close(error, stackTrace);
+          completer.complete();
+        },
+        onDone: completer.complete,
+        cancelOnError: true);
+    return completer.future.then((_) {
+      _inAddStream = false;
+    });
+  }
+}
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index 4b3c659..ff36ec7 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -5,6 +5,7 @@
 import 'dart:async';
 
 export 'src/delegating_stream_channel.dart';
+export 'src/isolate_channel.dart';
 export 'src/multi_channel.dart';
 export 'src/stream_channel_completer.dart';
 
diff --git a/pubspec.yaml b/pubspec.yaml
index a0d8d22..97057ab 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -4,7 +4,7 @@
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/stream_channel
 environment:
-  sdk: '>=1.0.0 <2.0.0'
+  sdk: '>=1.8.0 <2.0.0'
 dependencies:
   async: '^1.8.0'
 dev_dependencies:
diff --git a/test/isolate_channel_test.dart b/test/isolate_channel_test.dart
new file mode 100644
index 0000000..9e4fddc
--- /dev/null
+++ b/test/isolate_channel_test.dart
@@ -0,0 +1,126 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+  var receivePort;
+  var sendPort;
+  var channel;
+  setUp(() {
+    receivePort = new ReceivePort();
+    var receivePortForSend = new ReceivePort();
+    sendPort = receivePortForSend.sendPort;
+    channel = new IsolateChannel(receivePortForSend, receivePort.sendPort);
+  });
+
+  tearDown(() {
+    receivePort.close();
+    channel.sink.close();
+  });
+
+  test("the channel can send messages", () {
+    channel.sink.add(1);
+    channel.sink.add(2);
+    channel.sink.add(3);
+
+    expect(receivePort.take(3).toList(), completion(equals([1, 2, 3])));
+  });
+
+  test("the channel can receive messages", () {
+    sendPort.send(1);
+    sendPort.send(2);
+    sendPort.send(3);
+
+    expect(channel.stream.take(3).toList(), completion(equals([1, 2, 3])));
+  });
+
+  test("events can't be added to an explicitly-closed sink", () {
+    expect(channel.sink.close(), completes);
+    expect(() => channel.sink.add(1), throwsStateError);
+    expect(() => channel.sink.addError("oh no"), throwsStateError);
+    expect(() => channel.sink.addStream(new Stream.fromIterable([])),
+        throwsStateError);
+  });
+
+  test("events can't be added while a stream is being added", () {
+    var controller = new StreamController();
+    channel.sink.addStream(controller.stream);
+
+    expect(() => channel.sink.add(1), throwsStateError);
+    expect(() => channel.sink.addError("oh no"), throwsStateError);
+    expect(() => channel.sink.addStream(new Stream.fromIterable([])),
+        throwsStateError);
+    expect(() => channel.sink.close(), throwsStateError);
+
+    controller.close();
+  });
+
+  group("stream channel rules", () {
+    test("closing the sink causes the stream to close before it emits any more "
+        "events", () {
+      sendPort.send(1);
+      sendPort.send(2);
+      sendPort.send(3);
+      sendPort.send(4);
+      sendPort.send(5);
+
+      channel.stream.listen(expectAsync((message) {
+        expect(message, equals(1));
+        channel.sink.close();
+      }, count: 1));
+    });
+
+    test("cancelling the stream's subscription has no effect on the sink",
+        () async {
+      channel.stream.listen(null).cancel();
+      await pumpEventQueue();
+
+      channel.sink.add(1);
+      channel.sink.add(2);
+      channel.sink.add(3);
+      expect(receivePort.take(3).toList(), completion(equals([1, 2, 3])));
+    });
+
+    test("the sink closes as soon as an error is added", () async {
+      channel.sink.addError("oh no");
+      channel.sink.add(1);
+      expect(channel.sink.done, throwsA("oh no"));
+
+      // Since the sink is closed, the stream should also be closed.
+      expect(channel.stream.isEmpty, completion(isTrue));
+
+      // The other end shouldn't receive the next event, since the sink was
+      // closed. Pump the event queue to give it a chance to.
+      receivePort.listen(expectAsync((_) {}, count: 0));
+      await pumpEventQueue();
+    });
+
+    test("the sink closes as soon as an error is added via addStream",
+        () async {
+      var canceled = false;
+      var controller = new StreamController(onCancel: () {
+        canceled = true;
+      });
+
+      // This future shouldn't get the error, because it's sent to [Sink.done].
+      expect(channel.sink.addStream(controller.stream), completes);
+
+      controller.addError("oh no");
+      expect(channel.sink.done, throwsA("oh no"));
+      await pumpEventQueue();
+      expect(canceled, isTrue);
+
+      // Even though the sink is closed, this shouldn't throw an error because
+      // the user didn't explicitly close it.
+      channel.sink.add(1);
+    });
+  });
+}