Add a StreamChannelCompleter class.
This is by analogy with StreamCompleter and StreamSinkCompleter in the
async package, and is in fact implemented using them.
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//1631103002 .
diff --git a/lib/src/stream_channel_completer.dart b/lib/src/stream_channel_completer.dart
new file mode 100644
index 0000000..d15adcf
--- /dev/null
+++ b/lib/src/stream_channel_completer.dart
@@ -0,0 +1,77 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [channel] where the source and destination are provided later.
+///
+/// The [channel] is a normal channel that can be listened to and that events
+/// can be added to immediately, but until [setChannel] is called it won't emit
+/// any events and all events added to it will be buffered.
+class StreamChannelCompleter<T> {
+ /// The completer for this channel's stream.
+ final _streamCompleter = new StreamCompleter<T>();
+
+ /// The completer for this channel's sink.
+ final _sinkCompleter = new StreamSinkCompleter<T>();
+
+ /// The channel for this completer.
+ StreamChannel<T> get channel => _channel;
+ StreamChannel<T> _channel;
+
+ /// Whether [setChannel] has been called.
+ bool _set = false;
+
+ /// Convert a `Future<StreamChannel>` to a `StreamChannel`.
+ ///
+ /// This creates a channel using a channel completer, and sets the source
+ /// channel to the result of the future when the future completes.
+ ///
+ /// If the future completes with an error, the returned channel's stream will
+ /// instead contain just that error. The sink will silently discard all
+ /// events.
+ static StreamChannel fromFuture(Future<StreamChannel> channelFuture) {
+ var completer = new StreamChannelCompleter();
+ channelFuture.then(completer.setChannel, onError: completer.setError);
+ return completer.channel;
+ }
+
+ StreamChannelCompleter() {
+ _channel = new StreamChannel<T>(
+ _streamCompleter.stream, _sinkCompleter.sink);
+ }
+
+ /// Set a channel as the source and destination for [channel].
+ ///
+ /// A channel may be set at most once.
+ ///
+ /// Either [setChannel] or [setError] may be called at most once. Trying to
+ /// call either of them again will fail.
+ void setChannel(StreamChannel<T> channel) {
+ if (_set) throw new StateError("The channel has already been set.");
+ _set = true;
+
+ _streamCompleter.setSourceStream(channel.stream);
+ _sinkCompleter.setDestinationSink(channel.sink);
+ }
+
+ /// Indicates that there was an error connecting the channel.
+ ///
+ /// This makes the stream emit [error] and close. It makes the sink discard
+ /// all its events.
+ ///
+ /// Either [setChannel] or [setError] may be called at most once. Trying to
+ /// call either of them again will fail.
+ void setError(error, [StackTrace stackTrace]) {
+ if (_set) throw new StateError("The channel has already been set.");
+ _set = true;
+
+ _streamCompleter.setError(error, stackTrace);
+ _sinkCompleter.setDestinationSink(new NullStreamSink());
+ }
+}
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index fd7b093..4b3c659 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -6,6 +6,7 @@
export 'src/delegating_stream_channel.dart';
export 'src/multi_channel.dart';
+export 'src/stream_channel_completer.dart';
/// An abstract class representing a two-way communication channel.
///
diff --git a/pubspec.yaml b/pubspec.yaml
index 76f22b7..a0d8d22 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -5,5 +5,7 @@
homepage: https://github.com/dart-lang/stream_channel
environment:
sdk: '>=1.0.0 <2.0.0'
+dependencies:
+ async: '^1.8.0'
dev_dependencies:
test: '^0.12.0'
diff --git a/test/stream_channel_completer_test.dart b/test/stream_channel_completer_test.dart
new file mode 100644
index 0000000..1ee40a5
--- /dev/null
+++ b/test/stream_channel_completer_test.dart
@@ -0,0 +1,124 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ var completer;
+ var streamController;
+ var sinkController;
+ var innerChannel;
+ setUp(() {
+ completer = new StreamChannelCompleter();
+ streamController = new StreamController();
+ sinkController = new StreamController();
+ innerChannel = new StreamChannel(
+ streamController.stream, sinkController.sink);
+ });
+
+ group("when a channel is set before accessing", () {
+ test("forwards events through the stream", () {
+ completer.setChannel(innerChannel);
+ expect(completer.channel.stream.toList(), completion(equals([1, 2, 3])));
+
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+ streamController.close();
+ });
+
+ test("forwards events through the sink", () {
+ completer.setChannel(innerChannel);
+ expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+
+ completer.channel.sink.add(1);
+ completer.channel.sink.add(2);
+ completer.channel.sink.add(3);
+ completer.channel.sink.close();
+ });
+
+ test("forwards an error through the stream", () {
+ completer.setError("oh no");
+ expect(completer.channel.stream.first, throwsA("oh no"));
+ });
+
+ test("drops sink events", () {
+ completer.setError("oh no");
+ expect(completer.channel.sink.done, completes);
+ completer.channel.sink.add(1);
+ completer.channel.sink.addError("oh no");
+ });
+ });
+
+ group("when a channel is set after accessing", () {
+ test("forwards events through the stream", () async {
+ expect(completer.channel.stream.toList(), completion(equals([1, 2, 3])));
+ await pumpEventQueue();
+
+ completer.setChannel(innerChannel);
+ streamController.add(1);
+ streamController.add(2);
+ streamController.add(3);
+ streamController.close();
+ });
+
+ test("forwards events through the sink", () async {
+ completer.channel.sink.add(1);
+ completer.channel.sink.add(2);
+ completer.channel.sink.add(3);
+ completer.channel.sink.close();
+ await pumpEventQueue();
+
+ completer.setChannel(innerChannel);
+ expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("forwards an error through the stream", () async {
+ expect(completer.channel.stream.first, throwsA("oh no"));
+ await pumpEventQueue();
+
+ completer.setError("oh no");
+ });
+
+ test("drops sink events", () async {
+ expect(completer.channel.sink.done, completes);
+ completer.channel.sink.add(1);
+ completer.channel.sink.addError("oh no");
+ await pumpEventQueue();
+
+ completer.setError("oh no");
+ });
+ });
+
+ group("forFuture", () {
+ test("forwards a StreamChannel", () {
+ var channel = StreamChannelCompleter.fromFuture(
+ new Future.value(innerChannel));
+ channel.sink.add(1);
+ channel.sink.close();
+ streamController.sink.add(2);
+ streamController.sink.close();
+
+ expect(sinkController.stream.toList(), completion(equals([1])));
+ expect(channel.stream.toList(), completion(equals([2])));
+ });
+
+ test("forwards an error", () {
+ var channel = StreamChannelCompleter.fromFuture(
+ new Future.error("oh no"));
+ expect(channel.stream.toList(), throwsA("oh no"));
+ });
+ });
+
+ test("doesn't allow the channel to be set multiple times", () {
+ completer.setChannel(innerChannel);
+ expect(() => completer.setChannel(innerChannel), throwsStateError);
+ expect(() => completer.setChannel(innerChannel), throwsStateError);
+ });
+}