Avoid an open Peer with a closed Client (#65)
May fix https://github.com/dart-lang/sdk/issues/43012
If a `Peer` is created with a `StreamChannel` that does not follow the
stated contract it's possible that the `sink` gets closed without
receiving a done event from the `channel` which leaves the `Peer`
instance in a state that's inconsistent with the underlying `Client`.
The result is that it's possible to get a bad state trying to send a
message even with `isClosed` returns `false`.
- Make `isClosed` and `done` forward to the `_client` and `_peer` fields
so that they can't be inconsistent.
- Forward errors to the `_server` so that it can forward them through
`done` without an extra `Completer` to manage.
- Avoid closing the `sink` in the `Peer`. It will end up being closed by
the server when it is handling the error, and it's the same `sink`
instance in both places.
- Add a test that ensures that `isClosed` behaves as expected following
a call to `close()` even when the `StreamChannel` does not follow it's
contract.
diff --git a/lib/src/peer.dart b/lib/src/peer.dart
index 31e8936..4cf6aae 100644
--- a/lib/src/peer.dart
+++ b/lib/src/peer.dart
@@ -35,11 +35,11 @@
/// they're responses.
final _clientIncomingForwarder = StreamController(sync: true);
- final _done = Completer<void>();
+ Future<void> _done;
@override
- Future get done => _done.future;
+ Future get done => _done ??= Future.wait([_client.done, _server.done]);
@override
- bool get isClosed => _done.isCompleted;
+ bool get isClosed => _client.isClosed || _server.isClosed;
@override
ErrorCallback get onUnhandledError => _server?.onUnhandledError;
@@ -142,15 +142,15 @@
_serverIncomingForwarder.add(message);
}
}, onError: (error, stackTrace) {
- _done.completeError(error, stackTrace);
- _channel.sink.close();
- }, onDone: () {
- if (!_done.isCompleted) _done.complete();
- close();
- });
+ _serverIncomingForwarder.addError(error, stackTrace);
+ }, onDone: close);
return done;
}
@override
- Future close() => Future.wait([_client.close(), _server.close()]);
+ Future close() {
+ _client.close();
+ _server.close();
+ return done;
+ }
}
diff --git a/test/peer_test.dart b/test/peer_test.dart
index 7284330..e209763 100644
--- a/test/peer_test.dart
+++ b/test/peer_test.dart
@@ -113,6 +113,21 @@
await peer.close();
});
+ test('considered closed with misbehaving StreamChannel', () async {
+ // If a StreamChannel does not enforce the guarantees stated in it's
+ // contract - specifically that "Closing the sink causes the stream to close
+ // before it emits any more events." - The `Peer` should still understand
+ // when it has been closed manually.
+ var channel = StreamChannel(
+ StreamController().stream,
+ StreamController(),
+ );
+ var peer = json_rpc.Peer.withoutJson(channel);
+ unawaited(peer.listen());
+ unawaited(peer.close());
+ expect(peer.isClosed, true);
+ });
+
group('like a server,', () {
test('can receive a call and return a response', () {
expect(outgoing.first,