Use stream matchers to unflake the mac OS watcher tests.

R=rnystrom@google.com
BUG=15024

Review URL: https://codereview.chromium.org//129473003

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/watcher@31705 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index a235f7d..163e9f4 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -73,6 +73,18 @@
 /// under the covers.
 Future newFuture(callback()) => new Future.value().then((_) => callback());
 
+/// Returns a [Future] that completes after pumping the event queue [times]
+/// times. By default, this should pump the event queue enough times to allow
+/// any code to run, as long as it's not waiting on some external event.
+Future pumpEventQueue([int times=20]) {
+  if (times == 0) return new Future.value();
+  // We use a delayed future to allow microtask events to finish. The
+  // Future.value or Future() constructors use scheduleMicrotask themselves and
+  // would therefore not wait for microtask callbacks that are scheduled after
+  // invoking this method.
+  return new Future.delayed(Duration.ZERO, () => pumpEventQueue(times - 1));
+}
+
 /// A stream transformer that batches all events that are sent at the same time.
 ///
 /// When multiple events are synchronously added to a stream controller, the
diff --git a/pubspec.yaml b/pubspec.yaml
index c77dddf..bf00d9e 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -9,7 +9,7 @@
   path: ">=0.9.0 <2.0.0"
   stack_trace: ">=0.9.1 <0.10.0"
 dev_dependencies:
-  scheduled_test: ">=0.9.0 <0.10.0"
-  unittest: ">=0.9.0 <0.10.0"
+  scheduled_test: ">=0.9.3-dev <0.10.0"
+  unittest: ">=0.9.2 <0.10.0"
 environment:
   sdk: ">=0.8.10+6 <2.0.0"
diff --git a/test/directory_watcher/linux_test.dart b/test/directory_watcher/linux_test.dart
index c2d0176..cae38ad 100644
--- a/test/directory_watcher/linux_test.dart
+++ b/test/directory_watcher/linux_test.dart
@@ -32,14 +32,15 @@
     renameDir("dir/sub", "sub");
     renameDir("sub", "dir/sub");
 
-    inAnyOrder(() {
-      withPermutations((i, j, k) =>
-          expectRemoveEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
-    });
+    allowEither(() {
+      inAnyOrder(withPermutations((i, j, k) =>
+          isRemoveEvent("dir/sub/sub-$i/sub-$j/file-$k.txt")));
 
-    inAnyOrder(() {
-      withPermutations((i, j, k) =>
-          expectAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+      inAnyOrder(withPermutations((i, j, k) =>
+          isAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt")));
+    }, () {
+      inAnyOrder(withPermutations((i, j, k) =>
+          isModifyEvent("dir/sub/sub-$i/sub-$j/file-$k.txt")));
     });
   });
 }
diff --git a/test/directory_watcher/mac_os_test.dart b/test/directory_watcher/mac_os_test.dart
index 077787a..43567f0 100644
--- a/test/directory_watcher/mac_os_test.dart
+++ b/test/directory_watcher/mac_os_test.dart
@@ -36,24 +36,26 @@
     expectAddEvent("dir/newer.txt");
   });
 
-  test('emits events for many nested files moved out then immediately back in',
-      () {
-    withPermutations((i, j, k) =>
-        writeFile("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+  // TODO(nweiz): re-enable this when issue 16003 is fixed.
+  // test('emits events for many nested files moved out then immediately back in',
+  //     () {
+  //   withPermutations((i, j, k) =>
+  //       writeFile("dir/sub/sub-$i/sub-$j/file-$k.txt"));
 
-    startWatcher(dir: "dir");
+  //   startWatcher(dir: "dir");
 
-    renameDir("dir/sub", "sub");
-    renameDir("sub", "dir/sub");
+  //   renameDir("dir/sub", "sub");
+  //   renameDir("sub", "dir/sub");
 
-    inAnyOrder(() {
-      withPermutations((i, j, k) =>
-          expectRemoveEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
-    });
+  //   allowEither(() {
+  //     inAnyOrder(withPermutations((i, j, k) =>
+  //         isRemoveEvent("dir/sub/sub-$i/sub-$j/file-$k.txt")));
 
-    inAnyOrder(() {
-      withPermutations((i, j, k) =>
-          expectAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
-    });
-  });
+  //     inAnyOrder(withPermutations((i, j, k) =>
+  //         isAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt")));
+  //   }, () {
+  //     inAnyOrder(withPermutations((i, j, k) =>
+  //         isModifyEvent("dir/sub/sub-$i/sub-$j/file-$k.txt")));
+  //   });
+  // });
 }
diff --git a/test/directory_watcher/shared.dart b/test/directory_watcher/shared.dart
index 2e0186f..849cea0 100644
--- a/test/directory_watcher/shared.dart
+++ b/test/directory_watcher/shared.dart
@@ -3,6 +3,7 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/src/utils.dart';
 
 import '../utils.dart';
 
@@ -58,10 +59,10 @@
 
     writeFile("a.txt", contents: "same");
     writeFile("b.txt", contents: "after");
-    inAnyOrder(() {
-      expectModifyEvent("a.txt");
-      expectModifyEvent("b.txt");
-    });
+    inAnyOrder([
+      isModifyEvent("a.txt"),
+      isModifyEvent("b.txt")
+    ]);
   });
 
   test('when the watched directory is deleted, removes all files', () {
@@ -71,10 +72,10 @@
     startWatcher(dir: "dir");
 
     deleteDir("dir");
-    inAnyOrder(() {
-      expectRemoveEvent("dir/a.txt");
-      expectRemoveEvent("dir/b.txt");
-    });
+    inAnyOrder([
+      isRemoveEvent("dir/a.txt"),
+      isRemoveEvent("dir/b.txt")
+    ]);
   });
 
   group("moves", () {
@@ -83,10 +84,10 @@
       startWatcher();
       renameFile("old.txt", "new.txt");
 
-      inAnyOrder(() {
-        expectAddEvent("new.txt");
-        expectRemoveEvent("old.txt");
-      });
+      inAnyOrder([
+        isAddEvent("new.txt"),
+        isRemoveEvent("old.txt")
+      ]);
     });
 
     test('notifies when a file is moved from outside the watched directory',
@@ -108,6 +109,13 @@
     });
   });
 
+  // Most of the time, when multiple filesystem actions happen in sequence,
+  // they'll be batched together and the watcher will see them all at once.
+  // These tests verify that the watcher normalizes and combine these events
+  // properly. However, very occasionally the events will be reported in
+  // separate batches, and the watcher will report them as though they occurred
+  // far apart in time, so each of these tests has a "backup case" to allow for
+  // that as well.
   group("clustered changes", () {
     test("doesn't notify when a file is created and then immediately removed",
         () {
@@ -115,7 +123,12 @@
       writeFile("file.txt");
       deleteFile("file.txt");
 
-      // [startWatcher] will assert that no events were fired.
+      // Backup case.
+      startClosingEventStream();
+      allowEvents(() {
+        expectAddEvent("file.txt");
+        expectRemoveEvent("file.txt");
+      });
     });
 
     test("reports a modification when a file is deleted and then immediately "
@@ -125,7 +138,14 @@
 
       deleteFile("file.txt");
       writeFile("file.txt", contents: "re-created");
-      expectModifyEvent("file.txt");
+
+      allowEither(() {
+        expectModifyEvent("file.txt");
+      }, () {
+        // Backup case.
+        expectRemoveEvent("file.txt");
+        expectAddEvent("file.txt");
+      });
     });
 
     test("reports a modification when a file is moved and then immediately "
@@ -135,9 +155,17 @@
 
       renameFile("old.txt", "new.txt");
       writeFile("old.txt", contents: "re-created");
-      inAnyOrder(() {
-        expectModifyEvent("old.txt");
+
+      allowEither(() {
+        inAnyOrder([
+          isModifyEvent("old.txt"),
+          isAddEvent("new.txt")
+        ]);
+      }, () {
+        // Backup case.
+        expectRemoveEvent("old.txt");
         expectAddEvent("new.txt");
+        expectAddEvent("old.txt");
       });
     });
 
@@ -148,6 +176,10 @@
 
       writeFile("file.txt", contents: "modified");
       deleteFile("file.txt");
+
+      // Backup case.
+      allowModifyEvent("file.txt");
+
       expectRemoveEvent("file.txt");
     });
 
@@ -157,7 +189,12 @@
 
       writeFile("file.txt");
       writeFile("file.txt", contents: "modified");
+
       expectAddEvent("file.txt");
+
+      // Backup case.
+      startClosingEventStream();
+      allowModifyEvent("file.txt");
     });
   });
 
@@ -174,10 +211,10 @@
       startWatcher();
 
       renameDir("old", "new");
-      inAnyOrder(() {
-        expectRemoveEvent("old/file.txt");
-        expectAddEvent("new/file.txt");
-      });
+      inAnyOrder([
+        isRemoveEvent("old/file.txt"),
+        isAddEvent("new/file.txt")
+      ]);
 
       writeFile("new/file.txt", contents: "modified");
       expectModifyEvent("new/file.txt");
@@ -191,10 +228,8 @@
       startWatcher(dir: "dir");
       renameDir("sub", "dir/sub");
 
-      inAnyOrder(() {
-        withPermutations((i, j, k)  =>
-            expectAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
-      });
+      inAnyOrder(withPermutations((i, j, k)  =>
+          isAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt")));
     });
 
     test('emits events for many nested files removed at once', () {
@@ -210,10 +245,8 @@
       // directory.
       renameDir("dir/sub", "sub");
 
-      inAnyOrder(() {
-        withPermutations((i, j, k) =>
-            expectRemoveEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
-      });
+      inAnyOrder(withPermutations((i, j, k) =>
+          isRemoveEvent("dir/sub/sub-$i/sub-$j/file-$k.txt")));
     });
 
     test('emits events for many nested files moved at once', () {
@@ -224,12 +257,12 @@
       startWatcher(dir: "dir");
       renameDir("dir/old", "dir/new");
 
-      inAnyOrder(() {
-        withPermutations((i, j, k) {
-          expectRemoveEvent("dir/old/sub-$i/sub-$j/file-$k.txt");
-          expectAddEvent("dir/new/sub-$i/sub-$j/file-$k.txt");
-        });
-      });
+      inAnyOrder(unionAll(withPermutations((i, j, k) {
+        return new Set.from([
+          isRemoveEvent("dir/old/sub-$i/sub-$j/file-$k.txt"),
+          isAddEvent("dir/new/sub-$i/sub-$j/file-$k.txt")
+        ]);
+      })));
     });
 
     test("emits events for many files added at once in a subdirectory with the "
@@ -241,11 +274,11 @@
 
       deleteFile("dir/sub");
       renameDir("old", "dir/sub");
-      inAnyOrder(() {
-        expectRemoveEvent("dir/sub");
-        withPermutations((i, j, k)  =>
-            expectAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
-      });
+
+      var events = withPermutations((i, j, k)  =>
+          isAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+      events.add(isRemoveEvent("dir/sub"));
+      inAnyOrder(events);
     });
   });
 }
diff --git a/test/utils.dart b/test/utils.dart
index a7bd9b6..f7e35f1 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -9,6 +9,7 @@
 import 'dart:io';
 
 import 'package:path/path.dart' as p;
+import 'package:scheduled_test/scheduled_stream.dart';
 import 'package:scheduled_test/scheduled_test.dart';
 import 'package:unittest/compact_vm_config.dart';
 import 'package:watcher/watcher.dart';
@@ -25,11 +26,6 @@
 /// The [DirectoryWatcher] being used for the current scheduled test.
 DirectoryWatcher _watcher;
 
-/// The index in [_watcher]'s event stream for the next event. When event
-/// expectations are set using [expectEvent] (et. al.), they use this to
-/// expect a series of events in order.
-var _nextEvent = 0;
-
 /// The mock modification times (in milliseconds since epoch) for each file.
 ///
 /// The actual file system has pretty coarse granularity for file modification
@@ -111,7 +107,7 @@
 }
 
 /// The stream of events from the watcher started with [startWatcher].
-Stream _watcherEvents;
+ScheduledStream<WatchEvent> _watcherEvents;
 
 /// Creates a new [DirectoryWatcher] that watches a temporary directory and
 /// starts monitoring it for events.
@@ -130,82 +126,102 @@
   // Schedule [_watcher.events.listen] so that the watcher doesn't start
   // watching [dir] before it exists. Expose [_watcherEvents] immediately so
   // that it can be accessed synchronously after this.
-  _watcherEvents = futureStream(schedule(() {
-    var allEvents = new Queue();
-    var subscription = _watcher.events.listen(allEvents.add,
-        onError: currentSchedule.signalError);
-
+  _watcherEvents = new ScheduledStream(futureStream(schedule(() {
     currentSchedule.onComplete.schedule(() {
       if (MacOSDirectoryWatcher.logDebugInfo) {
         print("stopping watcher for $testCase");
       }
 
-      var numEvents = _nextEvent;
-      subscription.cancel();
-      _nextEvent = 0;
       _watcher = null;
+      if (!_closePending) _watcherEvents.close();
 
       // If there are already errors, don't add this to the output and make
       // people think it might be the root cause.
       if (currentSchedule.errors.isEmpty) {
-        expect(allEvents, hasLength(numEvents));
-      } else {
-        currentSchedule.addDebugInfo("Events fired:\n${allEvents.join('\n')}");
+        _watcherEvents.expect(isDone);
       }
     }, "reset watcher");
 
     return _watcher.events;
-  }, "create watcher"), broadcast: true);
+  }, "create watcher"), broadcast: true));
 
   schedule(() => _watcher.ready, "wait for watcher to be ready");
 }
 
-/// A future set by [inAnyOrder] that will complete to the set of events that
-/// occur in the [inAnyOrder] block.
-Future<Set<WatchEvent>> _unorderedEventFuture;
+/// Whether an event to close [_watcherEvents] has been scheduled.
+bool _closePending = false;
 
-/// Runs [block] and allows multiple [expectEvent] calls in that block to match
-/// events in any order.
-void inAnyOrder(block()) {
-  var oldFuture = _unorderedEventFuture;
+/// Schedule closing the directory watcher stream after the event queue has been
+/// pumped.
+///
+/// This is necessary when events are allowed to occur, but don't have to occur,
+/// at the end of a test. Otherwise, if they don't occur, the test will wait
+/// indefinitely because they might in the future and because the watcher is
+/// normally only closed after the test completes.
+void startClosingEventStream() {
+  schedule(() {
+    _closePending = true;
+    pumpEventQueue().then((_) => _watcherEvents.close()).whenComplete(() {
+      _closePending = false;
+    });
+  }, 'start closing event stream');
+}
+
+/// A list of [StreamMatcher]s that have been collected using
+/// [_collectStreamMatcher].
+List<StreamMatcher> _collectedStreamMatchers;
+
+/// Collects all stream matchers that are registered within [block] into a
+/// single stream matcher.
+///
+/// The returned matcher will match each of the collected matchers in order.
+StreamMatcher _collectStreamMatcher(block()) {
+  var oldStreamMatchers = _collectedStreamMatchers;
+  _collectedStreamMatchers = new List<StreamMatcher>();
   try {
-    var firstEvent = _nextEvent;
-    var completer = new Completer();
-    _unorderedEventFuture = completer.future;
     block();
-
-    _watcherEvents.skip(firstEvent).take(_nextEvent - firstEvent).toSet()
-        .then(completer.complete, onError: completer.completeError);
-    currentSchedule.wrapFuture(_unorderedEventFuture,
-        "waiting for ${_nextEvent - firstEvent} events");
+    return inOrder(_collectedStreamMatchers);
   } finally {
-    _unorderedEventFuture = oldFuture;
+    _collectedStreamMatchers = oldStreamMatchers;
   }
 }
 
-/// Expects that the next set of event will be a change of [type] on [path].
+/// Either add [streamMatcher] as an expectation to [_watcherEvents], or collect
+/// it with [_collectStreamMatcher].
 ///
-/// Multiple calls to [expectEvent] require that the events are received in that
-/// order unless they're called in an [inAnyOrder] block.
-void expectEvent(ChangeType type, String path) {
-  if (_unorderedEventFuture != null) {
-    // Assign this to a local variable since it will be un-assigned by the time
-    // the scheduled callback runs.
-    var future = _unorderedEventFuture;
-
-    expect(
-        schedule(() => future, "should fire $type event on $path"),
-        completion(contains(isWatchEvent(type, path))));
+/// [streamMatcher] can be a [StreamMatcher], a [Matcher], or a value.
+void _expectOrCollect(streamMatcher) {
+  if (_collectedStreamMatchers != null) {
+    _collectedStreamMatchers.add(new StreamMatcher.wrap(streamMatcher));
   } else {
-    var future = currentSchedule.wrapFuture(
-        _watcherEvents.elementAt(_nextEvent),
-        "waiting for $type event on $path");
-
-    expect(
-        schedule(() => future, "should fire $type event on $path"),
-        completion(isWatchEvent(type, path)));
+    _watcherEvents.expect(streamMatcher);
   }
-  _nextEvent++;
+}
+
+/// Expects that [matchers] will match emitted events in any order.
+///
+/// [matchers] may be [Matcher]s or values, but not [StreamMatcher]s.
+void inAnyOrder(Iterable matchers) {
+  matchers = matchers.toSet();
+  _expectOrCollect(nextValues(matchers.length, unorderedMatches(matchers)));
+}
+
+/// Expects that the expectations established in either [block1] or [block2]
+/// will match the emitted events.
+///
+/// If both blocks match, the one that consumed more events will be used.
+void allowEither(block1(), block2()) {
+  _expectOrCollect(either(
+      _collectStreamMatcher(block1), _collectStreamMatcher(block2)));
+}
+
+/// Allows the expectations established in [block] to match the emitted events.
+///
+/// If the expectations in [block] don't match, no error will be raised and no
+/// events will be consumed. If this is used at the end of a test,
+/// [startClosingEventStream] should be called before it.
+void allowEvents(block()) {
+  _expectOrCollect(allow(_collectStreamMatcher(block)));
 }
 
 /// Returns a matcher that matches a [WatchEvent] with the given [type] and
@@ -217,9 +233,53 @@
   }, "is $type $path");
 }
 
-void expectAddEvent(String path) => expectEvent(ChangeType.ADD, path);
-void expectModifyEvent(String path) => expectEvent(ChangeType.MODIFY, path);
-void expectRemoveEvent(String path) => expectEvent(ChangeType.REMOVE, path);
+/// Returns a [Matcher] that matches a [WatchEvent] for an add event for [path].
+Matcher isAddEvent(String path) => isWatchEvent(ChangeType.ADD, path);
+
+/// Returns a [Matcher] that matches a [WatchEvent] for a modification event for
+/// [path].
+Matcher isModifyEvent(String path) => isWatchEvent(ChangeType.MODIFY, path);
+
+/// Returns a [Matcher] that matches a [WatchEvent] for a removal event for
+/// [path].
+Matcher isRemoveEvent(String path) => isWatchEvent(ChangeType.REMOVE, path);
+
+/// Expects that the next event emitted will be for an add event for [path].
+void expectAddEvent(String path) =>
+  _expectOrCollect(isWatchEvent(ChangeType.ADD, path));
+
+/// Expects that the next event emitted will be for a modification event for
+/// [path].
+void expectModifyEvent(String path) =>
+  _expectOrCollect(isWatchEvent(ChangeType.MODIFY, path));
+
+/// Expects that the next event emitted will be for a removal event for [path].
+void expectRemoveEvent(String path) =>
+  _expectOrCollect(isWatchEvent(ChangeType.REMOVE, path));
+
+/// Consumes an add event for [path] if one is emitted at this point in the
+/// schedule, but doesn't throw an error if it isn't.
+///
+/// If this is used at the end of a test, [startClosingEventStream] should be
+/// called before it.
+void allowAddEvent(String path) =>
+  _expectOrCollect(allow(isWatchEvent(ChangeType.ADD, path)));
+
+/// Consumes a modification event for [path] if one is emitted at this point in
+/// the schedule, but doesn't throw an error if it isn't.
+///
+/// If this is used at the end of a test, [startClosingEventStream] should be
+/// called before it.
+void allowModifyEvent(String path) =>
+  _expectOrCollect(allow(isWatchEvent(ChangeType.MODIFY, path)));
+
+/// Consumes a removal event for [path] if one is emitted at this point in the
+/// schedule, but doesn't throw an error if it isn't.
+///
+/// If this is used at the end of a test, [startClosingEventStream] should be
+/// called before it.
+void allowRemoveEvent(String path) =>
+  _expectOrCollect(allow(isWatchEvent(ChangeType.REMOVE, path)));
 
 /// Schedules writing a file in the sandbox at [path] with [contents].
 ///
@@ -299,14 +359,18 @@
 /// Runs [callback] with every permutation of non-negative [i], [j], and [k]
 /// less than [limit].
 ///
+/// Returns a set of all values returns by [callback].
+///
 /// [limit] defaults to 3.
-void withPermutations(callback(int i, int j, int k), {int limit}) {
+Set withPermutations(callback(int i, int j, int k), {int limit}) {
   if (limit == null) limit = 3;
+  var results = new Set();
   for (var i = 0; i < limit; i++) {
     for (var j = 0; j < limit; j++) {
       for (var k = 0; k < limit; k++) {
-        callback(i, j, k);
+        results.add(callback(i, j, k));
       }
     }
   }
+  return results;
 }