Implement IOWebSocketChannel as a WebSocketAdapterWebSocket subclass (#342)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 139ef22..5026c83 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,8 @@
default implementation for `WebSocketChannel.connect`.
- **BREAKING**: Remove `WebSocketChannel` constructor.
- **BREAKING**: Make `WebSocketChannel` an `abstract interface`.
+- **BREAKING**: `IOWebSocketChannel.ready` will throw
+ `WebSocketChannelException` instead of `WebSocketException`.
## 2.4.5
diff --git a/lib/io.dart b/lib/io.dart
index 9f71d81..cfba457 100644
--- a/lib/io.dart
+++ b/lib/io.dart
@@ -3,51 +3,15 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
-import 'dart:io';
-
-import 'package:async/async.dart';
-import 'package:stream_channel/stream_channel.dart';
+import 'dart:io' show HttpClient, WebSocket;
+import 'package:web_socket/io_web_socket.dart' as io_web_socket;
import 'src/channel.dart';
import 'src/exception.dart';
-import 'src/sink_completer.dart';
+import 'web_socket_adapter_web_socket_channel.dart';
/// A [WebSocketChannel] that communicates using a `dart:io` [WebSocket].
-class IOWebSocketChannel extends StreamChannelMixin
- implements WebSocketChannel {
- /// The underlying `dart:io` [WebSocket].
- ///
- /// If the channel was constructed with [IOWebSocketChannel.connect], this is
- /// `null` until the [WebSocket.connect] future completes.
- WebSocket? _webSocket;
-
- @override
- String? get protocol => _webSocket?.protocol;
-
- @override
- int? get closeCode => _webSocket?.closeCode;
-
- @override
- String? get closeReason => _webSocket?.closeReason;
-
- @override
- final Stream stream;
-
- @override
- final WebSocketSink sink;
-
- /// Completer for [ready].
- final Completer<void> _readyCompleter;
-
- @override
- Future<void> get ready => _readyCompleter.future;
-
- /// The underlying [WebSocket], if this channel has connected.
- ///
- /// If the future returned from [WebSocket.connect] has not yet completed, or
- /// completed as an error, this will be null.
- WebSocket? get innerWebSocket => _webSocket;
-
+class IOWebSocketChannel extends WebSocketAdapterWebSocketChannel {
/// Creates a new WebSocket connection.
///
/// Connects to [url] using [WebSocket.connect] and returns a channel that can
@@ -76,58 +40,24 @@
Duration? connectTimeout,
HttpClient? customClient,
}) {
- late IOWebSocketChannel channel;
- final sinkCompleter = WebSocketSinkCompleter();
- var future = WebSocket.connect(
+ var webSocketFuture = WebSocket.connect(
url.toString(),
headers: headers,
protocols: protocols,
customClient: customClient,
- );
+ ).then((webSocket) => webSocket..pingInterval = pingInterval);
+
if (connectTimeout != null) {
- future = future.timeout(connectTimeout);
+ webSocketFuture = webSocketFuture.timeout(connectTimeout);
}
- final stream = StreamCompleter.fromFuture(future.then((webSocket) {
- webSocket.pingInterval = pingInterval;
- channel._webSocket = webSocket;
- channel._readyCompleter.complete();
- sinkCompleter.setDestinationSink(_IOWebSocketSink(webSocket));
- return webSocket;
- }).catchError((Object error, StackTrace stackTrace) {
- channel._readyCompleter.completeError(error, stackTrace);
- throw WebSocketChannelException.from(error);
- }));
- return channel =
- IOWebSocketChannel._withoutSocket(stream, sinkCompleter.sink);
+
+ return IOWebSocketChannel(webSocketFuture);
}
- /// Creates a channel wrapping [socket].
- IOWebSocketChannel(WebSocket socket)
- : _webSocket = socket,
- stream = socket.handleError(
- (Object? error) => throw WebSocketChannelException.from(error)),
- sink = _IOWebSocketSink(socket),
- _readyCompleter = Completer()..complete();
-
- /// Creates a channel without a socket.
- ///
- /// This is used with `connect` to synchronously provide a channel that later
- /// has a socket added.
- IOWebSocketChannel._withoutSocket(Stream stream, this.sink)
- : _webSocket = null,
- stream = stream.handleError(
- (Object? error) => throw WebSocketChannelException.from(error)),
- _readyCompleter = Completer();
-}
-
-/// A [WebSocketSink] that forwards [close] calls to a `dart:io` [WebSocket].
-class _IOWebSocketSink extends DelegatingStreamSink implements WebSocketSink {
- /// The underlying socket.
- final WebSocket _webSocket;
-
- _IOWebSocketSink(WebSocket super.webSocket) : _webSocket = webSocket;
-
- @override
- Future close([int? closeCode, String? closeReason]) =>
- _webSocket.close(closeCode, closeReason);
+ /// Creates a channel wrapping [webSocket].
+ IOWebSocketChannel(FutureOr<WebSocket> webSocket)
+ : super(webSocket is Future<WebSocket>
+ ? webSocket.then(io_web_socket.IOWebSocket.fromWebSocket)
+ as FutureOr<io_web_socket.IOWebSocket>
+ : io_web_socket.IOWebSocket.fromWebSocket(webSocket));
}
diff --git a/lib/web_socket_adapter_web_socket_channel.dart b/lib/web_socket_adapter_web_socket_channel.dart
index 28780ff..2a1242a 100644
--- a/lib/web_socket_adapter_web_socket_channel.dart
+++ b/lib/web_socket_adapter_web_socket_channel.dart
@@ -66,14 +66,19 @@
/// future will complete with an error.
factory WebSocketAdapterWebSocketChannel.connect(Uri url,
{Iterable<String>? protocols}) =>
- WebSocketAdapterWebSocketChannel._(
+ WebSocketAdapterWebSocketChannel(
WebSocket.connect(url, protocols: protocols));
- // Create a [WebSocketWebSocketChannelAdapter] from an existing [WebSocket].
- factory WebSocketAdapterWebSocketChannel.fromWebSocket(WebSocket webSocket) =>
- WebSocketAdapterWebSocketChannel._(Future.value(webSocket));
+ // Construct a [WebSocketWebSocketChannelAdapter] from an existing
+ // [WebSocket].
+ WebSocketAdapterWebSocketChannel(FutureOr<WebSocket> webSocket) {
+ Future<WebSocket> webSocketFuture;
+ if (webSocket is WebSocket) {
+ webSocketFuture = Future.value(webSocket);
+ } else {
+ webSocketFuture = webSocket;
+ }
- WebSocketAdapterWebSocketChannel._(Future<WebSocket> webSocketFuture) {
webSocketFuture.then((webSocket) {
var remoteClosed = false;
webSocket.events.listen((event) {
@@ -113,7 +118,16 @@
_protocol = webSocket.protocol;
_readyCompleter.complete();
}, onError: (Object e) {
- _readyCompleter.completeError(WebSocketChannelException.from(e));
+ Exception error;
+ if (e is TimeoutException) {
+ // Required for backwards compatibility with `IOWebSocketChannel`.
+ error = e;
+ } else {
+ error = WebSocketChannelException.from(e);
+ }
+ _readyCompleter.completeError(error);
+ _controller.local.sink.addError(error);
+ _controller.local.sink.close();
});
}
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 2dfbc7f..d198922 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -14,7 +14,7 @@
crypto: ^3.0.0
stream_channel: ^2.1.0
web: ^0.5.0
- web_socket: ^0.1.0
+ web_socket: ^0.1.1
dev_dependencies:
dart_flutter_team_lints: ^2.0.0
diff --git a/test/io_test.dart b/test/io_test.dart
index ac1ffcc..1b7ae35 100644
--- a/test/io_test.dart
+++ b/test/io_test.dart
@@ -131,7 +131,7 @@
});
final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}');
- expect(channel.ready, throwsA(isA<WebSocketException>()));
+ expect(channel.ready, throwsA(isA<WebSocketChannelException>()));
expect(channel.stream.drain<void>(),
throwsA(isA<WebSocketChannelException>()));
});
@@ -154,7 +154,7 @@
'ws://localhost:${server.port}',
protocols: [failedProtocol],
);
- expect(channel.ready, throwsA(isA<WebSocketException>()));
+ expect(channel.ready, throwsA(isA<WebSocketChannelException>()));
expect(
channel.stream.drain<void>(),
throwsA(isA<WebSocketChannelException>()),
@@ -230,8 +230,7 @@
);
expect(channel.ready, throwsA(isA<TimeoutException>()));
- expect(channel.stream.drain<void>(),
- throwsA(isA<WebSocketChannelException>()));
+ expect(channel.stream.drain<void>(), throwsA(anything));
});
test('.custom client is passed through', () async {
diff --git a/test/web_socket_adapter_web_socket_test.dart b/test/web_socket_adapter_web_socket_test.dart
index 24f9907..c66133b 100644
--- a/test/web_socket_adapter_web_socket_test.dart
+++ b/test/web_socket_adapter_web_socket_test.dart
@@ -112,9 +112,26 @@
expect(channel.closeReason, null);
});
- test('fromWebSocket', () async {
+ test('constructor with WebSocket', () async {
final webSocket = await WebSocket.connect(uri);
- final channel = WebSocketAdapterWebSocketChannel.fromWebSocket(webSocket);
+ final channel = WebSocketAdapterWebSocketChannel(webSocket);
+
+ await expectLater(channel.ready, completes);
+ channel.sink.add('This count says:');
+ channel.sink.add([1, 2, 3]);
+ channel.sink.add('And then:');
+ channel.sink.add([4, 5, 6]);
+ expect(await channel.stream.take(4).toList(), [
+ 'This count says:',
+ [1, 2, 3],
+ 'And then:',
+ [4, 5, 6]
+ ]);
+ });
+
+ test('constructor with Future<WebSocket>', () async {
+ final webSocketFuture = WebSocket.connect(uri);
+ final channel = WebSocketAdapterWebSocketChannel(webSocketFuture);
await expectLater(channel.ready, completes);
channel.sink.add('This count says:');