Implement IOWebSocketChannel as a WebSocketAdapterWebSocket subclass (dart-lang/web_socket_channel#342)
diff --git a/pkgs/web_socket_channel/CHANGELOG.md b/pkgs/web_socket_channel/CHANGELOG.md index 139ef22..5026c83 100644 --- a/pkgs/web_socket_channel/CHANGELOG.md +++ b/pkgs/web_socket_channel/CHANGELOG.md
@@ -4,6 +4,8 @@ default implementation for `WebSocketChannel.connect`. - **BREAKING**: Remove `WebSocketChannel` constructor. - **BREAKING**: Make `WebSocketChannel` an `abstract interface`. +- **BREAKING**: `IOWebSocketChannel.ready` will throw + `WebSocketChannelException` instead of `WebSocketException`. ## 2.4.5
diff --git a/pkgs/web_socket_channel/lib/io.dart b/pkgs/web_socket_channel/lib/io.dart index 9f71d81..cfba457 100644 --- a/pkgs/web_socket_channel/lib/io.dart +++ b/pkgs/web_socket_channel/lib/io.dart
@@ -3,51 +3,15 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; -import 'dart:io'; - -import 'package:async/async.dart'; -import 'package:stream_channel/stream_channel.dart'; +import 'dart:io' show HttpClient, WebSocket; +import 'package:web_socket/io_web_socket.dart' as io_web_socket; import 'src/channel.dart'; import 'src/exception.dart'; -import 'src/sink_completer.dart'; +import 'web_socket_adapter_web_socket_channel.dart'; /// A [WebSocketChannel] that communicates using a `dart:io` [WebSocket]. -class IOWebSocketChannel extends StreamChannelMixin - implements WebSocketChannel { - /// The underlying `dart:io` [WebSocket]. - /// - /// If the channel was constructed with [IOWebSocketChannel.connect], this is - /// `null` until the [WebSocket.connect] future completes. - WebSocket? _webSocket; - - @override - String? get protocol => _webSocket?.protocol; - - @override - int? get closeCode => _webSocket?.closeCode; - - @override - String? get closeReason => _webSocket?.closeReason; - - @override - final Stream stream; - - @override - final WebSocketSink sink; - - /// Completer for [ready]. - final Completer<void> _readyCompleter; - - @override - Future<void> get ready => _readyCompleter.future; - - /// The underlying [WebSocket], if this channel has connected. - /// - /// If the future returned from [WebSocket.connect] has not yet completed, or - /// completed as an error, this will be null. - WebSocket? get innerWebSocket => _webSocket; - +class IOWebSocketChannel extends WebSocketAdapterWebSocketChannel { /// Creates a new WebSocket connection. /// /// Connects to [url] using [WebSocket.connect] and returns a channel that can @@ -76,58 +40,24 @@ Duration? connectTimeout, HttpClient? customClient, }) { - late IOWebSocketChannel channel; - final sinkCompleter = WebSocketSinkCompleter(); - var future = WebSocket.connect( + var webSocketFuture = WebSocket.connect( url.toString(), headers: headers, protocols: protocols, customClient: customClient, - ); + ).then((webSocket) => webSocket..pingInterval = pingInterval); + if (connectTimeout != null) { - future = future.timeout(connectTimeout); + webSocketFuture = webSocketFuture.timeout(connectTimeout); } - final stream = StreamCompleter.fromFuture(future.then((webSocket) { - webSocket.pingInterval = pingInterval; - channel._webSocket = webSocket; - channel._readyCompleter.complete(); - sinkCompleter.setDestinationSink(_IOWebSocketSink(webSocket)); - return webSocket; - }).catchError((Object error, StackTrace stackTrace) { - channel._readyCompleter.completeError(error, stackTrace); - throw WebSocketChannelException.from(error); - })); - return channel = - IOWebSocketChannel._withoutSocket(stream, sinkCompleter.sink); + + return IOWebSocketChannel(webSocketFuture); } - /// Creates a channel wrapping [socket]. - IOWebSocketChannel(WebSocket socket) - : _webSocket = socket, - stream = socket.handleError( - (Object? error) => throw WebSocketChannelException.from(error)), - sink = _IOWebSocketSink(socket), - _readyCompleter = Completer()..complete(); - - /// Creates a channel without a socket. - /// - /// This is used with `connect` to synchronously provide a channel that later - /// has a socket added. - IOWebSocketChannel._withoutSocket(Stream stream, this.sink) - : _webSocket = null, - stream = stream.handleError( - (Object? error) => throw WebSocketChannelException.from(error)), - _readyCompleter = Completer(); -} - -/// A [WebSocketSink] that forwards [close] calls to a `dart:io` [WebSocket]. -class _IOWebSocketSink extends DelegatingStreamSink implements WebSocketSink { - /// The underlying socket. - final WebSocket _webSocket; - - _IOWebSocketSink(WebSocket super.webSocket) : _webSocket = webSocket; - - @override - Future close([int? closeCode, String? closeReason]) => - _webSocket.close(closeCode, closeReason); + /// Creates a channel wrapping [webSocket]. + IOWebSocketChannel(FutureOr<WebSocket> webSocket) + : super(webSocket is Future<WebSocket> + ? webSocket.then(io_web_socket.IOWebSocket.fromWebSocket) + as FutureOr<io_web_socket.IOWebSocket> + : io_web_socket.IOWebSocket.fromWebSocket(webSocket)); }
diff --git a/pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart b/pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart index 28780ff..2a1242a 100644 --- a/pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart +++ b/pkgs/web_socket_channel/lib/web_socket_adapter_web_socket_channel.dart
@@ -66,14 +66,19 @@ /// future will complete with an error. factory WebSocketAdapterWebSocketChannel.connect(Uri url, {Iterable<String>? protocols}) => - WebSocketAdapterWebSocketChannel._( + WebSocketAdapterWebSocketChannel( WebSocket.connect(url, protocols: protocols)); - // Create a [WebSocketWebSocketChannelAdapter] from an existing [WebSocket]. - factory WebSocketAdapterWebSocketChannel.fromWebSocket(WebSocket webSocket) => - WebSocketAdapterWebSocketChannel._(Future.value(webSocket)); + // Construct a [WebSocketWebSocketChannelAdapter] from an existing + // [WebSocket]. + WebSocketAdapterWebSocketChannel(FutureOr<WebSocket> webSocket) { + Future<WebSocket> webSocketFuture; + if (webSocket is WebSocket) { + webSocketFuture = Future.value(webSocket); + } else { + webSocketFuture = webSocket; + } - WebSocketAdapterWebSocketChannel._(Future<WebSocket> webSocketFuture) { webSocketFuture.then((webSocket) { var remoteClosed = false; webSocket.events.listen((event) { @@ -113,7 +118,16 @@ _protocol = webSocket.protocol; _readyCompleter.complete(); }, onError: (Object e) { - _readyCompleter.completeError(WebSocketChannelException.from(e)); + Exception error; + if (e is TimeoutException) { + // Required for backwards compatibility with `IOWebSocketChannel`. + error = e; + } else { + error = WebSocketChannelException.from(e); + } + _readyCompleter.completeError(error); + _controller.local.sink.addError(error); + _controller.local.sink.close(); }); } }
diff --git a/pkgs/web_socket_channel/pubspec.yaml b/pkgs/web_socket_channel/pubspec.yaml index 2dfbc7f..d198922 100644 --- a/pkgs/web_socket_channel/pubspec.yaml +++ b/pkgs/web_socket_channel/pubspec.yaml
@@ -14,7 +14,7 @@ crypto: ^3.0.0 stream_channel: ^2.1.0 web: ^0.5.0 - web_socket: ^0.1.0 + web_socket: ^0.1.1 dev_dependencies: dart_flutter_team_lints: ^2.0.0
diff --git a/pkgs/web_socket_channel/test/io_test.dart b/pkgs/web_socket_channel/test/io_test.dart index ac1ffcc..1b7ae35 100644 --- a/pkgs/web_socket_channel/test/io_test.dart +++ b/pkgs/web_socket_channel/test/io_test.dart
@@ -131,7 +131,7 @@ }); final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}'); - expect(channel.ready, throwsA(isA<WebSocketException>())); + expect(channel.ready, throwsA(isA<WebSocketChannelException>())); expect(channel.stream.drain<void>(), throwsA(isA<WebSocketChannelException>())); }); @@ -154,7 +154,7 @@ 'ws://localhost:${server.port}', protocols: [failedProtocol], ); - expect(channel.ready, throwsA(isA<WebSocketException>())); + expect(channel.ready, throwsA(isA<WebSocketChannelException>())); expect( channel.stream.drain<void>(), throwsA(isA<WebSocketChannelException>()), @@ -230,8 +230,7 @@ ); expect(channel.ready, throwsA(isA<TimeoutException>())); - expect(channel.stream.drain<void>(), - throwsA(isA<WebSocketChannelException>())); + expect(channel.stream.drain<void>(), throwsA(anything)); }); test('.custom client is passed through', () async {
diff --git a/pkgs/web_socket_channel/test/web_socket_adapter_web_socket_test.dart b/pkgs/web_socket_channel/test/web_socket_adapter_web_socket_test.dart index 24f9907..c66133b 100644 --- a/pkgs/web_socket_channel/test/web_socket_adapter_web_socket_test.dart +++ b/pkgs/web_socket_channel/test/web_socket_adapter_web_socket_test.dart
@@ -112,9 +112,26 @@ expect(channel.closeReason, null); }); - test('fromWebSocket', () async { + test('constructor with WebSocket', () async { final webSocket = await WebSocket.connect(uri); - final channel = WebSocketAdapterWebSocketChannel.fromWebSocket(webSocket); + final channel = WebSocketAdapterWebSocketChannel(webSocket); + + await expectLater(channel.ready, completes); + channel.sink.add('This count says:'); + channel.sink.add([1, 2, 3]); + channel.sink.add('And then:'); + channel.sink.add([4, 5, 6]); + expect(await channel.stream.take(4).toList(), [ + 'This count says:', + [1, 2, 3], + 'And then:', + [4, 5, 6] + ]); + }); + + test('constructor with Future<WebSocket>', () async { + final webSocketFuture = WebSocket.connect(uri); + final channel = WebSocketAdapterWebSocketChannel(webSocketFuture); await expectLater(channel.ready, completes); channel.sink.add('This count says:');