Create an adapter for `package:web_socket` (dart-lang/web_socket_channel#339)
diff --git a/pkgs/web_socket_channel/CHANGELOG.md b/pkgs/web_socket_channel/CHANGELOG.md
index b7f318e..06d7d68 100644
--- a/pkgs/web_socket_channel/CHANGELOG.md
+++ b/pkgs/web_socket_channel/CHANGELOG.md
@@ -1,11 +1,16 @@
+## 3.0.0-wip
+
+- Provide an adapter around `package:web_socket` `WebSocket`s and make it the
+ default implementation for `WebSocketChannel.connect`.
+
## 2.4.5
-* use secure random number generator for frame masking.
+- use secure random number generator for frame masking.
## 2.4.4
-* Require Dart `^3.3`
-* Require `package:web` `^0.5.0`.
+- Require Dart `^3.3`
+- Require `package:web` `^0.5.0`.
## 2.4.3
diff --git a/pkgs/web_socket_channel/lib/src/_connect_api.dart b/pkgs/web_socket_channel/lib/src/_connect_api.dart
deleted file mode 100644
index c58910e..0000000
--- a/pkgs/web_socket_channel/lib/src/_connect_api.dart
+++ /dev/null
@@ -1,15 +0,0 @@
-// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import '../web_socket_channel.dart';
-
-/// Creates a new WebSocket connection.
-///
-/// Connects to [uri] using and returns a channel that can be used to
-/// communicate over the resulting socket.
-///
-/// The optional [protocols] parameter is the same as `WebSocket.connect`.
-WebSocketChannel connect(Uri uri, {Iterable<String>? protocols}) {
- throw UnsupportedError('No implementation of the connect api provided');
-}
diff --git a/pkgs/web_socket_channel/lib/src/_connect_html.dart b/pkgs/web_socket_channel/lib/src/_connect_html.dart
deleted file mode 100644
index e725d1b..0000000
--- a/pkgs/web_socket_channel/lib/src/_connect_html.dart
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import '../html.dart';
-
-import '../web_socket_channel.dart';
-
-/// Creates a new WebSocket connection.
-///
-/// Connects to [uri] using and returns a channel that can be used to
-/// communicate over the resulting socket.
-///
-/// The optional [protocols] parameter is the same as `WebSocket.connect`.
-WebSocketChannel connect(Uri uri, {Iterable<String>? protocols}) =>
- HtmlWebSocketChannel.connect(uri, protocols: protocols);
diff --git a/pkgs/web_socket_channel/lib/src/_connect_io.dart b/pkgs/web_socket_channel/lib/src/_connect_io.dart
deleted file mode 100644
index a7a82c4..0000000
--- a/pkgs/web_socket_channel/lib/src/_connect_io.dart
+++ /dev/null
@@ -1,15 +0,0 @@
-// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import '../io.dart';
-import '../web_socket_channel.dart';
-
-/// Creates a new WebSocket connection.
-///
-/// Connects to [uri] using and returns a channel that can be used to
-/// communicate over the resulting socket.
-///
-/// The optional [protocols] parameter is the same as `WebSocket.connect`.
-WebSocketChannel connect(Uri uri, {Iterable<String>? protocols}) =>
- IOWebSocketChannel.connect(uri, protocols: protocols);
diff --git a/pkgs/web_socket_channel/lib/src/channel.dart b/pkgs/web_socket_channel/lib/src/channel.dart
index 4143128..825088d 100644
--- a/pkgs/web_socket_channel/lib/src/channel.dart
+++ b/pkgs/web_socket_channel/lib/src/channel.dart
@@ -9,9 +9,7 @@
import 'package:crypto/crypto.dart';
import 'package:stream_channel/stream_channel.dart';
-import '_connect_api.dart'
- if (dart.library.io) '_connect_io.dart'
- if (dart.library.js_interop) '_connect_html.dart' as platform;
+import '../web_socket_adapter_web_socket_channel.dart';
import 'copy/web_socket_impl.dart';
import 'exception.dart';
@@ -141,7 +139,7 @@
/// If there are errors creating the connection the [ready] future will
/// complete with an error.
factory WebSocketChannel.connect(Uri uri, {Iterable<String>? protocols}) =>
- platform.connect(uri, protocols: protocols);
+ WebSocketAdapterWebSocketChannel.connect(uri, protocols: protocols);
}
/// The sink exposed by a [WebSocketChannel].
diff --git a/pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart b/pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart
new file mode 100644
index 0000000..28780ff
--- /dev/null
+++ b/pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart
@@ -0,0 +1,136 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:typed_data';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:web_socket/web_socket.dart';
+
+import 'src/channel.dart';
+import 'src/exception.dart';
+
+/// A [WebSocketChannel] implemented using [WebSocket].
+class WebSocketAdapterWebSocketChannel extends StreamChannelMixin
+ implements WebSocketChannel {
+ @override
+ String? get protocol => _protocol;
+ String? _protocol;
+
+ @override
+ int? get closeCode => _closeCode;
+ int? _closeCode;
+
+ @override
+ String? get closeReason => _closeReason;
+ String? _closeReason;
+
+ /// The close code set by the local user.
+ ///
+ /// To ensure proper ordering, this is stored until we get a done event on
+ /// [_controller.local.stream].
+ int? _localCloseCode;
+
+ /// The close reason set by the local user.
+ ///
+ /// To ensure proper ordering, this is stored until we get a done event on
+ /// [_controller.local.stream].
+ String? _localCloseReason;
+
+ /// Completer for [ready].
+ final _readyCompleter = Completer<void>();
+
+ @override
+ Future<void> get ready => _readyCompleter.future;
+
+ @override
+ Stream get stream => _controller.foreign.stream;
+
+ final _controller =
+ StreamChannelController<Object?>(sync: true, allowForeignErrors: false);
+
+ @override
+ late final WebSocketSink sink = _WebSocketSink(this);
+
+ /// Creates a new WebSocket connection.
+ ///
+ /// If provided, the [protocols] argument indicates that subprotocols that
+ /// the peer is able to select. See
+ /// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9).
+ ///
+ /// After construction, the [WebSocketAdapterWebSocketChannel] may not be
+ /// connected to the peer. The [ready] future will complete after the channel
+ /// is connected. If there are errors creating the connection the [ready]
+ /// future will complete with an error.
+ factory WebSocketAdapterWebSocketChannel.connect(Uri url,
+ {Iterable<String>? protocols}) =>
+ WebSocketAdapterWebSocketChannel._(
+ WebSocket.connect(url, protocols: protocols));
+
+ // Create a [WebSocketWebSocketChannelAdapter] from an existing [WebSocket].
+ factory WebSocketAdapterWebSocketChannel.fromWebSocket(WebSocket webSocket) =>
+ WebSocketAdapterWebSocketChannel._(Future.value(webSocket));
+
+ WebSocketAdapterWebSocketChannel._(Future<WebSocket> webSocketFuture) {
+ webSocketFuture.then((webSocket) {
+ var remoteClosed = false;
+ webSocket.events.listen((event) {
+ switch (event) {
+ case TextDataReceived(text: final text):
+ _controller.local.sink.add(text);
+ case BinaryDataReceived(data: final data):
+ _controller.local.sink.add(data);
+ case CloseReceived(code: final code, reason: final reason):
+ remoteClosed = true;
+ _closeCode = code;
+ _closeReason = reason;
+ _controller.local.sink.close();
+ }
+ });
+ _controller.local.stream.listen((obj) {
+ try {
+ switch (obj) {
+ case final String s:
+ webSocket.sendText(s);
+ case final Uint8List b:
+ webSocket.sendBytes(b);
+ case final List<int> b:
+ webSocket.sendBytes(Uint8List.fromList(b));
+ default:
+ throw UnsupportedError('Cannot send ${obj.runtimeType}');
+ }
+ } on WebSocketConnectionClosed catch (_) {
+ // There is nowhere to surface this error; `_controller.local.sink`
+ // has already been closed.
+ }
+ }, onDone: () {
+ if (!remoteClosed) {
+ webSocket.close(_localCloseCode, _localCloseReason);
+ }
+ });
+ _protocol = webSocket.protocol;
+ _readyCompleter.complete();
+ }, onError: (Object e) {
+ _readyCompleter.completeError(WebSocketChannelException.from(e));
+ });
+ }
+}
+
+/// A [WebSocketSink] that tracks the close code and reason passed to [close].
+class _WebSocketSink extends DelegatingStreamSink implements WebSocketSink {
+ /// The channel to which this sink belongs.
+ final WebSocketAdapterWebSocketChannel _channel;
+
+ _WebSocketSink(WebSocketAdapterWebSocketChannel channel)
+ : _channel = channel,
+ super(channel._controller.foreign.sink);
+
+ @override
+ Future close([int? closeCode, String? closeReason]) {
+ _channel._localCloseCode = closeCode;
+ _channel._localCloseReason = closeReason;
+ return super.close();
+ }
+}
diff --git a/pkgs/web_socket_channel/pubspec.yaml b/pkgs/web_socket_channel/pubspec.yaml
index 0e44661..c69f4b7 100644
--- a/pkgs/web_socket_channel/pubspec.yaml
+++ b/pkgs/web_socket_channel/pubspec.yaml
@@ -1,5 +1,5 @@
name: web_socket_channel
-version: 2.4.5
+version: 3.0.0-wip
description: >-
StreamChannel wrappers for WebSockets. Provides a cross-platform
WebSocketChannel API, a cross-platform implementation of that API that
@@ -14,7 +14,14 @@
crypto: ^3.0.0
stream_channel: ^2.1.0
web: ^0.5.0
+ web_socket: ^0.1.0
dev_dependencies:
dart_flutter_team_lints: ^2.0.0
test: ^1.16.0
+
+# Remove this when versions of `package:test` and `shelf_web_socket` that support
+# channel_web_socket 3.0 are released.
+dependency_overrides:
+ shelf_web_socket: 1.0.4
+ test: 1.25.2
diff --git a/pkgs/web_socket_channel/test/echo_server_vm.dart b/pkgs/web_socket_channel/test/echo_server_vm.dart
new file mode 100644
index 0000000..99ef2a2
--- /dev/null
+++ b/pkgs/web_socket_channel/test/echo_server_vm.dart
@@ -0,0 +1,34 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:io';
+
+import 'package:stream_channel/stream_channel.dart';
+
+Future<void> hybridMain(StreamChannel<Object?> channel) async {
+ late HttpServer server;
+
+ server = (await HttpServer.bind('localhost', 0))
+ ..transform(WebSocketTransformer())
+ .listen((WebSocket webSocket) => webSocket.listen((data) {
+ if (data == 'close') {
+ webSocket.close(3001, 'you asked me to');
+ } else {
+ webSocket.add(data);
+ }
+ }));
+
+ channel.sink.add(server.port);
+ await channel
+ .stream.first; // Any writes indicates that the server should exit.
+ unawaited(server.close());
+}
+
+/// Starts an WebSocket server that echos the payload of the request.
+Future<StreamChannel<Object?>> startServer() async {
+ final controller = StreamChannelController<Object?>(sync: true);
+ unawaited(hybridMain(controller.foreign));
+ return controller.local;
+}
diff --git a/pkgs/web_socket_channel/test/echo_server_web.dart b/pkgs/web_socket_channel/test/echo_server_web.dart
new file mode 100644
index 0000000..030b702
--- /dev/null
+++ b/pkgs/web_socket_channel/test/echo_server_web.dart
@@ -0,0 +1,35 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+/// Starts an WebSocket server that echos the payload of the request.
+/// Copied from `echo_server_vm.dart`.
+Future<StreamChannel<Object?>> startServer() async => spawnHybridCode(r'''
+import 'dart:async';
+import 'dart:io';
+
+import 'package:stream_channel/stream_channel.dart';
+
+/// Starts an WebSocket server that echos the payload of the request.
+Future<void> hybridMain(StreamChannel<Object?> channel) async {
+ late HttpServer server;
+
+ server = (await HttpServer.bind('localhost', 0))
+ ..transform(WebSocketTransformer())
+ .listen((WebSocket webSocket) => webSocket.listen((data) {
+ if (data == 'close') {
+ webSocket.close(3001, 'you asked me to');
+ } else {
+ webSocket.add(data);
+ }
+ }));
+
+ channel.sink.add(server.port);
+ await channel
+ .stream.first; // Any writes indicates that the server should exit.
+ unawaited(server.close());
+}
+''');
diff --git a/pkgs/web_socket_channel/test/io_test.dart b/pkgs/web_socket_channel/test/io_test.dart
index b2b7cbf..ac1ffcc 100644
--- a/pkgs/web_socket_channel/test/io_test.dart
+++ b/pkgs/web_socket_channel/test/io_test.dart
@@ -24,7 +24,7 @@
channel.stream.listen((request) {
expect(request, equals('ping'));
channel.sink.add('pong');
- channel.sink.close(5678, 'raisin');
+ channel.sink.close(3678, 'raisin');
});
});
@@ -45,7 +45,7 @@
}
n++;
}, onDone: expectAsync0(() {
- expect(channel.closeCode, equals(5678));
+ expect(channel.closeCode, equals(3678));
expect(channel.closeReason, equals('raisin'));
}));
});
@@ -70,7 +70,7 @@
channel.stream.listen(
expectAsync1((message) {
expect(message, equals('pong'));
- channel.sink.close(5678, 'raisin');
+ channel.sink.close(3678, 'raisin');
}, count: 1),
onDone: expectAsync0(() {}));
});
@@ -97,7 +97,7 @@
channel.stream.listen(
expectAsync1((message) {
expect(message, equals('pong'));
- channel.sink.close(5678, 'raisin');
+ channel.sink.close(3678, 'raisin');
}, count: 1),
onDone: expectAsync0(() {}));
});
@@ -109,7 +109,7 @@
expect(() async {
final channel = IOWebSocketChannel(webSocket);
await channel.stream.drain<void>();
- expect(channel.closeCode, equals(5678));
+ expect(channel.closeCode, equals(3678));
expect(channel.closeReason, equals('raisin'));
}(), completes);
});
@@ -118,7 +118,7 @@
expect(channel.ready, completes);
- await channel.sink.close(5678, 'raisin');
+ await channel.sink.close(3678, 'raisin');
});
test('.connect wraps a connection error in WebSocketChannelException',
@@ -192,7 +192,7 @@
expect(() async {
final channel = IOWebSocketChannel(webSocket);
await channel.stream.drain<void>();
- expect(channel.closeCode, equals(5678));
+ expect(channel.closeCode, equals(3678));
expect(channel.closeReason, equals('raisin'));
}(), completes);
});
@@ -202,7 +202,7 @@
connectTimeout: const Duration(milliseconds: 1000),
);
expect(channel.ready, completes);
- await channel.sink.close(5678, 'raisin');
+ await channel.sink.close(3678, 'raisin');
});
test('.respects timeout parameter when trying to connect', () async {
diff --git a/pkgs/web_socket_channel/test/web_socket_adapter_web_socket_test.dart b/pkgs/web_socket_channel/test/web_socket_adapter_web_socket_test.dart
new file mode 100644
index 0000000..24f9907
--- /dev/null
+++ b/pkgs/web_socket_channel/test/web_socket_adapter_web_socket_test.dart
@@ -0,0 +1,132 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:typed_data';
+
+import 'package:async/async.dart';
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+import 'package:web_socket/web_socket.dart';
+import 'package:web_socket_channel/src/exception.dart';
+import 'package:web_socket_channel/web_socket_adapter_web_socket_channel.dart';
+import 'package:web_socket_channel/web_socket_channel.dart';
+import 'echo_server_vm.dart'
+ if (dart.library.js_interop) 'echo_server_web.dart';
+
+void main() {
+ group('WebSocketWebSocketChannelAdapter', () {
+ late Uri uri;
+ late StreamChannel<Object?> httpServerChannel;
+ late StreamQueue<Object?> httpServerQueue;
+
+ setUp(() async {
+ httpServerChannel = await startServer();
+ httpServerQueue = StreamQueue(httpServerChannel.stream);
+
+ // When run under dart2wasm, JSON numbers are always returned as [double].
+ final port = ((await httpServerQueue.next) as num).toInt();
+ uri = Uri.parse('ws://localhost:$port');
+ });
+ tearDown(() async {
+ httpServerChannel.sink.add(null);
+ });
+
+ test('failed connect', () async {
+ final channel =
+ WebSocketAdapterWebSocketChannel.connect(Uri.parse('ws://notahost'));
+
+ await expectLater(
+ channel.ready, throwsA(isA<WebSocketChannelException>()));
+ });
+
+ test('good connect', () async {
+ final channel = WebSocketAdapterWebSocketChannel.connect(uri);
+ await expectLater(channel.ready, completes);
+ await channel.sink.close();
+ });
+
+ test('echo empty text', () async {
+ final channel = WebSocketAdapterWebSocketChannel.connect(uri);
+ await expectLater(channel.ready, completes);
+ channel.sink.add('');
+ expect(await channel.stream.first, '');
+ await channel.sink.close();
+ });
+
+ test('echo empty binary', () async {
+ final channel = WebSocketAdapterWebSocketChannel.connect(uri);
+ await expectLater(channel.ready, completes);
+ channel.sink.add(Uint8List.fromList(<int>[]));
+ expect(await channel.stream.first, isEmpty);
+ await channel.sink.close();
+ });
+
+ test('echo hello', () async {
+ final channel = WebSocketAdapterWebSocketChannel.connect(uri);
+ await expectLater(channel.ready, completes);
+ channel.sink.add('hello');
+ expect(await channel.stream.first, 'hello');
+ await channel.sink.close();
+ });
+
+ test('echo [1,2,3]', () async {
+ final channel = WebSocketAdapterWebSocketChannel.connect(uri);
+ await expectLater(channel.ready, completes);
+ channel.sink.add([1, 2, 3]);
+ expect(await channel.stream.first, [1, 2, 3]);
+ await channel.sink.close();
+ });
+
+ test('alternative string and binary request and response', () async {
+ final channel = WebSocketAdapterWebSocketChannel.connect(uri);
+ await expectLater(channel.ready, completes);
+ channel.sink.add('This count says:');
+ channel.sink.add([1, 2, 3]);
+ channel.sink.add('And then:');
+ channel.sink.add([4, 5, 6]);
+ expect(await channel.stream.take(4).toList(), [
+ 'This count says:',
+ [1, 2, 3],
+ 'And then:',
+ [4, 5, 6]
+ ]);
+ });
+
+ test('remote close', () async {
+ final channel = WebSocketAdapterWebSocketChannel.connect(uri);
+ await expectLater(channel.ready, completes);
+ channel.sink.add('close'); // Asks the peer to close.
+ // Give the server time to send a close frame.
+ await Future<void>.delayed(const Duration(seconds: 1));
+ expect(channel.closeCode, 3001);
+ expect(channel.closeReason, 'you asked me to');
+ await channel.sink.close();
+ });
+
+ test('local close', () async {
+ final channel = WebSocketAdapterWebSocketChannel.connect(uri);
+ await expectLater(channel.ready, completes);
+ await channel.sink.close(3005, 'please close');
+ expect(channel.closeCode, null);
+ expect(channel.closeReason, null);
+ });
+
+ test('fromWebSocket', () async {
+ final webSocket = await WebSocket.connect(uri);
+ final channel = WebSocketAdapterWebSocketChannel.fromWebSocket(webSocket);
+
+ await expectLater(channel.ready, completes);
+ channel.sink.add('This count says:');
+ channel.sink.add([1, 2, 3]);
+ channel.sink.add('And then:');
+ channel.sink.add([4, 5, 6]);
+ expect(await channel.stream.take(4).toList(), [
+ 'This count says:',
+ [1, 2, 3],
+ 'And then:',
+ [4, 5, 6]
+ ]);
+ });
+ });
+}