Fix a StateError in IOWebSocket (#1177)

This occurs when data is received from the peer after the connection has been closed locally.
diff --git a/pkgs/web_socket/CHANGELOG.md b/pkgs/web_socket/CHANGELOG.md
index e0df6cb..55b26d3 100644
--- a/pkgs/web_socket/CHANGELOG.md
+++ b/pkgs/web_socket/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 0.1.2
+
+- Fix a `StateError` in `IOWebSocket` when data is received from the peer
+  after the connection has been closed locally.
+
 ## 0.1.1
 
 - Add the ability to create a `package:web_socket` `WebSocket` given a
diff --git a/pkgs/web_socket/lib/src/io_web_socket.dart b/pkgs/web_socket/lib/src/io_web_socket.dart
index 5225c07..8b82218 100644
--- a/pkgs/web_socket/lib/src/io_web_socket.dart
+++ b/pkgs/web_socket/lib/src/io_web_socket.dart
@@ -56,6 +56,7 @@
   IOWebSocket._(this._webSocket) {
     _webSocket.listen(
       (event) {
+        if (_events.isClosed) return;
         switch (event) {
           case String e:
             _events.add(TextDataReceived(e));
@@ -64,6 +65,7 @@
         }
       },
       onError: (Object e, StackTrace st) {
+        if (_events.isClosed) return;
         final wse = switch (e) {
           io.WebSocketException(message: final message) =>
             WebSocketException(message),
@@ -72,12 +74,11 @@
         _events.addError(wse, st);
       },
       onDone: () {
-        if (!_events.isClosed) {
-          _events
-            ..add(CloseReceived(
-                _webSocket.closeCode, _webSocket.closeReason ?? ''))
-            ..close();
-        }
+        if (_events.isClosed) return;
+        _events
+          ..add(
+              CloseReceived(_webSocket.closeCode, _webSocket.closeReason ?? ''))
+          ..close();
       },
     );
   }
diff --git a/pkgs/web_socket/pubspec.yaml b/pkgs/web_socket/pubspec.yaml
index 1c341f7..f468dcb 100644
--- a/pkgs/web_socket/pubspec.yaml
+++ b/pkgs/web_socket/pubspec.yaml
@@ -3,7 +3,7 @@
   Any easy-to-use library for communicating with WebSockets
   that has multiple implementations.
 repository: https://github.com/dart-lang/http/tree/master/pkgs/web_socket
-version: 0.1.1
+version: 0.1.2
 
 environment:
   sdk: ^3.3.0
diff --git a/pkgs/web_socket_conformance_tests/lib/src/close_local_tests.dart b/pkgs/web_socket_conformance_tests/lib/src/close_local_tests.dart
index 2fe27bb..cb496ed 100644
--- a/pkgs/web_socket_conformance_tests/lib/src/close_local_tests.dart
+++ b/pkgs/web_socket_conformance_tests/lib/src/close_local_tests.dart
@@ -10,10 +10,35 @@
 import 'close_local_server_vm.dart'
     if (dart.library.html) 'close_local_server_web.dart';
 
+import 'continuously_writing_server_vm.dart'
+    if (dart.library.html) 'continuously_writing_server_web.dart'
+    as writing_server;
+
 /// Tests that the [WebSocket] can correctly close the connection to the peer.
 void testCloseLocal(
     Future<WebSocket> Function(Uri uri, {Iterable<String>? protocols})
         channelFactory) {
+  group('remote writing', () {
+    late Uri uri;
+    late StreamChannel<Object?> httpServerChannel;
+    late StreamQueue<Object?> httpServerQueue;
+
+    setUp(() async {
+      httpServerChannel = await writing_server.startServer();
+      httpServerQueue = StreamQueue(httpServerChannel.stream);
+      uri = Uri.parse('ws://localhost:${await httpServerQueue.next}');
+    });
+    tearDown(() async {
+      httpServerChannel.sink.add(null);
+    });
+
+    test('peer writes after close are ignored', () async {
+      final channel = await channelFactory(uri);
+      await channel.close();
+      expect(await channel.events.isEmpty, true);
+    });
+  });
+
   group('local close', () {
     late Uri uri;
     late StreamChannel<Object?> httpServerChannel;
diff --git a/pkgs/web_socket_conformance_tests/lib/src/continuously_writing_server.dart b/pkgs/web_socket_conformance_tests/lib/src/continuously_writing_server.dart
new file mode 100644
index 0000000..a082d96
--- /dev/null
+++ b/pkgs/web_socket_conformance_tests/lib/src/continuously_writing_server.dart
@@ -0,0 +1,25 @@
+// Copyright (c) 2024, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:io';
+
+import 'package:stream_channel/stream_channel.dart';
+
+/// Starts an WebSocket server that sends a lot of data to the peer.
+void hybridMain(StreamChannel<Object?> channel) async {
+  late HttpServer server;
+
+  server = (await HttpServer.bind('localhost', 0))
+    ..transform(WebSocketTransformer()).listen((WebSocket webSocket) {
+      for (var i = 0; i < 10000; ++i) {
+        webSocket.add('Hello World!');
+      }
+    });
+
+  channel.sink.add(server.port);
+  await channel
+      .stream.first; // Any writes indicates that the server should exit.
+  unawaited(server.close());
+}
diff --git a/pkgs/web_socket_conformance_tests/lib/src/continuously_writing_server_vm.dart b/pkgs/web_socket_conformance_tests/lib/src/continuously_writing_server_vm.dart
new file mode 100644
index 0000000..51246c2
--- /dev/null
+++ b/pkgs/web_socket_conformance_tests/lib/src/continuously_writing_server_vm.dart
@@ -0,0 +1,12 @@
+// Generated by generate_server_wrappers.dart. Do not edit.
+
+import 'package:stream_channel/stream_channel.dart';
+
+import 'continuously_writing_server.dart';
+
+/// Starts the redirect test HTTP server in the same process.
+Future<StreamChannel<Object?>> startServer() async {
+  final controller = StreamChannelController<Object?>(sync: true);
+  hybridMain(controller.foreign);
+  return controller.local;
+}
diff --git a/pkgs/web_socket_conformance_tests/lib/src/continuously_writing_server_web.dart b/pkgs/web_socket_conformance_tests/lib/src/continuously_writing_server_web.dart
new file mode 100644
index 0000000..c28fe3f
--- /dev/null
+++ b/pkgs/web_socket_conformance_tests/lib/src/continuously_writing_server_web.dart
@@ -0,0 +1,9 @@
+// Generated by generate_server_wrappers.dart. Do not edit.
+
+import 'package:stream_channel/stream_channel.dart';
+import 'package:test/test.dart';
+
+/// Starts the redirect test HTTP server out-of-process.
+Future<StreamChannel<Object?>> startServer() async => spawnHybridUri(Uri(
+    scheme: 'package',
+    path: 'web_socket_conformance_tests/src/continuously_writing_server.dart'));