Convert json_rpc.Server to take a Stream and StreamSink.
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//309503005
git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/json_rpc_2@37012 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ba15fb3..34123f9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,15 @@
+## 0.1.0
+
+* Remove `Server.handleRequest()` and `Server.parseRequest()`. Instead, `new
+ Server()` takes a `Stream` and a `StreamSink` and uses those behind-the-scenes
+ for its communication.
+
+* Add `Server.listen()`, which causes the server to begin listening to the
+ underlying request stream.
+
+* Add `Server.close()`, which closes the underlying request stream and response
+ sink.
+
## 0.0.2+3
* Widen the version constraint for `stack_trace`.
diff --git a/lib/src/server.dart b/lib/src/server.dart
index d05c54f..c7ece5b 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -26,6 +26,20 @@
/// asynchronously, it's possible for multiple methods to be invoked at the same
/// time, or even for a single method to be invoked multiple times at once.
class Server {
+ /// The stream for decoded requests.
+ final Stream _requests;
+
+ /// The subscription to the decoded request stream.
+ StreamSubscription _requestSubscription;
+
+ /// The sink for decoded responses.
+ final StreamSink _responses;
+
+ /// The completer for [listen].
+ ///
+ /// This is non-`null` after [listen] has been called.
+ Completer _listenCompleter;
+
/// The methods registered for this server.
final _methods = new Map<String, Function>();
@@ -35,7 +49,104 @@
/// [RpcException.methodNotFound] exception.
final _fallbacks = new Queue<Function>();
- Server();
+ /// Creates a [Server] that reads requests from [requests] and writes
+ /// responses to [responses].
+ ///
+ /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
+ /// `WebSocket`), [responses] may be omitted.
+ ///
+ /// Note that the server won't begin listening to [requests] until
+ /// [Server.listen] is called.
+ factory Server(Stream<String> requests, [StreamSink<String> responses]) {
+ if (responses == null) {
+ if (requests is! StreamSink) {
+ throw new ArgumentError("Either `requests` must be a StreamSink or "
+ "`responses` must be passed.");
+ }
+ responses = requests as StreamSink;
+ }
+
+ var wrappedResponses = mapStreamSink(responses, JSON.encode);
+ return new Server.withoutJson(requests.expand((request) {
+ var decodedRequest;
+ try {
+ decodedRequest = JSON.decode(request);
+ } on FormatException catch (error) {
+ wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
+ 'Invalid JSON: ${error.message}').serialize(request));
+ return [];
+ }
+
+ return [decodedRequest];
+ }), wrappedResponses);
+ }
+
+ /// Creates a [Server] that reads decoded requests from [requests] and writes
+ /// decoded responses to [responses].
+ ///
+ /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
+ /// reads and writes decoded maps or lists.
+ ///
+ /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be
+ /// omitted.
+ ///
+ /// Note that the server won't begin listening to [requests] until
+ /// [Server.listen] is called.
+ Server.withoutJson(Stream requests, [StreamSink responses])
+ : _requests = requests,
+ _responses = responses == null && requests is StreamSink ?
+ requests : responses {
+ if (_responses == null) {
+ throw new ArgumentError("Either `requests` must be a StreamSink or "
+ "`responses` must be passed.");
+ }
+ }
+
+ /// Starts listening to the underlying stream.
+ ///
+ /// Returns a [Future] that will complete when the stream is closed or when it
+ /// has an error.
+ ///
+ /// [listen] may only be called once.
+ Future listen() {
+ if (_listenCompleter != null) {
+ throw new StateError(
+ "Can only call Server.listen once on a given server.");
+ }
+
+ _listenCompleter = new Completer();
+ _requestSubscription = _requests.listen(_handleRequest,
+ onError: (error, stackTrace) {
+ if (_listenCompleter.isCompleted) return;
+ _responses.close();
+ _listenCompleter.completeError(error, stackTrace);
+ }, onDone: () {
+ if (_listenCompleter.isCompleted) return;
+ _responses.close();
+ _listenCompleter.complete();
+ }, cancelOnError: true);
+
+ return _listenCompleter.future;
+ }
+
+ /// Closes the server's request subscription and response sink.
+ ///
+ /// Returns a [Future] that completes when all resources have been released.
+ ///
+ /// A server can't be closed before [listen] has been called.
+ Future close() {
+ if (_listenCompleter == null) {
+ throw new StateError("Can't call Server.close before Server.listen.");
+ }
+
+ if (!_listenCompleter.isCompleted) _listenCompleter.complete();
+
+ var subscriptionFuture = _requestSubscription.cancel();
+ // TODO(nweiz): include the response future in the return value when issue
+ // 19095 is fixed.
+ _responses.close();
+ return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
+ }
/// Registers a method named [name] on this server.
///
@@ -69,14 +180,14 @@
_fallbacks.add(callback);
}
- /// Handle a request that's already been parsed from JSON.
+ /// Handle a request.
///
/// [request] is expected to be a JSON-serializable object representing a
/// request sent by a client. This calls the appropriate method or methods for
/// handling that request and returns a JSON-serializable response, or `null`
/// if no response should be sent. [callback] may send custom
/// errors by throwing an [RpcException].
- Future handleRequest(request) {
+ Future _handleRequest(request) {
return syncFuture(() {
if (request is! List) return _handleSingleRequest(request);
if (request.isEmpty) {
@@ -88,28 +199,7 @@
var nonNull = results.where((result) => result != null);
return nonNull.isEmpty ? null : nonNull.toList();
});
- });
- }
-
- /// Parses and handles a JSON serialized request.
- ///
- /// This calls the appropriate method or methods for handling that request and
- /// returns a JSON string, or `null` if no response should be sent.
- Future<String> parseRequest(String request) {
- return syncFuture(() {
- var decodedRequest;
- try {
- decodedRequest = JSON.decode(request);
- } on FormatException catch (error) {
- return new RpcException(error_code.PARSE_ERROR, 'Invalid JSON: '
- '${error.message}').serialize(request);
- }
-
- return handleRequest(decodedRequest);
- }).then((response) {
- if (response == null) return null;
- return JSON.encode(response);
- });
+ }).then(_responses.add);
}
/// Handles an individual parsed request.
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index 1eff004..a212f58 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -43,3 +43,24 @@
/// [toString], so we remove that if it exists.
String getErrorMessage(error) =>
error.toString().replaceFirst(_exceptionPrefix, '');
+
+/// Returns a [StreamSink] that wraps [sink] and maps each event added using
+/// [callback].
+StreamSink mapStreamSink(StreamSink sink, callback(event)) =>
+ new _MappedStreamSink(sink, callback);
+
+/// A [StreamSink] wrapper that maps each event added to the sink.
+class _MappedStreamSink implements StreamSink {
+ final StreamSink _inner;
+ final Function _callback;
+
+ Future get done => _inner.done;
+
+ _MappedStreamSink(this._inner, this._callback);
+
+ void add(event) => _inner.add(_callback(event));
+ void addError(error, [StackTrace stackTrace]) =>
+ _inner.addError(error, stackTrace);
+ Future addStream(Stream stream) => _inner.addStream(stream.map(_callback));
+ Future close() => _inner.close();
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 2cf1bc8..df1a8a0 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: json_rpc_2
-version: 0.0.2+3
+version: 0.1.0
author: Dart Team <misc@dartlang.org>
description: An implementation of the JSON-RPC 2.0 spec.
homepage: http://www.dartlang.org
@@ -7,7 +7,7 @@
dependencies:
stack_trace: '>=0.9.1 <2.0.0'
dev_dependencies:
- unittest: ">=0.9.0 <0.10.0"
+ unittest: ">=0.9.0 <0.12.0"
environment:
sdk: ">=1.2.0 <2.0.0"
diff --git a/test/server/batch_test.dart b/test/server/batch_test.dart
index 441df58..7dda84f 100644
--- a/test/server/batch_test.dart
+++ b/test/server/batch_test.dart
@@ -13,16 +13,17 @@
import 'utils.dart';
void main() {
- var server;
+ var controller;
setUp(() {
- server = new json_rpc.Server()
+ controller = new ServerController();
+ controller.server
..registerMethod('foo', () => 'foo')
..registerMethod('id', (params) => params.value)
..registerMethod('arg', (params) => params['arg'].value);
});
test('handles a batch of requests', () {
- expect(server.handleRequest([
+ expect(controller.handleRequest([
{'jsonrpc': '2.0', 'method': 'foo', 'id': 1},
{'jsonrpc': '2.0', 'method': 'id', 'params': ['value'], 'id': 2},
{'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3}
@@ -34,7 +35,7 @@
});
test('handles errors individually', () {
- expect(server.handleRequest([
+ expect(controller.handleRequest([
{'jsonrpc': '2.0', 'method': 'foo', 'id': 1},
{'jsonrpc': '2.0', 'method': 'zap', 'id': 2},
{'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3}
@@ -54,7 +55,7 @@
});
test('handles notifications individually', () {
- expect(server.handleRequest([
+ expect(controller.handleRequest([
{'jsonrpc': '2.0', 'method': 'foo', 'id': 1},
{'jsonrpc': '2.0', 'method': 'id', 'params': ['value']},
{'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3}
@@ -65,7 +66,7 @@
});
test('returns nothing if every request is a notification', () {
- expect(server.handleRequest([
+ expect(controller.handleRequest([
{'jsonrpc': '2.0', 'method': 'foo'},
{'jsonrpc': '2.0', 'method': 'id', 'params': ['value']},
{'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}}
@@ -73,24 +74,12 @@
});
test('returns an error if the batch is empty', () {
- expectErrorResponse(server, [], error_code.INVALID_REQUEST,
+ expectErrorResponse(controller, [], error_code.INVALID_REQUEST,
'A batch must contain at least one request.');
});
- test('handles a batch of requests parsed from JSON', () {
- expect(server.parseRequest(JSON.encode([
- {'jsonrpc': '2.0', 'method': 'foo', 'id': 1},
- {'jsonrpc': '2.0', 'method': 'id', 'params': ['value'], 'id': 2},
- {'jsonrpc': '2.0', 'method': 'arg', 'params': {'arg': 'value'}, 'id': 3}
- ])), completion(equals(JSON.encode([
- {'jsonrpc': '2.0', 'result': 'foo', 'id': 1},
- {'jsonrpc': '2.0', 'result': ['value'], 'id': 2},
- {'jsonrpc': '2.0', 'result': 'value', 'id': 3}
- ]))));
- });
-
test('disallows nested batches', () {
- expect(server.handleRequest([
+ expect(controller.handleRequest([
[{'jsonrpc': '2.0', 'method': 'foo', 'id': 1}]
]), completion(equals([{
'jsonrpc': '2.0',
diff --git a/test/server/invalid_request_test.dart b/test/server/invalid_request_test.dart
index feeefea..74cf86f 100644
--- a/test/server/invalid_request_test.dart
+++ b/test/server/invalid_request_test.dart
@@ -13,23 +13,23 @@
import 'utils.dart';
void main() {
- var server;
- setUp(() => server = new json_rpc.Server());
+ var controller;
+ setUp(() => controller = new ServerController());
test("a non-Array/Object request is invalid", () {
- expectErrorResponse(server, 'foo', error_code.INVALID_REQUEST,
+ expectErrorResponse(controller, 'foo', error_code.INVALID_REQUEST,
'Request must be an Array or an Object.');
});
test("requests must have a jsonrpc key", () {
- expectErrorResponse(server, {
+ expectErrorResponse(controller, {
'method': 'foo',
'id': 1234
}, error_code.INVALID_REQUEST, 'Request must contain a "jsonrpc" key.');
});
test("the jsonrpc version must be 2.0", () {
- expectErrorResponse(server, {
+ expectErrorResponse(controller, {
'jsonrpc': '1.0',
'method': 'foo',
'id': 1234
@@ -38,14 +38,14 @@
});
test("requests must have a method key", () {
- expectErrorResponse(server, {
+ expectErrorResponse(controller, {
'jsonrpc': '2.0',
'id': 1234
}, error_code.INVALID_REQUEST, 'Request must contain a "method" key.');
});
test("request method must be a string", () {
- expectErrorResponse(server, {
+ expectErrorResponse(controller, {
'jsonrpc': '2.0',
'method': 1234,
'id': 1234
@@ -54,7 +54,7 @@
});
test("request params must be an Array or Object", () {
- expectErrorResponse(server, {
+ expectErrorResponse(controller, {
'jsonrpc': '2.0',
'method': 'foo',
'params': 1234,
@@ -64,7 +64,7 @@
});
test("request id may not be an Array or Object", () {
- expect(server.handleRequest({
+ expect(controller.handleRequest({
'jsonrpc': '2.0',
'method': 'foo',
'id': {'bad': 'id'}
diff --git a/test/server/server_test.dart b/test/server/server_test.dart
index c18a8ca..a89a0d9 100644
--- a/test/server/server_test.dart
+++ b/test/server/server_test.dart
@@ -13,15 +13,15 @@
import 'utils.dart';
void main() {
- var server;
- setUp(() => server = new json_rpc.Server());
+ var controller;
+ setUp(() => controller = new ServerController());
test("calls a registered method with the given name", () {
- server.registerMethod('foo', (params) {
+ controller.server.registerMethod('foo', (params) {
return {'params': params.value};
});
- expect(server.handleRequest({
+ expect(controller.handleRequest({
'jsonrpc': '2.0',
'method': 'foo',
'params': {'param': 'value'},
@@ -34,9 +34,9 @@
});
test("calls a method that takes no parameters", () {
- server.registerMethod('foo', () => 'foo');
+ controller.server.registerMethod('foo', () => 'foo');
- expect(server.handleRequest({
+ expect(controller.handleRequest({
'jsonrpc': '2.0',
'method': 'foo',
'id': 1234
@@ -48,9 +48,9 @@
});
test("a method that takes no parameters rejects parameters", () {
- server.registerMethod('foo', () => 'foo');
+ controller.server.registerMethod('foo', () => 'foo');
- expectErrorResponse(server, {
+ expectErrorResponse(controller, {
'jsonrpc': '2.0',
'method': 'foo',
'params': {},
@@ -61,9 +61,9 @@
});
test("an unexpected error in a method is captured", () {
- server.registerMethod('foo', () => throw new FormatException('bad format'));
+ controller.server.registerMethod('foo', () => throw new FormatException('bad format'));
- expect(server.handleRequest({
+ expect(controller.handleRequest({
'jsonrpc': '2.0',
'method': 'foo',
'id': 1234
@@ -83,9 +83,9 @@
});
test("doesn't return a result for a notification", () {
- server.registerMethod('foo', (args) => 'result');
+ controller.server.registerMethod('foo', (args) => 'result');
- expect(server.handleRequest({
+ expect(controller.handleRequest({
'jsonrpc': '2.0',
'method': 'foo',
'params': {}
@@ -93,11 +93,11 @@
});
test("includes the error data in the response", () {
- server.registerMethod('foo', (params) {
+ controller.server.registerMethod('foo', (params) {
throw new json_rpc.RpcException(5, 'Error message.', data: 'data value');
});
- expectErrorResponse(server, {
+ expectErrorResponse(controller, {
'jsonrpc': '2.0',
'method': 'foo',
'params': {},
@@ -108,58 +108,28 @@
data: 'data value');
});
- group("JSON", () {
- test("handles a request parsed from JSON", () {
- server.registerMethod('foo', (params) {
- return {'params': params.value};
- });
-
- expect(server.parseRequest(JSON.encode({
+ test("a JSON parse error is rejected", () {
+ return controller.handleJsonRequest('invalid json {').then((result) {
+ expect(JSON.decode(result), {
'jsonrpc': '2.0',
- 'method': 'foo',
- 'params': {'param': 'value'},
- 'id': 1234
- })), completion(equals(JSON.encode({
- 'jsonrpc': '2.0',
- 'result': {'params': {'param': 'value'}},
- 'id': 1234
- }))));
- });
-
- test("handles a notification parsed from JSON", () {
- server.registerMethod('foo', (params) {
- return {'params': params};
- });
-
- expect(server.parseRequest(JSON.encode({
- 'jsonrpc': '2.0',
- 'method': 'foo',
- 'params': {'param': 'value'}
- })), completion(isNull));
- });
-
- test("a JSON parse error is rejected", () {
- return server.parseRequest('invalid json {').then((result) {
- expect(JSON.decode(result), {
- 'jsonrpc': '2.0',
- 'error': {
- 'code': error_code.PARSE_ERROR,
- 'message': startsWith("Invalid JSON: "),
- 'data': {'request': 'invalid json {'}
- },
- 'id': null
- });
+ 'error': {
+ 'code': error_code.PARSE_ERROR,
+ 'message': startsWith("Invalid JSON: "),
+ 'data': {'request': 'invalid json {'}
+ },
+ 'id': null
});
});
});
group("fallbacks", () {
test("calls a fallback if no method matches", () {
- server.registerMethod('foo', () => 'foo');
- server.registerMethod('bar', () => 'foo');
- server.registerFallback((params) => {'fallback': params.value});
+ controller.server
+ ..registerMethod('foo', () => 'foo')
+ ..registerMethod('bar', () => 'foo')
+ ..registerFallback((params) => {'fallback': params.value});
- expect(server.handleRequest({
+ expect(controller.handleRequest({
'jsonrpc': '2.0',
'method': 'baz',
'params': {'param': 'value'},
@@ -172,13 +142,13 @@
});
test("calls the first matching fallback", () {
- server.registerFallback((params) =>
- throw new json_rpc.RpcException.methodNotFound(params.method));
+ controller.server
+ ..registerFallback((params) =>
+ throw new json_rpc.RpcException.methodNotFound(params.method))
+ ..registerFallback((params) => 'fallback 2')
+ ..registerFallback((params) => 'fallback 3');
- server.registerFallback((params) => 'fallback 2');
- server.registerFallback((params) => 'fallback 3');
-
- expect(server.handleRequest({
+ expect(controller.handleRequest({
'jsonrpc': '2.0',
'method': 'fallback 2',
'id': 1234
@@ -190,9 +160,10 @@
});
test("an unexpected error in a fallback is captured", () {
- server.registerFallback((_) => throw new FormatException('bad format'));
+ controller.server.registerFallback((_) =>
+ throw new FormatException('bad format'));
- expect(server.handleRequest({
+ expect(controller.handleRequest({
'jsonrpc': '2.0',
'method': 'foo',
'id': 1234
@@ -213,7 +184,8 @@
});
test("disallows multiple methods with the same name", () {
- server.registerMethod('foo', () => null);
- expect(() => server.registerMethod('foo', () => null), throwsArgumentError);
+ controller.server.registerMethod('foo', () => null);
+ expect(() => controller.server.registerMethod('foo', () => null),
+ throwsArgumentError);
});
}
diff --git a/test/server/stream_test.dart b/test/server/stream_test.dart
new file mode 100644
index 0000000..5459e3e
--- /dev/null
+++ b/test/server/stream_test.dart
@@ -0,0 +1,98 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.test.server.stream_test;
+
+import 'dart:async';
+
+import 'package:unittest/unittest.dart';
+import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+
+import 'utils.dart';
+
+void main() {
+ test(".withoutJson supports decoded stream and sink", () {
+ var requestController = new StreamController();
+ var responseController = new StreamController();
+ var server = new json_rpc.Server.withoutJson(
+ requestController.stream, responseController.sink);
+ server.listen();
+
+ server.registerMethod('foo', (params) {
+ return {'params': params.value};
+ });
+
+ requestController.add({
+ 'jsonrpc': '2.0',
+ 'method': 'foo',
+ 'params': {'param': 'value'},
+ 'id': 1234
+ });
+
+ expect(responseController.stream.first, completion(equals({
+ 'jsonrpc': '2.0',
+ 'result': {'params': {'param': 'value'}},
+ 'id': 1234
+ })));
+ });
+
+ test(".listen returns when the controller is closed", () {
+ var requestController = new StreamController();
+ var responseController = new StreamController();
+ var server = new json_rpc.Server(
+ requestController.stream, responseController.sink);
+
+ var hasListenCompeted = false;
+ expect(server.listen().then((_) => hasListenCompeted = true), completes);
+
+ return pumpEventQueue().then((_) {
+ expect(hasListenCompeted, isFalse);
+
+ // This should cause listen to complete.
+ return requestController.close();
+ });
+ });
+
+ test(".listen returns a stream error", () {
+ var requestController = new StreamController();
+ var responseController = new StreamController();
+ var server = new json_rpc.Server(
+ requestController.stream, responseController.sink);
+
+ expect(server.listen(), throwsA('oh no'));
+ requestController.addError('oh no');
+ });
+
+ test(".listen can't be called twice", () {
+ var requestController = new StreamController();
+ var responseController = new StreamController();
+ var server = new json_rpc.Server(
+ requestController.stream, responseController.sink);
+ server.listen();
+
+ expect(() => server.listen(), throwsStateError);
+ });
+
+ test(".close cancels the stream subscription and closes the sink", () {
+ var requestController = new StreamController();
+ var responseController = new StreamController();
+ var server = new json_rpc.Server(
+ requestController.stream, responseController.sink);
+
+ expect(server.listen(), completes);
+ expect(server.close(), completes);
+
+ expect(() => requestController.stream.listen((_) {}), throwsStateError);
+ expect(responseController.isClosed, isTrue);
+ });
+
+ test(".close can't be called before .listen", () {
+ var requestController = new StreamController();
+ var responseController = new StreamController();
+ var server = new json_rpc.Server(
+ requestController.stream, responseController.sink);
+
+ expect(() => server.close(), throwsStateError);
+ });
+}
diff --git a/test/server/utils.dart b/test/server/utils.dart
index 6f92c0a..a4fd36a 100644
--- a/test/server/utils.dart
+++ b/test/server/utils.dart
@@ -4,16 +4,53 @@
library json_rpc_2.test.server.util;
-import 'package:unittest/unittest.dart';
-import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+import 'dart:async';
+import 'dart:convert';
-void expectErrorResponse(json_rpc.Server server, request, int errorCode,
+import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+import 'package:json_rpc_2/error_code.dart' as error_code;
+import 'package:unittest/unittest.dart';
+
+/// A controller used to test a [json_rpc.Server].
+class ServerController {
+ /// The controller for the server's request stream.
+ final _requestController = new StreamController<String>();
+
+ /// The controller for the server's response sink.
+ final _responseController = new StreamController<String>();
+
+ /// The server.
+ json_rpc.Server get server => _server;
+ json_rpc.Server _server;
+
+ ServerController() {
+ _server = new json_rpc.Server(
+ _requestController.stream, _responseController.sink);
+ _server.listen();
+ }
+
+ /// Passes [request], a decoded request, to [server] and returns its decoded
+ /// response.
+ Future handleRequest(request) =>
+ handleJsonRequest(JSON.encode(request)).then(JSON.decode);
+
+ /// Passes [request], a JSON-encoded request, to [server] and returns its
+ /// encoded response.
+ Future handleJsonRequest(String request) {
+ _requestController.add(request);
+ return _responseController.stream.first;
+ }
+}
+
+/// Expects that [controller]'s server will return an error response to
+/// [request] with the given [errorCode], [message], and [data].
+void expectErrorResponse(ServerController controller, request, int errorCode,
String message, {data}) {
var id;
if (request is Map) id = request['id'];
if (data == null) data = {'request': request};
- expect(server.handleRequest(request), completion(equals({
+ expect(controller.handleRequest(request), completion(equals({
'jsonrpc': '2.0',
'id': id,
'error': {
@@ -24,10 +61,25 @@
})));
}
+/// Returns a matcher that matches a [json_rpc.RpcException] with an
+/// `invalid_params` error code.
Matcher throwsInvalidParams(String message) {
return throwsA(predicate((error) {
expect(error, new isInstanceOf<json_rpc.RpcException>());
+ expect(error.code, equals(error_code.INVALID_PARAMS));
expect(error.message, equals(message));
return true;
}));
}
+
+/// Returns a [Future] that completes after pumping the event queue [times]
+/// times. By default, this should pump the event queue enough times to allow
+/// any code to run, as long as it's not waiting on some external event.
+Future pumpEventQueue([int times = 20]) {
+ if (times == 0) return new Future.value();
+ // We use a delayed future to allow microtask events to finish. The
+ // Future.value or Future() constructors use scheduleMicrotask themselves and
+ // would therefore not wait for microtask callbacks that are scheduled after
+ // invoking this method.
+ return new Future.delayed(Duration.ZERO, () => pumpEventQueue(times - 1));
+}