Avoid an open Peer with a closed Client (#65)

May fix https://github.com/dart-lang/sdk/issues/43012

If a `Peer` is created with a `StreamChannel` that does not follow the
stated contract it's possible that the `sink` gets closed without
receiving a done event from the `channel` which leaves the `Peer`
instance in a state that's inconsistent with the underlying `Client`.
The result is that it's possible to get a bad state trying to send a
message even with `isClosed` returns `false`.

- Make `isClosed` and `done` forward to the `_client` and `_peer` fields
  so that they can't be inconsistent.
- Forward errors to the `_server` so that it can forward them through
  `done` without an extra `Completer` to manage.
- Avoid closing the `sink` in the `Peer`. It will end up being closed by
  the server when it is handling the error, and it's the same `sink`
  instance in both places.
- Add a test that ensures that `isClosed` behaves as expected following
  a call to `close()` even when the `StreamChannel` does not follow it's
  contract.
diff --git a/lib/src/peer.dart b/lib/src/peer.dart
index 31e8936..4cf6aae 100644
--- a/lib/src/peer.dart
+++ b/lib/src/peer.dart
@@ -35,11 +35,11 @@
   /// they're responses.
   final _clientIncomingForwarder = StreamController(sync: true);
 
-  final _done = Completer<void>();
+  Future<void> _done;
   @override
-  Future get done => _done.future;
+  Future get done => _done ??= Future.wait([_client.done, _server.done]);
   @override
-  bool get isClosed => _done.isCompleted;
+  bool get isClosed => _client.isClosed || _server.isClosed;
 
   @override
   ErrorCallback get onUnhandledError => _server?.onUnhandledError;
@@ -142,15 +142,15 @@
         _serverIncomingForwarder.add(message);
       }
     }, onError: (error, stackTrace) {
-      _done.completeError(error, stackTrace);
-      _channel.sink.close();
-    }, onDone: () {
-      if (!_done.isCompleted) _done.complete();
-      close();
-    });
+      _serverIncomingForwarder.addError(error, stackTrace);
+    }, onDone: close);
     return done;
   }
 
   @override
-  Future close() => Future.wait([_client.close(), _server.close()]);
+  Future close() {
+    _client.close();
+    _server.close();
+    return done;
+  }
 }
diff --git a/test/peer_test.dart b/test/peer_test.dart
index 7284330..e209763 100644
--- a/test/peer_test.dart
+++ b/test/peer_test.dart
@@ -113,6 +113,21 @@
     await peer.close();
   });
 
+  test('considered closed with misbehaving StreamChannel', () async {
+    // If a StreamChannel does not enforce the guarantees stated in it's
+    // contract - specifically that "Closing the sink causes the stream to close
+    // before it emits any more events." - The `Peer` should still understand
+    // when it has been closed manually.
+    var channel = StreamChannel(
+      StreamController().stream,
+      StreamController(),
+    );
+    var peer = json_rpc.Peer.withoutJson(channel);
+    unawaited(peer.listen());
+    unawaited(peer.close());
+    expect(peer.isClosed, true);
+  });
+
   group('like a server,', () {
     test('can receive a call and return a response', () {
       expect(outgoing.first,