Convert json_rpc.Server to take a Stream and StreamSink. R=rnystrom@google.com Review URL: https://codereview.chromium.org//309503005 git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/json_rpc_2@37012 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/pkgs/json_rpc_2/CHANGELOG.md b/pkgs/json_rpc_2/CHANGELOG.md index ba15fb3..34123f9 100644 --- a/pkgs/json_rpc_2/CHANGELOG.md +++ b/pkgs/json_rpc_2/CHANGELOG.md
@@ -1,3 +1,15 @@ +## 0.1.0 + +* Remove `Server.handleRequest()` and `Server.parseRequest()`. Instead, `new + Server()` takes a `Stream` and a `StreamSink` and uses those behind-the-scenes + for its communication. + +* Add `Server.listen()`, which causes the server to begin listening to the + underlying request stream. + +* Add `Server.close()`, which closes the underlying request stream and response + sink. + ## 0.0.2+3 * Widen the version constraint for `stack_trace`.
diff --git a/pkgs/json_rpc_2/lib/src/server.dart b/pkgs/json_rpc_2/lib/src/server.dart index d05c54f..c7ece5b 100644 --- a/pkgs/json_rpc_2/lib/src/server.dart +++ b/pkgs/json_rpc_2/lib/src/server.dart
@@ -26,6 +26,20 @@ /// asynchronously, it's possible for multiple methods to be invoked at the same /// time, or even for a single method to be invoked multiple times at once. class Server { + /// The stream for decoded requests. + final Stream _requests; + + /// The subscription to the decoded request stream. + StreamSubscription _requestSubscription; + + /// The sink for decoded responses. + final StreamSink _responses; + + /// The completer for [listen]. + /// + /// This is non-`null` after [listen] has been called. + Completer _listenCompleter; + /// The methods registered for this server. final _methods = new Map<String, Function>(); @@ -35,7 +49,104 @@ /// [RpcException.methodNotFound] exception. final _fallbacks = new Queue<Function>(); - Server(); + /// Creates a [Server] that reads requests from [requests] and writes + /// responses to [responses]. + /// + /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a + /// `WebSocket`), [responses] may be omitted. + /// + /// Note that the server won't begin listening to [requests] until + /// [Server.listen] is called. + factory Server(Stream<String> requests, [StreamSink<String> responses]) { + if (responses == null) { + if (requests is! StreamSink) { + throw new ArgumentError("Either `requests` must be a StreamSink or " + "`responses` must be passed."); + } + responses = requests as StreamSink; + } + + var wrappedResponses = mapStreamSink(responses, JSON.encode); + return new Server.withoutJson(requests.expand((request) { + var decodedRequest; + try { + decodedRequest = JSON.decode(request); + } on FormatException catch (error) { + wrappedResponses.add(new RpcException(error_code.PARSE_ERROR, + 'Invalid JSON: ${error.message}').serialize(request)); + return []; + } + + return [decodedRequest]; + }), wrappedResponses); + } + + /// Creates a [Server] that reads decoded requests from [requests] and writes + /// decoded responses to [responses]. + /// + /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it + /// reads and writes decoded maps or lists. + /// + /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be + /// omitted. + /// + /// Note that the server won't begin listening to [requests] until + /// [Server.listen] is called. + Server.withoutJson(Stream requests, [StreamSink responses]) + : _requests = requests, + _responses = responses == null && requests is StreamSink ? + requests : responses { + if (_responses == null) { + throw new ArgumentError("Either `requests` must be a StreamSink or " + "`responses` must be passed."); + } + } + + /// Starts listening to the underlying stream. + /// + /// Returns a [Future] that will complete when the stream is closed or when it + /// has an error. + /// + /// [listen] may only be called once. + Future listen() { + if (_listenCompleter != null) { + throw new StateError( + "Can only call Server.listen once on a given server."); + } + + _listenCompleter = new Completer(); + _requestSubscription = _requests.listen(_handleRequest, + onError: (error, stackTrace) { + if (_listenCompleter.isCompleted) return; + _responses.close(); + _listenCompleter.completeError(error, stackTrace); + }, onDone: () { + if (_listenCompleter.isCompleted) return; + _responses.close(); + _listenCompleter.complete(); + }, cancelOnError: true); + + return _listenCompleter.future; + } + + /// Closes the server's request subscription and response sink. + /// + /// Returns a [Future] that completes when all resources have been released. + /// + /// A server can't be closed before [listen] has been called. + Future close() { + if (_listenCompleter == null) { + throw new StateError("Can't call Server.close before Server.listen."); + } + + if (!_listenCompleter.isCompleted) _listenCompleter.complete(); + + var subscriptionFuture = _requestSubscription.cancel(); + // TODO(nweiz): include the response future in the return value when issue + // 19095 is fixed. + _responses.close(); + return subscriptionFuture == null ? new Future.value() : subscriptionFuture; + } /// Registers a method named [name] on this server. /// @@ -69,14 +180,14 @@ _fallbacks.add(callback); } - /// Handle a request that's already been parsed from JSON. + /// Handle a request. /// /// [request] is expected to be a JSON-serializable object representing a /// request sent by a client. This calls the appropriate method or methods for /// handling that request and returns a JSON-serializable response, or `null` /// if no response should be sent. [callback] may send custom /// errors by throwing an [RpcException]. - Future handleRequest(request) { + Future _handleRequest(request) { return syncFuture(() { if (request is! List) return _handleSingleRequest(request); if (request.isEmpty) { @@ -88,28 +199,7 @@ var nonNull = results.where((result) => result != null); return nonNull.isEmpty ? null : nonNull.toList(); }); - }); - } - - /// Parses and handles a JSON serialized request. - /// - /// This calls the appropriate method or methods for handling that request and - /// returns a JSON string, or `null` if no response should be sent. - Future<String> parseRequest(String request) { - return syncFuture(() { - var decodedRequest; - try { - decodedRequest = JSON.decode(request); - } on FormatException catch (error) { - return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: ' - '${error.message}').serialize(request); - } - - return handleRequest(decodedRequest); - }).then((response) { - if (response == null) return null; - return JSON.encode(response); - }); + }).then(_responses.add); } /// Handles an individual parsed request.
diff --git a/pkgs/json_rpc_2/lib/src/utils.dart b/pkgs/json_rpc_2/lib/src/utils.dart index 1eff004..a212f58 100644 --- a/pkgs/json_rpc_2/lib/src/utils.dart +++ b/pkgs/json_rpc_2/lib/src/utils.dart
@@ -43,3 +43,24 @@ /// [toString], so we remove that if it exists. String getErrorMessage(error) => error.toString().replaceFirst(_exceptionPrefix, ''); + +/// Returns a [StreamSink] that wraps [sink] and maps each event added using +/// [callback]. +StreamSink mapStreamSink(StreamSink sink, callback(event)) => + new _MappedStreamSink(sink, callback); + +/// A [StreamSink] wrapper that maps each event added to the sink. +class _MappedStreamSink implements StreamSink { + final StreamSink _inner; + final Function _callback; + + Future get done => _inner.done; + + _MappedStreamSink(this._inner, this._callback); + + void add(event) => _inner.add(_callback(event)); + void addError(error, [StackTrace stackTrace]) => + _inner.addError(error, stackTrace); + Future addStream(Stream stream) => _inner.addStream(stream.map(_callback)); + Future close() => _inner.close(); +}
diff --git a/pkgs/json_rpc_2/pubspec.yaml b/pkgs/json_rpc_2/pubspec.yaml index 2cf1bc8..df1a8a0 100644 --- a/pkgs/json_rpc_2/pubspec.yaml +++ b/pkgs/json_rpc_2/pubspec.yaml
@@ -1,5 +1,5 @@ name: json_rpc_2 -version: 0.0.2+3 +version: 0.1.0 author: Dart Team <misc@dartlang.org> description: An implementation of the JSON-RPC 2.0 spec. homepage: http://www.dartlang.org @@ -7,7 +7,7 @@ dependencies: stack_trace: '>=0.9.1 <2.0.0' dev_dependencies: - unittest: ">=0.9.0 <0.10.0" + unittest: ">=0.9.0 <0.12.0" environment: sdk: ">=1.2.0 <2.0.0"
diff --git a/pkgs/json_rpc_2/test/server/batch_test.dart b/pkgs/json_rpc_2/test/server/batch_test.dart index 441df58..7dda84f 100644 --- a/pkgs/json_rpc_2/test/server/batch_test.dart +++ b/pkgs/json_rpc_2/test/server/batch_test.dart
@@ -13,16 +13,17 @@ import 'utils.dart'; void main() { - var server; + var controller; setUp(() { - server = new json_rpc.Server() + controller = new ServerController(); + controller.server ..registerMethod('foo', () => 'foo') ..registerMethod('id', (params) => params.value) ..registerMethod('arg', (params) => params['arg'].value); }); test('handles a batch of requests', () { - expect(server.handleRequest([ + expect(controller.handleRequest([ {'jsonrpc': '2.0', 'method': 'foo', 'id': 1}, {'jsonrpc': '2.0', 'method': 'id', 'params': ['value'], 'id': 2}, {'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3} @@ -34,7 +35,7 @@ }); test('handles errors individually', () { - expect(server.handleRequest([ + expect(controller.handleRequest([ {'jsonrpc': '2.0', 'method': 'foo', 'id': 1}, {'jsonrpc': '2.0', 'method': 'zap', 'id': 2}, {'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3} @@ -54,7 +55,7 @@ }); test('handles notifications individually', () { - expect(server.handleRequest([ + expect(controller.handleRequest([ {'jsonrpc': '2.0', 'method': 'foo', 'id': 1}, {'jsonrpc': '2.0', 'method': 'id', 'params': ['value']}, {'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3} @@ -65,7 +66,7 @@ }); test('returns nothing if every request is a notification', () { - expect(server.handleRequest([ + expect(controller.handleRequest([ {'jsonrpc': '2.0', 'method': 'foo'}, {'jsonrpc': '2.0', 'method': 'id', 'params': ['value']}, {'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}} @@ -73,24 +74,12 @@ }); test('returns an error if the batch is empty', () { - expectErrorResponse(server, [], error_code.INVALID_REQUEST, + expectErrorResponse(controller, [], error_code.INVALID_REQUEST, 'A batch must contain at least one request.'); }); - test('handles a batch of requests parsed from JSON', () { - expect(server.parseRequest(JSON.encode([ - {'jsonrpc': '2.0', 'method': 'foo', 'id': 1}, - {'jsonrpc': '2.0', 'method': 'id', 'params': ['value'], 'id': 2}, - {'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3} - ])), completion(equals(JSON.encode([ - {'jsonrpc': '2.0', 'result': 'foo', 'id': 1}, - {'jsonrpc': '2.0', 'result': ['value'], 'id': 2}, - {'jsonrpc': '2.0', 'result': 'value', 'id': 3} - ])))); - }); - test('disallows nested batches', () { - expect(server.handleRequest([ + expect(controller.handleRequest([ [{'jsonrpc': '2.0', 'method': 'foo', 'id': 1}] ]), completion(equals([{ 'jsonrpc': '2.0',
diff --git a/pkgs/json_rpc_2/test/server/invalid_request_test.dart b/pkgs/json_rpc_2/test/server/invalid_request_test.dart index feeefea..74cf86f 100644 --- a/pkgs/json_rpc_2/test/server/invalid_request_test.dart +++ b/pkgs/json_rpc_2/test/server/invalid_request_test.dart
@@ -13,23 +13,23 @@ import 'utils.dart'; void main() { - var server; - setUp(() => server = new json_rpc.Server()); + var controller; + setUp(() => controller = new ServerController()); test("a non-Array/Object request is invalid", () { - expectErrorResponse(server, 'foo', error_code.INVALID_REQUEST, + expectErrorResponse(controller, 'foo', error_code.INVALID_REQUEST, 'Request must be an Array or an Object.'); }); test("requests must have a jsonrpc key", () { - expectErrorResponse(server, { + expectErrorResponse(controller, { 'method': 'foo', 'id': 1234 }, error_code.INVALID_REQUEST, 'Request must contain a "jsonrpc" key.'); }); test("the jsonrpc version must be 2.0", () { - expectErrorResponse(server, { + expectErrorResponse(controller, { 'jsonrpc': '1.0', 'method': 'foo', 'id': 1234 @@ -38,14 +38,14 @@ }); test("requests must have a method key", () { - expectErrorResponse(server, { + expectErrorResponse(controller, { 'jsonrpc': '2.0', 'id': 1234 }, error_code.INVALID_REQUEST, 'Request must contain a "method" key.'); }); test("request method must be a string", () { - expectErrorResponse(server, { + expectErrorResponse(controller, { 'jsonrpc': '2.0', 'method': 1234, 'id': 1234 @@ -54,7 +54,7 @@ }); test("request params must be an Array or Object", () { - expectErrorResponse(server, { + expectErrorResponse(controller, { 'jsonrpc': '2.0', 'method': 'foo', 'params': 1234, @@ -64,7 +64,7 @@ }); test("request id may not be an Array or Object", () { - expect(server.handleRequest({ + expect(controller.handleRequest({ 'jsonrpc': '2.0', 'method': 'foo', 'id': {'bad': 'id'}
diff --git a/pkgs/json_rpc_2/test/server/server_test.dart b/pkgs/json_rpc_2/test/server/server_test.dart index c18a8ca..a89a0d9 100644 --- a/pkgs/json_rpc_2/test/server/server_test.dart +++ b/pkgs/json_rpc_2/test/server/server_test.dart
@@ -13,15 +13,15 @@ import 'utils.dart'; void main() { - var server; - setUp(() => server = new json_rpc.Server()); + var controller; + setUp(() => controller = new ServerController()); test("calls a registered method with the given name", () { - server.registerMethod('foo', (params) { + controller.server.registerMethod('foo', (params) { return {'params': params.value}; }); - expect(server.handleRequest({ + expect(controller.handleRequest({ 'jsonrpc': '2.0', 'method': 'foo', 'params': {'param': 'value'}, @@ -34,9 +34,9 @@ }); test("calls a method that takes no parameters", () { - server.registerMethod('foo', () => 'foo'); + controller.server.registerMethod('foo', () => 'foo'); - expect(server.handleRequest({ + expect(controller.handleRequest({ 'jsonrpc': '2.0', 'method': 'foo', 'id': 1234 @@ -48,9 +48,9 @@ }); test("a method that takes no parameters rejects parameters", () { - server.registerMethod('foo', () => 'foo'); + controller.server.registerMethod('foo', () => 'foo'); - expectErrorResponse(server, { + expectErrorResponse(controller, { 'jsonrpc': '2.0', 'method': 'foo', 'params': {}, @@ -61,9 +61,9 @@ }); test("an unexpected error in a method is captured", () { - server.registerMethod('foo', () => throw new FormatException('bad format')); + controller.server.registerMethod('foo', () => throw new FormatException('bad format')); - expect(server.handleRequest({ + expect(controller.handleRequest({ 'jsonrpc': '2.0', 'method': 'foo', 'id': 1234 @@ -83,9 +83,9 @@ }); test("doesn't return a result for a notification", () { - server.registerMethod('foo', (args) => 'result'); + controller.server.registerMethod('foo', (args) => 'result'); - expect(server.handleRequest({ + expect(controller.handleRequest({ 'jsonrpc': '2.0', 'method': 'foo', 'params': {} @@ -93,11 +93,11 @@ }); test("includes the error data in the response", () { - server.registerMethod('foo', (params) { + controller.server.registerMethod('foo', (params) { throw new json_rpc.RpcException(5, 'Error message.', data: 'data value'); }); - expectErrorResponse(server, { + expectErrorResponse(controller, { 'jsonrpc': '2.0', 'method': 'foo', 'params': {}, @@ -108,58 +108,28 @@ data: 'data value'); }); - group("JSON", () { - test("handles a request parsed from JSON", () { - server.registerMethod('foo', (params) { - return {'params': params.value}; - }); - - expect(server.parseRequest(JSON.encode({ + test("a JSON parse error is rejected", () { + return controller.handleJsonRequest('invalid json {').then((result) { + expect(JSON.decode(result), { 'jsonrpc': '2.0', - 'method': 'foo', - 'params': {'param': 'value'}, - 'id': 1234 - })), completion(equals(JSON.encode({ - 'jsonrpc': '2.0', - 'result': {'params': {'param': 'value'}}, - 'id': 1234 - })))); - }); - - test("handles a notification parsed from JSON", () { - server.registerMethod('foo', (params) { - return {'params': params}; - }); - - expect(server.parseRequest(JSON.encode({ - 'jsonrpc': '2.0', - 'method': 'foo', - 'params': {'param': 'value'} - })), completion(isNull)); - }); - - test("a JSON parse error is rejected", () { - return server.parseRequest('invalid json {').then((result) { - expect(JSON.decode(result), { - 'jsonrpc': '2.0', - 'error': { - 'code': error_code.PARSE_ERROR, - 'message': startsWith("Invalid JSON: "), - 'data': {'request': 'invalid json {'} - }, - 'id': null - }); + 'error': { + 'code': error_code.PARSE_ERROR, + 'message': startsWith("Invalid JSON: "), + 'data': {'request': 'invalid json {'} + }, + 'id': null }); }); }); group("fallbacks", () { test("calls a fallback if no method matches", () { - server.registerMethod('foo', () => 'foo'); - server.registerMethod('bar', () => 'foo'); - server.registerFallback((params) => {'fallback': params.value}); + controller.server + ..registerMethod('foo', () => 'foo') + ..registerMethod('bar', () => 'foo') + ..registerFallback((params) => {'fallback': params.value}); - expect(server.handleRequest({ + expect(controller.handleRequest({ 'jsonrpc': '2.0', 'method': 'baz', 'params': {'param': 'value'}, @@ -172,13 +142,13 @@ }); test("calls the first matching fallback", () { - server.registerFallback((params) => - throw new json_rpc.RpcException.methodNotFound(params.method)); + controller.server + ..registerFallback((params) => + throw new json_rpc.RpcException.methodNotFound(params.method)) + ..registerFallback((params) => 'fallback 2') + ..registerFallback((params) => 'fallback 3'); - server.registerFallback((params) => 'fallback 2'); - server.registerFallback((params) => 'fallback 3'); - - expect(server.handleRequest({ + expect(controller.handleRequest({ 'jsonrpc': '2.0', 'method': 'fallback 2', 'id': 1234 @@ -190,9 +160,10 @@ }); test("an unexpected error in a fallback is captured", () { - server.registerFallback((_) => throw new FormatException('bad format')); + controller.server.registerFallback((_) => + throw new FormatException('bad format')); - expect(server.handleRequest({ + expect(controller.handleRequest({ 'jsonrpc': '2.0', 'method': 'foo', 'id': 1234 @@ -213,7 +184,8 @@ }); test("disallows multiple methods with the same name", () { - server.registerMethod('foo', () => null); - expect(() => server.registerMethod('foo', () => null), throwsArgumentError); + controller.server.registerMethod('foo', () => null); + expect(() => controller.server.registerMethod('foo', () => null), + throwsArgumentError); }); }
diff --git a/pkgs/json_rpc_2/test/server/stream_test.dart b/pkgs/json_rpc_2/test/server/stream_test.dart new file mode 100644 index 0000000..5459e3e --- /dev/null +++ b/pkgs/json_rpc_2/test/server/stream_test.dart
@@ -0,0 +1,98 @@ +// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +library json_rpc_2.test.server.stream_test; + +import 'dart:async'; + +import 'package:unittest/unittest.dart'; +import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; + +import 'utils.dart'; + +void main() { + test(".withoutJson supports decoded stream and sink", () { + var requestController = new StreamController(); + var responseController = new StreamController(); + var server = new json_rpc.Server.withoutJson( + requestController.stream, responseController.sink); + server.listen(); + + server.registerMethod('foo', (params) { + return {'params': params.value}; + }); + + requestController.add({ + 'jsonrpc': '2.0', + 'method': 'foo', + 'params': {'param': 'value'}, + 'id': 1234 + }); + + expect(responseController.stream.first, completion(equals({ + 'jsonrpc': '2.0', + 'result': {'params': {'param': 'value'}}, + 'id': 1234 + }))); + }); + + test(".listen returns when the controller is closed", () { + var requestController = new StreamController(); + var responseController = new StreamController(); + var server = new json_rpc.Server( + requestController.stream, responseController.sink); + + var hasListenCompeted = false; + expect(server.listen().then((_) => hasListenCompeted = true), completes); + + return pumpEventQueue().then((_) { + expect(hasListenCompeted, isFalse); + + // This should cause listen to complete. + return requestController.close(); + }); + }); + + test(".listen returns a stream error", () { + var requestController = new StreamController(); + var responseController = new StreamController(); + var server = new json_rpc.Server( + requestController.stream, responseController.sink); + + expect(server.listen(), throwsA('oh no')); + requestController.addError('oh no'); + }); + + test(".listen can't be called twice", () { + var requestController = new StreamController(); + var responseController = new StreamController(); + var server = new json_rpc.Server( + requestController.stream, responseController.sink); + server.listen(); + + expect(() => server.listen(), throwsStateError); + }); + + test(".close cancels the stream subscription and closes the sink", () { + var requestController = new StreamController(); + var responseController = new StreamController(); + var server = new json_rpc.Server( + requestController.stream, responseController.sink); + + expect(server.listen(), completes); + expect(server.close(), completes); + + expect(() => requestController.stream.listen((_) {}), throwsStateError); + expect(responseController.isClosed, isTrue); + }); + + test(".close can't be called before .listen", () { + var requestController = new StreamController(); + var responseController = new StreamController(); + var server = new json_rpc.Server( + requestController.stream, responseController.sink); + + expect(() => server.close(), throwsStateError); + }); +}
diff --git a/pkgs/json_rpc_2/test/server/utils.dart b/pkgs/json_rpc_2/test/server/utils.dart index 6f92c0a..a4fd36a 100644 --- a/pkgs/json_rpc_2/test/server/utils.dart +++ b/pkgs/json_rpc_2/test/server/utils.dart
@@ -4,16 +4,53 @@ library json_rpc_2.test.server.util; -import 'package:unittest/unittest.dart'; -import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; +import 'dart:async'; +import 'dart:convert'; -void expectErrorResponse(json_rpc.Server server, request, int errorCode, +import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc; +import 'package:json_rpc_2/error_code.dart' as error_code; +import 'package:unittest/unittest.dart'; + +/// A controller used to test a [json_rpc.Server]. +class ServerController { + /// The controller for the server's request stream. + final _requestController = new StreamController<String>(); + + /// The controller for the server's response sink. + final _responseController = new StreamController<String>(); + + /// The server. + json_rpc.Server get server => _server; + json_rpc.Server _server; + + ServerController() { + _server = new json_rpc.Server( + _requestController.stream, _responseController.sink); + _server.listen(); + } + + /// Passes [request], a decoded request, to [server] and returns its decoded + /// response. + Future handleRequest(request) => + handleJsonRequest(JSON.encode(request)).then(JSON.decode); + + /// Passes [request], a JSON-encoded request, to [server] and returns its + /// encoded response. + Future handleJsonRequest(String request) { + _requestController.add(request); + return _responseController.stream.first; + } +} + +/// Expects that [controller]'s server will return an error response to +/// [request] with the given [errorCode], [message], and [data]. +void expectErrorResponse(ServerController controller, request, int errorCode, String message, {data}) { var id; if (request is Map) id = request['id']; if (data == null) data = {'request': request}; - expect(server.handleRequest(request), completion(equals({ + expect(controller.handleRequest(request), completion(equals({ 'jsonrpc': '2.0', 'id': id, 'error': { @@ -24,10 +61,25 @@ }))); } +/// Returns a matcher that matches a [json_rpc.RpcException] with an +/// `invalid_params` error code. Matcher throwsInvalidParams(String message) { return throwsA(predicate((error) { expect(error, new isInstanceOf<json_rpc.RpcException>()); + expect(error.code, equals(error_code.INVALID_PARAMS)); expect(error.message, equals(message)); return true; })); } + +/// Returns a [Future] that completes after pumping the event queue [times] +/// times. By default, this should pump the event queue enough times to allow +/// any code to run, as long as it's not waiting on some external event. +Future pumpEventQueue([int times = 20]) { + if (times == 0) return new Future.value(); + // We use a delayed future to allow microtask events to finish. The + // Future.value or Future() constructors use scheduleMicrotask themselves and + // would therefore not wait for microtask callbacks that are scheduled after + // invoking this method. + return new Future.delayed(Duration.ZERO, () => pumpEventQueue(times - 1)); +}