Fix a race condition in IsolateChannel.connectReceive() (#92)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8932188..c78f64f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,9 @@
-## 2.1.2-dev
+## 2.1.2
* Require Dart 2.19
* Add an example.
+* Fix a race condition in `IsolateChannel.connectReceive()` where the channel
+ could hang forever if its sink was closed before the connection was established.
## 2.1.1
diff --git a/lib/src/isolate_channel.dart b/lib/src/isolate_channel.dart
index 55c9814..3fbd46d 100644
--- a/lib/src/isolate_channel.dart
+++ b/lib/src/isolate_channel.dart
@@ -43,16 +43,26 @@
factory IsolateChannel.connectReceive(ReceivePort receivePort) {
// We can't use a [StreamChannelCompleter] here because we need the return
// value to be an [IsolateChannel].
+ var isCompleted = false;
var streamCompleter = StreamCompleter<T>();
var sinkCompleter = StreamSinkCompleter<T>();
- var channel =
- IsolateChannel<T>._(streamCompleter.stream, sinkCompleter.sink);
+
+ var channel = IsolateChannel<T>._(streamCompleter.stream, sinkCompleter.sink
+ .transform(StreamSinkTransformer.fromHandlers(handleDone: (sink) {
+ if (!isCompleted) {
+ receivePort.close();
+ streamCompleter.setSourceStream(Stream.empty());
+ sinkCompleter.setDestinationSink(NullStreamSink<T>());
+ }
+ sink.close();
+ })));
// The first message across the ReceivePort should be a SendPort pointing to
// the remote end. If it's not, we'll make the stream emit an error
// complaining.
late StreamSubscription<dynamic> subscription;
subscription = receivePort.listen((message) {
+ isCompleted = true;
if (message is SendPort) {
var controller =
StreamChannelController<T>(allowForeignErrors: false, sync: true);
diff --git a/pubspec.yaml b/pubspec.yaml
index 5eb57ae..0b4f62d 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: stream_channel
-version: 2.1.2-dev
+version: 2.1.2
description: >-
An abstraction for two-way communication channels based on the Dart Stream
class.
diff --git a/test/isolate_channel_test.dart b/test/isolate_channel_test.dart
index 1850664..10f1fe5 100644
--- a/test/isolate_channel_test.dart
+++ b/test/isolate_channel_test.dart
@@ -162,5 +162,13 @@
expect(connectedChannel.stream.toList(), throwsStateError);
expect(connectedChannel.sink.done, completes);
});
+
+ test('the receiving channel closes gracefully without a connection',
+ () async {
+ var connectedChannel = IsolateChannel.connectReceive(connectPort);
+ await connectedChannel.sink.close();
+ await expectLater(connectedChannel.stream.toList(), completion(isEmpty));
+ await expectLater(connectedChannel.sink.done, completes);
+ });
});
}