Make IsolateChannel use StreamChannelCompleter.
This exposed some lingering bugs in GuaranteeChannel as well.
R=tjblasi@google.com
Review URL: https://codereview.chromium.org//1671763002 .
diff --git a/lib/src/guarantee_channel.dart b/lib/src/guarantee_channel.dart
index 1047e14..a874799 100644
--- a/lib/src/guarantee_channel.dart
+++ b/lib/src/guarantee_channel.dart
@@ -127,6 +127,14 @@
}
if (_disconnected) return;
+ _addError(error, stackTrace);
+ }
+
+ /// Like [addError], but doesn't check to ensure that an error can be added.
+ ///
+ /// This is called from [addStream], so it shouldn't fail if a stream is being
+ /// added.
+ void _addError(error, [StackTrace stackTrace]) {
if (_allowErrors) {
_inner.addError(error, stackTrace);
return;
@@ -153,7 +161,7 @@
_addStreamCompleter = new Completer.sync();
_addStreamSubscription = stream.listen(
_inner.add,
- onError: _inner.addError,
+ onError: _addError,
onDone: _addStreamCompleter.complete);
return _addStreamCompleter.future.then((_) {
_addStreamCompleter = null;
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart
index a466d87..3466d19 100644
--- a/lib/src/isolate_channel.dart
+++ b/lib/src/isolate_channel.dart
@@ -9,7 +9,6 @@
import 'package:stack_trace/stack_trace.dart';
import '../stream_channel.dart';
-import 'isolate_channel/send_port_sink.dart';
/// A [StreamChannel] that communicates over a [ReceivePort]/[SendPort] pair,
/// presumably with another isolate.
@@ -54,10 +53,13 @@
var subscription;
subscription = receivePort.listen((message) {
if (message is SendPort) {
- streamCompleter.setSourceStream(
- new SubscriptionStream<T>(subscription));
- sinkCompleter.setDestinationSink(
- new SendPortSink<T>(receivePort, message));
+ var controller = new StreamChannelController(allowForeignErrors: false);
+ new SubscriptionStream<T>(subscription).pipe(controller.local.sink);
+ controller.local.stream.listen(message.send,
+ onDone: receivePort.close);
+
+ streamCompleter.setSourceStream(controller.foreign.stream);
+ sinkCompleter.setDestinationSink(controller.foreign.sink);
return;
}
@@ -88,9 +90,13 @@
/// Creates a stream channel that receives messages from [receivePort] and
/// sends them over [sendPort].
- IsolateChannel(ReceivePort receivePort, SendPort sendPort)
- : stream = new StreamView<T>(receivePort),
- sink = new SendPortSink<T>(receivePort, sendPort);
+ factory IsolateChannel(ReceivePort receivePort, SendPort sendPort) {
+ var controller = new StreamChannelController(allowForeignErrors: false);
+ receivePort.pipe(controller.local.sink);
+ controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
+ return new IsolateChannel._(
+ controller.foreign.stream, controller.foreign.sink);
+ }
IsolateChannel._(this.stream, this.sink);
}
diff --git a/lib/src/isolate_channel/send_port_sink.dart b/lib/src/isolate_channel/send_port_sink.dart
deleted file mode 100644
index d98f1da..0000000
--- a/lib/src/isolate_channel/send_port_sink.dart
+++ /dev/null
@@ -1,111 +0,0 @@
-// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'dart:async';
-import 'dart:isolate';
-
-/// The sink for [IsolateChannel].
-///
-/// [SendPort] doesn't natively implement any sink API, so this adds that API as
-/// a wrapper. Closing this just closes the [ReceivePort].
-class SendPortSink<T> implements StreamSink<T> {
- /// The port that produces incoming messages.
- ///
- /// This is wrapped in a [StreamView] to produce [stream].
- final ReceivePort _receivePort;
-
- /// The port that sends outgoing messages.
- final SendPort _sendPort;
-
- Future get done => _doneCompleter.future;
- final _doneCompleter = new Completer();
-
- /// Whether [done] has been completed.
- ///
- /// This is distinct from [_closed] because [done] can complete with an error
- /// without the user explicitly calling [close].
- bool get _isDone => _doneCompleter.isCompleted;
-
- /// Whether the user has called [close].
- bool _closed = false;
-
- /// Whether we're currently adding a stream with [addStream].
- bool _inAddStream = false;
-
- SendPortSink(this._receivePort, this._sendPort);
-
- void add(T data) {
- if (_closed) throw new StateError("Cannot add event after closing.");
- if (_inAddStream) {
- throw new StateError("Cannot add event while adding stream.");
- }
- if (_isDone) return;
-
- _add(data);
- }
-
- /// A helper for [add] that doesn't check for [StateError]s.
- ///
- /// This is called from [addStream], so it shouldn't check [_inAddStream].
- void _add(T data) {
- _sendPort.send(data);
- }
-
- void addError(error, [StackTrace stackTrace]) {
- if (_closed) throw new StateError("Cannot add event after closing.");
- if (_inAddStream) {
- throw new StateError("Cannot add event while adding stream.");
- }
-
- _close(error, stackTrace);
- }
-
- Future close() {
- if (_inAddStream) {
- throw new StateError("Cannot close sink while adding stream.");
- }
-
- _closed = true;
- return _close();
- }
-
- /// A helper for [close] that doesn't check for [StateError]s.
- ///
- /// This is called from [addStream], so it shouldn't check [_inAddStream]. It
- /// also forwards [error] and [stackTrace] to [done] if they're passed.
- Future _close([error, StackTrace stackTrace]) {
- if (_isDone) return done;
-
- _receivePort.close();
-
- if (error != null) {
- _doneCompleter.completeError(error, stackTrace);
- } else {
- _doneCompleter.complete();
- }
-
- return done;
- }
-
- Future addStream(Stream<T> stream) {
- if (_closed) throw new StateError("Cannot add stream after closing.");
- if (_inAddStream) {
- throw new StateError("Cannot add stream while adding stream.");
- }
- if (_isDone) return new Future.value();
-
- _inAddStream = true;
- var completer = new Completer.sync();
- stream.listen(_add,
- onError: (error, stackTrace) {
- _close(error, stackTrace);
- completer.complete();
- },
- onDone: completer.complete,
- cancelOnError: true);
- return completer.future.then((_) {
- _inAddStream = false;
- });
- }
-}
diff --git a/lib/src/stream_channel_controller.dart b/lib/src/stream_channel_controller.dart
index ad78323..45b2865 100644
--- a/lib/src/stream_channel_controller.dart
+++ b/lib/src/stream_channel_controller.dart
@@ -15,13 +15,13 @@
///
/// ```dart
/// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) {
-/// var controller = new StreamChannelController();
+/// var controller = new StreamChannelController(allowForeignErrors: false);
///
/// // Pipe all events from the receive port into the local sink...
/// receivePort.pipe(controller.local.sink);
///
/// // ...and all events from the local stream into the send port.
-/// controller.local.listen(sendPort.add, onDone: receivePort.close);
+/// controller.local.stream.listen(sendPort.send, onDone: receivePort.close);
///
/// // Then return the foreign controller for your users to use.
/// return controller.foreign;
diff --git a/test/with_guarantees_test.dart b/test/with_guarantees_test.dart
index 38afefa..409b28f 100644
--- a/test/with_guarantees_test.dart
+++ b/test/with_guarantees_test.dart
@@ -114,6 +114,29 @@
streamController.close();
});
+ test("events can't be added to an explicitly-closed sink", () {
+ sinkController.stream.listen(null); // Work around sdk#19095.
+
+ expect(channel.sink.close(), completes);
+ expect(() => channel.sink.add(1), throwsStateError);
+ expect(() => channel.sink.addError("oh no"), throwsStateError);
+ expect(() => channel.sink.addStream(new Stream.fromIterable([])),
+ throwsStateError);
+ });
+
+ test("events can't be added while a stream is being added", () {
+ var controller = new StreamController();
+ channel.sink.addStream(controller.stream);
+
+ expect(() => channel.sink.add(1), throwsStateError);
+ expect(() => channel.sink.addError("oh no"), throwsStateError);
+ expect(() => channel.sink.addStream(new Stream.fromIterable([])),
+ throwsStateError);
+ expect(() => channel.sink.close(), throwsStateError);
+
+ controller.close();
+ });
+
group("with allowSinkErrors: false", () {
setUp(() {
streamController = new StreamController();
@@ -146,5 +169,25 @@
expect(channel.sink.done, throwsA("oh no"));
expect(sinkController.stream.toList(), completion(isEmpty));
});
+
+ test("adding an error via via addStream causes the stream to emit a done "
+ "event", () async {
+ var canceled = false;
+ var controller = new StreamController(onCancel: () {
+ canceled = true;
+ });
+
+ // This future shouldn't get the error, because it's sent to [Sink.done].
+ expect(channel.sink.addStream(controller.stream), completes);
+
+ controller.addError("oh no");
+ expect(channel.sink.done, throwsA("oh no"));
+ await pumpEventQueue();
+ expect(canceled, isTrue);
+
+ // Even though the sink is closed, this shouldn't throw an error because
+ // the user didn't explicitly close it.
+ channel.sink.add(1);
+ });
});
}