Use StreamChannel.

This converts the constructors to take StreamChannels, and changes some
edge-case semantics to be more familiar to StreamChannel (and WebSocket)
users.

R=rnystrom@google.com

Review URL: https://codereview.chromium.org//1652413002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 524cbad..649c738 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,21 @@
+## 2.0.0
+
+* **Breaking change:** all constructors now take a `StreamChannel` rather than a
+  `Stream`/`StreamSink` pair.
+
+* `Client.sendRequest()` and `Client.sendNotification()` no longer throw
+  `StateError`s after the connection has been closed but before `Client.close()`
+  has been called.
+
+* The various `close()` methods may now be called before their corresponding
+  `listen()` methods.
+
+* The various `close()` methods now wait on the result of closing the underlying
+  `StreamSink`. Be aware that [in some circumstances][issue 19095]
+  `StreamController`s' `Sink.close()` futures may never complete.
+
+[issue 19095]: https://github.com/dart-lang/sdk/issues/19095
+
 ## 1.2.0
 
 * Add `Client.isClosed` and `Server.isClosed`, which make it possible to
diff --git a/README.md b/README.md
index 67afc31..8078fa2 100644
--- a/README.md
+++ b/README.md
@@ -9,68 +9,66 @@
 
 ```dart
 import "package:json_rpc_2/json_rpc_2.dart" as json_rpc;
+import "package:stream_channel/stream_channel.dart";
 
-void main() {
-  WebSocket.connect('ws://localhost:4321').then((socket) {
-    // You can start the server with a Stream for requests and a StreamSink for
-    // responses, or with an object that's both, like a WebSocket.
-    var server = new json_rpc.Server(socket);
+main() async {
+  var socket = await WebSocket.connect('ws://localhost:4321');
+  var server = new json_rpc.Server(new StreamChannel(socket, socket));
 
-    // Any string may be used as a method name. JSON-RPC 2.0 methods are
-    // case-sensitive.
-    var i = 0;
-    server.registerMethod("count", () {
-      // Just return the value to be sent as a response to the client. This can
-      // be anything JSON-serializable, or a Future that completes to something
-      // JSON-serializable.
-      return i++;
-    });
-
-    // Methods can take parameters. They're presented as a [Parameters] object
-    // which makes it easy to validate that the expected parameters exist.
-    server.registerMethod("echo", (params) {
-      // If the request doesn't have a "message" parameter, this will
-      // automatically send a response notifying the client that the request
-      // was invalid.
-      return params.getNamed("message");
-    });
-
-    // [Parameters] has methods for verifying argument types.
-    server.registerMethod("subtract", (params) {
-      // If "minuend" or "subtrahend" aren't numbers, this will reject the
-      // request.
-      return params.getNum("minuend") - params.getNum("subtrahend");
-    });
-
-    // [Parameters] also supports optional arguments.
-    server.registerMethod("sort", (params) {
-      var list = params.getList("list");
-      list.sort();
-      if (params.getBool("descending", orElse: () => false)) {
-        return params.list.reversed;
-      } else {
-        return params.list;
-      }
-    });
-
-    // A method can send an error response by throwing a
-    // `json_rpc.RpcException`. Any positive number may be used as an
-    // application- defined error code.
-    const DIVIDE_BY_ZERO = 1;
-    server.registerMethod("divide", (params) {
-      var divisor = params.getNum("divisor");
-      if (divisor == 0) {
-        throw new json_rpc.RpcException(
-            DIVIDE_BY_ZERO, "Cannot divide by zero.");
-      }
-
-      return params.getNum("dividend") / divisor;
-    });
-
-    // To give you time to register all your methods, the server won't actually
-    // start listening for requests until you call `listen`.
-    server.listen();
+  // Any string may be used as a method name. JSON-RPC 2.0 methods are
+  // case-sensitive.
+  var i = 0;
+  server.registerMethod("count", () {
+    // Just return the value to be sent as a response to the client. This can
+    // be anything JSON-serializable, or a Future that completes to something
+    // JSON-serializable.
+    return i++;
   });
+
+  // Methods can take parameters. They're presented as a [Parameters] object
+  // which makes it easy to validate that the expected parameters exist.
+  server.registerMethod("echo", (params) {
+    // If the request doesn't have a "message" parameter, this will
+    // automatically send a response notifying the client that the request
+    // was invalid.
+    return params.getNamed("message");
+  });
+
+  // [Parameters] has methods for verifying argument types.
+  server.registerMethod("subtract", (params) {
+    // If "minuend" or "subtrahend" aren't numbers, this will reject the
+    // request.
+    return params.getNum("minuend") - params.getNum("subtrahend");
+  });
+
+  // [Parameters] also supports optional arguments.
+  server.registerMethod("sort", (params) {
+    var list = params.getList("list");
+    list.sort();
+    if (params.getBool("descending", orElse: () => false)) {
+      return params.list.reversed;
+    } else {
+      return params.list;
+    }
+  });
+
+  // A method can send an error response by throwing a
+  // `json_rpc.RpcException`. Any positive number may be used as an
+  // application- defined error code.
+  const DIVIDE_BY_ZERO = 1;
+  server.registerMethod("divide", (params) {
+    var divisor = params.getNum("divisor");
+    if (divisor == 0) {
+      throw new json_rpc.RpcException(
+          DIVIDE_BY_ZERO, "Cannot divide by zero.");
+    }
+
+    return params.getNum("dividend") / divisor;
+  });
+
+  // To give you time to register all your methods, the server won't actually
+  // start listening for requests until you call `listen`.
+  server.listen();
 }
 ```
 
@@ -82,38 +80,36 @@
 
 ```dart
 import "package:json_rpc_2/json_rpc_2.dart" as json_rpc;
+import "package:stream_channel/stream_channel.dart";
 
-void main() {
-  WebSocket.connect('ws://localhost:4321').then((socket) {
-    // Just like the server, a client takes a Stream and a StreamSink or a
-    // single object that's both.
-    var client = new json_rpc.Client(socket);
+main() async {
+  var socket = await WebSocket.connect('ws://localhost:4321');
+  var client = new json_rpc.Client(new StreamChannel(socket, socket));
 
-    // This calls the "count" method on the server. A Future is returned that
-    // will complete to the value contained in the server's response.
-    client.sendRequest("count").then((result) => print("Count is $result."));
+  // This calls the "count" method on the server. A Future is returned that
+  // will complete to the value contained in the server's response.
+  client.sendRequest("count").then((result) => print("Count is $result."));
 
-    // Parameters are passed as a simple Map or, for positional parameters, an
-    // Iterable. Make sure they're JSON-serializable!
-    client.sendRequest("echo", {"message": "hello"})
-        .then((echo) => print('Echo says "$echo"!'));
+  // Parameters are passed as a simple Map or, for positional parameters, an
+  // Iterable. Make sure they're JSON-serializable!
+  client.sendRequest("echo", {"message": "hello"})
+      .then((echo) => print('Echo says "$echo"!'));
 
-    // A notification is a way to call a method that tells the server that no
-    // result is expected. Its return type is `void`; even if it causes an
-    // error, you won't hear back.
-    client.sendNotification("count");
+  // A notification is a way to call a method that tells the server that no
+  // result is expected. Its return type is `void`; even if it causes an
+  // error, you won't hear back.
+  client.sendNotification("count");
 
-    // If the server sends an error response, the returned Future will complete
-    // with an RpcException. You can catch this error and inspect its error
-    // code, message, and any data that the server sent along with it.
-    client.sendRequest("divide", {"dividend": 2, "divisor": 0})
-        .catchError((error) {
-      print("RPC error ${error.code}: ${error.message}");
-    });
-
-    // The client won't subscribe to the input stream until you call `listen`.
-    client.listen();
+  // If the server sends an error response, the returned Future will complete
+  // with an RpcException. You can catch this error and inspect its error
+  // code, message, and any data that the server sent along with it.
+  client.sendRequest("divide", {"dividend": 2, "divisor": 0})
+      .catchError((error) {
+    print("RPC error ${error.code}: ${error.message}");
   });
+
+  // The client won't subscribe to the input stream until you call `listen`.
+  client.listen();
 }
 ```
 
diff --git a/lib/src/channel_manager.dart b/lib/src/channel_manager.dart
new file mode 100644
index 0000000..35d75a6
--- /dev/null
+++ b/lib/src/channel_manager.dart
@@ -0,0 +1,79 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+
+/// Wraps a [StreamChannel] and handles logic that's shared between [Server],
+/// [Client], and [Peer].
+///
+/// These classes don't provide the user direct access to a
+/// [StreamSubscription]. Instead, they use the future returned by [listen] to
+/// notify the user of the remote endpoint closing or producing an error.
+class ChannelManager {
+  /// The name of the component whose channel is wrapped (e.g. "Server").
+  ///
+  /// Used for error reporting.
+  final String _name;
+
+  /// The underlying channel.
+  final StreamChannel _channel;
+
+  /// Returns a [Future] that completes when the connection is closed.
+  ///
+  /// This is the same future that's returned by [listen].
+  Future get done => _doneCompleter.future;
+  final _doneCompleter = new Completer();
+
+  /// Whether the underlying communication channel is closed.
+  bool get isClosed => _doneCompleter.isCompleted;
+
+  /// Whether [listen] has been called.
+  bool _listenCalled = false;
+
+  /// Whether [close] has been called.
+  ///
+  /// Note that [isClosed] tracks whether the underlying connection is closed,
+  /// whereas this tracks only whether it was explicitly closed from this end.
+  bool _closeCalled = false;
+
+  ChannelManager(this._name, this._channel);
+
+  /// Starts listening to the channel.
+  ///
+  /// The returned Future will complete when the input stream is closed. If the
+  /// input stream emits an error, that will be piped to the returned Future.
+  Future listen(void handleInput(input)) {
+    if (_listenCalled) {
+      throw new StateError("Can only call $_name.listen() once.");
+    }
+    _listenCalled = true;
+
+    _channel.stream.listen(handleInput,
+        onError: (error, stackTrace) {
+          _doneCompleter.completeError(error, stackTrace);
+          _channel.sink.close();
+        },
+        onDone: _doneCompleter.complete,
+        cancelOnError: true);
+
+    return done;
+  }
+
+  /// Emit [event].
+  void add(event) {
+    if (isClosed && !_closeCalled) return;
+    _channel.sink.add(event);
+  }
+
+  /// Closes the channel.
+  Future close() {
+    _closeCalled = true;
+    if (!_doneCompleter.isCompleted) {
+      _doneCompleter.complete(_channel.sink.close());
+    }
+    return done;
+  }
+}
diff --git a/lib/src/client.dart b/lib/src/client.dart
index e75e7f3..5df21e4 100644
--- a/lib/src/client.dart
+++ b/lib/src/client.dart
@@ -5,9 +5,10 @@
 import 'dart:async';
 
 import 'package:stack_trace/stack_trace.dart';
+import 'package:stream_channel/stream_channel.dart';
 
+import 'channel_manager.dart';
 import 'exception.dart';
-import 'two_way_stream.dart';
 import 'utils.dart';
 
 /// A JSON-RPC 2.0 client.
@@ -16,7 +17,7 @@
 /// those method calls. Methods can be called with [sendRequest], or with
 /// [sendNotification] if no response is expected.
 class Client {
-  final TwoWayStream _streams;
+  final ChannelManager _manager;
 
   /// The next request id.
   var _id = 0;
@@ -29,55 +30,53 @@
   /// The map of request ids to pending requests.
   final _pendingRequests = new Map<int, _Request>();
 
-  /// Returns a [Future] that completes when the connection is closed.
+  /// Returns a [Future] that completes when the underlying connection is
+  /// closed.
   ///
-  /// This is the same future that's returned by [listen].
-  Future get done => _streams.done;
+  /// This is the same future that's returned by [listen] and [close]. It may
+  /// complete before [close] is called if the remote endpoint closes the
+  /// connection.
+  Future get done => _manager.done;
 
-  /// Whether the connection is closed.
-  bool get isClosed => _streams.isClosed;
-
-  /// Creates a [Client] that writes requests to [requests] and reads responses
-  /// from [responses].
+  /// Whether the underlying connection is closed.
   ///
-  /// If [responses] is a [StreamSink] as well as a [Stream] (for example, a
-  /// `WebSocket`), [requests] may be omitted.
+  /// Note that this will be `true` before [close] is called if the remote
+  /// endpoint closes the connection.
+  bool get isClosed => _manager.isClosed;
+
+  /// Creates a [Client] that communicates over [channel].
   ///
   /// Note that the client won't begin listening to [responses] until
   /// [Client.listen] is called.
-  Client(Stream<String> responses, [StreamSink<String> requests])
-      : _streams = new TwoWayStream(
-            "Client", responses, "responses", requests, "requests");
+  Client(StreamChannel<String> channel)
+      : this.withoutJson(channel
+            .transform(jsonDocument)
+            .transformStream(ignoreFormatExceptions));
 
-  /// Creates a [Client] that writes decoded responses to [responses] and reads
-  /// decoded requests from [requests].
+  /// Creates a [Client] that communicates using decoded messages over
+  /// [channel].
   ///
   /// Unlike [new Client], this doesn't read or write JSON strings. Instead, it
   /// reads and writes decoded maps or lists.
   ///
-  /// If [responses] is a [StreamSink] as well as a [Stream], [requests] may be
-  /// omitted.
-  ///
   /// Note that the client won't begin listening to [responses] until
   /// [Client.listen] is called.
-  Client.withoutJson(Stream responses, [StreamSink requests])
-      : _streams = new TwoWayStream.withoutJson(
-            "Client", responses, "responses", requests, "requests");
+  Client.withoutJson(StreamChannel channel)
+      : _manager = new ChannelManager("Client", channel);
 
   /// Starts listening to the underlying stream.
   ///
-  /// Returns a [Future] that will complete when the stream is closed or when it
-  /// has an error.
+  /// Returns a [Future] that will complete when the connection is closed or
+  /// when it has an error. This is the same as [done].
   ///
   /// [listen] may only be called once.
-  Future listen() => _streams.listen(_handleResponse);
+  Future listen() => _manager.listen(_handleResponse);
 
-  /// Closes the server's request sink and response subscription.
+  /// Closes the underlying connection.
   ///
   /// Returns a [Future] that completes when all resources have been released.
-  ///
-  /// A client can't be closed before [listen] has been called.
-  Future close() => _streams.close();
+  /// This is the same as [done].
+  Future close() => _manager.close();
 
   /// Sends a JSON-RPC 2 request to invoke the given [method].
   ///
@@ -132,7 +131,7 @@
     if (_batch != null) {
       _batch.add(message);
     } else {
-      _streams.add(message);
+      _manager.add(message);
     }
   }
 
@@ -153,7 +152,7 @@
 
     _batch = [];
     return tryFinally(callback, () {
-      _streams.add(_batch);
+      _manager.add(_batch);
       _batch = null;
     });
   }
diff --git a/lib/src/peer.dart b/lib/src/peer.dart
index a6707e2..810d173 100644
--- a/lib/src/peer.dart
+++ b/lib/src/peer.dart
@@ -4,12 +4,13 @@
 
 import 'dart:async';
 
-import '../error_code.dart' as error_code;
+import 'package:stream_channel/stream_channel.dart';
+
+import 'channel_manager.dart';
 import 'client.dart';
-import 'exception.dart';
 import 'parameters.dart';
 import 'server.dart';
-import 'two_way_stream.dart';
+import 'utils.dart';
 
 /// A JSON-RPC 2.0 client *and* server.
 ///
@@ -17,7 +18,7 @@
 /// 2.0 endpoint. It sends both requests and responses across the same
 /// communication channel and expects to connect to a peer that does the same.
 class Peer implements Client, Server {
-  TwoWayStream _streams;
+  final ChannelManager _manager;
 
   /// The underlying client that handles request-sending and response-receiving
   /// logic.
@@ -35,55 +36,31 @@
   /// they're responses.
   final _clientIncomingForwarder = new StreamController(sync: true);
 
-  /// A stream controller that forwards outgoing messages from both [_server]
-  /// and [_client].
-  final _outgoingForwarder = new StreamController(sync: true);
+  Future get done => _manager.done;
+  bool get isClosed => _manager.isClosed;
 
-  Future get done => _streams.done;
-  bool get isClosed => _streams.isClosed;
-
-  /// Creates a [Peer] that reads incoming messages from [incoming] and writes
-  /// outgoing messages to [outgoing].
+  /// Creates a [Peer] that communicates over [channel].
   ///
-  /// If [incoming] is a [StreamSink] as well as a [Stream] (for example, a
-  /// `WebSocket`), [outgoing] may be omitted.
-  ///
-  /// Note that the peer won't begin listening to [incoming] until [Peer.listen]
+  /// Note that the peer won't begin listening to [channel] until [Peer.listen]
   /// is called.
-  Peer(Stream<String> incoming, [StreamSink<String> outgoing]) {
-    _streams = new TwoWayStream("Peer", incoming, "incoming",
-        outgoing, "outgoing", onInvalidInput: (message, error) {
-      _streams.add(new RpcException(error_code.PARSE_ERROR,
-          'Invalid JSON: ${error.message}').serialize(message));
-    });
+  Peer(StreamChannel<String> channel)
+      : this.withoutJson(channel
+            .transform(jsonDocument)
+            .transform(respondToFormatExceptions));
 
-    _outgoingForwarder.stream.listen(_streams.add);
-    _server = new Server.withoutJson(
-        _serverIncomingForwarder.stream, _outgoingForwarder);
-    _client = new Client.withoutJson(
-        _clientIncomingForwarder.stream, _outgoingForwarder);
-  }
-
-  /// Creates a [Peer] that reads incoming decoded messages from [incoming] and
-  /// writes outgoing decoded messages to [outgoing].
+  /// Creates a [Peer] that communicates using decoded messages over [channel].
   ///
   /// Unlike [new Peer], this doesn't read or write JSON strings. Instead, it
   /// reads and writes decoded maps or lists.
   ///
-  /// If [incoming] is a [StreamSink] as well as a [Stream], [outgoing] may be
-  /// omitted.
-  ///
-  /// Note that the peer won't begin listening to [incoming] until
+  /// Note that the peer won't begin listening to [channel] until
   /// [Peer.listen] is called.
-  Peer.withoutJson(Stream incoming, [StreamSink outgoing]) {
-    _streams = new TwoWayStream.withoutJson("Peer", incoming, "incoming",
-        outgoing, "outgoing");
-
-    _outgoingForwarder.stream.listen(_streams.add);
-    _server = new Server.withoutJson(
-        _serverIncomingForwarder.stream, _outgoingForwarder);
-    _client = new Client.withoutJson(
-        _clientIncomingForwarder.stream, _outgoingForwarder);
+  Peer.withoutJson(StreamChannel channel)
+      : _manager = new ChannelManager("Peer", channel) {
+    _server = new Server.withoutJson(new StreamChannel(
+        _serverIncomingForwarder.stream, channel.sink));
+    _client = new Client.withoutJson(new StreamChannel(
+        _clientIncomingForwarder.stream, channel.sink));
   }
 
   // Client methods.
@@ -109,7 +86,7 @@
   Future listen() {
     _client.listen();
     _server.listen();
-    return _streams.listen((message) {
+    return _manager.listen((message) {
       if (message is Map) {
         if (message.containsKey('result') || message.containsKey('error')) {
           _clientIncomingForwarder.add(message);
@@ -133,5 +110,5 @@
   }
 
   Future close() =>
-      Future.wait([_client.close(), _server.close(), _streams.close()]);
+      Future.wait([_client.close(), _server.close(), _manager.close()]);
 }
diff --git a/lib/src/server.dart b/lib/src/server.dart
index 06dd0b9..3ef59ed 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -7,11 +7,12 @@
 import 'dart:convert';
 
 import 'package:stack_trace/stack_trace.dart';
+import 'package:stream_channel/stream_channel.dart';
 
 import '../error_code.dart' as error_code;
+import 'channel_manager.dart';
 import 'exception.dart';
 import 'parameters.dart';
-import 'two_way_stream.dart';
 import 'utils.dart';
 
 /// A JSON-RPC 2.0 server.
@@ -25,7 +26,7 @@
 /// asynchronously, it's possible for multiple methods to be invoked at the same
 /// time, or even for a single method to be invoked multiple times at once.
 class Server {
-  TwoWayStream _streams;
+  final ChannelManager _manager;
 
   /// The methods registered for this server.
   final _methods = new Map<String, Function>();
@@ -36,59 +37,53 @@
   /// [RpcException.methodNotFound] exception.
   final _fallbacks = new Queue<Function>();
 
-  /// Returns a [Future] that completes when the connection is closed.
+  /// Returns a [Future] that completes when the underlying connection is
+  /// closed.
   ///
-  /// This is the same future that's returned by [listen].
-  Future get done => _streams.done;
+  /// This is the same future that's returned by [listen] and [close]. It may
+  /// complete before [close] is called if the remote endpoint closes the
+  /// connection.
+  Future get done => _manager.done;
 
-  /// Whether the connection is closed.
-  bool get isClosed => _streams.isClosed;
-
-  /// Creates a [Server] that reads requests from [requests] and writes
-  /// responses to [responses].
+  /// Whether the underlying connection is closed.
   ///
-  /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
-  /// `WebSocket`), [responses] may be omitted.
+  /// Note that this will be `true` before [close] is called if the remote
+  /// endpoint closes the connection.
+  bool get isClosed => _manager.isClosed;
+
+  /// Creates a [Server] that communicates over [channel].
   ///
   /// Note that the server won't begin listening to [requests] until
   /// [Server.listen] is called.
-  Server(Stream<String> requests, [StreamSink<String> responses]) {
-    _streams = new TwoWayStream("Server", requests, "requests",
-        responses, "responses", onInvalidInput: (message, error) {
-      _streams.add(new RpcException(error_code.PARSE_ERROR,
-          'Invalid JSON: ${error.message}').serialize(message));
-    });
-  }
+  Server(StreamChannel<String> channel)
+      : this.withoutJson(channel
+            .transform(jsonDocument)
+            .transform(respondToFormatExceptions));
 
-  /// Creates a [Server] that reads decoded requests from [requests] and writes
-  /// decoded responses to [responses].
+  /// Creates a [Server] that communicates using decoded messages over
+  /// [channel].
   ///
   /// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
   /// reads and writes decoded maps or lists.
   ///
-  /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be
-  /// omitted.
-  ///
   /// Note that the server won't begin listening to [requests] until
   /// [Server.listen] is called.
-  Server.withoutJson(Stream requests, [StreamSink responses])
-      : _streams = new TwoWayStream.withoutJson(
-            "Server", requests, "requests", responses, "responses");
+  Server.withoutJson(StreamChannel channel)
+      : _manager = new ChannelManager("Server", channel);
 
   /// Starts listening to the underlying stream.
   ///
-  /// Returns a [Future] that will complete when the stream is closed or when it
-  /// has an error.
+  /// Returns a [Future] that will complete when the connection is closed or
+  /// when it has an error. This is the same as [done].
   ///
   /// [listen] may only be called once.
-  Future listen() => _streams.listen(_handleRequest);
+  Future listen() => _manager.listen(_handleRequest);
 
-  /// Closes the server's request subscription and response sink.
+  /// Closes the underlying connection.
   ///
   /// Returns a [Future] that completes when all resources have been released.
-  ///
-  /// A server can't be closed before [listen] has been called.
-  Future close() => _streams.close();
+  /// This is the same as [done].
+  Future close() => _manager.close();
 
   /// Registers a method named [name] on this server.
   ///
@@ -129,21 +124,24 @@
   /// handling that request and returns a JSON-serializable response, or `null`
   /// if no response should be sent. [callback] may send custom
   /// errors by throwing an [RpcException].
-  Future _handleRequest(request) {
-    return syncFuture(() {
-      if (request is! List) return _handleSingleRequest(request);
-      if (request.isEmpty) {
-        return new RpcException(error_code.INVALID_REQUEST, 'A batch must '
-            'contain at least one request.').serialize(request);
-      }
+  Future _handleRequest(request) async {
+    var response;
+    if (request is! List) {
+      response = await _handleSingleRequest(request);
+      if (response == null) return;
+    } else if (request.isEmpty) {
+      response = new RpcException(
+              error_code.INVALID_REQUEST,
+              'A batch must contain at least one request.')
+          .serialize(request);
+    } else {
+      var results = await Future.wait(request.map(_handleSingleRequest));
+      var nonNull = results.where((result) => result != null);
+      if (nonNull.isEmpty) return;
+      response = nonNull.toList();
+    }
 
-      return Future.wait(request.map(_handleSingleRequest)).then((results) {
-        var nonNull = results.where((result) => result != null);
-        return nonNull.isEmpty ? null : nonNull.toList();
-      });
-    }).then((response) {
-      if (!_streams.isClosed && response != null) _streams.add(response);
-    });
+    if (!isClosed) _manager.add(response);
   }
 
   /// Handles an individual parsed request.
diff --git a/lib/src/two_way_stream.dart b/lib/src/two_way_stream.dart
deleted file mode 100644
index 4f20686..0000000
--- a/lib/src/two_way_stream.dart
+++ /dev/null
@@ -1,135 +0,0 @@
-// Copyright (c) 2014, the Dart project authors.  Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'dart:async';
-import 'dart:convert';
-
-import 'utils.dart';
-
-/// A class for managing a stream of input messages and a sink for output
-/// messages.
-///
-/// This contains stream logic that's shared between [Server] and [Client].
-class TwoWayStream {
-  /// The name of the component whose streams are being managed (e.g. "Server").
-  ///
-  /// Used for error reporting.
-  final String _name;
-
-  /// The input stream.
-  ///
-  /// This is a stream of decoded JSON objects.
-  final Stream _input;
-
-  /// The subscription to [_input].
-  StreamSubscription _inputSubscription;
-
-  /// The output sink.
-  ///
-  /// This takes decoded JSON objects.
-  final StreamSink _output;
-
-  /// Returns a [Future] that completes when the connection is closed.
-  ///
-  /// This is the same future that's returned by [listen].
-  Future get done => _doneCompleter.future;
-  final _doneCompleter = new Completer();
-
-  /// Whether the stream has been closed.
-  bool get isClosed => _doneCompleter.isCompleted;
-
-  /// Creates a two-way stream.
-  ///
-  /// [input] and [output] should emit and take (respectively) JSON-encoded
-  /// strings.
-  ///
-  /// [inputName] is used in error messages as the name of the input parameter.
-  /// [outputName] is likewise used as the name of the output parameter.
-  ///
-  /// If [onInvalidInput] is passed, any errors parsing messages from [input]
-  /// are passed to it. Otherwise, they're ignored and the input is discarded.
-  factory TwoWayStream(String name, Stream<String> input, String inputName,
-      StreamSink<String> output, String outputName,
-      {void onInvalidInput(String message, FormatException error)}) {
-    if (output == null) {
-      if (input is! StreamSink) {
-        throw new ArgumentError("Either `$inputName` must be a StreamSink or "
-            "`$outputName` must be passed.");
-      }
-      output = input as StreamSink;
-    }
-
-    var wrappedOutput = mapStreamSink(output, JSON.encode);
-    return new TwoWayStream.withoutJson(name, input.expand((message) {
-      var decodedMessage;
-      try {
-        decodedMessage = JSON.decode(message);
-      } on FormatException catch (error) {
-        if (onInvalidInput != null) onInvalidInput(message, error);
-        return [];
-      }
-
-      return [decodedMessage];
-    }), inputName, wrappedOutput, outputName);
-  }
-
-  /// Creates a two-way stream that reads decoded input and writes decoded
-  /// responses.
-  ///
-  /// [input] and [output] should emit and take (respectively) decoded JSON
-  /// objects.
-  ///
-  /// [inputName] is used in error messages as the name of the input parameter.
-  /// [outputName] is likewise used as the name of the output parameter.
-  TwoWayStream.withoutJson(this._name, Stream input, String inputName,
-          StreamSink output, String outputName)
-      : _input = input,
-        _output = output == null && input is StreamSink ? input : output {
-    if (_output == null) {
-      throw new ArgumentError("Either `$inputName` must be a StreamSink or "
-          "`$outputName` must be passed.");
-    }
-  }
-
-  /// Starts listening to the input stream.
-  ///
-  /// The returned Future will complete when the input stream is closed. If the
-  /// input stream emits an error, that will be piped to the returned Future.
-  Future listen(void handleInput(input)) {
-    if (_inputSubscription != null) {
-      throw new StateError("Can only call $_name.listen once.");
-    }
-
-    _inputSubscription = _input.listen(handleInput,
-        onError: (error, stackTrace) {
-      if (_doneCompleter.isCompleted) return;
-      _doneCompleter.completeError(error, stackTrace);
-      _output.close();
-    }, onDone: () {
-      if (_doneCompleter.isCompleted) return;
-      _doneCompleter.complete();
-      _output.close();
-    }, cancelOnError: true);
-
-    return _doneCompleter.future;
-  }
-
-  /// Emit [event] on the output stream.
-  void add(event) => _output.add(event);
-
-  /// Stops listening to the input stream and closes the output stream.
-  Future close() {
-    if (_inputSubscription == null) {
-      throw new StateError("Can't call $_name.close before $_name.listen.");
-    }
-
-    if (!_doneCompleter.isCompleted) _doneCompleter.complete();
-
-    var inputFuture = _inputSubscription.cancel();
-    // TODO(nweiz): include the output future in the return value when issue
-    // 19095 is fixed.
-    _output.close();
-    return inputFuture == null ? new Future.value() : inputFuture;
-  }
-}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index dde6b4a..8871b56 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -5,6 +5,10 @@
 import 'dart:async';
 
 import 'package:stack_trace/stack_trace.dart';
+import 'package:stream_channel/stream_channel.dart';
+
+import '../error_code.dart' as error_code;
+import 'exception.dart';
 
 typedef ZeroArgumentFunction();
 
@@ -64,6 +68,35 @@
   }
 }
 
+/// A transformer that silently drops [FormatException]s.
+final ignoreFormatExceptions = new StreamTransformer.fromHandlers(
+    handleError: (error, stackTrace, sink) {
+  if (error is FormatException) return;
+  sink.addError(error, stackTrace);
+});
+
+/// A transformer that sends error responses on [FormatException]s.
+final StreamChannelTransformer respondToFormatExceptions =
+    new _RespondToFormatExceptionsTransformer();
+
+/// The implementation of [respondToFormatExceptions].
+class _RespondToFormatExceptionsTransformer
+    implements StreamChannelTransformer {
+  StreamChannel bind(StreamChannel channel) {
+    var transformed;
+    transformed = channel.changeStream((stream) {
+      return stream.handleError((error) {
+        if (error is! FormatException) throw error;
+
+        var exception = new RpcException(
+            error_code.PARSE_ERROR, 'Invalid JSON: ${error.message}');
+        transformed.sink.add(exception.serialize(error.source));
+      });
+    });
+    return transformed;
+  }
+}
+
 /// Returns a [StreamSink] that wraps [sink] and maps each event added using
 /// [callback].
 StreamSink mapStreamSink(StreamSink sink, callback(event)) =>
diff --git a/pubspec.yaml b/pubspec.yaml
index 348dc35..630b9cc 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,12 +1,12 @@
 name: json_rpc_2
-version: 1.2.0
+version: 2.0.0
 author: Dart Team <misc@dartlang.org>
 description: An implementation of the JSON-RPC 2.0 spec.
 homepage: http://github.com/dart-lang/json_rpc_2
 dependencies:
   stack_trace: '>=0.9.1 <2.0.0'
+  stream_channel: '^1.1.0'
 dev_dependencies:
   test: ">=0.12.0 <0.13.0"
 environment:
-  sdk: ">=1.2.0 <2.0.0"
-
+  sdk: ">=1.8.0 <2.0.0"
diff --git a/test/client/stream_test.dart b/test/client/stream_test.dart
index 146e753..b9a31c6 100644
--- a/test/client/stream_test.dart
+++ b/test/client/stream_test.dart
@@ -4,17 +4,25 @@
 
 import 'dart:async';
 
+import 'package:stream_channel/stream_channel.dart';
 import 'package:test/test.dart';
+
 import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
 
 import 'utils.dart';
 
 void main() {
+  var responseController;
+  var requestController;
+  var client;
+  setUp(() {
+    responseController = new StreamController();
+    requestController = new StreamController();
+    client = new json_rpc.Client.withoutJson(
+        new StreamChannel(responseController.stream, requestController.sink));
+  });
+
   test(".withoutJson supports decoded stream and sink", () {
-    var responseController = new StreamController();
-    var requestController = new StreamController();
-    var client = new json_rpc.Client.withoutJson(
-        responseController.stream, requestController.sink);
     client.listen();
 
     expect(requestController.stream.first.then((request) {
@@ -34,11 +42,6 @@
   });
 
   test(".listen returns when the controller is closed", () {
-    var responseController = new StreamController();
-    var requestController = new StreamController();
-    var client = new json_rpc.Client.withoutJson(
-        responseController.stream, requestController.sink);
-
     var hasListenCompeted = false;
     expect(client.listen().then((_) => hasListenCompeted = true), completes);
 
@@ -51,30 +54,18 @@
   });
 
   test(".listen returns a stream error", () {
-    var responseController = new StreamController();
-    var requestController = new StreamController();
-    var client = new json_rpc.Client(
-        responseController.stream, requestController.sink);
-
     expect(client.listen(), throwsA('oh no'));
     responseController.addError('oh no');
   });
 
   test(".listen can't be called twice", () {
-    var responseController = new StreamController();
-    var requestController = new StreamController();
-    var client = new json_rpc.Client(
-        responseController.stream, requestController.sink);
     client.listen();
-
     expect(() => client.listen(), throwsStateError);
   });
 
   test(".close cancels the stream subscription and closes the sink", () {
-    var responseController = new StreamController();
-    var requestController = new StreamController();
-    var client = new json_rpc.Client(
-        responseController.stream, requestController.sink);
+    // Work around sdk#19095.
+    requestController.stream.listen(null);
 
     expect(client.listen(), completes);
 
@@ -85,13 +76,4 @@
     expect(() => responseController.stream.listen((_) {}), throwsStateError);
     expect(requestController.isClosed, isTrue);
   });
-
-  test(".close can't be called before .listen", () {
-    var responseController = new StreamController();
-    var requestController = new StreamController();
-    var client = new json_rpc.Client(
-        responseController.stream, requestController.sink);
-
-    expect(() => client.close(), throwsStateError);
-  });
 }
diff --git a/test/client/utils.dart b/test/client/utils.dart
index 8684892..5eb0b60 100644
--- a/test/client/utils.dart
+++ b/test/client/utils.dart
@@ -5,9 +5,11 @@
 import 'dart:async';
 import 'dart:convert';
 
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
 import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
 import 'package:json_rpc_2/error_code.dart' as error_code;
-import 'package:test/test.dart';
 
 /// A controller used to test a [json_rpc.Client].
 class ClientController {
@@ -23,7 +25,7 @@
 
   ClientController() {
     _client = new json_rpc.Client(
-        _responseController.stream, _requestController.sink);
+        new StreamChannel(_responseController.stream, _requestController.sink));
     _client.listen();
   }
 
diff --git a/test/peer_test.dart b/test/peer_test.dart
index 7008b72..89ab2d6 100644
--- a/test/peer_test.dart
+++ b/test/peer_test.dart
@@ -5,7 +5,9 @@
 import 'dart:async';
 import 'dart:convert';
 
+import 'package:stream_channel/stream_channel.dart';
 import 'package:test/test.dart';
+
 import 'package:json_rpc_2/error_code.dart' as error_code;
 import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
 
@@ -19,7 +21,7 @@
     var outgoingController = new StreamController();
     outgoing = outgoingController.stream;
     peer = new json_rpc.Peer.withoutJson(
-        incomingController.stream, outgoingController);
+        new StreamChannel(incomingController.stream, outgoingController));
   });
 
   group("like a client,", () {
@@ -165,7 +167,7 @@
       var incomingController = new StreamController();
       var outgoingController = new StreamController();
       var jsonPeer = new json_rpc.Peer(
-          incomingController.stream, outgoingController);
+          new StreamChannel(incomingController.stream, outgoingController));
 
       expect(outgoingController.stream.first.then(JSON.decode), completion({
         "jsonrpc": "2.0",
diff --git a/test/server/stream_test.dart b/test/server/stream_test.dart
index 58c5e62..7be0001 100644
--- a/test/server/stream_test.dart
+++ b/test/server/stream_test.dart
@@ -4,17 +4,25 @@
 
 import 'dart:async';
 
+import 'package:stream_channel/stream_channel.dart';
 import 'package:test/test.dart';
+
 import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
 
 import 'utils.dart';
 
 void main() {
+  var requestController;
+  var responseController;
+  var server;
+  setUp(() {
+    requestController = new StreamController();
+    responseController = new StreamController();
+    server = new json_rpc.Server.withoutJson(
+        new StreamChannel(requestController.stream, responseController.sink));
+  });
+
   test(".withoutJson supports decoded stream and sink", () {
-    var requestController = new StreamController();
-    var responseController = new StreamController();
-    var server = new json_rpc.Server.withoutJson(
-        requestController.stream, responseController.sink);
     server.listen();
 
     server.registerMethod('foo', (params) {
@@ -36,11 +44,6 @@
   });
 
   test(".listen returns when the controller is closed", () {
-    var requestController = new StreamController();
-    var responseController = new StreamController();
-    var server = new json_rpc.Server(
-        requestController.stream, responseController.sink);
-
     var hasListenCompeted = false;
     expect(server.listen().then((_) => hasListenCompeted = true), completes);
 
@@ -53,30 +56,19 @@
   });
 
   test(".listen returns a stream error", () {
-    var requestController = new StreamController();
-    var responseController = new StreamController();
-    var server = new json_rpc.Server(
-        requestController.stream, responseController.sink);
-
     expect(server.listen(), throwsA('oh no'));
     requestController.addError('oh no');
   });
 
   test(".listen can't be called twice", () {
-    var requestController = new StreamController();
-    var responseController = new StreamController();
-    var server = new json_rpc.Server(
-        requestController.stream, responseController.sink);
     server.listen();
 
     expect(() => server.listen(), throwsStateError);
   });
 
   test(".close cancels the stream subscription and closes the sink", () {
-    var requestController = new StreamController();
-    var responseController = new StreamController();
-    var server = new json_rpc.Server(
-        requestController.stream, responseController.sink);
+    // Work around sdk#19095.
+    responseController.stream.listen(null);
 
     expect(server.listen(), completes);
 
@@ -87,13 +79,4 @@
     expect(() => requestController.stream.listen((_) {}), throwsStateError);
     expect(responseController.isClosed, isTrue);
   });
-
-  test(".close can't be called before .listen", () {
-    var requestController = new StreamController();
-    var responseController = new StreamController();
-    var server = new json_rpc.Server(
-        requestController.stream, responseController.sink);
-
-    expect(() => server.close(), throwsStateError);
-  });
 }
diff --git a/test/server/utils.dart b/test/server/utils.dart
index 9b4b020..f33cb13 100644
--- a/test/server/utils.dart
+++ b/test/server/utils.dart
@@ -5,9 +5,11 @@
 import 'dart:async';
 import 'dart:convert';
 
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
 import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
 import 'package:json_rpc_2/error_code.dart' as error_code;
-import 'package:test/test.dart';
 
 /// A controller used to test a [json_rpc.Server].
 class ServerController {
@@ -23,7 +25,7 @@
 
   ServerController() {
     _server = new json_rpc.Server(
-        _requestController.stream, _responseController.sink);
+        new StreamChannel(_requestController.stream, _responseController.sink));
     _server.listen();
   }