Add an IO implementation of WebSocketChannel.
R=kevmoo@google.com
Review URL: https://codereview.chromium.org//1756613002 .
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..48b2d28
--- /dev/null
+++ b/README.md
@@ -0,0 +1,79 @@
+The `web_socket_channel` package provides [`StreamChannel`][stream_channel]
+wrappers for WebSocket connections. It provides a cross-platform
+[`WebSocketChannel`][WebSocketChannel] API, a cross-platform implementation of
+that API that communicates over an underlying [`StreamChannel`][stream_channel],
+and [an implementation][IOWebSocketChannel] that wraps `dart:io`'s `WebSocket`
+class.
+
+[stream_channel]: https://pub.dartlang.org/packages/stream_channel
+[WebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel-class.html
+[IOWebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/io/IOWebSocketChannel-class.html
+
+## `WebSocketChannel`
+
+The [`WebSocketChannel`][WebSocketChannel] class's most important role is as the
+interface for WebSocket stream channels across all implementations and all
+platforms. In addition to the base `StreamChannel` interface, it adds a
+[`protocol`][protocol] getter that returns the negotiated protocol for the
+socket; a [`pingInterval`][pingInterval] property that allows you to control the
+socket's keep-alive behavior; and [`closeCode`][closeCode] and
+[`closeReason`][closeReason] getters that provide information about why the
+socket closed.
+
+[protocol]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/protocol.html
+[pingInterval]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/pingInterval.html
+[closeCode]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/closeCode.html
+[closeReason]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/closeReason.html
+
+The channel's [`sink` property][sink] is also special. It returns a
+[`WebSocketSink`][WebSocketSink], which is just like a `StreamSink` except that
+its [`close()`][sink.close] method supports optional `closeCode` and
+`closeReason` parameters. These parameters allow the caller to signal to the
+other socket exactly why they're closing the connection.
+
+[sink]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/sink.html
+[WebSocketSink]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketSink-class.html
+[sink.close]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketSink/close.html
+
+`WebSocketChannel` also works as a cross-platform implementation of the
+WebSocket protocol. Because it can't initiate or handle HTTP requests in a
+cross-platform way, the [`new WebSocketChannel()` constructor][new] takes an
+underlying [`StreamChannel`][stream_channel] over which it communicates using
+the WebSocket protocol. It also provides the static [`signKey()`][signKey]
+method to make it easier to implement the [initial WebSocket handshake][]. These
+are used in the [`shelf_web_socket`][shelf_web_socket] package to support
+WebSockets in a cross-platform way.
+
+[new]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/WebSocketChannel.html
+[signKey]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel/signKey.html
+[initial WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2
+[shelf_web_socket]: https://pub.dartlang.org/packages/shelf_web_socket
+
+## `IOWebSocketChannel`
+
+The [`IOWebSocketChannel`][IOWebSocketChannel] class wraps
+[`dart:io`'s `WebSocket` class][io.WebSocket]. Because it imports `dart:io`, it
+has its own library, `package:web_socket_channel/io.dart`. This allows the main
+`WebSocketChannel` class to be available on all platforms.
+
+[io.WebSocket]: https://api.dartlang.org/latest/dart-io/WebSocket-class.html
+
+An `IOWebSocketChannel` can be created by passing a `dart:io` WebSocket to
+[its constructor][new IOWebSocketChannel]. It's more common to want to connect
+directly to a `ws://` or `wss://` URL, in which case
+[`new IOWebSocketChannel.connect()`][IOWebSocketChannel.connect] should be used.
+
+[new IOWebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/io/IOWebSocketChannel/IOWebSocketChannel.html
+[IOWebSocketChannel.connect]: https://www.dartdocs.org/documentation/web_socket_channel/latest/io/IOWebSocketChannel/IOWebSocketChannel.connect.html
+
+```dart
+import 'package:web_socket_channel/io.dart';
+
+main() async {
+ var channel = new IOWebSocketChannel.connect("ws://localhost:8181");
+ channel.sink.add("connected!");
+ channel.sink.listen((message) {
+ // ...
+ });
+}
+```
diff --git a/lib/io.dart b/lib/io.dart
new file mode 100644
index 0000000..4c06c07
--- /dev/null
+++ b/lib/io.dart
@@ -0,0 +1,113 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:io';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+
+import 'src/channel.dart';
+import 'src/exception.dart';
+import 'src/sink_completer.dart';
+
+/// A [WebSocketChannel] that communicates using a `dart:io` [WebSocket].
+class IOWebSocketChannel extends StreamChannelMixin
+ implements WebSocketChannel {
+ /// The underlying `dart:io` [WebSocket].
+ ///
+ /// If the channel was constructed with [IOWebSocketChannel.connect], this is
+ /// `null` until the [WebSocket.connect] future completes.
+ WebSocket _webSocket;
+
+ Duration get pingInterval =>
+ _webSocket == null ? _pingInterval : _webSocket.pingInterval;
+
+ set pingInterval(Duration value) {
+ if (_webSocket == null) {
+ _pingInterval = value;
+ } else {
+ _webSocket.pingInterval = value;
+ }
+ }
+
+ /// The ping interval set by the user.
+ ///
+ /// This is stored independently of [_webSocket] so that the user can set it
+ /// prior to [_webSocket] getting a value.
+ Duration _pingInterval;
+
+ String get protocol => _webSocket?.protocol;
+ int get closeCode => _webSocket?.closeCode;
+ String get closeReason => _webSocket?.closeReason;
+
+ final Stream stream;
+ final WebSocketSink sink;
+
+ // TODO(nweiz): Add a compression parameter after the initial release.
+
+ /// Creates a new WebSocket connection.
+ ///
+ /// Connects to [url] using [WebSocket.connect] and returns a channel that can
+ /// be used to communicate over the resulting socket. The [url] may be either
+ /// a [String] or a [Uri]; otherwise, the parameters are the same as
+ /// [WebSocket.connect].
+ ///
+ /// If there's an error connecting, the channel's stream emits a
+ /// [WebSocketChannelException] wrapping that error and then closes.
+ factory IOWebSocketChannel.connect(url, {Iterable<String> protocols,
+ Map<String, dynamic> headers}) {
+ var channel;
+ var sinkCompleter = new WebSocketSinkCompleter();
+ var stream = StreamCompleter.fromFuture(
+ WebSocket.connect(url.toString(), headers: headers).then((webSocket) {
+ channel._setWebSocket(webSocket);
+ sinkCompleter.setDestinationSink(new _IOWebSocketSink(webSocket));
+ return webSocket;
+ }).catchError((error) => throw new WebSocketChannelException.from(error)));
+
+ channel = new IOWebSocketChannel._withoutSocket(stream, sinkCompleter.sink);
+ return channel;
+ }
+
+ /// Creates a channel wrapping [socket].
+ IOWebSocketChannel(WebSocket socket)
+ : _webSocket = socket,
+ stream = socket.handleError((error) =>
+ throw new WebSocketChannelException.from(error)),
+ sink = new _IOWebSocketSink(socket);
+
+ /// Creates a channel without a socket.
+ ///
+ /// This is used with [connect] to synchronously provide a channel that later
+ /// has a socket added.
+ IOWebSocketChannel._withoutSocket(Stream stream, this.sink)
+ : _webSocket = null,
+ stream = stream.handleError((error) =>
+ throw new WebSocketChannelException.from(error));
+
+ /// Sets the underlying web socket.
+ ///
+ /// This is called by [connect] once the [WebSocket.connect] future has
+ /// completed.
+ void _setWebSocket(WebSocket webSocket) {
+ assert(_webSocket == null);
+
+ _webSocket = webSocket;
+ if (_pingInterval != null) _webSocket.pingInterval = pingInterval;
+ }
+}
+
+/// A [WebSocketSink] that forwards [close] calls to a `dart:io` [WebSocket].
+class _IOWebSocketSink extends DelegatingStreamSink implements WebSocketSink {
+ /// The underlying socket.
+ final WebSocket _webSocket;
+
+ _IOWebSocketSink(WebSocket webSocket)
+ : super(webSocket),
+ _webSocket = webSocket;
+
+ Future close([int closeCode, String closeReason]) =>
+ _webSocket.close(closeCode, closeReason);
+}
diff --git a/lib/src/channel.dart b/lib/src/channel.dart
index a8219a1..a4e3ed5 100644
--- a/lib/src/channel.dart
+++ b/lib/src/channel.dart
@@ -70,7 +70,7 @@
WebSocketSink get sink => new WebSocketSink._(_webSocket);
/// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of
- /// the [initial handshake].
+ /// the [initial handshake][].
///
/// The return value should be sent back to the client in a
/// `Sec-WebSocket-Accept` header.
diff --git a/lib/src/exception.dart b/lib/src/exception.dart
index d06dc60..47545ed 100644
--- a/lib/src/exception.dart
+++ b/lib/src/exception.dart
@@ -11,7 +11,11 @@
/// The exception that caused this one, if available.
final inner;
- WebSocketChannelException([this.message, this.inner]);
+ WebSocketChannelException([this.message]) : inner = null;
+
+ WebSocketChannelException.from(inner)
+ : message = inner.toString(),
+ inner = inner;
String toString() => message == null
? "WebSocketChannelException" :
diff --git a/lib/src/sink_completer.dart b/lib/src/sink_completer.dart
new file mode 100644
index 0000000..d932fd7
--- /dev/null
+++ b/lib/src/sink_completer.dart
@@ -0,0 +1,153 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'channel.dart';
+
+/// A [WebSocketSink] where the destination is provided later.
+///
+/// This is like a [StreamSinkCompleter], except that it properly forwards
+/// paramters to [WebSocketSink.close].
+class WebSocketSinkCompleter {
+ /// The sink for this completer.
+ ///
+ /// When a destination sink is provided, events that have been passed to the
+ /// sink will be forwarded to the destination.
+ ///
+ /// Events can be added to the sink either before or after a destination sink
+ /// is set.
+ final WebSocketSink sink = new _CompleterSink();
+
+ /// Returns [sink] typed as a [_CompleterSink].
+ _CompleterSink get _sink => sink;
+
+ /// Sets a sink as the destination for events from the
+ /// [WebSocketSinkCompleter]'s [sink].
+ ///
+ /// The completer's [sink] will act exactly as [destinationSink].
+ ///
+ /// If the destination sink is set before events are added to [sink], further
+ /// events are forwarded directly to [destinationSink].
+ ///
+ /// If events are added to [sink] before setting the destination sink, they're
+ /// buffered until the destination is available.
+ ///
+ /// A destination sink may be set at most once.
+ void setDestinationSink(WebSocketSink destinationSink) {
+ if (_sink._destinationSink != null) {
+ throw new StateError("Destination sink already set");
+ }
+ _sink._setDestinationSink(destinationSink);
+ }
+}
+
+/// [WebSocketSink] completed by [WebSocketSinkCompleter].
+class _CompleterSink implements WebSocketSink {
+ /// Controller for an intermediate sink.
+ ///
+ /// Created if the user adds events to this sink before the destination sink
+ /// is set.
+ StreamController _controller;
+
+ /// Completer for [done].
+ ///
+ /// Created if the user requests the [done] future before the destination sink
+ /// is set.
+ Completer _doneCompleter;
+
+ /// Destination sink for the events added to this sink.
+ ///
+ /// Set when [WebSocketSinkCompleter.setDestinationSink] is called.
+ WebSocketSink _destinationSink;
+
+ /// The close code passed to [close].
+ int _closeCode;
+
+ /// The close reason passed to [close].
+ String _closeReason;
+
+ /// Whether events should be sent directly to [_destinationSink], as opposed
+ /// to going through [_controller].
+ bool get _canSendDirectly => _controller == null && _destinationSink != null;
+
+ Future get done {
+ if (_doneCompleter != null) return _doneCompleter.future;
+ if (_destinationSink == null) {
+ _doneCompleter = new Completer.sync();
+ return _doneCompleter.future;
+ }
+ return _destinationSink.done;
+ }
+
+ void add(event) {
+ if (_canSendDirectly) {
+ _destinationSink.add(event);
+ } else {
+ _ensureController();
+ _controller.add(event);
+ }
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ if (_canSendDirectly) {
+ _destinationSink.addError(error, stackTrace);
+ } else {
+ _ensureController();
+ _controller.addError(error, stackTrace);
+ }
+ }
+
+ Future addStream(Stream stream) {
+ if (_canSendDirectly) return _destinationSink.addStream(stream);
+
+ _ensureController();
+ return _controller.addStream(stream, cancelOnError: false);
+ }
+
+ Future close([int closeCode, String closeReason]) {
+ if (_canSendDirectly) {
+ _destinationSink.close(closeCode, closeReason);
+ } else {
+ _closeCode = closeCode;
+ _closeReason = closeReason;
+ _ensureController();
+ _controller.close();
+ }
+ return done;
+ }
+
+ /// Create [_controller] if it doesn't yet exist.
+ void _ensureController() {
+ if (_controller == null) _controller = new StreamController(sync: true);
+ }
+
+ /// Sets the destination sink to which events from this sink will be provided.
+ ///
+ /// If set before the user adds events, events will be added directly to the
+ /// destination sink. If the user adds events earlier, an intermediate sink is
+ /// created using a stream controller, and the destination sink is linked to
+ /// it later.
+ void _setDestinationSink(WebSocketSink sink) {
+ assert(_destinationSink == null);
+ _destinationSink = sink;
+
+ // If the user has already added data, it's buffered in the controller, so
+ // we add it to the sink.
+ if (_controller != null) {
+ // Catch any error that may come from [addStream] or [sink.close]. They'll
+ // be reported through [done] anyway.
+ sink
+ .addStream(_controller.stream)
+ .whenComplete(() => sink.close(_closeCode, _closeReason))
+ .catchError((_) {});
+ }
+
+ // If the user has already asked when the sink is done, connect the sink's
+ // done callback to that completer.
+ if (_doneCompleter != null) {
+ _doneCompleter.complete(sink.done);
+ }
+ }
+}
\ No newline at end of file
diff --git a/lib/web_socket_channel.dart b/lib/web_socket_channel.dart
index 299604c..54c05a0 100644
--- a/lib/web_socket_channel.dart
+++ b/lib/web_socket_channel.dart
@@ -1,4 +1,4 @@
-// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
diff --git a/test/io_test.dart b/test/io_test.dart
new file mode 100644
index 0000000..fcf8c7c
--- /dev/null
+++ b/test/io_test.dart
@@ -0,0 +1,101 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+@TestOn('vm')
+
+import 'dart:io';
+
+import 'package:test/test.dart';
+
+import 'package:web_socket_channel/io.dart';
+import 'package:web_socket_channel/web_socket_channel.dart';
+
+void main() {
+ var server;
+ tearDown(() async {
+ if (server != null) await server.close();
+ });
+
+ test("communicates using existing WebSockets", () async {
+ server = await HttpServer.bind("localhost", 0);
+ server.transform(new WebSocketTransformer()).listen((webSocket) {
+ var channel = new IOWebSocketChannel(webSocket);
+ channel.sink.add("hello!");
+ channel.stream.listen((request) {
+ expect(request, equals("ping"));
+ channel.sink.add("pong");
+ channel.sink.close(5678, "raisin");
+ });
+ });
+
+ var webSocket = await WebSocket.connect("ws://localhost:${server.port}");
+ var channel = new IOWebSocketChannel(webSocket);
+
+ var n = 0;
+ channel.stream.listen((message) {
+ if (n == 0) {
+ expect(message, equals("hello!"));
+ channel.sink.add("ping");
+ } else if (n == 1) {
+ expect(message, equals("pong"));
+ } else {
+ fail("Only expected two messages.");
+ }
+ n++;
+ }, onDone: expectAsync(() {
+ expect(channel.closeCode, equals(5678));
+ expect(channel.closeReason, equals("raisin"));
+ }));
+ });
+
+ test(".connect communicates immediately", () async {
+ server = await HttpServer.bind("localhost", 0);
+ server.transform(new WebSocketTransformer()).listen((webSocket) {
+ var channel = new IOWebSocketChannel(webSocket);
+ channel.stream.listen((request) {
+ expect(request, equals("ping"));
+ channel.sink.add("pong");
+ });
+ });
+
+ var channel = new IOWebSocketChannel.connect(
+ "ws://localhost:${server.port}");
+ channel.sink.add("ping");
+
+ channel.stream.listen(expectAsync((message) {
+ expect(message, equals("pong"));
+ channel.sink.close(5678, "raisin");
+ }, count: 1), onDone: expectAsync(() {}));
+ });
+
+ test(".connect with an immediate call to close", () async {
+ server = await HttpServer.bind("localhost", 0);
+ server.transform(new WebSocketTransformer()).listen((webSocket) {
+ expect(() async {
+ var channel = new IOWebSocketChannel(webSocket);
+ await channel.stream.listen(null).asFuture();
+ expect(channel.closeCode, equals(5678));
+ expect(channel.closeReason, equals("raisin"));
+ }(), completes);
+ });
+
+ var channel = new IOWebSocketChannel.connect(
+ "ws://localhost:${server.port}");
+ channel.sink.close(5678, "raisin");
+ });
+
+ test(".connect wraps a connection error in WebSocketChannelException",
+ () async {
+ server = await HttpServer.bind("localhost", 0);
+ server.listen((request) {
+ request.response.statusCode = 404;
+ request.response.close();
+ });
+
+ var channel = new IOWebSocketChannel.connect(
+ "ws://localhost:${server.port}");
+ expect(channel.stream.toList(),
+ throwsA(new isInstanceOf<WebSocketChannelException>()));
+ });
+}