| // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| part of dart._http; |
| |
| const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; |
| const String _clientNoContextTakeover = "client_no_context_takeover"; |
| const String _serverNoContextTakeover = "server_no_context_takeover"; |
| const String _clientMaxWindowBits = "client_max_window_bits"; |
| const String _serverMaxWindowBits = "server_max_window_bits"; |
| |
| // Matches _WebSocketOpcode. |
| class _WebSocketMessageType { |
| static const int NONE = 0; |
| static const int TEXT = 1; |
| static const int BINARY = 2; |
| } |
| |
| class _WebSocketOpcode { |
| static const int CONTINUATION = 0; |
| static const int TEXT = 1; |
| static const int BINARY = 2; |
| static const int RESERVED_3 = 3; |
| static const int RESERVED_4 = 4; |
| static const int RESERVED_5 = 5; |
| static const int RESERVED_6 = 6; |
| static const int RESERVED_7 = 7; |
| static const int CLOSE = 8; |
| static const int PING = 9; |
| static const int PONG = 10; |
| static const int RESERVED_B = 11; |
| static const int RESERVED_C = 12; |
| static const int RESERVED_D = 13; |
| static const int RESERVED_E = 14; |
| static const int RESERVED_F = 15; |
| } |
| |
| class _EncodedString { |
| final List<int> bytes; |
| _EncodedString(this.bytes); |
| } |
| |
| /// Stores the header and integer value derived from negotiation of |
| /// client_max_window_bits and server_max_window_bits. headerValue will be |
| /// set in the Websocket response headers. |
| class _CompressionMaxWindowBits { |
| String headerValue; |
| int maxWindowBits; |
| _CompressionMaxWindowBits(this.headerValue, this.maxWindowBits); |
| String toString() => headerValue; |
| } |
| |
| /// The web socket protocol transformer handles the protocol byte stream |
| /// which is supplied through the `handleData`. As the protocol is processed, |
| /// it'll output frame data as either a List<int> or String. |
| /// |
| /// Important information about usage: Be sure you use cancelOnError, so the |
| /// socket will be closed when the processor encounter an error. Not using it |
| /// will lead to undefined behaviour. |
| class _WebSocketProtocolTransformer extends StreamTransformerBase<List<int>, |
| dynamic /*List<int>|_WebSocketPing|_WebSocketPong*/ > |
| implements EventSink<List<int>> { |
| static const int START = 0; |
| static const int LEN_FIRST = 1; |
| static const int LEN_REST = 2; |
| static const int MASK = 3; |
| static const int PAYLOAD = 4; |
| static const int CLOSED = 5; |
| static const int FAILURE = 6; |
| static const int FIN = 0x80; |
| static const int RSV1 = 0x40; |
| static const int RSV2 = 0x20; |
| static const int RSV3 = 0x10; |
| static const int OPCODE = 0xF; |
| |
| int _state = START; |
| bool _fin = false; |
| bool _compressed = false; |
| int _opcode = -1; |
| int _len = -1; |
| bool _masked = false; |
| int _remainingLenBytes = -1; |
| int _remainingMaskingKeyBytes = 4; |
| int _remainingPayloadBytes = -1; |
| int _unmaskingIndex = 0; |
| int _currentMessageType = _WebSocketMessageType.NONE; |
| int closeCode = WebSocketStatus.noStatusReceived; |
| String closeReason = ""; |
| |
| EventSink<dynamic /*List<int>|_WebSocketPing|_WebSocketPong*/ >? _eventSink; |
| |
| final bool _serverSide; |
| final Uint8List _maskingBytes = Uint8List(4); |
| final BytesBuilder _payload = BytesBuilder(copy: false); |
| |
| final _WebSocketPerMessageDeflate? _deflate; |
| _WebSocketProtocolTransformer([this._serverSide = false, this._deflate]); |
| |
| Stream<dynamic /*List<int>|_WebSocketPing|_WebSocketPong*/ > bind( |
| Stream<List<int>> stream) { |
| return Stream.eventTransformed(stream, (EventSink eventSink) { |
| if (_eventSink != null) { |
| throw StateError("WebSocket transformer already used."); |
| } |
| _eventSink = eventSink; |
| return this; |
| }); |
| } |
| |
| void addError(Object error, [StackTrace? stackTrace]) { |
| // TODO(40614): Remove once non-nullability is sound. |
| ArgumentError.checkNotNull(error, "error"); |
| _eventSink!.addError(error, stackTrace); |
| } |
| |
| void close() { |
| _eventSink!.close(); |
| } |
| |
| /// Process data received from the underlying communication channel. |
| void add(List<int> bytes) { |
| var buffer = bytes is Uint8List ? bytes : Uint8List.fromList(bytes); |
| int index = 0; |
| int lastIndex = buffer.length; |
| if (_state == CLOSED) { |
| throw WebSocketException("Data on closed connection"); |
| } |
| if (_state == FAILURE) { |
| throw WebSocketException("Data on failed connection"); |
| } |
| while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { |
| int byte = buffer[index]; |
| if (_state <= LEN_REST) { |
| if (_state == START) { |
| _fin = (byte & FIN) != 0; |
| |
| if ((byte & (RSV2 | RSV3)) != 0) { |
| // The RSV2, RSV3 bits must both be zero. |
| throw WebSocketException("Protocol error"); |
| } |
| |
| _opcode = (byte & OPCODE); |
| |
| if (_opcode != _WebSocketOpcode.CONTINUATION) { |
| if ((byte & RSV1) != 0) { |
| _compressed = true; |
| } else { |
| _compressed = false; |
| } |
| } |
| |
| if (_opcode <= _WebSocketOpcode.BINARY) { |
| if (_opcode == _WebSocketOpcode.CONTINUATION) { |
| if (_currentMessageType == _WebSocketMessageType.NONE) { |
| throw WebSocketException("Protocol error"); |
| } |
| } else { |
| assert(_opcode == _WebSocketOpcode.TEXT || |
| _opcode == _WebSocketOpcode.BINARY); |
| if (_currentMessageType != _WebSocketMessageType.NONE) { |
| throw WebSocketException("Protocol error"); |
| } |
| _currentMessageType = _opcode; |
| } |
| } else if (_opcode >= _WebSocketOpcode.CLOSE && |
| _opcode <= _WebSocketOpcode.PONG) { |
| // Control frames cannot be fragmented. |
| if (!_fin) throw WebSocketException("Protocol error"); |
| } else { |
| throw WebSocketException("Protocol error"); |
| } |
| _state = LEN_FIRST; |
| } else if (_state == LEN_FIRST) { |
| _masked = (byte & 0x80) != 0; |
| _len = byte & 0x7F; |
| if (_isControlFrame() && _len > 125) { |
| throw WebSocketException("Protocol error"); |
| } |
| if (_len == 126) { |
| _len = 0; |
| _remainingLenBytes = 2; |
| _state = LEN_REST; |
| } else if (_len == 127) { |
| _len = 0; |
| _remainingLenBytes = 8; |
| _state = LEN_REST; |
| } else { |
| assert(_len < 126); |
| _lengthDone(); |
| } |
| } else { |
| assert(_state == LEN_REST); |
| _len = _len << 8 | byte; |
| _remainingLenBytes--; |
| if (_remainingLenBytes == 0) { |
| _lengthDone(); |
| } |
| } |
| } else { |
| if (_state == MASK) { |
| _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte; |
| if (_remainingMaskingKeyBytes == 0) { |
| _maskDone(); |
| } |
| } else { |
| assert(_state == PAYLOAD); |
| // The payload is not handled one byte at a time but in blocks. |
| int payloadLength = min(lastIndex - index, _remainingPayloadBytes); |
| _remainingPayloadBytes -= payloadLength; |
| // Unmask payload if masked. |
| if (_masked) { |
| _unmask(index, payloadLength, buffer); |
| } |
| // Control frame and data frame share _payloads. |
| _payload.add(Uint8List.view( |
| buffer.buffer, buffer.offsetInBytes + index, payloadLength)); |
| index += payloadLength; |
| if (_isControlFrame()) { |
| if (_remainingPayloadBytes == 0) _controlFrameEnd(); |
| } else { |
| if (_currentMessageType != _WebSocketMessageType.TEXT && |
| _currentMessageType != _WebSocketMessageType.BINARY) { |
| throw WebSocketException("Protocol error"); |
| } |
| if (_remainingPayloadBytes == 0) _messageFrameEnd(); |
| } |
| |
| // Hack - as we always do index++ below. |
| index--; |
| } |
| } |
| |
| // Move to the next byte. |
| index++; |
| } |
| } |
| |
| void _unmask(int index, int length, Uint8List buffer) { |
| const int BLOCK_SIZE = 16; |
| // Skip Int32x4-version if message is small. |
| if (length >= BLOCK_SIZE) { |
| // Start by aligning to 16 bytes. |
| final int startOffset = BLOCK_SIZE - (index & 15); |
| final int end = index + startOffset; |
| for (int i = index; i < end; i++) { |
| buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; |
| } |
| index += startOffset; |
| length -= startOffset; |
| final int blockCount = length ~/ BLOCK_SIZE; |
| if (blockCount > 0) { |
| // Create mask block. |
| int mask = 0; |
| for (int i = 3; i >= 0; i--) { |
| mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; |
| } |
| Int32x4 blockMask = Int32x4(mask, mask, mask, mask); |
| Int32x4List blockBuffer = Int32x4List.view( |
| buffer.buffer, buffer.offsetInBytes + index, blockCount); |
| for (int i = 0; i < blockBuffer.length; i++) { |
| blockBuffer[i] ^= blockMask; |
| } |
| final int bytes = blockCount * BLOCK_SIZE; |
| index += bytes; |
| length -= bytes; |
| } |
| } |
| // Handle end. |
| final int end = index + length; |
| for (int i = index; i < end; i++) { |
| buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; |
| } |
| } |
| |
| void _lengthDone() { |
| if (_masked) { |
| if (!_serverSide) { |
| throw WebSocketException("Received masked frame from server"); |
| } |
| _state = MASK; |
| } else { |
| if (_serverSide) { |
| throw WebSocketException("Received unmasked frame from client"); |
| } |
| _remainingPayloadBytes = _len; |
| _startPayload(); |
| } |
| } |
| |
| void _maskDone() { |
| _remainingPayloadBytes = _len; |
| _startPayload(); |
| } |
| |
| void _startPayload() { |
| // If there is no actual payload perform perform callbacks without |
| // going through the PAYLOAD state. |
| if (_remainingPayloadBytes == 0) { |
| if (_isControlFrame()) { |
| switch (_opcode) { |
| case _WebSocketOpcode.CLOSE: |
| _state = CLOSED; |
| _eventSink!.close(); |
| break; |
| case _WebSocketOpcode.PING: |
| _eventSink!.add(_WebSocketPing()); |
| break; |
| case _WebSocketOpcode.PONG: |
| _eventSink!.add(_WebSocketPong()); |
| break; |
| } |
| _prepareForNextFrame(); |
| } else { |
| _messageFrameEnd(); |
| } |
| } else { |
| _state = PAYLOAD; |
| } |
| } |
| |
| void _messageFrameEnd() { |
| if (_fin) { |
| var bytes = _payload.takeBytes(); |
| var deflate = _deflate; |
| if (deflate != null && _compressed) { |
| bytes = deflate.processIncomingMessage(bytes); |
| } |
| |
| switch (_currentMessageType) { |
| case _WebSocketMessageType.TEXT: |
| _eventSink!.add(utf8.decode(bytes)); |
| break; |
| case _WebSocketMessageType.BINARY: |
| _eventSink!.add(bytes); |
| break; |
| } |
| _currentMessageType = _WebSocketMessageType.NONE; |
| } |
| _prepareForNextFrame(); |
| } |
| |
| void _controlFrameEnd() { |
| switch (_opcode) { |
| case _WebSocketOpcode.CLOSE: |
| closeCode = WebSocketStatus.noStatusReceived; |
| var payload = _payload.takeBytes(); |
| if (payload.isNotEmpty) { |
| if (payload.length == 1) { |
| throw WebSocketException("Protocol error"); |
| } |
| closeCode = payload[0] << 8 | payload[1]; |
| if (closeCode == WebSocketStatus.noStatusReceived) { |
| throw WebSocketException("Protocol error"); |
| } |
| if (payload.length > 2) { |
| closeReason = utf8.decode(payload.sublist(2)); |
| } |
| } |
| _state = CLOSED; |
| _eventSink!.close(); |
| break; |
| |
| case _WebSocketOpcode.PING: |
| _eventSink!.add(_WebSocketPing(_payload.takeBytes())); |
| break; |
| |
| case _WebSocketOpcode.PONG: |
| _eventSink!.add(_WebSocketPong(_payload.takeBytes())); |
| break; |
| } |
| _prepareForNextFrame(); |
| } |
| |
| bool _isControlFrame() { |
| return _opcode == _WebSocketOpcode.CLOSE || |
| _opcode == _WebSocketOpcode.PING || |
| _opcode == _WebSocketOpcode.PONG; |
| } |
| |
| void _prepareForNextFrame() { |
| if (_state != CLOSED && _state != FAILURE) _state = START; |
| _fin = false; |
| _opcode = -1; |
| _len = -1; |
| _remainingLenBytes = -1; |
| _remainingMaskingKeyBytes = 4; |
| _remainingPayloadBytes = -1; |
| _unmaskingIndex = 0; |
| } |
| } |
| |
| class _WebSocketPing { |
| final List<int>? payload; |
| _WebSocketPing([this.payload]); |
| } |
| |
| class _WebSocketPong { |
| final List<int>? payload; |
| _WebSocketPong([this.payload]); |
| } |
| |
| typedef /*String|Future<String>*/ _ProtocolSelector = Function( |
| List<String> protocols); |
| |
| class _WebSocketTransformerImpl |
| extends StreamTransformerBase<HttpRequest, WebSocket> |
| implements WebSocketTransformer { |
| final StreamController<WebSocket> _controller = |
| StreamController<WebSocket>(sync: true); |
| final _ProtocolSelector? _protocolSelector; |
| final CompressionOptions _compression; |
| |
| _WebSocketTransformerImpl(this._protocolSelector, this._compression); |
| |
| Stream<WebSocket> bind(Stream<HttpRequest> stream) { |
| stream.listen((request) { |
| _upgrade(request, _protocolSelector, _compression) |
| .then((WebSocket webSocket) => _controller.add(webSocket)) |
| .catchError(_controller.addError); |
| }, onDone: () { |
| _controller.close(); |
| }); |
| |
| return _controller.stream; |
| } |
| |
| static List<String> _tokenizeFieldValue(String headerValue) { |
| List<String> tokens = <String>[]; |
| int start = 0; |
| int index = 0; |
| while (index < headerValue.length) { |
| if (headerValue[index] == ",") { |
| tokens.add(headerValue.substring(start, index)); |
| start = index + 1; |
| } else if (headerValue[index] == " " || headerValue[index] == "\t") { |
| start++; |
| } |
| index++; |
| } |
| tokens.add(headerValue.substring(start, index)); |
| return tokens; |
| } |
| |
| static Future<WebSocket> _upgrade(HttpRequest request, |
| _ProtocolSelector? protocolSelector, CompressionOptions compression) { |
| var response = request.response; |
| if (!_isUpgradeRequest(request)) { |
| // Send error response. |
| response |
| ..statusCode = HttpStatus.badRequest |
| ..close(); |
| return Future.error( |
| WebSocketException("Invalid WebSocket upgrade request")); |
| } |
| |
| Future<WebSocket> upgrade(String? protocol) { |
| // Send the upgrade response. |
| response |
| ..statusCode = HttpStatus.switchingProtocols |
| ..headers.add(HttpHeaders.connectionHeader, "Upgrade") |
| ..headers.add(HttpHeaders.upgradeHeader, "websocket"); |
| String key = request.headers.value("Sec-WebSocket-Key")!; |
| _SHA1 sha1 = _SHA1(); |
| sha1.add("$key$_webSocketGUID".codeUnits); |
| String accept = base64Encode(sha1.close()); |
| response.headers.add("Sec-WebSocket-Accept", accept); |
| if (protocol != null) { |
| response.headers.add("Sec-WebSocket-Protocol", protocol); |
| } |
| |
| var deflate = _negotiateCompression(request, response, compression); |
| |
| response.headers.contentLength = 0; |
| return response.detachSocket().then<WebSocket>((socket) => |
| _WebSocketImpl._fromSocket( |
| socket, protocol, compression, true, deflate)); |
| } |
| |
| var protocols = request.headers['Sec-WebSocket-Protocol']; |
| if (protocols != null && protocolSelector != null) { |
| // The suggested protocols can be spread over multiple lines, each |
| // consisting of multiple protocols. To unify all of them, first join |
| // the lists with ', ' and then tokenize. |
| var tokenizedProtocols = _tokenizeFieldValue(protocols.join(', ')); |
| return Future<String>(() => protocolSelector(tokenizedProtocols)) |
| .then<String>((protocol) { |
| if (!tokenizedProtocols.contains(protocol)) { |
| throw WebSocketException( |
| "Selected protocol is not in the list of available protocols"); |
| } |
| return protocol; |
| }).catchError((error) { |
| response |
| ..statusCode = HttpStatus.internalServerError |
| ..close(); |
| throw error; |
| }).then<WebSocket>(upgrade); |
| } else { |
| return upgrade(null); |
| } |
| } |
| |
| static _WebSocketPerMessageDeflate? _negotiateCompression(HttpRequest request, |
| HttpResponse response, CompressionOptions compression) { |
| var extensionHeader = request.headers.value("Sec-WebSocket-Extensions"); |
| |
| extensionHeader ??= ""; |
| |
| var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); |
| if (compression.enabled && hv.value == _WebSocketImpl.PER_MESSAGE_DEFLATE) { |
| var info = compression._createHeader(hv); |
| |
| response.headers.add("Sec-WebSocket-Extensions", info.headerValue); |
| var serverNoContextTakeover = |
| (hv.parameters.containsKey(_serverNoContextTakeover) && |
| compression.serverNoContextTakeover); |
| var clientNoContextTakeover = |
| (hv.parameters.containsKey(_clientNoContextTakeover) && |
| compression.clientNoContextTakeover); |
| var deflate = _WebSocketPerMessageDeflate( |
| serverNoContextTakeover: serverNoContextTakeover, |
| clientNoContextTakeover: clientNoContextTakeover, |
| serverMaxWindowBits: info.maxWindowBits, |
| clientMaxWindowBits: info.maxWindowBits, |
| serverSide: true); |
| |
| return deflate; |
| } |
| |
| return null; |
| } |
| |
| static bool _isUpgradeRequest(HttpRequest request) { |
| if (request.method != "GET") { |
| return false; |
| } |
| var connectionHeader = request.headers[HttpHeaders.connectionHeader]; |
| if (connectionHeader == null) { |
| return false; |
| } |
| bool isUpgrade = false; |
| for (var value in connectionHeader) { |
| if (value.toLowerCase() == "upgrade") { |
| isUpgrade = true; |
| break; |
| } |
| } |
| if (!isUpgrade) return false; |
| String? upgrade = request.headers.value(HttpHeaders.upgradeHeader); |
| if (upgrade == null || upgrade.toLowerCase() != "websocket") { |
| return false; |
| } |
| String? version = request.headers.value("Sec-WebSocket-Version"); |
| if (version == null || version != "13") { |
| return false; |
| } |
| String? key = request.headers.value("Sec-WebSocket-Key"); |
| if (key == null) { |
| return false; |
| } |
| return true; |
| } |
| } |
| |
| class _WebSocketPerMessageDeflate { |
| bool serverNoContextTakeover; |
| bool clientNoContextTakeover; |
| int clientMaxWindowBits; |
| int serverMaxWindowBits; |
| bool serverSide; |
| |
| RawZLibFilter? decoder; |
| RawZLibFilter? encoder; |
| |
| _WebSocketPerMessageDeflate( |
| {this.clientMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS, |
| this.serverMaxWindowBits = _WebSocketImpl.DEFAULT_WINDOW_BITS, |
| this.serverNoContextTakeover = false, |
| this.clientNoContextTakeover = false, |
| this.serverSide = false}); |
| |
| RawZLibFilter _ensureDecoder() => decoder ??= RawZLibFilter.inflateFilter( |
| windowBits: serverSide ? clientMaxWindowBits : serverMaxWindowBits, |
| raw: true); |
| |
| RawZLibFilter _ensureEncoder() => encoder ??= RawZLibFilter.deflateFilter( |
| windowBits: serverSide ? serverMaxWindowBits : clientMaxWindowBits, |
| raw: true); |
| |
| Uint8List processIncomingMessage(List<int> msg) { |
| var decoder = _ensureDecoder(); |
| |
| var data = <int>[]; |
| data.addAll(msg); |
| data.addAll(const [0x00, 0x00, 0xff, 0xff]); |
| |
| decoder.process(data, 0, data.length); |
| final result = BytesBuilder(); |
| |
| while (true) { |
| final out = decoder.processed(); |
| if (out == null) break; |
| result.add(out); |
| } |
| |
| if ((serverSide && clientNoContextTakeover) || |
| (!serverSide && serverNoContextTakeover)) { |
| this.decoder = null; |
| } |
| |
| return result.takeBytes(); |
| } |
| |
| List<int> processOutgoingMessage(List<int> msg) { |
| var encoder = _ensureEncoder(); |
| var result = <int>[]; |
| Uint8List buffer; |
| |
| if (msg is! Uint8List) { |
| for (var i = 0; i < msg.length; i++) { |
| if (msg[i] < 0 || 255 < msg[i]) { |
| throw ArgumentError("List element is not a byte value " |
| "(value ${msg[i]} at index $i)"); |
| } |
| } |
| buffer = Uint8List.fromList(msg); |
| } else { |
| buffer = msg; |
| } |
| |
| encoder.process(buffer, 0, buffer.length); |
| |
| while (true) { |
| final out = encoder.processed(); |
| if (out == null) break; |
| result.addAll(out); |
| } |
| |
| if ((!serverSide && clientNoContextTakeover) || |
| (serverSide && serverNoContextTakeover)) { |
| this.encoder = null; |
| } |
| |
| if (result.length > 4) { |
| result = result.sublist(0, result.length - 4); |
| } |
| |
| // RFC 7692 7.2.3.6. "Generating an Empty Fragment" says that if the |
| // compression library doesn't generate any data when the bufer is empty, |
| // then an empty uncompressed deflate block is used for this purpose. The |
| // 0x00 block has the BFINAL header bit set to 0 and the BTYPE header set to |
| // 00 along with 5 bits of padding. This block decodes to zero bytes. |
| if (result.isEmpty) { |
| return [0x00]; |
| } |
| |
| return result; |
| } |
| } |
| |
| // TODO(ajohnsen): Make this transformer reusable. |
| class _WebSocketOutgoingTransformer |
| extends StreamTransformerBase<dynamic, List<int>> implements EventSink { |
| final _WebSocketImpl webSocket; |
| EventSink<List<int>>? _eventSink; |
| |
| final _WebSocketPerMessageDeflate? _deflateHelper; |
| |
| _WebSocketOutgoingTransformer(this.webSocket) |
| : _deflateHelper = webSocket._deflate; |
| |
| Stream<List<int>> bind(Stream stream) { |
| return Stream<List<int>>.eventTransformed(stream, |
| (EventSink<List<int>> eventSink) { |
| if (_eventSink != null) { |
| throw StateError("WebSocket transformer already used"); |
| } |
| _eventSink = eventSink; |
| return this; |
| }); |
| } |
| |
| void add(message) { |
| if (message is _WebSocketPong) { |
| addFrame(_WebSocketOpcode.PONG, message.payload); |
| return; |
| } |
| if (message is _WebSocketPing) { |
| addFrame(_WebSocketOpcode.PING, message.payload); |
| return; |
| } |
| List<int>? data; |
| int opcode; |
| if (message != null) { |
| List<int> messageData; |
| if (message is String) { |
| opcode = _WebSocketOpcode.TEXT; |
| messageData = utf8.encode(message); |
| } else if (message is List<int>) { |
| opcode = _WebSocketOpcode.BINARY; |
| messageData = message; |
| } else if (message is _EncodedString) { |
| opcode = _WebSocketOpcode.TEXT; |
| messageData = message.bytes; |
| } else { |
| throw ArgumentError(message); |
| } |
| var deflateHelper = _deflateHelper; |
| if (deflateHelper != null) { |
| messageData = deflateHelper.processOutgoingMessage(messageData); |
| } |
| data = messageData; |
| } else { |
| opcode = _WebSocketOpcode.TEXT; |
| } |
| addFrame(opcode, data); |
| } |
| |
| void addError(Object error, [StackTrace? stackTrace]) { |
| // TODO(40614): Remove once non-nullability is sound. |
| ArgumentError.checkNotNull(error, "error"); |
| _eventSink!.addError(error, stackTrace); |
| } |
| |
| void close() { |
| int? code = webSocket._outCloseCode; |
| String? reason = webSocket._outCloseReason; |
| List<int>? data; |
| if (code != null) { |
| data = [ |
| (code >> 8) & 0xFF, |
| code & 0xFF, |
| if (reason != null) ...utf8.encode(reason) |
| ]; |
| } |
| addFrame(_WebSocketOpcode.CLOSE, data); |
| _eventSink!.close(); |
| } |
| |
| void addFrame(int opcode, List<int>? data) { |
| createFrame( |
| opcode, |
| data, |
| webSocket._serverSide, |
| _deflateHelper != null && |
| (opcode == _WebSocketOpcode.TEXT || |
| opcode == _WebSocketOpcode.BINARY)) |
| .forEach((e) { |
| _eventSink!.add(e); |
| }); |
| } |
| |
| static Iterable<List<int>> createFrame( |
| int opcode, List<int>? data, bool serverSide, bool compressed) { |
| bool mask = !serverSide; // Masking not implemented for server. |
| int dataLength = data == null ? 0 : data.length; |
| // Determine the header size. |
| int headerSize = (mask) ? 6 : 2; |
| if (dataLength > 65535) { |
| headerSize += 8; |
| } else if (dataLength > 125) { |
| headerSize += 2; |
| } |
| Uint8List header = Uint8List(headerSize); |
| int index = 0; |
| |
| // Set FIN and opcode. |
| var hoc = _WebSocketProtocolTransformer.FIN | |
| (compressed ? _WebSocketProtocolTransformer.RSV1 : 0) | |
| (opcode & _WebSocketProtocolTransformer.OPCODE); |
| |
| header[index++] = hoc; |
| // Determine size and position of length field. |
| int lengthBytes = 1; |
| if (dataLength > 65535) { |
| header[index++] = 127; |
| lengthBytes = 8; |
| } else if (dataLength > 125) { |
| header[index++] = 126; |
| lengthBytes = 2; |
| } |
| // Write the length in network byte order into the header. |
| for (int i = 0; i < lengthBytes; i++) { |
| header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; |
| } |
| if (mask) { |
| header[1] |= 1 << 7; |
| var maskBytes = _CryptoUtils.getRandomBytes(4); |
| header.setRange(index, index + 4, maskBytes); |
| index += 4; |
| if (data != null) { |
| Uint8List list; |
| // If this is a text message just do the masking inside the |
| // encoded data. |
| if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { |
| list = data; |
| } else { |
| if (data is Uint8List) { |
| list = Uint8List.fromList(data); |
| } else { |
| list = Uint8List(data.length); |
| for (int i = 0; i < data.length; i++) { |
| if (data[i] < 0 || 255 < data[i]) { |
| throw ArgumentError("List element is not a byte value " |
| "(value ${data[i]} at index $i)"); |
| } |
| list[i] = data[i]; |
| } |
| } |
| } |
| const int BLOCK_SIZE = 16; |
| int blockCount = list.length ~/ BLOCK_SIZE; |
| if (blockCount > 0) { |
| // Create mask block. |
| int mask = 0; |
| for (int i = 3; i >= 0; i--) { |
| mask = (mask << 8) | maskBytes[i]; |
| } |
| Int32x4 blockMask = Int32x4(mask, mask, mask, mask); |
| Int32x4List blockBuffer = |
| Int32x4List.view(list.buffer, list.offsetInBytes, blockCount); |
| for (int i = 0; i < blockBuffer.length; i++) { |
| blockBuffer[i] ^= blockMask; |
| } |
| } |
| // Handle end. |
| for (int i = blockCount * BLOCK_SIZE; i < list.length; i++) { |
| list[i] ^= maskBytes[i & 3]; |
| } |
| data = list; |
| } |
| } |
| assert(index == headerSize); |
| if (data == null) { |
| return [header]; |
| } else { |
| return [header, data]; |
| } |
| } |
| } |
| |
| class _WebSocketConsumer implements StreamConsumer { |
| final _WebSocketImpl webSocket; |
| final Socket socket; |
| StreamController? _controller; |
| StreamSubscription? _subscription; |
| bool _issuedPause = false; |
| bool _closed = false; |
| final Completer _closeCompleter = Completer<WebSocket>(); |
| Completer? _completer; |
| |
| _WebSocketConsumer(this.webSocket, this.socket); |
| |
| void _onListen() { |
| _subscription?.cancel(); |
| } |
| |
| void _onPause() { |
| var subscription = _subscription; |
| if (subscription != null) { |
| subscription.pause(); |
| } else { |
| _issuedPause = true; |
| } |
| } |
| |
| void _onResume() { |
| var subscription = _subscription; |
| if (subscription != null) { |
| subscription.resume(); |
| } else { |
| _issuedPause = false; |
| } |
| } |
| |
| void _cancel() { |
| var subscription = _subscription; |
| if (subscription != null) { |
| _subscription = null; |
| subscription.cancel(); |
| } |
| } |
| |
| StreamController _ensureController() { |
| var controller = _controller; |
| if (controller != null) return controller; |
| controller = _controller = StreamController( |
| sync: true, |
| onPause: _onPause, |
| onResume: _onResume, |
| onCancel: _onListen); |
| var stream = |
| controller.stream.transform(_WebSocketOutgoingTransformer(webSocket)); |
| socket.addStream(stream).then((_) { |
| _done(); |
| _closeCompleter.complete(webSocket); |
| }, onError: (Object error, StackTrace stackTrace) { |
| _closed = true; |
| _cancel(); |
| if (error is ArgumentError) { |
| if (!_done(error, stackTrace)) { |
| _closeCompleter.completeError(error, stackTrace); |
| } |
| } else { |
| _done(); |
| _closeCompleter.complete(webSocket); |
| } |
| }); |
| return controller; |
| } |
| |
| bool _done([Object? error, StackTrace? stackTrace]) { |
| var completer = _completer; |
| if (completer == null) return false; |
| if (error != null) { |
| completer.completeError(error, stackTrace); |
| } else { |
| completer.complete(webSocket); |
| } |
| _completer = null; |
| return true; |
| } |
| |
| Future addStream(Stream stream) { |
| if (_closed) { |
| stream.listen(null).cancel(); |
| return Future.value(webSocket); |
| } |
| _ensureController(); |
| var completer = _completer = Completer(); |
| var subscription = _subscription = stream.listen((data) { |
| _controller!.add(data); |
| }, onDone: _done, onError: _done, cancelOnError: true); |
| if (_issuedPause) { |
| subscription.pause(); |
| _issuedPause = false; |
| } |
| return completer.future; |
| } |
| |
| Future close() { |
| _ensureController().close(); |
| |
| return _closeCompleter.future |
| .then((_) => socket.close().catchError((_) {}).then((_) => webSocket)); |
| } |
| |
| void add(data) { |
| if (_closed) return; |
| var controller = _ensureController(); |
| // Stop sending message if _controller has been closed. |
| // https://github.com/dart-lang/sdk/issues/37441 |
| if (controller.isClosed) return; |
| controller.add(data); |
| } |
| |
| void closeSocket() { |
| _closed = true; |
| _cancel(); |
| close(); |
| } |
| } |
| |
| class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket { |
| // Use default Map so we keep order. |
| static final Map<int, _WebSocketImpl> _webSockets = <int, _WebSocketImpl>{}; |
| static const int DEFAULT_WINDOW_BITS = 15; |
| static const String PER_MESSAGE_DEFLATE = "permessage-deflate"; |
| |
| final String? protocol; |
| |
| final StreamController _controller; |
| StreamSubscription? _subscription; |
| late StreamSink _sink; |
| |
| final Socket _socket; |
| final bool _serverSide; |
| int _readyState = WebSocket.connecting; |
| bool _writeClosed = false; |
| int? _closeCode; |
| String? _closeReason; |
| Duration? _pingInterval; |
| Timer? _pingTimer; |
| late _WebSocketConsumer _consumer; |
| |
| int? _outCloseCode; |
| String? _outCloseReason; |
| Timer? _closeTimer; |
| _WebSocketPerMessageDeflate? _deflate; |
| |
| static final HttpClient _httpClient = HttpClient(); |
| |
| static Future<WebSocket> connect( |
| String url, Iterable<String>? protocols, Map<String, dynamic>? headers, |
| {CompressionOptions compression = CompressionOptions.compressionDefault, |
| HttpClient? customClient}) { |
| Uri uri = Uri.parse(url); |
| if (!uri.isScheme("ws") && !uri.isScheme("wss")) { |
| throw WebSocketException("Unsupported URL scheme '${uri.scheme}'"); |
| } |
| |
| Random random = Random(); |
| // Generate 16 random bytes. |
| Uint8List nonceData = Uint8List(16); |
| for (int i = 0; i < 16; i++) { |
| nonceData[i] = random.nextInt(256); |
| } |
| String nonce = base64Encode(nonceData); |
| |
| final callerStackTrace = StackTrace.current; |
| |
| uri = Uri( |
| scheme: uri.isScheme("wss") ? "https" : "http", |
| userInfo: uri.userInfo, |
| host: uri.host, |
| port: uri.port, |
| path: uri.path, |
| query: uri.query, |
| fragment: uri.fragment); |
| return (customClient ?? _httpClient).openUrl("GET", uri).then((request) { |
| if (uri.userInfo != null && uri.userInfo.isNotEmpty) { |
| // If the URL contains user information use that for basic |
| // authorization. |
| String auth = base64Encode(utf8.encode(uri.userInfo)); |
| request.headers.set(HttpHeaders.authorizationHeader, "Basic $auth"); |
| } |
| if (headers != null) { |
| headers.forEach((field, value) => request.headers.add(field, value)); |
| } |
| // Setup the initial handshake. |
| request.headers |
| ..set(HttpHeaders.connectionHeader, "Upgrade") |
| ..set(HttpHeaders.upgradeHeader, "websocket") |
| ..set("Sec-WebSocket-Key", nonce) |
| ..set("Cache-Control", "no-cache") |
| ..set("Sec-WebSocket-Version", "13"); |
| if (protocols != null) { |
| request.headers.add("Sec-WebSocket-Protocol", protocols.toList()); |
| } |
| |
| if (compression.enabled) { |
| request.headers |
| .add("Sec-WebSocket-Extensions", compression._createHeader()); |
| } |
| |
| return request.close(); |
| }).then((response) { |
| Future<WebSocket> error(String message) { |
| // Flush data. |
| response.detachSocket().then((socket) { |
| socket.destroy(); |
| }); |
| return Future<WebSocket>.error( |
| WebSocketException(message), callerStackTrace); |
| } |
| |
| var connectionHeader = response.headers[HttpHeaders.connectionHeader]; |
| if (response.statusCode != HttpStatus.switchingProtocols || |
| connectionHeader == null || |
| !connectionHeader.any((value) => value.toLowerCase() == "upgrade") || |
| response.headers.value(HttpHeaders.upgradeHeader)!.toLowerCase() != |
| "websocket") { |
| return error("Connection to '$uri' was not upgraded to websocket"); |
| } |
| String? accept = response.headers.value("Sec-WebSocket-Accept"); |
| if (accept == null) { |
| return error( |
| "Response did not contain a 'Sec-WebSocket-Accept' header"); |
| } |
| _SHA1 sha1 = _SHA1(); |
| sha1.add("$nonce$_webSocketGUID".codeUnits); |
| List<int> expectedAccept = sha1.close(); |
| List<int> receivedAccept = base64Decode(accept); |
| if (expectedAccept.length != receivedAccept.length) { |
| return error( |
| "Response header 'Sec-WebSocket-Accept' is the wrong length"); |
| } |
| for (int i = 0; i < expectedAccept.length; i++) { |
| if (expectedAccept[i] != receivedAccept[i]) { |
| return error("Bad response 'Sec-WebSocket-Accept' header"); |
| } |
| } |
| var protocol = response.headers.value('Sec-WebSocket-Protocol'); |
| |
| _WebSocketPerMessageDeflate? deflate = |
| negotiateClientCompression(response, compression); |
| |
| return response.detachSocket().then<WebSocket>((socket) => |
| _WebSocketImpl._fromSocket( |
| socket, protocol, compression, false, deflate)); |
| }); |
| } |
| |
| static _WebSocketPerMessageDeflate? negotiateClientCompression( |
| HttpClientResponse response, CompressionOptions compression) { |
| String extensionHeader = |
| response.headers.value('Sec-WebSocket-Extensions') ?? ""; |
| |
| var hv = HeaderValue.parse(extensionHeader, valueSeparator: ','); |
| |
| if (compression.enabled && hv.value == PER_MESSAGE_DEFLATE) { |
| var serverNoContextTakeover = |
| hv.parameters.containsKey(_serverNoContextTakeover); |
| var clientNoContextTakeover = |
| hv.parameters.containsKey(_clientNoContextTakeover); |
| |
| int getWindowBits(String type) { |
| var o = hv.parameters[type]; |
| if (o == null) { |
| return DEFAULT_WINDOW_BITS; |
| } |
| |
| return int.tryParse(o) ?? DEFAULT_WINDOW_BITS; |
| } |
| |
| return _WebSocketPerMessageDeflate( |
| clientMaxWindowBits: getWindowBits(_clientMaxWindowBits), |
| serverMaxWindowBits: getWindowBits(_serverMaxWindowBits), |
| clientNoContextTakeover: clientNoContextTakeover, |
| serverNoContextTakeover: serverNoContextTakeover); |
| } |
| |
| return null; |
| } |
| |
| _WebSocketImpl._fromSocket( |
| this._socket, this.protocol, CompressionOptions compression, |
| [this._serverSide = false, _WebSocketPerMessageDeflate? deflate]) |
| : _controller = StreamController(sync: true) { |
| _consumer = _WebSocketConsumer(this, _socket); |
| _sink = _StreamSinkImpl(_consumer); |
| _readyState = WebSocket.open; |
| _deflate = deflate; |
| |
| var transformer = _WebSocketProtocolTransformer(_serverSide, deflate); |
| var subscription = _subscription = transformer.bind(_socket).listen((data) { |
| if (data is _WebSocketPing) { |
| if (!_writeClosed) _consumer.add(_WebSocketPong(data.payload)); |
| } else if (data is _WebSocketPong) { |
| // Simply set pingInterval, as it'll cancel any timers. |
| pingInterval = _pingInterval; |
| } else { |
| _controller.add(data); |
| } |
| }, onError: (Object error, StackTrace stackTrace) { |
| _closeTimer?.cancel(); |
| if (error is FormatException) { |
| _close(WebSocketStatus.invalidFramePayloadData); |
| } else { |
| _close(WebSocketStatus.protocolError); |
| } |
| // An error happened, set the close code set above. |
| _closeCode = _outCloseCode; |
| _closeReason = _outCloseReason; |
| _controller.close(); |
| }, onDone: () { |
| _closeTimer?.cancel(); |
| if (_readyState == WebSocket.open) { |
| _readyState = WebSocket.closing; |
| if (!_isReservedStatusCode(transformer.closeCode)) { |
| _close(transformer.closeCode, transformer.closeReason); |
| } else { |
| _close(); |
| } |
| _readyState = WebSocket.closed; |
| } |
| // Protocol close, use close code from transformer. |
| _closeCode = transformer.closeCode; |
| _closeReason = transformer.closeReason; |
| _controller.close(); |
| }, cancelOnError: true); |
| subscription.pause(); |
| _controller |
| ..onListen = subscription.resume |
| ..onCancel = () { |
| _subscription!.cancel(); |
| _subscription = null; |
| } |
| ..onPause = subscription.pause |
| ..onResume = subscription.resume; |
| |
| _webSockets[_serviceId] = this; |
| } |
| |
| StreamSubscription listen(void onData(message)?, |
| {Function? onError, void Function()? onDone, bool? cancelOnError}) { |
| return _controller.stream.listen(onData, |
| onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
| } |
| |
| Duration? get pingInterval => _pingInterval; |
| |
| void set pingInterval(Duration? interval) { |
| if (_writeClosed) return; |
| _pingTimer?.cancel(); |
| _pingInterval = interval; |
| |
| if (interval == null) return; |
| |
| _pingTimer = Timer(interval, () { |
| if (_writeClosed) return; |
| _consumer.add(_WebSocketPing()); |
| _pingTimer = Timer(interval, () { |
| _closeTimer?.cancel(); |
| // No pong received. |
| _close(WebSocketStatus.goingAway); |
| _closeCode = _outCloseCode; |
| _closeReason = _outCloseReason; |
| _controller.close(); |
| }); |
| }); |
| } |
| |
| int get readyState => _readyState; |
| |
| String get extensions => ""; |
| int? get closeCode => _closeCode; |
| String? get closeReason => _closeReason; |
| |
| void add(data) { |
| _sink.add(data); |
| } |
| |
| void addUtf8Text(List<int> bytes) { |
| // TODO(40614): Remove once non-nullability is sound. |
| ArgumentError.checkNotNull(bytes, "bytes"); |
| _sink.add(_EncodedString(bytes)); |
| } |
| |
| void addError(Object error, [StackTrace? stackTrace]) { |
| _sink.addError(error, stackTrace); |
| } |
| |
| Future addStream(Stream stream) => _sink.addStream(stream); |
| Future get done => _sink.done; |
| |
| Future close([int? code, String? reason]) { |
| if (_isReservedStatusCode(code)) { |
| throw WebSocketException("Reserved status code $code"); |
| } |
| if (_outCloseCode == null) { |
| _outCloseCode = code; |
| _outCloseReason = reason; |
| } |
| if (!_controller.isClosed) { |
| // If a close has not yet been received from the other end then |
| // 1) make sure to listen on the stream so the close frame will be |
| // processed if received. |
| // 2) set a timer terminate the connection if a close frame is |
| // not received. |
| if (!_controller.hasListener && _subscription != null) { |
| _controller.stream.drain().catchError((_) => {}); |
| } |
| _closeTimer ??= Timer(const Duration(seconds: 5), () { |
| // Reuse code and reason from the local close. |
| _closeCode = _outCloseCode; |
| _closeReason = _outCloseReason; |
| _subscription?.cancel(); |
| _controller.close(); |
| _webSockets.remove(_serviceId); |
| }); |
| } |
| return _sink.close(); |
| } |
| |
| static String? get userAgent => _httpClient.userAgent; |
| |
| static set userAgent(String? userAgent) { |
| _httpClient.userAgent = userAgent; |
| } |
| |
| void _close([int? code, String? reason]) { |
| if (_writeClosed) return; |
| if (_outCloseCode == null) { |
| _outCloseCode = code; |
| _outCloseReason = reason; |
| } |
| _writeClosed = true; |
| _consumer.closeSocket(); |
| _webSockets.remove(_serviceId); |
| } |
| |
| String get _serviceTypePath => 'io/websockets'; |
| String get _serviceTypeName => 'WebSocket'; |
| |
| Map<String, dynamic> _toJSON(bool ref) { |
| var name = '${_socket.address.host}:${_socket.port}'; |
| var r = <String, dynamic>{ |
| 'id': _servicePath, |
| 'type': _serviceType(ref), |
| 'name': name, |
| 'user_name': name, |
| }; |
| if (ref) { |
| return r; |
| } |
| try { |
| r['socket'] = (_socket as dynamic)._toJSON(true); |
| } catch (_) { |
| r['socket'] = { |
| 'id': _servicePath, |
| 'type': '@Socket', |
| 'name': 'UserSocket', |
| 'user_name': 'UserSocket', |
| }; |
| } |
| return r; |
| } |
| |
| static bool _isReservedStatusCode(int? code) { |
| return code != null && |
| (code < WebSocketStatus.normalClosure || |
| code == WebSocketStatus.reserved1004 || |
| code == WebSocketStatus.noStatusReceived || |
| code == WebSocketStatus.abnormalClosure || |
| (code > WebSocketStatus.internalServerError && |
| code < WebSocketStatus.reserved1015) || |
| (code >= WebSocketStatus.reserved1015 && code < 3000)); |
| } |
| } |