Add WebSocketChannel.`ready` field and IOWebSocketChannel factory `connectTimeout` parameter  (dart-lang/web_socket_channel#240)

diff --git a/pkgs/web_socket_channel/CHANGELOG.md b/pkgs/web_socket_channel/CHANGELOG.md
index 1c5b128..6e0de0c 100644
--- a/pkgs/web_socket_channel/CHANGELOG.md
+++ b/pkgs/web_socket_channel/CHANGELOG.md
@@ -1,5 +1,9 @@
-## 2.2.1-dev
+## 2.3.0
 
+- Added a Future `ready` property to `WebSocketChannel`, which completes when
+  the connection is established
+- Added a `connectTimeout` parameter to the `IOWebSocketChannel.connect` factory,
+  which controls the timeout of the WebSocket Future.
 - Use platform agnostic code in README example.
 
 ## 2.2.0
diff --git a/pkgs/web_socket_channel/lib/html.dart b/pkgs/web_socket_channel/lib/html.dart
index 4fe35a4..080b2bf 100644
--- a/pkgs/web_socket_channel/lib/html.dart
+++ b/pkgs/web_socket_channel/lib/html.dart
@@ -45,8 +45,15 @@
   /// [_controller.local.stream].
   String? _localCloseReason;
 
+  /// Completer for [ready].
+  late Completer<void> _readyCompleter;
+
+  @override
+  Future<void> get ready => _readyCompleter.future;
+
   @override
   Stream get stream => _controller.foreign.stream;
+
   final _controller =
       StreamChannelController(sync: true, allowForeignErrors: false);
 
@@ -71,12 +78,20 @@
 
   /// Creates a channel wrapping [innerWebSocket].
   HtmlWebSocketChannel(this.innerWebSocket) {
+    _readyCompleter = Completer();
     if (innerWebSocket.readyState == WebSocket.OPEN) {
+      _readyCompleter.complete();
       _listen();
     } else {
+      if (innerWebSocket.readyState == WebSocket.CLOSING ||
+          innerWebSocket.readyState == WebSocket.CLOSED) {
+        _readyCompleter.completeError(WebSocketChannelException(
+            'WebSocket state error: ${innerWebSocket.readyState}'));
+      }
       // The socket API guarantees that only a single open event will be
       // emitted.
       innerWebSocket.onOpen.first.then((_) {
+        _readyCompleter.complete();
         _listen();
       });
     }
@@ -84,8 +99,9 @@
     // The socket API guarantees that only a single error event will be emitted,
     // and that once it is no open or message events will be emitted.
     innerWebSocket.onError.first.then((_) {
-      _controller.local.sink
-          .addError(WebSocketChannelException('WebSocket connection failed.'));
+      final error = WebSocketChannelException('WebSocket connection failed.');
+      _readyCompleter.completeError(error);
+      _controller.local.sink.addError(error);
       _controller.local.sink.close();
     });
 
diff --git a/pkgs/web_socket_channel/lib/io.dart b/pkgs/web_socket_channel/lib/io.dart
index a7bee8f..15f2f0d 100644
--- a/pkgs/web_socket_channel/lib/io.dart
+++ b/pkgs/web_socket_channel/lib/io.dart
@@ -32,9 +32,16 @@
 
   @override
   final Stream stream;
+
   @override
   final WebSocketSink sink;
 
+  /// Completer for [ready].
+  final Completer<void> _readyCompleter;
+
+  @override
+  Future<void> get ready => _readyCompleter.future;
+
   /// The underlying [WebSocket], if this channel has connected.
   ///
   /// If the future returned from [WebSocket.connect] has not yet completed, or
@@ -55,6 +62,10 @@
   /// [pingInterval]. It defaults to `null`, indicating that ping messages are
   /// disabled.
   ///
+  /// [connectTimeout] determines how long to wait for [WebSocket.connect]
+  /// before throwing a [TimeoutException]. If connectTimeout is null then the
+  /// connection process will never time-out.
+  ///
   /// If there's an error connecting, the channel's stream emits a
   /// [WebSocketChannelException] wrapping that error and then closes.
   factory IOWebSocketChannel.connect(
@@ -62,21 +73,28 @@
     Iterable<String>? protocols,
     Map<String, dynamic>? headers,
     Duration? pingInterval,
+    Duration? connectTimeout,
   }) {
     late IOWebSocketChannel channel;
     final sinkCompleter = WebSocketSinkCompleter();
-    final stream = StreamCompleter.fromFuture(
-      WebSocket.connect(url.toString(), headers: headers, protocols: protocols)
-          .then((webSocket) {
-        webSocket.pingInterval = pingInterval;
-        channel._webSocket = webSocket;
-        sinkCompleter.setDestinationSink(_IOWebSocketSink(webSocket));
-        return webSocket;
-      }).catchError(
-        (Object error) => throw WebSocketChannelException.from(error),
-      ),
+    var future = WebSocket.connect(
+      url.toString(),
+      headers: headers,
+      protocols: protocols,
     );
-
+    if (connectTimeout != null) {
+      future = future.timeout(connectTimeout);
+    }
+    final stream = StreamCompleter.fromFuture(future.then((webSocket) {
+      webSocket.pingInterval = pingInterval;
+      channel._webSocket = webSocket;
+      channel._readyCompleter.complete();
+      sinkCompleter.setDestinationSink(_IOWebSocketSink(webSocket));
+      return webSocket;
+    }).catchError((Object error, StackTrace stackTrace) {
+      channel._readyCompleter.completeError(error, stackTrace);
+      throw WebSocketChannelException.from(error);
+    }));
     return channel =
         IOWebSocketChannel._withoutSocket(stream, sinkCompleter.sink);
   }
@@ -86,7 +104,8 @@
       : _webSocket = socket,
         stream = socket.handleError(
             (error) => throw WebSocketChannelException.from(error)),
-        sink = _IOWebSocketSink(socket);
+        sink = _IOWebSocketSink(socket),
+        _readyCompleter = Completer()..complete();
 
   /// Creates a channel without a socket.
   ///
@@ -95,7 +114,8 @@
   IOWebSocketChannel._withoutSocket(Stream stream, this.sink)
       : _webSocket = null,
         stream = stream.handleError(
-            (error) => throw WebSocketChannelException.from(error));
+            (error) => throw WebSocketChannelException.from(error)),
+        _readyCompleter = Completer();
 }
 
 /// A [WebSocketSink] that forwards [close] calls to a `dart:io` [WebSocket].
diff --git a/pkgs/web_socket_channel/lib/src/channel.dart b/pkgs/web_socket_channel/lib/src/channel.dart
index fa0cb97..d912fcc 100644
--- a/pkgs/web_socket_channel/lib/src/channel.dart
+++ b/pkgs/web_socket_channel/lib/src/channel.dart
@@ -51,6 +51,10 @@
   /// Before the connection has been closed, this will be `null`.
   String? get closeReason => _webSocket.closeReason;
 
+  /// Future indicating if the connection has been established.
+  /// It completes on successful connection to the websocket.
+  final Future<void> ready = Future.value();
+
   @override
   Stream get stream => StreamView(_webSocket);
 
diff --git a/pkgs/web_socket_channel/pubspec.yaml b/pkgs/web_socket_channel/pubspec.yaml
index 4c83080..4041d87 100644
--- a/pkgs/web_socket_channel/pubspec.yaml
+++ b/pkgs/web_socket_channel/pubspec.yaml
@@ -1,5 +1,5 @@
 name: web_socket_channel
-version: 2.2.1-dev
+version: 2.3.0
 
 description: >-
   StreamChannel wrappers for WebSockets. Provides a cross-platform
diff --git a/pkgs/web_socket_channel/test/html_test.dart b/pkgs/web_socket_channel/test/html_test.dart
index f7ec4d9..54b34d7 100644
--- a/pkgs/web_socket_channel/test/html_test.dart
+++ b/pkgs/web_socket_channel/test/html_test.dart
@@ -37,6 +37,9 @@
   test('communicates using an existing WebSocket', () async {
     final webSocket = WebSocket('ws://localhost:$port');
     final channel = HtmlWebSocketChannel(webSocket);
+
+    expect(channel.ready, completes);
+
     addTearDown(channel.sink.close);
 
     final queue = StreamQueue(channel.stream);
@@ -59,6 +62,9 @@
     await webSocket.onOpen.first;
 
     final channel = HtmlWebSocketChannel(webSocket);
+
+    expect(channel.ready, completes);
+
     addTearDown(channel.sink.close);
 
     final queue = StreamQueue(channel.stream);
@@ -66,8 +72,29 @@
     expect(await queue.next, equals('foo'));
   });
 
+  test('communicates using an connecting WebSocket', () async {
+    final webSocket = WebSocket('ws://localhost:$port');
+
+    final channel = HtmlWebSocketChannel(webSocket);
+
+    expect(channel.ready, completes);
+
+    addTearDown(channel.sink.close);
+  });
+
+  test('communicates using an existing closed WebSocket', () async {
+    final webSocket = WebSocket('ws://localhost:$port');
+    webSocket.close();
+
+    final channel = HtmlWebSocketChannel(webSocket);
+    expect(channel.ready, throwsA(isA<WebSocketChannelException>()));
+  });
+
   test('.connect defaults to binary lists', () async {
     final channel = HtmlWebSocketChannel.connect('ws://localhost:$port');
+
+    expect(channel.ready, completes);
+
     addTearDown(channel.sink.close);
 
     final queue = StreamQueue(channel.stream);
@@ -81,6 +108,9 @@
   test('.connect defaults to binary lists using platform independent api',
       () async {
     final channel = WebSocketChannel.connect(Uri.parse('ws://localhost:$port'));
+
+    expect(channel.ready, completes);
+
     addTearDown(channel.sink.close);
 
     final queue = StreamQueue(channel.stream);
@@ -94,6 +124,9 @@
   test('.connect can use blobs', () async {
     final channel = HtmlWebSocketChannel.connect('ws://localhost:$port',
         binaryType: BinaryType.blob);
+
+    expect(channel.ready, completes);
+
     addTearDown(channel.sink.close);
 
     final queue = StreamQueue(channel.stream);
@@ -126,6 +159,7 @@
     // invalid.
     final channel = HtmlWebSocketChannel.connect(
         'ws://localhost:${await serverChannel.stream.first}');
+    expect(channel.ready, throwsA(isA<WebSocketChannelException>()));
     expect(channel.stream.toList(), throwsA(isA<WebSocketChannelException>()));
   });
 }
diff --git a/pkgs/web_socket_channel/test/io_test.dart b/pkgs/web_socket_channel/test/io_test.dart
index 111abb4..f681ac6 100644
--- a/pkgs/web_socket_channel/test/io_test.dart
+++ b/pkgs/web_socket_channel/test/io_test.dart
@@ -3,6 +3,7 @@
 // BSD-style license that can be found in the LICENSE file.
 
 @TestOn('vm')
+import 'dart:async';
 import 'dart:io';
 
 import 'package:test/test.dart';
@@ -28,6 +29,8 @@
     final webSocket = await WebSocket.connect('ws://localhost:${server.port}');
     final channel = IOWebSocketChannel(webSocket);
 
+    expect(channel.ready, completes);
+
     var n = 0;
     channel.stream.listen((message) {
       if (n == 0) {
@@ -47,6 +50,7 @@
 
   test('.connect communicates immediately', () async {
     server = await HttpServer.bind('localhost', 0);
+    addTearDown(server.close);
     server.transform(WebSocketTransformer()).listen((WebSocket webSocket) {
       final channel = IOWebSocketChannel(webSocket);
       channel.stream.listen((request) {
@@ -56,6 +60,9 @@
     });
 
     final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}');
+
+    expect(channel.ready, completes);
+
     channel.sink.add('ping');
 
     channel.stream.listen(
@@ -69,6 +76,7 @@
   test('.connect communicates immediately using platform independent api',
       () async {
     server = await HttpServer.bind('localhost', 0);
+    addTearDown(server.close);
     server.transform(WebSocketTransformer()).listen((WebSocket webSocket) {
       final channel = IOWebSocketChannel(webSocket);
       channel.stream.listen((request) {
@@ -79,6 +87,9 @@
 
     final channel =
         WebSocketChannel.connect(Uri.parse('ws://localhost:${server.port}'));
+
+    expect(channel.ready, completes);
+
     channel.sink.add('ping');
 
     channel.stream.listen(
@@ -91,6 +102,7 @@
 
   test('.connect with an immediate call to close', () async {
     server = await HttpServer.bind('localhost', 0);
+    addTearDown(server.close);
     server.transform(WebSocketTransformer()).listen((WebSocket webSocket) {
       expect(() async {
         final channel = IOWebSocketChannel(webSocket);
@@ -101,18 +113,23 @@
     });
 
     final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}');
+
+    expect(channel.ready, completes);
+
     await channel.sink.close(5678, 'raisin');
   });
 
   test('.connect wraps a connection error in WebSocketChannelException',
       () async {
     server = await HttpServer.bind('localhost', 0);
+    addTearDown(server.close);
     server.listen((request) {
       request.response.statusCode = 404;
       request.response.close();
     });
 
     final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}');
+    expect(channel.ready, throwsA(isA<WebSocketException>()));
     expect(channel.stream.drain(), throwsA(isA<WebSocketChannelException>()));
   });
 
@@ -122,6 +139,7 @@
     String selector(List<String> receivedProtocols) => passedProtocol;
 
     server = await HttpServer.bind('localhost', 0);
+    addTearDown(server.close);
     server.listen((HttpRequest request) {
       expect(
         WebSocketTransformer.upgrade(request, protocolSelector: selector),
@@ -133,6 +151,7 @@
       'ws://localhost:${server.port}',
       protocols: [failedProtocol],
     );
+    expect(channel.ready, throwsA(isA<WebSocketException>()));
     expect(
       channel.stream.drain(),
       throwsA(isA<WebSocketChannelException>()),
@@ -144,6 +163,7 @@
     String selector(List<String> receivedProtocols) => passedProtocol;
 
     server = await HttpServer.bind('localhost', 0);
+    addTearDown(server.close);
     server.listen((HttpRequest request) async {
       final webSocket = await WebSocketTransformer.upgrade(
         request,
@@ -155,7 +175,58 @@
 
     final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}',
         protocols: [passedProtocol]);
+
+    expect(channel.ready, completes);
+
     await channel.stream.drain();
     expect(channel.protocol, passedProtocol);
   });
+
+  test('.connects with a timeout parameters specified', () async {
+    server = await HttpServer.bind('localhost', 0);
+    addTearDown(server.close);
+    server.transform(WebSocketTransformer()).listen((webSocket) {
+      expect(() async {
+        final channel = IOWebSocketChannel(webSocket);
+        await channel.stream.drain();
+        expect(channel.closeCode, equals(5678));
+        expect(channel.closeReason, equals('raisin'));
+      }(), completes);
+    });
+
+    final channel = IOWebSocketChannel.connect(
+      'ws://localhost:${server.port}',
+      connectTimeout: const Duration(milliseconds: 1000),
+    );
+    expect(channel.ready, completes);
+    await channel.sink.close(5678, 'raisin');
+  });
+
+  test('.respects timeout parameter when trying to connect', () async {
+    server = await HttpServer.bind('localhost', 0);
+    addTearDown(server.close);
+    server
+        .transform(StreamTransformer<HttpRequest, HttpRequest>.fromHandlers(
+            handleData: (data, sink) {
+          // Wait before we handle this request, to give the timeout a chance to
+          // kick in. We still want to make sure that we handle the request
+          // afterwards to not have false positives with the timeout
+          Timer(const Duration(milliseconds: 800), () {
+            sink.add(data);
+          });
+        }))
+        .transform(WebSocketTransformer())
+        .listen((webSocket) {
+          final channel = IOWebSocketChannel(webSocket);
+          channel.stream.drain();
+        });
+
+    final channel = IOWebSocketChannel.connect(
+      'ws://localhost:${server.port}',
+      connectTimeout: const Duration(milliseconds: 500),
+    );
+
+    expect(channel.ready, throwsA(isA<TimeoutException>()));
+    expect(channel.stream.drain(), throwsA(isA<WebSocketChannelException>()));
+  });
 }