Copy the web socket stuff from http_parser.
I'm moving this to a separate package where it makes sense to also have
implementations of WebSocketChannel that wrap the dart:io and dart:html
WebSocket classes. This will make it a lot easier for users to use
libraries (such as json_rpc_2) that require StreamChannels with
well-behaved socket channels.
R=kevmoo@google.com
Review URL: https://codereview.chromium.org//1734773002 .
diff --git a/codereview.settings b/codereview.settings
new file mode 100644
index 0000000..9770fa1
--- /dev/null
+++ b/codereview.settings
@@ -0,0 +1,3 @@
+CODE_REVIEW_SERVER: http://codereview.chromium.org/
+VIEW_VC: https://github.com/dart-lang/web_socket_channel/commit/
+CC_LIST: reviews@dartlang.org
\ No newline at end of file
diff --git a/lib/src/channel.dart b/lib/src/channel.dart
new file mode 100644
index 0000000..a8219a1
--- /dev/null
+++ b/lib/src/channel.dart
@@ -0,0 +1,127 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:crypto/crypto.dart';
+import 'package:stream_channel/stream_channel.dart';
+
+import 'copy/web_socket_impl.dart';
+
+/// A [StreamChannel] that communicates over a WebSocket.
+///
+/// This is implemented by classes that use `dart:io` and `dart:html`. The [new
+/// WebSocketChannel] constructor can also be used on any platform to connect to
+/// use the WebSocket protocol over a pre-existing channel.
+///
+/// All implementations emit [WebSocketChannelException]s. These exceptions wrap
+/// the native exception types where possible.
+class WebSocketChannel extends StreamChannelMixin {
+ /// The underlying web socket.
+ ///
+ /// This is essentially a copy of `dart:io`'s WebSocket implementation, with
+ /// the IO-specific pieces factored out.
+ final WebSocketImpl _webSocket;
+
+ /// The interval for sending ping signals.
+ ///
+ /// If a ping message is not answered by a pong message from the peer, the
+ /// `WebSocket` is assumed disconnected and the connection is closed with a
+ /// [WebSocketStatus.GOING_AWAY] close code. When a ping signal is sent, the
+ /// pong message must be received within [pingInterval].
+ ///
+ /// There are never two outstanding pings at any given time, and the next ping
+ /// timer starts when the pong is received.
+ ///
+ /// By default, the [pingInterval] is `null`, indicating that ping messages
+ /// are disabled. Some implementations may not support setting it.
+ Duration get pingInterval => _webSocket.pingInterval;
+ set pingInterval(Duration value) => _webSocket.pingInterval = value;
+
+ /// The subprotocol selected by the server.
+ ///
+ /// For a client socket, this is initially `null`. After the WebSocket
+ /// connection is established the value is set to the subprotocol selected by
+ /// the server. If no subprotocol is negotiated the value will remain `null`.
+ String get protocol => _webSocket.protocol;
+
+ /// The [close code][] set when the WebSocket connection is closed.
+ ///
+ /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
+ ///
+ /// Before the connection has been closed, this will be `null`.
+ int get closeCode => _webSocket.closeCode;
+
+ /// The [close reason][] set when the WebSocket connection is closed.
+ ///
+ /// [close reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
+ ///
+ /// Before the connection has been closed, this will be `null`.
+ String get closeReason => _webSocket.closeReason;
+
+ Stream get stream => new StreamView(_webSocket);
+
+ /// The sink for sending values to the other endpoint.
+ ///
+ /// This supports additional arguments to [WebSocketSink.close] that provide
+ /// the remote endpoint reasons for closing the connection.
+ WebSocketSink get sink => new WebSocketSink._(_webSocket);
+
+ /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of
+ /// the [initial handshake].
+ ///
+ /// The return value should be sent back to the client in a
+ /// `Sec-WebSocket-Accept` header.
+ ///
+ /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2
+ static String signKey(String key) {
+ var hash = new SHA1();
+ // We use [codeUnits] here rather than UTF-8-decoding the string because
+ // [key] is expected to be base64 encoded, and so will be pure ASCII.
+ hash.add((key + webSocketGUID).codeUnits);
+ return CryptoUtils.bytesToBase64(hash.close());
+ }
+
+ /// Creates a new WebSocket handling messaging across an existing [channel].
+ ///
+ /// This is a cross-platform constructor; it doesn't use either `dart:io` or
+ /// `dart:html`. It's also HTTP-API-agnostic, which means that the initial
+ /// [WebSocket handshake][] must have already been completed on the socket
+ /// before this is called.
+ ///
+ /// [protocol] should be the protocol negotiated by this handshake, if any.
+ ///
+ /// If this is a WebSocket server, [serverSide] should be `true` (the
+ /// default); if it's a client, [serverSide] should be `false`.
+ ///
+ /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4
+ WebSocketChannel(StreamChannel<List<int>> channel,
+ {String protocol, bool serverSide: true})
+ : _webSocket = new WebSocketImpl.fromSocket(
+ channel.stream, channel.sink, protocol, serverSide);
+}
+
+/// The sink exposed by a [WebSocketChannel].
+///
+/// This is like a normal [StreamSink], except that it supports extra arguments
+/// to [close].
+class WebSocketSink extends DelegatingStreamSink {
+ final WebSocketImpl _webSocket;
+
+ WebSocketSink._(WebSocketImpl webSocket)
+ : super(webSocket),
+ _webSocket = webSocket;
+
+ /// Closes the web socket connection.
+ ///
+ /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent
+ /// to the remote peer, respectively. If they are omitted, the peer will see
+ /// a "no status received" code with no reason.
+ ///
+ /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5
+ /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6
+ Future close([int closeCode, String closeReason]) =>
+ _webSocket.close(closeCode, closeReason);
+}
diff --git a/lib/src/copy/bytes_builder.dart b/lib/src/copy/bytes_builder.dart
new file mode 100644
index 0000000..39d44fe
--- /dev/null
+++ b/lib/src/copy/bytes_builder.dart
@@ -0,0 +1,215 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// This is a copy of "dart:io"'s BytesBuilder implementation, from
+// sdk/lib/io/bytes_builder.dart. It's copied here to make it available to
+// non-"dart:io" applications (issue 18348).
+//
+// Because it's copied directly, there are no modifications from the original.
+//
+// This is up-to-date as of sdk revision
+// 86227840d75d974feb238f8b3c59c038b99c05cf.
+import 'dart:math';
+import 'dart:typed_data';
+
+/**
+ * Builds a list of bytes, allowing bytes and lists of bytes to be added at the
+ * end.
+ *
+ * Used to efficiently collect bytes and lists of bytes.
+ */
+abstract class BytesBuilder {
+ /**
+ * Construct a new empty [BytesBuilder].
+ *
+ * If [copy] is true, the data is always copied when added to the list. If
+ * it [copy] is false, the data is only copied if needed. That means that if
+ * the lists are changed after added to the [BytesBuilder], it may effect the
+ * output. Default is `true`.
+ */
+ factory BytesBuilder({bool copy: true}) {
+ if (copy) {
+ return new _CopyingBytesBuilder();
+ } else {
+ return new _BytesBuilder();
+ }
+ }
+
+ /**
+ * Appends [bytes] to the current contents of the builder.
+ *
+ * Each value of [bytes] will be bit-representation truncated to the range
+ * 0 .. 255.
+ */
+ void add(List<int> bytes);
+
+ /**
+ * Append [byte] to the current contents of the builder.
+ *
+ * The [byte] will be bit-representation truncated to the range 0 .. 255.
+ */
+ void addByte(int byte);
+
+ /**
+ * Returns the contents of `this` and clears `this`.
+ *
+ * The list returned is a view of the the internal buffer, limited to the
+ * [length].
+ */
+ List<int> takeBytes();
+
+ /**
+ * Returns a copy of the current contents of the builder.
+ *
+ * Leaves the contents of the builder intact.
+ */
+ List<int> toBytes();
+
+ /**
+ * The number of bytes in the builder.
+ */
+ int get length;
+
+ /**
+ * Returns `true` if the buffer is empty.
+ */
+ bool get isEmpty;
+
+ /**
+ * Returns `true` if the buffer is not empty.
+ */
+ bool get isNotEmpty;
+
+ /**
+ * Clear the contents of the builder.
+ */
+ void clear();
+}
+
+
+class _CopyingBytesBuilder implements BytesBuilder {
+ // Start with 1024 bytes.
+ static const int _INIT_SIZE = 1024;
+
+ int _length = 0;
+ Uint8List _buffer;
+
+ void add(List<int> bytes) {
+ int bytesLength = bytes.length;
+ if (bytesLength == 0) return;
+ int required = _length + bytesLength;
+ if (_buffer == null) {
+ int size = _pow2roundup(required);
+ size = max(size, _INIT_SIZE);
+ _buffer = new Uint8List(size);
+ } else if (_buffer.length < required) {
+ // We will create a list in the range of 2-4 times larger than
+ // required.
+ int size = _pow2roundup(required) * 2;
+ var newBuffer = new Uint8List(size);
+ newBuffer.setRange(0, _buffer.length, _buffer);
+ _buffer = newBuffer;
+ }
+ assert(_buffer.length >= required);
+ if (bytes is Uint8List) {
+ _buffer.setRange(_length, required, bytes);
+ } else {
+ for (int i = 0; i < bytesLength; i++) {
+ _buffer[_length + i] = bytes[i];
+ }
+ }
+ _length = required;
+ }
+
+ void addByte(int byte) => add([byte]);
+
+ List<int> takeBytes() {
+ if (_buffer == null) return new Uint8List(0);
+ var buffer = new Uint8List.view(_buffer.buffer, 0, _length);
+ clear();
+ return buffer;
+ }
+
+ List<int> toBytes() {
+ if (_buffer == null) return new Uint8List(0);
+ return new Uint8List.fromList(
+ new Uint8List.view(_buffer.buffer, 0, _length));
+ }
+
+ int get length => _length;
+
+ bool get isEmpty => _length == 0;
+
+ bool get isNotEmpty => _length != 0;
+
+ void clear() {
+ _length = 0;
+ _buffer = null;
+ }
+
+ int _pow2roundup(int x) {
+ --x;
+ x |= x >> 1;
+ x |= x >> 2;
+ x |= x >> 4;
+ x |= x >> 8;
+ x |= x >> 16;
+ return x + 1;
+ }
+}
+
+
+class _BytesBuilder implements BytesBuilder {
+ int _length = 0;
+ final List _chunks = [];
+
+ void add(List<int> bytes) {
+ if (bytes is! Uint8List) {
+ bytes = new Uint8List.fromList(bytes);
+ }
+ _chunks.add(bytes);
+ _length += bytes.length;
+ }
+
+ void addByte(int byte) => add([byte]);
+
+ List<int> takeBytes() {
+ if (_chunks.length == 0) return new Uint8List(0);
+ if (_chunks.length == 1) {
+ var buffer = _chunks.single;
+ clear();
+ return buffer;
+ }
+ var buffer = new Uint8List(_length);
+ int offset = 0;
+ for (var chunk in _chunks) {
+ buffer.setRange(offset, offset + chunk.length, chunk);
+ offset += chunk.length;
+ }
+ clear();
+ return buffer;
+ }
+
+ List<int> toBytes() {
+ if (_chunks.length == 0) return new Uint8List(0);
+ var buffer = new Uint8List(_length);
+ int offset = 0;
+ for (var chunk in _chunks) {
+ buffer.setRange(offset, offset + chunk.length, chunk);
+ offset += chunk.length;
+ }
+ return buffer;
+ }
+
+ int get length => _length;
+
+ bool get isEmpty => _length == 0;
+
+ bool get isNotEmpty => _length != 0;
+
+ void clear() {
+ _length = 0;
+ _chunks.clear();
+ }
+}
diff --git a/lib/src/copy/io_sink.dart b/lib/src/copy/io_sink.dart
new file mode 100644
index 0000000..0578bdb
--- /dev/null
+++ b/lib/src/copy/io_sink.dart
@@ -0,0 +1,145 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// The following code is copied from sdk/lib/io/io_sink.dart. The "dart:io"
+// implementation isn't used directly to support non-"dart:io" applications.
+//
+// Because it's copied directly, only modifications necessary to support the
+// desired public API and to remove "dart:io" dependencies have been made.
+//
+// This is up-to-date as of sdk revision
+// 86227840d75d974feb238f8b3c59c038b99c05cf.
+import 'dart:async';
+
+class StreamSinkImpl<T> implements StreamSink<T> {
+ final StreamConsumer<T> _target;
+ Completer _doneCompleter = new Completer();
+ Future _doneFuture;
+ StreamController<T> _controllerInstance;
+ Completer _controllerCompleter;
+ bool _isClosed = false;
+ bool _isBound = false;
+ bool _hasError = false;
+
+ StreamSinkImpl(this._target) {
+ _doneFuture = _doneCompleter.future;
+ }
+
+ void add(T data) {
+ if (_isClosed) return;
+ _controller.add(data);
+ }
+
+ void addError(error, [StackTrace stackTrace]) {
+ _controller.addError(error, stackTrace);
+ }
+
+ Future addStream(Stream<T> stream) {
+ if (_isBound) {
+ throw new StateError("StreamSink is already bound to a stream");
+ }
+ _isBound = true;
+ if (_hasError) return done;
+ // Wait for any sync operations to complete.
+ Future targetAddStream() {
+ return _target.addStream(stream)
+ .whenComplete(() {
+ _isBound = false;
+ });
+ }
+ if (_controllerInstance == null) return targetAddStream();
+ var future = _controllerCompleter.future;
+ _controllerInstance.close();
+ return future.then((_) => targetAddStream());
+ }
+
+ Future flush() {
+ if (_isBound) {
+ throw new StateError("StreamSink is bound to a stream");
+ }
+ if (_controllerInstance == null) return new Future.value(this);
+ // Adding an empty stream-controller will return a future that will complete
+ // when all data is done.
+ _isBound = true;
+ var future = _controllerCompleter.future;
+ _controllerInstance.close();
+ return future.whenComplete(() {
+ _isBound = false;
+ });
+ }
+
+ Future close() {
+ if (_isBound) {
+ throw new StateError("StreamSink is bound to a stream");
+ }
+ if (!_isClosed) {
+ _isClosed = true;
+ if (_controllerInstance != null) {
+ _controllerInstance.close();
+ } else {
+ _closeTarget();
+ }
+ }
+ return done;
+ }
+
+ void _closeTarget() {
+ _target.close().then(_completeDoneValue, onError: _completeDoneError);
+ }
+
+ Future get done => _doneFuture;
+
+ void _completeDoneValue(value) {
+ if (_doneCompleter == null) return;
+ _doneCompleter.complete(value);
+ _doneCompleter = null;
+ }
+
+ void _completeDoneError(error, StackTrace stackTrace) {
+ if (_doneCompleter == null) return;
+ _hasError = true;
+ _doneCompleter.completeError(error, stackTrace);
+ _doneCompleter = null;
+ }
+
+ StreamController<T> get _controller {
+ if (_isBound) {
+ throw new StateError("StreamSink is bound to a stream");
+ }
+ if (_isClosed) {
+ throw new StateError("StreamSink is closed");
+ }
+ if (_controllerInstance == null) {
+ _controllerInstance = new StreamController<T>(sync: true);
+ _controllerCompleter = new Completer();
+ _target.addStream(_controller.stream)
+ .then(
+ (_) {
+ if (_isBound) {
+ // A new stream takes over - forward values to that stream.
+ _controllerCompleter.complete(this);
+ _controllerCompleter = null;
+ _controllerInstance = null;
+ } else {
+ // No new stream, .close was called. Close _target.
+ _closeTarget();
+ }
+ },
+ onError: (error, stackTrace) {
+ if (_isBound) {
+ // A new stream takes over - forward errors to that stream.
+ _controllerCompleter.completeError(error, stackTrace);
+ _controllerCompleter = null;
+ _controllerInstance = null;
+ } else {
+ // No new stream. No need to close target, as it have already
+ // failed.
+ _completeDoneError(error, stackTrace);
+ }
+ });
+ }
+ return _controllerInstance;
+ }
+}
+
diff --git a/lib/src/copy/web_socket.dart b/lib/src/copy/web_socket.dart
new file mode 100644
index 0000000..53460ba
--- /dev/null
+++ b/lib/src/copy/web_socket.dart
@@ -0,0 +1,40 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// The following code is copied from sdk/lib/io/websocket.dart. The "dart:io"
+// implementation isn't used directly to support non-"dart:io" applications.
+//
+// Because it's copied directly, only modifications necessary to support the
+// desired public API and to remove "dart:io" dependencies have been made.
+//
+// This is up-to-date as of sdk revision
+// 86227840d75d974feb238f8b3c59c038b99c05cf.
+/**
+ * Web socket status codes used when closing a web socket connection.
+ */
+abstract class WebSocketStatus {
+ static const int NORMAL_CLOSURE = 1000;
+ static const int GOING_AWAY = 1001;
+ static const int PROTOCOL_ERROR = 1002;
+ static const int UNSUPPORTED_DATA = 1003;
+ static const int RESERVED_1004 = 1004;
+ static const int NO_STATUS_RECEIVED = 1005;
+ static const int ABNORMAL_CLOSURE = 1006;
+ static const int INVALID_FRAME_PAYLOAD_DATA = 1007;
+ static const int POLICY_VIOLATION = 1008;
+ static const int MESSAGE_TOO_BIG = 1009;
+ static const int MISSING_MANDATORY_EXTENSION = 1010;
+ static const int INTERNAL_SERVER_ERROR = 1011;
+ static const int RESERVED_1015 = 1015;
+}
+
+abstract class WebSocket {
+ /**
+ * Possible states of the connection.
+ */
+ static const int CONNECTING = 0;
+ static const int OPEN = 1;
+ static const int CLOSING = 2;
+ static const int CLOSED = 3;
+}
diff --git a/lib/src/copy/web_socket_impl.dart b/lib/src/copy/web_socket_impl.dart
new file mode 100644
index 0000000..c10d8e8
--- /dev/null
+++ b/lib/src/copy/web_socket_impl.dart
@@ -0,0 +1,860 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// The following code is copied from sdk/lib/io/websocket_impl.dart. The
+// "dart:io" implementation isn't used directly to support non-"dart:io"
+// applications.
+//
+// Because it's copied directly, only modifications necessary to support the
+// desired public API and to remove "dart:io" dependencies have been made.
+//
+// This is up-to-date as of sdk revision
+// 86227840d75d974feb238f8b3c59c038b99c05cf.
+import 'dart:async';
+import 'dart:convert';
+import 'dart:math';
+import 'dart:typed_data';
+
+import '../exception.dart';
+import 'bytes_builder.dart';
+import 'io_sink.dart';
+import 'web_socket.dart';
+
+const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+
+final _random = new Random();
+
+// Matches _WebSocketOpcode.
+class _WebSocketMessageType {
+ static const int NONE = 0;
+ static const int TEXT = 1;
+ static const int BINARY = 2;
+}
+
+
+class _WebSocketOpcode {
+ static const int CONTINUATION = 0;
+ static const int TEXT = 1;
+ static const int BINARY = 2;
+ static const int RESERVED_3 = 3;
+ static const int RESERVED_4 = 4;
+ static const int RESERVED_5 = 5;
+ static const int RESERVED_6 = 6;
+ static const int RESERVED_7 = 7;
+ static const int CLOSE = 8;
+ static const int PING = 9;
+ static const int PONG = 10;
+ static const int RESERVED_B = 11;
+ static const int RESERVED_C = 12;
+ static const int RESERVED_D = 13;
+ static const int RESERVED_E = 14;
+ static const int RESERVED_F = 15;
+}
+
+/**
+ * The web socket protocol transformer handles the protocol byte stream
+ * which is supplied through the [:handleData:]. As the protocol is processed,
+ * it'll output frame data as either a List<int> or String.
+ *
+ * Important infomation about usage: Be sure you use cancelOnError, so the
+ * socket will be closed when the processer encounter an error. Not using it
+ * will lead to undefined behaviour.
+ */
+// TODO(ajohnsen): make this transformer reusable?
+class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
+ static const int START = 0;
+ static const int LEN_FIRST = 1;
+ static const int LEN_REST = 2;
+ static const int MASK = 3;
+ static const int PAYLOAD = 4;
+ static const int CLOSED = 5;
+ static const int FAILURE = 6;
+
+ int _state = START;
+ bool _fin = false;
+ int _opcode = -1;
+ int _len = -1;
+ bool _masked = false;
+ int _remainingLenBytes = -1;
+ int _remainingMaskingKeyBytes = 4;
+ int _remainingPayloadBytes = -1;
+ int _unmaskingIndex = 0;
+ int _currentMessageType = _WebSocketMessageType.NONE;
+ int closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
+ String closeReason = "";
+
+ EventSink _eventSink;
+
+ final bool _serverSide;
+ final List _maskingBytes = new List(4);
+ final BytesBuilder _payload = new BytesBuilder(copy: false);
+
+ _WebSocketProtocolTransformer([this._serverSide = false]);
+
+ Stream bind(Stream stream) {
+ return new Stream.eventTransformed(
+ stream,
+ (EventSink eventSink) {
+ if (_eventSink != null) {
+ throw new StateError("WebSocket transformer already used.");
+ }
+ _eventSink = eventSink;
+ return this;
+ });
+ }
+
+ void addError(Object error, [StackTrace stackTrace]) =>
+ _eventSink.addError(error, stackTrace);
+
+ void close() => _eventSink.close();
+
+ /**
+ * Process data received from the underlying communication channel.
+ */
+ void add(Uint8List buffer) {
+ int count = buffer.length;
+ int index = 0;
+ int lastIndex = count;
+ if (_state == CLOSED) {
+ throw new WebSocketChannelException("Data on closed connection");
+ }
+ if (_state == FAILURE) {
+ throw new WebSocketChannelException("Data on failed connection");
+ }
+ while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) {
+ int byte = buffer[index];
+ if (_state <= LEN_REST) {
+ if (_state == START) {
+ _fin = (byte & 0x80) != 0;
+ if ((byte & 0x70) != 0) {
+ // The RSV1, RSV2 bits RSV3 must be all zero.
+ throw new WebSocketChannelException("Protocol error");
+ }
+ _opcode = (byte & 0xF);
+ if (_opcode <= _WebSocketOpcode.BINARY) {
+ if (_opcode == _WebSocketOpcode.CONTINUATION) {
+ if (_currentMessageType == _WebSocketMessageType.NONE) {
+ throw new WebSocketChannelException("Protocol error");
+ }
+ } else {
+ assert(_opcode == _WebSocketOpcode.TEXT ||
+ _opcode == _WebSocketOpcode.BINARY);
+ if (_currentMessageType != _WebSocketMessageType.NONE) {
+ throw new WebSocketChannelException("Protocol error");
+ }
+ _currentMessageType = _opcode;
+ }
+ } else if (_opcode >= _WebSocketOpcode.CLOSE &&
+ _opcode <= _WebSocketOpcode.PONG) {
+ // Control frames cannot be fragmented.
+ if (!_fin) throw new WebSocketChannelException("Protocol error");
+ } else {
+ throw new WebSocketChannelException("Protocol error");
+ }
+ _state = LEN_FIRST;
+ } else if (_state == LEN_FIRST) {
+ _masked = (byte & 0x80) != 0;
+ _len = byte & 0x7F;
+ if (_isControlFrame() && _len > 125) {
+ throw new WebSocketChannelException("Protocol error");
+ }
+ if (_len == 126) {
+ _len = 0;
+ _remainingLenBytes = 2;
+ _state = LEN_REST;
+ } else if (_len == 127) {
+ _len = 0;
+ _remainingLenBytes = 8;
+ _state = LEN_REST;
+ } else {
+ assert(_len < 126);
+ _lengthDone();
+ }
+ } else {
+ assert(_state == LEN_REST);
+ _len = _len << 8 | byte;
+ _remainingLenBytes--;
+ if (_remainingLenBytes == 0) {
+ _lengthDone();
+ }
+ }
+ } else {
+ if (_state == MASK) {
+ _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte;
+ if (_remainingMaskingKeyBytes == 0) {
+ _maskDone();
+ }
+ } else {
+ assert(_state == PAYLOAD);
+ // The payload is not handled one byte at a time but in blocks.
+ int payloadLength = min(lastIndex - index, _remainingPayloadBytes);
+ _remainingPayloadBytes -= payloadLength;
+ // Unmask payload if masked.
+ if (_masked) {
+ _unmask(index, payloadLength, buffer);
+ }
+ // Control frame and data frame share _payloads.
+ _payload.add(
+ new Uint8List.view(buffer.buffer, index, payloadLength));
+ index += payloadLength;
+ if (_isControlFrame()) {
+ if (_remainingPayloadBytes == 0) _controlFrameEnd();
+ } else {
+ if (_currentMessageType != _WebSocketMessageType.TEXT &&
+ _currentMessageType != _WebSocketMessageType.BINARY) {
+ throw new WebSocketChannelException("Protocol error");
+ }
+ if (_remainingPayloadBytes == 0) _messageFrameEnd();
+ }
+
+ // Hack - as we always do index++ below.
+ index--;
+ }
+ }
+
+ // Move to the next byte.
+ index++;
+ }
+ }
+
+ void _unmask(int index, int length, Uint8List buffer) {
+ const int BLOCK_SIZE = 16;
+ // Skip Int32x4-version if message is small.
+ if (length >= BLOCK_SIZE) {
+ // Start by aligning to 16 bytes.
+ final int startOffset = BLOCK_SIZE - (index & 15);
+ final int end = index + startOffset;
+ for (int i = index; i < end; i++) {
+ buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
+ }
+ index += startOffset;
+ length -= startOffset;
+ final int blockCount = length ~/ BLOCK_SIZE;
+ if (blockCount > 0) {
+ // Create mask block.
+ int mask = 0;
+ for (int i = 3; i >= 0; i--) {
+ mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
+ }
+ Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
+ Int32x4List blockBuffer = new Int32x4List.view(
+ buffer.buffer, index, blockCount);
+ for (int i = 0; i < blockBuffer.length; i++) {
+ blockBuffer[i] ^= blockMask;
+ }
+ final int bytes = blockCount * BLOCK_SIZE;
+ index += bytes;
+ length -= bytes;
+ }
+ }
+ // Handle end.
+ final int end = index + length;
+ for (int i = index; i < end; i++) {
+ buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3];
+ }
+ }
+
+ void _lengthDone() {
+ if (_masked) {
+ if (!_serverSide) {
+ throw new WebSocketChannelException(
+ "Received masked frame from server");
+ }
+ _state = MASK;
+ } else {
+ if (_serverSide) {
+ throw new WebSocketChannelException(
+ "Received unmasked frame from client");
+ }
+ _remainingPayloadBytes = _len;
+ _startPayload();
+ }
+ }
+
+ void _maskDone() {
+ _remainingPayloadBytes = _len;
+ _startPayload();
+ }
+
+ void _startPayload() {
+ // If there is no actual payload perform perform callbacks without
+ // going through the PAYLOAD state.
+ if (_remainingPayloadBytes == 0) {
+ if (_isControlFrame()) {
+ switch (_opcode) {
+ case _WebSocketOpcode.CLOSE:
+ _state = CLOSED;
+ _eventSink.close();
+ break;
+ case _WebSocketOpcode.PING:
+ _eventSink.add(new _WebSocketPing());
+ break;
+ case _WebSocketOpcode.PONG:
+ _eventSink.add(new _WebSocketPong());
+ break;
+ }
+ _prepareForNextFrame();
+ } else {
+ _messageFrameEnd();
+ }
+ } else {
+ _state = PAYLOAD;
+ }
+ }
+
+ void _messageFrameEnd() {
+ if (_fin) {
+ switch (_currentMessageType) {
+ case _WebSocketMessageType.TEXT:
+ _eventSink.add(UTF8.decode(_payload.takeBytes()));
+ break;
+ case _WebSocketMessageType.BINARY:
+ _eventSink.add(_payload.takeBytes());
+ break;
+ }
+ _currentMessageType = _WebSocketMessageType.NONE;
+ }
+ _prepareForNextFrame();
+ }
+
+ void _controlFrameEnd() {
+ switch (_opcode) {
+ case _WebSocketOpcode.CLOSE:
+ closeCode = WebSocketStatus.NO_STATUS_RECEIVED;
+ var payload = _payload.takeBytes();
+ if (payload.length > 0) {
+ if (payload.length == 1) {
+ throw new WebSocketChannelException("Protocol error");
+ }
+ closeCode = payload[0] << 8 | payload[1];
+ if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) {
+ throw new WebSocketChannelException("Protocol error");
+ }
+ if (payload.length > 2) {
+ closeReason = UTF8.decode(payload.sublist(2));
+ }
+ }
+ _state = CLOSED;
+ _eventSink.close();
+ break;
+
+ case _WebSocketOpcode.PING:
+ _eventSink.add(new _WebSocketPing(_payload.takeBytes()));
+ break;
+
+ case _WebSocketOpcode.PONG:
+ _eventSink.add(new _WebSocketPong(_payload.takeBytes()));
+ break;
+ }
+ _prepareForNextFrame();
+ }
+
+ bool _isControlFrame() {
+ return _opcode == _WebSocketOpcode.CLOSE ||
+ _opcode == _WebSocketOpcode.PING ||
+ _opcode == _WebSocketOpcode.PONG;
+ }
+
+ void _prepareForNextFrame() {
+ if (_state != CLOSED && _state != FAILURE) _state = START;
+ _fin = false;
+ _opcode = -1;
+ _len = -1;
+ _remainingLenBytes = -1;
+ _remainingMaskingKeyBytes = 4;
+ _remainingPayloadBytes = -1;
+ _unmaskingIndex = 0;
+ }
+}
+
+
+class _WebSocketPing {
+ final List<int> payload;
+ _WebSocketPing([this.payload = null]);
+}
+
+
+class _WebSocketPong {
+ final List<int> payload;
+ _WebSocketPong([this.payload = null]);
+}
+
+// TODO(ajohnsen): Make this transformer reusable.
+class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
+ final WebSocketImpl webSocket;
+ EventSink _eventSink;
+
+ _WebSocketOutgoingTransformer(this.webSocket);
+
+ Stream bind(Stream stream) {
+ return new Stream.eventTransformed(
+ stream,
+ (EventSink eventSink) {
+ if (_eventSink != null) {
+ throw new StateError("WebSocket transformer already used");
+ }
+ _eventSink = eventSink;
+ return this;
+ });
+ }
+
+ void add(message) {
+ if (message is _WebSocketPong) {
+ addFrame(_WebSocketOpcode.PONG, message.payload);
+ return;
+ }
+ if (message is _WebSocketPing) {
+ addFrame(_WebSocketOpcode.PING, message.payload);
+ return;
+ }
+ List<int> data;
+ int opcode;
+ if (message != null) {
+ if (message is String) {
+ opcode = _WebSocketOpcode.TEXT;
+ data = UTF8.encode(message);
+ } else {
+ if (message is !List<int>) {
+ throw new ArgumentError(message);
+ }
+ opcode = _WebSocketOpcode.BINARY;
+ data = message;
+ }
+ } else {
+ opcode = _WebSocketOpcode.TEXT;
+ }
+ addFrame(opcode, data);
+ }
+
+ void addError(Object error, [StackTrace stackTrace]) =>
+ _eventSink.addError(error, stackTrace);
+
+ void close() {
+ int code = webSocket._outCloseCode;
+ String reason = webSocket._outCloseReason;
+ List<int> data;
+ if (code != null) {
+ data = new List<int>();
+ data.add((code >> 8) & 0xFF);
+ data.add(code & 0xFF);
+ if (reason != null) {
+ data.addAll(UTF8.encode(reason));
+ }
+ }
+ addFrame(_WebSocketOpcode.CLOSE, data);
+ _eventSink.close();
+ }
+
+ void addFrame(int opcode, List<int> data) =>
+ createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
+
+ static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
+ bool mask = !serverSide; // Masking not implemented for server.
+ int dataLength = data == null ? 0 : data.length;
+ // Determine the header size.
+ int headerSize = (mask) ? 6 : 2;
+ if (dataLength > 65535) {
+ headerSize += 8;
+ } else if (dataLength > 125) {
+ headerSize += 2;
+ }
+ Uint8List header = new Uint8List(headerSize);
+ int index = 0;
+ // Set FIN and opcode.
+ header[index++] = 0x80 | opcode;
+ // Determine size and position of length field.
+ int lengthBytes = 1;
+ if (dataLength > 65535) {
+ header[index++] = 127;
+ lengthBytes = 8;
+ } else if (dataLength > 125) {
+ header[index++] = 126;
+ lengthBytes = 2;
+ }
+ // Write the length in network byte order into the header.
+ for (int i = 0; i < lengthBytes; i++) {
+ header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF;
+ }
+ if (mask) {
+ header[1] |= 1 << 7;
+ var maskBytes = [_random.nextInt(256), _random.nextInt(256),
+ _random.nextInt(256), _random.nextInt(256)];
+ header.setRange(index, index + 4, maskBytes);
+ index += 4;
+ if (data != null) {
+ Uint8List list;
+ // If this is a text message just do the masking inside the
+ // encoded data.
+ if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) {
+ list = data;
+ } else {
+ if (data is Uint8List) {
+ list = new Uint8List.fromList(data);
+ } else {
+ list = new Uint8List(data.length);
+ for (int i = 0; i < data.length; i++) {
+ if (data[i] < 0 || 255 < data[i]) {
+ throw new ArgumentError(
+ "List element is not a byte value "
+ "(value ${data[i]} at index $i)");
+ }
+ list[i] = data[i];
+ }
+ }
+ }
+ const int BLOCK_SIZE = 16;
+ int blockCount = list.length ~/ BLOCK_SIZE;
+ if (blockCount > 0) {
+ // Create mask block.
+ int mask = 0;
+ for (int i = 3; i >= 0; i--) {
+ mask = (mask << 8) | maskBytes[i];
+ }
+ Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
+ Int32x4List blockBuffer = new Int32x4List.view(
+ list.buffer, 0, blockCount);
+ for (int i = 0; i < blockBuffer.length; i++) {
+ blockBuffer[i] ^= blockMask;
+ }
+ }
+ // Handle end.
+ for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) {
+ list[i] ^= maskBytes[i & 3];
+ }
+ data = list;
+ }
+ }
+ assert(index == headerSize);
+ if (data == null) {
+ return [header];
+ } else {
+ return [header, data];
+ }
+ }
+}
+
+
+class _WebSocketConsumer implements StreamConsumer {
+ final WebSocketImpl webSocket;
+ final StreamSink<List<int>> sink;
+ StreamController _controller;
+ StreamSubscription _subscription;
+ bool _issuedPause = false;
+ bool _closed = false;
+ Completer _closeCompleter = new Completer();
+ Completer _completer;
+
+ _WebSocketConsumer(this.webSocket, this.sink);
+
+ void _onListen() {
+ if (_subscription != null) {
+ _subscription.cancel();
+ }
+ }
+
+ void _onPause() {
+ if (_subscription != null) {
+ _subscription.pause();
+ } else {
+ _issuedPause = true;
+ }
+ }
+
+ void _onResume() {
+ if (_subscription != null) {
+ _subscription.resume();
+ } else {
+ _issuedPause = false;
+ }
+ }
+
+ void _cancel() {
+ if (_subscription != null) {
+ var subscription = _subscription;
+ _subscription = null;
+ subscription.cancel();
+ }
+ }
+
+ _ensureController() {
+ if (_controller != null) return;
+ _controller = new StreamController(sync: true,
+ onPause: _onPause,
+ onResume: _onResume,
+ onCancel: _onListen);
+ var stream = _controller.stream.transform(
+ new _WebSocketOutgoingTransformer(webSocket));
+ sink.addStream(stream)
+ .then((_) {
+ _done();
+ _closeCompleter.complete(webSocket);
+ }, onError: (error, StackTrace stackTrace) {
+ _closed = true;
+ _cancel();
+ if (error is ArgumentError) {
+ if (!_done(error, stackTrace)) {
+ _closeCompleter.completeError(error, stackTrace);
+ }
+ } else {
+ _done();
+ _closeCompleter.complete(webSocket);
+ }
+ });
+ }
+
+ bool _done([error, StackTrace stackTrace]) {
+ if (_completer == null) return false;
+ if (error != null) {
+ _completer.completeError(error, stackTrace);
+ } else {
+ _completer.complete(webSocket);
+ }
+ _completer = null;
+ return true;
+ }
+
+ Future addStream(var stream) {
+ if (_closed) {
+ stream.listen(null).cancel();
+ return new Future.value(webSocket);
+ }
+ _ensureController();
+ _completer = new Completer();
+ _subscription = stream.listen(
+ (data) {
+ _controller.add(data);
+ },
+ onDone: _done,
+ onError: _done,
+ cancelOnError: true);
+ if (_issuedPause) {
+ _subscription.pause();
+ _issuedPause = false;
+ }
+ return _completer.future;
+ }
+
+ Future close() {
+ _ensureController();
+ Future closeSocket() {
+ return sink.close().catchError((_) {}).then((_) => webSocket);
+ }
+ _controller.close();
+ return _closeCompleter.future.then((_) => closeSocket());
+ }
+
+ void add(data) {
+ if (_closed) return;
+ _ensureController();
+ _controller.add(data);
+ }
+
+ void closeSocket() {
+ _closed = true;
+ _cancel();
+ close();
+ }
+}
+
+
+class WebSocketImpl extends Stream with _ServiceObject implements StreamSink {
+ // Use default Map so we keep order.
+ static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>();
+
+ final String protocol;
+
+ StreamController _controller;
+ StreamSubscription _subscription;
+ StreamSink _sink;
+
+ final bool _serverSide;
+ int _readyState = WebSocket.CONNECTING;
+ bool _writeClosed = false;
+ int _closeCode;
+ String _closeReason;
+ Duration _pingInterval;
+ Timer _pingTimer;
+ _WebSocketConsumer _consumer;
+
+ int _outCloseCode;
+ String _outCloseReason;
+ Timer _closeTimer;
+
+ WebSocketImpl.fromSocket(Stream<List<int>> stream,
+ StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) {
+ _consumer = new _WebSocketConsumer(this, sink);
+ _sink = new StreamSinkImpl(_consumer);
+ _readyState = WebSocket.OPEN;
+
+ var transformer = new _WebSocketProtocolTransformer(_serverSide);
+ _subscription = stream.transform(transformer).listen(
+ (data) {
+ if (data is _WebSocketPing) {
+ if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
+ } else if (data is _WebSocketPong) {
+ // Simply set pingInterval, as it'll cancel any timers.
+ pingInterval = _pingInterval;
+ } else {
+ _controller.add(data);
+ }
+ },
+ onError: (error) {
+ if (_closeTimer != null) _closeTimer.cancel();
+ if (error is FormatException) {
+ _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
+ } else {
+ _close(WebSocketStatus.PROTOCOL_ERROR);
+ }
+ // An error happened, set the close code set above.
+ _closeCode = _outCloseCode;
+ _closeReason = _outCloseReason;
+ _controller.close();
+ },
+ onDone: () {
+ if (_closeTimer != null) _closeTimer.cancel();
+ if (_readyState == WebSocket.OPEN) {
+ _readyState = WebSocket.CLOSING;
+ if (!_isReservedStatusCode(transformer.closeCode)) {
+ _close(transformer.closeCode);
+ } else {
+ _close();
+ }
+ _readyState = WebSocket.CLOSED;
+ }
+ // Protocol close, use close code from transformer.
+ _closeCode = transformer.closeCode;
+ _closeReason = transformer.closeReason;
+ _controller.close();
+ },
+ cancelOnError: true);
+ _subscription.pause();
+ _controller = new StreamController(sync: true,
+ onListen: () => _subscription.resume(),
+ onCancel: () {
+ _subscription.cancel();
+ _subscription = null;
+ },
+ onPause: _subscription.pause,
+ onResume: _subscription.resume);
+
+ _webSockets[_serviceId] = this;
+ }
+
+ StreamSubscription listen(void onData(message),
+ {Function onError,
+ void onDone(),
+ bool cancelOnError}) {
+ return _controller.stream.listen(onData,
+ onError: onError,
+ onDone: onDone,
+ cancelOnError: cancelOnError);
+ }
+
+ Duration get pingInterval => _pingInterval;
+
+ void set pingInterval(Duration interval) {
+ if (_writeClosed) return;
+ if (_pingTimer != null) _pingTimer.cancel();
+ _pingInterval = interval;
+
+ if (_pingInterval == null) return;
+
+ _pingTimer = new Timer(_pingInterval, () {
+ if (_writeClosed) return;
+ _consumer.add(new _WebSocketPing());
+ _pingTimer = new Timer(_pingInterval, () {
+ // No pong received.
+ _close(WebSocketStatus.GOING_AWAY);
+ });
+ });
+ }
+
+ int get readyState => _readyState;
+
+ String get extensions => null;
+ int get closeCode => _closeCode;
+ String get closeReason => _closeReason;
+
+ void add(data) => _sink.add(data);
+ void addError(error, [StackTrace stackTrace]) =>
+ _sink.addError(error, stackTrace);
+ Future addStream(Stream stream) => _sink.addStream(stream);
+ Future get done => _sink.done;
+
+ Future close([int code, String reason]) {
+ if (_isReservedStatusCode(code)) {
+ throw new WebSocketChannelException("Reserved status code $code");
+ }
+ if (_outCloseCode == null) {
+ _outCloseCode = code;
+ _outCloseReason = reason;
+ }
+ if (!_controller.isClosed) {
+ // If a close has not yet been received from the other end then
+ // 1) make sure to listen on the stream so the close frame will be
+ // processed if received.
+ // 2) set a timer terminate the connection if a close frame is
+ // not received.
+ if (!_controller.hasListener && _subscription != null) {
+ _controller.stream.drain().catchError((_) => {});
+ }
+ if (_closeTimer == null) {
+ // When closing the web-socket, we no longer accept data.
+ _closeTimer = new Timer(const Duration(seconds: 5), () {
+ // Reuse code and reason from the local close.
+ _closeCode = _outCloseCode;
+ _closeReason = _outCloseReason;
+ if (_subscription != null) _subscription.cancel();
+ _controller.close();
+ _webSockets.remove(_serviceId);
+ });
+ }
+ }
+ return _sink.close();
+ }
+
+ void _close([int code, String reason]) {
+ if (_writeClosed) return;
+ if (_outCloseCode == null) {
+ _outCloseCode = code;
+ _outCloseReason = reason;
+ }
+ _writeClosed = true;
+ _consumer.closeSocket();
+ _webSockets.remove(_serviceId);
+ }
+
+ // The _toJSON, _serviceTypePath, and _serviceTypeName methods
+ // have been deleted for http_parser. The methods were unused in WebSocket
+ // code and produced warnings.
+
+ static bool _isReservedStatusCode(int code) {
+ return code != null &&
+ (code < WebSocketStatus.NORMAL_CLOSURE ||
+ code == WebSocketStatus.RESERVED_1004 ||
+ code == WebSocketStatus.NO_STATUS_RECEIVED ||
+ code == WebSocketStatus.ABNORMAL_CLOSURE ||
+ (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
+ code < WebSocketStatus.RESERVED_1015) ||
+ (code >= WebSocketStatus.RESERVED_1015 &&
+ code < 3000));
+ }
+}
+
+// The following code is from sdk/lib/io/service_object.dart.
+
+int _nextServiceId = 1;
+
+// TODO(ajohnsen): Use other way of getting a uniq id.
+abstract class _ServiceObject {
+ int __serviceId = 0;
+ int get _serviceId {
+ if (__serviceId == 0) __serviceId = _nextServiceId++;
+ return __serviceId;
+ }
+
+ // The _toJSON, _servicePath, _serviceTypePath, _serviceTypeName, and
+ // _serviceType methods have been deleted for http_parser. The methods were
+ // unused in WebSocket code and produced warnings.
+}
diff --git a/lib/src/exception.dart b/lib/src/exception.dart
new file mode 100644
index 0000000..d06dc60
--- /dev/null
+++ b/lib/src/exception.dart
@@ -0,0 +1,19 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'channel.dart';
+
+/// An exception thrown by a [WebSocketChannel].
+class WebSocketChannelException implements Exception {
+ final String message;
+
+ /// The exception that caused this one, if available.
+ final inner;
+
+ WebSocketChannelException([this.message, this.inner]);
+
+ String toString() => message == null
+ ? "WebSocketChannelException" :
+ "WebSocketChannelException: $message";
+}
diff --git a/lib/web_socket_channel.dart b/lib/web_socket_channel.dart
new file mode 100644
index 0000000..299604c
--- /dev/null
+++ b/lib/web_socket_channel.dart
@@ -0,0 +1,6 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+export 'src/channel.dart';
+export 'src/exception.dart';
diff --git a/pubspec.yaml b/pubspec.yaml
index 1d45b6f..c6ad187 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -7,5 +7,10 @@
environment:
sdk: '>=1.0.0 <2.0.0'
+dependencies:
+ async: '^1.3.0'
+ crypto: '^0.9.0'
+ stream_channel: '^1.2.0'
+
dev_dependencies:
test: '^0.12.0'
diff --git a/test/web_socket_test.dart b/test/web_socket_test.dart
new file mode 100644
index 0000000..a105b67
--- /dev/null
+++ b/test/web_socket_test.dart
@@ -0,0 +1,97 @@
+// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+@TestOn('vm')
+
+import 'dart:io';
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+import 'package:web_socket_channel/web_socket_channel.dart';
+
+void main() {
+ group("using WebSocketChannel", () {
+ test("a client can communicate with a WebSocket server", () async {
+ var server = await HttpServer.bind("localhost", 0);
+ server.transform(new WebSocketTransformer()).listen((webSocket) {
+ webSocket.add("hello!");
+ webSocket.listen((request) {
+ expect(request, equals("ping"));
+ webSocket.add("pong");
+ webSocket.close();
+ });
+ });
+
+ var client = new HttpClient();
+ var request = await client.openUrl(
+ "GET", Uri.parse("http://localhost:${server.port}"));
+ request.headers
+ ..set("Connection", "Upgrade")
+ ..set("Upgrade", "websocket")
+ ..set("Sec-WebSocket-Key", "x3JJHMbDL1EzLkh9GBhXDw==")
+ ..set("Sec-WebSocket-Version", "13");
+
+ var response = await request.close();
+ var socket = await response.detachSocket();
+ var innerChannel = new StreamChannel(socket, socket);
+ var webSocket = new WebSocketChannel(innerChannel, serverSide: false);
+
+ var n = 0;
+ await webSocket.stream.listen((message) {
+ if (n == 0) {
+ expect(message, equals("hello!"));
+ webSocket.sink.add("ping");
+ } else if (n == 1) {
+ expect(message, equals("pong"));
+ webSocket.sink.close();
+ server.close();
+ } else {
+ fail("Only expected two messages.");
+ }
+ n++;
+ }).asFuture();
+ });
+
+ test("a server can communicate with a WebSocket client", () async {
+ var server = await HttpServer.bind("localhost", 0);
+ server.listen((request) async {
+ var response = request.response;
+ response.statusCode = 101;
+ response.headers
+ ..set("Connection", "Upgrade")
+ ..set("Upgrade", "websocket")
+ ..set("Sec-WebSocket-Accept", WebSocketChannel
+ .signKey(request.headers.value('Sec-WebSocket-Key')));
+ response.contentLength = 0;
+
+ var socket = await response.detachSocket();
+ var innerChannel = new StreamChannel(socket, socket);
+ var webSocket = new WebSocketChannel(innerChannel);
+ webSocket.sink.add("hello!");
+
+ var message = await webSocket.stream.first;
+ expect(message, equals("ping"));
+ webSocket.sink.add("pong");
+ webSocket.sink.close();
+ });
+
+ var webSocket = await WebSocket.connect('ws://localhost:${server.port}');
+ var n = 0;
+ await webSocket.listen((message) {
+ if (n == 0) {
+ expect(message, equals("hello!"));
+ webSocket.add("ping");
+ } else if (n == 1) {
+ expect(message, equals("pong"));
+ webSocket.close();
+ server.close();
+ } else {
+ fail("Only expected two messages.");
+ }
+ n++;
+ }).asFuture();
+ });
+ });
+}