[flutter_tools] do not add events to closed sink in throttle transform (#66468)

The throttle duration could delay past the point where the destination sink was closed. Check if it is closed before adding an event. Fixes a crash on dev: StateError: Bad State: Stream is already closed.
diff --git a/packages/flutter_tools/lib/src/protocol_discovery.dart b/packages/flutter_tools/lib/src/protocol_discovery.dart
index 3d1bc3d..bdcbb45 100644
--- a/packages/flutter_tools/lib/src/protocol_discovery.dart
+++ b/packages/flutter_tools/lib/src/protocol_discovery.dart
@@ -234,6 +234,7 @@
   S latestLine;
   int lastExecution;
   Future<void> throttleFuture;
+  bool done = false;
 
   return StreamTransformer<S, S>
     .fromHandlers(
@@ -249,14 +250,20 @@
         final int nextExecutionTime = isFirstMessage || remainingTime > waitDuration.inMilliseconds
           ? 0
           : waitDuration.inMilliseconds - remainingTime;
-
         throttleFuture ??= Future<void>
           .delayed(Duration(milliseconds: nextExecutionTime))
           .whenComplete(() {
+            if (done) {
+              return;
+            }
             sink.add(latestLine);
             throttleFuture = null;
             lastExecution = DateTime.now().millisecondsSinceEpoch;
           });
+      },
+      handleDone: (EventSink<S> sink) {
+        done = true;
+        sink.close();
       }
     );
 }
diff --git a/packages/flutter_tools/test/general.shard/protocol_discovery_test.dart b/packages/flutter_tools/test/general.shard/protocol_discovery_test.dart
index a833521..0868df3 100644
--- a/packages/flutter_tools/test/general.shard/protocol_discovery_test.dart
+++ b/packages/flutter_tools/test/general.shard/protocol_discovery_test.dart
@@ -195,6 +195,18 @@
         expect('$uri', 'http://127.0.0.1:12345/PTwjm8Ii8qg=/');
       });
 
+      testUsingContext('protocol discovery does not crash if the log reader is closed while delaying', () async {
+        initialize(devicePort: 12346, throttleDuration: const Duration(milliseconds: 10));
+        final Future<List<Uri>> results = discoverer.uris.toList();
+        logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12346/PTwjm8Ii8qg=/');
+        logReader.addLine('I/flutter : Observatory listening on http://127.0.0.1:12346/PTwjm8Ii8qg=/');
+        await logReader.dispose();
+
+        // Give time for throttle to finish.
+        await Future<void>.delayed(const Duration(milliseconds: 11));
+        expect(await results, isEmpty);
+      });
+
       testUsingContext('uris in the stream are throttled', () async {
         const Duration kThrottleDuration = Duration(milliseconds: 10);