Extract out a StreamManager class from json_rpc.Server.
This can then be used to more easily implement Client.
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//333683003
git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/json_rpc_2@37773 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/lib/src/server.dart b/lib/src/server.dart
index c7ece5b..bd243de 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -13,6 +13,7 @@
import '../error_code.dart' as error_code;
import 'exception.dart';
import 'parameters.dart';
+import 'two_way_stream.dart';
import 'utils.dart';
/// A JSON-RPC 2.0 server.
@@ -26,19 +27,7 @@
/// asynchronously, it's possible for multiple methods to be invoked at the same
/// time, or even for a single method to be invoked multiple times at once.
class Server {
- /// The stream for decoded requests.
- final Stream _requests;
-
- /// The subscription to the decoded request stream.
- StreamSubscription _requestSubscription;
-
- /// The sink for decoded responses.
- final StreamSink _responses;
-
- /// The completer for [listen].
- ///
- /// This is non-`null` after [listen] has been called.
- Completer _listenCompleter;
+ TwoWayStream _streams;
/// The methods registered for this server.
final _methods = new Map<String, Function>();
@@ -57,28 +46,12 @@
///
/// Note that the server won't begin listening to [requests] until
/// [Server.listen] is called.
- factory Server(Stream<String> requests, [StreamSink<String> responses]) {
- if (responses == null) {
- if (requests is! StreamSink) {
- throw new ArgumentError("Either `requests` must be a StreamSink or "
- "`responses` must be passed.");
- }
- responses = requests as StreamSink;
- }
-
- var wrappedResponses = mapStreamSink(responses, JSON.encode);
- return new Server.withoutJson(requests.expand((request) {
- var decodedRequest;
- try {
- decodedRequest = JSON.decode(request);
- } on FormatException catch (error) {
- wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
- 'Invalid JSON: ${error.message}').serialize(request));
- return [];
- }
-
- return [decodedRequest];
- }), wrappedResponses);
+ Server(Stream<String> requests, [StreamSink<String> responses]) {
+ _streams = new TwoWayStream("Server", requests, "requests",
+ responses, "responses", onInvalidInput: (message, error) {
+ _streams.add(new RpcException(error_code.PARSE_ERROR,
+ 'Invalid JSON: ${error.message}').serialize(message));
+ });
}
/// Creates a [Server] that reads decoded requests from [requests] and writes
@@ -93,14 +66,8 @@
/// Note that the server won't begin listening to [requests] until
/// [Server.listen] is called.
Server.withoutJson(Stream requests, [StreamSink responses])
- : _requests = requests,
- _responses = responses == null && requests is StreamSink ?
- requests : responses {
- if (_responses == null) {
- throw new ArgumentError("Either `requests` must be a StreamSink or "
- "`responses` must be passed.");
- }
- }
+ : _streams = new TwoWayStream.withoutJson(
+ "Server", requests, "requests", responses, "responses");
/// Starts listening to the underlying stream.
///
@@ -108,45 +75,14 @@
/// has an error.
///
/// [listen] may only be called once.
- Future listen() {
- if (_listenCompleter != null) {
- throw new StateError(
- "Can only call Server.listen once on a given server.");
- }
-
- _listenCompleter = new Completer();
- _requestSubscription = _requests.listen(_handleRequest,
- onError: (error, stackTrace) {
- if (_listenCompleter.isCompleted) return;
- _responses.close();
- _listenCompleter.completeError(error, stackTrace);
- }, onDone: () {
- if (_listenCompleter.isCompleted) return;
- _responses.close();
- _listenCompleter.complete();
- }, cancelOnError: true);
-
- return _listenCompleter.future;
- }
+ Future listen() => _streams.listen(_handleRequest);
/// Closes the server's request subscription and response sink.
///
/// Returns a [Future] that completes when all resources have been released.
///
/// A server can't be closed before [listen] has been called.
- Future close() {
- if (_listenCompleter == null) {
- throw new StateError("Can't call Server.close before Server.listen.");
- }
-
- if (!_listenCompleter.isCompleted) _listenCompleter.complete();
-
- var subscriptionFuture = _requestSubscription.cancel();
- // TODO(nweiz): include the response future in the return value when issue
- // 19095 is fixed.
- _responses.close();
- return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
- }
+ Future close() => _streams.close();
/// Registers a method named [name] on this server.
///
@@ -199,7 +135,7 @@
var nonNull = results.where((result) => result != null);
return nonNull.isEmpty ? null : nonNull.toList();
});
- }).then(_responses.add);
+ }).then(_streams.add);
}
/// Handles an individual parsed request.
diff --git a/lib/src/two_way_stream.dart b/lib/src/two_way_stream.dart
new file mode 100644
index 0000000..f470c2f
--- /dev/null
+++ b/lib/src/two_way_stream.dart
@@ -0,0 +1,134 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.two_way_stream;
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'utils.dart';
+
+/// A class for managing a stream of input messages and a sink for output
+/// messages.
+///
+/// This contains stream logic that's shared between [Server] and [Client].
+class TwoWayStream {
+ /// The name of the component whose streams are being managed (e.g. "Server").
+ ///
+ /// Used for error reporting.
+ final String _name;
+
+ /// The input stream.
+ ///
+ /// This is a stream of decoded JSON objects.
+ final Stream _input;
+
+ /// The subscription to [_input].
+ StreamSubscription _inputSubscription;
+
+ /// The output sink.
+ ///
+ /// This takes decoded JSON objects.
+ final StreamSink _output;
+
+ /// The completer for [listen].
+ ///
+ /// This is non-`null` after [listen] has been called.
+ Completer _listenCompleter;
+
+ /// Creates a two-way stream.
+ ///
+ /// [input] and [output] should emit and take (respectively) JSON-encoded
+ /// strings.
+ ///
+ /// [inputName] is used in error messages as the name of the input parameter.
+ /// [outputName] is likewise used as the name of the output parameter.
+ ///
+ /// If [onInvalidInput] is passed, any errors parsing messages from [input]
+ /// are passed to it. Otherwise, they're ignored and the input is discarded.
+ factory TwoWayStream(String name, Stream<String> input, String inputName,
+ StreamSink<String> output, String outputName,
+ {void onInvalidInput(String message, FormatException error)}) {
+ if (output == null) {
+ if (input is! StreamSink) {
+ throw new ArgumentError("Either `$inputName` must be a StreamSink or "
+ "`$outputName` must be passed.");
+ }
+ output = input as StreamSink;
+ }
+
+ var wrappedOutput = mapStreamSink(output, JSON.encode);
+ return new TwoWayStream.withoutJson(name, input.expand((message) {
+ var decodedMessage;
+ try {
+ decodedMessage = JSON.decode(message);
+ } on FormatException catch (error) {
+ if (onInvalidInput != null) onInvalidInput(message, error);
+ return [];
+ }
+
+ return [decodedMessage];
+ }), inputName, wrappedOutput, outputName);
+ }
+
+ /// Creates a two-way stream that reads decoded input and writes decoded
+ /// responses.
+ ///
+ /// [input] and [output] should emit and take (respectively) decoded JSON
+ /// objects.
+ ///
+ /// [inputName] is used in error messages as the name of the input parameter.
+ /// [outputName] is likewise used as the name of the output parameter.
+ TwoWayStream.withoutJson(this._name, Stream input, String inputName,
+ StreamSink output, String outputName)
+ : _input = input,
+ _output = output == null && input is StreamSink ? input : output {
+ if (_output == null) {
+ throw new ArgumentError("Either `$inputName` must be a StreamSink or "
+ "`$outputName` must be passed.");
+ }
+ }
+
+ /// Starts listening to the input stream.
+ ///
+ /// The returned Future will complete when the input stream is closed. If the
+ /// input stream emits an error, that will be piped to the returned Future.
+ Future listen(void handleInput(input)) {
+ if (_listenCompleter != null) {
+ throw new StateError("Can only call $_name.listen once.");
+ }
+
+ _listenCompleter = new Completer();
+ _inputSubscription = _input.listen(handleInput,
+ onError: (error, stackTrace) {
+ if (_listenCompleter.isCompleted) return;
+ _output.close();
+ _listenCompleter.completeError(error, stackTrace);
+ }, onDone: () {
+ if (_listenCompleter.isCompleted) return;
+ _output.close();
+ _listenCompleter.complete();
+ }, cancelOnError: true);
+
+ return _listenCompleter.future;
+ }
+
+ /// Emit [event] on the output stream.
+ void add(event) => _output.add(event);
+
+ /// Stops listening to the input stream and closes the output stream.
+ Future close() {
+ if (_listenCompleter == null) {
+ throw new StateError("Can't call $_name.close before $_name.listen.");
+ }
+
+ if (!_listenCompleter.isCompleted) _listenCompleter.complete();
+
+ var inputFuture = _inputSubscription.cancel();
+ // TODO(nweiz): include the output future in the return value when issue
+ // 19095 is fixed.
+ _output.close();
+ return inputFuture == null ? new Future.value() : inputFuture;
+ }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index df1a8a0..e0fca72 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: json_rpc_2
-version: 0.1.0
+version: 0.1.1-dev
author: Dart Team <misc@dartlang.org>
description: An implementation of the JSON-RPC 2.0 spec.
homepage: http://www.dartlang.org