Fix `Peer` requests not terminating when the channel closes (#52)

The `listen()` method of `Peer` never propagates close events from its manager to the `client` field. This causes in-flight requests to never terminate as the clean up handler in `client.dart` is never called.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 11e0cc2..9e56723 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,7 @@
 ## 2.2.1-dev
 
+* Fix `Peer` requests not terminating when the underlying channel is closed.
+
 ## 2.2.0
 
 * Added `strictProtocolChecks` named parameter to `Server` and `Peer`
diff --git a/lib/src/peer.dart b/lib/src/peer.dart
index eeb7cd9..7f89dd2 100644
--- a/lib/src/peer.dart
+++ b/lib/src/peer.dart
@@ -142,7 +142,7 @@
         // server since it knows how to send error responses.
         _serverIncomingForwarder.add(message);
       }
-    });
+    }).whenComplete(close);
   }
 
   @override
diff --git a/test/peer_test.dart b/test/peer_test.dart
index 33184b0..a9c295a 100644
--- a/test/peer_test.dart
+++ b/test/peer_test.dart
@@ -5,6 +5,7 @@
 import 'dart:async';
 import 'dart:convert';
 
+import 'package:pedantic/pedantic.dart';
 import 'package:stream_channel/stream_channel.dart';
 import 'package:test/test.dart';
 
@@ -84,6 +85,21 @@
         expect(peer.sendRequest('w', {'x': 'y'}), completion(equals('z')));
       });
     });
+
+    test('requests terminates when the channel is closed', () async {
+      var incomingController = StreamController();
+      var channel = StreamChannel.withGuarantees(
+        incomingController.stream,
+        StreamController(),
+      );
+      var peer = json_rpc.Peer.withoutJson(channel);
+      unawaited(peer.listen());
+
+      var response = peer.sendRequest('foo');
+      await incomingController.close();
+
+      expect(response, throwsStateError);
+    });
   });
 
   group('like a server,', () {