Windows DirectoryWatcher buffer exhaustion recovery workaround. (#2149)

diff --git a/pkgs/watcher/CHANGELOG.md b/pkgs/watcher/CHANGELOG.md
index 6d22c5e..b4727e5 100644
--- a/pkgs/watcher/CHANGELOG.md
+++ b/pkgs/watcher/CHANGELOG.md
@@ -1,3 +1,11 @@
+## 1.1.3-wip
+
+- Improve handling of
+  `FileSystemException: Directory watcher closed unexpectedly` on Windows. The
+  watcher was already attempting to restart after this error and resume sending
+  events. But, the restart would sometimes silently fail. Now, it is more
+  reliable.
+
 ## 1.1.2
 
 - Fix a bug on Windows where a file creation event could be reported twice when creating
diff --git a/pkgs/watcher/lib/src/directory_watcher.dart b/pkgs/watcher/lib/src/directory_watcher.dart
index 158b86b..8caf09f 100644
--- a/pkgs/watcher/lib/src/directory_watcher.dart
+++ b/pkgs/watcher/lib/src/directory_watcher.dart
@@ -12,6 +12,14 @@
 
 /// Watches the contents of a directory and emits [WatchEvent]s when something
 /// in the directory has changed.
+///
+/// On Windows, the underlying SDK `Directory.watch` fails if too many events
+/// are received while Dart is busy, for example during a long-running
+/// synchronous operation. When this happens, some events are dropped.
+/// `DirectoryWatcher` restarts the watch and sends a `FileSystemException` with
+/// the message "Directory watcher closed unexpectedly" on the event stream. The
+/// code using the watcher needs to do additional work to account for the
+/// dropped events, for example by recomputing interesting files from scratch.
 abstract class DirectoryWatcher implements Watcher {
   /// The directory whose contents are being monitored.
   @Deprecated('Expires in 1.0.0. Use DirectoryWatcher.path instead.')
@@ -29,8 +37,10 @@
   /// watchers.
   factory DirectoryWatcher(String directory, {Duration? pollingDelay}) {
     if (FileSystemEntity.isWatchSupported) {
-      var customWatcher =
-          createCustomDirectoryWatcher(directory, pollingDelay: pollingDelay);
+      var customWatcher = createCustomDirectoryWatcher(
+        directory,
+        pollingDelay: pollingDelay,
+      );
       if (customWatcher != null) return customWatcher;
       if (Platform.isLinux) return LinuxDirectoryWatcher(directory);
       if (Platform.isMacOS) return MacOSDirectoryWatcher(directory);
diff --git a/pkgs/watcher/lib/src/directory_watcher/windows.dart b/pkgs/watcher/lib/src/directory_watcher/windows.dart
index 5607948..6d88e65 100644
--- a/pkgs/watcher/lib/src/directory_watcher/windows.dart
+++ b/pkgs/watcher/lib/src/directory_watcher/windows.dart
@@ -126,30 +126,34 @@
     // Check if [path] is already the root directory.
     if (FileSystemEntity.identicalSync(parent, path)) return;
     var parentStream = Directory(parent).watch(recursive: false);
-    _parentWatchSubscription = parentStream.listen((event) {
-      // Only look at events for 'directory'.
-      if (p.basename(event.path) != p.basename(absoluteDir)) return;
-      // Test if the directory is removed. FileSystemEntity.typeSync will
-      // return NOT_FOUND if it's unable to decide upon the type, including
-      // access denied issues, which may happen when the directory is deleted.
-      // FileSystemMoveEvent and FileSystemDeleteEvent events will always mean
-      // the directory is now gone.
-      if (event is FileSystemMoveEvent ||
-          event is FileSystemDeleteEvent ||
-          (FileSystemEntity.typeSync(path) == FileSystemEntityType.notFound)) {
-        for (var path in _files.paths) {
-          _emitEvent(ChangeType.REMOVE, path);
+    _parentWatchSubscription = parentStream.listen(
+      (event) {
+        // Only look at events for 'directory'.
+        if (p.basename(event.path) != p.basename(absoluteDir)) return;
+        // Test if the directory is removed. FileSystemEntity.typeSync will
+        // return NOT_FOUND if it's unable to decide upon the type, including
+        // access denied issues, which may happen when the directory is deleted.
+        // FileSystemMoveEvent and FileSystemDeleteEvent events will always mean
+        // the directory is now gone.
+        if (event is FileSystemMoveEvent ||
+            event is FileSystemDeleteEvent ||
+            (FileSystemEntity.typeSync(path) ==
+                FileSystemEntityType.notFound)) {
+          for (var path in _files.paths) {
+            _emitEvent(ChangeType.REMOVE, path);
+          }
+          _files.clear();
+          close();
         }
-        _files.clear();
-        close();
-      }
-    }, onError: (error) {
-      // Ignore errors, simply close the stream. The user listens on
-      // [directory], and while it can fail to listen on the parent, we may
-      // still be able to listen on the path requested.
-      _parentWatchSubscription?.cancel();
-      _parentWatchSubscription = null;
-    });
+      },
+      onError: (error) {
+        // Ignore errors, simply close the stream. The user listens on
+        // [directory], and while it can fail to listen on the parent, we may
+        // still be able to listen on the path requested.
+        _parentWatchSubscription?.cancel();
+        _parentWatchSubscription = null;
+      },
+    );
   }
 
   void _onEvent(FileSystemEvent event) {
@@ -225,16 +229,18 @@
     // Events within directories that already have events are superfluous; the
     // directory's full contents will be examined anyway, so we ignore such
     // events. Emitting them could cause useless or out-of-order events.
-    var directories = unionAll(batch.map((event) {
-      if (!event.isDirectory) return <String>{};
-      if (event is FileSystemMoveEvent) {
-        var destination = event.destination;
-        if (destination != null) {
-          return {event.path, destination};
+    var directories = unionAll(
+      batch.map((event) {
+        if (!event.isDirectory) return <String>{};
+        if (event is FileSystemMoveEvent) {
+          var destination = event.destination;
+          if (destination != null) {
+            return {event.path, destination};
+          }
         }
-      }
-      return {event.path};
-    }));
+        return {event.path};
+      }),
+    );
 
     bool isInModifiedDirectory(String path) =>
         directories.any((dir) => path != dir && p.isWithin(dir, path));
@@ -285,9 +291,11 @@
       // REMOVE; otherwise there will also be a REMOVE or CREATE event
       // (respectively) that will be contradictory.
       if (event is FileSystemModifyEvent) continue;
-      assert(event is FileSystemCreateEvent ||
-          event is FileSystemDeleteEvent ||
-          event is FileSystemMoveEvent);
+      assert(
+        event is FileSystemCreateEvent ||
+            event is FileSystemDeleteEvent ||
+            event is FileSystemMoveEvent,
+      );
 
       // If we previously thought this was a MODIFY, we now consider it to be a
       // CREATE or REMOVE event. This is safe for the same reason as above.
@@ -297,9 +305,11 @@
       }
 
       // A CREATE event contradicts a REMOVE event and vice versa.
-      assert(type == FileSystemEvent.create ||
-          type == FileSystemEvent.delete ||
-          type == FileSystemEvent.move);
+      assert(
+        type == FileSystemEvent.create ||
+            type == FileSystemEvent.delete ||
+            type == FileSystemEvent.move,
+      );
       if (type != event.type) return null;
     }
 
@@ -383,21 +393,31 @@
   void _startWatch() {
     // Note: "watcher closed" exceptions do not get sent over the stream
     // returned by watch, and must be caught via a zone handler.
-    runZonedGuarded(() {
-      var innerStream = Directory(path).watch(recursive: true);
-      _watchSubscription = innerStream.listen(_onEvent,
-          onError: _eventsController.addError, onDone: _onDone);
-    }, (error, stackTrace) {
-      if (error is FileSystemException &&
-          error.message.startsWith('Directory watcher closed unexpectedly')) {
-        _watchSubscription?.cancel();
-        _eventsController.addError(error, stackTrace);
-        _startWatch();
-      } else {
-        // ignore: only_throw_errors
-        throw error;
-      }
-    });
+    runZonedGuarded(
+      () {
+        var innerStream = Directory(path).watch(recursive: true);
+        _watchSubscription = innerStream.listen(
+          _onEvent,
+          onError: _eventsController.addError,
+          onDone: _onDone,
+        );
+      },
+      (error, stackTrace) async {
+        if (error is FileSystemException &&
+            error.message.startsWith('Directory watcher closed unexpectedly')) {
+          // Wait to work around https://github.com/dart-lang/sdk/issues/61378.
+          // Give the VM time to reset state after the error. See the issue for
+          // more discussion of the workaround.
+          await _watchSubscription?.cancel();
+          await Future<void>.delayed(const Duration(milliseconds: 1));
+          _eventsController.addError(error, stackTrace);
+          _startWatch();
+        } else {
+          // ignore: only_throw_errors
+          throw error;
+        }
+      },
+    );
   }
 
   /// Starts or restarts listing the watched directory to get an initial picture
@@ -413,8 +433,12 @@
       if (entity is! Directory) _files.add(entity.path);
     }
 
-    _initialListSubscription = stream.listen(handleEntity,
-        onError: _emitError, onDone: completer.complete, cancelOnError: true);
+    _initialListSubscription = stream.listen(
+      handleEntity,
+      onError: _emitError,
+      onDone: completer.complete,
+      cancelOnError: true,
+    );
     return completer.future;
   }
 
diff --git a/pkgs/watcher/pubspec.yaml b/pkgs/watcher/pubspec.yaml
index b86dbf1..bccdfac 100644
--- a/pkgs/watcher/pubspec.yaml
+++ b/pkgs/watcher/pubspec.yaml
@@ -1,5 +1,5 @@
 name: watcher
-version: 1.1.2
+version: 1.1.3-wip
 description: >-
   A file system watcher. It monitors changes to contents of directories and
   sends notifications when files have been added, removed, or modified.
diff --git a/pkgs/watcher/test/directory_watcher/windows_test.dart b/pkgs/watcher/test/directory_watcher/windows_test.dart
index 5be87ec..a6bc14d 100644
--- a/pkgs/watcher/test/directory_watcher/windows_test.dart
+++ b/pkgs/watcher/test/directory_watcher/windows_test.dart
@@ -25,40 +25,123 @@
     expect(DirectoryWatcher('.'), const TypeMatcher<WindowsDirectoryWatcher>());
   });
 
-  test('Regression test for https://github.com/dart-lang/tools/issues/2110',
-      () async {
-    late StreamSubscription<WatchEvent> sub;
-    try {
-      final temp = Directory.systemTemp.createTempSync();
+  test(
+    'Regression test for https://github.com/dart-lang/tools/issues/2110',
+    () async {
+      late StreamSubscription<WatchEvent> sub;
+      try {
+        final temp = Directory.systemTemp.createTempSync();
+        final watcher = DirectoryWatcher(temp.path);
+        final events = <WatchEvent>[];
+        sub = watcher.events.listen(events.add);
+        await watcher.ready;
+
+        // Create a file in a directory that doesn't exist. This forces the
+        // directory to be created first before the child file.
+        //
+        // When directory creation is detected by the watcher, it calls
+        // `Directory.list` on the directory to determine if there's files that
+        // have been created or modified. It's possible that the watcher will
+        // have already detected the file creation event before `Directory.list`
+        // returns. Before https://github.com/dart-lang/tools/issues/2110 was
+        // resolved, the check to ensure an event hadn't already been emitted
+        // for the file creation was incorrect, leading to the event being
+        // emitted again in some circumstances.
+        final file = File(p.join(temp.path, 'foo', 'file.txt'))
+          ..createSync(recursive: true);
+
+        // Introduce a short delay to allow for the directory watcher to detect
+        // the creation of foo/ and foo/file.txt.
+        await Future<void>.delayed(const Duration(seconds: 1));
+
+        // There should only be a single file added event.
+        expect(events, hasLength(1));
+        expect(
+          events.first.toString(),
+          WatchEvent(ChangeType.ADD, file.path).toString(),
+        );
+      } finally {
+        await sub.cancel();
+      }
+    },
+  );
+
+  // The Windows native watcher has a buffer that gets exhausted if events are
+  // not handled quickly enough. Then, it throws an error and stops watching.
+  // The exhaustion is reliably triggered if enough events arrive during a sync
+  // block. The `package:watcher` implementation tries to catch this and recover
+  // by starting a new watcher.
+  group('Buffer exhaustion', () {
+    late StreamSubscription<Object> subscription;
+    late Directory temp;
+    late int eventsSeen;
+    late int errorsSeen;
+
+    setUp(() async {
+      temp = Directory.systemTemp.createTempSync();
       final watcher = DirectoryWatcher(temp.path);
-      final events = <WatchEvent>[];
-      sub = watcher.events.listen(events.add);
+
+      eventsSeen = 0;
+      errorsSeen = 0;
+      subscription = watcher.events.listen(
+        (e) {
+          ++eventsSeen;
+        },
+        onError: (_, __) {
+          ++errorsSeen;
+        },
+      );
       await watcher.ready;
+    });
 
-      // Create a file in a directory that doesn't exist. This forces the
-      // directory to be created first before the child file.
-      //
-      // When directory creation is detected by the watcher, it calls
-      // `Directory.list` on the directory to determine if there's files that
-      // have been created or modified. It's possible that the watcher will have
-      // already detected the file creation event before `Directory.list`
-      // returns. Before https://github.com/dart-lang/tools/issues/2110 was
-      // resolved, the check to ensure an event hadn't already been emitted for
-      // the file creation was incorrect, leading to the event being emitted
-      // again in some circumstances.
-      final file = File(p.join(temp.path, 'foo', 'file.txt'))
-        ..createSync(recursive: true);
+    tearDown(() {
+      subscription.cancel();
+    });
 
-      // Introduce a short delay to allow for the directory watcher to detect
-      // the creation of foo/ and foo/file.txt.
-      await Future<void>.delayed(const Duration(seconds: 1));
+    test('recovery', () async {
+      // Use a long filename to fill the buffer.
+      final file = File('${temp.path}\\file'.padRight(255, 'a'));
 
-      // There should only be a single file added event.
-      expect(events, hasLength(1));
-      expect(events.first.toString(),
-          WatchEvent(ChangeType.ADD, file.path).toString());
-    } finally {
-      await sub.cancel();
-    }
+      // Repeatedly trigger buffer exhaustion, to check that recovery is
+      // reliable.
+      for (var times = 0; times != 200; ++times) {
+        errorsSeen = 0;
+        eventsSeen = 0;
+
+        // Syncronously trigger 200 events. Because this is a sync block, the VM
+        // won't handle the events, so this has a very high chance of triggering
+        // a buffer exhaustion.
+        //
+        // If a buffer exhaustion happens, `package:watcher` turns this into an
+        // error on the event stream, so `errorsSeen` will get incremented once.
+        // The number of changes 200 is chosen so this is very likely to happen.
+        // If there is _not_ an exhaustion, the 200 events will show on the
+        // stream as a single event because they are changes of the same file.
+        // So, `eventsSeen` will instead be incremented once.
+        for (var i = 0; i != 200; ++i) {
+          file.writeAsStringSync('');
+        }
+
+        // Events only happen when there is an async gap, wait for such a gap.
+        await Future<void>.delayed(const Duration(milliseconds: 10));
+
+        // If everything is going well, there should have been either one event
+        // seen or one error seen.
+        if (errorsSeen == 0 && eventsSeen == 0) {
+          // It looks like the watcher is now broken: there were file changes
+          // but no event and no error. Do some non-sync writes to confirm
+          // whether the watcher really is now broken.
+          for (var i = 0; i != 5; ++i) {
+            await file.writeAsString('');
+          }
+          await Future<void>.delayed(const Duration(milliseconds: 10));
+          fail(
+            'On attempt ${times + 1}, watcher registered nothing. '
+            'On retry, it registered: $errorsSeen error(s), $eventsSeen '
+            'event(s).',
+          );
+        }
+      }
+    });
   });
 }