Fix a race condition (#16)

MultiChannel wasn't buffering incoming remote events if their virtual
channels hadn't been created locally yet.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ece9e1d..093a242 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 1.6.4
+
+* Fix a race condition in `MultiChannel` where messages from a remote virtual
+  channel could get dropped if the corresponding local channel wasn't registered
+  quickly enough.
+
 ## 1.6.3
 
 * Use `pumpEventQueue()` from test.
diff --git a/lib/src/multi_channel.dart b/lib/src/multi_channel.dart
index 112997e..e541543 100644
--- a/lib/src/multi_channel.dart
+++ b/lib/src/multi_channel.dart
@@ -93,10 +93,18 @@
   /// The controller for this channel.
   final _mainController = new StreamChannelController(sync: true);
 
-  /// A map from virtual channel ids to [StreamChannelController]s that should
-  /// be used to communicate over those channels.
+  /// A map from input IDs to [StreamChannelController]s that should be used to
+  /// communicate over those channels.
   final _controllers = <int, StreamChannelController>{};
 
+  /// Input IDs of controllers in [_controllers] that we've received messages
+  /// for but that have not yet had a local [virtualChannel] created.
+  final _pendingIds = new Set<int>();
+
+  /// Input IDs of virtual channels that used to exist but have since been
+  /// closed.
+  final _closedIds = new Set<int>();
+
   /// The next id to use for a local virtual channel.
   ///
   /// Ids are used to identify virtual channels. Each message is tagged with an
@@ -114,8 +122,9 @@
   /// The trick is that each endpoint only uses odd ids for its own channels.
   /// When sending a message over a channel that was created by the remote
   /// endpoint, the channel's id plus one is used. This way each [MultiChannel]
-  /// knows that if an incoming message has an odd id, it's using the local id
-  /// scheme, but if it has an even id, it's using the remote id scheme.
+  /// knows that if an incoming message has an odd id, it's coming from a
+  /// channel that was originally created remotely, but if it has an even id,
+  /// it's coming from a channel that was originally created locally.
   var _nextId = 1;
 
   _MultiChannel(this._inner) {
@@ -128,21 +137,28 @@
 
     _innerStreamSubscription = _inner.stream.listen((message) {
       var id = message[0];
-      var controller = _controllers[id];
 
-      // A controller might not exist if the channel was closed before an
-      // incoming message was processed.
-      if (controller == null) return;
+      // If the channel was closed before an incoming message was processed,
+      // ignore that message.
+      if (_closedIds.contains(id)) return;
+
+      var controller = _controllers.putIfAbsent(id, () {
+        // If we receive a message for a controller that doesn't have a local
+        // counterpart yet, create a controller for it to buffer incoming
+        // messages for when a local connection is created.
+        _pendingIds.add(id);
+        return new StreamChannelController(sync: true);
+      });
+
       if (message.length > 1) {
         controller.local.sink.add(message[1]);
-        return;
+      } else {
+        // A message without data indicates that the channel has been closed. We
+        // can just close the sink here without doing any more cleanup, because
+        // the sink closing will cause the stream to emit a done event which
+        // will trigger more cleanup.
+        controller.local.sink.close();
       }
-
-      // A message without data indicates that the channel has been closed. We
-      // can only close the sink here without doing any more cleanup, because
-      // the sink closing will cause the stream to emit a done event which will
-      // trigger more cleanup.
-      controller.local.sink.close();
     },
         onDone: _closeInnerChannel,
         onError: _mainController.local.sink.addError);
@@ -173,16 +189,22 @@
           this, inputId, new Stream.empty(), new NullStreamSink());
     }
 
-    if (_controllers.containsKey(inputId)) {
+    StreamChannelController controller;
+    if (_pendingIds.remove(inputId)) {
+      // If we've already received messages for this channel, use the controller
+      // where those messages are buffered.
+      controller = _controllers[inputId];
+    } else if (_controllers.containsKey(inputId) ||
+        _closedIds.contains(inputId)) {
       throw new ArgumentError("A virtual channel with id $id already exists.");
+    } else {
+      controller = new StreamChannelController(sync: true);
+      _controllers[inputId] = controller;
     }
 
-    var controller = new StreamChannelController(sync: true);
-    _controllers[inputId] = controller;
     controller.local.stream.listen(
         (message) => _inner.sink.add([outputId, message]),
         onDone: () => _closeChannel(inputId, outputId));
-
     return new VirtualChannel._(
         this, outputId, controller.foreign.stream, controller.foreign.sink);
   }
@@ -190,6 +212,7 @@
   /// Closes the virtual channel for which incoming messages have [inputId] and
   /// outgoing messages have [outputId].
   void _closeChannel(int inputId, int outputId) {
+    _closedIds.add(inputId);
     var controller = _controllers.remove(inputId);
     controller.local.sink.close();
 
diff --git a/pubspec.yaml b/pubspec.yaml
index 9a123b1..393e60f 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: stream_channel
-version: 1.6.3
+version: 1.6.4
 description: An abstraction for two-way communication channels.
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/multi_channel_test.dart b/test/multi_channel_test.dart
index 2a87caf..0c9316a 100644
--- a/test/multi_channel_test.dart
+++ b/test/multi_channel_test.dart
@@ -268,6 +268,32 @@
     test("doesn't allow another virtual channel with the same id", () {
       expect(() => channel2.virtualChannel(virtual1.id), throwsArgumentError);
     });
+
+    test("dispatches events received before the virtual channel is created",
+        () async {
+      virtual1 = channel1.virtualChannel();
+
+      virtual1.sink.add("hello");
+      await pumpEventQueue();
+
+      virtual1.sink.add("world");
+      await pumpEventQueue();
+
+      expect(channel2.virtualChannel(virtual1.id).stream,
+          emitsInOrder(["hello", "world"]));
+    });
+
+    test(
+        "dispatches close events received before the virtual channel is "
+        "created", () async {
+      virtual1 = channel1.virtualChannel();
+
+      virtual1.sink.close();
+      await pumpEventQueue();
+
+      expect(channel2.virtualChannel(virtual1.id).stream.toList(),
+          completion(isEmpty));
+    });
   });
 
   group("when the underlying stream", () {