Add an IsolateChannel class.
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//1635873002 .
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart
new file mode 100644
index 0000000..c664543
--- /dev/null
+++ b/lib/src/isolate_channel.dart
@@ -0,0 +1,146 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
+/// presumably with another isolate.
+///
+/// The remote endpoint doesn't necessarily need to be running an
+/// [IsolateChannel]. This can be used with any two ports, although the
+/// [StreamChannel] semantics mean that this class will treat them as being
+/// paired (for example, closing the [sink] will cause the [stream] to stop
+/// emitting events).
+///
+/// The underlying isolate ports have no notion of closing connections. This
+/// means that [stream] won't close unless [sink] is closed, and that closing
+/// [sink] won't cause the remote endpoint to close. Users should take care to
+/// ensure that they always close the [sink] of every [IsolateChannel] they use
+/// to avoid leaving dangling [ReceivePort]s.
+class IsolateChannel<T> extends StreamChannelMixin<T> {
+ /// The port that produces incoming messages.
+ ///
+ /// This is wrapped in a [StreamView] to produce [stream].
+ final ReceivePort _receivePort;
+
+ /// The port that sends outgoing messages.
+ final SendPort _sendPort;
+
+ Stream<T> get stream => _stream;
+ final Stream<T> _stream;
+
+ StreamSink<T> get sink => _sink;
+ _SendPortSink<T> _sink;
+
+ /// Creates a stream channel that receives messages from [receivePort] and
+ /// sends them over [sendPort].
+ IsolateChannel(ReceivePort receivePort, this._sendPort)
+ : _receivePort = receivePort,
+ _stream = new StreamView<T>(receivePort) {
+ _sink = new _SendPortSink<T>(this);
+ }
+}
+
+/// The sink for [IsolateChannel].
+///
+/// [SendPort] doesn't natively implement any sink API, so this adds that API as
+/// a wrapper. Closing this just closes the [ReceivePort].
+class _SendPortSink<T> implements StreamSink<T> {
+ /// The channel that this sink is for.
+ final IsolateChannel _channel;
+
+ Future get done => _doneCompleter.future;
+ final _doneCompleter = new Completer();
+
+ /// Whether [done] has been completed.
+ ///
+ /// This is distinct from [_closed] because [done] can complete with an error
+ /// without the user explicitly calling [close].
+ bool get _isDone => _doneCompleter.isCompleted;
+
+ /// Whether the user has called [close].
+ bool _closed = false;
+
+ /// Whether we're currently adding a stream with [addStream].
+ bool _inAddStream = false;
+
+ _SendPortSink(this._channel);
+
+ void add(T data) {
+ if (_closed) throw new StateError("Cannot add event after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add event while adding stream.");
+ }
+ if (_isDone) return;
+
+ _add(data);
+ }
+
+ /// A helper for [add] that doesn't check for [StateError]s.
+ ///
+ /// This is called from [addStream], so it shouldn't check [_inAddStream].
+ void _add(T data) {
+ _channel._sendPort.send(data);
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ if (_closed) throw new StateError("Cannot add event after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add event while adding stream.");
+ }
+
+ _close(error, stackTrace);
+ }
+
+ Future close() {
+ if (_inAddStream) {
+ throw new StateError("Cannot close sink while adding stream.");
+ }
+
+ _closed = true;
+ return _close();
+ }
+
+ /// A helper for [close] that doesn't check for [StateError]s.
+ ///
+ /// This is called from [addStream], so it shouldn't check [_inAddStream]. It
+ /// also forwards [error] and [stackTrace] to [done] if they're passed.
+ Future _close([error, StackTrace stackTrace]) {
+ if (_isDone) return done;
+
+ _channel._receivePort.close();
+
+ if (error != null) {
+ _doneCompleter.completeError(error, stackTrace);
+ } else {
+ _doneCompleter.complete();
+ }
+
+ return done;
+ }
+
+ Future addStream(Stream<T> stream) {
+ if (_closed) throw new StateError("Cannot add stream after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add stream while adding stream.");
+ }
+ if (_isDone) return;
+
+ _inAddStream = true;
+ var completer = new Completer.sync();
+ stream.listen(_add,
+ onError: (error, stackTrace) {
+ _close(error, stackTrace);
+ completer.complete();
+ },
+ onDone: completer.complete,
+ cancelOnError: true);
+ return completer.future.then((_) {
+ _inAddStream = false;
+ });
+ }
+}
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index 4b3c659..ff36ec7 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -5,6 +5,7 @@
import 'dart:async';
export 'src/delegating_stream_channel.dart';
+export 'src/isolate_channel.dart';
export 'src/multi_channel.dart';
export 'src/stream_channel_completer.dart';
diff --git a/pubspec.yaml b/pubspec.yaml
index a0d8d22..97057ab 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -4,7 +4,7 @@
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/stream_channel
environment:
- sdk: '>=1.0.0 <2.0.0'
+ sdk: '>=1.8.0 <2.0.0'
dependencies:
async: '^1.8.0'
dev_dependencies:
diff --git a/test/isolate_channel_test.dart b/test/isolate_channel_test.dart
new file mode 100644
index 0000000..9e4fddc
--- /dev/null
+++ b/test/isolate_channel_test.dart
@@ -0,0 +1,126 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:isolate';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ var receivePort;
+ var sendPort;
+ var channel;
+ setUp(() {
+ receivePort = new ReceivePort();
+ var receivePortForSend = new ReceivePort();
+ sendPort = receivePortForSend.sendPort;
+ channel = new IsolateChannel(receivePortForSend, receivePort.sendPort);
+ });
+
+ tearDown(() {
+ receivePort.close();
+ channel.sink.close();
+ });
+
+ test("the channel can send messages", () {
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+
+ expect(receivePort.take(3).toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("the channel can receive messages", () {
+ sendPort.send(1);
+ sendPort.send(2);
+ sendPort.send(3);
+
+ expect(channel.stream.take(3).toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("events can't be added to an explicitly-closed sink", () {
+ expect(channel.sink.close(), completes);
+ expect(() => channel.sink.add(1), throwsStateError);
+ expect(() => channel.sink.addError("oh no"), throwsStateError);
+ expect(() => channel.sink.addStream(new Stream.fromIterable([])),
+ throwsStateError);
+ });
+
+ test("events can't be added while a stream is being added", () {
+ var controller = new StreamController();
+ channel.sink.addStream(controller.stream);
+
+ expect(() => channel.sink.add(1), throwsStateError);
+ expect(() => channel.sink.addError("oh no"), throwsStateError);
+ expect(() => channel.sink.addStream(new Stream.fromIterable([])),
+ throwsStateError);
+ expect(() => channel.sink.close(), throwsStateError);
+
+ controller.close();
+ });
+
+ group("stream channel rules", () {
+ test("closing the sink causes the stream to close before it emits any more "
+ "events", () {
+ sendPort.send(1);
+ sendPort.send(2);
+ sendPort.send(3);
+ sendPort.send(4);
+ sendPort.send(5);
+
+ channel.stream.listen(expectAsync((message) {
+ expect(message, equals(1));
+ channel.sink.close();
+ }, count: 1));
+ });
+
+ test("cancelling the stream's subscription has no effect on the sink",
+ () async {
+ channel.stream.listen(null).cancel();
+ await pumpEventQueue();
+
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+ expect(receivePort.take(3).toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("the sink closes as soon as an error is added", () async {
+ channel.sink.addError("oh no");
+ channel.sink.add(1);
+ expect(channel.sink.done, throwsA("oh no"));
+
+ // Since the sink is closed, the stream should also be closed.
+ expect(channel.stream.isEmpty, completion(isTrue));
+
+ // The other end shouldn't receive the next event, since the sink was
+ // closed. Pump the event queue to give it a chance to.
+ receivePort.listen(expectAsync((_) {}, count: 0));
+ await pumpEventQueue();
+ });
+
+ test("the sink closes as soon as an error is added via addStream",
+ () async {
+ var canceled = false;
+ var controller = new StreamController(onCancel: () {
+ canceled = true;
+ });
+
+ // This future shouldn't get the error, because it's sent to [Sink.done].
+ expect(channel.sink.addStream(controller.stream), completes);
+
+ controller.addError("oh no");
+ expect(channel.sink.done, throwsA("oh no"));
+ await pumpEventQueue();
+ expect(canceled, isTrue);
+
+ // Even though the sink is closed, this shouldn't throw an error because
+ // the user didn't explicitly close it.
+ channel.sink.add(1);
+ });
+ });
+}