Copy the web socket stuff from http_parser. I'm moving this to a separate package where it makes sense to also have implementations of WebSocketChannel that wrap the dart:io and dart:html WebSocket classes. This will make it a lot easier for users to use libraries (such as json_rpc_2) that require StreamChannels with well-behaved socket channels. R=kevmoo@google.com Review URL: https://codereview.chromium.org//1734773002 .
diff --git a/pkgs/web_socket_channel/codereview.settings b/pkgs/web_socket_channel/codereview.settings new file mode 100644 index 0000000..9770fa1 --- /dev/null +++ b/pkgs/web_socket_channel/codereview.settings
@@ -0,0 +1,3 @@ +CODE_REVIEW_SERVER: http://codereview.chromium.org/ +VIEW_VC: https://github.com/dart-lang/web_socket_channel/commit/ +CC_LIST: reviews@dartlang.org \ No newline at end of file
diff --git a/pkgs/web_socket_channel/lib/src/channel.dart b/pkgs/web_socket_channel/lib/src/channel.dart new file mode 100644 index 0000000..a8219a1 --- /dev/null +++ b/pkgs/web_socket_channel/lib/src/channel.dart
@@ -0,0 +1,127 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:async/async.dart'; +import 'package:crypto/crypto.dart'; +import 'package:stream_channel/stream_channel.dart'; + +import 'copy/web_socket_impl.dart'; + +/// A [StreamChannel] that communicates over a WebSocket. +/// +/// This is implemented by classes that use `dart:io` and `dart:html`. The [new +/// WebSocketChannel] constructor can also be used on any platform to connect to +/// use the WebSocket protocol over a pre-existing channel. +/// +/// All implementations emit [WebSocketChannelException]s. These exceptions wrap +/// the native exception types where possible. +class WebSocketChannel extends StreamChannelMixin { + /// The underlying web socket. + /// + /// This is essentially a copy of `dart:io`'s WebSocket implementation, with + /// the IO-specific pieces factored out. + final WebSocketImpl _webSocket; + + /// The interval for sending ping signals. + /// + /// If a ping message is not answered by a pong message from the peer, the + /// `WebSocket` is assumed disconnected and the connection is closed with a + /// [WebSocketStatus.GOING_AWAY] close code. When a ping signal is sent, the + /// pong message must be received within [pingInterval]. + /// + /// There are never two outstanding pings at any given time, and the next ping + /// timer starts when the pong is received. + /// + /// By default, the [pingInterval] is `null`, indicating that ping messages + /// are disabled. Some implementations may not support setting it. + Duration get pingInterval => _webSocket.pingInterval; + set pingInterval(Duration value) => _webSocket.pingInterval = value; + + /// The subprotocol selected by the server. + /// + /// For a client socket, this is initially `null`. After the WebSocket + /// connection is established the value is set to the subprotocol selected by + /// the server. If no subprotocol is negotiated the value will remain `null`. + String get protocol => _webSocket.protocol; + + /// The [close code][] set when the WebSocket connection is closed. + /// + /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 + /// + /// Before the connection has been closed, this will be `null`. + int get closeCode => _webSocket.closeCode; + + /// The [close reason][] set when the WebSocket connection is closed. + /// + /// [close reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 + /// + /// Before the connection has been closed, this will be `null`. + String get closeReason => _webSocket.closeReason; + + Stream get stream => new StreamView(_webSocket); + + /// The sink for sending values to the other endpoint. + /// + /// This supports additional arguments to [WebSocketSink.close] that provide + /// the remote endpoint reasons for closing the connection. + WebSocketSink get sink => new WebSocketSink._(_webSocket); + + /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of + /// the [initial handshake]. + /// + /// The return value should be sent back to the client in a + /// `Sec-WebSocket-Accept` header. + /// + /// [initial handshake]: https://tools.ietf.org/html/rfc6455#section-4.2.2 + static String signKey(String key) { + var hash = new SHA1(); + // We use [codeUnits] here rather than UTF-8-decoding the string because + // [key] is expected to be base64 encoded, and so will be pure ASCII. + hash.add((key + webSocketGUID).codeUnits); + return CryptoUtils.bytesToBase64(hash.close()); + } + + /// Creates a new WebSocket handling messaging across an existing [channel]. + /// + /// This is a cross-platform constructor; it doesn't use either `dart:io` or + /// `dart:html`. It's also HTTP-API-agnostic, which means that the initial + /// [WebSocket handshake][] must have already been completed on the socket + /// before this is called. + /// + /// [protocol] should be the protocol negotiated by this handshake, if any. + /// + /// If this is a WebSocket server, [serverSide] should be `true` (the + /// default); if it's a client, [serverSide] should be `false`. + /// + /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 + WebSocketChannel(StreamChannel<List<int>> channel, + {String protocol, bool serverSide: true}) + : _webSocket = new WebSocketImpl.fromSocket( + channel.stream, channel.sink, protocol, serverSide); +} + +/// The sink exposed by a [WebSocketChannel]. +/// +/// This is like a normal [StreamSink], except that it supports extra arguments +/// to [close]. +class WebSocketSink extends DelegatingStreamSink { + final WebSocketImpl _webSocket; + + WebSocketSink._(WebSocketImpl webSocket) + : super(webSocket), + _webSocket = webSocket; + + /// Closes the web socket connection. + /// + /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent + /// to the remote peer, respectively. If they are omitted, the peer will see + /// a "no status received" code with no reason. + /// + /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 + /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 + Future close([int closeCode, String closeReason]) => + _webSocket.close(closeCode, closeReason); +}
diff --git a/pkgs/web_socket_channel/lib/src/copy/bytes_builder.dart b/pkgs/web_socket_channel/lib/src/copy/bytes_builder.dart new file mode 100644 index 0000000..39d44fe --- /dev/null +++ b/pkgs/web_socket_channel/lib/src/copy/bytes_builder.dart
@@ -0,0 +1,215 @@ +// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +// This is a copy of "dart:io"'s BytesBuilder implementation, from +// sdk/lib/io/bytes_builder.dart. It's copied here to make it available to +// non-"dart:io" applications (issue 18348). +// +// Because it's copied directly, there are no modifications from the original. +// +// This is up-to-date as of sdk revision +// 86227840d75d974feb238f8b3c59c038b99c05cf. +import 'dart:math'; +import 'dart:typed_data'; + +/** + * Builds a list of bytes, allowing bytes and lists of bytes to be added at the + * end. + * + * Used to efficiently collect bytes and lists of bytes. + */ +abstract class BytesBuilder { + /** + * Construct a new empty [BytesBuilder]. + * + * If [copy] is true, the data is always copied when added to the list. If + * it [copy] is false, the data is only copied if needed. That means that if + * the lists are changed after added to the [BytesBuilder], it may effect the + * output. Default is `true`. + */ + factory BytesBuilder({bool copy: true}) { + if (copy) { + return new _CopyingBytesBuilder(); + } else { + return new _BytesBuilder(); + } + } + + /** + * Appends [bytes] to the current contents of the builder. + * + * Each value of [bytes] will be bit-representation truncated to the range + * 0 .. 255. + */ + void add(List<int> bytes); + + /** + * Append [byte] to the current contents of the builder. + * + * The [byte] will be bit-representation truncated to the range 0 .. 255. + */ + void addByte(int byte); + + /** + * Returns the contents of `this` and clears `this`. + * + * The list returned is a view of the the internal buffer, limited to the + * [length]. + */ + List<int> takeBytes(); + + /** + * Returns a copy of the current contents of the builder. + * + * Leaves the contents of the builder intact. + */ + List<int> toBytes(); + + /** + * The number of bytes in the builder. + */ + int get length; + + /** + * Returns `true` if the buffer is empty. + */ + bool get isEmpty; + + /** + * Returns `true` if the buffer is not empty. + */ + bool get isNotEmpty; + + /** + * Clear the contents of the builder. + */ + void clear(); +} + + +class _CopyingBytesBuilder implements BytesBuilder { + // Start with 1024 bytes. + static const int _INIT_SIZE = 1024; + + int _length = 0; + Uint8List _buffer; + + void add(List<int> bytes) { + int bytesLength = bytes.length; + if (bytesLength == 0) return; + int required = _length + bytesLength; + if (_buffer == null) { + int size = _pow2roundup(required); + size = max(size, _INIT_SIZE); + _buffer = new Uint8List(size); + } else if (_buffer.length < required) { + // We will create a list in the range of 2-4 times larger than + // required. + int size = _pow2roundup(required) * 2; + var newBuffer = new Uint8List(size); + newBuffer.setRange(0, _buffer.length, _buffer); + _buffer = newBuffer; + } + assert(_buffer.length >= required); + if (bytes is Uint8List) { + _buffer.setRange(_length, required, bytes); + } else { + for (int i = 0; i < bytesLength; i++) { + _buffer[_length + i] = bytes[i]; + } + } + _length = required; + } + + void addByte(int byte) => add([byte]); + + List<int> takeBytes() { + if (_buffer == null) return new Uint8List(0); + var buffer = new Uint8List.view(_buffer.buffer, 0, _length); + clear(); + return buffer; + } + + List<int> toBytes() { + if (_buffer == null) return new Uint8List(0); + return new Uint8List.fromList( + new Uint8List.view(_buffer.buffer, 0, _length)); + } + + int get length => _length; + + bool get isEmpty => _length == 0; + + bool get isNotEmpty => _length != 0; + + void clear() { + _length = 0; + _buffer = null; + } + + int _pow2roundup(int x) { + --x; + x |= x >> 1; + x |= x >> 2; + x |= x >> 4; + x |= x >> 8; + x |= x >> 16; + return x + 1; + } +} + + +class _BytesBuilder implements BytesBuilder { + int _length = 0; + final List _chunks = []; + + void add(List<int> bytes) { + if (bytes is! Uint8List) { + bytes = new Uint8List.fromList(bytes); + } + _chunks.add(bytes); + _length += bytes.length; + } + + void addByte(int byte) => add([byte]); + + List<int> takeBytes() { + if (_chunks.length == 0) return new Uint8List(0); + if (_chunks.length == 1) { + var buffer = _chunks.single; + clear(); + return buffer; + } + var buffer = new Uint8List(_length); + int offset = 0; + for (var chunk in _chunks) { + buffer.setRange(offset, offset + chunk.length, chunk); + offset += chunk.length; + } + clear(); + return buffer; + } + + List<int> toBytes() { + if (_chunks.length == 0) return new Uint8List(0); + var buffer = new Uint8List(_length); + int offset = 0; + for (var chunk in _chunks) { + buffer.setRange(offset, offset + chunk.length, chunk); + offset += chunk.length; + } + return buffer; + } + + int get length => _length; + + bool get isEmpty => _length == 0; + + bool get isNotEmpty => _length != 0; + + void clear() { + _length = 0; + _chunks.clear(); + } +}
diff --git a/pkgs/web_socket_channel/lib/src/copy/io_sink.dart b/pkgs/web_socket_channel/lib/src/copy/io_sink.dart new file mode 100644 index 0000000..0578bdb --- /dev/null +++ b/pkgs/web_socket_channel/lib/src/copy/io_sink.dart
@@ -0,0 +1,145 @@ +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +// The following code is copied from sdk/lib/io/io_sink.dart. The "dart:io" +// implementation isn't used directly to support non-"dart:io" applications. +// +// Because it's copied directly, only modifications necessary to support the +// desired public API and to remove "dart:io" dependencies have been made. +// +// This is up-to-date as of sdk revision +// 86227840d75d974feb238f8b3c59c038b99c05cf. +import 'dart:async'; + +class StreamSinkImpl<T> implements StreamSink<T> { + final StreamConsumer<T> _target; + Completer _doneCompleter = new Completer(); + Future _doneFuture; + StreamController<T> _controllerInstance; + Completer _controllerCompleter; + bool _isClosed = false; + bool _isBound = false; + bool _hasError = false; + + StreamSinkImpl(this._target) { + _doneFuture = _doneCompleter.future; + } + + void add(T data) { + if (_isClosed) return; + _controller.add(data); + } + + void addError(error, [StackTrace stackTrace]) { + _controller.addError(error, stackTrace); + } + + Future addStream(Stream<T> stream) { + if (_isBound) { + throw new StateError("StreamSink is already bound to a stream"); + } + _isBound = true; + if (_hasError) return done; + // Wait for any sync operations to complete. + Future targetAddStream() { + return _target.addStream(stream) + .whenComplete(() { + _isBound = false; + }); + } + if (_controllerInstance == null) return targetAddStream(); + var future = _controllerCompleter.future; + _controllerInstance.close(); + return future.then((_) => targetAddStream()); + } + + Future flush() { + if (_isBound) { + throw new StateError("StreamSink is bound to a stream"); + } + if (_controllerInstance == null) return new Future.value(this); + // Adding an empty stream-controller will return a future that will complete + // when all data is done. + _isBound = true; + var future = _controllerCompleter.future; + _controllerInstance.close(); + return future.whenComplete(() { + _isBound = false; + }); + } + + Future close() { + if (_isBound) { + throw new StateError("StreamSink is bound to a stream"); + } + if (!_isClosed) { + _isClosed = true; + if (_controllerInstance != null) { + _controllerInstance.close(); + } else { + _closeTarget(); + } + } + return done; + } + + void _closeTarget() { + _target.close().then(_completeDoneValue, onError: _completeDoneError); + } + + Future get done => _doneFuture; + + void _completeDoneValue(value) { + if (_doneCompleter == null) return; + _doneCompleter.complete(value); + _doneCompleter = null; + } + + void _completeDoneError(error, StackTrace stackTrace) { + if (_doneCompleter == null) return; + _hasError = true; + _doneCompleter.completeError(error, stackTrace); + _doneCompleter = null; + } + + StreamController<T> get _controller { + if (_isBound) { + throw new StateError("StreamSink is bound to a stream"); + } + if (_isClosed) { + throw new StateError("StreamSink is closed"); + } + if (_controllerInstance == null) { + _controllerInstance = new StreamController<T>(sync: true); + _controllerCompleter = new Completer(); + _target.addStream(_controller.stream) + .then( + (_) { + if (_isBound) { + // A new stream takes over - forward values to that stream. + _controllerCompleter.complete(this); + _controllerCompleter = null; + _controllerInstance = null; + } else { + // No new stream, .close was called. Close _target. + _closeTarget(); + } + }, + onError: (error, stackTrace) { + if (_isBound) { + // A new stream takes over - forward errors to that stream. + _controllerCompleter.completeError(error, stackTrace); + _controllerCompleter = null; + _controllerInstance = null; + } else { + // No new stream. No need to close target, as it have already + // failed. + _completeDoneError(error, stackTrace); + } + }); + } + return _controllerInstance; + } +} +
diff --git a/pkgs/web_socket_channel/lib/src/copy/web_socket.dart b/pkgs/web_socket_channel/lib/src/copy/web_socket.dart new file mode 100644 index 0000000..53460ba --- /dev/null +++ b/pkgs/web_socket_channel/lib/src/copy/web_socket.dart
@@ -0,0 +1,40 @@ +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +// The following code is copied from sdk/lib/io/websocket.dart. The "dart:io" +// implementation isn't used directly to support non-"dart:io" applications. +// +// Because it's copied directly, only modifications necessary to support the +// desired public API and to remove "dart:io" dependencies have been made. +// +// This is up-to-date as of sdk revision +// 86227840d75d974feb238f8b3c59c038b99c05cf. +/** + * Web socket status codes used when closing a web socket connection. + */ +abstract class WebSocketStatus { + static const int NORMAL_CLOSURE = 1000; + static const int GOING_AWAY = 1001; + static const int PROTOCOL_ERROR = 1002; + static const int UNSUPPORTED_DATA = 1003; + static const int RESERVED_1004 = 1004; + static const int NO_STATUS_RECEIVED = 1005; + static const int ABNORMAL_CLOSURE = 1006; + static const int INVALID_FRAME_PAYLOAD_DATA = 1007; + static const int POLICY_VIOLATION = 1008; + static const int MESSAGE_TOO_BIG = 1009; + static const int MISSING_MANDATORY_EXTENSION = 1010; + static const int INTERNAL_SERVER_ERROR = 1011; + static const int RESERVED_1015 = 1015; +} + +abstract class WebSocket { + /** + * Possible states of the connection. + */ + static const int CONNECTING = 0; + static const int OPEN = 1; + static const int CLOSING = 2; + static const int CLOSED = 3; +}
diff --git a/pkgs/web_socket_channel/lib/src/copy/web_socket_impl.dart b/pkgs/web_socket_channel/lib/src/copy/web_socket_impl.dart new file mode 100644 index 0000000..c10d8e8 --- /dev/null +++ b/pkgs/web_socket_channel/lib/src/copy/web_socket_impl.dart
@@ -0,0 +1,860 @@ +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +// The following code is copied from sdk/lib/io/websocket_impl.dart. The +// "dart:io" implementation isn't used directly to support non-"dart:io" +// applications. +// +// Because it's copied directly, only modifications necessary to support the +// desired public API and to remove "dart:io" dependencies have been made. +// +// This is up-to-date as of sdk revision +// 86227840d75d974feb238f8b3c59c038b99c05cf. +import 'dart:async'; +import 'dart:convert'; +import 'dart:math'; +import 'dart:typed_data'; + +import '../exception.dart'; +import 'bytes_builder.dart'; +import 'io_sink.dart'; +import 'web_socket.dart'; + +const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + +final _random = new Random(); + +// Matches _WebSocketOpcode. +class _WebSocketMessageType { + static const int NONE = 0; + static const int TEXT = 1; + static const int BINARY = 2; +} + + +class _WebSocketOpcode { + static const int CONTINUATION = 0; + static const int TEXT = 1; + static const int BINARY = 2; + static const int RESERVED_3 = 3; + static const int RESERVED_4 = 4; + static const int RESERVED_5 = 5; + static const int RESERVED_6 = 6; + static const int RESERVED_7 = 7; + static const int CLOSE = 8; + static const int PING = 9; + static const int PONG = 10; + static const int RESERVED_B = 11; + static const int RESERVED_C = 12; + static const int RESERVED_D = 13; + static const int RESERVED_E = 14; + static const int RESERVED_F = 15; +} + +/** + * The web socket protocol transformer handles the protocol byte stream + * which is supplied through the [:handleData:]. As the protocol is processed, + * it'll output frame data as either a List<int> or String. + * + * Important infomation about usage: Be sure you use cancelOnError, so the + * socket will be closed when the processer encounter an error. Not using it + * will lead to undefined behaviour. + */ +// TODO(ajohnsen): make this transformer reusable? +class _WebSocketProtocolTransformer implements StreamTransformer, EventSink { + static const int START = 0; + static const int LEN_FIRST = 1; + static const int LEN_REST = 2; + static const int MASK = 3; + static const int PAYLOAD = 4; + static const int CLOSED = 5; + static const int FAILURE = 6; + + int _state = START; + bool _fin = false; + int _opcode = -1; + int _len = -1; + bool _masked = false; + int _remainingLenBytes = -1; + int _remainingMaskingKeyBytes = 4; + int _remainingPayloadBytes = -1; + int _unmaskingIndex = 0; + int _currentMessageType = _WebSocketMessageType.NONE; + int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; + String closeReason = ""; + + EventSink _eventSink; + + final bool _serverSide; + final List _maskingBytes = new List(4); + final BytesBuilder _payload = new BytesBuilder(copy: false); + + _WebSocketProtocolTransformer([this._serverSide = false]); + + Stream bind(Stream stream) { + return new Stream.eventTransformed( + stream, + (EventSink eventSink) { + if (_eventSink != null) { + throw new StateError("WebSocket transformer already used."); + } + _eventSink = eventSink; + return this; + }); + } + + void addError(Object error, [StackTrace stackTrace]) => + _eventSink.addError(error, stackTrace); + + void close() => _eventSink.close(); + + /** + * Process data received from the underlying communication channel. + */ + void add(Uint8List buffer) { + int count = buffer.length; + int index = 0; + int lastIndex = count; + if (_state == CLOSED) { + throw new WebSocketChannelException("Data on closed connection"); + } + if (_state == FAILURE) { + throw new WebSocketChannelException("Data on failed connection"); + } + while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { + int byte = buffer[index]; + if (_state <= LEN_REST) { + if (_state == START) { + _fin = (byte & 0x80) != 0; + if ((byte & 0x70) != 0) { + // The RSV1, RSV2 bits RSV3 must be all zero. + throw new WebSocketChannelException("Protocol error"); + } + _opcode = (byte & 0xF); + if (_opcode <= _WebSocketOpcode.BINARY) { + if (_opcode == _WebSocketOpcode.CONTINUATION) { + if (_currentMessageType == _WebSocketMessageType.NONE) { + throw new WebSocketChannelException("Protocol error"); + } + } else { + assert(_opcode == _WebSocketOpcode.TEXT || + _opcode == _WebSocketOpcode.BINARY); + if (_currentMessageType != _WebSocketMessageType.NONE) { + throw new WebSocketChannelException("Protocol error"); + } + _currentMessageType = _opcode; + } + } else if (_opcode >= _WebSocketOpcode.CLOSE && + _opcode <= _WebSocketOpcode.PONG) { + // Control frames cannot be fragmented. + if (!_fin) throw new WebSocketChannelException("Protocol error"); + } else { + throw new WebSocketChannelException("Protocol error"); + } + _state = LEN_FIRST; + } else if (_state == LEN_FIRST) { + _masked = (byte & 0x80) != 0; + _len = byte & 0x7F; + if (_isControlFrame() && _len > 125) { + throw new WebSocketChannelException("Protocol error"); + } + if (_len == 126) { + _len = 0; + _remainingLenBytes = 2; + _state = LEN_REST; + } else if (_len == 127) { + _len = 0; + _remainingLenBytes = 8; + _state = LEN_REST; + } else { + assert(_len < 126); + _lengthDone(); + } + } else { + assert(_state == LEN_REST); + _len = _len << 8 | byte; + _remainingLenBytes--; + if (_remainingLenBytes == 0) { + _lengthDone(); + } + } + } else { + if (_state == MASK) { + _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte; + if (_remainingMaskingKeyBytes == 0) { + _maskDone(); + } + } else { + assert(_state == PAYLOAD); + // The payload is not handled one byte at a time but in blocks. + int payloadLength = min(lastIndex - index, _remainingPayloadBytes); + _remainingPayloadBytes -= payloadLength; + // Unmask payload if masked. + if (_masked) { + _unmask(index, payloadLength, buffer); + } + // Control frame and data frame share _payloads. + _payload.add( + new Uint8List.view(buffer.buffer, index, payloadLength)); + index += payloadLength; + if (_isControlFrame()) { + if (_remainingPayloadBytes == 0) _controlFrameEnd(); + } else { + if (_currentMessageType != _WebSocketMessageType.TEXT && + _currentMessageType != _WebSocketMessageType.BINARY) { + throw new WebSocketChannelException("Protocol error"); + } + if (_remainingPayloadBytes == 0) _messageFrameEnd(); + } + + // Hack - as we always do index++ below. + index--; + } + } + + // Move to the next byte. + index++; + } + } + + void _unmask(int index, int length, Uint8List buffer) { + const int BLOCK_SIZE = 16; + // Skip Int32x4-version if message is small. + if (length >= BLOCK_SIZE) { + // Start by aligning to 16 bytes. + final int startOffset = BLOCK_SIZE - (index & 15); + final int end = index + startOffset; + for (int i = index; i < end; i++) { + buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; + } + index += startOffset; + length -= startOffset; + final int blockCount = length ~/ BLOCK_SIZE; + if (blockCount > 0) { + // Create mask block. + int mask = 0; + for (int i = 3; i >= 0; i--) { + mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; + } + Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); + Int32x4List blockBuffer = new Int32x4List.view( + buffer.buffer, index, blockCount); + for (int i = 0; i < blockBuffer.length; i++) { + blockBuffer[i] ^= blockMask; + } + final int bytes = blockCount * BLOCK_SIZE; + index += bytes; + length -= bytes; + } + } + // Handle end. + final int end = index + length; + for (int i = index; i < end; i++) { + buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; + } + } + + void _lengthDone() { + if (_masked) { + if (!_serverSide) { + throw new WebSocketChannelException( + "Received masked frame from server"); + } + _state = MASK; + } else { + if (_serverSide) { + throw new WebSocketChannelException( + "Received unmasked frame from client"); + } + _remainingPayloadBytes = _len; + _startPayload(); + } + } + + void _maskDone() { + _remainingPayloadBytes = _len; + _startPayload(); + } + + void _startPayload() { + // If there is no actual payload perform perform callbacks without + // going through the PAYLOAD state. + if (_remainingPayloadBytes == 0) { + if (_isControlFrame()) { + switch (_opcode) { + case _WebSocketOpcode.CLOSE: + _state = CLOSED; + _eventSink.close(); + break; + case _WebSocketOpcode.PING: + _eventSink.add(new _WebSocketPing()); + break; + case _WebSocketOpcode.PONG: + _eventSink.add(new _WebSocketPong()); + break; + } + _prepareForNextFrame(); + } else { + _messageFrameEnd(); + } + } else { + _state = PAYLOAD; + } + } + + void _messageFrameEnd() { + if (_fin) { + switch (_currentMessageType) { + case _WebSocketMessageType.TEXT: + _eventSink.add(UTF8.decode(_payload.takeBytes())); + break; + case _WebSocketMessageType.BINARY: + _eventSink.add(_payload.takeBytes()); + break; + } + _currentMessageType = _WebSocketMessageType.NONE; + } + _prepareForNextFrame(); + } + + void _controlFrameEnd() { + switch (_opcode) { + case _WebSocketOpcode.CLOSE: + closeCode = WebSocketStatus.NO_STATUS_RECEIVED; + var payload = _payload.takeBytes(); + if (payload.length > 0) { + if (payload.length == 1) { + throw new WebSocketChannelException("Protocol error"); + } + closeCode = payload[0] << 8 | payload[1]; + if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { + throw new WebSocketChannelException("Protocol error"); + } + if (payload.length > 2) { + closeReason = UTF8.decode(payload.sublist(2)); + } + } + _state = CLOSED; + _eventSink.close(); + break; + + case _WebSocketOpcode.PING: + _eventSink.add(new _WebSocketPing(_payload.takeBytes())); + break; + + case _WebSocketOpcode.PONG: + _eventSink.add(new _WebSocketPong(_payload.takeBytes())); + break; + } + _prepareForNextFrame(); + } + + bool _isControlFrame() { + return _opcode == _WebSocketOpcode.CLOSE || + _opcode == _WebSocketOpcode.PING || + _opcode == _WebSocketOpcode.PONG; + } + + void _prepareForNextFrame() { + if (_state != CLOSED && _state != FAILURE) _state = START; + _fin = false; + _opcode = -1; + _len = -1; + _remainingLenBytes = -1; + _remainingMaskingKeyBytes = 4; + _remainingPayloadBytes = -1; + _unmaskingIndex = 0; + } +} + + +class _WebSocketPing { + final List<int> payload; + _WebSocketPing([this.payload = null]); +} + + +class _WebSocketPong { + final List<int> payload; + _WebSocketPong([this.payload = null]); +} + +// TODO(ajohnsen): Make this transformer reusable. +class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink { + final WebSocketImpl webSocket; + EventSink _eventSink; + + _WebSocketOutgoingTransformer(this.webSocket); + + Stream bind(Stream stream) { + return new Stream.eventTransformed( + stream, + (EventSink eventSink) { + if (_eventSink != null) { + throw new StateError("WebSocket transformer already used"); + } + _eventSink = eventSink; + return this; + }); + } + + void add(message) { + if (message is _WebSocketPong) { + addFrame(_WebSocketOpcode.PONG, message.payload); + return; + } + if (message is _WebSocketPing) { + addFrame(_WebSocketOpcode.PING, message.payload); + return; + } + List<int> data; + int opcode; + if (message != null) { + if (message is String) { + opcode = _WebSocketOpcode.TEXT; + data = UTF8.encode(message); + } else { + if (message is !List<int>) { + throw new ArgumentError(message); + } + opcode = _WebSocketOpcode.BINARY; + data = message; + } + } else { + opcode = _WebSocketOpcode.TEXT; + } + addFrame(opcode, data); + } + + void addError(Object error, [StackTrace stackTrace]) => + _eventSink.addError(error, stackTrace); + + void close() { + int code = webSocket._outCloseCode; + String reason = webSocket._outCloseReason; + List<int> data; + if (code != null) { + data = new List<int>(); + data.add((code >> 8) & 0xFF); + data.add(code & 0xFF); + if (reason != null) { + data.addAll(UTF8.encode(reason)); + } + } + addFrame(_WebSocketOpcode.CLOSE, data); + _eventSink.close(); + } + + void addFrame(int opcode, List<int> data) => + createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add); + + static Iterable createFrame(int opcode, List<int> data, bool serverSide) { + bool mask = !serverSide; // Masking not implemented for server. + int dataLength = data == null ? 0 : data.length; + // Determine the header size. + int headerSize = (mask) ? 6 : 2; + if (dataLength > 65535) { + headerSize += 8; + } else if (dataLength > 125) { + headerSize += 2; + } + Uint8List header = new Uint8List(headerSize); + int index = 0; + // Set FIN and opcode. + header[index++] = 0x80 | opcode; + // Determine size and position of length field. + int lengthBytes = 1; + if (dataLength > 65535) { + header[index++] = 127; + lengthBytes = 8; + } else if (dataLength > 125) { + header[index++] = 126; + lengthBytes = 2; + } + // Write the length in network byte order into the header. + for (int i = 0; i < lengthBytes; i++) { + header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; + } + if (mask) { + header[1] |= 1 << 7; + var maskBytes = [_random.nextInt(256), _random.nextInt(256), + _random.nextInt(256), _random.nextInt(256)]; + header.setRange(index, index + 4, maskBytes); + index += 4; + if (data != null) { + Uint8List list; + // If this is a text message just do the masking inside the + // encoded data. + if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { + list = data; + } else { + if (data is Uint8List) { + list = new Uint8List.fromList(data); + } else { + list = new Uint8List(data.length); + for (int i = 0; i < data.length; i++) { + if (data[i] < 0 || 255 < data[i]) { + throw new ArgumentError( + "List element is not a byte value " + "(value ${data[i]} at index $i)"); + } + list[i] = data[i]; + } + } + } + const int BLOCK_SIZE = 16; + int blockCount = list.length ~/ BLOCK_SIZE; + if (blockCount > 0) { + // Create mask block. + int mask = 0; + for (int i = 3; i >= 0; i--) { + mask = (mask << 8) | maskBytes[i]; + } + Int32x4 blockMask = new Int32x4(mask, mask, mask, mask); + Int32x4List blockBuffer = new Int32x4List.view( + list.buffer, 0, blockCount); + for (int i = 0; i < blockBuffer.length; i++) { + blockBuffer[i] ^= blockMask; + } + } + // Handle end. + for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { + list[i] ^= maskBytes[i & 3]; + } + data = list; + } + } + assert(index == headerSize); + if (data == null) { + return [header]; + } else { + return [header, data]; + } + } +} + + +class _WebSocketConsumer implements StreamConsumer { + final WebSocketImpl webSocket; + final StreamSink<List<int>> sink; + StreamController _controller; + StreamSubscription _subscription; + bool _issuedPause = false; + bool _closed = false; + Completer _closeCompleter = new Completer(); + Completer _completer; + + _WebSocketConsumer(this.webSocket, this.sink); + + void _onListen() { + if (_subscription != null) { + _subscription.cancel(); + } + } + + void _onPause() { + if (_subscription != null) { + _subscription.pause(); + } else { + _issuedPause = true; + } + } + + void _onResume() { + if (_subscription != null) { + _subscription.resume(); + } else { + _issuedPause = false; + } + } + + void _cancel() { + if (_subscription != null) { + var subscription = _subscription; + _subscription = null; + subscription.cancel(); + } + } + + _ensureController() { + if (_controller != null) return; + _controller = new StreamController(sync: true, + onPause: _onPause, + onResume: _onResume, + onCancel: _onListen); + var stream = _controller.stream.transform( + new _WebSocketOutgoingTransformer(webSocket)); + sink.addStream(stream) + .then((_) { + _done(); + _closeCompleter.complete(webSocket); + }, onError: (error, StackTrace stackTrace) { + _closed = true; + _cancel(); + if (error is ArgumentError) { + if (!_done(error, stackTrace)) { + _closeCompleter.completeError(error, stackTrace); + } + } else { + _done(); + _closeCompleter.complete(webSocket); + } + }); + } + + bool _done([error, StackTrace stackTrace]) { + if (_completer == null) return false; + if (error != null) { + _completer.completeError(error, stackTrace); + } else { + _completer.complete(webSocket); + } + _completer = null; + return true; + } + + Future addStream(var stream) { + if (_closed) { + stream.listen(null).cancel(); + return new Future.value(webSocket); + } + _ensureController(); + _completer = new Completer(); + _subscription = stream.listen( + (data) { + _controller.add(data); + }, + onDone: _done, + onError: _done, + cancelOnError: true); + if (_issuedPause) { + _subscription.pause(); + _issuedPause = false; + } + return _completer.future; + } + + Future close() { + _ensureController(); + Future closeSocket() { + return sink.close().catchError((_) {}).then((_) => webSocket); + } + _controller.close(); + return _closeCompleter.future.then((_) => closeSocket()); + } + + void add(data) { + if (_closed) return; + _ensureController(); + _controller.add(data); + } + + void closeSocket() { + _closed = true; + _cancel(); + close(); + } +} + + +class WebSocketImpl extends Stream with _ServiceObject implements StreamSink { + // Use default Map so we keep order. + static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>(); + + final String protocol; + + StreamController _controller; + StreamSubscription _subscription; + StreamSink _sink; + + final bool _serverSide; + int _readyState = WebSocket.CONNECTING; + bool _writeClosed = false; + int _closeCode; + String _closeReason; + Duration _pingInterval; + Timer _pingTimer; + _WebSocketConsumer _consumer; + + int _outCloseCode; + String _outCloseReason; + Timer _closeTimer; + + WebSocketImpl.fromSocket(Stream<List<int>> stream, + StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) { + _consumer = new _WebSocketConsumer(this, sink); + _sink = new StreamSinkImpl(_consumer); + _readyState = WebSocket.OPEN; + + var transformer = new _WebSocketProtocolTransformer(_serverSide); + _subscription = stream.transform(transformer).listen( + (data) { + if (data is _WebSocketPing) { + if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload)); + } else if (data is _WebSocketPong) { + // Simply set pingInterval, as it'll cancel any timers. + pingInterval = _pingInterval; + } else { + _controller.add(data); + } + }, + onError: (error) { + if (_closeTimer != null) _closeTimer.cancel(); + if (error is FormatException) { + _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); + } else { + _close(WebSocketStatus.PROTOCOL_ERROR); + } + // An error happened, set the close code set above. + _closeCode = _outCloseCode; + _closeReason = _outCloseReason; + _controller.close(); + }, + onDone: () { + if (_closeTimer != null) _closeTimer.cancel(); + if (_readyState == WebSocket.OPEN) { + _readyState = WebSocket.CLOSING; + if (!_isReservedStatusCode(transformer.closeCode)) { + _close(transformer.closeCode); + } else { + _close(); + } + _readyState = WebSocket.CLOSED; + } + // Protocol close, use close code from transformer. + _closeCode = transformer.closeCode; + _closeReason = transformer.closeReason; + _controller.close(); + }, + cancelOnError: true); + _subscription.pause(); + _controller = new StreamController(sync: true, + onListen: () => _subscription.resume(), + onCancel: () { + _subscription.cancel(); + _subscription = null; + }, + onPause: _subscription.pause, + onResume: _subscription.resume); + + _webSockets[_serviceId] = this; + } + + StreamSubscription listen(void onData(message), + {Function onError, + void onDone(), + bool cancelOnError}) { + return _controller.stream.listen(onData, + onError: onError, + onDone: onDone, + cancelOnError: cancelOnError); + } + + Duration get pingInterval => _pingInterval; + + void set pingInterval(Duration interval) { + if (_writeClosed) return; + if (_pingTimer != null) _pingTimer.cancel(); + _pingInterval = interval; + + if (_pingInterval == null) return; + + _pingTimer = new Timer(_pingInterval, () { + if (_writeClosed) return; + _consumer.add(new _WebSocketPing()); + _pingTimer = new Timer(_pingInterval, () { + // No pong received. + _close(WebSocketStatus.GOING_AWAY); + }); + }); + } + + int get readyState => _readyState; + + String get extensions => null; + int get closeCode => _closeCode; + String get closeReason => _closeReason; + + void add(data) => _sink.add(data); + void addError(error, [StackTrace stackTrace]) => + _sink.addError(error, stackTrace); + Future addStream(Stream stream) => _sink.addStream(stream); + Future get done => _sink.done; + + Future close([int code, String reason]) { + if (_isReservedStatusCode(code)) { + throw new WebSocketChannelException("Reserved status code $code"); + } + if (_outCloseCode == null) { + _outCloseCode = code; + _outCloseReason = reason; + } + if (!_controller.isClosed) { + // If a close has not yet been received from the other end then + // 1) make sure to listen on the stream so the close frame will be + // processed if received. + // 2) set a timer terminate the connection if a close frame is + // not received. + if (!_controller.hasListener && _subscription != null) { + _controller.stream.drain().catchError((_) => {}); + } + if (_closeTimer == null) { + // When closing the web-socket, we no longer accept data. + _closeTimer = new Timer(const Duration(seconds: 5), () { + // Reuse code and reason from the local close. + _closeCode = _outCloseCode; + _closeReason = _outCloseReason; + if (_subscription != null) _subscription.cancel(); + _controller.close(); + _webSockets.remove(_serviceId); + }); + } + } + return _sink.close(); + } + + void _close([int code, String reason]) { + if (_writeClosed) return; + if (_outCloseCode == null) { + _outCloseCode = code; + _outCloseReason = reason; + } + _writeClosed = true; + _consumer.closeSocket(); + _webSockets.remove(_serviceId); + } + + // The _toJSON, _serviceTypePath, and _serviceTypeName methods + // have been deleted for http_parser. The methods were unused in WebSocket + // code and produced warnings. + + static bool _isReservedStatusCode(int code) { + return code != null && + (code < WebSocketStatus.NORMAL_CLOSURE || + code == WebSocketStatus.RESERVED_1004 || + code == WebSocketStatus.NO_STATUS_RECEIVED || + code == WebSocketStatus.ABNORMAL_CLOSURE || + (code > WebSocketStatus.INTERNAL_SERVER_ERROR && + code < WebSocketStatus.RESERVED_1015) || + (code >= WebSocketStatus.RESERVED_1015 && + code < 3000)); + } +} + +// The following code is from sdk/lib/io/service_object.dart. + +int _nextServiceId = 1; + +// TODO(ajohnsen): Use other way of getting a uniq id. +abstract class _ServiceObject { + int __serviceId = 0; + int get _serviceId { + if (__serviceId == 0) __serviceId = _nextServiceId++; + return __serviceId; + } + + // The _toJSON, _servicePath, _serviceTypePath, _serviceTypeName, and + // _serviceType methods have been deleted for http_parser. The methods were + // unused in WebSocket code and produced warnings. +}
diff --git a/pkgs/web_socket_channel/lib/src/exception.dart b/pkgs/web_socket_channel/lib/src/exception.dart new file mode 100644 index 0000000..d06dc60 --- /dev/null +++ b/pkgs/web_socket_channel/lib/src/exception.dart
@@ -0,0 +1,19 @@ +// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'channel.dart'; + +/// An exception thrown by a [WebSocketChannel]. +class WebSocketChannelException implements Exception { + final String message; + + /// The exception that caused this one, if available. + final inner; + + WebSocketChannelException([this.message, this.inner]); + + String toString() => message == null + ? "WebSocketChannelException" : + "WebSocketChannelException: $message"; +}
diff --git a/pkgs/web_socket_channel/lib/web_socket_channel.dart b/pkgs/web_socket_channel/lib/web_socket_channel.dart new file mode 100644 index 0000000..299604c --- /dev/null +++ b/pkgs/web_socket_channel/lib/web_socket_channel.dart
@@ -0,0 +1,6 @@ +// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +export 'src/channel.dart'; +export 'src/exception.dart';
diff --git a/pkgs/web_socket_channel/pubspec.yaml b/pkgs/web_socket_channel/pubspec.yaml index 1d45b6f..c6ad187 100644 --- a/pkgs/web_socket_channel/pubspec.yaml +++ b/pkgs/web_socket_channel/pubspec.yaml
@@ -7,5 +7,10 @@ environment: sdk: '>=1.0.0 <2.0.0' +dependencies: + async: '^1.3.0' + crypto: '^0.9.0' + stream_channel: '^1.2.0' + dev_dependencies: test: '^0.12.0'
diff --git a/pkgs/web_socket_channel/test/web_socket_test.dart b/pkgs/web_socket_channel/test/web_socket_test.dart new file mode 100644 index 0000000..a105b67 --- /dev/null +++ b/pkgs/web_socket_channel/test/web_socket_test.dart
@@ -0,0 +1,97 @@ +// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +@TestOn('vm') + +import 'dart:io'; + +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +import 'package:web_socket_channel/web_socket_channel.dart'; + +void main() { + group("using WebSocketChannel", () { + test("a client can communicate with a WebSocket server", () async { + var server = await HttpServer.bind("localhost", 0); + server.transform(new WebSocketTransformer()).listen((webSocket) { + webSocket.add("hello!"); + webSocket.listen((request) { + expect(request, equals("ping")); + webSocket.add("pong"); + webSocket.close(); + }); + }); + + var client = new HttpClient(); + var request = await client.openUrl( + "GET", Uri.parse("http://localhost:${server.port}")); + request.headers + ..set("Connection", "Upgrade") + ..set("Upgrade", "websocket") + ..set("Sec-WebSocket-Key", "x3JJHMbDL1EzLkh9GBhXDw==") + ..set("Sec-WebSocket-Version", "13"); + + var response = await request.close(); + var socket = await response.detachSocket(); + var innerChannel = new StreamChannel(socket, socket); + var webSocket = new WebSocketChannel(innerChannel, serverSide: false); + + var n = 0; + await webSocket.stream.listen((message) { + if (n == 0) { + expect(message, equals("hello!")); + webSocket.sink.add("ping"); + } else if (n == 1) { + expect(message, equals("pong")); + webSocket.sink.close(); + server.close(); + } else { + fail("Only expected two messages."); + } + n++; + }).asFuture(); + }); + + test("a server can communicate with a WebSocket client", () async { + var server = await HttpServer.bind("localhost", 0); + server.listen((request) async { + var response = request.response; + response.statusCode = 101; + response.headers + ..set("Connection", "Upgrade") + ..set("Upgrade", "websocket") + ..set("Sec-WebSocket-Accept", WebSocketChannel + .signKey(request.headers.value('Sec-WebSocket-Key'))); + response.contentLength = 0; + + var socket = await response.detachSocket(); + var innerChannel = new StreamChannel(socket, socket); + var webSocket = new WebSocketChannel(innerChannel); + webSocket.sink.add("hello!"); + + var message = await webSocket.stream.first; + expect(message, equals("ping")); + webSocket.sink.add("pong"); + webSocket.sink.close(); + }); + + var webSocket = await WebSocket.connect('ws://localhost:${server.port}'); + var n = 0; + await webSocket.listen((message) { + if (n == 0) { + expect(message, equals("hello!")); + webSocket.add("ping"); + } else if (n == 1) { + expect(message, equals("pong")); + webSocket.close(); + server.close(); + } else { + fail("Only expected two messages."); + } + n++; + }).asFuture(); + }); + }); +}