Provide more error-handling customization.
This makes it easier to wrap underlying channels that don't support
errors at all.
R=tjblasi@google.com
Review URL: https://codereview.chromium.org//1669953002 .
diff --git a/lib/src/guarantee_channel.dart b/lib/src/guarantee_channel.dart
index 849cc2f..1047e14 100644
--- a/lib/src/guarantee_channel.dart
+++ b/lib/src/guarantee_channel.dart
@@ -30,8 +30,10 @@
/// Whether the sink has closed, causing the underlying channel to disconnect.
bool _disconnected = false;
- GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
- _sink = new _GuaranteeSink<T>(innerSink, this);
+ GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink,
+ {bool allowSinkErrors: true}) {
+ _sink = new _GuaranteeSink<T>(innerSink, this,
+ allowErrors: allowSinkErrors);
// Enforce the single-subscription guarantee by changing a broadcast stream
// to single-subscription.
@@ -76,10 +78,13 @@
/// The [GuaranteeChannel] this belongs to.
final GuaranteeChannel<T> _channel;
- Future get done => _inner.done;
+ Future get done => _doneCompleter.future;
+ final _doneCompleter = new Completer();
- /// Whether the stream has emitted a done event, causing the underlying
- /// channel to disconnect.
+ /// Whether connection is disconnected.
+ ///
+ /// This can happen because the stream has emitted a done event, or because
+ /// the user added an error when [_allowErrors] is `false`.
bool _disconnected = false;
/// Whether the user has called [close].
@@ -96,7 +101,14 @@
/// Whether we're currently adding a stream with [addStream].
bool get _inAddStream => _addStreamSubscription != null;
- _GuaranteeSink(this._inner, this._channel);
+ /// Whether errors are passed on to the underlying sink.
+ ///
+ /// If this is `false`, any error passed to the sink is piped to [done] and
+ /// the underlying sink is closed.
+ final bool _allowErrors;
+
+ _GuaranteeSink(this._inner, this._channel, {bool allowErrors: true})
+ : _allowErrors = allowErrors;
void add(T data) {
if (_closed) throw new StateError("Cannot add event after closing.");
@@ -115,7 +127,20 @@
}
if (_disconnected) return;
- _inner.addError(error, stackTrace);
+ if (_allowErrors) {
+ _inner.addError(error, stackTrace);
+ return;
+ }
+
+ _doneCompleter.completeError(error, stackTrace);
+
+ // Treat an error like both the stream and sink disconnecting.
+ _onStreamDisconnected();
+ _channel._onSinkDisconnected();
+
+ // Ignore errors from the inner sink. We're already surfacing one error, and
+ // if the user handles it we don't want them to have another top-level.
+ _inner.close().catchError((_) {});
}
Future addStream(Stream<T> stream) {
@@ -141,11 +166,15 @@
throw new StateError("Cannot close sink while adding stream.");
}
+ if (_closed) return done;
_closed = true;
- if (_disconnected) return new Future.value();
- _channel._onSinkDisconnected();
- return _inner.close();
+ if (!_disconnected) {
+ _channel._onSinkDisconnected();
+ _doneCompleter.complete(_inner.close());
+ }
+
+ return done;
}
/// Called by [GuaranteeChannel] when the stream emits a done event.
@@ -154,8 +183,9 @@
/// sink should stop forwarding events.
void _onStreamDisconnected() {
_disconnected = true;
- if (!_inAddStream) return;
+ if (!_doneCompleter.isCompleted) _doneCompleter.complete();
+ if (!_inAddStream) return;
_addStreamCompleter.complete(_addStreamSubscription.cancel());
_addStreamCompleter = null;
_addStreamSubscription = null;
diff --git a/lib/src/stream_channel_controller.dart b/lib/src/stream_channel_controller.dart
index 1812a0e..ad78323 100644
--- a/lib/src/stream_channel_controller.dart
+++ b/lib/src/stream_channel_controller.dart
@@ -47,12 +47,18 @@
/// If [sync] is true, events added to either channel's sink are synchronously
/// dispatched to the other channel's stream. This should only be done if the
/// source of those events is already asynchronous.
- StreamChannelController({bool sync: false}) {
+ ///
+ /// If [allowForeignErrors] is `false`, errors are not allowed to be passed to
+ /// the foreign channel's sink. If any are, the connection will close and the
+ /// error will be forwarded to the foreign channel's [Sink.done] future. This
+ /// guarantees that the local stream will never emit errors.
+ StreamChannelController({bool allowForeignErrors: true, bool sync: false}) {
var localToForeignController = new StreamController<T>(sync: sync);
var foreignToLocalController = new StreamController<T>(sync: sync);
_local = new StreamChannel<T>.withGuarantees(
foreignToLocalController.stream, localToForeignController.sink);
_foreign = new StreamChannel<T>.withGuarantees(
- localToForeignController.stream, foreignToLocalController.sink);
+ localToForeignController.stream, foreignToLocalController.sink,
+ allowSinkErrors: allowForeignErrors);
}
}
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index 8c4fad5..992f702 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -78,8 +78,13 @@
/// [StreamChannel] documentation. This makes it somewhat less efficient than
/// just wrapping a stream and a sink directly, so [new StreamChannel] should
/// be used when the guarantees are provided natively.
- factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink) =>
- new GuaranteeChannel(stream, sink);
+ ///
+ /// If [allowSinkErrors] is `false`, errors are not allowed to be passed to
+ /// [sink]. If any are, the connection will close and the error will be
+ /// forwarded to [Sink.done].
+ factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink,
+ {bool allowSinkErrors: true}) =>
+ new GuaranteeChannel(stream, sink, allowSinkErrors: allowSinkErrors);
/// Connects [this] to [other], so that any values emitted by either are sent
/// directly to the other.
diff --git a/pubspec.yaml b/pubspec.yaml
index f0b830c..52ee521 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: stream_channel
-version: 1.2.0-dev
+version: 1.2.0
description: An abstraction for two-way communication channels.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/stream_channel_controller_test.dart b/test/stream_channel_controller_test.dart
index e45f090..cf8a31b 100644
--- a/test/stream_channel_controller_test.dart
+++ b/test/stream_channel_controller_test.dart
@@ -25,6 +25,17 @@
controller.foreign.sink..add(1)..add(2)..add(3)..close();
expect(controller.local.stream.toList(), completion(equals([1, 2, 3])));
});
+
+ test("with allowForeignErrors: false, shuts down the connection if an "
+ "error is added to the foreign channel", () {
+ controller = new StreamChannelController(allowForeignErrors: false);
+
+ controller.foreign.sink.addError("oh no");
+ expect(controller.foreign.sink.done, throwsA("oh no"));
+ expect(controller.foreign.stream.toList(), completion(isEmpty));
+ expect(controller.local.sink.done, completes);
+ expect(controller.local.stream.toList(), completion(isEmpty));
+ });
});
group("synchronously", () {
diff --git a/test/with_guarantees_test.dart b/test/with_guarantees_test.dart
index 0e95f9d..38afefa 100644
--- a/test/with_guarantees_test.dart
+++ b/test/with_guarantees_test.dart
@@ -107,4 +107,44 @@
channel.sink.addError("error");
expect(sinkController.stream.first, throwsA("error"));
});
+
+ test("Sink.done completes once the stream is done", () {
+ channel.stream.listen(null);
+ expect(channel.sink.done, completes);
+ streamController.close();
+ });
+
+ group("with allowSinkErrors: false", () {
+ setUp(() {
+ streamController = new StreamController();
+ sinkController = new StreamController();
+ channel = new StreamChannel.withGuarantees(
+ streamController.stream, sinkController.sink, allowSinkErrors: false);
+ });
+
+ test("forwards errors to Sink.done but not the stream", () {
+ channel.sink.addError("oh no");
+ expect(channel.sink.done, throwsA("oh no"));
+ sinkController.stream.listen(null,
+ onError: expectAsync((_) {}, count: 0));
+ });
+
+ test("adding an error causes the stream to emit a done event", () {
+ expect(channel.sink.done, throwsA("oh no"));
+
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+
+ expect(channel.stream.listen(expectAsync((event) {
+ if (event == 2) channel.sink.addError("oh no");
+ }, count: 2)).asFuture(), completes);
+ });
+
+ test("adding an error closes the inner sink", () {
+ channel.sink.addError("oh no");
+ expect(channel.sink.done, throwsA("oh no"));
+ expect(sinkController.stream.toList(), completion(isEmpty));
+ });
+ });
}