Add StreamChannel and MultiChannel.

These are exact copies of the classes and tests from the test package.

R=rnystrom@google.com, kevmoo@google.com

Review URL: https://codereview.chromium.org//1610443002 .
diff --git a/lib/src/multi_channel.dart b/lib/src/multi_channel.dart
new file mode 100644
index 0000000..e87deb3
--- /dev/null
+++ b/lib/src/multi_channel.dart
@@ -0,0 +1,244 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import '../stream_channel.dart';
+
+/// A class that multiplexes multiple virtual channels across a single
+/// underlying transport layer.
+///
+/// This should be connected to another [MultiChannel] on the other end of the
+/// underlying channel. It starts with a single default virtual channel,
+/// accessible via [stream] and [sink]. Additional virtual channels can be
+/// created with [virtualChannel].
+///
+/// When a virtual channel is created by one endpoint, the other must connect to
+/// it before messages may be sent through it. The first endpoint passes its
+/// [VirtualChannel.id] to the second, which then creates a channel from that id
+/// also using [virtualChannel]. For example:
+///
+/// ```dart
+/// // First endpoint
+/// var virtual = multiChannel.virtualChannel();
+/// multiChannel.sink.add({
+///   "channel": virtual.id
+/// });
+///
+/// // Second endpoint
+/// multiChannel.stream.listen((message) {
+///   var virtual = multiChannel.virtualChannel(message["channel"]);
+///   // ...
+/// });
+/// ```
+///
+/// Sending errors across a [MultiChannel] is not supported. Any errors from the
+/// underlying stream will be reported only via the default
+/// [MultiChannel.stream].
+///
+/// Each virtual channel may be closed individually. When all of them are
+/// closed, the underlying [StreamSink] is closed automatically.
+abstract class MultiChannel implements StreamChannel {
+  /// The default input stream.
+  ///
+  /// This connects to the remote [sink].
+  Stream get stream;
+
+  /// The default output stream.
+  ///
+  /// This connects to the remote [stream]. If this is closed, the remote
+  /// [stream] will close, but other virtual channels will remain open and new
+  /// virtual channels may be opened.
+  StreamSink get sink;
+
+  /// Creates a new [MultiChannel] that sends messages over [innerStream] and
+  /// [innerSink].
+  ///
+  /// The inner streams must take JSON-like objects.
+  factory MultiChannel(Stream innerStream, StreamSink innerSink) =>
+      new _MultiChannel(innerStream, innerSink);
+
+  /// Creates a new virtual channel.
+  ///
+  /// If [id] is not passed, this creates a virtual channel from scratch. Before
+  /// it's used, its [VirtualChannel.id] must be sent to the remote endpoint
+  /// where [virtualChannel] should be called with that id.
+  ///
+  /// If [id] is passed, this creates a virtual channel corresponding to the
+  /// channel with that id on the remote channel.
+  ///
+  /// Throws an [ArgumentError] if a virtual channel already exists for [id].
+  /// Throws a [StateError] if the underlying channel is closed.
+  VirtualChannel virtualChannel([id]);
+}
+
+/// The implementation of [MultiChannel].
+///
+/// This is private so that [VirtualChannel] can inherit from [MultiChannel]
+/// without having to implement all the private members.
+class _MultiChannel extends StreamChannelMixin implements MultiChannel {
+  /// The inner stream over which all communication is received.
+  ///
+  /// This will be `null` if the underlying communication channel is closed.
+  Stream _innerStream;
+
+  /// The inner sink over which all communication is sent.
+  ///
+  /// This will be `null` if the underlying communication channel is closed.
+  StreamSink _innerSink;
+
+  /// The subscription to [_innerStream].
+  StreamSubscription _innerStreamSubscription;
+
+  Stream get stream => _streamController.stream;
+  final _streamController = new StreamController(sync: true);
+
+  StreamSink get sink => _sinkController.sink;
+  final _sinkController = new StreamController(sync: true);
+
+  /// A map from virtual channel ids to [StreamController]s that should be used
+  /// to write messages received from those channels.
+  final _streamControllers = new Map<int, StreamController>();
+
+  /// A map from virtual channel ids to [StreamControllers]s that are used
+  /// to receive messages to write to those channels.
+  ///
+  /// Note that this uses the same keys as [_streamControllers].
+  final _sinkControllers = new Map<int, StreamController>();
+
+  /// The next id to use for a local virtual channel.
+  ///
+  /// Ids are used to identify virtual channels. Each message is tagged with an
+  /// id; the receiving [MultiChannel] uses this id to look up which
+  /// [VirtualChannel] the message should be dispatched to.
+  ///
+  /// The id scheme for virtual channels is somewhat complicated. This is
+  /// necessary to ensure that there are no conflicts even when both endpoints
+  /// have virtual channels with the same id; since both endpoints can send and
+  /// receive messages across each virtual channel, a naïve scheme would make it
+  /// impossible to tell whether a message was from a channel that originated in
+  /// the remote endpoint or a reply on a channel that originated in the local
+  /// endpoint.
+  ///
+  /// The trick is that each endpoint only uses odd ids for its own channels.
+  /// When sending a message over a channel that was created by the remote
+  /// endpoint, the channel's id plus one is used. This way each [MultiChannel]
+  /// knows that if an incoming message has an odd id, it's using the local id
+  /// scheme, but if it has an even id, it's using the remote id scheme.
+  var _nextId = 1;
+
+  _MultiChannel(this._innerStream, this._innerSink) {
+    // The default connection is a special case which has id 0 on both ends.
+    // This allows it to begin connected without having to send over an id.
+    _streamControllers[0] = _streamController;
+    _sinkControllers[0] = _sinkController;
+    _sinkController.stream.listen(
+        (message) => _innerSink.add([0, message]),
+        onDone: () => _closeChannel(0, 0));
+
+    _innerStreamSubscription = _innerStream.listen((message) {
+      var id = message[0];
+      var sink = _streamControllers[id];
+
+      // A sink might not exist if the channel was closed before an incoming
+      // message was processed.
+      if (sink == null) return;
+      if (message.length > 1) {
+        sink.add(message[1]);
+        return;
+      }
+
+      // A message without data indicates that the channel has been closed.
+      _sinkControllers[id].close();
+    }, onDone: _closeInnerChannel,
+        onError: _streamController.addError);
+  }
+
+  VirtualChannel virtualChannel([id]) {
+    if (_innerStream == null) {
+      throw new StateError("The underlying channel is closed.");
+    }
+
+    var inputId;
+    var outputId;
+    if (id != null) {
+      // Since the user is passing in an id, we're connected to a remote
+      // VirtualChannel. This means messages they send over this channel will
+      // have the original odd id, but our replies will have an even id.
+      inputId = id;
+      outputId = (id as int) + 1;
+    } else {
+      // Since we're generating an id, we originated this VirtualChannel. This
+      // means messages we send over this channel will have the original odd id,
+      // but the remote channel's replies will have an even id.
+      inputId = _nextId + 1;
+      outputId = _nextId;
+      _nextId += 2;
+    }
+
+    if (_streamControllers.containsKey(inputId)) {
+      throw new ArgumentError("A virtual channel with id $id already exists.");
+    }
+
+    var streamController = new StreamController(sync: true);
+    var sinkController = new StreamController(sync: true);
+    _streamControllers[inputId] = streamController;
+    _sinkControllers[inputId] = sinkController;
+    sinkController.stream.listen(
+        (message) => _innerSink.add([outputId, message]),
+        onDone: () => _closeChannel(inputId, outputId));
+
+    return new VirtualChannel._(
+        this, outputId, streamController.stream, sinkController.sink);
+  }
+
+  /// Closes the virtual channel for which incoming messages have [inputId] and
+  /// outgoing messages have [outputId].
+  void _closeChannel(int inputId, int outputId) {
+    _streamControllers.remove(inputId).close();
+    _sinkControllers.remove(inputId).close();
+
+    if (_innerSink == null) return;
+
+    // A message without data indicates that the virtual channel has been
+    // closed.
+    _innerSink.add([outputId]);
+    if (_streamControllers.isEmpty) _closeInnerChannel();
+  }
+
+  /// Closes the underlying communication channel.
+  void _closeInnerChannel() {
+    _innerSink.close();
+    _innerStreamSubscription.cancel();
+    _innerStream = null;
+    _innerSink = null;
+    for (var controller in _sinkControllers.values.toList()) {
+      controller.close();
+    }
+  }
+}
+
+/// A virtual channel created by [MultiChannel].
+///
+/// This implements [MultiChannel] for convenience.
+/// [VirtualChannel.virtualChannel] is semantically identical to the parent's
+/// [MultiChannel.virtualChannel].
+class VirtualChannel extends StreamChannelMixin implements MultiChannel {
+  /// The [MultiChannel] that created this.
+  final MultiChannel _parent;
+
+  /// The identifier for this channel.
+  ///
+  /// This can be sent across the [MultiChannel] to provide the remote endpoint
+  /// a means to connect to this channel. Nothing about this is guaranteed
+  /// except that it will be JSON-serializable.
+  final id;
+
+  final Stream stream;
+  final StreamSink sink;
+
+  VirtualChannel._(this._parent, this.id, this.stream, this.sink);
+
+  VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id);
+}
diff --git a/lib/stream_channel.dart b/lib/stream_channel.dart
new file mode 100644
index 0000000..c4e28a0
--- /dev/null
+++ b/lib/stream_channel.dart
@@ -0,0 +1,50 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+export 'src/multi_channel.dart';
+
+/// An abstract class representing a two-way communication channel.
+///
+/// Subclasses are strongly encouraged to mix in or extend [StreamChannelMixin]
+/// to get default implementations of the various instance methods. Adding new
+/// methods to this interface will not be considered a breaking change if
+/// implementations are also added to [StreamChannelMixin].
+abstract class StreamChannel<T> {
+  /// The stream that emits values from the other endpoint.
+  Stream<T> get stream;
+
+  /// The sink for sending values to the other endpoint.
+  StreamSink<T> get sink;
+
+  /// Creates a new [StreamChannel] that communicates over [stream] and [sink].
+  factory StreamChannel(Stream<T> stream, StreamSink<T> sink) =>
+      new _StreamChannel<T>(stream, sink);
+
+  /// Connects [this] to [other], so that any values emitted by either are sent
+  /// directly to the other.
+  void pipe(StreamChannel<T> other);
+}
+
+/// An implementation of [StreamChannel] that simply takes a stream and a sink
+/// as parameters.
+///
+/// This is distinct from [StreamChannel] so that it can use
+/// [StreamChannelMixin].
+class _StreamChannel<T> extends StreamChannelMixin<T> {
+  final Stream<T> stream;
+  final StreamSink<T> sink;
+
+  _StreamChannel(this.stream, this.sink);
+}
+
+/// A mixin that implements the instance methods of [StreamChannel] in terms of
+/// [stream] and [sink].
+abstract class StreamChannelMixin<T> implements StreamChannel<T> {
+  void pipe(StreamChannel<T> other) {
+    stream.pipe(other.sink);
+    other.stream.pipe(sink);
+  }
+}
diff --git a/test/multi_channel_test.dart b/test/multi_channel_test.dart
new file mode 100644
index 0000000..e459412
--- /dev/null
+++ b/test/multi_channel_test.dart
@@ -0,0 +1,310 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+  var oneToTwo;
+  var twoToOne;
+  var channel1;
+  var channel2;
+  setUp(() {
+    oneToTwo = new StreamController();
+    twoToOne = new StreamController();
+    channel1 = new MultiChannel(twoToOne.stream, oneToTwo.sink);
+    channel2 = new MultiChannel(oneToTwo.stream, twoToOne.sink);
+  });
+
+  group("the default virtual channel", () {
+    test("begins connected", () {
+      var first = true;
+      channel2.stream.listen(expectAsync((message) {
+        if (first) {
+          expect(message, equals("hello"));
+          first = false;
+        } else {
+          expect(message, equals("world"));
+        }
+      }, count: 2));
+
+      channel1.sink.add("hello");
+      channel1.sink.add("world");
+    });
+
+    test("closes the remote virtual channel when it closes", () {
+      expect(channel2.stream.toList(), completion(isEmpty));
+      expect(channel2.sink.done, completes);
+
+      channel1.sink.close();
+    });
+
+    test("closes the local virtual channel when it closes", () {
+      expect(channel1.stream.toList(), completion(isEmpty));
+      expect(channel1.sink.done, completes);
+
+      channel1.sink.close();
+    });
+
+    test("doesn't closes the local virtual channel when the stream "
+        "subscription is canceled", () {
+      channel1.sink.done.then(expectAsync((_) {}, count: 0));
+
+      channel1.stream.listen((_) {}).cancel();
+
+      // Ensure that there's enough time for the channel to close if it's going
+      // to.
+      return pumpEventQueue();
+    });
+
+    test("closes the underlying channel when it closes without any other "
+        "virtual channels", () {
+      expect(oneToTwo.done, completes);
+      expect(twoToOne.done, completes);
+
+      channel1.sink.close();
+    });
+
+    test("doesn't close the underlying channel when it closes with other "
+        "virtual channels", () {
+      oneToTwo.done.then(expectAsync((_) {}, count: 0));
+      twoToOne.done.then(expectAsync((_) {}, count: 0));
+
+      // Establish another virtual connection which should keep the underlying
+      // connection open.
+      channel2.virtualChannel(channel1.virtualChannel().id);
+      channel1.sink.close();
+
+      // Ensure that there's enough time for the underlying channel to complete
+      // if it's going to.
+      return pumpEventQueue();
+    });
+  });
+
+  group("a locally-created virtual channel", () {
+    var virtual1;
+    var virtual2;
+    setUp(() {
+      virtual1 = channel1.virtualChannel();
+      virtual2 = channel2.virtualChannel(virtual1.id);
+    });
+
+    test("sends messages only to the other virtual channel", () {
+      var first = true;
+      virtual2.stream.listen(expectAsync((message) {
+        if (first) {
+          expect(message, equals("hello"));
+          first = false;
+        } else {
+          expect(message, equals("world"));
+        }
+      }, count: 2));
+
+      // No other virtual channels should receive the message.
+      for (var i = 0; i < 10; i++) {
+        var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
+        virtual.stream.listen(expectAsync((_) {}, count: 0));
+      }
+      channel2.stream.listen(expectAsync((_) {}, count: 0));
+
+      virtual1.sink.add("hello");
+      virtual1.sink.add("world");
+    });
+
+    test("closes the remote virtual channel when it closes", () {
+      expect(virtual2.stream.toList(), completion(isEmpty));
+      expect(virtual2.sink.done, completes);
+
+      virtual1.sink.close();
+    });
+
+    test("closes the local virtual channel when it closes", () {
+      expect(virtual1.stream.toList(), completion(isEmpty));
+      expect(virtual1.sink.done, completes);
+
+      virtual1.sink.close();
+    });
+
+    test("doesn't closes the local virtual channel when the stream "
+        "subscription is canceled", () {
+      virtual1.sink.done.then(expectAsync((_) {}, count: 0));
+      virtual1.stream.listen((_) {}).cancel();
+
+      // Ensure that there's enough time for the channel to close if it's going
+      // to.
+      return pumpEventQueue();
+    });
+
+    test("closes the underlying channel when it closes without any other "
+        "virtual channels", () async {
+      // First close the default channel so we can test the new channel as the
+      // last living virtual channel.
+      channel1.sink.close();
+
+      await channel2.stream.toList();
+      expect(oneToTwo.done, completes);
+      expect(twoToOne.done, completes);
+
+      virtual1.sink.close();
+    });
+
+    test("doesn't close the underlying channel when it closes with other "
+        "virtual channels", () {
+      oneToTwo.done.then(expectAsync((_) {}, count: 0));
+      twoToOne.done.then(expectAsync((_) {}, count: 0));
+
+      virtual1.sink.close();
+
+      // Ensure that there's enough time for the underlying channel to complete
+      // if it's going to.
+      return pumpEventQueue();
+    });
+
+    test("doesn't conflict with a remote virtual channel", () {
+      var virtual3 = channel2.virtualChannel();
+      var virtual4 = channel1.virtualChannel(virtual3.id);
+
+      // This is an implementation detail, but we assert it here to make sure
+      // we're properly testing two channels with the same id.
+      expect(virtual1.id, equals(virtual3.id));
+
+      virtual2.stream.listen(
+          expectAsync((message) => expect(message, equals("hello"))));
+      virtual4.stream.listen(
+          expectAsync((message) => expect(message, equals("goodbye"))));
+
+      virtual1.sink.add("hello");
+      virtual3.sink.add("goodbye");
+    });
+  });
+
+  group("a remotely-created virtual channel", () {
+    var virtual1;
+    var virtual2;
+    setUp(() {
+      virtual1 = channel1.virtualChannel();
+      virtual2 = channel2.virtualChannel(virtual1.id);
+    });
+
+    test("sends messages only to the other virtual channel", () {
+      var first = true;
+      virtual1.stream.listen(expectAsync((message) {
+        if (first) {
+          expect(message, equals("hello"));
+          first = false;
+        } else {
+          expect(message, equals("world"));
+        }
+      }, count: 2));
+
+      // No other virtual channels should receive the message.
+      for (var i = 0; i < 10; i++) {
+        var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
+        virtual.stream.listen(expectAsync((_) {}, count: 0));
+      }
+      channel1.stream.listen(expectAsync((_) {}, count: 0));
+
+      virtual2.sink.add("hello");
+      virtual2.sink.add("world");
+    });
+
+    test("closes the remote virtual channel when it closes", () {
+      expect(virtual1.stream.toList(), completion(isEmpty));
+      expect(virtual1.sink.done, completes);
+
+      virtual2.sink.close();
+    });
+
+    test("closes the local virtual channel when it closes", () {
+      expect(virtual2.stream.toList(), completion(isEmpty));
+      expect(virtual2.sink.done, completes);
+
+      virtual2.sink.close();
+    });
+
+    test("doesn't closes the local virtual channel when the stream "
+        "subscription is canceled", () {
+      virtual2.sink.done.then(expectAsync((_) {}, count: 0));
+      virtual2.stream.listen((_) {}).cancel();
+
+      // Ensure that there's enough time for the channel to close if it's going
+      // to.
+      return pumpEventQueue();
+    });
+
+    test("closes the underlying channel when it closes without any other "
+        "virtual channels", () async {
+      // First close the default channel so we can test the new channel as the
+      // last living virtual channel.
+      channel2.sink.close();
+
+      await channel1.stream.toList();
+      expect(oneToTwo.done, completes);
+      expect(twoToOne.done, completes);
+
+      virtual2.sink.close();
+    });
+
+    test("doesn't close the underlying channel when it closes with other "
+        "virtual channels", () {
+      oneToTwo.done.then(expectAsync((_) {}, count: 0));
+      twoToOne.done.then(expectAsync((_) {}, count: 0));
+
+      virtual2.sink.close();
+
+      // Ensure that there's enough time for the underlying channel to complete
+      // if it's going to.
+      return pumpEventQueue();
+    });
+
+    test("doesn't allow another virtual channel with the same id", () {
+      expect(() => channel2.virtualChannel(virtual1.id),
+          throwsArgumentError);
+    });
+  });
+
+  group("when the underlying stream", () {
+    var virtual1;
+    var virtual2;
+    setUp(() {
+      virtual1 = channel1.virtualChannel();
+      virtual2 = channel2.virtualChannel(virtual1.id);
+    });
+
+    test("closes, all virtual channels close", () {
+      expect(channel1.stream.toList(), completion(isEmpty));
+      expect(channel1.sink.done, completes);
+      expect(channel2.stream.toList(), completion(isEmpty));
+      expect(channel2.sink.done, completes);
+      expect(virtual1.stream.toList(), completion(isEmpty));
+      expect(virtual1.sink.done, completes);
+      expect(virtual2.stream.toList(), completion(isEmpty));
+      expect(virtual2.sink.done, completes);
+
+      oneToTwo.close();
+    });
+
+    test("closes, no more virtual channels may be created", () {
+      expect(channel1.sink.done.then((_) => channel1.virtualChannel()),
+          throwsStateError);
+      expect(channel2.sink.done.then((_) => channel2.virtualChannel()),
+          throwsStateError);
+
+      oneToTwo.close();
+    });
+
+    test("emits an error, the error is sent only to the default channel", () {
+      channel1.stream.listen(expectAsync((_) {}, count: 0),
+          onError: expectAsync((error) => expect(error, equals("oh no"))));
+      virtual1.stream.listen(expectAsync((_) {}, count: 0),
+          onError: expectAsync((_) {}, count: 0));
+
+      twoToOne.addError("oh no");
+    });
+  });
+}
diff --git a/test/utils.dart b/test/utils.dart
new file mode 100644
index 0000000..130a3e1
--- /dev/null
+++ b/test/utils.dart
@@ -0,0 +1,20 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// Returns a [Future] that completes after pumping the event queue [times]
+/// times.
+///
+/// By default, this should pump the event queue enough times to allow any code
+/// to run, as long as it's not waiting on some external event.
+Future pumpEventQueue([int times=20]) {
+  if (times == 0) return new Future.value();
+  // Use [new Future] future to allow microtask events to finish. The [new
+  // Future.value] constructor uses scheduleMicrotask itself and would therefore
+  // not wait for microtask callbacks that are scheduled after invoking this
+  // method.
+  return new Future(() => pumpEventQueue(times - 1));
+}
+