Implement IOWebSocketChannel as a WebSocketAdapterWebSocket subclass (#342)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 139ef22..5026c83 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,8 @@
   default implementation for `WebSocketChannel.connect`.
 - **BREAKING**: Remove `WebSocketChannel` constructor.
 - **BREAKING**: Make `WebSocketChannel` an `abstract interface`.
+- **BREAKING**: `IOWebSocketChannel.ready` will throw
+  `WebSocketChannelException` instead of `WebSocketException`.
 
 ## 2.4.5
 
diff --git a/lib/io.dart b/lib/io.dart
index 9f71d81..cfba457 100644
--- a/lib/io.dart
+++ b/lib/io.dart
@@ -3,51 +3,15 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import 'dart:async';
-import 'dart:io';
-
-import 'package:async/async.dart';
-import 'package:stream_channel/stream_channel.dart';
+import 'dart:io' show HttpClient, WebSocket;
+import 'package:web_socket/io_web_socket.dart' as io_web_socket;
 
 import 'src/channel.dart';
 import 'src/exception.dart';
-import 'src/sink_completer.dart';
+import 'web_socket_adapter_web_socket_channel.dart';
 
 /// A [WebSocketChannel] that communicates using a `dart:io` [WebSocket].
-class IOWebSocketChannel extends StreamChannelMixin
-    implements WebSocketChannel {
-  /// The underlying `dart:io` [WebSocket].
-  ///
-  /// If the channel was constructed with [IOWebSocketChannel.connect], this is
-  /// `null` until the [WebSocket.connect] future completes.
-  WebSocket? _webSocket;
-
-  @override
-  String? get protocol => _webSocket?.protocol;
-
-  @override
-  int? get closeCode => _webSocket?.closeCode;
-
-  @override
-  String? get closeReason => _webSocket?.closeReason;
-
-  @override
-  final Stream stream;
-
-  @override
-  final WebSocketSink sink;
-
-  /// Completer for [ready].
-  final Completer<void> _readyCompleter;
-
-  @override
-  Future<void> get ready => _readyCompleter.future;
-
-  /// The underlying [WebSocket], if this channel has connected.
-  ///
-  /// If the future returned from [WebSocket.connect] has not yet completed, or
-  /// completed as an error, this will be null.
-  WebSocket? get innerWebSocket => _webSocket;
-
+class IOWebSocketChannel extends WebSocketAdapterWebSocketChannel {
   /// Creates a new WebSocket connection.
   ///
   /// Connects to [url] using [WebSocket.connect] and returns a channel that can
@@ -76,58 +40,24 @@
     Duration? connectTimeout,
     HttpClient? customClient,
   }) {
-    late IOWebSocketChannel channel;
-    final sinkCompleter = WebSocketSinkCompleter();
-    var future = WebSocket.connect(
+    var webSocketFuture = WebSocket.connect(
       url.toString(),
       headers: headers,
       protocols: protocols,
       customClient: customClient,
-    );
+    ).then((webSocket) => webSocket..pingInterval = pingInterval);
+
     if (connectTimeout != null) {
-      future = future.timeout(connectTimeout);
+      webSocketFuture = webSocketFuture.timeout(connectTimeout);
     }
-    final stream = StreamCompleter.fromFuture(future.then((webSocket) {
-      webSocket.pingInterval = pingInterval;
-      channel._webSocket = webSocket;
-      channel._readyCompleter.complete();
-      sinkCompleter.setDestinationSink(_IOWebSocketSink(webSocket));
-      return webSocket;
-    }).catchError((Object error, StackTrace stackTrace) {
-      channel._readyCompleter.completeError(error, stackTrace);
-      throw WebSocketChannelException.from(error);
-    }));
-    return channel =
-        IOWebSocketChannel._withoutSocket(stream, sinkCompleter.sink);
+
+    return IOWebSocketChannel(webSocketFuture);
   }
 
-  /// Creates a channel wrapping [socket].
-  IOWebSocketChannel(WebSocket socket)
-      : _webSocket = socket,
-        stream = socket.handleError(
-            (Object? error) => throw WebSocketChannelException.from(error)),
-        sink = _IOWebSocketSink(socket),
-        _readyCompleter = Completer()..complete();
-
-  /// Creates a channel without a socket.
-  ///
-  /// This is used with `connect` to synchronously provide a channel that later
-  /// has a socket added.
-  IOWebSocketChannel._withoutSocket(Stream stream, this.sink)
-      : _webSocket = null,
-        stream = stream.handleError(
-            (Object? error) => throw WebSocketChannelException.from(error)),
-        _readyCompleter = Completer();
-}
-
-/// A [WebSocketSink] that forwards [close] calls to a `dart:io` [WebSocket].
-class _IOWebSocketSink extends DelegatingStreamSink implements WebSocketSink {
-  /// The underlying socket.
-  final WebSocket _webSocket;
-
-  _IOWebSocketSink(WebSocket super.webSocket) : _webSocket = webSocket;
-
-  @override
-  Future close([int? closeCode, String? closeReason]) =>
-      _webSocket.close(closeCode, closeReason);
+  /// Creates a channel wrapping [webSocket].
+  IOWebSocketChannel(FutureOr<WebSocket> webSocket)
+      : super(webSocket is Future<WebSocket>
+            ? webSocket.then(io_web_socket.IOWebSocket.fromWebSocket)
+                as FutureOr<io_web_socket.IOWebSocket>
+            : io_web_socket.IOWebSocket.fromWebSocket(webSocket));
 }
diff --git a/lib/web_socket_adapter_web_socket_channel.dart b/lib/web_socket_adapter_web_socket_channel.dart
index 28780ff..2a1242a 100644
--- a/lib/web_socket_adapter_web_socket_channel.dart
+++ b/lib/web_socket_adapter_web_socket_channel.dart
@@ -66,14 +66,19 @@
   /// future will complete with an error.
   factory WebSocketAdapterWebSocketChannel.connect(Uri url,
           {Iterable<String>? protocols}) =>
-      WebSocketAdapterWebSocketChannel._(
+      WebSocketAdapterWebSocketChannel(
           WebSocket.connect(url, protocols: protocols));
 
-  // Create a [WebSocketWebSocketChannelAdapter] from an existing [WebSocket].
-  factory WebSocketAdapterWebSocketChannel.fromWebSocket(WebSocket webSocket) =>
-      WebSocketAdapterWebSocketChannel._(Future.value(webSocket));
+  // Construct a [WebSocketWebSocketChannelAdapter] from an existing
+  // [WebSocket].
+  WebSocketAdapterWebSocketChannel(FutureOr<WebSocket> webSocket) {
+    Future<WebSocket> webSocketFuture;
+    if (webSocket is WebSocket) {
+      webSocketFuture = Future.value(webSocket);
+    } else {
+      webSocketFuture = webSocket;
+    }
 
-  WebSocketAdapterWebSocketChannel._(Future<WebSocket> webSocketFuture) {
     webSocketFuture.then((webSocket) {
       var remoteClosed = false;
       webSocket.events.listen((event) {
@@ -113,7 +118,16 @@
       _protocol = webSocket.protocol;
       _readyCompleter.complete();
     }, onError: (Object e) {
-      _readyCompleter.completeError(WebSocketChannelException.from(e));
+      Exception error;
+      if (e is TimeoutException) {
+        // Required for backwards compatibility with `IOWebSocketChannel`.
+        error = e;
+      } else {
+        error = WebSocketChannelException.from(e);
+      }
+      _readyCompleter.completeError(error);
+      _controller.local.sink.addError(error);
+      _controller.local.sink.close();
     });
   }
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index 2dfbc7f..d198922 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -14,7 +14,7 @@
   crypto: ^3.0.0
   stream_channel: ^2.1.0
   web: ^0.5.0
-  web_socket: ^0.1.0
+  web_socket: ^0.1.1
 
 dev_dependencies:
   dart_flutter_team_lints: ^2.0.0
diff --git a/test/io_test.dart b/test/io_test.dart
index ac1ffcc..1b7ae35 100644
--- a/test/io_test.dart
+++ b/test/io_test.dart
@@ -131,7 +131,7 @@
     });
 
     final channel = IOWebSocketChannel.connect('ws://localhost:${server.port}');
-    expect(channel.ready, throwsA(isA<WebSocketException>()));
+    expect(channel.ready, throwsA(isA<WebSocketChannelException>()));
     expect(channel.stream.drain<void>(),
         throwsA(isA<WebSocketChannelException>()));
   });
@@ -154,7 +154,7 @@
       'ws://localhost:${server.port}',
       protocols: [failedProtocol],
     );
-    expect(channel.ready, throwsA(isA<WebSocketException>()));
+    expect(channel.ready, throwsA(isA<WebSocketChannelException>()));
     expect(
       channel.stream.drain<void>(),
       throwsA(isA<WebSocketChannelException>()),
@@ -230,8 +230,7 @@
     );
 
     expect(channel.ready, throwsA(isA<TimeoutException>()));
-    expect(channel.stream.drain<void>(),
-        throwsA(isA<WebSocketChannelException>()));
+    expect(channel.stream.drain<void>(), throwsA(anything));
   });
 
   test('.custom client is passed through', () async {
diff --git a/test/web_socket_adapter_web_socket_test.dart b/test/web_socket_adapter_web_socket_test.dart
index 24f9907..c66133b 100644
--- a/test/web_socket_adapter_web_socket_test.dart
+++ b/test/web_socket_adapter_web_socket_test.dart
@@ -112,9 +112,26 @@
       expect(channel.closeReason, null);
     });
 
-    test('fromWebSocket', () async {
+    test('constructor with WebSocket', () async {
       final webSocket = await WebSocket.connect(uri);
-      final channel = WebSocketAdapterWebSocketChannel.fromWebSocket(webSocket);
+      final channel = WebSocketAdapterWebSocketChannel(webSocket);
+
+      await expectLater(channel.ready, completes);
+      channel.sink.add('This count says:');
+      channel.sink.add([1, 2, 3]);
+      channel.sink.add('And then:');
+      channel.sink.add([4, 5, 6]);
+      expect(await channel.stream.take(4).toList(), [
+        'This count says:',
+        [1, 2, 3],
+        'And then:',
+        [4, 5, 6]
+      ]);
+    });
+
+    test('constructor with Future<WebSocket>', () async {
+      final webSocketFuture = WebSocket.connect(uri);
+      final channel = WebSocketAdapterWebSocketChannel(webSocketFuture);
 
       await expectLater(channel.ready, completes);
       channel.sink.add('This count says:');