Add a Client class to json_rpc_2.
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//691053006
git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/json_rpc_2@41535 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 34123f9..15b5a30 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 1.0.0
+
+* Add a `Client` class for communicating with external JSON-RPC 2.0 servers.
+
## 0.1.0
* Remove `Server.handleRequest()` and `Server.parseRequest()`. Instead, `new
diff --git a/lib/error_code.dart b/lib/error_code.dart
index 96cb909..14b77f2 100644
--- a/lib/error_code.dart
+++ b/lib/error_code.dart
@@ -34,3 +34,18 @@
/// The spec reserves the range from -32000 to -32099 for implementation-defined
/// server exceptions, but for now we only use one of those values.
const SERVER_ERROR = -32000;
+
+/// Returns a human-readable name for [errorCode] if it's one specified by the
+/// JSON-RPC 2.0 spec.
+///
+/// If [errorCode] isn't defined in the JSON-RPC 2.0 spec, returns null.
+String name(int errorCode) {
+ switch (errorCode) {
+ case PARSE_ERROR: return "parse error";
+ case INVALID_REQUEST: return "invalid request";
+ case METHOD_NOT_FOUND: return "method not found";
+ case INVALID_PARAMS: return "invalid parameters";
+ case INTERNAL_ERROR: return "internal error";
+ default: return null;
+ }
+}
diff --git a/lib/json_rpc_2.dart b/lib/json_rpc_2.dart
index 04e4a52..3305eae 100644
--- a/lib/json_rpc_2.dart
+++ b/lib/json_rpc_2.dart
@@ -4,6 +4,7 @@
library json_rpc_2;
+export 'src/client.dart';
export 'src/exception.dart';
export 'src/parameters.dart';
export 'src/server.dart';
diff --git a/lib/src/client.dart b/lib/src/client.dart
new file mode 100644
index 0000000..2e1d273
--- /dev/null
+++ b/lib/src/client.dart
@@ -0,0 +1,198 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.client;
+
+import 'dart:async';
+
+import 'package:stack_trace/stack_trace.dart';
+
+import 'exception.dart';
+import 'two_way_stream.dart';
+import 'utils.dart';
+
+/// A JSON-RPC 2.0 client.
+///
+/// A client calls methods on a server and handles the server's responses to
+/// those method calls. Methods can be called with [sendRequest], or with
+/// [sendNotification] if no response is expected.
+class Client {
+ final TwoWayStream _streams;
+
+ /// The next request id.
+ var _id = 0;
+
+ /// The current batch of requests to be sent together.
+ ///
+ /// Each element is a JSON-serializable object.
+ List _batch;
+
+ /// The map of request ids for pending requests to [Completer]s that will be
+ /// completed with those requests' responses.
+ final _pendingRequests = new Map<int, Completer>();
+
+ /// Creates a [Client] that writes requests to [requests] and reads responses
+ /// from [responses].
+ ///
+ /// If [responses] is a [StreamSink] as well as a [Stream] (for example, a
+ /// `WebSocket`), [requests] may be omitted.
+ ///
+ /// Note that the client won't begin listening to [responses] until
+ /// [Client.listen] is called.
+ Client(Stream<String> responses, [StreamSink<String> requests])
+ : _streams = new TwoWayStream(
+ "Client", responses, "responses", requests, "requests");
+
+ /// Creates a [Client] that writes decoded responses to [responses] and reads
+ /// decoded requests from [requests].
+ ///
+ /// Unlike [new Client], this doesn't read or write JSON strings. Instead, it
+ /// reads and writes decoded maps or lists.
+ ///
+ /// If [responses] is a [StreamSink] as well as a [Stream], [requests] may be
+ /// omitted.
+ ///
+ /// Note that the client won't begin listening to [responses] until
+ /// [Client.listen] is called.
+ Client.withoutJson(Stream responses, [StreamSink requests])
+ : _streams = new TwoWayStream.withoutJson(
+ "Client", responses, "responses", requests, "requests");
+
+ /// Users of the library should not use this constructor.
+ Client.internal(this._streams);
+
+ /// Starts listening to the underlying stream.
+ ///
+ /// Returns a [Future] that will complete when the stream is closed or when it
+ /// has an error.
+ ///
+ /// [listen] may only be called once.
+ Future listen() => _streams.listen(_handleResponse);
+
+ /// Closes the server's request sink and response subscription.
+ ///
+ /// Returns a [Future] that completes when all resources have been released.
+ ///
+ /// A client can't be closed before [listen] has been called.
+ Future close() => _streams.close();
+
+ /// Sends a JSON-RPC 2 request to invoke the given [method].
+ ///
+ /// If passed, [parameters] is the parameters for the method. This must be
+ /// either an [Iterable] (to pass parameters by position) or a [Map] with
+ /// [String] keys (to pass parameters by name). Either way, it must be
+ /// JSON-serializable.
+ ///
+ /// If the request succeeds, this returns the response result as a decoded
+ /// JSON-serializable object. If it fails, it throws an [RpcException]
+ /// describing the failure.
+ Future sendRequest(String method, [parameters]) {
+ var id = _id++;
+ _send(method, parameters, id);
+
+ var completer = new Completer.sync();
+ _pendingRequests[id] = completer;
+ return completer.future;
+ }
+
+ /// Sends a JSON-RPC 2 request to invoke the given [method] without expecting
+ /// a response.
+ ///
+ /// If passed, [parameters] is the parameters for the method. This must be
+ /// either an [Iterable] (to pass parameters by position) or a [Map] with
+ /// [String] keys (to pass parameters by name). Either way, it must be
+ /// JSON-serializable.
+ ///
+ /// Since this is just a notification to which the server isn't expected to
+ /// send a response, it has no return value.
+ void sendNotification(String method, [parameters]) =>
+ _send(method, parameters);
+
+ /// A helper method for [sendRequest] and [sendNotification].
+ ///
+ /// Sends a request to invoke [method] with [parameters]. If [id] is given,
+ /// the request uses that id.
+ void _send(String method, parameters, [int id]) {
+ if (parameters is Iterable) parameters = parameters.toList();
+ if (parameters is! Map && parameters is! List && parameters != null) {
+ throw new ArgumentError('Only maps and lists may be used as JSON-RPC '
+ 'parameters, was "$parameters".');
+ }
+
+ var message = {
+ "jsonrpc": "2.0",
+ "method": method
+ };
+ if (id != null) message["id"] = id;
+ if (parameters != null) message["params"] = parameters;
+
+ if (_batch != null) {
+ _batch.add(message);
+ } else {
+ _streams.add(message);
+ }
+ }
+
+ /// Runs [callback] and batches any requests sent until it returns.
+ ///
+ /// A batch of requests is sent in a single message on the underlying stream,
+ /// and the responses are likewise sent back in a single message.
+ ///
+ /// [callback] may be synchronous or asynchronous. If it returns a [Future],
+ /// requests will be batched until that Future returns; otherwise, requests
+ /// will only be batched while synchronously executing [callback].
+ ///
+ /// If this is called in the context of another [withBatch] call, it just
+ /// invokes [callback] without creating another batch. This means that
+ /// responses are batched until the first batch ends.
+ withBatch(callback()) {
+ if (_batch != null) return callback();
+
+ _batch = [];
+ return tryFinally(callback, () {
+ _streams.add(_batch);
+ _batch = null;
+ });
+ }
+
+ /// Handles a decoded response from the server.
+ void _handleResponse(response) {
+ if (response is List) {
+ response.forEach(_handleSingleResponse);
+ } else {
+ _handleSingleResponse(response);
+ }
+ }
+
+ /// Handles a decoded response from the server after batches have been
+ /// resolved.
+ void _handleSingleResponse(response) {
+ if (!_isResponseValid(response)) return;
+ var completer = _pendingRequests.remove(response["id"]);
+ if (response.containsKey("result")) {
+ completer.complete(response["result"]);
+ } else {
+ completer.completeError(new RpcException(
+ response["error"]["code"],
+ response["error"]["message"],
+ data: response["error"]["data"]),
+ new Chain.current());
+ }
+ }
+
+ /// Determines whether the server's response is valid per the spec.
+ bool _isResponseValid(response) {
+ if (response is! Map) return false;
+ if (response["jsonrpc"] != "2.0") return false;
+ if (!_pendingRequests.containsKey(response["id"])) return false;
+ if (response.containsKey("result")) return true;
+
+ if (!response.containsKey("error")) return false;
+ var error = response["error"];
+ if (error is! Map) return false;
+ if (error["code"] is! int) return false;
+ if (error["message"] is! String) return false;
+ return true;
+ }
+}
diff --git a/lib/src/exception.dart b/lib/src/exception.dart
index be9c2d3..2fb0713 100644
--- a/lib/src/exception.dart
+++ b/lib/src/exception.dart
@@ -64,4 +64,11 @@
'id': id
};
}
+
+ String toString() {
+ var prefix = "JSON-RPC error $code";
+ var errorName = error_code.name(code);
+ if (errorName != null) prefix += " ($errorName)";
+ return "$prefix: $message";
+ }
}
diff --git a/lib/src/server.dart b/lib/src/server.dart
index bd243de..1ba8fcc 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -69,6 +69,9 @@
: _streams = new TwoWayStream.withoutJson(
"Server", requests, "requests", responses, "responses");
+ /// Users of the library should not use this constructor.
+ Server.internal(this._streams);
+
/// Starts listening to the underlying stream.
///
/// Returns a [Future] that will complete when the stream is closed or when it
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index a212f58..f862cb7 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -44,6 +44,28 @@
String getErrorMessage(error) =>
error.toString().replaceFirst(_exceptionPrefix, '');
+/// Like `try`/`finally`, run [body] and ensure that [whenComplete] runs
+/// afterwards, regardless of whether [body] succeeded.
+///
+/// This is synchronicity-agnostic relative to [body]. If [body] returns a
+/// [Future], this wil run asynchronously; otherwise it will run synchronously.
+tryFinally(body(), whenComplete()) {
+ var result;
+ try {
+ result = body();
+ } catch (_) {
+ whenComplete();
+ rethrow;
+ }
+
+ if (result is! Future) {
+ whenComplete();
+ return result;
+ } else {
+ return result.whenComplete(whenComplete);
+ }
+}
+
/// Returns a [StreamSink] that wraps [sink] and maps each event added using
/// [callback].
StreamSink mapStreamSink(StreamSink sink, callback(event)) =>
diff --git a/pubspec.yaml b/pubspec.yaml
index 287bba7..fed14a0 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: json_rpc_2
-version: 0.1.1-dev
+version: 1.0.0-dev
author: Dart Team <misc@dartlang.org>
description: An implementation of the JSON-RPC 2.0 spec.
homepage: http://www.dartlang.org
diff --git a/test/client/client_test.dart b/test/client/client_test.dart
new file mode 100644
index 0000000..9fcf9e3
--- /dev/null
+++ b/test/client/client_test.dart
@@ -0,0 +1,239 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.test.client.client_test;
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:unittest/unittest.dart';
+import 'package:json_rpc_2/error_code.dart' as error_code;
+import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+
+import 'utils.dart';
+
+void main() {
+ var controller;
+ setUp(() => controller = new ClientController());
+
+ test("sends a message and returns the response", () {
+ controller.expectRequest((request) {
+ expect(request, allOf([
+ containsPair('jsonrpc', '2.0'),
+ containsPair('method', 'foo'),
+ containsPair('params', {'param': 'value'})
+ ]));
+
+ return {
+ 'jsonrpc': '2.0',
+ 'result': 'bar',
+ 'id': request['id']
+ };
+ });
+
+ expect(controller.client.sendRequest("foo", {'param': 'value'}),
+ completion(equals('bar')));
+ });
+
+ test("sends a notification and expects no response", () {
+ controller.expectRequest((request) {
+ expect(request, equals({
+ 'jsonrpc': '2.0',
+ 'method': 'foo',
+ 'params': {'param': 'value'}
+ }));
+ });
+
+ controller.client.sendNotification("foo", {'param': 'value'});
+ });
+
+ test("sends a notification with positional parameters", () {
+ controller.expectRequest((request) {
+ expect(request, equals({
+ 'jsonrpc': '2.0',
+ 'method': 'foo',
+ 'params': ['value1', 'value2']
+ }));
+ });
+
+ controller.client.sendNotification("foo", ['value1', 'value2']);
+ });
+
+ test("sends a notification with no parameters", () {
+ controller.expectRequest((request) {
+ expect(request, equals({
+ 'jsonrpc': '2.0',
+ 'method': 'foo'
+ }));
+ });
+
+ controller.client.sendNotification("foo");
+ });
+
+ test("sends a synchronous batch of requests", () {
+ controller.expectRequest((request) {
+ expect(request, new isInstanceOf<List>());
+ expect(request, hasLength(3));
+ expect(request[0], equals({
+ 'jsonrpc': '2.0',
+ 'method': 'foo'
+ }));
+ expect(request[1], allOf([
+ containsPair('jsonrpc', '2.0'),
+ containsPair('method', 'bar'),
+ containsPair('params', {'param': 'value'})
+ ]));
+ expect(request[2], allOf([
+ containsPair('jsonrpc', '2.0'),
+ containsPair('method', 'baz')
+ ]));
+
+ return [
+ {
+ 'jsonrpc': '2.0',
+ 'result': 'baz response',
+ 'id': request[2]['id']
+ },
+ {
+ 'jsonrpc': '2.0',
+ 'result': 'bar response',
+ 'id': request[1]['id']
+ }
+ ];
+ });
+
+ controller.client.withBatch(() {
+ controller.client.sendNotification("foo");
+ expect(controller.client.sendRequest("bar", {'param': 'value'}),
+ completion(equals("bar response")));
+ expect(controller.client.sendRequest("baz"),
+ completion(equals("baz response")));
+ });
+ });
+
+ test("sends an asynchronous batch of requests", () {
+ controller.expectRequest((request) {
+ expect(request, new isInstanceOf<List>());
+ expect(request, hasLength(3));
+ expect(request[0], equals({
+ 'jsonrpc': '2.0',
+ 'method': 'foo'
+ }));
+ expect(request[1], allOf([
+ containsPair('jsonrpc', '2.0'),
+ containsPair('method', 'bar'),
+ containsPair('params', {'param': 'value'})
+ ]));
+ expect(request[2], allOf([
+ containsPair('jsonrpc', '2.0'),
+ containsPair('method', 'baz')
+ ]));
+
+ return [
+ {
+ 'jsonrpc': '2.0',
+ 'result': 'baz response',
+ 'id': request[2]['id']
+ },
+ {
+ 'jsonrpc': '2.0',
+ 'result': 'bar response',
+ 'id': request[1]['id']
+ }
+ ];
+ });
+
+ controller.client.withBatch(() {
+ return new Future.value().then((_) {
+ controller.client.sendNotification("foo");
+ return new Future.value();
+ }).then((_) {
+ expect(controller.client.sendRequest("bar", {'param': 'value'}),
+ completion(equals("bar response")));
+ return new Future.value();
+ }).then((_) {
+ expect(controller.client.sendRequest("baz"),
+ completion(equals("baz response")));
+ });
+ });
+ });
+
+ test("reports an error from the server", () {
+ controller.expectRequest((request) {
+ expect(request, allOf([
+ containsPair('jsonrpc', '2.0'),
+ containsPair('method', 'foo')
+ ]));
+
+ return {
+ 'jsonrpc': '2.0',
+ 'error': {
+ 'code': error_code.SERVER_ERROR,
+ 'message': 'you are bad at requests',
+ 'data': 'some junk'
+ },
+ 'id': request['id']
+ };
+ });
+
+ expect(controller.client.sendRequest("foo", {'param': 'value'}),
+ throwsA(predicate((exception) {
+ expect(exception, new isInstanceOf<json_rpc.RpcException>());
+ expect(exception.code, equals(error_code.SERVER_ERROR));
+ expect(exception.message, equals('you are bad at requests'));
+ expect(exception.data, equals('some junk'));
+ return true;
+ })));
+ });
+
+ test("ignores bogus responses", () {
+ // Make a request so we have something to respond to.
+ controller.expectRequest((request) {
+ controller.sendJsonResponse("{invalid");
+ controller.sendResponse("not a map");
+ controller.sendResponse({
+ 'jsonrpc': 'wrong version',
+ 'result': 'wrong',
+ 'id': request['id']
+ });
+ controller.sendResponse({
+ 'jsonrpc': '2.0',
+ 'result': 'wrong'
+ });
+ controller.sendResponse({
+ 'jsonrpc': '2.0',
+ 'id': request['id']
+ });
+ controller.sendResponse({
+ 'jsonrpc': '2.0',
+ 'error': 'not a map',
+ 'id': request['id']
+ });
+ controller.sendResponse({
+ 'jsonrpc': '2.0',
+ 'error': {
+ 'code': 'not an int',
+ 'message': 'dang yo'
+ },
+ 'id': request['id']
+ });
+ controller.sendResponse({
+ 'jsonrpc': '2.0',
+ 'error': {
+ 'code': 123,
+ 'message': 0xDEADBEEF
+ },
+ 'id': request['id']
+ });
+
+ return pumpEventQueue().then((_) => {
+ 'jsonrpc': '2.0',
+ 'result': 'right',
+ 'id': request['id']
+ });
+ });
+
+ expect(controller.client.sendRequest("foo"), completion(equals('right')));
+ });
+}
diff --git a/test/client/stream_test.dart b/test/client/stream_test.dart
new file mode 100644
index 0000000..6942ff7
--- /dev/null
+++ b/test/client/stream_test.dart
@@ -0,0 +1,96 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.test.client.stream_test;
+
+import 'dart:async';
+
+import 'package:unittest/unittest.dart';
+import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+
+import 'utils.dart';
+
+void main() {
+ test(".withoutJson supports decoded stream and sink", () {
+ var responseController = new StreamController();
+ var requestController = new StreamController();
+ var client = new json_rpc.Client.withoutJson(
+ responseController.stream, requestController.sink);
+ client.listen();
+
+ expect(requestController.stream.first.then((request) {
+ expect(request, allOf([
+ containsPair('jsonrpc', '2.0'),
+ containsPair('method', 'foo')
+ ]));
+
+ responseController.add({
+ 'jsonrpc': '2.0',
+ 'result': 'bar',
+ 'id': request['id']
+ });
+ }), completes);
+
+ client.sendRequest('foo');
+ });
+
+ test(".listen returns when the controller is closed", () {
+ var responseController = new StreamController();
+ var requestController = new StreamController();
+ var client = new json_rpc.Client.withoutJson(
+ responseController.stream, requestController.sink);
+
+ var hasListenCompeted = false;
+ expect(client.listen().then((_) => hasListenCompeted = true), completes);
+
+ return pumpEventQueue().then((_) {
+ expect(hasListenCompeted, isFalse);
+
+ // This should cause listen to complete.
+ return responseController.close();
+ });
+ });
+
+ test(".listen returns a stream error", () {
+ var responseController = new StreamController();
+ var requestController = new StreamController();
+ var client = new json_rpc.Client(
+ responseController.stream, requestController.sink);
+
+ expect(client.listen(), throwsA('oh no'));
+ responseController.addError('oh no');
+ });
+
+ test(".listen can't be called twice", () {
+ var responseController = new StreamController();
+ var requestController = new StreamController();
+ var client = new json_rpc.Client(
+ responseController.stream, requestController.sink);
+ client.listen();
+
+ expect(() => client.listen(), throwsStateError);
+ });
+
+ test(".close cancels the stream subscription and closes the sink", () {
+ var responseController = new StreamController();
+ var requestController = new StreamController();
+ var client = new json_rpc.Client(
+ responseController.stream, requestController.sink);
+
+ expect(client.listen(), completes);
+ expect(client.close(), completes);
+
+ expect(() => responseController.stream.listen((_) {}), throwsStateError);
+ expect(requestController.isClosed, isTrue);
+ });
+
+ test(".close can't be called before .listen", () {
+ var responseController = new StreamController();
+ var requestController = new StreamController();
+ var client = new json_rpc.Client(
+ responseController.stream, requestController.sink);
+
+ expect(() => client.close(), throwsStateError);
+ });
+}
diff --git a/test/client/utils.dart b/test/client/utils.dart
new file mode 100644
index 0000000..a164577
--- /dev/null
+++ b/test/client/utils.dart
@@ -0,0 +1,65 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.test.client.utils;
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+import 'package:json_rpc_2/error_code.dart' as error_code;
+import 'package:unittest/unittest.dart';
+
+/// A controller used to test a [json_rpc.Client].
+class ClientController {
+ /// The controller for the client's response stream.
+ final _responseController = new StreamController<String>();
+
+ /// The controller for the client's request sink.
+ final _requestController = new StreamController<String>();
+
+ /// The client.
+ json_rpc.Client get client => _client;
+ json_rpc.Client _client;
+
+ ClientController() {
+ _client = new json_rpc.Client(
+ _responseController.stream, _requestController.sink);
+ _client.listen();
+ }
+
+ /// Expects that the client will send a request.
+ ///
+ /// The request is passed to [callback], which can return a response. If it
+ /// returns a String, that's sent as the response directly. If it returns
+ /// null, no response is sent. Otherwise, the return value is encoded and sent
+ /// as the response.
+ void expectRequest(callback(request)) {
+ expect(_requestController.stream.first.then((request) {
+ return callback(JSON.decode(request));
+ }).then((response) {
+ if (response == null) return;
+ if (response is! String) response = JSON.encode(response);
+ _responseController.add(response);
+ }), completes);
+ }
+
+ /// Sends [response], a decoded response, to [client].
+ Future sendResponse(response) => sendJsonResponse(JSON.encode(response));
+
+ /// Sends [response], a JSON-encoded response, to [client].
+ Future sendJsonResponse(String request) => _responseController.add(request);
+}
+
+/// Returns a [Future] that completes after pumping the event queue [times]
+/// times. By default, this should pump the event queue enough times to allow
+/// any code to run, as long as it's not waiting on some external event.
+Future pumpEventQueue([int times = 20]) {
+ if (times == 0) return new Future.value();
+ // We use a delayed future to allow microtask events to finish. The
+ // Future.value or Future() constructors use scheduleMicrotask themselves and
+ // would therefore not wait for microtask callbacks that are scheduled after
+ // invoking this method.
+ return new Future.delayed(Duration.ZERO, () => pumpEventQueue(times - 1));
+}