Convert json_rpc.Server to take a Stream and StreamSink.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//309503005

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/json_rpc_2@37012 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ba15fb3..34123f9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,15 @@
+## 0.1.0
+
+* Remove `Server.handleRequest()` and `Server.parseRequest()`. Instead, `new
+  Server()` takes a `Stream` and a `StreamSink` and uses those behind-the-scenes
+  for its communication.
+
+* Add `Server.listen()`, which causes the server to begin listening to the
+  underlying request stream.
+
+* Add `Server.close()`, which closes the underlying request stream and response
+  sink.
+
 ## 0.0.2+3
 
 * Widen the version constraint for `stack_trace`.
diff --git a/lib/src/server.dart b/lib/src/server.dart
index d05c54f..c7ece5b 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -26,6 +26,20 @@
 /// asynchronously, it's possible for multiple methods to be invoked at the same
 /// time, or even for a single method to be invoked multiple times at once.
 class Server {
+  /// The stream for decoded requests.
+  final Stream _requests;
+
+  /// The subscription to the decoded request stream.
+  StreamSubscription _requestSubscription;
+
+  /// The sink for decoded responses.
+  final StreamSink _responses;
+
+  /// The completer for [listen].
+  ///
+  /// This is non-`null` after [listen] has been called.
+  Completer _listenCompleter;
+
   /// The methods registered for this server.
   final _methods = new Map<String, Function>();
 
@@ -35,7 +49,104 @@
   /// [RpcException.methodNotFound] exception.
   final _fallbacks = new Queue<Function>();
 
-  Server();
+  /// Creates a [Server] that reads requests from [requests] and writes
+  /// responses to [responses].
+  ///
+  /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
+  /// `WebSocket`), [responses] may be omitted.
+  ///
+  /// Note that the server won't begin listening to [requests] until
+  /// [Server.listen] is called.
+  factory Server(Stream<String> requests, [StreamSink<String> responses]) {
+    if (responses == null) {
+      if (requests is! StreamSink) {
+        throw new ArgumentError("Either `requests` must be a StreamSink or "
+            "`responses` must be passed.");
+      }
+      responses = requests as StreamSink;
+    }
+
+    var wrappedResponses = mapStreamSink(responses, JSON.encode);
+    return new Server.withoutJson(requests.expand((request) {
+      var decodedRequest;
+      try {
+        decodedRequest = JSON.decode(request);
+      } on FormatException catch (error) {
+        wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
+            'Invalid JSON: ${error.message}').serialize(request));
+        return [];
+      }
+
+      return [decodedRequest];
+    }), wrappedResponses);
+  }
+
+  /// Creates a [Server] that reads decoded requests from [requests] and writes
+  /// decoded responses to [responses].
+  ///
+  /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
+  /// reads and writes decoded maps or lists.
+  ///
+  /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be
+  /// omitted.
+  ///
+  /// Note that the server won't begin listening to [requests] until
+  /// [Server.listen] is called.
+  Server.withoutJson(Stream requests, [StreamSink responses])
+      : _requests = requests,
+        _responses = responses == null && requests is StreamSink ?
+            requests : responses {
+    if (_responses == null) {
+      throw new ArgumentError("Either `requests` must be a StreamSink or "
+          "`responses` must be passed.");
+    }
+  }
+
+  /// Starts listening to the underlying stream.
+  ///
+  /// Returns a [Future] that will complete when the stream is closed or when it
+  /// has an error.
+  ///
+  /// [listen] may only be called once.
+  Future listen() {
+    if (_listenCompleter != null) {
+      throw new StateError(
+          "Can only call Server.listen once on a given server.");
+    }
+
+    _listenCompleter = new Completer();
+    _requestSubscription = _requests.listen(_handleRequest,
+        onError: (error, stackTrace) {
+      if (_listenCompleter.isCompleted) return;
+      _responses.close();
+      _listenCompleter.completeError(error, stackTrace);
+    }, onDone: () {
+      if (_listenCompleter.isCompleted) return;
+      _responses.close();
+      _listenCompleter.complete();
+    }, cancelOnError: true);
+
+    return _listenCompleter.future;
+  }
+
+  /// Closes the server's request subscription and response sink.
+  ///
+  /// Returns a [Future] that completes when all resources have been released.
+  ///
+  /// A server can't be closed before [listen] has been called.
+  Future close() {
+    if (_listenCompleter == null) {
+      throw new StateError("Can't call Server.close before Server.listen.");
+    }
+
+    if (!_listenCompleter.isCompleted) _listenCompleter.complete();
+
+    var subscriptionFuture = _requestSubscription.cancel();
+    // TODO(nweiz): include the response future in the return value when issue
+    // 19095 is fixed.
+    _responses.close();
+    return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
+  }
 
   /// Registers a method named [name] on this server.
   ///
@@ -69,14 +180,14 @@
     _fallbacks.add(callback);
   }
 
-  /// Handle a request that's already been parsed from JSON.
+  /// Handle a request.
   ///
   /// [request] is expected to be a JSON-serializable object representing a
   /// request sent by a client. This calls the appropriate method or methods for
   /// handling that request and returns a JSON-serializable response, or `null`
   /// if no response should be sent. [callback] may send custom
   /// errors by throwing an [RpcException].
-  Future handleRequest(request) {
+  Future _handleRequest(request) {
     return syncFuture(() {
       if (request is! List) return _handleSingleRequest(request);
       if (request.isEmpty) {
@@ -88,28 +199,7 @@
         var nonNull = results.where((result) => result != null);
         return nonNull.isEmpty ? null : nonNull.toList();
       });
-    });
-  }
-
-  /// Parses and handles a JSON serialized request.
-  ///
-  /// This calls the appropriate method or methods for handling that request and
-  /// returns a JSON string, or `null` if no response should be sent.
-  Future<String> parseRequest(String request) {
-    return syncFuture(() {
-      var decodedRequest;
-      try {
-        decodedRequest = JSON.decode(request);
-      } on FormatException catch (error) {
-        return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: '
-            '${error.message}').serialize(request);
-      }
-
-      return handleRequest(decodedRequest);
-    }).then((response) {
-      if (response == null) return null;
-      return JSON.encode(response);
-    });
+    }).then(_responses.add);
   }
 
   /// Handles an individual parsed request.
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index 1eff004..a212f58 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -43,3 +43,24 @@
 /// [toString], so we remove that if it exists.
 String getErrorMessage(error) =>
     error.toString().replaceFirst(_exceptionPrefix, '');
+
+/// Returns a [StreamSink] that wraps [sink] and maps each event added using
+/// [callback].
+StreamSink mapStreamSink(StreamSink sink, callback(event)) =>
+    new _MappedStreamSink(sink, callback);
+
+/// A [StreamSink] wrapper that maps each event added to the sink.
+class _MappedStreamSink implements StreamSink {
+  final StreamSink _inner;
+  final Function _callback;
+
+  Future get done => _inner.done;
+
+  _MappedStreamSink(this._inner, this._callback);
+
+  void add(event) => _inner.add(_callback(event));
+  void addError(error, [StackTrace stackTrace]) =>
+      _inner.addError(error, stackTrace);
+  Future addStream(Stream stream) => _inner.addStream(stream.map(_callback));
+  Future close() => _inner.close();
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 2cf1bc8..df1a8a0 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: json_rpc_2
-version: 0.0.2+3
+version: 0.1.0
 author: Dart Team <misc@dartlang.org>
 description: An implementation of the JSON-RPC 2.0 spec.
 homepage: http://www.dartlang.org
@@ -7,7 +7,7 @@
 dependencies:
   stack_trace: '>=0.9.1 <2.0.0'
 dev_dependencies:
-  unittest: ">=0.9.0 <0.10.0"
+  unittest: ">=0.9.0 <0.12.0"
 environment:
   sdk: ">=1.2.0 <2.0.0"
 
diff --git a/test/server/batch_test.dart b/test/server/batch_test.dart
index 441df58..7dda84f 100644
--- a/test/server/batch_test.dart
+++ b/test/server/batch_test.dart
@@ -13,16 +13,17 @@
 import 'utils.dart';
 
 void main() {
-  var server;
+  var controller;
   setUp(() {
-    server = new json_rpc.Server()
+    controller = new ServerController();
+    controller.server
         ..registerMethod('foo', () => 'foo')
         ..registerMethod('id', (params) => params.value)
         ..registerMethod('arg', (params) => params['arg'].value);
   });
 
   test('handles a batch of requests', () {
-    expect(server.handleRequest([
+    expect(controller.handleRequest([
       {'jsonrpc': '2.0', 'method': 'foo', 'id': 1},
       {'jsonrpc': '2.0', 'method': 'id', 'params': ['value'], 'id': 2},
       {'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3}
@@ -34,7 +35,7 @@
   });
 
   test('handles errors individually', () {
-    expect(server.handleRequest([
+    expect(controller.handleRequest([
       {'jsonrpc': '2.0', 'method': 'foo', 'id': 1},
       {'jsonrpc': '2.0', 'method': 'zap', 'id': 2},
       {'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3}
@@ -54,7 +55,7 @@
   });
 
   test('handles notifications individually', () {
-    expect(server.handleRequest([
+    expect(controller.handleRequest([
       {'jsonrpc': '2.0', 'method': 'foo', 'id': 1},
       {'jsonrpc': '2.0', 'method': 'id', 'params': ['value']},
       {'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3}
@@ -65,7 +66,7 @@
   });
 
   test('returns nothing if every request is a notification', () {
-    expect(server.handleRequest([
+    expect(controller.handleRequest([
       {'jsonrpc': '2.0', 'method': 'foo'},
       {'jsonrpc': '2.0', 'method': 'id', 'params': ['value']},
       {'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}}
@@ -73,24 +74,12 @@
   });
 
   test('returns an error if the batch is empty', () {
-    expectErrorResponse(server, [], error_code.INVALID_REQUEST,
+    expectErrorResponse(controller, [], error_code.INVALID_REQUEST,
         'A batch must contain at least one request.');
   });
 
-  test('handles a batch of requests parsed from JSON', () {
-    expect(server.parseRequest(JSON.encode([
-      {'jsonrpc': '2.0', 'method': 'foo', 'id': 1},
-      {'jsonrpc': '2.0', 'method': 'id', 'params': ['value'], 'id': 2},
-      {'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3}
-    ])), completion(equals(JSON.encode([
-      {'jsonrpc': '2.0', 'result': 'foo', 'id': 1},
-      {'jsonrpc': '2.0', 'result': ['value'], 'id': 2},
-      {'jsonrpc': '2.0', 'result': 'value', 'id': 3}
-    ]))));
-  });
-
   test('disallows nested batches', () {
-    expect(server.handleRequest([
+    expect(controller.handleRequest([
       [{'jsonrpc': '2.0', 'method': 'foo', 'id': 1}]
     ]), completion(equals([{
       'jsonrpc': '2.0',
diff --git a/test/server/invalid_request_test.dart b/test/server/invalid_request_test.dart
index feeefea..74cf86f 100644
--- a/test/server/invalid_request_test.dart
+++ b/test/server/invalid_request_test.dart
@@ -13,23 +13,23 @@
 import 'utils.dart';
 
 void main() {
-  var server;
-  setUp(() => server = new json_rpc.Server());
+  var controller;
+  setUp(() => controller = new ServerController());
 
   test("a non-Array/Object request is invalid", () {
-    expectErrorResponse(server, 'foo', error_code.INVALID_REQUEST,
+    expectErrorResponse(controller, 'foo', error_code.INVALID_REQUEST,
         'Request must be an Array or an Object.');
   });
 
   test("requests must have a jsonrpc key", () {
-    expectErrorResponse(server, {
+    expectErrorResponse(controller, {
       'method': 'foo',
       'id': 1234
     }, error_code.INVALID_REQUEST, 'Request must contain a "jsonrpc" key.');
   });
 
   test("the jsonrpc version must be 2.0", () {
-    expectErrorResponse(server, {
+    expectErrorResponse(controller, {
       'jsonrpc': '1.0',
       'method': 'foo',
       'id': 1234
@@ -38,14 +38,14 @@
   });
 
   test("requests must have a method key", () {
-    expectErrorResponse(server, {
+    expectErrorResponse(controller, {
       'jsonrpc': '2.0',
       'id': 1234
     }, error_code.INVALID_REQUEST, 'Request must contain a "method" key.');
   });
 
   test("request method must be a string", () {
-    expectErrorResponse(server, {
+    expectErrorResponse(controller, {
       'jsonrpc': '2.0',
       'method': 1234,
       'id': 1234
@@ -54,7 +54,7 @@
   });
 
   test("request params must be an Array or Object", () {
-    expectErrorResponse(server, {
+    expectErrorResponse(controller, {
       'jsonrpc': '2.0',
       'method': 'foo',
       'params': 1234,
@@ -64,7 +64,7 @@
   });
 
   test("request id may not be an Array or Object", () {
-    expect(server.handleRequest({
+    expect(controller.handleRequest({
       'jsonrpc': '2.0',
       'method': 'foo',
       'id': {'bad': 'id'}
diff --git a/test/server/server_test.dart b/test/server/server_test.dart
index c18a8ca..a89a0d9 100644
--- a/test/server/server_test.dart
+++ b/test/server/server_test.dart
@@ -13,15 +13,15 @@
 import 'utils.dart';
 
 void main() {
-  var server;
-  setUp(() => server = new json_rpc.Server());
+  var controller;
+  setUp(() => controller = new ServerController());
 
   test("calls a registered method with the given name", () {
-    server.registerMethod('foo', (params) {
+    controller.server.registerMethod('foo', (params) {
       return {'params': params.value};
     });
 
-    expect(server.handleRequest({
+    expect(controller.handleRequest({
       'jsonrpc': '2.0',
       'method': 'foo',
       'params': {'param': 'value'},
@@ -34,9 +34,9 @@
   });
 
   test("calls a method that takes no parameters", () {
-    server.registerMethod('foo', () => 'foo');
+    controller.server.registerMethod('foo', () => 'foo');
 
-    expect(server.handleRequest({
+    expect(controller.handleRequest({
       'jsonrpc': '2.0',
       'method': 'foo',
       'id': 1234
@@ -48,9 +48,9 @@
   });
 
   test("a method that takes no parameters rejects parameters", () {
-    server.registerMethod('foo', () => 'foo');
+    controller.server.registerMethod('foo', () => 'foo');
 
-    expectErrorResponse(server, {
+    expectErrorResponse(controller, {
       'jsonrpc': '2.0',
       'method': 'foo',
       'params': {},
@@ -61,9 +61,9 @@
   });
 
   test("an unexpected error in a method is captured", () {
-    server.registerMethod('foo', () => throw new FormatException('bad format'));
+    controller.server.registerMethod('foo', () => throw new FormatException('bad format'));
 
-    expect(server.handleRequest({
+    expect(controller.handleRequest({
       'jsonrpc': '2.0',
       'method': 'foo',
       'id': 1234
@@ -83,9 +83,9 @@
   });
 
   test("doesn't return a result for a notification", () {
-    server.registerMethod('foo', (args) => 'result');
+    controller.server.registerMethod('foo', (args) => 'result');
 
-    expect(server.handleRequest({
+    expect(controller.handleRequest({
       'jsonrpc': '2.0',
       'method': 'foo',
       'params': {}
@@ -93,11 +93,11 @@
   });
 
   test("includes the error data in the response", () {
-    server.registerMethod('foo', (params) {
+    controller.server.registerMethod('foo', (params) {
       throw new json_rpc.RpcException(5, 'Error message.', data: 'data value');
     });
 
-    expectErrorResponse(server, {
+    expectErrorResponse(controller, {
       'jsonrpc': '2.0',
       'method': 'foo',
       'params': {},
@@ -108,58 +108,28 @@
         data: 'data value');
   });
 
-  group("JSON", () {
-    test("handles a request parsed from JSON", () {
-      server.registerMethod('foo', (params) {
-        return {'params': params.value};
-      });
-
-      expect(server.parseRequest(JSON.encode({
+  test("a JSON parse error is rejected", () {
+    return controller.handleJsonRequest('invalid json {').then((result) {
+      expect(JSON.decode(result), {
         'jsonrpc': '2.0',
-        'method': 'foo',
-        'params': {'param': 'value'},
-        'id': 1234
-      })), completion(equals(JSON.encode({
-        'jsonrpc': '2.0',
-        'result': {'params': {'param': 'value'}},
-        'id': 1234
-      }))));
-    });
-
-    test("handles a notification parsed from JSON", () {
-      server.registerMethod('foo', (params) {
-        return {'params': params};
-      });
-
-      expect(server.parseRequest(JSON.encode({
-        'jsonrpc': '2.0',
-        'method': 'foo',
-        'params': {'param': 'value'}
-      })), completion(isNull));
-    });
-
-    test("a JSON parse error is rejected", () {
-      return server.parseRequest('invalid json {').then((result) {
-        expect(JSON.decode(result), {
-          'jsonrpc': '2.0',
-          'error': {
-            'code': error_code.PARSE_ERROR,
-            'message': startsWith("Invalid JSON: "),
-            'data': {'request': 'invalid json {'}
-          },
-          'id': null
-        });
+        'error': {
+          'code': error_code.PARSE_ERROR,
+          'message': startsWith("Invalid JSON: "),
+          'data': {'request': 'invalid json {'}
+        },
+        'id': null
       });
     });
   });
 
   group("fallbacks", () {
     test("calls a fallback if no method matches", () {
-      server.registerMethod('foo', () => 'foo');
-      server.registerMethod('bar', () => 'foo');
-      server.registerFallback((params) => {'fallback': params.value});
+      controller.server
+          ..registerMethod('foo', () => 'foo')
+          ..registerMethod('bar', () => 'foo')
+          ..registerFallback((params) => {'fallback': params.value});
 
-      expect(server.handleRequest({
+      expect(controller.handleRequest({
         'jsonrpc': '2.0',
         'method': 'baz',
         'params': {'param': 'value'},
@@ -172,13 +142,13 @@
     });
 
     test("calls the first matching fallback", () {
-      server.registerFallback((params) =>
-          throw new json_rpc.RpcException.methodNotFound(params.method));
+      controller.server
+          ..registerFallback((params) =>
+              throw new json_rpc.RpcException.methodNotFound(params.method))
+          ..registerFallback((params) => 'fallback 2')
+          ..registerFallback((params) => 'fallback 3');
 
-      server.registerFallback((params) => 'fallback 2');
-      server.registerFallback((params) => 'fallback 3');
-
-      expect(server.handleRequest({
+      expect(controller.handleRequest({
         'jsonrpc': '2.0',
         'method': 'fallback 2',
         'id': 1234
@@ -190,9 +160,10 @@
     });
 
     test("an unexpected error in a fallback is captured", () {
-      server.registerFallback((_) => throw new FormatException('bad format'));
+      controller.server.registerFallback((_) =>
+          throw new FormatException('bad format'));
 
-      expect(server.handleRequest({
+      expect(controller.handleRequest({
         'jsonrpc': '2.0',
         'method': 'foo',
         'id': 1234
@@ -213,7 +184,8 @@
   });
 
   test("disallows multiple methods with the same name", () {
-    server.registerMethod('foo', () => null);
-    expect(() => server.registerMethod('foo', () => null), throwsArgumentError);
+    controller.server.registerMethod('foo', () => null);
+    expect(() => controller.server.registerMethod('foo', () => null),
+        throwsArgumentError);
   });
 }
diff --git a/test/server/stream_test.dart b/test/server/stream_test.dart
new file mode 100644
index 0000000..5459e3e
--- /dev/null
+++ b/test/server/stream_test.dart
@@ -0,0 +1,98 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.test.server.stream_test;
+
+import 'dart:async';
+
+import 'package:unittest/unittest.dart';
+import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+
+import 'utils.dart';
+
+void main() {
+  test(".withoutJson supports decoded stream and sink", () {
+    var requestController = new StreamController();
+    var responseController = new StreamController();
+    var server = new json_rpc.Server.withoutJson(
+        requestController.stream, responseController.sink);
+    server.listen();
+
+    server.registerMethod('foo', (params) {
+      return {'params': params.value};
+    });
+
+    requestController.add({
+      'jsonrpc': '2.0',
+      'method': 'foo',
+      'params': {'param': 'value'},
+      'id': 1234
+    });
+
+    expect(responseController.stream.first, completion(equals({
+      'jsonrpc': '2.0',
+      'result': {'params': {'param': 'value'}},
+      'id': 1234
+    })));
+  });
+
+  test(".listen returns when the controller is closed", () {
+    var requestController = new StreamController();
+    var responseController = new StreamController();
+    var server = new json_rpc.Server(
+        requestController.stream, responseController.sink);
+
+    var hasListenCompeted = false;
+    expect(server.listen().then((_) => hasListenCompeted = true), completes);
+
+    return pumpEventQueue().then((_) {
+      expect(hasListenCompeted, isFalse);
+
+      // This should cause listen to complete.
+      return requestController.close();
+    });
+  });
+
+  test(".listen returns a stream error", () {
+    var requestController = new StreamController();
+    var responseController = new StreamController();
+    var server = new json_rpc.Server(
+        requestController.stream, responseController.sink);
+
+    expect(server.listen(), throwsA('oh no'));
+    requestController.addError('oh no');
+  });
+
+  test(".listen can't be called twice", () {
+    var requestController = new StreamController();
+    var responseController = new StreamController();
+    var server = new json_rpc.Server(
+        requestController.stream, responseController.sink);
+    server.listen();
+
+    expect(() => server.listen(), throwsStateError);
+  });
+
+  test(".close cancels the stream subscription and closes the sink", () {
+    var requestController = new StreamController();
+    var responseController = new StreamController();
+    var server = new json_rpc.Server(
+        requestController.stream, responseController.sink);
+
+    expect(server.listen(), completes);
+    expect(server.close(), completes);
+
+    expect(() => requestController.stream.listen((_) {}), throwsStateError);
+    expect(responseController.isClosed, isTrue);
+  });
+
+  test(".close can't be called before .listen", () {
+    var requestController = new StreamController();
+    var responseController = new StreamController();
+    var server = new json_rpc.Server(
+        requestController.stream, responseController.sink);
+
+    expect(() => server.close(), throwsStateError);
+  });
+}
diff --git a/test/server/utils.dart b/test/server/utils.dart
index 6f92c0a..a4fd36a 100644
--- a/test/server/utils.dart
+++ b/test/server/utils.dart
@@ -4,16 +4,53 @@
 
 library json_rpc_2.test.server.util;
 
-import 'package:unittest/unittest.dart';
-import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+import 'dart:async';
+import 'dart:convert';
 
-void expectErrorResponse(json_rpc.Server server, request, int errorCode,
+import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+import 'package:json_rpc_2/error_code.dart' as error_code;
+import 'package:unittest/unittest.dart';
+
+/// A controller used to test a [json_rpc.Server].
+class ServerController {
+  /// The controller for the server's request stream.
+  final _requestController = new StreamController<String>();
+
+  /// The controller for the server's response sink.
+  final _responseController = new StreamController<String>();
+
+  /// The server.
+  json_rpc.Server get server => _server;
+  json_rpc.Server _server;
+
+  ServerController() {
+    _server = new json_rpc.Server(
+        _requestController.stream, _responseController.sink);
+    _server.listen();
+  }
+
+  /// Passes [request], a decoded request, to [server] and returns its decoded
+  /// response.
+  Future handleRequest(request) =>
+      handleJsonRequest(JSON.encode(request)).then(JSON.decode);
+
+  /// Passes [request], a JSON-encoded request, to [server] and returns its
+  /// encoded response.
+  Future handleJsonRequest(String request) {
+    _requestController.add(request);
+    return _responseController.stream.first;
+  }
+}
+
+/// Expects that [controller]'s server will return an error response to
+/// [request] with the given [errorCode], [message], and [data].
+void expectErrorResponse(ServerController controller, request, int errorCode,
     String message, {data}) {
   var id;
   if (request is Map) id = request['id'];
   if (data == null) data = {'request': request};
 
-  expect(server.handleRequest(request), completion(equals({
+  expect(controller.handleRequest(request), completion(equals({
     'jsonrpc': '2.0',
     'id': id,
     'error': {
@@ -24,10 +61,25 @@
   })));
 }
 
+/// Returns a matcher that matches a [json_rpc.RpcException] with an
+/// `invalid_params` error code.
 Matcher throwsInvalidParams(String message) {
   return throwsA(predicate((error) {
     expect(error, new isInstanceOf<json_rpc.RpcException>());
+    expect(error.code, equals(error_code.INVALID_PARAMS));
     expect(error.message, equals(message));
     return true;
   }));
 }
+
+/// Returns a [Future] that completes after pumping the event queue [times]
+/// times. By default, this should pump the event queue enough times to allow
+/// any code to run, as long as it's not waiting on some external event.
+Future pumpEventQueue([int times = 20]) {
+  if (times == 0) return new Future.value();
+  // We use a delayed future to allow microtask events to finish. The
+  // Future.value or Future() constructors use scheduleMicrotask themselves and
+  // would therefore not wait for microtask callbacks that are scheduled after
+  // invoking this method.
+  return new Future.delayed(Duration.ZERO, () => pumpEventQueue(times - 1));
+}