Extract out a StreamManager class from json_rpc.Server.

This can then be used to more easily implement Client.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//333683003

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/json_rpc_2@37773 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/lib/src/server.dart b/lib/src/server.dart
index c7ece5b..bd243de 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -13,6 +13,7 @@
 import '../error_code.dart' as error_code;
 import 'exception.dart';
 import 'parameters.dart';
+import 'two_way_stream.dart';
 import 'utils.dart';
 
 /// A JSON-RPC 2.0 server.
@@ -26,19 +27,7 @@
 /// asynchronously, it's possible for multiple methods to be invoked at the same
 /// time, or even for a single method to be invoked multiple times at once.
 class Server {
-  /// The stream for decoded requests.
-  final Stream _requests;
-
-  /// The subscription to the decoded request stream.
-  StreamSubscription _requestSubscription;
-
-  /// The sink for decoded responses.
-  final StreamSink _responses;
-
-  /// The completer for [listen].
-  ///
-  /// This is non-`null` after [listen] has been called.
-  Completer _listenCompleter;
+  TwoWayStream _streams;
 
   /// The methods registered for this server.
   final _methods = new Map<String, Function>();
@@ -57,28 +46,12 @@
   ///
   /// Note that the server won't begin listening to [requests] until
   /// [Server.listen] is called.
-  factory Server(Stream<String> requests, [StreamSink<String> responses]) {
-    if (responses == null) {
-      if (requests is! StreamSink) {
-        throw new ArgumentError("Either `requests` must be a StreamSink or "
-            "`responses` must be passed.");
-      }
-      responses = requests as StreamSink;
-    }
-
-    var wrappedResponses = mapStreamSink(responses, JSON.encode);
-    return new Server.withoutJson(requests.expand((request) {
-      var decodedRequest;
-      try {
-        decodedRequest = JSON.decode(request);
-      } on FormatException catch (error) {
-        wrappedResponses.add(new RpcException(error_code.PARSE_ERROR,
-            'Invalid JSON: ${error.message}').serialize(request));
-        return [];
-      }
-
-      return [decodedRequest];
-    }), wrappedResponses);
+  Server(Stream<String> requests, [StreamSink<String> responses]) {
+    _streams = new TwoWayStream("Server", requests, "requests",
+        responses, "responses", onInvalidInput: (message, error) {
+      _streams.add(new RpcException(error_code.PARSE_ERROR,
+          'Invalid JSON: ${error.message}').serialize(message));
+    });
   }
 
   /// Creates a [Server] that reads decoded requests from [requests] and writes
@@ -93,14 +66,8 @@
   /// Note that the server won't begin listening to [requests] until
   /// [Server.listen] is called.
   Server.withoutJson(Stream requests, [StreamSink responses])
-      : _requests = requests,
-        _responses = responses == null && requests is StreamSink ?
-            requests : responses {
-    if (_responses == null) {
-      throw new ArgumentError("Either `requests` must be a StreamSink or "
-          "`responses` must be passed.");
-    }
-  }
+      : _streams = new TwoWayStream.withoutJson(
+            "Server", requests, "requests", responses, "responses");
 
   /// Starts listening to the underlying stream.
   ///
@@ -108,45 +75,14 @@
   /// has an error.
   ///
   /// [listen] may only be called once.
-  Future listen() {
-    if (_listenCompleter != null) {
-      throw new StateError(
-          "Can only call Server.listen once on a given server.");
-    }
-
-    _listenCompleter = new Completer();
-    _requestSubscription = _requests.listen(_handleRequest,
-        onError: (error, stackTrace) {
-      if (_listenCompleter.isCompleted) return;
-      _responses.close();
-      _listenCompleter.completeError(error, stackTrace);
-    }, onDone: () {
-      if (_listenCompleter.isCompleted) return;
-      _responses.close();
-      _listenCompleter.complete();
-    }, cancelOnError: true);
-
-    return _listenCompleter.future;
-  }
+  Future listen() => _streams.listen(_handleRequest);
 
   /// Closes the server's request subscription and response sink.
   ///
   /// Returns a [Future] that completes when all resources have been released.
   ///
   /// A server can't be closed before [listen] has been called.
-  Future close() {
-    if (_listenCompleter == null) {
-      throw new StateError("Can't call Server.close before Server.listen.");
-    }
-
-    if (!_listenCompleter.isCompleted) _listenCompleter.complete();
-
-    var subscriptionFuture = _requestSubscription.cancel();
-    // TODO(nweiz): include the response future in the return value when issue
-    // 19095 is fixed.
-    _responses.close();
-    return subscriptionFuture == null ? new Future.value() : subscriptionFuture;
-  }
+  Future close() => _streams.close();
 
   /// Registers a method named [name] on this server.
   ///
@@ -199,7 +135,7 @@
         var nonNull = results.where((result) => result != null);
         return nonNull.isEmpty ? null : nonNull.toList();
       });
-    }).then(_responses.add);
+    }).then(_streams.add);
   }
 
   /// Handles an individual parsed request.
diff --git a/lib/src/two_way_stream.dart b/lib/src/two_way_stream.dart
new file mode 100644
index 0000000..f470c2f
--- /dev/null
+++ b/lib/src/two_way_stream.dart
@@ -0,0 +1,134 @@
+// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library json_rpc_2.two_way_stream;
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'utils.dart';
+
+/// A class for managing a stream of input messages and a sink for output
+/// messages.
+///
+/// This contains stream logic that's shared between [Server] and [Client].
+class TwoWayStream {
+  /// The name of the component whose streams are being managed (e.g. "Server").
+  ///
+  /// Used for error reporting.
+  final String _name;
+
+  /// The input stream.
+  ///
+  /// This is a stream of decoded JSON objects.
+  final Stream _input;
+
+  /// The subscription to [_input].
+  StreamSubscription _inputSubscription;
+
+  /// The output sink.
+  ///
+  /// This takes decoded JSON objects.
+  final StreamSink _output;
+
+  /// The completer for [listen].
+  ///
+  /// This is non-`null` after [listen] has been called.
+  Completer _listenCompleter;
+
+  /// Creates a two-way stream.
+  ///
+  /// [input] and [output] should emit and take (respectively) JSON-encoded
+  /// strings.
+  ///
+  /// [inputName] is used in error messages as the name of the input parameter.
+  /// [outputName] is likewise used as the name of the output parameter.
+  ///
+  /// If [onInvalidInput] is passed, any errors parsing messages from [input]
+  /// are passed to it. Otherwise, they're ignored and the input is discarded.
+  factory TwoWayStream(String name, Stream<String> input, String inputName,
+      StreamSink<String> output, String outputName,
+      {void onInvalidInput(String message, FormatException error)}) {
+    if (output == null) {
+      if (input is! StreamSink) {
+        throw new ArgumentError("Either `$inputName` must be a StreamSink or "
+            "`$outputName` must be passed.");
+      }
+      output = input as StreamSink;
+    }
+
+    var wrappedOutput = mapStreamSink(output, JSON.encode);
+    return new TwoWayStream.withoutJson(name, input.expand((message) {
+      var decodedMessage;
+      try {
+        decodedMessage = JSON.decode(message);
+      } on FormatException catch (error) {
+        if (onInvalidInput != null) onInvalidInput(message, error);
+        return [];
+      }
+
+      return [decodedMessage];
+    }), inputName, wrappedOutput, outputName);
+  }
+
+  /// Creates a two-way stream that reads decoded input and writes decoded
+  /// responses.
+  ///
+  /// [input] and [output] should emit and take (respectively) decoded JSON
+  /// objects.
+  ///
+  /// [inputName] is used in error messages as the name of the input parameter.
+  /// [outputName] is likewise used as the name of the output parameter.
+  TwoWayStream.withoutJson(this._name, Stream input, String inputName,
+          StreamSink output, String outputName)
+      : _input = input,
+        _output = output == null && input is StreamSink ? input : output {
+    if (_output == null) {
+      throw new ArgumentError("Either `$inputName` must be a StreamSink or "
+          "`$outputName` must be passed.");
+    }
+  }
+
+  /// Starts listening to the input stream.
+  ///
+  /// The returned Future will complete when the input stream is closed. If the
+  /// input stream emits an error, that will be piped to the returned Future.
+  Future listen(void handleInput(input)) {
+    if (_listenCompleter != null) {
+      throw new StateError("Can only call $_name.listen once.");
+    }
+
+    _listenCompleter = new Completer();
+    _inputSubscription = _input.listen(handleInput,
+        onError: (error, stackTrace) {
+      if (_listenCompleter.isCompleted) return;
+      _output.close();
+      _listenCompleter.completeError(error, stackTrace);
+    }, onDone: () {
+      if (_listenCompleter.isCompleted) return;
+      _output.close();
+      _listenCompleter.complete();
+    }, cancelOnError: true);
+
+    return _listenCompleter.future;
+  }
+
+  /// Emit [event] on the output stream.
+  void add(event) => _output.add(event);
+
+  /// Stops listening to the input stream and closes the output stream.
+  Future close() {
+    if (_listenCompleter == null) {
+      throw new StateError("Can't call $_name.close before $_name.listen.");
+    }
+
+    if (!_listenCompleter.isCompleted) _listenCompleter.complete();
+
+    var inputFuture = _inputSubscription.cancel();
+    // TODO(nweiz): include the output future in the return value when issue
+    // 19095 is fixed.
+    _output.close();
+    return inputFuture == null ? new Future.value() : inputFuture;
+  }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index df1a8a0..e0fca72 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: json_rpc_2
-version: 0.1.0
+version: 0.1.1-dev
 author: Dart Team <misc@dartlang.org>
 description: An implementation of the JSON-RPC 2.0 spec.
 homepage: http://www.dartlang.org