Add a done getter to Client, Server, and Peer.

Also make Server skip null responses instead of adding them to its
output stream.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//810333007
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a916c9e..1934fc0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 1.1.0
+
+* Add a `done` getter to `Client`, `Server`, and `Peer`.
+
 ## 1.0.0
 
 * Add a `Client` class for communicating with external JSON-RPC 2.0 servers.
diff --git a/lib/src/client.dart b/lib/src/client.dart
index f3c63ca..f63fde6 100644
--- a/lib/src/client.dart
+++ b/lib/src/client.dart
@@ -32,6 +32,11 @@
   /// completed with those requests' responses.
   final _pendingRequests = new Map<int, Completer>();
 
+  /// Returns a [Future] that completes when the connection is closed.
+  ///
+  /// This is the same future that's returned by [listen].
+  Future get done => _streams.done;
+
   /// Creates a [Client] that writes requests to [requests] and reads responses
   /// from [responses].
   ///
diff --git a/lib/src/peer.dart b/lib/src/peer.dart
index c83724d..54c38b7 100644
--- a/lib/src/peer.dart
+++ b/lib/src/peer.dart
@@ -41,6 +41,9 @@
   /// and [_client].
   final _outgoingForwarder = new StreamController(sync: true);
 
+  /// Returns a [Future] that completes when the connection is closed.
+  Future get done => _streams.done;
+
   /// Creates a [Peer] that reads incoming messages from [incoming] and writes
   /// outgoing messages to [outgoing].
   ///
diff --git a/lib/src/server.dart b/lib/src/server.dart
index bd243de..4458e0e 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -38,6 +38,11 @@
   /// [RpcException.methodNotFound] exception.
   final _fallbacks = new Queue<Function>();
 
+  /// Returns a [Future] that completes when the connection is closed.
+  ///
+  /// This is the same future that's returned by [listen].
+  Future get done => _streams.done;
+
   /// Creates a [Server] that reads requests from [requests] and writes
   /// responses to [responses].
   ///
@@ -135,7 +140,9 @@
         var nonNull = results.where((result) => result != null);
         return nonNull.isEmpty ? null : nonNull.toList();
       });
-    }).then(_streams.add);
+    }).then((response) {
+      if (response != null) _streams.add(response);
+    });
   }
 
   /// Handles an individual parsed request.
diff --git a/lib/src/two_way_stream.dart b/lib/src/two_way_stream.dart
index f470c2f..5914173 100644
--- a/lib/src/two_way_stream.dart
+++ b/lib/src/two_way_stream.dart
@@ -32,10 +32,11 @@
   /// This takes decoded JSON objects.
   final StreamSink _output;
 
-  /// The completer for [listen].
+  /// Returns a [Future] that completes when the connection is closed.
   ///
-  /// This is non-`null` after [listen] has been called.
-  Completer _listenCompleter;
+  /// This is the same future that's returned by [listen].
+  Future get done => _doneCompleter.future;
+  final _doneCompleter = new Completer();
 
   /// Creates a two-way stream.
   ///
@@ -95,23 +96,22 @@
   /// The returned Future will complete when the input stream is closed. If the
   /// input stream emits an error, that will be piped to the returned Future.
   Future listen(void handleInput(input)) {
-    if (_listenCompleter != null) {
+    if (_inputSubscription != null) {
       throw new StateError("Can only call $_name.listen once.");
     }
 
-    _listenCompleter = new Completer();
     _inputSubscription = _input.listen(handleInput,
         onError: (error, stackTrace) {
-      if (_listenCompleter.isCompleted) return;
+      if (_doneCompleter.isCompleted) return;
       _output.close();
-      _listenCompleter.completeError(error, stackTrace);
+      _doneCompleter.completeError(error, stackTrace);
     }, onDone: () {
-      if (_listenCompleter.isCompleted) return;
+      if (_doneCompleter.isCompleted) return;
       _output.close();
-      _listenCompleter.complete();
+      _doneCompleter.complete();
     }, cancelOnError: true);
 
-    return _listenCompleter.future;
+    return _doneCompleter.future;
   }
 
   /// Emit [event] on the output stream.
@@ -119,11 +119,11 @@
 
   /// Stops listening to the input stream and closes the output stream.
   Future close() {
-    if (_listenCompleter == null) {
+    if (_inputSubscription == null) {
       throw new StateError("Can't call $_name.close before $_name.listen.");
     }
 
-    if (!_listenCompleter.isCompleted) _listenCompleter.complete();
+    if (!_doneCompleter.isCompleted) _doneCompleter.complete();
 
     var inputFuture = _inputSubscription.cancel();
     // TODO(nweiz): include the output future in the return value when issue
diff --git a/pubspec.yaml b/pubspec.yaml
index 09f5441..abc05ab 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: json_rpc_2
-version: 1.0.0
+version: 1.1.0
 author: Dart Team <misc@dartlang.org>
 description: An implementation of the JSON-RPC 2.0 spec.
 homepage: http://github.com/dart-lang/json_rpc_2