Take a broader approach to filtering out bogus Mac OS watcher events.

With luck, this should unflake the test.

R=rnystrom@google.com
BUG=16079

Review URL: https://codereview.chromium.org//140013002

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/watcher@31853 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/lib/src/directory_watcher/mac_os.dart b/lib/src/directory_watcher/mac_os.dart
index bb82a4c..ba0ede4 100644
--- a/lib/src/directory_watcher/mac_os.dart
+++ b/lib/src/directory_watcher/mac_os.dart
@@ -50,14 +50,6 @@
   Future get ready => _readyCompleter.future;
   final _readyCompleter = new Completer();
 
-  /// The number of event batches that have been received from
-  /// [Directory.watch].
-  ///
-  /// This is used to determine if the [Directory.watch] stream was falsely
-  /// closed due to issue 14849. A close caused by events in the past will only
-  /// happen before or immediately after the first batch of events.
-  int _batches = 0;
-
   /// The set of files that are known to exist recursively within the watched
   /// directory.
   ///
@@ -73,11 +65,17 @@
   /// needs to be resubscribed in order to work around issue 14849.
   StreamSubscription<FileSystemEvent> _watchSubscription;
 
-  /// A set of subscriptions that this watcher subscribes to.
-  ///
-  /// These are gathered together so that they may all be canceled when the
-  /// watcher is closed. This does not include [_watchSubscription].
-  final _subscriptions = new Set<StreamSubscription>();
+  /// The subscription to the [Directory.list] call for the initial listing of
+  /// the directory to determine its initial state.
+  StreamSubscription<FileSystemEntity> _initialListSubscription;
+
+  /// The subscription to the [Directory.list] call for listing the contents of
+  /// a subdirectory that was moved into the watched directory.
+  StreamSubscription<FileSystemEntity> _listSubscription;
+
+  /// The timer for tracking how long we wait for an initial batch of bogus
+  /// events (see issue 14373).
+  Timer _bogusEventTimer;
 
   _MacOSDirectoryWatcher(String directory, int parentId)
       : directory = directory,
@@ -85,12 +83,20 @@
         _id = "$parentId/${_count++}" {
     _startWatch();
 
-    _listen(Chain.track(new Directory(directory).list(recursive: true)),
-        (entity) {
-      if (entity is! Directory) _files.add(entity.path);
-    },
-        onError: _emitError,
-        onDone: () {
+    // Before we're ready to emit events, wait for [_listDir] to complete and
+    // for enough time to elapse that if bogus events (issue 14373) would be
+    // emitted, they will be.
+    //
+    // If we do receive a batch of events, [_onBatch] will ensure that these
+    // futures don't fire and that the directory is re-listed.
+    Future.wait([
+      _listDir().then((_) {
+        if (MacOSDirectoryWatcher.logDebugInfo) {
+          print("[$_id] finished initial directory list");
+        }
+      }),
+      _waitForBogusEvents()
+    ]).then((_) {
       if (MacOSDirectoryWatcher.logDebugInfo) {
         print("[$_id] watcher is ready, known files:");
         for (var file in _files.toSet()) {
@@ -98,20 +104,19 @@
         }
       }
       _readyCompleter.complete();
-    },
-        cancelOnError: true);
+    });
   }
 
   void close() {
     if (MacOSDirectoryWatcher.logDebugInfo) {
       print("[$_id] watcher is closed");
     }
-    for (var subscription in _subscriptions) {
-      subscription.cancel();
-    }
-    _subscriptions.clear();
     if (_watchSubscription != null) _watchSubscription.cancel();
+    if (_initialListSubscription != null) _initialListSubscription.cancel();
+    if (_listSubscription != null) _listSubscription.cancel();
     _watchSubscription = null;
+    _initialListSubscription = null;
+    _listSubscription = null;
     _eventsController.close();
   }
 
@@ -129,7 +134,28 @@
       }
     }
 
-    _batches++;
+    // If we get a batch of events before we're ready to begin emitting events,
+    // it's probable that it's a batch of pre-watcher events (see issue 14373).
+    // Ignore those events and re-list the directory.
+    if (!isReady) {
+      if (MacOSDirectoryWatcher.logDebugInfo) {
+        print("[$_id] not ready to emit events, re-listing directory");
+      }
+
+      // Cancel the timer because bogus events only occur in the first batch, so
+      // we can fire [ready] as soon as we're done listing the directory.
+      _bogusEventTimer.cancel();
+      _listDir().then((_) {
+        if (MacOSDirectoryWatcher.logDebugInfo) {
+          print("[$_id] watcher is ready, known files:");
+          for (var file in _files.toSet()) {
+            print("[$_id]   ${p.relative(file, from: directory)}");
+          }
+        }
+        _readyCompleter.complete();
+      });
+      return;
+    }
 
     _sortEvents(batch).forEach((path, events) {
       var relativePath = p.relative(path, from: directory);
@@ -165,8 +191,8 @@
 
           if (_files.containsDir(path)) continue;
 
-          _listen(Chain.track(new Directory(path).list(recursive: true)),
-              (entity) {
+          var stream = Chain.track(new Directory(path).list(recursive: true));
+          _listSubscription = stream.listen((entity) {
             if (entity is Directory) return;
             if (_files.contains(path)) return;
 
@@ -366,10 +392,10 @@
 
     _watchSubscription = null;
 
-    // If the directory still exists and we haven't seen more than one batch,
+    // If the directory still exists and we're still expecting bogus events,
     // this is probably issue 14849 rather than a real close event. We should
     // just restart the watcher.
-    if (_batches < 2 && new Directory(directory).existsSync()) {
+    if (!isReady && new Directory(directory).existsSync()) {
       if (MacOSDirectoryWatcher.logDebugInfo) {
         print("[$_id] fake closure (issue 14849), re-opening stream");
       }
@@ -398,6 +424,37 @@
         onDone: _onDone);
   }
 
+  /// Starts or restarts listing the watched directory to get an initial picture
+  /// of its state.
+  Future _listDir() {
+    assert(!isReady);
+    if (_initialListSubscription != null) _initialListSubscription.cancel();
+
+    _files.clear();
+    var completer = new Completer();
+    var stream = Chain.track(new Directory(directory).list(recursive: true));
+    _initialListSubscription = stream.listen((entity) {
+      if (entity is! Directory) _files.add(entity.path);
+    },
+        onError: _emitError,
+        onDone: completer.complete,
+        cancelOnError: true);
+    return completer.future;
+  }
+
+  /// Wait 200ms for a batch of bogus events (issue 14373) to come in.
+  ///
+  /// 200ms is short in terms of human interaction, but longer than any Mac OS
+  /// watcher tests take on the bots, so it should be safe to assume that any
+  /// bogus events will be signaled in that time frame.
+  Future _waitForBogusEvents() {
+    var completer = new Completer();
+    _bogusEventTimer = new Timer(
+        new Duration(milliseconds: 200),
+        completer.complete);
+    return completer.future;
+  }
+
   /// Emit an event with the given [type] and [path].
   void _emitEvent(ChangeType type, String path) {
     if (!isReady) return;
@@ -415,18 +472,6 @@
     close();
   }
 
-  /// Like [Stream.listen], but automatically adds the subscription to
-  /// [_subscriptions] so that it can be canceled when [close] is called.
-  void _listen(Stream stream, void onData(event), {Function onError,
-      void onDone(), bool cancelOnError}) {
-    var subscription;
-    subscription = stream.listen(onData, onError: onError, onDone: () {
-      _subscriptions.remove(subscription);
-      if (onDone != null) onDone();
-    }, cancelOnError: cancelOnError);
-    _subscriptions.add(subscription);
-  }
-
   // TODO(nweiz): remove this when issue 15042 is fixed.
   /// Return a human-friendly string representation of [event].
   String _formatEvent(FileSystemEvent event) {
diff --git a/test/utils.dart b/test/utils.dart
index 885018b..b520bbd 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -5,7 +5,6 @@
 library watcher.test.utils;
 
 import 'dart:async';
-import 'dart:collection';
 import 'dart:io';
 
 import 'package:path/path.dart' as p;