Add StreamChannel.withGuarantees and StreamChannelController.
These APIs make it easier for users to create their own stream channels
that follow the StreamChannel guarantees.
R=tjblasi@google.com
Review URL: https://codereview.chromium.org//1662773003 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8138708..0b61d81 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,11 @@
+## 1.2.0
+
+* Add `new StreamChannel.withGuarantees()`, which creates a channel with extra
+ wrapping to ensure that it obeys the stream channel guarantees.
+
+* Add `StreamChannelController`, which can be used to create custom
+ `StreamChannel` objects.
+
## 1.1.1
* Fix the type annotation for `StreamChannel.transform()`'s parameter.
diff --git a/lib/src/guarantee_channel.dart b/lib/src/guarantee_channel.dart
new file mode 100644
index 0000000..849cc2f
--- /dev/null
+++ b/lib/src/guarantee_channel.dart
@@ -0,0 +1,163 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [StreamChannel] that enforces the stream channel guarantees.
+///
+/// This is exposed via [new StreamChannel.withGuarantees].
+class GuaranteeChannel<T> extends StreamChannelMixin<T> {
+ Stream<T> get stream => _streamController.stream;
+
+ StreamSink<T> get sink => _sink;
+ _GuaranteeSink<T> _sink;
+
+ /// The controller for [stream].
+ ///
+ /// This intermediate controller allows us to continue listening for a done
+ /// event even after the user has canceled their subscription, and to send our
+ /// own done event when the sink is closed.
+ StreamController<T> _streamController;
+
+ /// The subscription to the inner stream.
+ StreamSubscription<T> _subscription;
+
+ /// Whether the sink has closed, causing the underlying channel to disconnect.
+ bool _disconnected = false;
+
+ GuaranteeChannel(Stream<T> innerStream, StreamSink<T> innerSink) {
+ _sink = new _GuaranteeSink<T>(innerSink, this);
+
+ // Enforce the single-subscription guarantee by changing a broadcast stream
+ // to single-subscription.
+ if (innerStream.isBroadcast) {
+ innerStream = innerStream.transform(
+ const SingleSubscriptionTransformer());
+ }
+
+ _streamController = new StreamController<T>(onListen: () {
+ // If the sink has disconnected, we've already called
+ // [_streamController.close].
+ if (_disconnected) return;
+
+ _subscription = innerStream.listen(_streamController.add,
+ onError: _streamController.addError,
+ onDone: () {
+ _sink._onStreamDisconnected();
+ _streamController.close();
+ });
+ }, sync: true);
+ }
+
+ /// Called by [_GuaranteeSink] when the user closes it.
+ ///
+ /// The sink closing indicates that the connection is closed, so the stream
+ /// should stop emitting events.
+ void _onSinkDisconnected() {
+ _disconnected = true;
+ if (_subscription != null) _subscription.cancel();
+ _streamController.close();
+ }
+}
+
+/// The sink for [GuaranteeChannel].
+///
+/// This wraps the inner sink to ignore events and cancel any in-progress
+/// [addStream] calls when the underlying channel closes.
+class _GuaranteeSink<T> implements StreamSink<T> {
+ /// The inner sink being wrapped.
+ final StreamSink<T> _inner;
+
+ /// The [GuaranteeChannel] this belongs to.
+ final GuaranteeChannel<T> _channel;
+
+ Future get done => _inner.done;
+
+ /// Whether the stream has emitted a done event, causing the underlying
+ /// channel to disconnect.
+ bool _disconnected = false;
+
+ /// Whether the user has called [close].
+ bool _closed = false;
+
+ /// The subscription to the stream passed to [addStream], if a stream is
+ /// currently being added.
+ StreamSubscription<T> _addStreamSubscription;
+
+ /// The completer for the future returned by [addStream], if a stream is
+ /// currently being added.
+ Completer _addStreamCompleter;
+
+ /// Whether we're currently adding a stream with [addStream].
+ bool get _inAddStream => _addStreamSubscription != null;
+
+ _GuaranteeSink(this._inner, this._channel);
+
+ void add(T data) {
+ if (_closed) throw new StateError("Cannot add event after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add event while adding stream.");
+ }
+ if (_disconnected) return;
+
+ _inner.add(data);
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ if (_closed) throw new StateError("Cannot add event after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add event while adding stream.");
+ }
+ if (_disconnected) return;
+
+ _inner.addError(error, stackTrace);
+ }
+
+ Future addStream(Stream<T> stream) {
+ if (_closed) throw new StateError("Cannot add stream after closing.");
+ if (_inAddStream) {
+ throw new StateError("Cannot add stream while adding stream.");
+ }
+ if (_disconnected) return new Future.value();
+
+ _addStreamCompleter = new Completer.sync();
+ _addStreamSubscription = stream.listen(
+ _inner.add,
+ onError: _inner.addError,
+ onDone: _addStreamCompleter.complete);
+ return _addStreamCompleter.future.then((_) {
+ _addStreamCompleter = null;
+ _addStreamSubscription = null;
+ });
+ }
+
+ Future close() {
+ if (_inAddStream) {
+ throw new StateError("Cannot close sink while adding stream.");
+ }
+
+ _closed = true;
+ if (_disconnected) return new Future.value();
+
+ _channel._onSinkDisconnected();
+ return _inner.close();
+ }
+
+ /// Called by [GuaranteeChannel] when the stream emits a done event.
+ ///
+ /// The stream being done indicates that the connection is closed, so the
+ /// sink should stop forwarding events.
+ void _onStreamDisconnected() {
+ _disconnected = true;
+ if (!_inAddStream) return;
+
+ _addStreamCompleter.complete(_addStreamSubscription.cancel());
+ _addStreamCompleter = null;
+ _addStreamSubscription = null;
+ }
+}
diff --git a/lib/src/stream_channel_controller.dart b/lib/src/stream_channel_controller.dart
new file mode 100644
index 0000000..1812a0e
--- /dev/null
+++ b/lib/src/stream_channel_controller.dart
@@ -0,0 +1,58 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import '../stream_channel.dart';
+
+/// A controller for exposing a new [StreamChannel].
+///
+/// This exposes two connected [StreamChannel]s, [local] and [foreign]. The
+/// user's code should use [local] to emit and receive events. Then [foreign]
+/// can be returned for others to use. For example, here's a simplified version
+/// of the implementation of [new IsolateChannel]:
+///
+/// ```dart
+/// StreamChannel isolateChannel(ReceivePort receivePort, SendPort sendPort) {
+/// var controller = new StreamChannelController();
+///
+/// // Pipe all events from the receive port into the local sink...
+/// receivePort.pipe(controller.local.sink);
+///
+/// // ...and all events from the local stream into the send port.
+/// controller.local.listen(sendPort.add, onDone: receivePort.close);
+///
+/// // Then return the foreign controller for your users to use.
+/// return controller.foreign;
+/// }
+/// ```
+class StreamChannelController<T> {
+ /// The local channel.
+ ///
+ /// This channel should be used directly by the creator of this
+ /// [StreamChannelController] to send and receive events.
+ StreamChannel<T> get local => _local;
+ StreamChannel<T> _local;
+
+ /// The foreign channel.
+ ///
+ /// This channel should be returned to external users so they can communicate
+ /// with [local].
+ StreamChannel<T> get foreign => _foreign;
+ StreamChannel<T> _foreign;
+
+ /// Creates a [StreamChannelController].
+ ///
+ /// If [sync] is true, events added to either channel's sink are synchronously
+ /// dispatched to the other channel's stream. This should only be done if the
+ /// source of those events is already asynchronous.
+ StreamChannelController({bool sync: false}) {
+ var localToForeignController = new StreamController<T>(sync: sync);
+ var foreignToLocalController = new StreamController<T>(sync: sync);
+ _local = new StreamChannel<T>.withGuarantees(
+ foreignToLocalController.stream, localToForeignController.sink);
+ _foreign = new StreamChannel<T>.withGuarantees(
+ localToForeignController.stream, foreignToLocalController.sink);
+ }
+}
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index 7fff674..8c4fad5 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -6,6 +6,7 @@
import 'package:async/async.dart';
+import 'src/guarantee_channel.dart';
import 'src/stream_channel_transformer.dart';
export 'src/delegating_stream_channel.dart';
@@ -13,6 +14,7 @@
export 'src/json_document_transformer.dart';
export 'src/multi_channel.dart';
export 'src/stream_channel_completer.dart';
+export 'src/stream_channel_controller.dart';
export 'src/stream_channel_transformer.dart';
/// An abstract class representing a two-way communication channel.
@@ -65,10 +67,20 @@
/// Creates a new [StreamChannel] that communicates over [stream] and [sink].
///
/// Note that this stream/sink pair must provide the guarantees listed in the
- /// [StreamChannel] documentation.
+ /// [StreamChannel] documentation. If they don't do so natively, [new
+ /// StreamChannel.withGuarantees] should be used instead.
factory StreamChannel(Stream<T> stream, StreamSink<T> sink) =>
new _StreamChannel<T>(stream, sink);
+ /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
+ ///
+ /// Unlike [new StreamChannel], this enforces the guarantees listed in the
+ /// [StreamChannel] documentation. This makes it somewhat less efficient than
+ /// just wrapping a stream and a sink directly, so [new StreamChannel] should
+ /// be used when the guarantees are provided natively.
+ factory StreamChannel.withGuarantees(Stream<T> stream, StreamSink<T> sink) =>
+ new GuaranteeChannel(stream, sink);
+
/// Connects [this] to [other], so that any values emitted by either are sent
/// directly to the other.
void pipe(StreamChannel<T> other);
diff --git a/pubspec.yaml b/pubspec.yaml
index 7d1c6a3..f0b830c 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: stream_channel
-version: 1.1.1
+version: 1.2.0-dev
description: An abstraction for two-way communication channels.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/stream_channel_controller_test.dart b/test/stream_channel_controller_test.dart
new file mode 100644
index 0000000..e45f090
--- /dev/null
+++ b/test/stream_channel_controller_test.dart
@@ -0,0 +1,86 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ group("asynchronously", () {
+ var controller;
+ setUp(() {
+ controller = new StreamChannelController();
+ });
+
+ test("forwards events from the local sink to the foreign stream", () {
+ controller.local.sink..add(1)..add(2)..add(3)..close();
+ expect(controller.foreign.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("forwards events from the foreign sink to the local stream", () {
+ controller.foreign.sink..add(1)..add(2)..add(3)..close();
+ expect(controller.local.stream.toList(), completion(equals([1, 2, 3])));
+ });
+ });
+
+ group("synchronously", () {
+ var controller;
+ setUp(() {
+ controller = new StreamChannelController(sync: true);
+ });
+
+ test("synchronously forwards events from the local sink to the foreign "
+ "stream", () {
+ var receivedEvent = false;
+ var receivedError = false;
+ var receivedDone = false;
+ controller.foreign.stream.listen(expectAsync((event) {
+ expect(event, equals(1));
+ receivedEvent = true;
+ }), onError: expectAsync((error) {
+ expect(error, equals("oh no"));
+ receivedError = true;
+ }), onDone: expectAsync(() {
+ receivedDone = true;
+ }));
+
+ controller.local.sink.add(1);
+ expect(receivedEvent, isTrue);
+
+ controller.local.sink.addError("oh no");
+ expect(receivedError, isTrue);
+
+ controller.local.sink.close();
+ expect(receivedDone, isTrue);
+ });
+
+ test("synchronously forwards events from the foreign sink to the local "
+ "stream", () {
+ var receivedEvent = false;
+ var receivedError = false;
+ var receivedDone = false;
+ controller.local.stream.listen(expectAsync((event) {
+ expect(event, equals(1));
+ receivedEvent = true;
+ }), onError: expectAsync((error) {
+ expect(error, equals("oh no"));
+ receivedError = true;
+ }), onDone: expectAsync(() {
+ receivedDone = true;
+ }));
+
+ controller.foreign.sink.add(1);
+ expect(receivedEvent, isTrue);
+
+ controller.foreign.sink.addError("oh no");
+ expect(receivedError, isTrue);
+
+ controller.foreign.sink.close();
+ expect(receivedDone, isTrue);
+ });
+ });
+}
diff --git a/test/with_guarantees_test.dart b/test/with_guarantees_test.dart
new file mode 100644
index 0000000..0e95f9d
--- /dev/null
+++ b/test/with_guarantees_test.dart
@@ -0,0 +1,110 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ var streamController;
+ var sinkController;
+ var channel;
+ setUp(() {
+ streamController = new StreamController();
+ sinkController = new StreamController();
+ channel = new StreamChannel.withGuarantees(
+ streamController.stream, sinkController.sink);
+ });
+
+ group("with a broadcast stream", () {
+ setUp(() {
+ streamController = new StreamController.broadcast();
+ channel = new StreamChannel.withGuarantees(
+ streamController.stream, sinkController.sink);
+ });
+
+ test("buffers events", () async {
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+ await pumpEventQueue();
+
+ expect(channel.stream.toList(), completion(equals([1, 2, 3])));
+ streamController.close();
+ });
+
+ test("only allows a single subscription", () {
+ channel.stream.listen(null);
+ expect(() => channel.stream.listen(null), throwsStateError);
+ });
+ });
+
+ test("closing the event sink causes the stream to close before it emits any "
+ "more events", () {
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+
+ expect(channel.stream.listen(expectAsync((event) {
+ if (event == 2) channel.sink.close();
+ }, count: 2)).asFuture(), completes);
+ });
+
+ test("after the stream closes, the sink ignores events", () async {
+ streamController.close();
+
+ // Wait for the done event to be delivered.
+ await channel.stream.toList();
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+ channel.sink.close();
+
+ // None of our channel.sink additions should make it to the other endpoint.
+ sinkController.stream.listen(
+ expectAsync((_) {}, count: 0),
+ onDone: expectAsync(() {}, count: 0));
+ await pumpEventQueue();
+ });
+
+ test("canceling the stream's subscription has no effect on the sink",
+ () async {
+ channel.stream.listen(null).cancel();
+ await pumpEventQueue();
+
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+ channel.sink.close();
+ expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("canceling the stream's subscription doesn't stop a done event",
+ () async {
+ channel.stream.listen(null).cancel();
+ await pumpEventQueue();
+
+ streamController.close();
+ await pumpEventQueue();
+
+ channel.sink.add(1);
+ channel.sink.add(2);
+ channel.sink.add(3);
+ channel.sink.close();
+
+ // The sink should be ignoring events because the stream closed.
+ sinkController.stream.listen(
+ expectAsync((_) {}, count: 0),
+ onDone: expectAsync(() {}, count: 0));
+ await pumpEventQueue();
+ });
+
+ test("forwards errors to the other endpoint", () {
+ channel.sink.addError("error");
+ expect(sinkController.stream.first, throwsA("error"));
+ });
+}