Allow a type on MultiChannel (#26)

- Add generic type arguments to the classes and relevant fields.
- Explicitly indicate which arguments take a `StreamChannel<dynamic>`.
- Update the test for MultiChannel to use a consistent type.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index eafdb81..b284e5e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 ## 1.6.7
 
 * Update SDK version to 2.0.0-dev.17.0.
+* Add a type argument to `MultiChannel`.
 
 ## 1.6.6
 
diff --git a/lib/src/multi_channel.dart b/lib/src/multi_channel.dart
index e541543..e7e2584 100644
--- a/lib/src/multi_channel.dart
+++ b/lib/src/multi_channel.dart
@@ -41,24 +41,25 @@
 ///
 /// Each virtual channel may be closed individually. When all of them are
 /// closed, the underlying [StreamSink] is closed automatically.
-abstract class MultiChannel implements StreamChannel {
+abstract class MultiChannel<T> implements StreamChannel<T> {
   /// The default input stream.
   ///
   /// This connects to the remote [sink].
-  Stream get stream;
+  Stream<T> get stream;
 
   /// The default output stream.
   ///
   /// This connects to the remote [stream]. If this is closed, the remote
   /// [stream] will close, but other virtual channels will remain open and new
   /// virtual channels may be opened.
-  StreamSink get sink;
+  StreamSink<T> get sink;
 
   /// Creates a new [MultiChannel] that sends and receives messages over
   /// [inner].
   ///
   /// The inner channel must take JSON-like objects.
-  factory MultiChannel(StreamChannel inner) => new _MultiChannel(inner);
+  factory MultiChannel(StreamChannel<dynamic> inner) =>
+      new _MultiChannel<T>(inner);
 
   /// Creates a new virtual channel.
   ///
@@ -71,31 +72,32 @@
   ///
   /// Throws an [ArgumentError] if a virtual channel already exists for [id].
   /// Throws a [StateError] if the underlying channel is closed.
-  VirtualChannel virtualChannel([id]);
+  VirtualChannel<T> virtualChannel([id]);
 }
 
 /// The implementation of [MultiChannel].
 ///
 /// This is private so that [VirtualChannel] can inherit from [MultiChannel]
 /// without having to implement all the private members.
-class _MultiChannel extends StreamChannelMixin implements MultiChannel {
+class _MultiChannel<T> extends StreamChannelMixin<T>
+    implements MultiChannel<T> {
   /// The inner channel over which all communication is conducted.
   ///
   /// This will be `null` if the underlying communication channel is closed.
-  StreamChannel _inner;
+  StreamChannel<dynamic> _inner;
 
   /// The subscription to [_inner.stream].
-  StreamSubscription _innerStreamSubscription;
+  StreamSubscription<dynamic> _innerStreamSubscription;
 
-  Stream get stream => _mainController.foreign.stream;
-  StreamSink get sink => _mainController.foreign.sink;
+  Stream<T> get stream => _mainController.foreign.stream;
+  StreamSink<T> get sink => _mainController.foreign.sink;
 
   /// The controller for this channel.
-  final _mainController = new StreamChannelController(sync: true);
+  final _mainController = new StreamChannelController<T>(sync: true);
 
   /// A map from input IDs to [StreamChannelController]s that should be used to
   /// communicate over those channels.
-  final _controllers = <int, StreamChannelController>{};
+  final _controllers = <int, StreamChannelController<T>>{};
 
   /// Input IDs of controllers in [_controllers] that we've received messages
   /// for but that have not yet had a local [virtualChannel] created.
@@ -164,7 +166,7 @@
         onError: _mainController.local.sink.addError);
   }
 
-  VirtualChannel virtualChannel([id]) {
+  VirtualChannel<T> virtualChannel([id]) {
     var inputId;
     var outputId;
     if (id != null) {
@@ -189,7 +191,7 @@
           this, inputId, new Stream.empty(), new NullStreamSink());
     }
 
-    StreamChannelController controller;
+    StreamChannelController<T> controller;
     if (_pendingIds.remove(inputId)) {
       // If we've already received messages for this channel, use the controller
       // where those messages are buffered.
@@ -244,9 +246,10 @@
 /// This implements [MultiChannel] for convenience.
 /// [VirtualChannel.virtualChannel] is semantically identical to the parent's
 /// [MultiChannel.virtualChannel].
-class VirtualChannel extends StreamChannelMixin implements MultiChannel {
+class VirtualChannel<T> extends StreamChannelMixin<T>
+    implements MultiChannel<T> {
   /// The [MultiChannel] that created this.
-  final MultiChannel _parent;
+  final MultiChannel<T> _parent;
 
   /// The identifier for this channel.
   ///
@@ -255,10 +258,10 @@
   /// except that it will be JSON-serializable.
   final id;
 
-  final Stream stream;
-  final StreamSink sink;
+  final Stream<T> stream;
+  final StreamSink<T> sink;
 
   VirtualChannel._(this._parent, this.id, this.stream, this.sink);
 
-  VirtualChannel virtualChannel([id]) => _parent.virtualChannel(id);
+  VirtualChannel<T> virtualChannel([id]) => _parent.virtualChannel(id);
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index 47e24c2..bbb29d8 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: stream_channel
-version: 1.6.7-dev
+version: 1.6.7
 description: An abstraction for two-way communication channels.
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/multi_channel_test.dart b/test/multi_channel_test.dart
index 0c9316a..99a53c3 100644
--- a/test/multi_channel_test.dart
+++ b/test/multi_channel_test.dart
@@ -11,8 +11,8 @@
   var channel2;
   setUp(() {
     controller = new StreamChannelController();
-    channel1 = new MultiChannel(controller.local);
-    channel2 = new MultiChannel(controller.foreign);
+    channel1 = new MultiChannel<int>(controller.local);
+    channel2 = new MultiChannel<int>(controller.foreign);
   });
 
   group("the default virtual channel", () {
@@ -20,15 +20,15 @@
       var first = true;
       channel2.stream.listen(expectAsync1((message) {
         if (first) {
-          expect(message, equals("hello"));
+          expect(message, equals(1));
           first = false;
         } else {
-          expect(message, equals("world"));
+          expect(message, equals(2));
         }
       }, count: 2));
 
-      channel1.sink.add("hello");
-      channel1.sink.add("world");
+      channel1.sink.add(1);
+      channel1.sink.add(2);
     });
 
     test("closes the remote virtual channel when it closes", () {
@@ -95,10 +95,10 @@
       var first = true;
       virtual2.stream.listen(expectAsync1((message) {
         if (first) {
-          expect(message, equals("hello"));
+          expect(message, equals(1));
           first = false;
         } else {
-          expect(message, equals("world"));
+          expect(message, equals(2));
         }
       }, count: 2));
 
@@ -109,8 +109,8 @@
       }
       channel2.stream.listen(expectAsync1((_) {}, count: 0));
 
-      virtual1.sink.add("hello");
-      virtual1.sink.add("world");
+      virtual1.sink.add(1);
+      virtual1.sink.add(2);
     });
 
     test("closes the remote virtual channel when it closes", () {
@@ -174,12 +174,12 @@
       expect(virtual1.id, equals(virtual3.id));
 
       virtual2.stream
-          .listen(expectAsync1((message) => expect(message, equals("hello"))));
-      virtual4.stream.listen(
-          expectAsync1((message) => expect(message, equals("goodbye"))));
+          .listen(expectAsync1((message) => expect(message, equals(1))));
+      virtual4.stream
+          .listen(expectAsync1((message) => expect(message, equals(2))));
 
-      virtual1.sink.add("hello");
-      virtual3.sink.add("goodbye");
+      virtual1.sink.add(1);
+      virtual3.sink.add(2);
     });
   });
 
@@ -195,10 +195,10 @@
       var first = true;
       virtual1.stream.listen(expectAsync1((message) {
         if (first) {
-          expect(message, equals("hello"));
+          expect(message, equals(1));
           first = false;
         } else {
-          expect(message, equals("world"));
+          expect(message, equals(2));
         }
       }, count: 2));
 
@@ -209,8 +209,8 @@
       }
       channel1.stream.listen(expectAsync1((_) {}, count: 0));
 
-      virtual2.sink.add("hello");
-      virtual2.sink.add("world");
+      virtual2.sink.add(1);
+      virtual2.sink.add(2);
     });
 
     test("closes the remote virtual channel when it closes", () {
@@ -273,14 +273,13 @@
         () async {
       virtual1 = channel1.virtualChannel();
 
-      virtual1.sink.add("hello");
+      virtual1.sink.add(1);
       await pumpEventQueue();
 
-      virtual1.sink.add("world");
+      virtual1.sink.add(2);
       await pumpEventQueue();
 
-      expect(channel2.virtualChannel(virtual1.id).stream,
-          emitsInOrder(["hello", "world"]));
+      expect(channel2.virtualChannel(virtual1.id).stream, emitsInOrder([1, 2]));
     });
 
     test(