Use StreamChannel.
This converts the constructors to take StreamChannels, and changes some
edge-case semantics to be more familiar to StreamChannel (and WebSocket)
users.
R=rnystrom@google.com
Review URL: https://codereview.chromium.org//1652413002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 524cbad..649c738 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,21 @@
+## 2.0.0
+
+* **Breaking change:** all constructors now take a `StreamChannel` rather than a
+ `Stream`/`StreamSink` pair.
+
+* `Client.sendRequest()` and `Client.sendNotification()` no longer throw
+ `StateError`s after the connection has been closed but before `Client.close()`
+ has been called.
+
+* The various `close()` methods may now be called before their corresponding
+ `listen()` methods.
+
+* The various `close()` methods now wait on the result of closing the underlying
+ `StreamSink`. Be aware that [in some circumstances][issue 19095]
+ `StreamController`s' `Sink.close()` futures may never complete.
+
+[issue 19095]: https://github.com/dart-lang/sdk/issues/19095
+
## 1.2.0
* Add `Client.isClosed` and `Server.isClosed`, which make it possible to
diff --git a/README.md b/README.md
index 67afc31..8078fa2 100644
--- a/README.md
+++ b/README.md
@@ -9,68 +9,66 @@
```dart
import "package:json_rpc_2/json_rpc_2.dart" as json_rpc;
+import "package:stream_channel/stream_channel.dart";
-void main() {
- WebSocket.connect('ws://localhost:4321').then((socket) {
- // You can start the server with a Stream for requests and a StreamSink for
- // responses, or with an object that's both, like a WebSocket.
- var server = new json_rpc.Server(socket);
+main() async {
+ var socket = await WebSocket.connect('ws://localhost:4321');
+ var server = new json_rpc.Server(new StreamChannel(socket, socket));
- // Any string may be used as a method name. JSON-RPC 2.0 methods are
- // case-sensitive.
- var i = 0;
- server.registerMethod("count", () {
- // Just return the value to be sent as a response to the client. This can
- // be anything JSON-serializable, or a Future that completes to something
- // JSON-serializable.
- return i++;
- });
-
- // Methods can take parameters. They're presented as a [Parameters] object
- // which makes it easy to validate that the expected parameters exist.
- server.registerMethod("echo", (params) {
- // If the request doesn't have a "message" parameter, this will
- // automatically send a response notifying the client that the request
- // was invalid.
- return params.getNamed("message");
- });
-
- // [Parameters] has methods for verifying argument types.
- server.registerMethod("subtract", (params) {
- // If "minuend" or "subtrahend" aren't numbers, this will reject the
- // request.
- return params.getNum("minuend") - params.getNum("subtrahend");
- });
-
- // [Parameters] also supports optional arguments.
- server.registerMethod("sort", (params) {
- var list = params.getList("list");
- list.sort();
- if (params.getBool("descending", orElse: () => false)) {
- return params.list.reversed;
- } else {
- return params.list;
- }
- });
-
- // A method can send an error response by throwing a
- // `json_rpc.RpcException`. Any positive number may be used as an
- // application- defined error code.
- const DIVIDE_BY_ZERO = 1;
- server.registerMethod("divide", (params) {
- var divisor = params.getNum("divisor");
- if (divisor == 0) {
- throw new json_rpc.RpcException(
- DIVIDE_BY_ZERO, "Cannot divide by zero.");
- }
-
- return params.getNum("dividend") / divisor;
- });
-
- // To give you time to register all your methods, the server won't actually
- // start listening for requests until you call `listen`.
- server.listen();
+ // Any string may be used as a method name. JSON-RPC 2.0 methods are
+ // case-sensitive.
+ var i = 0;
+ server.registerMethod("count", () {
+ // Just return the value to be sent as a response to the client. This can
+ // be anything JSON-serializable, or a Future that completes to something
+ // JSON-serializable.
+ return i++;
});
+
+ // Methods can take parameters. They're presented as a [Parameters] object
+ // which makes it easy to validate that the expected parameters exist.
+ server.registerMethod("echo", (params) {
+ // If the request doesn't have a "message" parameter, this will
+ // automatically send a response notifying the client that the request
+ // was invalid.
+ return params.getNamed("message");
+ });
+
+ // [Parameters] has methods for verifying argument types.
+ server.registerMethod("subtract", (params) {
+ // If "minuend" or "subtrahend" aren't numbers, this will reject the
+ // request.
+ return params.getNum("minuend") - params.getNum("subtrahend");
+ });
+
+ // [Parameters] also supports optional arguments.
+ server.registerMethod("sort", (params) {
+ var list = params.getList("list");
+ list.sort();
+ if (params.getBool("descending", orElse: () => false)) {
+ return params.list.reversed;
+ } else {
+ return params.list;
+ }
+ });
+
+ // A method can send an error response by throwing a
+ // `json_rpc.RpcException`. Any positive number may be used as an
+ // application- defined error code.
+ const DIVIDE_BY_ZERO = 1;
+ server.registerMethod("divide", (params) {
+ var divisor = params.getNum("divisor");
+ if (divisor == 0) {
+ throw new json_rpc.RpcException(
+ DIVIDE_BY_ZERO, "Cannot divide by zero.");
+ }
+
+ return params.getNum("dividend") / divisor;
+ });
+
+ // To give you time to register all your methods, the server won't actually
+ // start listening for requests until you call `listen`.
+ server.listen();
}
```
@@ -82,38 +80,36 @@
```dart
import "package:json_rpc_2/json_rpc_2.dart" as json_rpc;
+import "package:stream_channel/stream_channel.dart";
-void main() {
- WebSocket.connect('ws://localhost:4321').then((socket) {
- // Just like the server, a client takes a Stream and a StreamSink or a
- // single object that's both.
- var client = new json_rpc.Client(socket);
+main() async {
+ var socket = await WebSocket.connect('ws://localhost:4321');
+ var client = new json_rpc.Client(new StreamChannel(socket, socket));
- // This calls the "count" method on the server. A Future is returned that
- // will complete to the value contained in the server's response.
- client.sendRequest("count").then((result) => print("Count is $result."));
+ // This calls the "count" method on the server. A Future is returned that
+ // will complete to the value contained in the server's response.
+ client.sendRequest("count").then((result) => print("Count is $result."));
- // Parameters are passed as a simple Map or, for positional parameters, an
- // Iterable. Make sure they're JSON-serializable!
- client.sendRequest("echo", {"message": "hello"})
- .then((echo) => print('Echo says "$echo"!'));
+ // Parameters are passed as a simple Map or, for positional parameters, an
+ // Iterable. Make sure they're JSON-serializable!
+ client.sendRequest("echo", {"message": "hello"})
+ .then((echo) => print('Echo says "$echo"!'));
- // A notification is a way to call a method that tells the server that no
- // result is expected. Its return type is `void`; even if it causes an
- // error, you won't hear back.
- client.sendNotification("count");
+ // A notification is a way to call a method that tells the server that no
+ // result is expected. Its return type is `void`; even if it causes an
+ // error, you won't hear back.
+ client.sendNotification("count");
- // If the server sends an error response, the returned Future will complete
- // with an RpcException. You can catch this error and inspect its error
- // code, message, and any data that the server sent along with it.
- client.sendRequest("divide", {"dividend": 2, "divisor": 0})
- .catchError((error) {
- print("RPC error ${error.code}: ${error.message}");
- });
-
- // The client won't subscribe to the input stream until you call `listen`.
- client.listen();
+ // If the server sends an error response, the returned Future will complete
+ // with an RpcException. You can catch this error and inspect its error
+ // code, message, and any data that the server sent along with it.
+ client.sendRequest("divide", {"dividend": 2, "divisor": 0})
+ .catchError((error) {
+ print("RPC error ${error.code}: ${error.message}");
});
+
+ // The client won't subscribe to the input stream until you call `listen`.
+ client.listen();
}
```
diff --git a/lib/src/channel_manager.dart b/lib/src/channel_manager.dart
new file mode 100644
index 0000000..35d75a6
--- /dev/null
+++ b/lib/src/channel_manager.dart
@@ -0,0 +1,79 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_channel/stream_channel.dart';
+
+/// Wraps a [StreamChannel] and handles logic that's shared between [Server],
+/// [Client], and [Peer].
+///
+/// These classes don't provide the user direct access to a
+/// [StreamSubscription]. Instead, they use the future returned by [listen] to
+/// notify the user of the remote endpoint closing or producing an error.
+class ChannelManager {
+ /// The name of the component whose channel is wrapped (e.g. "Server").
+ ///
+ /// Used for error reporting.
+ final String _name;
+
+ /// The underlying channel.
+ final StreamChannel _channel;
+
+ /// Returns a [Future] that completes when the connection is closed.
+ ///
+ /// This is the same future that's returned by [listen].
+ Future get done => _doneCompleter.future;
+ final _doneCompleter = new Completer();
+
+ /// Whether the underlying communication channel is closed.
+ bool get isClosed => _doneCompleter.isCompleted;
+
+ /// Whether [listen] has been called.
+ bool _listenCalled = false;
+
+ /// Whether [close] has been called.
+ ///
+ /// Note that [isClosed] tracks whether the underlying connection is closed,
+ /// whereas this tracks only whether it was explicitly closed from this end.
+ bool _closeCalled = false;
+
+ ChannelManager(this._name, this._channel);
+
+ /// Starts listening to the channel.
+ ///
+ /// The returned Future will complete when the input stream is closed. If the
+ /// input stream emits an error, that will be piped to the returned Future.
+ Future listen(void handleInput(input)) {
+ if (_listenCalled) {
+ throw new StateError("Can only call $_name.listen() once.");
+ }
+ _listenCalled = true;
+
+ _channel.stream.listen(handleInput,
+ onError: (error, stackTrace) {
+ _doneCompleter.completeError(error, stackTrace);
+ _channel.sink.close();
+ },
+ onDone: _doneCompleter.complete,
+ cancelOnError: true);
+
+ return done;
+ }
+
+ /// Emit [event].
+ void add(event) {
+ if (isClosed && !_closeCalled) return;
+ _channel.sink.add(event);
+ }
+
+ /// Closes the channel.
+ Future close() {
+ _closeCalled = true;
+ if (!_doneCompleter.isCompleted) {
+ _doneCompleter.complete(_channel.sink.close());
+ }
+ return done;
+ }
+}
diff --git a/lib/src/client.dart b/lib/src/client.dart
index e75e7f3..5df21e4 100644
--- a/lib/src/client.dart
+++ b/lib/src/client.dart
@@ -5,9 +5,10 @@
import 'dart:async';
import 'package:stack_trace/stack_trace.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'channel_manager.dart';
import 'exception.dart';
-import 'two_way_stream.dart';
import 'utils.dart';
/// A JSON-RPC 2.0 client.
@@ -16,7 +17,7 @@
/// those method calls. Methods can be called with [sendRequest], or with
/// [sendNotification] if no response is expected.
class Client {
- final TwoWayStream _streams;
+ final ChannelManager _manager;
/// The next request id.
var _id = 0;
@@ -29,55 +30,53 @@
/// The map of request ids to pending requests.
final _pendingRequests = new Map<int, _Request>();
- /// Returns a [Future] that completes when the connection is closed.
+ /// Returns a [Future] that completes when the underlying connection is
+ /// closed.
///
- /// This is the same future that's returned by [listen].
- Future get done => _streams.done;
+ /// This is the same future that's returned by [listen] and [close]. It may
+ /// complete before [close] is called if the remote endpoint closes the
+ /// connection.
+ Future get done => _manager.done;
- /// Whether the connection is closed.
- bool get isClosed => _streams.isClosed;
-
- /// Creates a [Client] that writes requests to [requests] and reads responses
- /// from [responses].
+ /// Whether the underlying connection is closed.
///
- /// If [responses] is a [StreamSink] as well as a [Stream] (for example, a
- /// `WebSocket`), [requests] may be omitted.
+ /// Note that this will be `true` before [close] is called if the remote
+ /// endpoint closes the connection.
+ bool get isClosed => _manager.isClosed;
+
+ /// Creates a [Client] that communicates over [channel].
///
/// Note that the client won't begin listening to [responses] until
/// [Client.listen] is called.
- Client(Stream<String> responses, [StreamSink<String> requests])
- : _streams = new TwoWayStream(
- "Client", responses, "responses", requests, "requests");
+ Client(StreamChannel<String> channel)
+ : this.withoutJson(channel
+ .transform(jsonDocument)
+ .transformStream(ignoreFormatExceptions));
- /// Creates a [Client] that writes decoded responses to [responses] and reads
- /// decoded requests from [requests].
+ /// Creates a [Client] that communicates using decoded messages over
+ /// [channel].
///
/// Unlike [new Client], this doesn't read or write JSON strings. Instead, it
/// reads and writes decoded maps or lists.
///
- /// If [responses] is a [StreamSink] as well as a [Stream], [requests] may be
- /// omitted.
- ///
/// Note that the client won't begin listening to [responses] until
/// [Client.listen] is called.
- Client.withoutJson(Stream responses, [StreamSink requests])
- : _streams = new TwoWayStream.withoutJson(
- "Client", responses, "responses", requests, "requests");
+ Client.withoutJson(StreamChannel channel)
+ : _manager = new ChannelManager("Client", channel);
/// Starts listening to the underlying stream.
///
- /// Returns a [Future] that will complete when the stream is closed or when it
- /// has an error.
+ /// Returns a [Future] that will complete when the connection is closed or
+ /// when it has an error. This is the same as [done].
///
/// [listen] may only be called once.
- Future listen() => _streams.listen(_handleResponse);
+ Future listen() => _manager.listen(_handleResponse);
- /// Closes the server's request sink and response subscription.
+ /// Closes the underlying connection.
///
/// Returns a [Future] that completes when all resources have been released.
- ///
- /// A client can't be closed before [listen] has been called.
- Future close() => _streams.close();
+ /// This is the same as [done].
+ Future close() => _manager.close();
/// Sends a JSON-RPC 2 request to invoke the given [method].
///
@@ -132,7 +131,7 @@
if (_batch != null) {
_batch.add(message);
} else {
- _streams.add(message);
+ _manager.add(message);
}
}
@@ -153,7 +152,7 @@
_batch = [];
return tryFinally(callback, () {
- _streams.add(_batch);
+ _manager.add(_batch);
_batch = null;
});
}
diff --git a/lib/src/peer.dart b/lib/src/peer.dart
index a6707e2..810d173 100644
--- a/lib/src/peer.dart
+++ b/lib/src/peer.dart
@@ -4,12 +4,13 @@
import 'dart:async';
-import '../error_code.dart' as error_code;
+import 'package:stream_channel/stream_channel.dart';
+
+import 'channel_manager.dart';
import 'client.dart';
-import 'exception.dart';
import 'parameters.dart';
import 'server.dart';
-import 'two_way_stream.dart';
+import 'utils.dart';
/// A JSON-RPC 2.0 client *and* server.
///
@@ -17,7 +18,7 @@
/// 2.0 endpoint. It sends both requests and responses across the same
/// communication channel and expects to connect to a peer that does the same.
class Peer implements Client, Server {
- TwoWayStream _streams;
+ final ChannelManager _manager;
/// The underlying client that handles request-sending and response-receiving
/// logic.
@@ -35,55 +36,31 @@
/// they're responses.
final _clientIncomingForwarder = new StreamController(sync: true);
- /// A stream controller that forwards outgoing messages from both [_server]
- /// and [_client].
- final _outgoingForwarder = new StreamController(sync: true);
+ Future get done => _manager.done;
+ bool get isClosed => _manager.isClosed;
- Future get done => _streams.done;
- bool get isClosed => _streams.isClosed;
-
- /// Creates a [Peer] that reads incoming messages from [incoming] and writes
- /// outgoing messages to [outgoing].
+ /// Creates a [Peer] that communicates over [channel].
///
- /// If [incoming] is a [StreamSink] as well as a [Stream] (for example, a
- /// `WebSocket`), [outgoing] may be omitted.
- ///
- /// Note that the peer won't begin listening to [incoming] until [Peer.listen]
+ /// Note that the peer won't begin listening to [channel] until [Peer.listen]
/// is called.
- Peer(Stream<String> incoming, [StreamSink<String> outgoing]) {
- _streams = new TwoWayStream("Peer", incoming, "incoming",
- outgoing, "outgoing", onInvalidInput: (message, error) {
- _streams.add(new RpcException(error_code.PARSE_ERROR,
- 'Invalid JSON: ${error.message}').serialize(message));
- });
+ Peer(StreamChannel<String> channel)
+ : this.withoutJson(channel
+ .transform(jsonDocument)
+ .transform(respondToFormatExceptions));
- _outgoingForwarder.stream.listen(_streams.add);
- _server = new Server.withoutJson(
- _serverIncomingForwarder.stream, _outgoingForwarder);
- _client = new Client.withoutJson(
- _clientIncomingForwarder.stream, _outgoingForwarder);
- }
-
- /// Creates a [Peer] that reads incoming decoded messages from [incoming] and
- /// writes outgoing decoded messages to [outgoing].
+ /// Creates a [Peer] that communicates using decoded messages over [channel].
///
/// Unlike [new Peer], this doesn't read or write JSON strings. Instead, it
/// reads and writes decoded maps or lists.
///
- /// If [incoming] is a [StreamSink] as well as a [Stream], [outgoing] may be
- /// omitted.
- ///
- /// Note that the peer won't begin listening to [incoming] until
+ /// Note that the peer won't begin listening to [channel] until
/// [Peer.listen] is called.
- Peer.withoutJson(Stream incoming, [StreamSink outgoing]) {
- _streams = new TwoWayStream.withoutJson("Peer", incoming, "incoming",
- outgoing, "outgoing");
-
- _outgoingForwarder.stream.listen(_streams.add);
- _server = new Server.withoutJson(
- _serverIncomingForwarder.stream, _outgoingForwarder);
- _client = new Client.withoutJson(
- _clientIncomingForwarder.stream, _outgoingForwarder);
+ Peer.withoutJson(StreamChannel channel)
+ : _manager = new ChannelManager("Peer", channel) {
+ _server = new Server.withoutJson(new StreamChannel(
+ _serverIncomingForwarder.stream, channel.sink));
+ _client = new Client.withoutJson(new StreamChannel(
+ _clientIncomingForwarder.stream, channel.sink));
}
// Client methods.
@@ -109,7 +86,7 @@
Future listen() {
_client.listen();
_server.listen();
- return _streams.listen((message) {
+ return _manager.listen((message) {
if (message is Map) {
if (message.containsKey('result') || message.containsKey('error')) {
_clientIncomingForwarder.add(message);
@@ -133,5 +110,5 @@
}
Future close() =>
- Future.wait([_client.close(), _server.close(), _streams.close()]);
+ Future.wait([_client.close(), _server.close(), _manager.close()]);
}
diff --git a/lib/src/server.dart b/lib/src/server.dart
index 06dd0b9..3ef59ed 100644
--- a/lib/src/server.dart
+++ b/lib/src/server.dart
@@ -7,11 +7,12 @@
import 'dart:convert';
import 'package:stack_trace/stack_trace.dart';
+import 'package:stream_channel/stream_channel.dart';
import '../error_code.dart' as error_code;
+import 'channel_manager.dart';
import 'exception.dart';
import 'parameters.dart';
-import 'two_way_stream.dart';
import 'utils.dart';
/// A JSON-RPC 2.0 server.
@@ -25,7 +26,7 @@
/// asynchronously, it's possible for multiple methods to be invoked at the same
/// time, or even for a single method to be invoked multiple times at once.
class Server {
- TwoWayStream _streams;
+ final ChannelManager _manager;
/// The methods registered for this server.
final _methods = new Map<String, Function>();
@@ -36,59 +37,53 @@
/// [RpcException.methodNotFound] exception.
final _fallbacks = new Queue<Function>();
- /// Returns a [Future] that completes when the connection is closed.
+ /// Returns a [Future] that completes when the underlying connection is
+ /// closed.
///
- /// This is the same future that's returned by [listen].
- Future get done => _streams.done;
+ /// This is the same future that's returned by [listen] and [close]. It may
+ /// complete before [close] is called if the remote endpoint closes the
+ /// connection.
+ Future get done => _manager.done;
- /// Whether the connection is closed.
- bool get isClosed => _streams.isClosed;
-
- /// Creates a [Server] that reads requests from [requests] and writes
- /// responses to [responses].
+ /// Whether the underlying connection is closed.
///
- /// If [requests] is a [StreamSink] as well as a [Stream] (for example, a
- /// `WebSocket`), [responses] may be omitted.
+ /// Note that this will be `true` before [close] is called if the remote
+ /// endpoint closes the connection.
+ bool get isClosed => _manager.isClosed;
+
+ /// Creates a [Server] that communicates over [channel].
///
/// Note that the server won't begin listening to [requests] until
/// [Server.listen] is called.
- Server(Stream<String> requests, [StreamSink<String> responses]) {
- _streams = new TwoWayStream("Server", requests, "requests",
- responses, "responses", onInvalidInput: (message, error) {
- _streams.add(new RpcException(error_code.PARSE_ERROR,
- 'Invalid JSON: ${error.message}').serialize(message));
- });
- }
+ Server(StreamChannel<String> channel)
+ : this.withoutJson(channel
+ .transform(jsonDocument)
+ .transform(respondToFormatExceptions));
- /// Creates a [Server] that reads decoded requests from [requests] and writes
- /// decoded responses to [responses].
+ /// Creates a [Server] that communicates using decoded messages over
+ /// [channel].
///
/// Unlike [new Server], this doesn't read or write JSON strings. Instead, it
/// reads and writes decoded maps or lists.
///
- /// If [requests] is a [StreamSink] as well as a [Stream], [responses] may be
- /// omitted.
- ///
/// Note that the server won't begin listening to [requests] until
/// [Server.listen] is called.
- Server.withoutJson(Stream requests, [StreamSink responses])
- : _streams = new TwoWayStream.withoutJson(
- "Server", requests, "requests", responses, "responses");
+ Server.withoutJson(StreamChannel channel)
+ : _manager = new ChannelManager("Server", channel);
/// Starts listening to the underlying stream.
///
- /// Returns a [Future] that will complete when the stream is closed or when it
- /// has an error.
+ /// Returns a [Future] that will complete when the connection is closed or
+ /// when it has an error. This is the same as [done].
///
/// [listen] may only be called once.
- Future listen() => _streams.listen(_handleRequest);
+ Future listen() => _manager.listen(_handleRequest);
- /// Closes the server's request subscription and response sink.
+ /// Closes the underlying connection.
///
/// Returns a [Future] that completes when all resources have been released.
- ///
- /// A server can't be closed before [listen] has been called.
- Future close() => _streams.close();
+ /// This is the same as [done].
+ Future close() => _manager.close();
/// Registers a method named [name] on this server.
///
@@ -129,21 +124,24 @@
/// handling that request and returns a JSON-serializable response, or `null`
/// if no response should be sent. [callback] may send custom
/// errors by throwing an [RpcException].
- Future _handleRequest(request) {
- return syncFuture(() {
- if (request is! List) return _handleSingleRequest(request);
- if (request.isEmpty) {
- return new RpcException(error_code.INVALID_REQUEST, 'A batch must '
- 'contain at least one request.').serialize(request);
- }
+ Future _handleRequest(request) async {
+ var response;
+ if (request is! List) {
+ response = await _handleSingleRequest(request);
+ if (response == null) return;
+ } else if (request.isEmpty) {
+ response = new RpcException(
+ error_code.INVALID_REQUEST,
+ 'A batch must contain at least one request.')
+ .serialize(request);
+ } else {
+ var results = await Future.wait(request.map(_handleSingleRequest));
+ var nonNull = results.where((result) => result != null);
+ if (nonNull.isEmpty) return;
+ response = nonNull.toList();
+ }
- return Future.wait(request.map(_handleSingleRequest)).then((results) {
- var nonNull = results.where((result) => result != null);
- return nonNull.isEmpty ? null : nonNull.toList();
- });
- }).then((response) {
- if (!_streams.isClosed && response != null) _streams.add(response);
- });
+ if (!isClosed) _manager.add(response);
}
/// Handles an individual parsed request.
diff --git a/lib/src/two_way_stream.dart b/lib/src/two_way_stream.dart
deleted file mode 100644
index 4f20686..0000000
--- a/lib/src/two_way_stream.dart
+++ /dev/null
@@ -1,135 +0,0 @@
-// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'dart:async';
-import 'dart:convert';
-
-import 'utils.dart';
-
-/// A class for managing a stream of input messages and a sink for output
-/// messages.
-///
-/// This contains stream logic that's shared between [Server] and [Client].
-class TwoWayStream {
- /// The name of the component whose streams are being managed (e.g. "Server").
- ///
- /// Used for error reporting.
- final String _name;
-
- /// The input stream.
- ///
- /// This is a stream of decoded JSON objects.
- final Stream _input;
-
- /// The subscription to [_input].
- StreamSubscription _inputSubscription;
-
- /// The output sink.
- ///
- /// This takes decoded JSON objects.
- final StreamSink _output;
-
- /// Returns a [Future] that completes when the connection is closed.
- ///
- /// This is the same future that's returned by [listen].
- Future get done => _doneCompleter.future;
- final _doneCompleter = new Completer();
-
- /// Whether the stream has been closed.
- bool get isClosed => _doneCompleter.isCompleted;
-
- /// Creates a two-way stream.
- ///
- /// [input] and [output] should emit and take (respectively) JSON-encoded
- /// strings.
- ///
- /// [inputName] is used in error messages as the name of the input parameter.
- /// [outputName] is likewise used as the name of the output parameter.
- ///
- /// If [onInvalidInput] is passed, any errors parsing messages from [input]
- /// are passed to it. Otherwise, they're ignored and the input is discarded.
- factory TwoWayStream(String name, Stream<String> input, String inputName,
- StreamSink<String> output, String outputName,
- {void onInvalidInput(String message, FormatException error)}) {
- if (output == null) {
- if (input is! StreamSink) {
- throw new ArgumentError("Either `$inputName` must be a StreamSink or "
- "`$outputName` must be passed.");
- }
- output = input as StreamSink;
- }
-
- var wrappedOutput = mapStreamSink(output, JSON.encode);
- return new TwoWayStream.withoutJson(name, input.expand((message) {
- var decodedMessage;
- try {
- decodedMessage = JSON.decode(message);
- } on FormatException catch (error) {
- if (onInvalidInput != null) onInvalidInput(message, error);
- return [];
- }
-
- return [decodedMessage];
- }), inputName, wrappedOutput, outputName);
- }
-
- /// Creates a two-way stream that reads decoded input and writes decoded
- /// responses.
- ///
- /// [input] and [output] should emit and take (respectively) decoded JSON
- /// objects.
- ///
- /// [inputName] is used in error messages as the name of the input parameter.
- /// [outputName] is likewise used as the name of the output parameter.
- TwoWayStream.withoutJson(this._name, Stream input, String inputName,
- StreamSink output, String outputName)
- : _input = input,
- _output = output == null && input is StreamSink ? input : output {
- if (_output == null) {
- throw new ArgumentError("Either `$inputName` must be a StreamSink or "
- "`$outputName` must be passed.");
- }
- }
-
- /// Starts listening to the input stream.
- ///
- /// The returned Future will complete when the input stream is closed. If the
- /// input stream emits an error, that will be piped to the returned Future.
- Future listen(void handleInput(input)) {
- if (_inputSubscription != null) {
- throw new StateError("Can only call $_name.listen once.");
- }
-
- _inputSubscription = _input.listen(handleInput,
- onError: (error, stackTrace) {
- if (_doneCompleter.isCompleted) return;
- _doneCompleter.completeError(error, stackTrace);
- _output.close();
- }, onDone: () {
- if (_doneCompleter.isCompleted) return;
- _doneCompleter.complete();
- _output.close();
- }, cancelOnError: true);
-
- return _doneCompleter.future;
- }
-
- /// Emit [event] on the output stream.
- void add(event) => _output.add(event);
-
- /// Stops listening to the input stream and closes the output stream.
- Future close() {
- if (_inputSubscription == null) {
- throw new StateError("Can't call $_name.close before $_name.listen.");
- }
-
- if (!_doneCompleter.isCompleted) _doneCompleter.complete();
-
- var inputFuture = _inputSubscription.cancel();
- // TODO(nweiz): include the output future in the return value when issue
- // 19095 is fixed.
- _output.close();
- return inputFuture == null ? new Future.value() : inputFuture;
- }
-}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index dde6b4a..8871b56 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -5,6 +5,10 @@
import 'dart:async';
import 'package:stack_trace/stack_trace.dart';
+import 'package:stream_channel/stream_channel.dart';
+
+import '../error_code.dart' as error_code;
+import 'exception.dart';
typedef ZeroArgumentFunction();
@@ -64,6 +68,35 @@
}
}
+/// A transformer that silently drops [FormatException]s.
+final ignoreFormatExceptions = new StreamTransformer.fromHandlers(
+ handleError: (error, stackTrace, sink) {
+ if (error is FormatException) return;
+ sink.addError(error, stackTrace);
+});
+
+/// A transformer that sends error responses on [FormatException]s.
+final StreamChannelTransformer respondToFormatExceptions =
+ new _RespondToFormatExceptionsTransformer();
+
+/// The implementation of [respondToFormatExceptions].
+class _RespondToFormatExceptionsTransformer
+ implements StreamChannelTransformer {
+ StreamChannel bind(StreamChannel channel) {
+ var transformed;
+ transformed = channel.changeStream((stream) {
+ return stream.handleError((error) {
+ if (error is! FormatException) throw error;
+
+ var exception = new RpcException(
+ error_code.PARSE_ERROR, 'Invalid JSON: ${error.message}');
+ transformed.sink.add(exception.serialize(error.source));
+ });
+ });
+ return transformed;
+ }
+}
+
/// Returns a [StreamSink] that wraps [sink] and maps each event added using
/// [callback].
StreamSink mapStreamSink(StreamSink sink, callback(event)) =>
diff --git a/pubspec.yaml b/pubspec.yaml
index 348dc35..630b9cc 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,12 +1,12 @@
name: json_rpc_2
-version: 1.2.0
+version: 2.0.0
author: Dart Team <misc@dartlang.org>
description: An implementation of the JSON-RPC 2.0 spec.
homepage: http://github.com/dart-lang/json_rpc_2
dependencies:
stack_trace: '>=0.9.1 <2.0.0'
+ stream_channel: '^1.1.0'
dev_dependencies:
test: ">=0.12.0 <0.13.0"
environment:
- sdk: ">=1.2.0 <2.0.0"
-
+ sdk: ">=1.8.0 <2.0.0"
diff --git a/test/client/stream_test.dart b/test/client/stream_test.dart
index 146e753..b9a31c6 100644
--- a/test/client/stream_test.dart
+++ b/test/client/stream_test.dart
@@ -4,17 +4,25 @@
import 'dart:async';
+import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';
+
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'utils.dart';
void main() {
+ var responseController;
+ var requestController;
+ var client;
+ setUp(() {
+ responseController = new StreamController();
+ requestController = new StreamController();
+ client = new json_rpc.Client.withoutJson(
+ new StreamChannel(responseController.stream, requestController.sink));
+ });
+
test(".withoutJson supports decoded stream and sink", () {
- var responseController = new StreamController();
- var requestController = new StreamController();
- var client = new json_rpc.Client.withoutJson(
- responseController.stream, requestController.sink);
client.listen();
expect(requestController.stream.first.then((request) {
@@ -34,11 +42,6 @@
});
test(".listen returns when the controller is closed", () {
- var responseController = new StreamController();
- var requestController = new StreamController();
- var client = new json_rpc.Client.withoutJson(
- responseController.stream, requestController.sink);
-
var hasListenCompeted = false;
expect(client.listen().then((_) => hasListenCompeted = true), completes);
@@ -51,30 +54,18 @@
});
test(".listen returns a stream error", () {
- var responseController = new StreamController();
- var requestController = new StreamController();
- var client = new json_rpc.Client(
- responseController.stream, requestController.sink);
-
expect(client.listen(), throwsA('oh no'));
responseController.addError('oh no');
});
test(".listen can't be called twice", () {
- var responseController = new StreamController();
- var requestController = new StreamController();
- var client = new json_rpc.Client(
- responseController.stream, requestController.sink);
client.listen();
-
expect(() => client.listen(), throwsStateError);
});
test(".close cancels the stream subscription and closes the sink", () {
- var responseController = new StreamController();
- var requestController = new StreamController();
- var client = new json_rpc.Client(
- responseController.stream, requestController.sink);
+ // Work around sdk#19095.
+ requestController.stream.listen(null);
expect(client.listen(), completes);
@@ -85,13 +76,4 @@
expect(() => responseController.stream.listen((_) {}), throwsStateError);
expect(requestController.isClosed, isTrue);
});
-
- test(".close can't be called before .listen", () {
- var responseController = new StreamController();
- var requestController = new StreamController();
- var client = new json_rpc.Client(
- responseController.stream, requestController.sink);
-
- expect(() => client.close(), throwsStateError);
- });
}
diff --git a/test/client/utils.dart b/test/client/utils.dart
index 8684892..5eb0b60 100644
--- a/test/client/utils.dart
+++ b/test/client/utils.dart
@@ -5,9 +5,11 @@
import 'dart:async';
import 'dart:convert';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:json_rpc_2/error_code.dart' as error_code;
-import 'package:test/test.dart';
/// A controller used to test a [json_rpc.Client].
class ClientController {
@@ -23,7 +25,7 @@
ClientController() {
_client = new json_rpc.Client(
- _responseController.stream, _requestController.sink);
+ new StreamChannel(_responseController.stream, _requestController.sink));
_client.listen();
}
diff --git a/test/peer_test.dart b/test/peer_test.dart
index 7008b72..89ab2d6 100644
--- a/test/peer_test.dart
+++ b/test/peer_test.dart
@@ -5,7 +5,9 @@
import 'dart:async';
import 'dart:convert';
+import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';
+
import 'package:json_rpc_2/error_code.dart' as error_code;
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
@@ -19,7 +21,7 @@
var outgoingController = new StreamController();
outgoing = outgoingController.stream;
peer = new json_rpc.Peer.withoutJson(
- incomingController.stream, outgoingController);
+ new StreamChannel(incomingController.stream, outgoingController));
});
group("like a client,", () {
@@ -165,7 +167,7 @@
var incomingController = new StreamController();
var outgoingController = new StreamController();
var jsonPeer = new json_rpc.Peer(
- incomingController.stream, outgoingController);
+ new StreamChannel(incomingController.stream, outgoingController));
expect(outgoingController.stream.first.then(JSON.decode), completion({
"jsonrpc": "2.0",
diff --git a/test/server/stream_test.dart b/test/server/stream_test.dart
index 58c5e62..7be0001 100644
--- a/test/server/stream_test.dart
+++ b/test/server/stream_test.dart
@@ -4,17 +4,25 @@
import 'dart:async';
+import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';
+
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'utils.dart';
void main() {
+ var requestController;
+ var responseController;
+ var server;
+ setUp(() {
+ requestController = new StreamController();
+ responseController = new StreamController();
+ server = new json_rpc.Server.withoutJson(
+ new StreamChannel(requestController.stream, responseController.sink));
+ });
+
test(".withoutJson supports decoded stream and sink", () {
- var requestController = new StreamController();
- var responseController = new StreamController();
- var server = new json_rpc.Server.withoutJson(
- requestController.stream, responseController.sink);
server.listen();
server.registerMethod('foo', (params) {
@@ -36,11 +44,6 @@
});
test(".listen returns when the controller is closed", () {
- var requestController = new StreamController();
- var responseController = new StreamController();
- var server = new json_rpc.Server(
- requestController.stream, responseController.sink);
-
var hasListenCompeted = false;
expect(server.listen().then((_) => hasListenCompeted = true), completes);
@@ -53,30 +56,19 @@
});
test(".listen returns a stream error", () {
- var requestController = new StreamController();
- var responseController = new StreamController();
- var server = new json_rpc.Server(
- requestController.stream, responseController.sink);
-
expect(server.listen(), throwsA('oh no'));
requestController.addError('oh no');
});
test(".listen can't be called twice", () {
- var requestController = new StreamController();
- var responseController = new StreamController();
- var server = new json_rpc.Server(
- requestController.stream, responseController.sink);
server.listen();
expect(() => server.listen(), throwsStateError);
});
test(".close cancels the stream subscription and closes the sink", () {
- var requestController = new StreamController();
- var responseController = new StreamController();
- var server = new json_rpc.Server(
- requestController.stream, responseController.sink);
+ // Work around sdk#19095.
+ responseController.stream.listen(null);
expect(server.listen(), completes);
@@ -87,13 +79,4 @@
expect(() => requestController.stream.listen((_) {}), throwsStateError);
expect(responseController.isClosed, isTrue);
});
-
- test(".close can't be called before .listen", () {
- var requestController = new StreamController();
- var responseController = new StreamController();
- var server = new json_rpc.Server(
- requestController.stream, responseController.sink);
-
- expect(() => server.close(), throwsStateError);
- });
}
diff --git a/test/server/utils.dart b/test/server/utils.dart
index 9b4b020..f33cb13 100644
--- a/test/server/utils.dart
+++ b/test/server/utils.dart
@@ -5,9 +5,11 @@
import 'dart:async';
import 'dart:convert';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:json_rpc_2/error_code.dart' as error_code;
-import 'package:test/test.dart';
/// A controller used to test a [json_rpc.Server].
class ServerController {
@@ -23,7 +25,7 @@
ServerController() {
_server = new json_rpc.Server(
- _requestController.stream, _responseController.sink);
+ new StreamChannel(_requestController.stream, _responseController.sink));
_server.listen();
}