Don't leave dangling request futures. (#19)

Closes #18
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9375a9c..0b6fdd1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,12 @@
+## 2.0.4
+
+* `Client.sendRequest()` now throws a `StateError` if the client is closed while
+  the request is in-flight. This avoids dangling `Future`s that will never be
+  completed.
+
+* Both `Client.sendRequest()` and `Client.sendNotification()` now throw
+  `StateError`s if they're called after the client is closed.
+
 ## 2.0.3
 
 * Fix new strong-mode warnings.
diff --git a/lib/src/channel_manager.dart b/lib/src/channel_manager.dart
index 8f66870..8981d68 100644
--- a/lib/src/channel_manager.dart
+++ b/lib/src/channel_manager.dart
@@ -25,7 +25,7 @@
   ///
   /// This is the same future that's returned by [listen].
   Future get done => _doneCompleter.future;
-  final _doneCompleter = new Completer();
+  final _doneCompleter = new Completer.sync();
 
   /// Whether the underlying communication channel is closed.
   bool get isClosed => _doneCompleter.isCompleted;
diff --git a/lib/src/client.dart b/lib/src/client.dart
index 0568da0..7cd4282 100644
--- a/lib/src/client.dart
+++ b/lib/src/client.dart
@@ -61,7 +61,18 @@
   /// Note that the client won't begin listening to [responses] until
   /// [Client.listen] is called.
   Client.withoutJson(StreamChannel channel)
-      : _manager = new ChannelManager("Client", channel);
+      : _manager = new ChannelManager("Client", channel) {
+    _manager.done.whenComplete(() {
+      for (var request in _pendingRequests.values) {
+        request.completer.completeError(
+            new StateError("The client closed with pending requests."),
+            StackTrace.current);
+      }
+      _pendingRequests.clear();
+    }).catchError((_) {
+      // Avoid an unhandled error.
+    });
+  }
 
   /// Starts listening to the underlying stream.
   ///
@@ -87,6 +98,9 @@
   /// If the request succeeds, this returns the response result as a decoded
   /// JSON-serializable object. If it fails, it throws an [RpcException]
   /// describing the failure.
+  ///
+  /// Throws a [StateError] if the client is closed while the request is in
+  /// flight, or if the client is closed when this method is called.
   Future sendRequest(String method, [parameters]) {
     var id = _id++;
     _send(method, parameters, id);
@@ -106,6 +120,8 @@
   ///
   /// Since this is just a notification to which the server isn't expected to
   /// send a response, it has no return value.
+  ///
+  /// Throws a [StateError] if the client is closed when this method is called.
   void sendNotification(String method, [parameters]) =>
       _send(method, parameters);
 
@@ -119,6 +135,7 @@
       throw new ArgumentError('Only maps and lists may be used as JSON-RPC '
           'parameters, was "$parameters".');
     }
+    if (isClosed) throw new StateError("The client is closed.");
 
     var message = <String, dynamic>{
       "jsonrpc": "2.0",
diff --git a/pubspec.yaml b/pubspec.yaml
index c2b9f55..57a1e37 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: json_rpc_2
-version: 2.0.3
+version: 2.0.4
 author: Dart Team <misc@dartlang.org>
 description: An implementation of the JSON-RPC 2.0 spec.
 homepage: http://github.com/dart-lang/json_rpc_2
diff --git a/test/client/client_test.dart b/test/client/client_test.dart
index 5601daa..5147a0b 100644
--- a/test/client/client_test.dart
+++ b/test/client/client_test.dart
@@ -184,6 +184,12 @@
     })));
   });
 
+  test("requests throw StateErrors if the client is closed", () {
+    controller.client.close();
+    expect(() => controller.client.sendRequest("foo"), throwsStateError);
+    expect(() => controller.client.sendNotification("foo"), throwsStateError);
+  });
+
   test("ignores bogus responses", () {
     // Make a request so we have something to respond to.
     controller.expectRequest((request) {
diff --git a/test/client/stream_test.dart b/test/client/stream_test.dart
index b9a31c6..031d38d 100644
--- a/test/client/stream_test.dart
+++ b/test/client/stream_test.dart
@@ -76,4 +76,27 @@
     expect(() => responseController.stream.listen((_) {}), throwsStateError);
     expect(requestController.isClosed, isTrue);
   });
+
+  group("a stream error", () {
+    test("is reported through .done", () {
+      expect(client.listen(), throwsA("oh no!"));
+      expect(client.done, throwsA("oh no!"));
+      responseController.addError("oh no!");
+    });
+
+    test("cause a pending request to throw a StateError", () {
+      expect(client.listen(), throwsA("oh no!"));
+      expect(client.sendRequest('foo'), throwsStateError);
+      responseController.addError("oh no!");
+    });
+
+    test("causes future requests to throw StateErrors", () async {
+      expect(client.listen(), throwsA("oh no!"));
+      responseController.addError("oh no!");
+      await pumpEventQueue();
+
+      expect(() => client.sendRequest('foo'), throwsStateError);
+      expect(() => client.sendNotification('foo'), throwsStateError);
+    });
+  });
 }