Add a StreamChannelCompleter class.

This is by analogy with StreamCompleter and StreamSinkCompleter in the
async package, and is in fact implemented using them.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//1631103002 .
diff --git a/lib/src/stream_channel_completer.dart b/lib/src/stream_channel_completer.dart
new file mode 100644
index 0000000..d15adcf
--- /dev/null
+++ b/lib/src/stream_channel_completer.dart
@@ -0,0 +1,77 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+
+import '../stream_channel.dart';
+
+/// A [channel] where the source and destination are provided later.
+///
+/// The [channel] is a normal channel that can be listened to and that events
+/// can be added to immediately, but until [setChannel] is called it won't emit
+/// any events and all events added to it will be buffered.
+class StreamChannelCompleter<T> {
+  /// The completer for this channel's stream.
+  final _streamCompleter = new StreamCompleter<T>();
+
+  /// The completer for this channel's sink.
+  final _sinkCompleter = new StreamSinkCompleter<T>();
+
+  /// The channel for this completer.
+  StreamChannel<T> get channel => _channel;
+  StreamChannel<T> _channel;
+
+  /// Whether [setChannel] has been called.
+  bool _set = false;
+
+  /// Convert a `Future<StreamChannel>` to a `StreamChannel`.
+  ///
+  /// This creates a channel using a channel completer, and sets the source
+  /// channel to the result of the future when the future completes.
+  ///
+  /// If the future completes with an error, the returned channel's stream will
+  /// instead contain just that error. The sink will silently discard all
+  /// events.
+  static StreamChannel fromFuture(Future<StreamChannel> channelFuture) {
+    var completer = new StreamChannelCompleter();
+    channelFuture.then(completer.setChannel, onError: completer.setError);
+    return completer.channel;
+  }
+
+  StreamChannelCompleter() {
+    _channel = new StreamChannel<T>(
+        _streamCompleter.stream, _sinkCompleter.sink);
+  }
+
+  /// Set a channel as the source and destination for [channel].
+  ///
+  /// A channel may be set at most once.
+  ///
+  /// Either [setChannel] or [setError] may be called at most once. Trying to
+  /// call either of them again will fail.
+  void setChannel(StreamChannel<T> channel) {
+    if (_set) throw new StateError("The channel has already been set.");
+    _set = true;
+
+    _streamCompleter.setSourceStream(channel.stream);
+    _sinkCompleter.setDestinationSink(channel.sink);
+  }
+
+  /// Indicates that there was an error connecting the channel.
+  ///
+  /// This makes the stream emit [error] and close. It makes the sink discard
+  /// all its events.
+  ///
+  /// Either [setChannel] or [setError] may be called at most once. Trying to
+  /// call either of them again will fail.
+  void setError(error, [StackTrace stackTrace]) {
+    if (_set) throw new StateError("The channel has already been set.");
+    _set = true;
+
+    _streamCompleter.setError(error, stackTrace);
+    _sinkCompleter.setDestinationSink(new NullStreamSink());
+  }
+}
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
index fd7b093..4b3c659 100644
--- a/lib/stream_channel.dart
+++ b/lib/stream_channel.dart
@@ -6,6 +6,7 @@
 
 export 'src/delegating_stream_channel.dart';
 export 'src/multi_channel.dart';
+export 'src/stream_channel_completer.dart';
 
 /// An abstract class representing a two-way communication channel.
 ///
diff --git a/pubspec.yaml b/pubspec.yaml
index 76f22b7..a0d8d22 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -5,5 +5,7 @@
 homepage: https://github.com/dart-lang/stream_channel
 environment:
   sdk: '>=1.0.0 <2.0.0'
+dependencies:
+  async: '^1.8.0'
 dev_dependencies:
   test: '^0.12.0'
diff --git a/test/stream_channel_completer_test.dart b/test/stream_channel_completer_test.dart
new file mode 100644
index 0000000..1ee40a5
--- /dev/null
+++ b/test/stream_channel_completer_test.dart
@@ -0,0 +1,124 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+  var completer;
+  var streamController;
+  var sinkController;
+  var innerChannel;
+  setUp(() {
+    completer = new StreamChannelCompleter();
+    streamController = new StreamController();
+    sinkController = new StreamController();
+    innerChannel = new StreamChannel(
+        streamController.stream, sinkController.sink);
+  });
+
+  group("when a channel is set before accessing", () {
+    test("forwards events through the stream", () {
+      completer.setChannel(innerChannel);
+      expect(completer.channel.stream.toList(), completion(equals([1, 2, 3])));
+
+      streamController.add(1);
+      streamController.add(2);
+      streamController.add(3);
+      streamController.close();
+    });
+
+    test("forwards events through the sink", () {
+      completer.setChannel(innerChannel);
+      expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+
+      completer.channel.sink.add(1);
+      completer.channel.sink.add(2);
+      completer.channel.sink.add(3);
+      completer.channel.sink.close();
+    });
+
+    test("forwards an error through the stream", () {
+      completer.setError("oh no");
+      expect(completer.channel.stream.first, throwsA("oh no"));
+    });
+
+    test("drops sink events", () {
+      completer.setError("oh no");
+      expect(completer.channel.sink.done, completes);
+      completer.channel.sink.add(1);
+      completer.channel.sink.addError("oh no");
+    });
+  });
+
+  group("when a channel is set after accessing", () {
+    test("forwards events through the stream", () async {
+      expect(completer.channel.stream.toList(), completion(equals([1, 2, 3])));
+      await pumpEventQueue();
+
+      completer.setChannel(innerChannel);
+      streamController.add(1);
+      streamController.add(2);
+      streamController.add(3);
+      streamController.close();
+    });
+
+    test("forwards events through the sink", () async {
+      completer.channel.sink.add(1);
+      completer.channel.sink.add(2);
+      completer.channel.sink.add(3);
+      completer.channel.sink.close();
+      await pumpEventQueue();
+
+      completer.setChannel(innerChannel);
+      expect(sinkController.stream.toList(), completion(equals([1, 2, 3])));
+    });
+
+    test("forwards an error through the stream", () async {
+      expect(completer.channel.stream.first, throwsA("oh no"));
+      await pumpEventQueue();
+
+      completer.setError("oh no");
+    });
+
+    test("drops sink events", () async {
+      expect(completer.channel.sink.done, completes);
+      completer.channel.sink.add(1);
+      completer.channel.sink.addError("oh no");
+      await pumpEventQueue();
+
+      completer.setError("oh no");
+    });
+  });
+
+  group("forFuture", () {
+    test("forwards a StreamChannel", () {
+      var channel = StreamChannelCompleter.fromFuture(
+          new Future.value(innerChannel));
+      channel.sink.add(1);
+      channel.sink.close();
+      streamController.sink.add(2);
+      streamController.sink.close();
+
+      expect(sinkController.stream.toList(), completion(equals([1])));
+      expect(channel.stream.toList(), completion(equals([2])));
+    });
+
+    test("forwards an error", () {
+      var channel = StreamChannelCompleter.fromFuture(
+          new Future.error("oh no"));
+      expect(channel.stream.toList(), throwsA("oh no"));
+    });
+  });
+
+  test("doesn't allow the channel to be set multiple times", () {
+    completer.setChannel(innerChannel);
+    expect(() => completer.setChannel(innerChannel), throwsStateError);
+    expect(() => completer.setChannel(innerChannel), throwsStateError);
+  });
+}