Add a Client class to json_rpc_2.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//691053006

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/json_rpc_2@41535 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 34123f9..15b5a30 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 1.0.0
+
+* Add a `Client` class for communicating with external JSON-RPC 2.0 servers.
+
 ## 0.1.0
 
 * Remove `Server.handleRequest()` and `Server.parseRequest()`. Instead, `new
diff --git a/lib/error_code.dart b/lib/error_code.dart
index 96cb909..14b77f2 100644
--- a/lib/error_code.dart
+++ b/lib/error_code.dart
@@ -34,3 +34,18 @@
 /// The spec reserves the range from -32000 to -32099 for implementation-defined
 /// server exceptions, but for now we only use one of those values.
 const SERVER_ERROR = -32000;
+
+/// Returns a human-readable name for [errorCode] if it's one specified by the
+/// JSON-RPC 2.0 spec.
+///
+/// If [errorCode] isn't defined in the JSON-RPC 2.0 spec, returns null.
+String name(int errorCode) {
+  switch (errorCode) {
+    case PARSE_ERROR: return "parse error";
+    case INVALID_REQUEST: return "invalid request";
+    case METHOD_NOT_FOUND: return "method not found";
+    case INVALID_PARAMS: return "invalid parameters";
+    case INTERNAL_ERROR: return "internal error";
+    default: return null;
+  }
+}
diff --git a/lib/json_rpc_2.dart b/lib/json_rpc_2.dart
index 04e4a52..3305eae 100644
--- a/lib/json_rpc_2.dart
+++ b/lib/json_rpc_2.dart
@@ -4,6 +4,7 @@
 
 library json_rpc_2;
 
+export 'src/client.dart';
 export 'src/exception.dart';
 export 'src/parameters.dart';
 export 'src/server.dart';
diff --git a/lib/src/client.dart b/lib/src/client.dart
new file mode 100644
index 0000000..2e1d273
--- /dev/null
+++ b/lib/src/client.dart
@@ -0,0 +1,198 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.client;
+
+import 'dart:async';
+
+import 'package:stack_trace/stack_trace.dart';
+
+import 'exception.dart';
+import 'two_way_stream.dart';
+import 'utils.dart';
+
+/// A JSON-RPC 2.0 client.
+///
+/// A client calls methods on a server and handles the server's responses to
+/// those method calls. Methods can be called with [sendRequest], or with
+/// [sendNotification] if no response is expected.
+class Client {
+  final TwoWayStream _streams;
+
+  /// The next request id.
+  var _id = 0;
+
+  /// The current batch of requests to be sent together.
+  ///
+  /// Each element is a JSON-serializable object.
+  List _batch;
+
+  /// The map of request ids for pending requests to [Completer]s that will be
+  /// completed with those requests' responses.
+  final _pendingRequests = new Map<int, Completer>();
+
+  /// Creates a [Client] that writes requests to [requests] and reads responses
+  /// from [responses].
+  ///
+  /// If [responses] is a [StreamSink] as well as a [Stream] (for example, a
+  /// `WebSocket`), [requests] may be omitted.
+  ///
+  /// Note that the client won't begin listening to [responses] until
+  /// [Client.listen] is called.
+  Client(Stream<String> responses, [StreamSink<String> requests])
+      : _streams = new TwoWayStream(
+            "Client", responses, "responses", requests, "requests");
+
+  /// Creates a [Client] that writes decoded responses to [responses] and reads
+  /// decoded requests from [requests].
+  ///
+  /// Unlike [new Client], this doesn't read or write JSON strings. Instead, it
+  /// reads and writes decoded maps or lists.
+  ///
+  /// If [responses] is a [StreamSink] as well as a [Stream], [requests] may be
+  /// omitted.
+  ///
+  /// Note that the client won't begin listening to [responses] until
+  /// [Client.listen] is called.
+  Client.withoutJson(Stream responses, [StreamSink requests])
+      : _streams = new TwoWayStream.withoutJson(
+            "Client", responses, "responses", requests, "requests");
+
+  /// Users of the library should not use this constructor.
+  Client.internal(this._streams);
+
+  /// Starts listening to the underlying stream.
+  ///
+  /// Returns a [Future] that will complete when the stream is closed or when it
+  /// has an error.
+  ///
+  /// [listen] may only be called once.
+  Future listen() => _streams.listen(_handleResponse);
+
+  /// Closes the server's request sink and response subscription.
+  ///
+  /// Returns a [Future] that completes when all resources have been released.
+  ///
+  /// A client can't be closed before [listen] has been called.
+  Future close() => _streams.close();
+
+  /// Sends a JSON-RPC 2 request to invoke the given [method].
+  ///
+  /// If passed, [parameters] is the parameters for the method. This must be
+  /// either an [Iterable] (to pass parameters by position) or a [Map] with
+  /// [String] keys (to pass parameters by name). Either way, it must be
+  /// JSON-serializable.
+  ///
+  /// If the request succeeds, this returns the response result as a decoded
+  /// JSON-serializable object. If it fails, it throws an [RpcException]
+  /// describing the failure.
+  Future sendRequest(String method, [parameters]) {
+    var id = _id++;
+    _send(method, parameters, id);
+
+    var completer = new Completer.sync();
+    _pendingRequests[id] = completer;
+    return completer.future;
+  }
+
+  /// Sends a JSON-RPC 2 request to invoke the given [method] without expecting
+  /// a response.
+  ///
+  /// If passed, [parameters] is the parameters for the method. This must be
+  /// either an [Iterable] (to pass parameters by position) or a [Map] with
+  /// [String] keys (to pass parameters by name). Either way, it must be
+  /// JSON-serializable.
+  ///
+  /// Since this is just a notification to which the server isn't expected to
+  /// send a response, it has no return value.
+  void sendNotification(String method, [parameters]) =>
+      _send(method, parameters);
+
+  /// A helper method for [sendRequest] and [sendNotification].
+  ///
+  /// Sends a request to invoke [method] with [parameters]. If [id] is given,
+  /// the request uses that id.
+  void _send(String method, parameters, [int id]) {
+    if (parameters is Iterable) parameters = parameters.toList();
+    if (parameters is! Map && parameters is! List && parameters != null) {
+      throw new ArgumentError('Only maps and lists may be used as JSON-RPC '
+          'parameters, was "$parameters".');
+    }
+
+    var message = {
+      "jsonrpc": "2.0",
+      "method": method
+    };
+    if (id != null) message["id"] = id;
+    if (parameters != null) message["params"] = parameters;
+
+    if (_batch != null) {
+      _batch.add(message);
+    } else {
+      _streams.add(message);
+    }
+  }
+
+  /// Runs [callback] and batches any requests sent until it returns.
+  ///
+  /// A batch of requests is sent in a single message on the underlying stream,
+  /// and the responses are likewise sent back in a single message.
+  ///
+  /// [callback] may be synchronous or asynchronous. If it returns a [Future],
+  /// requests will be batched until that Future returns; otherwise, requests
+  /// will only be batched while synchronously executing [callback].
+  ///
+  /// If this is called in the context of another [withBatch] call, it just
+  /// invokes [callback] without creating another batch. This means that
+  /// responses are batched until the first batch ends.
+  withBatch(callback()) {
+    if (_batch != null) return callback();
+
+    _batch = [];
+    return tryFinally(callback, () {
+      _streams.add(_batch);
+      _batch = null;
+    });
+  }
+
+  /// Handles a decoded response from the server.
+  void _handleResponse(response) {
+    if (response is List) {
+      response.forEach(_handleSingleResponse);
+    } else {
+      _handleSingleResponse(response);
+    }
+  }
+
+  /// Handles a decoded response from the server after batches have been
+  /// resolved.
+  void _handleSingleResponse(response) {
+    if (!_isResponseValid(response)) return;
+    var completer = _pendingRequests.remove(response["id"]);
+    if (response.containsKey("result")) {
+      completer.complete(response["result"]);
+    } else {
+      completer.completeError(new RpcException(
+            response["error"]["code"],
+            response["error"]["message"],
+            data: response["error"]["data"]),
+          new Chain.current());
+    }
+  }
+
+  /// Determines whether the server's response is valid per the spec.
+  bool _isResponseValid(response) {
+    if (response is! Map) return false;
+    if (response["jsonrpc"] != "2.0") return false;
+    if (!_pendingRequests.containsKey(response["id"])) return false;
+    if (response.containsKey("result")) return true;
+
+    if (!response.containsKey("error")) return false;
+    var error = response["error"];
+    if (error is! Map) return false;
+    if (error["code"] is! int) return false;
+    if (error["message"] is! String) return false;
+    return true;
+  }
+}
diff --git a/lib/src/exception.dart b/lib/src/exception.dart
index be9c2d3..2fb0713 100644
--- a/lib/src/exception.dart
+++ b/lib/src/exception.dart
@@ -64,4 +64,11 @@
       'id': id
     };
   }
+
+  String toString() {
+    var prefix = "JSON-RPC error $code";
+    var errorName = error_code.name(code);
+    if (errorName != null) prefix += " ($errorName)";
+    return "$prefix: $message";
+  }
 }
diff --git a/lib/src/server.dart b/lib/src/server.dart
index bd243de..1ba8fcc 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -69,6 +69,9 @@
       : _streams = new TwoWayStream.withoutJson(
             "Server", requests, "requests", responses, "responses");
 
+  /// Users of the library should not use this constructor.
+  Server.internal(this._streams);
+
   /// Starts listening to the underlying stream.
   ///
   /// Returns a [Future] that will complete when the stream is closed or when it
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index a212f58..f862cb7 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -44,6 +44,28 @@
 String getErrorMessage(error) =>
     error.toString().replaceFirst(_exceptionPrefix, '');
 
+/// Like `try`/`finally`, run [body] and ensure that [whenComplete] runs
+/// afterwards, regardless of whether [body] succeeded.
+///
+/// This is synchronicity-agnostic relative to [body]. If [body] returns a
+/// [Future], this wil run asynchronously; otherwise it will run synchronously.
+tryFinally(body(), whenComplete()) {
+  var result;
+  try {
+    result = body();
+  } catch (_) {
+    whenComplete();
+    rethrow;
+  }
+
+  if (result is! Future) {
+    whenComplete();
+    return result;
+  } else {
+    return result.whenComplete(whenComplete);
+  }
+}
+
 /// Returns a [StreamSink] that wraps [sink] and maps each event added using
 /// [callback].
 StreamSink mapStreamSink(StreamSink sink, callback(event)) =>
diff --git a/pubspec.yaml b/pubspec.yaml
index 287bba7..fed14a0 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: json_rpc_2
-version: 0.1.1-dev
+version: 1.0.0-dev
 author: Dart Team <misc@dartlang.org>
 description: An implementation of the JSON-RPC 2.0 spec.
 homepage: http://www.dartlang.org
diff --git a/test/client/client_test.dart b/test/client/client_test.dart
new file mode 100644
index 0000000..9fcf9e3
--- /dev/null
+++ b/test/client/client_test.dart
@@ -0,0 +1,239 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.test.client.client_test;
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:unittest/unittest.dart';
+import 'package:json_rpc_2/error_code.dart' as error_code;
+import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+
+import 'utils.dart';
+
+void main() {
+  var controller;
+  setUp(() => controller = new ClientController());
+
+  test("sends a message and returns the response", () {
+    controller.expectRequest((request) {
+      expect(request, allOf([
+        containsPair('jsonrpc', '2.0'),
+        containsPair('method', 'foo'),
+        containsPair('params', {'param': 'value'})
+      ]));
+
+      return {
+        'jsonrpc': '2.0',
+        'result': 'bar',
+        'id': request['id']
+      };
+    });
+
+    expect(controller.client.sendRequest("foo", {'param': 'value'}),
+        completion(equals('bar')));
+  });
+
+  test("sends a notification and expects no response", () {
+    controller.expectRequest((request) {
+      expect(request, equals({
+        'jsonrpc': '2.0',
+        'method': 'foo',
+        'params': {'param': 'value'}
+      }));
+    });
+
+    controller.client.sendNotification("foo", {'param': 'value'});
+  });
+
+  test("sends a notification with positional parameters", () {
+    controller.expectRequest((request) {
+      expect(request, equals({
+        'jsonrpc': '2.0',
+        'method': 'foo',
+        'params': ['value1', 'value2']
+      }));
+    });
+
+    controller.client.sendNotification("foo", ['value1', 'value2']);
+  });
+
+  test("sends a notification with no parameters", () {
+    controller.expectRequest((request) {
+      expect(request, equals({
+        'jsonrpc': '2.0',
+        'method': 'foo'
+      }));
+    });
+
+    controller.client.sendNotification("foo");
+  });
+
+  test("sends a synchronous batch of requests", () {
+    controller.expectRequest((request) {
+      expect(request, new isInstanceOf<List>());
+      expect(request, hasLength(3));
+      expect(request[0], equals({
+        'jsonrpc': '2.0',
+        'method': 'foo'
+      }));
+      expect(request[1], allOf([
+        containsPair('jsonrpc', '2.0'),
+        containsPair('method', 'bar'),
+        containsPair('params', {'param': 'value'})
+      ]));
+      expect(request[2], allOf([
+        containsPair('jsonrpc', '2.0'),
+        containsPair('method', 'baz')
+      ]));
+
+      return [
+        {
+          'jsonrpc': '2.0',
+          'result': 'baz response',
+          'id': request[2]['id']
+        },
+        {
+          'jsonrpc': '2.0',
+          'result': 'bar response',
+          'id': request[1]['id']
+        }
+      ];
+    });
+
+    controller.client.withBatch(() {
+      controller.client.sendNotification("foo");
+      expect(controller.client.sendRequest("bar", {'param': 'value'}),
+          completion(equals("bar response")));
+      expect(controller.client.sendRequest("baz"),
+          completion(equals("baz response")));
+    });
+  });
+
+  test("sends an asynchronous batch of requests", () {
+    controller.expectRequest((request) {
+      expect(request, new isInstanceOf<List>());
+      expect(request, hasLength(3));
+      expect(request[0], equals({
+        'jsonrpc': '2.0',
+        'method': 'foo'
+      }));
+      expect(request[1], allOf([
+        containsPair('jsonrpc', '2.0'),
+        containsPair('method', 'bar'),
+        containsPair('params', {'param': 'value'})
+      ]));
+      expect(request[2], allOf([
+        containsPair('jsonrpc', '2.0'),
+        containsPair('method', 'baz')
+      ]));
+
+      return [
+        {
+          'jsonrpc': '2.0',
+          'result': 'baz response',
+          'id': request[2]['id']
+        },
+        {
+          'jsonrpc': '2.0',
+          'result': 'bar response',
+          'id': request[1]['id']
+        }
+      ];
+    });
+
+    controller.client.withBatch(() {
+      return new Future.value().then((_) {
+        controller.client.sendNotification("foo");
+        return new Future.value();
+      }).then((_) {
+        expect(controller.client.sendRequest("bar", {'param': 'value'}),
+            completion(equals("bar response")));
+        return new Future.value();
+      }).then((_) {
+        expect(controller.client.sendRequest("baz"),
+            completion(equals("baz response")));
+      });
+    });
+  });
+
+  test("reports an error from the server", () {
+    controller.expectRequest((request) {
+      expect(request, allOf([
+        containsPair('jsonrpc', '2.0'),
+        containsPair('method', 'foo')
+      ]));
+
+      return {
+        'jsonrpc': '2.0',
+        'error': {
+          'code': error_code.SERVER_ERROR,
+          'message': 'you are bad at requests',
+          'data': 'some junk'
+        },
+        'id': request['id']
+      };
+    });
+
+    expect(controller.client.sendRequest("foo", {'param': 'value'}),
+        throwsA(predicate((exception) {
+      expect(exception, new isInstanceOf<json_rpc.RpcException>());
+      expect(exception.code, equals(error_code.SERVER_ERROR));
+      expect(exception.message, equals('you are bad at requests'));
+      expect(exception.data, equals('some junk'));
+      return true;
+    })));
+  });
+
+  test("ignores bogus responses", () {
+    // Make a request so we have something to respond to.
+    controller.expectRequest((request) {
+      controller.sendJsonResponse("{invalid");
+      controller.sendResponse("not a map");
+      controller.sendResponse({
+        'jsonrpc': 'wrong version',
+        'result': 'wrong',
+        'id': request['id']
+      });
+      controller.sendResponse({
+        'jsonrpc': '2.0',
+        'result': 'wrong'
+      });
+      controller.sendResponse({
+        'jsonrpc': '2.0',
+        'id': request['id']
+      });
+      controller.sendResponse({
+        'jsonrpc': '2.0',
+        'error': 'not a map',
+        'id': request['id']
+      });
+      controller.sendResponse({
+        'jsonrpc': '2.0',
+        'error': {
+          'code': 'not an int',
+          'message': 'dang yo'
+        },
+        'id': request['id']
+      });
+      controller.sendResponse({
+        'jsonrpc': '2.0',
+        'error': {
+          'code': 123,
+          'message': 0xDEADBEEF
+        },
+        'id': request['id']
+      });
+
+      return pumpEventQueue().then((_) => {
+        'jsonrpc': '2.0',
+        'result': 'right',
+        'id': request['id']
+      });
+    });
+
+    expect(controller.client.sendRequest("foo"), completion(equals('right')));
+  });
+}
diff --git a/test/client/stream_test.dart b/test/client/stream_test.dart
new file mode 100644
index 0000000..6942ff7
--- /dev/null
+++ b/test/client/stream_test.dart
@@ -0,0 +1,96 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.test.client.stream_test;
+
+import 'dart:async';
+
+import 'package:unittest/unittest.dart';
+import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+
+import 'utils.dart';
+
+void main() {
+  test(".withoutJson supports decoded stream and sink", () {
+    var responseController = new StreamController();
+    var requestController = new StreamController();
+    var client = new json_rpc.Client.withoutJson(
+        responseController.stream, requestController.sink);
+    client.listen();
+
+    expect(requestController.stream.first.then((request) {
+      expect(request, allOf([
+        containsPair('jsonrpc', '2.0'),
+        containsPair('method', 'foo')
+      ]));
+
+      responseController.add({
+        'jsonrpc': '2.0',
+        'result': 'bar',
+        'id': request['id']
+      });
+    }), completes);
+
+    client.sendRequest('foo');
+  });
+
+  test(".listen returns when the controller is closed", () {
+    var responseController = new StreamController();
+    var requestController = new StreamController();
+    var client = new json_rpc.Client.withoutJson(
+        responseController.stream, requestController.sink);
+
+    var hasListenCompeted = false;
+    expect(client.listen().then((_) => hasListenCompeted = true), completes);
+
+    return pumpEventQueue().then((_) {
+      expect(hasListenCompeted, isFalse);
+
+      // This should cause listen to complete.
+      return responseController.close();
+    });
+  });
+
+  test(".listen returns a stream error", () {
+    var responseController = new StreamController();
+    var requestController = new StreamController();
+    var client = new json_rpc.Client(
+        responseController.stream, requestController.sink);
+
+    expect(client.listen(), throwsA('oh no'));
+    responseController.addError('oh no');
+  });
+
+  test(".listen can't be called twice", () {
+    var responseController = new StreamController();
+    var requestController = new StreamController();
+    var client = new json_rpc.Client(
+        responseController.stream, requestController.sink);
+    client.listen();
+
+    expect(() => client.listen(), throwsStateError);
+  });
+
+  test(".close cancels the stream subscription and closes the sink", () {
+    var responseController = new StreamController();
+    var requestController = new StreamController();
+    var client = new json_rpc.Client(
+        responseController.stream, requestController.sink);
+
+    expect(client.listen(), completes);
+    expect(client.close(), completes);
+
+    expect(() => responseController.stream.listen((_) {}), throwsStateError);
+    expect(requestController.isClosed, isTrue);
+  });
+
+  test(".close can't be called before .listen", () {
+    var responseController = new StreamController();
+    var requestController = new StreamController();
+    var client = new json_rpc.Client(
+        responseController.stream, requestController.sink);
+
+    expect(() => client.close(), throwsStateError);
+  });
+}
diff --git a/test/client/utils.dart b/test/client/utils.dart
new file mode 100644
index 0000000..a164577
--- /dev/null
+++ b/test/client/utils.dart
@@ -0,0 +1,65 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.test.client.utils;
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
+import 'package:json_rpc_2/error_code.dart' as error_code;
+import 'package:unittest/unittest.dart';
+
+/// A controller used to test a [json_rpc.Client].
+class ClientController {
+  /// The controller for the client's response stream.
+  final _responseController = new StreamController<String>();
+
+  /// The controller for the client's request sink.
+  final _requestController = new StreamController<String>();
+
+  /// The client.
+  json_rpc.Client get client => _client;
+  json_rpc.Client _client;
+
+  ClientController() {
+    _client = new json_rpc.Client(
+        _responseController.stream, _requestController.sink);
+    _client.listen();
+  }
+
+  /// Expects that the client will send a request.
+  ///
+  /// The request is passed to [callback], which can return a response. If it
+  /// returns a String, that's sent as the response directly. If it returns
+  /// null, no response is sent. Otherwise, the return value is encoded and sent
+  /// as the response.
+  void expectRequest(callback(request)) {
+    expect(_requestController.stream.first.then((request) {
+      return callback(JSON.decode(request));
+    }).then((response) {
+      if (response == null) return;
+      if (response is! String) response = JSON.encode(response);
+      _responseController.add(response);
+    }), completes);
+  }
+
+  /// Sends [response], a decoded response, to [client].
+  Future sendResponse(response) => sendJsonResponse(JSON.encode(response));
+
+  /// Sends [response], a JSON-encoded response, to [client].
+  Future sendJsonResponse(String request) => _responseController.add(request);
+}
+
+/// Returns a [Future] that completes after pumping the event queue [times]
+/// times. By default, this should pump the event queue enough times to allow
+/// any code to run, as long as it's not waiting on some external event.
+Future pumpEventQueue([int times = 20]) {
+  if (times == 0) return new Future.value();
+  // We use a delayed future to allow microtask events to finish. The
+  // Future.value or Future() constructors use scheduleMicrotask themselves and
+  // would therefore not wait for microtask callbacks that are scheduled after
+  // invoking this method.
+  return new Future.delayed(Duration.ZERO, () => pumpEventQueue(times - 1));
+}