Add an HTML implementation of WebSocketChannel. R=kevmoo@google.com Review URL: https://codereview.chromium.org//1747113003 .
diff --git a/pkgs/web_socket_channel/README.md b/pkgs/web_socket_channel/README.md index 7e39a79..3e984aa 100644 --- a/pkgs/web_socket_channel/README.md +++ b/pkgs/web_socket_channel/README.md
@@ -2,12 +2,14 @@ wrappers for WebSocket connections. It provides a cross-platform [`WebSocketChannel`][WebSocketChannel] API, a cross-platform implementation of that API that communicates over an underlying [`StreamChannel`][stream_channel], -and [an implementation][IOWebSocketChannel] that wraps `dart:io`'s `WebSocket` -class. +[an implementation][IOWebSocketChannel] that wraps `dart:io`'s `WebSocket` +class, and [a similar implementation][HtmlWebSocketChannel] that wrap's +`dart:html`'s. [stream_channel]: https://pub.dartlang.org/packages/stream_channel [WebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/web_socket_channel/WebSocketChannel-class.html [IOWebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/io/IOWebSocketChannel-class.html +[HtmlWebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/html/HtmlWebSocketChannel-class.html ## `WebSocketChannel` @@ -74,3 +76,32 @@ }); } ``` + +## `HtmlWebSocketChannel` + +The [`HtmlWebSocketChannel`][HtmlWebSocketChannel] class wraps +[`dart:html`'s `WebSocket` class][html.WebSocket]. Because it imports +`dart:html`, it has its own library, `package:web_socket_channel/html.dart`. +This allows the main `WebSocketChannel` class to be available on all platforms. + +[html.WebSocket]: https://api.dartlang.org/latest/dart-html/WebSocket-class.html + +An `HtmlWebSocketChannel` can be created by passing a `dart:html` WebSocket to +[its constructor][new HtmlWebSocketChannel]. It's more common to want to connect +directly to a `ws://` or `wss://` URL, in which case +[`new HtmlWebSocketChannel.connect()`][HtmlWebSocketChannel.connect] should be used. + +[new HtmlWebSocketChannel]: https://www.dartdocs.org/documentation/web_socket_channel/latest/html/HtmlWebSocketChannel/HtmlWebSocketChannel.html +[HtmlWebSocketChannel.connect]: https://www.dartdocs.org/documentation/web_socket_channel/latest/html/HtmlWebSocketChannel/HtmlWebSocketChannel.connect.html + +```dart +import 'package:web_socket_channel/html.dart'; + +main() async { + var channel = new HtmlWebSocketChannel.connect("ws://localhost:8181"); + channel.sink.add("connected!"); + channel.sink.listen((message) { + // ... + }); +} +```
diff --git a/pkgs/web_socket_channel/lib/html.dart b/pkgs/web_socket_channel/lib/html.dart new file mode 100644 index 0000000..9d0b070 --- /dev/null +++ b/pkgs/web_socket_channel/lib/html.dart
@@ -0,0 +1,146 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:html'; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:stream_channel/stream_channel.dart'; + +import 'src/channel.dart'; +import 'src/exception.dart'; + +/// A [WebSocketChannel] that communicates using a `dart:html` [WebSocket]. +class HtmlWebSocketChannel extends StreamChannelMixin + implements WebSocketChannel { + /// The underlying `dart:html` [WebSocket]. + final WebSocket _webSocket; + + String get protocol => _webSocket.protocol; + + int get closeCode => _closeCode; + int _closeCode; + + String get closeReason => _closeReason; + String _closeReason; + + /// The number of bytes of data that have been queued but not yet transmitted + /// to the network. + int get bufferedAmount => _webSocket.bufferedAmount; + + /// The close code set by the local user. + /// + /// To ensure proper ordering, this is stored until we get a done event on + /// [_controller.local.stream]. + int _localCloseCode; + + /// The close reason set by the local user. + /// + /// To ensure proper ordering, this is stored until we get a done event on + /// [_controller.local.stream]. + String _localCloseReason; + + Stream get stream => _controller.foreign.stream; + final _controller = new StreamChannelController( + sync: true, allowForeignErrors: false); + + WebSocketSink get sink => _sink; + WebSocketSink _sink; + + /// Creates a new WebSocket connection. + /// + /// Connects to [url] using [new WebSocket] and returns a channel that can be + /// used to communicate over the resulting socket. The [url] may be either a + /// [String] or a [Uri]. The [protocols] parameter is the same as for + /// [new WebSocket]. + /// + /// The [binaryType] parameter controls what type is used for binary messages + /// received by this socket. It defaults to [BinaryType.list], which causes + /// binary messages to be delivered as [Uint8List]s. If it's + /// [BinaryType.blob], they're delivered as [Blob]s instead. + HtmlWebSocketChannel.connect(url, {Iterable<String> protocols, + BinaryType binaryType}) + : this(new WebSocket(url.toString(), protocols) + ..binaryType = (binaryType ?? BinaryType.list).value); + + /// Creates a channel wrapping [webSocket]. + HtmlWebSocketChannel(this._webSocket){ + _sink = new _HtmlWebSocketSink(this); + + if (_webSocket.readyState == WebSocket.OPEN) { + _listen(); + } else { + // The socket API guarantees that only a single open event will be + // emitted. + _webSocket.onOpen.first.then((_) { + _listen(); + }); + } + + // The socket API guarantees that only a single error event will be emitted, + // and that once it is no open or message events will be emitted. + _webSocket.onError.first.then((_) { + _controller.local.sink.addError( + new WebSocketChannelException("WebSocket connection failed.")); + _controller.local.sink.close(); + }); + + _webSocket.onMessage.listen((event) { + var data = event.data; + if (data is ByteBuffer) data = data.asUint8List(); + _controller.local.sink.add(data); + }); + + // The socket API guarantees that only a single error event will be emitted, + // and that once it is no other events will be emitted. + _webSocket.onClose.first.then((event) { + _closeCode = event.code; + _closeReason = event.reason; + _controller.local.sink.close(); + }); + } + + /// Pipes user events to [_webSocket]. + void _listen() { + _controller.local.stream.listen((message) => _webSocket.send(message), + onDone: () => _webSocket.close(_localCloseCode, _localCloseReason)); + } +} + +/// A [WebSocketSink] that tracks the close code and reason passed to [close]. +class _HtmlWebSocketSink extends DelegatingStreamSink implements WebSocketSink { + /// The channel to which this sink belongs. + final HtmlWebSocketChannel _channel; + + _HtmlWebSocketSink(HtmlWebSocketChannel channel) + : super(channel._controller.foreign.sink), + _channel = channel; + + Future close([int closeCode, String closeReason]) { + _channel._localCloseCode = closeCode; + _channel._localCloseReason = closeReason; + return super.close(); + } +} + +/// An enum for choosing what type [HtmlWebSocketChannel] emits for binary +/// messages. +class BinaryType { + /// Tells the channel to emit binary messages as [Blob]s. + static const blob = const BinaryType._("blob", "blob"); + + /// Tells the channel to emit binary messages as [Uint8List]s. + static const list = const BinaryType._("list", "arraybuffer"); + + /// The name of the binary type, which matches its variable name. + final String name; + + /// The value as understood by the underlying [WebSocket] API. + final String value; + + const BinaryType._(this.name, this.value); + + String toString() => name; +}
diff --git a/pkgs/web_socket_channel/test/html_test.dart b/pkgs/web_socket_channel/test/html_test.dart new file mode 100644 index 0000000..f5c3555 --- /dev/null +++ b/pkgs/web_socket_channel/test/html_test.dart
@@ -0,0 +1,92 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +@TestOn('browser') +@Skip( + "This suite requires a WebSocket server, which is currently unsupported\n" + "by the test package (dart-lang/test#330). It's currently set up to talk\n" + "to a hard-coded server on localhost:1234 that is spawned in \n" + "html_test_server.dart.") + +import 'dart:async'; +import 'dart:html'; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:test/test.dart'; + +import 'package:web_socket_channel/html.dart'; + +void main() { + var channel; + tearDown(() { + if (channel != null) channel.sink.close(); + }); + + test("communicates using an existing WebSocket", () async { + var webSocket = new WebSocket("ws://localhost:1234"); + channel = new HtmlWebSocketChannel(webSocket); + + var queue = new StreamQueue(channel.stream); + channel.sink.add("foo"); + expect(await queue.next, equals("foo")); + + channel.sink.add(new Uint8List.fromList([1, 2, 3, 4, 5])); + expect(await _decodeBlob(await queue.next), equals([1, 2, 3, 4, 5])); + + webSocket.binaryType = "arraybuffer"; + channel.sink.add(new Uint8List.fromList([1, 2, 3, 4, 5])); + expect(await queue.next, equals([1, 2, 3, 4, 5])); + }); + + test("communicates using an existing open WebSocket", () async { + var webSocket = new WebSocket("ws://localhost:1234"); + await webSocket.onOpen.first; + + channel = new HtmlWebSocketChannel(webSocket); + + var queue = new StreamQueue(channel.stream); + channel.sink.add("foo"); + expect(await queue.next, equals("foo")); + }); + + test(".connect defaults to binary lists", () async { + channel = new HtmlWebSocketChannel.connect("ws://localhost:1234"); + + var queue = new StreamQueue(channel.stream); + channel.sink.add("foo"); + expect(await queue.next, equals("foo")); + + channel.sink.add(new Uint8List.fromList([1, 2, 3, 4, 5])); + expect(await queue.next, equals([1, 2, 3, 4, 5])); + }); + + test(".connect can use blobs", () async { + channel = new HtmlWebSocketChannel.connect( + "ws://localhost:1234", binaryType: BinaryType.blob); + + var queue = new StreamQueue(channel.stream); + channel.sink.add("foo"); + expect(await queue.next, equals("foo")); + + channel.sink.add(new Uint8List.fromList([1, 2, 3, 4, 5])); + expect(await _decodeBlob(await queue.next), equals([1, 2, 3, 4, 5])); + }); + + test(".connect wraps a connection error in WebSocketChannelException", + () async { + // TODO(nweiz): Make this channel use a port number that's guaranteed to be + // invalid. + var channel = new HtmlWebSocketChannel.connect("ws://localhost:1235"); + expect(channel.stream.toList(), + throwsA(new isInstanceOf<WebSocketChannelException>())); + }); +} + +Future<List<int>> _decodeBlob(Blob blob) async { + var reader = new FileReader(); + reader.readAsArrayBuffer(blob); + await reader.onLoad.first; + return reader.result; +}
diff --git a/pkgs/web_socket_channel/test/html_test_server.dart b/pkgs/web_socket_channel/test/html_test_server.dart new file mode 100644 index 0000000..e356695 --- /dev/null +++ b/pkgs/web_socket_channel/test/html_test_server.dart
@@ -0,0 +1,16 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:io'; + +main() async { + var server = await HttpServer.bind("localhost", 1234); + server.transform(new WebSocketTransformer()).listen((webSocket) { + print("connected"); + webSocket.listen((request) { + print("got $request"); + webSocket.add(request); + }); + }); +}