Add WebSocketChannel.`ready` field and IOWebSocketChannel factory `connectTimeout` parameter (dart-lang/web_socket_channel#240)
diff --git a/pkgs/web_socket_channel/CHANGELOG.md b/pkgs/web_socket_channel/CHANGELOG.md
index 1c5b128..6e0de0c 100644
--- a/pkgs/web_socket_channel/CHANGELOG.md
+++ b/pkgs/web_socket_channel/CHANGELOG.md
@@ -1,5 +1,9 @@
-## 2.2.1-dev
+## 2.3.0
+- Added a Future `ready` property to `WebSocketChannel`, which completes when
+ the connection is established
+- Added a `connectTimeout` parameter to the `IOWebSocketChannel.connect` factory,
+ which controls the timeout of the WebSocket Future.
- Use platform agnostic code in README example.
## 2.2.0
diff --git a/pkgs/web_socket_channel/lib/html.dart b/pkgs/web_socket_channel/lib/html.dart
index 4fe35a4..080b2bf 100644
--- a/pkgs/web_socket_channel/lib/html.dart
+++ b/pkgs/web_socket_channel/lib/html.dart
@@ -45,8 +45,15 @@
/// [_controller.local.stream].
String? _localCloseReason;
+ /// Completer for [ready].
+ late Completer<void> _readyCompleter;
+
+ @override
+ Future<void> get ready => _readyCompleter.future;
+
@override
Stream get stream => _controller.foreign.stream;
+
final _controller =
StreamChannelController(sync: true, allowForeignErrors: false);
@@ -71,12 +78,20 @@
/// Creates a channel wrapping [innerWebSocket].
HtmlWebSocketChannel(this.innerWebSocket) {
+ _readyCompleter = Completer();
if (innerWebSocket.readyState == WebSocket.OPEN) {
+ _readyCompleter.complete();
_listen();
} else {
+ if (innerWebSocket.readyState == WebSocket.CLOSING ||
+ innerWebSocket.readyState == WebSocket.CLOSED) {
+ _readyCompleter.completeError(WebSocketChannelException(
+ 'WebSocket state error: ${innerWebSocket.readyState}'));
+ }
// The socket API guarantees that only a single open event will be
// emitted.
innerWebSocket.onOpen.first.then((_) {
+ _readyCompleter.complete();
_listen();
});
}
@@ -84,8 +99,9 @@
// The socket API guarantees that only a single error event will be emitted,
// and that once it is no open or message events will be emitted.
innerWebSocket.onError.first.then((_) {
- _controller.local.sink
- .addError(WebSocketChannelException('WebSocket connection failed.'));
+ final error = WebSocketChannelException('WebSocket connection failed.');
+ _readyCompleter.completeError(error);
+ _controller.local.sink.addError(error);
_controller.local.sink.close();
});
diff --git a/pkgs/web_socket_channel/lib/io.dart b/pkgs/web_socket_channel/lib/io.dart
index a7bee8f..15f2f0d 100644
--- a/pkgs/web_socket_channel/lib/io.dart
+++ b/pkgs/web_socket_channel/lib/io.dart
@@ -32,9 +32,16 @@
@override
final Stream stream;
+
@override
final WebSocketSink sink;
+ /// Completer for [ready].
+ final Completer<void> _readyCompleter;
+
+ @override
+ Future<void> get ready => _readyCompleter.future;
+
/// The underlying [WebSocket], if this channel has connected.
///
/// If the future returned from [WebSocket.connect] has not yet completed, or
@@ -55,6 +62,10 @@
/// [pingInterval]. It defaults to `null`, indicating that ping messages are
/// disabled.
///
+ /// [connectTimeout] determines how long to wait for [WebSocket.connect]
+ /// before throwing a [TimeoutException]. If connectTimeout is null then the
+ /// connection process will never time-out.
+ ///
/// If there's an error connecting, the channel's stream emits a
/// [WebSocketChannelException] wrapping that error and then closes.
factory IOWebSocketChannel.connect(
@@ -62,21 +73,28 @@
Iterable<String>? protocols,
Map<String, dynamic>? headers,
Duration? pingInterval,
+ Duration? connectTimeout,
}) {
late IOWebSocketChannel channel;
final sinkCompleter = WebSocketSinkCompleter();
- final stream = StreamCompleter.fromFuture(
- WebSocket.connect(url.toString(), headers: headers, protocols: protocols)
- .then((webSocket) {
- webSocket.pingInterval = pingInterval;
- channel._webSocket = webSocket;
- sinkCompleter.setDestinationSink(_IOWebSocketSink(webSocket));
- return webSocket;
- }).catchError(
- (Object error) => throw WebSocketChannelException.from(error),
- ),
+ var future = WebSocket.connect(
+ url.toString(),
+ headers: headers,
+ protocols: protocols,
);
-
+ if (connectTimeout != null) {
+ future = future.timeout(connectTimeout);
+ }
+ final stream = StreamCompleter.fromFuture(future.then((webSocket) {
+ webSocket.pingInterval = pingInterval;
+ channel._webSocket = webSocket;
+ channel._readyCompleter.complete();
+ sinkCompleter.setDestinationSink(_IOWebSocketSink(webSocket));
+ return webSocket;
+ }).catchError((Object error, StackTrace stackTrace) {
+ channel._readyCompleter.completeError(error, stackTrace);
+ throw WebSocketChannelException.from(error);
+ }));
return channel =
IOWebSocketChannel._withoutSocket(stream, sinkCompleter.sink);
}
@@ -86,7 +104,8 @@
: _webSocket = socket,
stream = socket.handleError(
(error) => throw WebSocketChannelException.from(error)),
- sink = _IOWebSocketSink(socket);
+ sink = _IOWebSocketSink(socket),
+ _readyCompleter = Completer()..complete();
/// Creates a channel without a socket.
///
@@ -95,7 +114,8 @@
IOWebSocketChannel._withoutSocket(Stream stream, this.sink)
: _webSocket = null,
stream = stream.handleError(
- (error) => throw WebSocketChannelException.from(error));
+ (error) => throw WebSocketChannelException.from(error)),
+ _readyCompleter = Completer();
}
/// A [WebSocketSink] that forwards [close] calls to a `dart:io` [WebSocket].
diff --git a/pkgs/web_socket_channel/lib/src/channel.dart b/pkgs/web_socket_channel/lib/src/channel.dart
index fa0cb97..d912fcc 100644
--- a/pkgs/web_socket_channel/lib/src/channel.dart
+++ b/pkgs/web_socket_channel/lib/src/channel.dart
@@ -51,6 +51,10 @@
/// Before the connection has been closed, this will be `null`.
String? get closeReason => _webSocket.closeReason;
+ /// Future indicating if the connection has been established.
+ /// It completes on successful connection to the websocket.
+ final Future<void> ready = Future.value();
+
@override
Stream get stream => StreamView(_webSocket);
diff --git a/pkgs/web_socket_channel/pubspec.yaml b/pkgs/web_socket_channel/pubspec.yaml
index 4c83080..4041d87 100644
--- a/pkgs/web_socket_channel/pubspec.yaml
+++ b/pkgs/web_socket_channel/pubspec.yaml
@@ -1,5 +1,5 @@
name: web_socket_channel
-version: 2.2.1-dev
+version: 2.3.0
description: >-
StreamChannel wrappers for WebSockets. Provides a cross-platform
diff --git a/pkgs/web_socket_channel/test/html_test.dart b/pkgs/web_socket_channel/test/html_test.dart
index f7ec4d9..54b34d7 100644
--- a/pkgs/web_socket_channel/test/html_test.dart
+++ b/pkgs/web_socket_channel/test/html_test.dart
@@ -37,6 +37,9 @@
test('communicates using an existing WebSocket', () async {
final webSocket = WebSocket('ws://localhost:$port');
final channel = HtmlWebSocketChannel(webSocket);
+
+ expect(channel.ready, completes);
+
addTearDown(channel.sink.close);
final queue = StreamQueue(channel.stream);
@@ -59,6 +62,9 @@
await webSocket.onOpen.first;
final channel = HtmlWebSocketChannel(webSocket);
+
+ expect(channel.ready, completes);
+
addTearDown(channel.sink.close);
final queue = StreamQueue(channel.stream);
@@ -66,8 +72,29 @@
expect(await queue.next, equals('foo'));
});
+ test('communicates using an connecting WebSocket', () async {
+ final webSocket = WebSocket('ws://localhost:$port');
+
+ final channel = HtmlWebSocketChannel(webSocket);
+
+ expect(channel.ready, completes);
+
+ addTearDown(channel.sink.close);
+ });
+
+ test('communicates using an existing closed WebSocket', () async {
+ final webSocket = WebSocket('ws://localhost:$port');
+ webSocket.close();
+
+ final channel = HtmlWebSocketChannel(webSocket);
+ expect(channel.ready, throwsA(isA<WebSocketChannelException>()));
+ });
+
test('.connect defaults to binary lists', () async {
final channel = HtmlWebSocketChannel.connect('ws://localhost:$port');
+
+ expect(channel.ready, completes);
+
addTearDown(channel.sink.close);
final queue = StreamQueue(channel.stream);
@@ -81,6 +108,9 @@
test('.connect defaults to binary lists using platform independent api',
() async {
final channel = WebSocketChannel.connect(Uri.parse('ws://localhost:$port'));
+
+ expect(channel.ready, completes);
+
addTearDown(channel.sink.close);
final queue = StreamQueue(channel.stream);
@@ -94,6 +124,9 @@
test('.connect can use blobs', () async {
final channel = HtmlWebSocketChannel.connect('ws://localhost:$port',
binaryType: BinaryType.blob);
+
+ expect(channel.ready, completes);
+
addTearDown(channel.sink.close);
final queue = StreamQueue(channel.stream);
@@ -126,6 +159,7 @@
// invalid.
final channel = HtmlWebSocketChannel.connect(
'ws://localhost:${await serverChannel.stream.first}');
+ expect(channel.ready, throwsA(isA<WebSocketChannelException>()));
expect(channel.stream.toList(), throwsA(isA<WebSocketChannelException>()));
});
}
diff --git a/pkgs/web_socket_channel/test/io_test.dart b/pkgs/web_socket_channel/test/io_test.dart
index 111abb4..f681ac6 100644
--- a/pkgs/web_socket_channel/test/io_test.dart
+++ b/pkgs/web_socket_channel/test/io_test.dart
@@ -3,6 +3,7 @@
// BSD-style license that can be found in the LICENSE file.
@TestOn('vm')
+import 'dart:async';
import 'dart:io';
import 'package:test/test.dart';
@@ -28,6 +29,8 @@
final webSocket = await WebSocket.connect('ws://localhost:${server.port}');
final channel = IOWebSocketChannel(webSocket);
+ expect(channel.ready, completes);
+
var n = 0;
channel.stream.listen((message) {
if (n == 0) {
@@ -47,6 +50,7 @@
test('.connect communicates immediately', () async {
server = await HttpServer.bind('localhost', 0);
+ addTearDown(server.close);
server.transform(WebSocketTransformer()).listen((WebSocket webSocket) {
final channel = IOWebSocketChannel(webSocket);
channel.stream.listen((request) {
@@ -56,6 +60,9 @@
});
final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}');
+
+ expect(channel.ready, completes);
+
channel.sink.add('ping');
channel.stream.listen(
@@ -69,6 +76,7 @@
test('.connect communicates immediately using platform independent api',
() async {
server = await HttpServer.bind('localhost', 0);
+ addTearDown(server.close);
server.transform(WebSocketTransformer()).listen((WebSocket webSocket) {
final channel = IOWebSocketChannel(webSocket);
channel.stream.listen((request) {
@@ -79,6 +87,9 @@
final channel =
WebSocketChannel.connect(Uri.parse('ws://localhost:${server.port}'));
+
+ expect(channel.ready, completes);
+
channel.sink.add('ping');
channel.stream.listen(
@@ -91,6 +102,7 @@
test('.connect with an immediate call to close', () async {
server = await HttpServer.bind('localhost', 0);
+ addTearDown(server.close);
server.transform(WebSocketTransformer()).listen((WebSocket webSocket) {
expect(() async {
final channel = IOWebSocketChannel(webSocket);
@@ -101,18 +113,23 @@
});
final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}');
+
+ expect(channel.ready, completes);
+
await channel.sink.close(5678, 'raisin');
});
test('.connect wraps a connection error in WebSocketChannelException',
() async {
server = await HttpServer.bind('localhost', 0);
+ addTearDown(server.close);
server.listen((request) {
request.response.statusCode = 404;
request.response.close();
});
final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}');
+ expect(channel.ready, throwsA(isA<WebSocketException>()));
expect(channel.stream.drain(), throwsA(isA<WebSocketChannelException>()));
});
@@ -122,6 +139,7 @@
String selector(List<String> receivedProtocols) => passedProtocol;
server = await HttpServer.bind('localhost', 0);
+ addTearDown(server.close);
server.listen((HttpRequest request) {
expect(
WebSocketTransformer.upgrade(request, protocolSelector: selector),
@@ -133,6 +151,7 @@
'ws://localhost:${server.port}',
protocols: [failedProtocol],
);
+ expect(channel.ready, throwsA(isA<WebSocketException>()));
expect(
channel.stream.drain(),
throwsA(isA<WebSocketChannelException>()),
@@ -144,6 +163,7 @@
String selector(List<String> receivedProtocols) => passedProtocol;
server = await HttpServer.bind('localhost', 0);
+ addTearDown(server.close);
server.listen((HttpRequest request) async {
final webSocket = await WebSocketTransformer.upgrade(
request,
@@ -155,7 +175,58 @@
final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}',
protocols: [passedProtocol]);
+
+ expect(channel.ready, completes);
+
await channel.stream.drain();
expect(channel.protocol, passedProtocol);
});
+
+ test('.connects with a timeout parameters specified', () async {
+ server = await HttpServer.bind('localhost', 0);
+ addTearDown(server.close);
+ server.transform(WebSocketTransformer()).listen((webSocket) {
+ expect(() async {
+ final channel = IOWebSocketChannel(webSocket);
+ await channel.stream.drain();
+ expect(channel.closeCode, equals(5678));
+ expect(channel.closeReason, equals('raisin'));
+ }(), completes);
+ });
+
+ final channel = IOWebSocketChannel.connect(
+ 'ws://localhost:${server.port}',
+ connectTimeout: const Duration(milliseconds: 1000),
+ );
+ expect(channel.ready, completes);
+ await channel.sink.close(5678, 'raisin');
+ });
+
+ test('.respects timeout parameter when trying to connect', () async {
+ server = await HttpServer.bind('localhost', 0);
+ addTearDown(server.close);
+ server
+ .transform(StreamTransformer<HttpRequest, HttpRequest>.fromHandlers(
+ handleData: (data, sink) {
+ // Wait before we handle this request, to give the timeout a chance to
+ // kick in. We still want to make sure that we handle the request
+ // afterwards to not have false positives with the timeout
+ Timer(const Duration(milliseconds: 800), () {
+ sink.add(data);
+ });
+ }))
+ .transform(WebSocketTransformer())
+ .listen((webSocket) {
+ final channel = IOWebSocketChannel(webSocket);
+ channel.stream.drain();
+ });
+
+ final channel = IOWebSocketChannel.connect(
+ 'ws://localhost:${server.port}',
+ connectTimeout: const Duration(milliseconds: 500),
+ );
+
+ expect(channel.ready, throwsA(isA<TimeoutException>()));
+ expect(channel.stream.drain(), throwsA(isA<WebSocketChannelException>()));
+ });
}