diff --git a/CHANGELOG.md b/CHANGELOG.md index b7f318e..5026c83 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md
@@ -1,11 +1,21 @@ +## 3.0.0-wip + +- Provide an adapter around `package:web_socket` `WebSocket`s and make it the + default implementation for `WebSocketChannel.connect`. +- **BREAKING**: Remove `WebSocketChannel` constructor. +- **BREAKING**: Make `WebSocketChannel` an `abstract interface`. +- **BREAKING**: `IOWebSocketChannel.ready` will throw + `WebSocketChannelException` instead of `WebSocketException`. +- **BREAKING**: Remove `IOWebSocketChannel.innerWebSocket`. + ## 2.4.5 -* use secure random number generator for frame masking. +- use secure random number generator for frame masking. ## 2.4.4 -* Require Dart `^3.3` -* Require `package:web` `^0.5.0`. +- Require Dart `^3.3` +- Require `package:web` `^0.5.0`. ## 2.4.3
diff --git a/lib/adapter_web_socket_channel.dart b/lib/adapter_web_socket_channel.dart new file mode 100644 index 0000000..8415f27 --- /dev/null +++ b/lib/adapter_web_socket_channel.dart
@@ -0,0 +1,165 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:typed_data';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:web_socket/web_socket.dart';
+
+import 'src/channel.dart';
+import 'src/exception.dart';
+
+/// A [WebSocketChannel] implemented using [WebSocket].
+///
+/// Data received from the [WebSocket] is forwarded to [stream], and values
+/// added to [sink] are sent to the peer.
+class AdapterWebSocketChannel extends StreamChannelMixin
+    implements WebSocketChannel {
+  @override
+  String? get protocol => _protocol;
+  String? _protocol;
+
+  @override
+  int? get closeCode => _closeCode;
+  int? _closeCode;
+
+  @override
+  String? get closeReason => _closeReason;
+  String? _closeReason;
+
+  /// The close code set by the local user.
+  ///
+  /// To ensure proper ordering, this is stored until we get a done event on
+  /// [_controller.local.stream].
+  int? _localCloseCode;
+
+  /// The close reason set by the local user.
+  ///
+  /// To ensure proper ordering, this is stored until we get a done event on
+  /// [_controller.local.stream].
+  String? _localCloseReason;
+
+  /// Completer for [ready].
+  final _readyCompleter = Completer<void>();
+
+  @override
+  Future<void> get ready => _readyCompleter.future;
+
+  @override
+  Stream get stream => _controller.foreign.stream;
+
+  final _controller =
+      StreamChannelController<Object?>(sync: true, allowForeignErrors: false);
+
+  @override
+  late final WebSocketSink sink = _WebSocketSink(this);
+
+  /// Creates a new WebSocket connection.
+  ///
+  /// If provided, the [protocols] argument indicates the subprotocols that
+  /// the peer is able to select. See
+  /// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9).
+  ///
+  /// After construction, the [AdapterWebSocketChannel] may not be
+  /// connected to the peer. The [ready] future will complete after the channel
+  /// is connected. If there are errors creating the connection the [ready]
+  /// future will complete with an error.
+  factory AdapterWebSocketChannel.connect(Uri url,
+          {Iterable<String>? protocols}) =>
+      AdapterWebSocketChannel(WebSocket.connect(url, protocols: protocols));
+
+  /// Creates an [AdapterWebSocketChannel] from an existing [WebSocket].
+  ///
+  /// [webSocket] may also be a future that completes with the [WebSocket].
+  /// [ready] completes once the [WebSocket] is available. If the future fails,
+  /// [ready] completes with the error wrapped in a
+  /// [WebSocketChannelException] (a [TimeoutException] is passed through
+  /// unchanged), and the same error is emitted on [stream] before it closes.
+  AdapterWebSocketChannel(FutureOr<WebSocket> webSocket) {
+    Future<WebSocket> webSocketFuture;
+    if (webSocket is WebSocket) {
+      webSocketFuture = Future.value(webSocket);
+    } else {
+      webSocketFuture = webSocket;
+    }
+
+    webSocketFuture.then((webSocket) {
+      var remoteClosed = false;
+      webSocket.events.listen((event) {
+        switch (event) {
+          case TextDataReceived(text: final text):
+            _controller.local.sink.add(text);
+          case BinaryDataReceived(data: final data):
+            _controller.local.sink.add(data);
+          case CloseReceived(code: final code, reason: final reason):
+            remoteClosed = true;
+            _closeCode = code;
+            _closeReason = reason;
+            _controller.local.sink.close();
+        }
+      });
+      _controller.local.stream.listen((obj) {
+        try {
+          switch (obj) {
+            case final String s:
+              webSocket.sendText(s);
+            case final Uint8List b:
+              webSocket.sendBytes(b);
+            case final List<int> b:
+              webSocket.sendBytes(Uint8List.fromList(b));
+            default:
+              throw UnsupportedError('Cannot send ${obj.runtimeType}');
+          }
+        } on WebSocketConnectionClosed catch (_) {
+          // There is nowhere to surface this error; `_controller.local.sink`
+          // has already been closed.
+        }
+      }, onDone: () async {
+        if (remoteClosed) return;
+        try {
+          await webSocket.close(_localCloseCode, _localCloseReason);
+        } on WebSocketConnectionClosed {
+          // The connection can already be closed even though the
+          // `CloseReceived` event has not been delivered yet. As with sends,
+          // there is nowhere to surface this, and closing an already-closed
+          // connection is not an error for the channel.
+        }
+      });
+      _protocol = webSocket.protocol;
+      _readyCompleter.complete();
+    }, onError: (Object e, StackTrace stackTrace) {
+      Exception error;
+      if (e is TimeoutException) {
+        // Required for backwards compatibility with `IOWebSocketChannel`.
+        error = e;
+      } else {
+        error = WebSocketChannelException.from(e);
+      }
+      // Keep the original stack trace so the connection failure stays
+      // debuggable.
+      _readyCompleter.completeError(error, stackTrace);
+      _controller.local.sink.addError(error, stackTrace);
+      _controller.local.sink.close();
+    });
+  }
+}
+
+/// A [WebSocketSink] that tracks the close code and reason passed to [close].
+class _WebSocketSink extends DelegatingStreamSink implements WebSocketSink {
+  /// The channel to which this sink belongs.
+  final AdapterWebSocketChannel _channel;
+
+  _WebSocketSink(AdapterWebSocketChannel channel)
+      : _channel = channel,
+        super(channel._controller.foreign.sink);
+
+  @override
+  Future close([int? closeCode, String? closeReason]) {
+    _channel._localCloseCode = closeCode;
+    _channel._localCloseReason = closeReason;
+    return super.close();
+  }
+}
diff --git a/lib/io.dart b/lib/io.dart index 9f71d81..ff10d1a 100644 --- a/lib/io.dart +++ b/lib/io.dart
@@ -3,51 +3,16 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; -import 'dart:io'; +import 'dart:io' show HttpClient, WebSocket; -import 'package:async/async.dart'; -import 'package:stream_channel/stream_channel.dart'; +import 'package:web_socket/io_web_socket.dart' show IOWebSocket; +import 'adapter_web_socket_channel.dart'; import 'src/channel.dart'; import 'src/exception.dart'; -import 'src/sink_completer.dart'; /// A [WebSocketChannel] that communicates using a `dart:io` [WebSocket]. -class IOWebSocketChannel extends StreamChannelMixin - implements WebSocketChannel { - /// The underlying `dart:io` [WebSocket]. - /// - /// If the channel was constructed with [IOWebSocketChannel.connect], this is - /// `null` until the [WebSocket.connect] future completes. - WebSocket? _webSocket; - - @override - String? get protocol => _webSocket?.protocol; - - @override - int? get closeCode => _webSocket?.closeCode; - - @override - String? get closeReason => _webSocket?.closeReason; - - @override - final Stream stream; - - @override - final WebSocketSink sink; - - /// Completer for [ready]. - final Completer<void> _readyCompleter; - - @override - Future<void> get ready => _readyCompleter.future; - - /// The underlying [WebSocket], if this channel has connected. - /// - /// If the future returned from [WebSocket.connect] has not yet completed, or - /// completed as an error, this will be null. - WebSocket? get innerWebSocket => _webSocket; - +class IOWebSocketChannel extends AdapterWebSocketChannel { /// Creates a new WebSocket connection. /// /// Connects to [url] using [WebSocket.connect] and returns a channel that can @@ -76,58 +41,23 @@ Duration? connectTimeout, HttpClient? 
customClient, }) { - late IOWebSocketChannel channel; - final sinkCompleter = WebSocketSinkCompleter(); - var future = WebSocket.connect( + var webSocketFuture = WebSocket.connect( url.toString(), headers: headers, protocols: protocols, customClient: customClient, - ); + ).then((webSocket) => webSocket..pingInterval = pingInterval); + if (connectTimeout != null) { - future = future.timeout(connectTimeout); + webSocketFuture = webSocketFuture.timeout(connectTimeout); } - final stream = StreamCompleter.fromFuture(future.then((webSocket) { - webSocket.pingInterval = pingInterval; - channel._webSocket = webSocket; - channel._readyCompleter.complete(); - sinkCompleter.setDestinationSink(_IOWebSocketSink(webSocket)); - return webSocket; - }).catchError((Object error, StackTrace stackTrace) { - channel._readyCompleter.completeError(error, stackTrace); - throw WebSocketChannelException.from(error); - })); - return channel = - IOWebSocketChannel._withoutSocket(stream, sinkCompleter.sink); + + return IOWebSocketChannel(webSocketFuture); } - /// Creates a channel wrapping [socket]. - IOWebSocketChannel(WebSocket socket) - : _webSocket = socket, - stream = socket.handleError( - (Object? error) => throw WebSocketChannelException.from(error)), - sink = _IOWebSocketSink(socket), - _readyCompleter = Completer()..complete(); - - /// Creates a channel without a socket. - /// - /// This is used with `connect` to synchronously provide a channel that later - /// has a socket added. - IOWebSocketChannel._withoutSocket(Stream stream, this.sink) - : _webSocket = null, - stream = stream.handleError( - (Object? error) => throw WebSocketChannelException.from(error)), - _readyCompleter = Completer(); -} - -/// A [WebSocketSink] that forwards [close] calls to a `dart:io` [WebSocket]. -class _IOWebSocketSink extends DelegatingStreamSink implements WebSocketSink { - /// The underlying socket. 
- final WebSocket _webSocket; - - _IOWebSocketSink(WebSocket super.webSocket) : _webSocket = webSocket; - - @override - Future close([int? closeCode, String? closeReason]) => - _webSocket.close(closeCode, closeReason); + /// Creates a channel wrapping [webSocket]. + IOWebSocketChannel(FutureOr<WebSocket> webSocket) + : super(webSocket is Future<WebSocket> + ? webSocket.then(IOWebSocket.fromWebSocket) as FutureOr<IOWebSocket> + : IOWebSocket.fromWebSocket(webSocket)); }
diff --git a/lib/src/_connect_api.dart b/lib/src/_connect_api.dart deleted file mode 100644 index c58910e..0000000 --- a/lib/src/_connect_api.dart +++ /dev/null
@@ -1,15 +0,0 @@ -// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import '../web_socket_channel.dart'; - -/// Creates a new WebSocket connection. -/// -/// Connects to [uri] using and returns a channel that can be used to -/// communicate over the resulting socket. -/// -/// The optional [protocols] parameter is the same as `WebSocket.connect`. -WebSocketChannel connect(Uri uri, {Iterable<String>? protocols}) { - throw UnsupportedError('No implementation of the connect api provided'); -}
diff --git a/lib/src/_connect_html.dart b/lib/src/_connect_html.dart deleted file mode 100644 index e725d1b..0000000 --- a/lib/src/_connect_html.dart +++ /dev/null
@@ -1,16 +0,0 @@ -// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import '../html.dart'; - -import '../web_socket_channel.dart'; - -/// Creates a new WebSocket connection. -/// -/// Connects to [uri] using and returns a channel that can be used to -/// communicate over the resulting socket. -/// -/// The optional [protocols] parameter is the same as `WebSocket.connect`. -WebSocketChannel connect(Uri uri, {Iterable<String>? protocols}) => - HtmlWebSocketChannel.connect(uri, protocols: protocols);
diff --git a/lib/src/_connect_io.dart b/lib/src/_connect_io.dart deleted file mode 100644 index a7a82c4..0000000 --- a/lib/src/_connect_io.dart +++ /dev/null
@@ -1,15 +0,0 @@ -// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import '../io.dart'; -import '../web_socket_channel.dart'; - -/// Creates a new WebSocket connection. -/// -/// Connects to [uri] using and returns a channel that can be used to -/// communicate over the resulting socket. -/// -/// The optional [protocols] parameter is the same as `WebSocket.connect`. -WebSocketChannel connect(Uri uri, {Iterable<String>? protocols}) => - IOWebSocketChannel.connect(uri, protocols: protocols);
diff --git a/lib/src/channel.dart b/lib/src/channel.dart index 4143128..f8a560e 100644 --- a/lib/src/channel.dart +++ b/lib/src/channel.dart
@@ -9,12 +9,11 @@ import 'package:crypto/crypto.dart'; import 'package:stream_channel/stream_channel.dart'; -import '_connect_api.dart' - if (dart.library.io) '_connect_io.dart' - if (dart.library.js_interop) '_connect_html.dart' as platform; -import 'copy/web_socket_impl.dart'; +import '../adapter_web_socket_channel.dart'; import 'exception.dart'; +const String _webSocketGUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; + /// A [StreamChannel] that communicates over a WebSocket. /// /// This is implemented by classes that use `dart:io` and `dart:html`. @@ -23,33 +22,27 @@ /// /// All implementations emit [WebSocketChannelException]s. These exceptions wrap /// the native exception types where possible. -class WebSocketChannel extends StreamChannelMixin { - /// The underlying web socket. - /// - /// This is essentially a copy of `dart:io`'s WebSocket implementation, with - /// the IO-specific pieces factored out. - final WebSocketImpl _webSocket; - +abstract interface class WebSocketChannel extends StreamChannelMixin { /// The subprotocol selected by the server. /// /// For a client socket, this is initially `null`. After the WebSocket /// connection is established the value is set to the subprotocol selected by /// the server. If no subprotocol is negotiated the value will remain `null`. - String? get protocol => _webSocket.protocol; + String? get protocol; /// The [close code][] set when the WebSocket connection is closed. /// /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 /// /// Before the connection has been closed, this will be `null`. - int? get closeCode => _webSocket.closeCode; + int? get closeCode; /// The [close reason][] set when the WebSocket connection is closed. /// /// [close reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 /// /// Before the connection has been closed, this will be `null`. - String? get closeReason => _webSocket.closeReason; + String? 
get closeReason; /// A future that will complete when the WebSocket connection has been /// established. @@ -76,17 +69,14 @@ /// // send data. /// channel.sink.add('Hello World'); /// ``` - final Future<void> ready = Future.value(); - - @override - Stream get stream => StreamView(_webSocket); + Future<void> get ready; /// The sink for sending values to the other endpoint. /// /// This supports additional arguments to [WebSocketSink.close] that provide /// the remote endpoint reasons for closing the connection. @override - WebSocketSink get sink => WebSocketSink._(_webSocket); + WebSocketSink get sink; /// Signs a `Sec-WebSocket-Key` header sent by a WebSocket client as part of /// the [initial handshake][]. @@ -100,33 +90,7 @@ // [key] is expected to be base64 encoded, and so will be pure ASCII. => convert.base64 - .encode(sha1.convert((key + webSocketGUID).codeUnits).bytes); - - /// Creates a new WebSocket handling messaging across an existing [channel]. - /// - /// This is a cross-platform constructor; it doesn't use either `dart:io` or - /// `dart:html`. It's also HTTP-API-agnostic, which means that the initial - /// [WebSocket handshake][] must have already been completed on the socket - /// before this is called. - /// - /// [protocol] should be the protocol negotiated by this handshake, if any. - /// - /// [pingInterval] controls the interval for sending ping signals. If a ping - /// message is not answered by a pong message from the peer, the WebSocket is - /// assumed disconnected and the connection is closed with a `goingAway` close - /// code. When a ping signal is sent, the pong message must be received within - /// [pingInterval]. It defaults to `null`, indicating that ping messages are - /// disabled. - /// - /// If this is a WebSocket server, [serverSide] should be `true` (the - /// default); if it's a client, [serverSide] should be `false`. 
- /// - /// [WebSocket handshake]: https://tools.ietf.org/html/rfc6455#section-4 - WebSocketChannel(StreamChannel<List<int>> channel, - {String? protocol, Duration? pingInterval, bool serverSide = true}) - : _webSocket = WebSocketImpl.fromSocket( - channel.stream, channel.sink, protocol, serverSide) - ..pingInterval = pingInterval; + .encode(sha1.convert((key + _webSocketGUID).codeUnits).bytes); /// Creates a new WebSocket connection. /// @@ -140,19 +104,15 @@ /// The [ready] future will complete after the channel is connected. /// If there are errors creating the connection the [ready] future will /// complete with an error. - factory WebSocketChannel.connect(Uri uri, {Iterable<String>? protocols}) => - platform.connect(uri, protocols: protocols); + static WebSocketChannel connect(Uri uri, {Iterable<String>? protocols}) => + AdapterWebSocketChannel.connect(uri, protocols: protocols); } /// The sink exposed by a [WebSocketChannel]. /// /// This is like a normal [StreamSink], except that it supports extra arguments /// to [close]. -class WebSocketSink extends DelegatingStreamSink { - final WebSocketImpl _webSocket; - - WebSocketSink._(WebSocketImpl super.webSocket) : _webSocket = webSocket; - +abstract interface class WebSocketSink implements DelegatingStreamSink { /// Closes the web socket connection. /// /// [closeCode] and [closeReason] are the [close code][] and [reason][] sent @@ -162,6 +122,5 @@ /// [close code]: https://tools.ietf.org/html/rfc6455#section-7.1.5 /// [reason]: https://tools.ietf.org/html/rfc6455#section-7.1.6 @override - Future close([int? closeCode, String? closeReason]) => - _webSocket.close(closeCode, closeReason); + Future close([int? closeCode, String? closeReason]); }
diff --git a/lib/src/copy/io_sink.dart b/lib/src/copy/io_sink.dart deleted file mode 100644 index 6dfe7a0..0000000 --- a/lib/src/copy/io_sink.dart +++ /dev/null
@@ -1,152 +0,0 @@ -// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -// The following code is copied from sdk/lib/io/io_sink.dart. The "dart:io" -// implementation isn't used directly to support non-"dart:io" applications. -// -// Because it's copied directly, only modifications necessary to support the -// desired public API and to remove "dart:io" dependencies have been made. -// -// This is up-to-date as of sdk revision -// 365f7b5a8b6ef900a5ee23913b7203569b81b175. - -import 'dart:async'; - -class StreamSinkImpl<T> implements StreamSink<T> { - final StreamConsumer<T> _target; - final Completer _doneCompleter = Completer(); - StreamController<T>? _controllerInstance; - Completer? _controllerCompleter; - bool _isClosed = false; - bool _isBound = false; - bool _hasError = false; - - StreamSinkImpl(this._target); - - // The _reportClosedSink method has been deleted for web_socket_channel. This - // method did nothing but print to stderr, which is unavailable here. - - @override - void add(T data) { - if (_isClosed) { - return; - } - _controller.add(data); - } - - @override - void addError(Object error, [StackTrace? stackTrace]) { - if (_isClosed) { - return; - } - _controller.addError(error, stackTrace); - } - - @override - Future addStream(Stream<T> stream) { - if (_isBound) { - throw StateError('StreamSink is already bound to a stream'); - } - if (_hasError) return done; - - _isBound = true; - final future = _controllerCompleter == null - ? _target.addStream(stream) - : _controllerCompleter!.future.then((_) => _target.addStream(stream)); - _controllerInstance?.close(); - - // Wait for any pending events in [_controller] to be dispatched before - // adding [stream]. 
- return future.whenComplete(() { - _isBound = false; - }); - } - - Future flush() { - if (_isBound) { - throw StateError('StreamSink is bound to a stream'); - } - if (_controllerInstance == null) return Future.value(this); - // Adding an empty stream-controller will return a future that will complete - // when all data is done. - _isBound = true; - final future = _controllerCompleter!.future; - _controllerInstance!.close(); - return future.whenComplete(() { - _isBound = false; - }); - } - - @override - Future close() { - if (_isBound) { - throw StateError('StreamSink is bound to a stream'); - } - if (!_isClosed) { - _isClosed = true; - if (_controllerInstance != null) { - _controllerInstance!.close(); - } else { - _closeTarget(); - } - } - return done; - } - - void _closeTarget() { - _target.close().then(_completeDoneValue, onError: _completeDoneError); - } - - @override - Future get done => _doneCompleter.future; - - void _completeDoneValue(Object? value) { - if (!_doneCompleter.isCompleted) { - _doneCompleter.complete(value); - } - } - - void _completeDoneError(Object error, StackTrace stackTrace) { - if (!_doneCompleter.isCompleted) { - _hasError = true; - _doneCompleter.completeError(error, stackTrace); - } - } - - StreamController<T> get _controller { - if (_isBound) { - throw StateError('StreamSink is bound to a stream'); - } - if (_isClosed) { - throw StateError('StreamSink is closed'); - } - if (_controllerInstance == null) { - _controllerInstance = StreamController<T>(sync: true); - _controllerCompleter = Completer(); - _target.addStream(_controller.stream).then((_) { - if (_isBound) { - // A new stream takes over - forward values to that stream. - _controllerCompleter!.complete(this); - _controllerCompleter = null; - _controllerInstance = null; - } else { - // No new stream, .close was called. Close _target. 
- _closeTarget(); - } - }, onError: (Object error, StackTrace stackTrace) { - if (_isBound) { - // A new stream takes over - forward errors to that stream. - _controllerCompleter!.completeError(error, stackTrace); - _controllerCompleter = null; - _controllerInstance = null; - } else { - // No new stream. No need to close target, as it has already - // failed. - _completeDoneError(error, stackTrace); - } - }); - } - return _controllerInstance!; - } -}
diff --git a/lib/src/copy/web_socket.dart b/lib/src/copy/web_socket.dart deleted file mode 100644 index 0bfb498..0000000 --- a/lib/src/copy/web_socket.dart +++ /dev/null
@@ -1,39 +0,0 @@ -// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -// The following code is copied from sdk/lib/io/websocket.dart. The "dart:io" -// implementation isn't used directly to support non-"dart:io" applications. -// -// Because it's copied directly, only modifications necessary to support the -// desired public API and to remove "dart:io" dependencies have been made. -// -// This is up-to-date as of sdk revision -// 365f7b5a8b6ef900a5ee23913b7203569b81b175. - -// ignore_for_file: constant_identifier_names - -/// Web socket status codes used when closing a web socket connection. -abstract class WebSocketStatus { - static const int NORMAL_CLOSURE = 1000; - static const int GOING_AWAY = 1001; - static const int PROTOCOL_ERROR = 1002; - static const int UNSUPPORTED_DATA = 1003; - static const int RESERVED_1004 = 1004; - static const int NO_STATUS_RECEIVED = 1005; - static const int ABNORMAL_CLOSURE = 1006; - static const int INVALID_FRAME_PAYLOAD_DATA = 1007; - static const int POLICY_VIOLATION = 1008; - static const int MESSAGE_TOO_BIG = 1009; - static const int MISSING_MANDATORY_EXTENSION = 1010; - static const int INTERNAL_SERVER_ERROR = 1011; - static const int RESERVED_1015 = 1015; -} - -abstract class WebSocket { - /// Possible states of the connection. - static const int CONNECTING = 0; - static const int OPEN = 1; - static const int CLOSING = 2; - static const int CLOSED = 3; -}
diff --git a/lib/src/copy/web_socket_impl.dart b/lib/src/copy/web_socket_impl.dart deleted file mode 100644 index cf74bb2..0000000 --- a/lib/src/copy/web_socket_impl.dart +++ /dev/null
@@ -1,895 +0,0 @@ -// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -// The following code is copied from sdk/lib/io/websocket_impl.dart. The -// "dart:io" implementation isn't used directly to support non-"dart:io" -// applications. -// -// Because it's copied directly, only modifications necessary to support the -// desired public API and to remove "dart:io" dependencies have been made. -// -// This is up-to-date as of sdk revision -// 365f7b5a8b6ef900a5ee23913b7203569b81b175. - -// ignore_for_file: unused_field, constant_identifier_names - -import 'dart:async'; -import 'dart:convert'; -import 'dart:math'; -import 'dart:typed_data'; - -import '../exception.dart'; -import 'io_sink.dart'; -import 'web_socket.dart'; - -const String webSocketGUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; - -final _random = Random.secure(); - -// Matches _WebSocketOpcode. -class _WebSocketMessageType { - static const int NONE = 0; - static const int TEXT = 1; - static const int BINARY = 2; -} - -class _WebSocketOpcode { - static const int CONTINUATION = 0; - static const int TEXT = 1; - static const int BINARY = 2; - static const int RESERVED_3 = 3; - static const int RESERVED_4 = 4; - static const int RESERVED_5 = 5; - static const int RESERVED_6 = 6; - static const int RESERVED_7 = 7; - static const int CLOSE = 8; - static const int PING = 9; - static const int PONG = 10; - static const int RESERVED_B = 11; - static const int RESERVED_C = 12; - static const int RESERVED_D = 13; - static const int RESERVED_E = 14; - static const int RESERVED_F = 15; -} - -/// The web socket protocol transformer handles the protocol byte stream -/// which is supplied through the `handleData`. As the protocol is processed, -/// it'll output frame data as either a List<int> or String. 
-/// -/// Important information about usage: Be sure you use cancelOnError, so the -/// socket will be closed when the processor encounter an error. Not using it -/// will lead to undefined behaviour. -class _WebSocketProtocolTransformer extends StreamTransformerBase<List<int>, - dynamic /*List<int>|_WebSocketPing|_WebSocketPong*/ > - implements EventSink<List<int>> { - static const int START = 0; - static const int LEN_FIRST = 1; - static const int LEN_REST = 2; - static const int MASK = 3; - static const int PAYLOAD = 4; - static const int CLOSED = 5; - static const int FAILURE = 6; - static const int FIN = 0x80; - static const int RSV1 = 0x40; - static const int RSV2 = 0x20; - static const int RSV3 = 0x10; - static const int OPCODE = 0xF; - - int _state = START; - bool _fin = false; - int _opcode = -1; - int _len = -1; - bool _masked = false; - int _remainingLenBytes = -1; - int _remainingMaskingKeyBytes = 4; - int _remainingPayloadBytes = -1; - int _unmaskingIndex = 0; - int _currentMessageType = _WebSocketMessageType.NONE; - int closeCode = WebSocketStatus.NO_STATUS_RECEIVED; - String closeReason = ''; - - EventSink<dynamic /*List<int>|_WebSocketPing|_WebSocketPong*/ >? _eventSink; - - final bool _serverSide; - final List<int> _maskingBytes = List.filled(4, 0); - final BytesBuilder _payload = BytesBuilder(copy: false); - - _WebSocketProtocolTransformer([this._serverSide = false]); - - @override - Stream<dynamic /*List<int>|_WebSocketPing|_WebSocketPong*/ > bind( - Stream<List<int>> stream) => - Stream.eventTransformed(stream, (EventSink eventSink) { - if (_eventSink != null) { - throw StateError('WebSocket transformer already used.'); - } - _eventSink = eventSink; - return this; - }); - - @override - void addError(Object error, [StackTrace? stackTrace]) { - _eventSink!.addError(error, stackTrace); - } - - @override - void close() { - _eventSink!.close(); - } - - /// Process data received from the underlying communication channel. 
- @override - void add(List<int> bytes) { - final buffer = bytes is Uint8List ? bytes : Uint8List.fromList(bytes); - var index = 0; - final lastIndex = buffer.length; - if (_state == CLOSED) { - throw WebSocketChannelException('Data on closed connection'); - } - if (_state == FAILURE) { - throw WebSocketChannelException('Data on failed connection'); - } - while ((index < lastIndex) && _state != CLOSED && _state != FAILURE) { - final byte = buffer[index]; - if (_state <= LEN_REST) { - if (_state == START) { - _fin = (byte & FIN) != 0; - - if ((byte & (RSV2 | RSV3)) != 0) { - // The RSV2, RSV3 bits must both be zero. - throw WebSocketChannelException('Protocol error'); - } - - _opcode = byte & OPCODE; - - if (_opcode <= _WebSocketOpcode.BINARY) { - if (_opcode == _WebSocketOpcode.CONTINUATION) { - if (_currentMessageType == _WebSocketMessageType.NONE) { - throw WebSocketChannelException('Protocol error'); - } - } else { - assert(_opcode == _WebSocketOpcode.TEXT || - _opcode == _WebSocketOpcode.BINARY); - if (_currentMessageType != _WebSocketMessageType.NONE) { - throw WebSocketChannelException('Protocol error'); - } - _currentMessageType = _opcode; - } - } else if (_opcode >= _WebSocketOpcode.CLOSE && - _opcode <= _WebSocketOpcode.PONG) { - // Control frames cannot be fragmented. 
- if (!_fin) throw WebSocketChannelException('Protocol error'); - } else { - throw WebSocketChannelException('Protocol error'); - } - _state = LEN_FIRST; - } else if (_state == LEN_FIRST) { - _masked = (byte & 0x80) != 0; - _len = byte & 0x7F; - if (_isControlFrame() && _len > 125) { - throw WebSocketChannelException('Protocol error'); - } - if (_len == 126) { - _len = 0; - _remainingLenBytes = 2; - _state = LEN_REST; - } else if (_len == 127) { - _len = 0; - _remainingLenBytes = 8; - _state = LEN_REST; - } else { - assert(_len < 126); - _lengthDone(); - } - } else { - assert(_state == LEN_REST); - _len = _len << 8 | byte; - _remainingLenBytes--; - if (_remainingLenBytes == 0) { - _lengthDone(); - } - } - } else { - if (_state == MASK) { - _maskingBytes[4 - _remainingMaskingKeyBytes--] = byte; - if (_remainingMaskingKeyBytes == 0) { - _maskDone(); - } - } else { - assert(_state == PAYLOAD); - // The payload is not handled one byte at a time but in blocks. - final payloadLength = min(lastIndex - index, _remainingPayloadBytes); - _remainingPayloadBytes -= payloadLength; - // Unmask payload if masked. - if (_masked) { - _unmask(index, payloadLength, buffer); - } - // Control frame and data frame share _payloads. - _payload.add(Uint8List.view(buffer.buffer, index, payloadLength)); - index += payloadLength; - if (_isControlFrame()) { - if (_remainingPayloadBytes == 0) _controlFrameEnd(); - } else { - if (_currentMessageType != _WebSocketMessageType.TEXT && - _currentMessageType != _WebSocketMessageType.BINARY) { - throw WebSocketChannelException('Protocol error'); - } - if (_remainingPayloadBytes == 0) _messageFrameEnd(); - } - - // Hack - as we always do index++ below. - index--; - } - } - - // Move to the next byte. - index++; - } - } - - void _unmask(int index, int length, Uint8List buffer) { - const BLOCK_SIZE = 16; - // Skip Int32x4-version if message is small. - if (length >= BLOCK_SIZE) { - // Start by aligning to 16 bytes. 
- final startOffset = BLOCK_SIZE - (index & 15); - final end = index + startOffset; - for (var i = index; i < end; i++) { - buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; - } - index += startOffset; - length -= startOffset; - final blockCount = length ~/ BLOCK_SIZE; - if (blockCount > 0) { - // Create mask block. - var mask = 0; - for (var i = 3; i >= 0; i--) { - mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3]; - } - final blockMask = Int32x4(mask, mask, mask, mask); - final blockBuffer = Int32x4List.view(buffer.buffer, index, blockCount); - for (var i = 0; i < blockBuffer.length; i++) { - blockBuffer[i] ^= blockMask; - } - final bytes = blockCount * BLOCK_SIZE; - index += bytes; - length -= bytes; - } - } - // Handle end. - final end = index + length; - for (var i = index; i < end; i++) { - buffer[i] ^= _maskingBytes[_unmaskingIndex++ & 3]; - } - } - - void _lengthDone() { - if (_masked) { - if (!_serverSide) { - throw WebSocketChannelException('Received masked frame from server'); - } - _state = MASK; - } else { - if (_serverSide) { - throw WebSocketChannelException('Received unmasked frame from client'); - } - _remainingPayloadBytes = _len; - _startPayload(); - } - } - - void _maskDone() { - _remainingPayloadBytes = _len; - _startPayload(); - } - - void _startPayload() { - // If there is no actual payload perform perform callbacks without - // going through the PAYLOAD state. 
- if (_remainingPayloadBytes == 0) { - if (_isControlFrame()) { - switch (_opcode) { - case _WebSocketOpcode.CLOSE: - _state = CLOSED; - _eventSink!.close(); - break; - case _WebSocketOpcode.PING: - _eventSink!.add(_WebSocketPing()); - break; - case _WebSocketOpcode.PONG: - _eventSink!.add(_WebSocketPong()); - break; - } - _prepareForNextFrame(); - } else { - _messageFrameEnd(); - } - } else { - _state = PAYLOAD; - } - } - - void _messageFrameEnd() { - if (_fin) { - final bytes = _payload.takeBytes(); - - switch (_currentMessageType) { - case _WebSocketMessageType.TEXT: - _eventSink!.add(utf8.decode(bytes)); - break; - case _WebSocketMessageType.BINARY: - _eventSink!.add(bytes); - break; - } - _currentMessageType = _WebSocketMessageType.NONE; - } - _prepareForNextFrame(); - } - - void _controlFrameEnd() { - switch (_opcode) { - case _WebSocketOpcode.CLOSE: - closeCode = WebSocketStatus.NO_STATUS_RECEIVED; - final payload = _payload.takeBytes(); - if (payload.isNotEmpty) { - if (payload.length == 1) { - throw WebSocketChannelException('Protocol error'); - } - closeCode = payload[0] << 8 | payload[1]; - if (closeCode == WebSocketStatus.NO_STATUS_RECEIVED) { - throw WebSocketChannelException('Protocol error'); - } - if (payload.length > 2) { - closeReason = utf8.decode(payload.sublist(2)); - } - } - _state = CLOSED; - _eventSink!.close(); - break; - - case _WebSocketOpcode.PING: - _eventSink!.add(_WebSocketPing(_payload.takeBytes())); - break; - - case _WebSocketOpcode.PONG: - _eventSink!.add(_WebSocketPong(_payload.takeBytes())); - break; - } - _prepareForNextFrame(); - } - - bool _isControlFrame() => - _opcode == _WebSocketOpcode.CLOSE || - _opcode == _WebSocketOpcode.PING || - _opcode == _WebSocketOpcode.PONG; - - void _prepareForNextFrame() { - if (_state != CLOSED && _state != FAILURE) _state = START; - _fin = false; - _opcode = -1; - _len = -1; - _remainingLenBytes = -1; - _remainingMaskingKeyBytes = 4; - _remainingPayloadBytes = -1; - _unmaskingIndex = 0; - } 
-} - -class _WebSocketPing { - final List<int>? payload; - - _WebSocketPing([this.payload]); -} - -class _WebSocketPong { - final List<int>? payload; - - _WebSocketPong([this.payload]); -} - -// TODO(ajohnsen): Make this transformer reusable. -class _WebSocketOutgoingTransformer - extends StreamTransformerBase<dynamic, List<int>> implements EventSink { - final WebSocketImpl webSocket; - EventSink<List<int>>? _eventSink; - - _WebSocketOutgoingTransformer(this.webSocket); - - @override - Stream<List<int>> bind(Stream stream) => - Stream<List<int>>.eventTransformed(stream, - (EventSink<List<int>> eventSink) { - if (_eventSink != null) { - throw StateError('WebSocket transformer already used'); - } - _eventSink = eventSink; - return this; - }); - - @override - void add(Object? message) { - if (message is _WebSocketPong) { - addFrame(_WebSocketOpcode.PONG, message.payload); - return; - } - if (message is _WebSocketPing) { - addFrame(_WebSocketOpcode.PING, message.payload); - return; - } - List<int>? data; - int opcode; - if (message != null) { - if (message is String) { - opcode = _WebSocketOpcode.TEXT; - data = utf8.encode(message); - } else if (message is List<int>) { - opcode = _WebSocketOpcode.BINARY; - data = message; - } else { - throw ArgumentError(message); - } - } else { - opcode = _WebSocketOpcode.TEXT; - } - addFrame(opcode, data); - } - - @override - void addError(Object error, [StackTrace? stackTrace]) { - _eventSink!.addError(error, stackTrace); - } - - @override - void close() { - final code = webSocket._outCloseCode; - final reason = webSocket._outCloseReason; - List<int>? data; - if (code != null) { - data = <int>[]; - data.add((code >> 8) & 0xFF); - data.add(code & 0xFF); - if (reason != null) { - data.addAll(utf8.encode(reason)); - } - } - addFrame(_WebSocketOpcode.CLOSE, data); - _eventSink!.close(); - } - - void addFrame(int opcode, List<int>? 
data) { - createFrame( - opcode, - data, - webSocket._serverSide, - // Logic around _deflateHelper was removed here, since there will - // never be a deflate helper for a cross-platform WebSocket client. - false) - .forEach((e) { - _eventSink!.add(e); - }); - } - - static Iterable<List<int>> createFrame( - int opcode, List<int>? data, bool serverSide, bool compressed) { - final mask = !serverSide; // Masking not implemented for server. - final dataLength = data == null ? 0 : data.length; - // Determine the header size. - var headerSize = mask ? 6 : 2; - if (dataLength > 65535) { - headerSize += 8; - } else if (dataLength > 125) { - headerSize += 2; - } - final header = Uint8List(headerSize); - var index = 0; - - // Set FIN and opcode. - final hoc = _WebSocketProtocolTransformer.FIN | - (compressed ? _WebSocketProtocolTransformer.RSV1 : 0) | - (opcode & _WebSocketProtocolTransformer.OPCODE); - - header[index++] = hoc; - // Determine size and position of length field. - var lengthBytes = 1; - if (dataLength > 65535) { - header[index++] = 127; - lengthBytes = 8; - } else if (dataLength > 125) { - header[index++] = 126; - lengthBytes = 2; - } - // Write the length in network byte order into the header. - for (var i = 0; i < lengthBytes; i++) { - header[index++] = dataLength >> (((lengthBytes - 1) - i) * 8) & 0xFF; - } - if (mask) { - header[1] |= 1 << 7; - final maskBytes = [ - _random.nextInt(256), - _random.nextInt(256), - _random.nextInt(256), - _random.nextInt(256) - ]; - header.setRange(index, index + 4, maskBytes); - index += 4; - if (data != null) { - Uint8List list; - // If this is a text message just do the masking inside the - // encoded data. 
- if (opcode == _WebSocketOpcode.TEXT && data is Uint8List) { - list = data; - } else { - if (data is Uint8List) { - list = Uint8List.fromList(data); - } else { - list = Uint8List(data.length); - for (var i = 0; i < data.length; i++) { - if (data[i] < 0 || 255 < data[i]) { - throw ArgumentError('List element is not a byte value ' - '(value ${data[i]} at index $i)'); - } - list[i] = data[i]; - } - } - } - const BLOCK_SIZE = 16; - final blockCount = list.length ~/ BLOCK_SIZE; - if (blockCount > 0) { - // Create mask block. - var mask = 0; - for (var i = 3; i >= 0; i--) { - mask = (mask << 8) | maskBytes[i]; - } - final blockMask = Int32x4(mask, mask, mask, mask); - final blockBuffer = Int32x4List.view(list.buffer, 0, blockCount); - for (var i = 0; i < blockBuffer.length; i++) { - blockBuffer[i] ^= blockMask; - } - } - // Handle end. - for (var i = blockCount * BLOCK_SIZE; i < list.length; i++) { - list[i] ^= maskBytes[i & 3]; - } - data = list; - } - } - assert(index == headerSize); - if (data == null) { - return [header]; - } else { - return [header, data]; - } - } -} - -class _WebSocketConsumer implements StreamConsumer { - final WebSocketImpl webSocket; - final StreamSink<List<int>> sink; - StreamController? _controller; - - // ignore: cancel_subscriptions - StreamSubscription? _subscription; - bool _issuedPause = false; - bool _closed = false; - final Completer _closeCompleter = Completer<WebSocketImpl>(); - Completer<WebSocketImpl>? 
_completer; - - _WebSocketConsumer(this.webSocket, this.sink); - - void _onListen() { - if (_subscription != null) { - _subscription!.cancel(); - } - } - - void _onPause() { - if (_subscription != null) { - _subscription!.pause(); - } else { - _issuedPause = true; - } - } - - void _onResume() { - if (_subscription != null) { - _subscription!.resume(); - } else { - _issuedPause = false; - } - } - - void _cancel() { - if (_subscription != null) { - final subscription = _subscription; - _subscription = null; - subscription!.cancel(); - } - } - - void _ensureController() { - if (_controller != null) return; - _controller = StreamController( - sync: true, - onPause: _onPause, - onResume: _onResume, - onCancel: _onListen); - final stream = - _WebSocketOutgoingTransformer(webSocket).bind(_controller!.stream); - sink.addStream(stream).then((_) { - _done(); - _closeCompleter.complete(webSocket); - }, onError: (Object error, StackTrace stackTrace) { - _closed = true; - _cancel(); - if (error is ArgumentError) { - if (!_done(error, stackTrace)) { - _closeCompleter.completeError(error, stackTrace); - } - } else { - _done(); - _closeCompleter.complete(webSocket); - } - }); - } - - bool _done([Object? error, StackTrace? 
stackTrace]) { - if (_completer == null) return false; - if (error != null) { - _completer!.completeError(error, stackTrace); - } else { - _completer!.complete(webSocket); - } - _completer = null; - return true; - } - - @override - Future addStream(Stream stream) { - if (_closed) { - stream.listen(null).cancel(); - return Future.value(webSocket); - } - _ensureController(); - _completer = Completer(); - _subscription = stream.listen((data) { - _controller!.add(data); - }, onDone: _done, onError: _done, cancelOnError: true); - if (_issuedPause) { - _subscription!.pause(); - _issuedPause = false; - } - return _completer!.future; - } - - @override - Future close() { - _ensureController(); - Future closeSocket() => - sink.close().catchError((_) {}).then((_) => webSocket); - - _controller!.close(); - return _closeCompleter.future.then((_) => closeSocket()); - } - - void add(Object? data) { - if (_closed) return; - _ensureController(); - _controller!.add(data); - } - - void closeSocket() { - _closed = true; - _cancel(); - close(); - } -} - -class WebSocketImpl extends Stream with _ServiceObject implements StreamSink { - // Use default Map so we keep order. - static final Map<int, WebSocketImpl> _webSockets = <int, WebSocketImpl>{}; - static const int DEFAULT_WINDOW_BITS = 15; - static const String PER_MESSAGE_DEFLATE = 'permessage-deflate'; - - final String? protocol; - - late final StreamController _controller; - - // ignore: cancel_subscriptions - StreamSubscription? _subscription; - late final StreamSink _sink; - - final bool _serverSide; - int _readyState = WebSocket.CONNECTING; - bool _writeClosed = false; - int? _closeCode; - String? _closeReason; - Duration? _pingInterval; - Timer? _pingTimer; - late final _WebSocketConsumer _consumer; - - int? _outCloseCode; - String? _outCloseReason; - Timer? 
_closeTimer; - - WebSocketImpl.fromSocket( - Stream<List<int>> stream, StreamSink<List<int>> sink, this.protocol, - [this._serverSide = false]) { - _consumer = _WebSocketConsumer(this, sink); - _sink = StreamSinkImpl(_consumer); - _readyState = WebSocket.OPEN; - - final transformer = _WebSocketProtocolTransformer(_serverSide); - _subscription = transformer.bind(stream).listen((data) { - if (data is _WebSocketPing) { - if (!_writeClosed) _consumer.add(_WebSocketPong(data.payload)); - } else if (data is _WebSocketPong) { - // Simply set pingInterval, as it'll cancel any timers. - pingInterval = _pingInterval; - } else { - _controller.add(data); - } - }, onError: (Object error) { - if (_closeTimer != null) _closeTimer!.cancel(); - if (error is FormatException) { - _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA); - } else { - _close(WebSocketStatus.PROTOCOL_ERROR); - } - // An error happened, set the close code set above. - _closeCode = _outCloseCode; - _closeReason = _outCloseReason; - _controller.close(); - }, onDone: () { - if (_closeTimer != null) _closeTimer!.cancel(); - if (_readyState == WebSocket.OPEN) { - _readyState = WebSocket.CLOSING; - if (!_isReservedStatusCode(transformer.closeCode)) { - _close(transformer.closeCode, transformer.closeReason); - } else { - _close(); - } - _readyState = WebSocket.CLOSED; - } - // Protocol close, use close code from transformer. - _closeCode = transformer.closeCode; - _closeReason = transformer.closeReason; - _controller.close(); - }, cancelOnError: true); - _subscription!.pause(); - _controller = StreamController( - sync: true, - onListen: () => _subscription!.resume(), - onCancel: () { - _subscription!.cancel(); - _subscription = null; - }, - onPause: _subscription!.pause, - onResume: _subscription!.resume); - - _webSockets[_serviceId] = this; - } - - @override - StreamSubscription listen(void Function(dynamic)? onData, - {Function? onError, void Function()? onDone, bool? 
cancelOnError}) => - _controller.stream.listen(onData, - onError: onError, onDone: onDone, cancelOnError: cancelOnError); - - Duration? get pingInterval => _pingInterval; - - set pingInterval(Duration? interval) { - if (_writeClosed) return; - if (_pingTimer != null) _pingTimer!.cancel(); - _pingInterval = interval; - - if (_pingInterval == null) return; - - _pingTimer = Timer(_pingInterval!, () { - if (_writeClosed) return; - _consumer.add(_WebSocketPing()); - _pingTimer = Timer(_pingInterval!, () { - // No pong received. - _close(WebSocketStatus.GOING_AWAY); - }); - }); - } - - int get readyState => _readyState; - - String? get extensions => null; - - int? get closeCode => _closeCode; - - String? get closeReason => _closeReason; - - @override - void add(Object? data) { - _sink.add(data); - } - - @override - void addError(Object error, [StackTrace? stackTrace]) { - _sink.addError(error, stackTrace); - } - - @override - Future addStream(Stream stream) => _sink.addStream(stream); - - @override - Future get done => _sink.done; - - @override - Future close([int? code, String? reason]) { - if (_isReservedStatusCode(code)) { - throw WebSocketChannelException('Reserved status code $code'); - } - if (_outCloseCode == null) { - _outCloseCode = code; - _outCloseReason = reason; - } - if (!_controller.isClosed) { - // If a close has not yet been received from the other end then - // 1) make sure to listen on the stream so the close frame will be - // processed if received. - // 2) set a timer terminate the connection if a close frame is - // not received. - if (!_controller.hasListener && _subscription != null) { - _controller.stream.drain<void>().catchError((_) => <String, dynamic>{}); - } - // When closing the web-socket, we no longer accept data. - _closeTimer ??= Timer(const Duration(seconds: 5), () { - // Reuse code and reason from the local close. 
- _closeCode = _outCloseCode; - _closeReason = _outCloseReason; - if (_subscription != null) _subscription!.cancel(); - _controller.close(); - _webSockets.remove(_serviceId); - }); - } - return _sink.close(); - } - - void _close([int? code, String? reason]) { - if (_writeClosed) return; - if (_outCloseCode == null) { - _outCloseCode = code; - _outCloseReason = reason; - } - _writeClosed = true; - _consumer.closeSocket(); - _webSockets.remove(_serviceId); - } - - // The _toJSON, _serviceTypePath, and _serviceTypeName methods have been - // deleted for web_socket_channel. The methods were unused in WebSocket code - // and produced warnings. - - static bool _isReservedStatusCode(int? code) => - code != null && - (code < WebSocketStatus.NORMAL_CLOSURE || - code == WebSocketStatus.RESERVED_1004 || - code == WebSocketStatus.NO_STATUS_RECEIVED || - code == WebSocketStatus.ABNORMAL_CLOSURE || - (code > WebSocketStatus.INTERNAL_SERVER_ERROR && - code < WebSocketStatus.RESERVED_1015) || - (code >= WebSocketStatus.RESERVED_1015 && code < 3000)); -} - -// The following code is from sdk/lib/io/service_object.dart. - -int _nextServiceId = 1; - -// TODO(ajohnsen): Use other way of getting a uniq id. -mixin class _ServiceObject { - int __serviceId = 0; - - int get _serviceId { - if (__serviceId == 0) __serviceId = _nextServiceId++; - return __serviceId; - } - -// The _toJSON, _servicePath, _serviceTypePath, _serviceTypeName, and -// _serviceType methods have been deleted for http_parser. The methods were -// unused in WebSocket code and produced warnings. -}
diff --git a/pubspec.yaml b/pubspec.yaml index 0e44661..d198922 100644 --- a/pubspec.yaml +++ b/pubspec.yaml
@@ -1,5 +1,5 @@ name: web_socket_channel -version: 2.4.5 +version: 3.0.0-wip description: >- StreamChannel wrappers for WebSockets. Provides a cross-platform WebSocketChannel API, a cross-platform implementation of that API that @@ -14,7 +14,18 @@ crypto: ^3.0.0 stream_channel: ^2.1.0 web: ^0.5.0 + web_socket: ^0.1.1 dev_dependencies: dart_flutter_team_lints: ^2.0.0 test: ^1.16.0 + +# Remove this when versions of `package:test` and `shelf_web_socket` that support +# web_socket_channel 3.0 are released. +dependency_overrides: + shelf_web_socket: + git: + ref: master + url: https://github.com/dart-lang/shelf.git + path: pkgs/shelf_web_socket + test: 1.25.2
diff --git a/test/adapter_web_socket_channel_test.dart b/test/adapter_web_socket_channel_test.dart new file mode 100644 index 0000000..44ed7a9 --- /dev/null +++ b/test/adapter_web_socket_channel_test.dart
@@ -0,0 +1,150 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; +import 'package:web_socket/web_socket.dart'; +import 'package:web_socket_channel/adapter_web_socket_channel.dart'; +import 'package:web_socket_channel/src/exception.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; + +import 'echo_server_vm.dart' + if (dart.library.js_interop) 'echo_server_web.dart'; + +void main() { + group('AdapterWebSocketChannel', () { + late Uri uri; + late StreamChannel<Object?> httpServerChannel; + late StreamQueue<Object?> httpServerQueue; + + setUp(() async { + httpServerChannel = await startServer(); + httpServerQueue = StreamQueue(httpServerChannel.stream); + + // When run under dart2wasm, JSON numbers are always returned as [double]. 
+ final port = ((await httpServerQueue.next) as num).toInt(); + uri = Uri.parse('ws://localhost:$port'); + }); + tearDown(() async { + httpServerChannel.sink.add(null); + }); + + test('failed connect', () async { + final channel = + AdapterWebSocketChannel.connect(Uri.parse('ws://notahost')); + + await expectLater( + channel.ready, throwsA(isA<WebSocketChannelException>())); + }); + + test('good connect', () async { + final channel = AdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + await channel.sink.close(); + }); + + test('echo empty text', () async { + final channel = AdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add(''); + expect(await channel.stream.first, ''); + await channel.sink.close(); + }); + + test('echo empty binary', () async { + final channel = AdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add(Uint8List.fromList(<int>[])); + expect(await channel.stream.first, isEmpty); + await channel.sink.close(); + }); + + test('echo hello', () async { + final channel = AdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add('hello'); + expect(await channel.stream.first, 'hello'); + await channel.sink.close(); + }); + + test('echo [1,2,3]', () async { + final channel = AdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add([1, 2, 3]); + expect(await channel.stream.first, [1, 2, 3]); + await channel.sink.close(); + }); + + test('alternative string and binary request and response', () async { + final channel = AdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add('This count says:'); + channel.sink.add([1, 2, 3]); + channel.sink.add('And then:'); + channel.sink.add([4, 5, 6]); + expect(await channel.stream.take(4).toList(), [ + 'This count says:', + [1, 2, 3], + 'And then:', + 
[4, 5, 6] + ]); + }); + + test('remote close', () async { + final channel = AdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add('close'); // Asks the peer to close. + // Give the server time to send a close frame. + await Future<void>.delayed(const Duration(seconds: 1)); + expect(channel.closeCode, 3001); + expect(channel.closeReason, 'you asked me to'); + await channel.sink.close(); + }); + + test('local close', () async { + final channel = AdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + await channel.sink.close(3005, 'please close'); + expect(channel.closeCode, null); + expect(channel.closeReason, null); + }); + + test('constructor with WebSocket', () async { + final webSocket = await WebSocket.connect(uri); + final channel = AdapterWebSocketChannel(webSocket); + + await expectLater(channel.ready, completes); + channel.sink.add('This count says:'); + channel.sink.add([1, 2, 3]); + channel.sink.add('And then:'); + channel.sink.add([4, 5, 6]); + expect(await channel.stream.take(4).toList(), [ + 'This count says:', + [1, 2, 3], + 'And then:', + [4, 5, 6] + ]); + }); + + test('constructor with Future<WebSocket>', () async { + final webSocketFuture = WebSocket.connect(uri); + final channel = AdapterWebSocketChannel(webSocketFuture); + + await expectLater(channel.ready, completes); + channel.sink.add('This count says:'); + channel.sink.add([1, 2, 3]); + channel.sink.add('And then:'); + channel.sink.add([4, 5, 6]); + expect(await channel.stream.take(4).toList(), [ + 'This count says:', + [1, 2, 3], + 'And then:', + [4, 5, 6] + ]); + }); + }); +}
diff --git a/test/echo_server_vm.dart b/test/echo_server_vm.dart new file mode 100644 index 0000000..99ef2a2 --- /dev/null +++ b/test/echo_server_vm.dart
@@ -0,0 +1,34 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'package:stream_channel/stream_channel.dart'; + +Future<void> hybridMain(StreamChannel<Object?> channel) async { + late HttpServer server; + + server = (await HttpServer.bind('localhost', 0)) + ..transform(WebSocketTransformer()) + .listen((WebSocket webSocket) => webSocket.listen((data) { + if (data == 'close') { + webSocket.close(3001, 'you asked me to'); + } else { + webSocket.add(data); + } + })); + + channel.sink.add(server.port); + await channel + .stream.first; // Any write indicates that the server should exit. + unawaited(server.close()); +} + +/// Starts a WebSocket server that echoes the payload of the request. +Future<StreamChannel<Object?>> startServer() async { + final controller = StreamChannelController<Object?>(sync: true); + unawaited(hybridMain(controller.foreign)); + return controller.local; +}
diff --git a/test/echo_server_web.dart b/test/echo_server_web.dart new file mode 100644 index 0000000..030b702 --- /dev/null +++ b/test/echo_server_web.dart
@@ -0,0 +1,41 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+/// Starts a WebSocket server that echoes the payload of the request.
+///
+/// The server is spawned as hybrid code. Its source is copied from
+/// `echo_server_vm.dart`.
+Future<StreamChannel<Object?>> startServer() async {
+  const echoServerSource = r'''
+import 'dart:async';
+import 'dart:io';
+
+import 'package:stream_channel/stream_channel.dart';
+
+/// Starts an WebSocket server that echos the payload of the request.
+Future<void> hybridMain(StreamChannel<Object?> channel) async {
+  late HttpServer server;
+
+  server = (await HttpServer.bind('localhost', 0))
+    ..transform(WebSocketTransformer())
+        .listen((WebSocket webSocket) => webSocket.listen((data) {
+              if (data == 'close') {
+                webSocket.close(3001, 'you asked me to');
+              } else {
+                webSocket.add(data);
+              }
+            }));
+
+  channel.sink.add(server.port);
+  await channel
+      .stream.first; // Any writes indicates that the server should exit.
+  unawaited(server.close());
+}
+''';
+
+  return spawnHybridCode(echoServerSource);
+}
diff --git a/test/io_test.dart b/test/io_test.dart index b2b7cbf..1b7ae35 100644 --- a/test/io_test.dart +++ b/test/io_test.dart
@@ -24,7 +24,7 @@ channel.stream.listen((request) { expect(request, equals('ping')); channel.sink.add('pong'); - channel.sink.close(5678, 'raisin'); + channel.sink.close(3678, 'raisin'); }); }); @@ -45,7 +45,7 @@ } n++; }, onDone: expectAsync0(() { - expect(channel.closeCode, equals(5678)); + expect(channel.closeCode, equals(3678)); expect(channel.closeReason, equals('raisin')); })); }); @@ -70,7 +70,7 @@ channel.stream.listen( expectAsync1((message) { expect(message, equals('pong')); - channel.sink.close(5678, 'raisin'); + channel.sink.close(3678, 'raisin'); }, count: 1), onDone: expectAsync0(() {})); }); @@ -97,7 +97,7 @@ channel.stream.listen( expectAsync1((message) { expect(message, equals('pong')); - channel.sink.close(5678, 'raisin'); + channel.sink.close(3678, 'raisin'); }, count: 1), onDone: expectAsync0(() {})); }); @@ -109,7 +109,7 @@ expect(() async { final channel = IOWebSocketChannel(webSocket); await channel.stream.drain<void>(); - expect(channel.closeCode, equals(5678)); + expect(channel.closeCode, equals(3678)); expect(channel.closeReason, equals('raisin')); }(), completes); }); @@ -118,7 +118,7 @@ expect(channel.ready, completes); - await channel.sink.close(5678, 'raisin'); + await channel.sink.close(3678, 'raisin'); }); test('.connect wraps a connection error in WebSocketChannelException', @@ -131,7 +131,7 @@ }); final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}'); - expect(channel.ready, throwsA(isA<WebSocketException>())); + expect(channel.ready, throwsA(isA<WebSocketChannelException>())); expect(channel.stream.drain<void>(), throwsA(isA<WebSocketChannelException>())); }); @@ -154,7 +154,7 @@ 'ws://localhost:${server.port}', protocols: [failedProtocol], ); - expect(channel.ready, throwsA(isA<WebSocketException>())); + expect(channel.ready, throwsA(isA<WebSocketChannelException>())); expect( channel.stream.drain<void>(), throwsA(isA<WebSocketChannelException>()), @@ -192,7 +192,7 @@ expect(() async { final channel = 
IOWebSocketChannel(webSocket); await channel.stream.drain<void>(); - expect(channel.closeCode, equals(5678)); + expect(channel.closeCode, equals(3678)); expect(channel.closeReason, equals('raisin')); }(), completes); }); @@ -202,7 +202,7 @@ connectTimeout: const Duration(milliseconds: 1000), ); expect(channel.ready, completes); - await channel.sink.close(5678, 'raisin'); + await channel.sink.close(3678, 'raisin'); }); test('.respects timeout parameter when trying to connect', () async { @@ -230,8 +230,7 @@ ); expect(channel.ready, throwsA(isA<TimeoutException>())); - expect(channel.stream.drain<void>(), - throwsA(isA<WebSocketChannelException>())); + expect(channel.stream.drain<void>(), throwsA(anything)); }); test('.custom client is passed through', () async {
diff --git a/test/web_socket_test.dart b/test/web_socket_test.dart deleted file mode 100644 index 7aaabc7..0000000 --- a/test/web_socket_test.dart +++ /dev/null
@@ -1,100 +0,0 @@ -// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -@TestOn('vm') -library; - -import 'dart:io'; - -import 'package:stream_channel/stream_channel.dart'; -import 'package:test/test.dart'; -import 'package:web_socket_channel/web_socket_channel.dart'; - -void main() { - group('using WebSocketChannel', () { - test('a client can communicate with a WebSocket server', () async { - final server = await HttpServer.bind('localhost', 0); - server.transform(WebSocketTransformer()).listen((webSocket) { - webSocket.add('hello!'); - webSocket.listen((request) { - expect(request, equals('ping')); - webSocket.add('pong'); - webSocket.close(); - }); - }); - - final client = HttpClient(); - final request = await client.openUrl( - 'GET', Uri.parse('http://localhost:${server.port}')); - request.headers - ..set('Connection', 'Upgrade') - ..set('Upgrade', 'websocket') - ..set('Sec-WebSocket-Key', 'x3JJHMbDL1EzLkh9GBhXDw==') - ..set('Sec-WebSocket-Version', '13'); - - final response = await request.close(); - final socket = await response.detachSocket(); - final innerChannel = StreamChannel<List<int>>(socket, socket); - final webSocket = WebSocketChannel(innerChannel, serverSide: false); - - var n = 0; - await webSocket.stream.listen((message) { - if (n == 0) { - expect(message, equals('hello!')); - webSocket.sink.add('ping'); - } else if (n == 1) { - expect(message, equals('pong')); - webSocket.sink.close(); - server.close(); - } else { - fail('Only expected two messages.'); - } - n++; - }).asFuture<void>(); - }); - - test('a server can communicate with a WebSocket client', () async { - final server = await HttpServer.bind('localhost', 0); - server.listen((request) async { - final response = request.response; - response.statusCode = 101; - response.headers - ..set('Connection', 'Upgrade') - 
..set('Upgrade', 'websocket') - ..set( - 'Sec-WebSocket-Accept', - WebSocketChannel.signKey( - request.headers.value('Sec-WebSocket-Key')!)); - response.contentLength = 0; - - final socket = await response.detachSocket(); - final innerChannel = StreamChannel<List<int>>(socket, socket); - final webSocket = WebSocketChannel(innerChannel); - webSocket.sink.add('hello!'); - - final message = await webSocket.stream.first; - expect(message, equals('ping')); - webSocket.sink.add('pong'); - await webSocket.sink.close(); - }); - - final webSocket = - await WebSocket.connect('ws://localhost:${server.port}'); - var n = 0; - await webSocket.listen((message) { - if (n == 0) { - expect(message, equals('hello!')); - webSocket.add('ping'); - } else if (n == 1) { - expect(message, equals('pong')); - webSocket.close(); - server.close(); - } else { - fail('Only expected two messages.'); - } - n++; - }).asFuture<void>(); - }); - }); -}