Create an adapter for `package:web_socket` (#339)
diff --git a/CHANGELOG.md b/CHANGELOG.md index b7f318e..06d7d68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md
@@ -1,11 +1,16 @@ +## 3.0.0-wip + +- Provide an adapter around `package:web_socket` `WebSocket`s and make it the + default implementation for `WebSocketChannel.connect`. + ## 2.4.5 -* use secure random number generator for frame masking. +- use secure random number generator for frame masking. ## 2.4.4 -* Require Dart `^3.3` -* Require `package:web` `^0.5.0`. +- Require Dart `^3.3` +- Require `package:web` `^0.5.0`. ## 2.4.3
diff --git a/lib/src/_connect_api.dart b/lib/src/_connect_api.dart deleted file mode 100644 index c58910e..0000000 --- a/lib/src/_connect_api.dart +++ /dev/null
@@ -1,15 +0,0 @@ -// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import '../web_socket_channel.dart'; - -/// Creates a new WebSocket connection. -/// -/// Connects to [uri] using and returns a channel that can be used to -/// communicate over the resulting socket. -/// -/// The optional [protocols] parameter is the same as `WebSocket.connect`. -WebSocketChannel connect(Uri uri, {Iterable<String>? protocols}) { - throw UnsupportedError('No implementation of the connect api provided'); -}
diff --git a/lib/src/_connect_html.dart b/lib/src/_connect_html.dart deleted file mode 100644 index e725d1b..0000000 --- a/lib/src/_connect_html.dart +++ /dev/null
@@ -1,16 +0,0 @@ -// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import '../html.dart'; - -import '../web_socket_channel.dart'; - -/// Creates a new WebSocket connection. -/// -/// Connects to [uri] using and returns a channel that can be used to -/// communicate over the resulting socket. -/// -/// The optional [protocols] parameter is the same as `WebSocket.connect`. -WebSocketChannel connect(Uri uri, {Iterable<String>? protocols}) => - HtmlWebSocketChannel.connect(uri, protocols: protocols);
diff --git a/lib/src/_connect_io.dart b/lib/src/_connect_io.dart deleted file mode 100644 index a7a82c4..0000000 --- a/lib/src/_connect_io.dart +++ /dev/null
@@ -1,15 +0,0 @@ -// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import '../io.dart'; -import '../web_socket_channel.dart'; - -/// Creates a new WebSocket connection. -/// -/// Connects to [uri] using and returns a channel that can be used to -/// communicate over the resulting socket. -/// -/// The optional [protocols] parameter is the same as `WebSocket.connect`. -WebSocketChannel connect(Uri uri, {Iterable<String>? protocols}) => - IOWebSocketChannel.connect(uri, protocols: protocols);
diff --git a/lib/src/channel.dart b/lib/src/channel.dart index 4143128..825088d 100644 --- a/lib/src/channel.dart +++ b/lib/src/channel.dart
@@ -9,9 +9,7 @@ import 'package:crypto/crypto.dart'; import 'package:stream_channel/stream_channel.dart'; -import '_connect_api.dart' - if (dart.library.io) '_connect_io.dart' - if (dart.library.js_interop) '_connect_html.dart' as platform; +import '../web_socket_adapter_web_socket_channel.dart'; import 'copy/web_socket_impl.dart'; import 'exception.dart'; @@ -141,7 +139,7 @@ /// If there are errors creating the connection the [ready] future will /// complete with an error. factory WebSocketChannel.connect(Uri uri, {Iterable<String>? protocols}) => - platform.connect(uri, protocols: protocols); + WebSocketAdapterWebSocketChannel.connect(uri, protocols: protocols); } /// The sink exposed by a [WebSocketChannel].
diff --git a/lib/web_socket_adapter_web_socket_channel.dart b/lib/web_socket_adapter_web_socket_channel.dart new file mode 100644 index 0000000..28780ff --- /dev/null +++ b/lib/web_socket_adapter_web_socket_channel.dart
@@ -0,0 +1,136 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:web_socket/web_socket.dart'; + +import 'src/channel.dart'; +import 'src/exception.dart'; + +/// A [WebSocketChannel] implemented using [WebSocket]. +class WebSocketAdapterWebSocketChannel extends StreamChannelMixin + implements WebSocketChannel { + @override + String? get protocol => _protocol; + String? _protocol; + + @override + int? get closeCode => _closeCode; + int? _closeCode; + + @override + String? get closeReason => _closeReason; + String? _closeReason; + + /// The close code set by the local user. + /// + /// To ensure proper ordering, this is stored until we get a done event on + /// [_controller.local.stream]. + int? _localCloseCode; + + /// The close reason set by the local user. + /// + /// To ensure proper ordering, this is stored until we get a done event on + /// [_controller.local.stream]. + String? _localCloseReason; + + /// Completer for [ready]. + final _readyCompleter = Completer<void>(); + + @override + Future<void> get ready => _readyCompleter.future; + + @override + Stream get stream => _controller.foreign.stream; + + final _controller = + StreamChannelController<Object?>(sync: true, allowForeignErrors: false); + + @override + late final WebSocketSink sink = _WebSocketSink(this); + + /// Creates a new WebSocket connection. + /// + /// If provided, the [protocols] argument indicates that subprotocols that + /// the peer is able to select. See + /// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9). + /// + /// After construction, the [WebSocketAdapterWebSocketChannel] may not be + /// connected to the peer. The [ready] future will complete after the channel + /// is connected. If there are errors creating the connection the [ready] + /// future will complete with an error. + factory WebSocketAdapterWebSocketChannel.connect(Uri url, + {Iterable<String>? protocols}) => + WebSocketAdapterWebSocketChannel._( + WebSocket.connect(url, protocols: protocols)); + + // Create a [WebSocketWebSocketChannelAdapter] from an existing [WebSocket]. + factory WebSocketAdapterWebSocketChannel.fromWebSocket(WebSocket webSocket) => + WebSocketAdapterWebSocketChannel._(Future.value(webSocket)); + + WebSocketAdapterWebSocketChannel._(Future<WebSocket> webSocketFuture) { + webSocketFuture.then((webSocket) { + var remoteClosed = false; + webSocket.events.listen((event) { + switch (event) { + case TextDataReceived(text: final text): + _controller.local.sink.add(text); + case BinaryDataReceived(data: final data): + _controller.local.sink.add(data); + case CloseReceived(code: final code, reason: final reason): + remoteClosed = true; + _closeCode = code; + _closeReason = reason; + _controller.local.sink.close(); + } + }); + _controller.local.stream.listen((obj) { + try { + switch (obj) { + case final String s: + webSocket.sendText(s); + case final Uint8List b: + webSocket.sendBytes(b); + case final List<int> b: + webSocket.sendBytes(Uint8List.fromList(b)); + default: + throw UnsupportedError('Cannot send ${obj.runtimeType}'); + } + } on WebSocketConnectionClosed catch (_) { + // There is nowhere to surface this error; `_controller.local.sink` + // has already been closed. + } + }, onDone: () { + if (!remoteClosed) { + webSocket.close(_localCloseCode, _localCloseReason); + } + }); + _protocol = webSocket.protocol; + _readyCompleter.complete(); + }, onError: (Object e) { + _readyCompleter.completeError(WebSocketChannelException.from(e)); + }); + } +} + +/// A [WebSocketSink] that tracks the close code and reason passed to [close]. +class _WebSocketSink extends DelegatingStreamSink implements WebSocketSink { + /// The channel to which this sink belongs. + final WebSocketAdapterWebSocketChannel _channel; + + _WebSocketSink(WebSocketAdapterWebSocketChannel channel) + : _channel = channel, + super(channel._controller.foreign.sink); + + @override + Future close([int? closeCode, String? closeReason]) { + _channel._localCloseCode = closeCode; + _channel._localCloseReason = closeReason; + return super.close(); + } +}
diff --git a/pubspec.yaml b/pubspec.yaml index 0e44661..c69f4b7 100644 --- a/pubspec.yaml +++ b/pubspec.yaml
@@ -1,5 +1,5 @@ name: web_socket_channel -version: 2.4.5 +version: 3.0.0-wip description: >- StreamChannel wrappers for WebSockets. Provides a cross-platform WebSocketChannel API, a cross-platform implementation of that API that @@ -14,7 +14,14 @@ crypto: ^3.0.0 stream_channel: ^2.1.0 web: ^0.5.0 + web_socket: ^0.1.0 dev_dependencies: dart_flutter_team_lints: ^2.0.0 test: ^1.16.0 + +# Remove this when versions of `package:test` and `shelf_web_socket` that support +# channel_web_socket 3.0 are released. +dependency_overrides: + shelf_web_socket: 1.0.4 + test: 1.25.2
diff --git a/test/echo_server_vm.dart b/test/echo_server_vm.dart new file mode 100644 index 0000000..99ef2a2 --- /dev/null +++ b/test/echo_server_vm.dart
@@ -0,0 +1,34 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; +import 'dart:io'; + +import 'package:stream_channel/stream_channel.dart'; + +Future<void> hybridMain(StreamChannel<Object?> channel) async { + late HttpServer server; + + server = (await HttpServer.bind('localhost', 0)) + ..transform(WebSocketTransformer()) + .listen((WebSocket webSocket) => webSocket.listen((data) { + if (data == 'close') { + webSocket.close(3001, 'you asked me to'); + } else { + webSocket.add(data); + } + })); + + channel.sink.add(server.port); + await channel + .stream.first; // Any writes indicates that the server should exit. + unawaited(server.close()); +} + +/// Starts an WebSocket server that echos the payload of the request. +Future<StreamChannel<Object?>> startServer() async { + final controller = StreamChannelController<Object?>(sync: true); + unawaited(hybridMain(controller.foreign)); + return controller.local; +}
diff --git a/test/echo_server_web.dart b/test/echo_server_web.dart new file mode 100644 index 0000000..030b702 --- /dev/null +++ b/test/echo_server_web.dart
@@ -0,0 +1,35 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; + +/// Starts an WebSocket server that echos the payload of the request. +/// Copied from `echo_server_vm.dart`. +Future<StreamChannel<Object?>> startServer() async => spawnHybridCode(r''' +import 'dart:async'; +import 'dart:io'; + +import 'package:stream_channel/stream_channel.dart'; + +/// Starts an WebSocket server that echos the payload of the request. +Future<void> hybridMain(StreamChannel<Object?> channel) async { + late HttpServer server; + + server = (await HttpServer.bind('localhost', 0)) + ..transform(WebSocketTransformer()) + .listen((WebSocket webSocket) => webSocket.listen((data) { + if (data == 'close') { + webSocket.close(3001, 'you asked me to'); + } else { + webSocket.add(data); + } + })); + + channel.sink.add(server.port); + await channel + .stream.first; // Any writes indicates that the server should exit. + unawaited(server.close()); +} +''');
diff --git a/test/io_test.dart b/test/io_test.dart index b2b7cbf..ac1ffcc 100644 --- a/test/io_test.dart +++ b/test/io_test.dart
@@ -24,7 +24,7 @@ channel.stream.listen((request) { expect(request, equals('ping')); channel.sink.add('pong'); - channel.sink.close(5678, 'raisin'); + channel.sink.close(3678, 'raisin'); }); }); @@ -45,7 +45,7 @@ } n++; }, onDone: expectAsync0(() { - expect(channel.closeCode, equals(5678)); + expect(channel.closeCode, equals(3678)); expect(channel.closeReason, equals('raisin')); })); }); @@ -70,7 +70,7 @@ channel.stream.listen( expectAsync1((message) { expect(message, equals('pong')); - channel.sink.close(5678, 'raisin'); + channel.sink.close(3678, 'raisin'); }, count: 1), onDone: expectAsync0(() {})); }); @@ -97,7 +97,7 @@ channel.stream.listen( expectAsync1((message) { expect(message, equals('pong')); - channel.sink.close(5678, 'raisin'); + channel.sink.close(3678, 'raisin'); }, count: 1), onDone: expectAsync0(() {})); }); @@ -109,7 +109,7 @@ expect(() async { final channel = IOWebSocketChannel(webSocket); await channel.stream.drain<void>(); - expect(channel.closeCode, equals(5678)); + expect(channel.closeCode, equals(3678)); expect(channel.closeReason, equals('raisin')); }(), completes); }); @@ -118,7 +118,7 @@ expect(channel.ready, completes); - await channel.sink.close(5678, 'raisin'); + await channel.sink.close(3678, 'raisin'); }); test('.connect wraps a connection error in WebSocketChannelException', @@ -192,7 +192,7 @@ expect(() async { final channel = IOWebSocketChannel(webSocket); await channel.stream.drain<void>(); - expect(channel.closeCode, equals(5678)); + expect(channel.closeCode, equals(3678)); expect(channel.closeReason, equals('raisin')); }(), completes); }); @@ -202,7 +202,7 @@ connectTimeout: const Duration(milliseconds: 1000), ); expect(channel.ready, completes); - await channel.sink.close(5678, 'raisin'); + await channel.sink.close(3678, 'raisin'); }); test('.respects timeout parameter when trying to connect', () async {
diff --git a/test/web_socket_adapter_web_socket_test.dart b/test/web_socket_adapter_web_socket_test.dart new file mode 100644 index 0000000..24f9907 --- /dev/null +++ b/test/web_socket_adapter_web_socket_test.dart
@@ -0,0 +1,132 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:stream_channel/stream_channel.dart'; +import 'package:test/test.dart'; +import 'package:web_socket/web_socket.dart'; +import 'package:web_socket_channel/src/exception.dart'; +import 'package:web_socket_channel/web_socket_adapter_web_socket_channel.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'echo_server_vm.dart' + if (dart.library.js_interop) 'echo_server_web.dart'; + +void main() { + group('WebSocketWebSocketChannelAdapter', () { + late Uri uri; + late StreamChannel<Object?> httpServerChannel; + late StreamQueue<Object?> httpServerQueue; + + setUp(() async { + httpServerChannel = await startServer(); + httpServerQueue = StreamQueue(httpServerChannel.stream); + + // When run under dart2wasm, JSON numbers are always returned as [double]. + final port = ((await httpServerQueue.next) as num).toInt(); + uri = Uri.parse('ws://localhost:$port'); + }); + tearDown(() async { + httpServerChannel.sink.add(null); + }); + + test('failed connect', () async { + final channel = + WebSocketAdapterWebSocketChannel.connect(Uri.parse('ws://notahost')); + + await expectLater( + channel.ready, throwsA(isA<WebSocketChannelException>())); + }); + + test('good connect', () async { + final channel = WebSocketAdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + await channel.sink.close(); + }); + + test('echo empty text', () async { + final channel = WebSocketAdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add(''); + expect(await channel.stream.first, ''); + await channel.sink.close(); + }); + + test('echo empty binary', () async { + final channel = WebSocketAdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add(Uint8List.fromList(<int>[])); + expect(await channel.stream.first, isEmpty); + await channel.sink.close(); + }); + + test('echo hello', () async { + final channel = WebSocketAdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add('hello'); + expect(await channel.stream.first, 'hello'); + await channel.sink.close(); + }); + + test('echo [1,2,3]', () async { + final channel = WebSocketAdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add([1, 2, 3]); + expect(await channel.stream.first, [1, 2, 3]); + await channel.sink.close(); + }); + + test('alternative string and binary request and response', () async { + final channel = WebSocketAdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add('This count says:'); + channel.sink.add([1, 2, 3]); + channel.sink.add('And then:'); + channel.sink.add([4, 5, 6]); + expect(await channel.stream.take(4).toList(), [ + 'This count says:', + [1, 2, 3], + 'And then:', + [4, 5, 6] + ]); + }); + + test('remote close', () async { + final channel = WebSocketAdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + channel.sink.add('close'); // Asks the peer to close. + // Give the server time to send a close frame. + await Future<void>.delayed(const Duration(seconds: 1)); + expect(channel.closeCode, 3001); + expect(channel.closeReason, 'you asked me to'); + await channel.sink.close(); + }); + + test('local close', () async { + final channel = WebSocketAdapterWebSocketChannel.connect(uri); + await expectLater(channel.ready, completes); + await channel.sink.close(3005, 'please close'); + expect(channel.closeCode, null); + expect(channel.closeReason, null); + }); + + test('fromWebSocket', () async { + final webSocket = await WebSocket.connect(uri); + final channel = WebSocketAdapterWebSocketChannel.fromWebSocket(webSocket); + + await expectLater(channel.ready, completes); + channel.sink.add('This count says:'); + channel.sink.add([1, 2, 3]); + channel.sink.add('And then:'); + channel.sink.add([4, 5, 6]); + expect(await channel.stream.take(4).toList(), [ + 'This count says:', + [1, 2, 3], + 'And then:', + [4, 5, 6] + ]); + }); + }); +}