Fix a race condition (#16)
MultiChannel wasn't buffering incoming remote events if their virtual
channels hadn't been created locally yet.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ece9e1d..093a242 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 1.6.4
+
+* Fix a race condition in `MultiChannel` where messages from a remote virtual
+ channel could get dropped if the corresponding local channel wasn't registered
+ quickly enough.
+
## 1.6.3
* Use `pumpEventQueue()` from test.
diff --git a/lib/src/multi_channel.dart b/lib/src/multi_channel.dart
index 112997e..e541543 100644
--- a/lib/src/multi_channel.dart
+++ b/lib/src/multi_channel.dart
@@ -93,10 +93,18 @@
/// The controller for this channel.
final _mainController = new StreamChannelController(sync: true);
- /// A map from virtual channel ids to [StreamChannelController]s that should
- /// be used to communicate over those channels.
+ /// A map from input IDs to [StreamChannelController]s that should be used to
+ /// communicate over those channels.
final _controllers = <int, StreamChannelController>{};
+ /// Input IDs of controllers in [_controllers] that we've received messages
+ /// for but that have not yet had a local [virtualChannel] created.
+ final _pendingIds = new Set<int>();
+
+ /// Input IDs of virtual channels that used to exist but have since been
+ /// closed.
+ final _closedIds = new Set<int>();
+
/// The next id to use for a local virtual channel.
///
/// Ids are used to identify virtual channels. Each message is tagged with an
@@ -114,8 +122,9 @@
/// The trick is that each endpoint only uses odd ids for its own channels.
/// When sending a message over a channel that was created by the remote
/// endpoint, the channel's id plus one is used. This way each [MultiChannel]
- /// knows that if an incoming message has an odd id, it's using the local id
- /// scheme, but if it has an even id, it's using the remote id scheme.
+ /// knows that if an incoming message has an odd id, it's coming from a
+ /// channel that was originally created remotely, but if it has an even id,
+ /// it's coming from a channel that was originally created locally.
var _nextId = 1;
_MultiChannel(this._inner) {
@@ -128,21 +137,28 @@
_innerStreamSubscription = _inner.stream.listen((message) {
var id = message[0];
- var controller = _controllers[id];
- // A controller might not exist if the channel was closed before an
- // incoming message was processed.
- if (controller == null) return;
+ // If the channel was closed before an incoming message was processed,
+ // ignore that message.
+ if (_closedIds.contains(id)) return;
+
+ var controller = _controllers.putIfAbsent(id, () {
+ // If we receive a message for a controller that doesn't have a local
+ // counterpart yet, create a controller for it to buffer incoming
+ // messages for when a local connection is created.
+ _pendingIds.add(id);
+ return new StreamChannelController(sync: true);
+ });
+
if (message.length > 1) {
controller.local.sink.add(message[1]);
- return;
+ } else {
+ // A message without data indicates that the channel has been closed. We
+ // can just close the sink here without doing any more cleanup, because
+ // the sink closing will cause the stream to emit a done event which
+ // will trigger more cleanup.
+ controller.local.sink.close();
}
-
- // A message without data indicates that the channel has been closed. We
- // can only close the sink here without doing any more cleanup, because
- // the sink closing will cause the stream to emit a done event which will
- // trigger more cleanup.
- controller.local.sink.close();
},
onDone: _closeInnerChannel,
onError: _mainController.local.sink.addError);
@@ -173,16 +189,22 @@
this, inputId, new Stream.empty(), new NullStreamSink());
}
- if (_controllers.containsKey(inputId)) {
+ StreamChannelController controller;
+ if (_pendingIds.remove(inputId)) {
+ // If we've already received messages for this channel, use the controller
+ // where those messages are buffered.
+ controller = _controllers[inputId];
+ } else if (_controllers.containsKey(inputId) ||
+ _closedIds.contains(inputId)) {
throw new ArgumentError("A virtual channel with id $id already exists.");
+ } else {
+ controller = new StreamChannelController(sync: true);
+ _controllers[inputId] = controller;
}
- var controller = new StreamChannelController(sync: true);
- _controllers[inputId] = controller;
controller.local.stream.listen(
(message) => _inner.sink.add([outputId, message]),
onDone: () => _closeChannel(inputId, outputId));
-
return new VirtualChannel._(
this, outputId, controller.foreign.stream, controller.foreign.sink);
}
@@ -190,6 +212,7 @@
/// Closes the virtual channel for which incoming messages have [inputId] and
/// outgoing messages have [outputId].
void _closeChannel(int inputId, int outputId) {
+ _closedIds.add(inputId);
var controller = _controllers.remove(inputId);
controller.local.sink.close();
diff --git a/pubspec.yaml b/pubspec.yaml
index 9a123b1..393e60f 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: stream_channel
-version: 1.6.3
+version: 1.6.4
description: An abstraction for two-way communication channels.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/stream_channel
diff --git a/test/multi_channel_test.dart b/test/multi_channel_test.dart
index 2a87caf..0c9316a 100644
--- a/test/multi_channel_test.dart
+++ b/test/multi_channel_test.dart
@@ -268,6 +268,32 @@
test("doesn't allow another virtual channel with the same id", () {
expect(() => channel2.virtualChannel(virtual1.id), throwsArgumentError);
});
+
+ test("dispatches events received before the virtual channel is created",
+ () async {
+ virtual1 = channel1.virtualChannel();
+
+ virtual1.sink.add("hello");
+ await pumpEventQueue();
+
+ virtual1.sink.add("world");
+ await pumpEventQueue();
+
+ expect(channel2.virtualChannel(virtual1.id).stream,
+ emitsInOrder(["hello", "world"]));
+ });
+
+ test(
+ "dispatches close events received before the virtual channel is "
+ "created", () async {
+ virtual1 = channel1.virtualChannel();
+
+ virtual1.sink.close();
+ await pumpEventQueue();
+
+ expect(channel2.virtualChannel(virtual1.id).stream.toList(),
+ completion(isEmpty));
+ });
});
group("when the underlying stream", () {