Add Server.isClosed and Client.isClosed.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//1328503003 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a1bd217..7de0e87 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,13 @@
+## 1.2.0
+
+* Add `Client.isClosed` and `Server.isClosed`, which make it possible to
+  synchronously determine whether the connection is open. In particular, this
+  makes it possible to reliably tell whether it's safe to call
+  `Client.sendRequest`.
+
+* Fix a race condition in `Server` where a `StateError` could be thrown if the
+  connection was closed in the middle of handling a request.
+
 ## 1.1.1
 
 * Update the README to match the current API.
diff --git a/lib/src/client.dart b/lib/src/client.dart
index f63fde6..763a987 100644
--- a/lib/src/client.dart
+++ b/lib/src/client.dart
@@ -37,6 +37,9 @@
   /// This is the same future that's returned by [listen].
   Future get done => _streams.done;
 
+  /// Whether the connection is closed.
+  bool get isClosed => _streams.isClosed;
+
   /// Creates a [Client] that writes requests to [requests] and reads responses
   /// from [responses].
   ///
diff --git a/lib/src/peer.dart b/lib/src/peer.dart
index 54c38b7..7821d9f 100644
--- a/lib/src/peer.dart
+++ b/lib/src/peer.dart
@@ -42,6 +42,7 @@
   final _outgoingForwarder = new StreamController(sync: true);
 
   Future get done => _streams.done;
+  bool get isClosed => _streams.isClosed;
 
   /// Creates a [Peer] that reads incoming messages from [incoming] and writes
   /// outgoing messages to [outgoing].
diff --git a/lib/src/server.dart b/lib/src/server.dart
index 4458e0e..4188e78 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -43,6 +43,9 @@
   /// This is the same future that's returned by [listen].
   Future get done => _streams.done;
 
+  /// Whether the connection is closed.
+  bool get isClosed => _streams.isClosed;
+
   /// Creates a [Server] that reads requests from [requests] and writes
   /// responses to [responses].
   ///
@@ -141,7 +144,7 @@
         return nonNull.isEmpty ? null : nonNull.toList();
       });
     }).then((response) {
-      if (response != null) _streams.add(response);
+      if (!_streams.isClosed && response != null) _streams.add(response);
     });
   }
 
diff --git a/lib/src/two_way_stream.dart b/lib/src/two_way_stream.dart
index 5914173..88a8dcc 100644
--- a/lib/src/two_way_stream.dart
+++ b/lib/src/two_way_stream.dart
@@ -38,6 +38,9 @@
   Future get done => _doneCompleter.future;
   final _doneCompleter = new Completer();
 
+  /// Whether the stream has been closed.
+  bool get isClosed => _doneCompleter.isCompleted;
+
   /// Creates a two-way stream.
   ///
   /// [input] and [output] should emit and take (respectively) JSON-encoded
@@ -103,12 +106,12 @@
     _inputSubscription = _input.listen(handleInput,
         onError: (error, stackTrace) {
       if (_doneCompleter.isCompleted) return;
-      _output.close();
       _doneCompleter.completeError(error, stackTrace);
+      _output.close();
     }, onDone: () {
       if (_doneCompleter.isCompleted) return;
-      _output.close();
       _doneCompleter.complete();
+      _output.close();
     }, cancelOnError: true);
 
     return _doneCompleter.future;
diff --git a/pubspec.yaml b/pubspec.yaml
index d895698..348dc35 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: json_rpc_2
-version: 1.1.2-dev
+version: 1.2.0
 author: Dart Team <misc@dartlang.org>
 description: An implementation of the JSON-RPC 2.0 spec.
 homepage: http://github.com/dart-lang/json_rpc_2
diff --git a/test/client/stream_test.dart b/test/client/stream_test.dart
index 20b21bd..ba5db5b 100644
--- a/test/client/stream_test.dart
+++ b/test/client/stream_test.dart
@@ -79,7 +79,10 @@
         responseController.stream, requestController.sink);
 
     expect(client.listen(), completes);
+
+    expect(client.isClosed, isFalse);
     expect(client.close(), completes);
+    expect(client.isClosed, isTrue);
 
     expect(() => responseController.stream.listen((_) {}), throwsStateError);
     expect(requestController.isClosed, isTrue);
diff --git a/test/server/stream_test.dart b/test/server/stream_test.dart
index a7a2d86..8fd1239 100644
--- a/test/server/stream_test.dart
+++ b/test/server/stream_test.dart
@@ -81,7 +81,10 @@
         requestController.stream, responseController.sink);
 
     expect(server.listen(), completes);
+
+    expect(server.isClosed, isFalse);
     expect(server.close(), completes);
+    expect(server.isClosed, isTrue);
 
     expect(() => requestController.stream.listen((_) {}), throwsStateError);
     expect(responseController.isClosed, isTrue);