Wrap Directory.watch on linux for the watcher package.

R=rnystrom@google.com
BUG=14428

Review URL: https://codereview.chromium.org//46843003

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/watcher@30081 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/pkgs/watcher/lib/src/directory_watcher.dart b/pkgs/watcher/lib/src/directory_watcher.dart
index 679e227..4484c2b 100644
--- a/pkgs/watcher/lib/src/directory_watcher.dart
+++ b/pkgs/watcher/lib/src/directory_watcher.dart
@@ -7,18 +7,15 @@
 import 'dart:async';
 import 'dart:io';
 
-import 'package:crypto/crypto.dart';
-
-import 'async_queue.dart';
-import 'stat.dart';
-import 'utils.dart';
 import 'watch_event.dart';
+import 'directory_watcher/linux.dart';
+import 'directory_watcher/polling.dart';
 
 /// Watches the contents of a directory and emits [WatchEvent]s when something
 /// in the directory has changed.
-class DirectoryWatcher {
+abstract class DirectoryWatcher {
   /// The directory whose contents are being monitored.
-  final String directory;
+  String get directory;
 
   /// The broadcast [Stream] of events that have occurred to files in
   /// [directory].
@@ -26,10 +23,12 @@
   /// Changes will only be monitored while this stream has subscribers. Any
   /// file changes that occur during periods when there are no subscribers
   /// will not be reported the next time a subscriber is added.
-  Stream<WatchEvent> get events => _events.stream;
-  StreamController<WatchEvent> _events;
+  Stream<WatchEvent> get events;
 
-  _WatchState _state = _WatchState.UNSUBSCRIBED;
+  /// Whether the watcher is initialized and watching for file changes.
+  ///
+  /// This is true if and only if [ready] is complete.
+  bool get isReady;
 
   /// A [Future] that completes when the watcher is initialized and watching
   /// for file changes.
@@ -41,241 +40,20 @@
   ///
   /// If the watcher is already monitoring, this returns an already complete
   /// future.
-  Future get ready => _ready.future;
-  Completer _ready = new Completer();
-
-  /// The amount of time the watcher pauses between successive polls of the
-  /// directory contents.
-  final Duration pollingDelay;
-
-  /// The previous status of the files in the directory.
-  ///
-  /// Used to tell which files have been modified.
-  final _statuses = new Map<String, _FileStatus>();
-
-  /// The subscription used while [directory] is being listed.
-  ///
-  /// Will be `null` if a list is not currently happening.
-  StreamSubscription<FileSystemEntity> _listSubscription;
-
-  /// The queue of files waiting to be processed to see if they have been
-  /// modified.
-  ///
-  /// Processing a file is asynchronous, as is listing the directory, so the
-  /// queue exists to let each of those proceed at their own rate. The lister
-  /// will enqueue files as quickly as it can. Meanwhile, files are dequeued
-  /// and processed sequentially.
-  AsyncQueue<String> _filesToProcess;
-
-  /// The set of files that have been seen in the current directory listing.
-  ///
-  /// Used to tell which files have been removed: files that are in [_statuses]
-  /// but not in here when a poll completes have been removed.
-  final _polledFiles = new Set<String>();
+  Future get ready;
 
   /// Creates a new [DirectoryWatcher] monitoring [directory].
   ///
-  /// If [pollingDelay] is passed, it specifies the amount of time the watcher
-  /// will pause between successive polls of the directory contents. Making
-  /// this shorter will give more immediate feedback at the expense of doing
-  /// more IO and higher CPU usage. Defaults to one second.
-  DirectoryWatcher(this.directory, {Duration pollingDelay})
-      : pollingDelay = pollingDelay != null ? pollingDelay :
-                                              new Duration(seconds: 1) {
-    _events = new StreamController<WatchEvent>.broadcast(
-        onListen: _watch, onCancel: _cancel);
-
-    _filesToProcess = new AsyncQueue<String>(_processFile,
-        onError: _events.addError);
-  }
-
-  /// Scans to see which files were already present before the watcher was
-  /// subscribed to, and then starts watching the directory for changes.
-  void _watch() {
-    assert(_state == _WatchState.UNSUBSCRIBED);
-    _state = _WatchState.SCANNING;
-    _poll();
-  }
-
-  /// Stops watching the directory when there are no more subscribers.
-  void _cancel() {
-    assert(_state != _WatchState.UNSUBSCRIBED);
-    _state = _WatchState.UNSUBSCRIBED;
-
-    // If we're in the middle of listing the directory, stop.
-    if (_listSubscription != null) _listSubscription.cancel();
-
-    // Don't process any remaining files.
-    _filesToProcess.clear();
-    _polledFiles.clear();
-    _statuses.clear();
-
-    _ready = new Completer();
-  }
-
-  /// Scans the contents of the directory once to see which files have been
-  /// added, removed, and modified.
-  void _poll() {
-    _filesToProcess.clear();
-    _polledFiles.clear();
-
-    endListing() {
-      assert(_state != _WatchState.UNSUBSCRIBED);
-      _listSubscription = null;
-
-      // Null tells the queue consumer that we're done listing.
-      _filesToProcess.add(null);
-    }
-
-    var stream = new Directory(directory).list(recursive: true);
-    _listSubscription = stream.listen((entity) {
-      assert(_state != _WatchState.UNSUBSCRIBED);
-
-      if (entity is! File) return;
-      _filesToProcess.add(entity.path);
-    }, onError: (error, StackTrace stackTrace) {
-      if (!isDirectoryNotFoundException(error)) {
-        // It's some unknown error. Pipe it over to the event stream so the
-        // user can see it.
-        _events.addError(error, stackTrace);
-      }
-
-      // When an error occurs, we end the listing normally, which has the
-      // desired effect of marking all files that were in the directory as
-      // being removed.
-      endListing();
-    }, onDone: endListing, cancelOnError: true);
-  }
-
-  /// Processes [file] to determine if it has been modified since the last
-  /// time it was scanned.
-  Future _processFile(String file) {
-    assert(_state != _WatchState.UNSUBSCRIBED);
-
-    // `null` is the sentinel which means the directory listing is complete.
-    if (file == null) return _completePoll();
-
-    return getModificationTime(file).then((modified) {
-      if (_checkForCancel()) return null;
-
-      var lastStatus = _statuses[file];
-
-      // If its modification time hasn't changed, assume the file is unchanged.
-      if (lastStatus != null && lastStatus.modified == modified) {
-        // The file is still here.
-        _polledFiles.add(file);
-        return null;
-      }
-
-      return _hashFile(file).then((hash) {
-        if (_checkForCancel()) return;
-
-        var status = new _FileStatus(modified, hash);
-        _statuses[file] = status;
-        _polledFiles.add(file);
-
-        // Only notify while in the watching state.
-        if (_state != _WatchState.WATCHING) return;
-
-        // And the file is different.
-        var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash);
-        if (!changed) return;
-
-        var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
-        _events.add(new WatchEvent(type, file));
-      });
-    });
-  }
-
-  /// After the directory listing is complete, this determines which files were
-  /// removed and then restarts the next poll.
-  Future _completePoll() {
-    // Any files that were not seen in the last poll but that we have a
-    // status for must have been removed.
-    var removedFiles = _statuses.keys.toSet().difference(_polledFiles);
-    for (var removed in removedFiles) {
-      if (_state == _WatchState.WATCHING) {
-        _events.add(new WatchEvent(ChangeType.REMOVE, removed));
-      }
-      _statuses.remove(removed);
-    }
-
-    if (_state == _WatchState.SCANNING) {
-      _state = _WatchState.WATCHING;
-      _ready.complete();
-    }
-
-    // Wait and then poll again.
-    return new Future.delayed(pollingDelay).then((_) {
-      if (_checkForCancel()) return;
-      _poll();
-    });
-  }
-
-  /// Returns `true` and clears the processing queue if the watcher has been
-  /// unsubscribed.
-  bool _checkForCancel() {
-    if (_state != _WatchState.UNSUBSCRIBED) return false;
-
-    // Don't process any more files.
-    _filesToProcess.clear();
-    return true;
-  }
-
-  /// Calculates the SHA-1 hash of the file at [path].
-  Future<List<int>> _hashFile(String path) {
-    return new File(path).readAsBytes().then((bytes) {
-      var sha1 = new SHA1();
-      sha1.add(bytes);
-      return sha1.close();
-    });
-  }
-
-  /// Returns `true` if [a] and [b] are the same hash value, i.e. the same
-  /// series of byte values.
-  bool _sameHash(List<int> a, List<int> b) {
-    // Hashes should always be the same size.
-    assert(a.length == b.length);
-
-    for (var i = 0; i < a.length; i++) {
-      if (a[i] != b[i]) return false;
-    }
-
-    return true;
-  }
-}
-
-/// Enum class for the states that the [DirectoryWatcher] can be in.
-class _WatchState {
-  /// There are no subscribers to the watcher's event stream and no watching
-  /// is going on.
-  static const UNSUBSCRIBED = const _WatchState("unsubscribed");
-
-  /// There are subscribers and the watcher is doing an initial scan of the
-  /// directory to see which files were already present before watching started.
+  /// If a native directory watcher is available for this platform, this will
+  /// use it. Otherwise, it will fall back to a [PollingDirectoryWatcher].
   ///
-  /// The watcher does not send notifications for changes that occurred while
-  /// there were no subscribers, or for files already present before watching.
-  /// The initial scan is used to determine what "before watching" state of
-  /// the file system was.
-  static const SCANNING = const _WatchState("scanning");
-
-  /// There are subscribers and the watcher is polling the directory to look
-  /// for changes.
-  static const WATCHING = const _WatchState("watching");
-
-  /// The name of the state.
-  final String name;
-
-  const _WatchState(this.name);
+  /// If [_pollingDelay] is passed, it specifies the amount of time the watcher
+  /// will pause between successive polls of the directory contents. Making this
+  /// shorter will give more immediate feedback at the expense of doing more IO
+  /// and higher CPU usage. Defaults to one second. Ignored for non-polling
+  /// watchers.
+  factory DirectoryWatcher(String directory, {Duration pollingDelay}) {
+    if (Platform.isLinux) return new LinuxDirectoryWatcher(directory);
+    return new PollingDirectoryWatcher(directory, pollingDelay: pollingDelay);
+  }
 }
-
-class _FileStatus {
-  /// The last time the file was modified.
-  DateTime modified;
-
-  /// The SHA-1 hash of the contents of the file.
-  List<int> hash;
-
-  _FileStatus(this.modified, this.hash);
-}
\ No newline at end of file
diff --git a/pkgs/watcher/lib/src/directory_watcher/linux.dart b/pkgs/watcher/lib/src/directory_watcher/linux.dart
new file mode 100644
index 0000000..9acecf1
--- /dev/null
+++ b/pkgs/watcher/lib/src/directory_watcher/linux.dart
@@ -0,0 +1,298 @@
+// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library watcher.directory_watcher.linux;
+
+import 'dart:async';
+import 'dart:io';
+
+import '../directory_watcher.dart';
+import '../utils.dart';
+import '../watch_event.dart';
+import 'resubscribable.dart';
+
+import 'package:stack_trace/stack_trace.dart';
+
+/// Uses the inotify subsystem to watch for filesystem events.
+///
+/// Inotify doesn't suport recursively watching subdirectories, nor does
+/// [Directory.watch] polyfill that functionality. This class polyfills it
+/// instead.
+///
+/// This class also compensates for the non-inotify-specific issues of
+/// [Directory.watch] producing multiple events for a single logical action
+/// (issue 14372) and providing insufficient information about move events
+/// (issue 14424).
+class LinuxDirectoryWatcher extends ResubscribableDirectoryWatcher {
+  LinuxDirectoryWatcher(String directory)
+      : super(directory, () => new _LinuxDirectoryWatcher(directory));
+}
+
+class _LinuxDirectoryWatcher implements ManuallyClosedDirectoryWatcher {
+  final String directory;
+
+  Stream<WatchEvent> get events => _eventsController.stream;
+  final _eventsController = new StreamController<WatchEvent>.broadcast();
+
+  bool get isReady => _readyCompleter.isCompleted;
+
+  Future get ready => _readyCompleter.future;
+  final _readyCompleter = new Completer();
+
+  /// The last known state for each entry in this directory.
+  ///
+  /// The keys in this map are the paths to the directory entries; the values
+  /// are [_EntryState]s indicating whether the entries are files or
+  /// directories.
+  final _entries = new Map<String, _EntryState>();
+
+  /// The watchers for subdirectories of [directory].
+  final _subWatchers = new Map<String, _LinuxDirectoryWatcher>();
+
+  /// A set of all subscriptions that this watcher subscribes to.
+  ///
+  /// These are gathered together so that they may all be canceled when the
+  /// watcher is closed.
+  final _subscriptions = new Set<StreamSubscription>();
+
+  _LinuxDirectoryWatcher(String directory)
+      : directory = directory {
+    // Batch the inotify changes together so that we can dedup events.
+    var innerStream = new Directory(directory).watch().transform(
+        new BatchedStreamTransformer<FileSystemEvent>());
+    _listen(innerStream, _onBatch,
+        onError: _eventsController.addError,
+        onDone: _onDone);
+
+    _listen(new Directory(directory).list(), (entity) {
+      _entries[entity.path] = new _EntryState(entity is Directory);
+      if (entity is! Directory) return;
+      _watchSubdir(entity.path);
+    }, onError: (error, stackTrace) {
+      _eventsController.addError(error, stackTrace);
+      close();
+    }, onDone: () {
+      _waitUntilReady().then((_) => _readyCompleter.complete());
+    }, cancelOnError: true);
+  }
+
+  /// Returns a [Future] that completes once all the subdirectory watchers are
+  /// fully initialized.
+  Future _waitUntilReady() {
+    return Future.wait(_subWatchers.values.map((watcher) => watcher.ready))
+        .then((_) {
+      if (_subWatchers.values.every((watcher) => watcher.isReady)) return;
+      return _waitUntilReady();
+    });
+  }
+
+  void close() {
+    for (var subscription in _subscriptions) {
+      subscription.cancel();
+    }
+    for (var watcher in _subWatchers.values) {
+      watcher.close();
+    }
+
+    _subWatchers.clear();
+    _subscriptions.clear();
+    _eventsController.close();
+  }
+
+  /// Returns all files (not directories) that this watcher knows of are
+  /// recursively in the watched directory.
+  Set<String> get _allFiles {
+    var files = new Set<String>();
+    _getAllFiles(files);
+    return files;
+  }
+
+  /// Helper function for [_allFiles].
+  ///
+  /// Adds all files that this watcher knows of to [files].
+  void _getAllFiles(Set<String> files) {
+    files.addAll(_entries.keys
+        .where((path) => _entries[path] == _EntryState.FILE).toSet());
+    for (var watcher in _subWatchers.values) {
+      watcher._getAllFiles(files);
+    }
+  }
+
+  /// Watch a subdirectory of [directory] for changes.
+  ///
+  /// If the subdirectory was added after [this] began emitting events, its
+  /// contents will be emitted as ADD events.
+  void _watchSubdir(String path) {
+    if (_subWatchers.containsKey(path)) return;
+    var watcher = new _LinuxDirectoryWatcher(path);
+    _subWatchers[path] = watcher;
+
+    // TODO(nweiz): Catch any errors here that indicate that the directory in
+    // question doesn't exist and silently stop watching it instead of
+    // propagating the errors.
+    _listen(watcher.events, (event) {
+      if (isReady) _eventsController.add(event);
+    }, onError: (error, stackTrace) {
+      _eventsController.addError(error, stackTrace);
+      _eventsController.close();
+    }, onDone: () {
+      if (_subWatchers[path] == watcher) _subWatchers.remove(path);
+
+      // It's possible that a directory was removed and recreated very quickly.
+      // If so, make sure we're still watching it.
+      if (new Directory(path).existsSync()) _watchSubdir(path);
+    });
+
+    // TODO(nweiz): Right now it's possible for the watcher to emit an event for
+    // a file before the directory list is complete. This could lead to the user
+    // seeing a MODIFY or REMOVE event for a file before they see an ADD event,
+    // which is bad. We should handle that.
+    //
+    // One possibility is to provide a general means (e.g.
+    // `DirectoryWatcher.eventsAndExistingFiles`) to tell a watcher to emit
+    // events for all the files that already exist. This would be useful for
+    // top-level clients such as barback as well, and could be implemented with
+    // a wrapper similar to how listening/canceling works now.
+
+    // If a directory is added after we're finished with the initial scan, emit
+    // an event for each entry in it. This gives the user consistently gets an
+    // event for every new file.
+    watcher.ready.then((_) {
+      if (!isReady || _eventsController.isClosed) return;
+      _listen(new Directory(path).list(recursive: true), (entry) {
+        if (entry is Directory) return;
+        _eventsController.add(new WatchEvent(ChangeType.ADD, entry.path));
+      }, onError: (error, stackTrace) {
+        // Ignore an exception caused by the dir not existing. It's fine if it
+        // was added and then quickly removed.
+        if (error is FileSystemException) return;
+
+        _eventsController.addError(error, stackTrace);
+        _eventsController.close();
+      }, cancelOnError: true);
+    });
+  }
+
+  /// The callback that's run when a batch of changes comes in.
+  void _onBatch(List<FileSystemEvent> batch) {
+    var changedEntries = new Set<String>();
+    var oldEntries = new Map.from(_entries);
+
+    // inotify event batches are ordered by occurrence, so we treat them as a
+    // log of what happened to a file.
+    for (var event in batch) {
+      // If the watched directory is deleted or moved, we'll get a deletion
+      // event for it. Ignore it; we handle closing [this] when the underlying
+      // stream is closed.
+      if (event.path == directory) continue;
+
+      changedEntries.add(event.path);
+
+      if (event is FileSystemMoveEvent) {
+        changedEntries.add(event.destination);
+        _changeEntryState(event.path, ChangeType.REMOVE, event.isDirectory);
+        _changeEntryState(event.destination, ChangeType.ADD, event.isDirectory);
+      } else {
+        _changeEntryState(event.path, _changeTypeFor(event), event.isDirectory);
+      }
+    }
+
+    for (var path in changedEntries) {
+      emitEvent(ChangeType type) {
+        if (isReady) _eventsController.add(new WatchEvent(type, path));
+      }
+
+      var oldState = oldEntries[path];
+      var newState = _entries[path];
+
+      if (oldState != _EntryState.FILE && newState == _EntryState.FILE) {
+        emitEvent(ChangeType.ADD);
+      } else if (oldState == _EntryState.FILE && newState == _EntryState.FILE) {
+        emitEvent(ChangeType.MODIFY);
+      } else if (oldState == _EntryState.FILE && newState != _EntryState.FILE) {
+        emitEvent(ChangeType.REMOVE);
+      }
+
+      if (oldState == _EntryState.DIRECTORY) {
+        var watcher = _subWatchers.remove(path);
+        if (watcher == null) return;
+        for (var path in watcher._allFiles) {
+          _eventsController.add(new WatchEvent(ChangeType.REMOVE, path));
+        }
+        watcher.close();
+      }
+
+      if (newState == _EntryState.DIRECTORY) _watchSubdir(path);
+    }
+  }
+
+  /// Changes the known state of the entry at [path] based on [change] and
+  /// [isDir].
+  void _changeEntryState(String path, ChangeType change, bool isDir) {
+    if (change == ChangeType.ADD || change == ChangeType.MODIFY) {
+      _entries[path] = new _EntryState(isDir);
+    } else {
+      assert(change == ChangeType.REMOVE);
+      _entries.remove(path);
+    }
+  }
+
+  /// Determines the [ChangeType] associated with [event].
+  ChangeType _changeTypeFor(FileSystemEvent event) {
+    if (event is FileSystemDeleteEvent) return ChangeType.REMOVE;
+    if (event is FileSystemCreateEvent) return ChangeType.ADD;
+
+    assert(event is FileSystemModifyEvent);
+    return ChangeType.MODIFY;
+  }
+
+  /// Handles the underlying event stream closing, indicating that the directory
+  /// being watched was removed.
+  void _onDone() {
+    // The parent directory often gets a close event before the subdirectories
+    // are done emitting events. We wait for them to finish before we close
+    // [events] so that we can be sure to emit a remove event for every file
+    // that used to exist.
+    Future.wait(_subWatchers.values.map((watcher) {
+      try {
+        return watcher.events.toList();
+      } on StateError catch (_) {
+        // It's possible that [watcher.events] is closed but the onDone event
+        // hasn't reached us yet. It's fine if so.
+        return new Future.value();
+      }
+    })).then((_) => close());
+  }
+
+  /// Like [Stream.listen], but automatically adds the subscription to
+  /// [_subscriptions] so that it can be canceled when [close] is called.
+  void _listen(Stream stream, void onData(event), {Function onError,
+      void onDone(), bool cancelOnError}) {
+    var subscription;
+    subscription = stream.listen(onData, onError: onError, onDone: () {
+      _subscriptions.remove(subscription);
+      if (onDone != null) onDone();
+    }, cancelOnError: cancelOnError);
+    _subscriptions.add(subscription);
+  }
+}
+
+/// An enum for the possible states of entries in a watched directory.
+class _EntryState {
+  final String _name;
+
+  /// The entry is a file.
+  static const FILE = const _EntryState._("file");
+
+  /// The entry is a directory.
+  static const DIRECTORY = const _EntryState._("directory");
+
+  const _EntryState._(this._name);
+
+  /// Returns [DIRECTORY] if [isDir] is true, and [FILE] otherwise.
+  factory _EntryState(bool isDir) =>
+      isDir ? _EntryState.DIRECTORY : _EntryState.FILE;
+
+  String toString() => _name;
+}
diff --git a/pkgs/watcher/lib/src/directory_watcher/polling.dart b/pkgs/watcher/lib/src/directory_watcher/polling.dart
new file mode 100644
index 0000000..91ca005
--- /dev/null
+++ b/pkgs/watcher/lib/src/directory_watcher/polling.dart
@@ -0,0 +1,217 @@
+// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library watcher.directory_watcher.polling;
+
+import 'dart:async';
+import 'dart:io';
+
+import 'package:crypto/crypto.dart';
+
+import '../async_queue.dart';
+import '../directory_watcher.dart';
+import '../stat.dart';
+import '../utils.dart';
+import '../watch_event.dart';
+import 'resubscribable.dart';
+
+/// Periodically polls a directory for changes.
+class PollingDirectoryWatcher extends ResubscribableDirectoryWatcher {
+  /// Creates a new polling watcher monitoring [directory].
+  ///
+  /// If [_pollingDelay] is passed, it specifies the amount of time the watcher
+  /// will pause between successive polls of the directory contents. Making this
+  /// shorter will give more immediate feedback at the expense of doing more IO
+  /// and higher CPU usage. Defaults to one second.
+  PollingDirectoryWatcher(String directory, {Duration pollingDelay})
+      : super(directory, () {
+        return new _PollingDirectoryWatcher(directory,
+            pollingDelay != null ? pollingDelay : new Duration(seconds: 1));
+      });
+}
+
+class _PollingDirectoryWatcher implements ManuallyClosedDirectoryWatcher {
+  final String directory;
+
+  Stream<WatchEvent> get events => _events.stream;
+  final _events = new StreamController<WatchEvent>.broadcast();
+
+  bool get isReady => _ready.isCompleted;
+
+  Future get ready => _ready.future;
+  final _ready = new Completer();
+
+  /// The amount of time the watcher pauses between successive polls of the
+  /// directory contents.
+  final Duration _pollingDelay;
+
+  /// The previous status of the files in the directory.
+  ///
+  /// Used to tell which files have been modified.
+  final _statuses = new Map<String, _FileStatus>();
+
+  /// The subscription used while [directory] is being listed.
+  ///
+  /// Will be `null` if a list is not currently happening.
+  StreamSubscription<FileSystemEntity> _listSubscription;
+
+  /// The queue of files waiting to be processed to see if they have been
+  /// modified.
+  ///
+  /// Processing a file is asynchronous, as is listing the directory, so the
+  /// queue exists to let each of those proceed at their own rate. The lister
+  /// will enqueue files as quickly as it can. Meanwhile, files are dequeued
+  /// and processed sequentially.
+  AsyncQueue<String> _filesToProcess;
+
+  /// The set of files that have been seen in the current directory listing.
+  ///
+  /// Used to tell which files have been removed: files that are in [_statuses]
+  /// but not in here when a poll completes have been removed.
+  final _polledFiles = new Set<String>();
+
+  _PollingDirectoryWatcher(this.directory, this._pollingDelay) {
+    _filesToProcess = new AsyncQueue<String>(_processFile,
+        onError: _events.addError);
+
+    _poll();
+  }
+
+  void close() {
+    _events.close();
+
+    // If we're in the middle of listing the directory, stop.
+    if (_listSubscription != null) _listSubscription.cancel();
+
+    // Don't process any remaining files.
+    _filesToProcess.clear();
+    _polledFiles.clear();
+    _statuses.clear();
+  }
+
+  /// Scans the contents of the directory once to see which files have been
+  /// added, removed, and modified.
+  void _poll() {
+    _filesToProcess.clear();
+    _polledFiles.clear();
+
+    endListing() {
+      assert(!_events.isClosed);
+      _listSubscription = null;
+
+      // Null tells the queue consumer that we're done listing.
+      _filesToProcess.add(null);
+    }
+
+    var stream = new Directory(directory).list(recursive: true);
+    _listSubscription = stream.listen((entity) {
+      assert(!_events.isClosed);
+
+      if (entity is! File) return;
+      _filesToProcess.add(entity.path);
+    }, onError: (error, stackTrace) {
+      if (!isDirectoryNotFoundException(error)) {
+        // It's some unknown error. Pipe it over to the event stream so the
+        // user can see it.
+        _events.addError(error, stackTrace);
+      }
+
+      // When an error occurs, we end the listing normally, which has the
+      // desired effect of marking all files that were in the directory as
+      // being removed.
+      endListing();
+    }, onDone: endListing, cancelOnError: true);
+  }
+
+  /// Processes [file] to determine if it has been modified since the last
+  /// time it was scanned.
+  Future _processFile(String file) {
+    // `null` is the sentinel which means the directory listing is complete.
+    if (file == null) return _completePoll();
+
+    return getModificationTime(file).then((modified) {
+      if (_events.isClosed) return null;
+
+      var lastStatus = _statuses[file];
+
+      // If its modification time hasn't changed, assume the file is unchanged.
+      if (lastStatus != null && lastStatus.modified == modified) {
+        // The file is still here.
+        _polledFiles.add(file);
+        return null;
+      }
+
+      return _hashFile(file).then((hash) {
+        if (_events.isClosed) return;
+
+        var status = new _FileStatus(modified, hash);
+        _statuses[file] = status;
+        _polledFiles.add(file);
+
+        // Only notify if we're ready to emit events.
+        if (!isReady) return;
+
+        // And the file is different.
+        var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash);
+        if (!changed) return;
+
+        var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
+        _events.add(new WatchEvent(type, file));
+      });
+    });
+  }
+
+  /// After the directory listing is complete, this determines which files were
+  /// removed and then restarts the next poll.
+  Future _completePoll() {
+    // Any files that were not seen in the last poll but that we have a
+    // status for must have been removed.
+    var removedFiles = _statuses.keys.toSet().difference(_polledFiles);
+    for (var removed in removedFiles) {
+      if (isReady) _events.add(new WatchEvent(ChangeType.REMOVE, removed));
+      _statuses.remove(removed);
+    }
+
+    if (!isReady) _ready.complete();
+
+    // Wait and then poll again.
+    return new Future.delayed(_pollingDelay).then((_) {
+      if (_events.isClosed) return;
+      _poll();
+    });
+  }
+
+  /// Calculates the SHA-1 hash of the file at [path].
+  Future<List<int>> _hashFile(String path) {
+    return new File(path).readAsBytes().then((bytes) {
+      var sha1 = new SHA1();
+      sha1.add(bytes);
+      return sha1.close();
+    });
+  }
+
+  /// Returns `true` if [a] and [b] are the same hash value, i.e. the same
+  /// series of byte values.
+  bool _sameHash(List<int> a, List<int> b) {
+    // Hashes should always be the same size.
+    assert(a.length == b.length);
+
+    for (var i = 0; i < a.length; i++) {
+      if (a[i] != b[i]) return false;
+    }
+
+    return true;
+  }
+}
+
+class _FileStatus {
+  /// The last time the file was modified.
+  DateTime modified;
+
+  /// The SHA-1 hash of the contents of the file.
+  List<int> hash;
+
+  _FileStatus(this.modified, this.hash);
+}
+
diff --git a/pkgs/watcher/lib/src/directory_watcher/resubscribable.dart b/pkgs/watcher/lib/src/directory_watcher/resubscribable.dart
new file mode 100644
index 0000000..daa813a
--- /dev/null
+++ b/pkgs/watcher/lib/src/directory_watcher/resubscribable.dart
@@ -0,0 +1,79 @@
+// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library watcher.directory_watcher.resubscribable;
+
+import 'dart:async';
+import 'dart:io';
+
+import '../directory_watcher.dart';
+import '../utils.dart';
+import '../watch_event.dart';
+
+typedef ManuallyClosedDirectoryWatcher WatcherFactory();
+
+/// A wrapper for [ManuallyClosedDirectoryWatcher] that encapsulates support for
+/// closing the watcher when it has no subscribers and re-opening it when it's
+/// re-subscribed.
+///
+/// It's simpler to implement watchers without worrying about this behavior.
+/// This class wraps a watcher class which can be written with the simplifying
+/// assumption that it can continue emitting events until an explicit `close`
+/// method is called, at which point it will cease emitting events entirely. The
+/// [ManuallyClosedDirectoryWatcher] interface is used for these watchers.
+///
+/// This would be more cleanly implemented as a function that takes a class and
+/// emits a new class, but Dart doesn't support that sort of thing. Instead it
+/// takes a factory function that produces instances of the inner class.
+abstract class ResubscribableDirectoryWatcher implements DirectoryWatcher {
+  /// The factory function that produces instances of the inner class.
+  final WatcherFactory _factory;
+
+  final String directory;
+
+  Stream<WatchEvent> get events => _eventsController.stream;
+  StreamController<WatchEvent> _eventsController;
+
+  bool get isReady => _readyCompleter.isCompleted;
+
+  Future get ready => _readyCompleter.future;
+  var _readyCompleter = new Completer();
+
+  /// Creates a new [ResubscribableDirectoryWatcher] wrapping the watchers
+  /// emitted by [_factory].
+  ResubscribableDirectoryWatcher(this.directory, this._factory) {
+    var watcher;
+    var subscription;
+
+    _eventsController = new StreamController<WatchEvent>.broadcast(
+        onListen: () {
+      watcher = _factory();
+      subscription = watcher.events.listen(_eventsController.add,
+          onError: _eventsController.addError,
+          onDone: _eventsController.close);
+
+      // It's important that we complete the value of [_readyCompleter] at the
+      // time [onListen] is called, as opposed to the value when [watcher.ready]
+      // fires. A new completer may be created by that time.
+      watcher.ready.then(_readyCompleter.complete);
+    }, onCancel: () {
+      // Cancel the subscription before closing the watcher so that the
+      // watcher's `onDone` event doesn't close [events].
+      subscription.cancel();
+      watcher.close();
+      _readyCompleter = new Completer();
+    }, sync: true);
+  }
+}
+
+/// An interface for watchers with an explicit, manual [close] method.
+///
+/// See [ResubscribableDirectoryWatcher].
+abstract class ManuallyClosedDirectoryWatcher implements DirectoryWatcher {
+  /// Closes the watcher.
+  ///
+  /// Subclasses should close their [events] stream and release any internal
+  /// resources.
+  void close();
+}
diff --git a/pkgs/watcher/lib/src/utils.dart b/pkgs/watcher/lib/src/utils.dart
index 3d00c08..e4e4457 100644
--- a/pkgs/watcher/lib/src/utils.dart
+++ b/pkgs/watcher/lib/src/utils.dart
@@ -4,7 +4,9 @@
 
 library watcher.utils;
 
+import 'dart:async';
 import 'dart:io';
+import 'dart:collection';
 
 /// Returns `true` if [error] is a [FileSystemException] for a missing
 /// directory.
@@ -15,3 +17,57 @@
   var notFoundCode = Platform.operatingSystem == "windows" ? 3 : 2;
   return error.osError.errorCode == notFoundCode;
 }
+
+/// Returns a buffered stream that will emit the same values as the stream
+/// returned by [future] once [future] completes.
+///
+/// If [future] completes to an error, the return value will emit that error and
+/// then close.
+Stream futureStream(Future<Stream> future) {
+  var controller = new StreamController(sync: true);
+  future.then((stream) {
+    stream.listen(
+        controller.add,
+        onError: controller.addError,
+        onDone: controller.close);
+  }).catchError((e, stackTrace) {
+    controller.addError(e, stackTrace);
+    controller.close();
+  });
+  return controller.stream;
+}
+
+/// Like [new Future], but avoids around issue 11911 by using [new Future.value]
+/// under the covers.
+Future newFuture(callback()) => new Future.value().then((_) => callback());
+
+/// A stream transformer that batches all events that are sent at the same time.
+///
+/// When multiple events are synchronously added to a stream controller, the
+/// [StreamController] implementation uses [scheduleMicrotask] to schedule the
+/// asynchronous firing of each event. In order to recreate the synchronous
+/// batches, this collates all the events that are received in "nearby"
+/// microtasks.
+class BatchedStreamTransformer<T> implements StreamTransformer<T, List<T>> {
+  Stream<List<T>> bind(Stream<T> input) {
+    var batch = new Queue();
+    return new StreamTransformer<T, List<T>>.fromHandlers(
+        handleData: (event, sink) {
+      batch.add(event);
+
+      // [Timer.run] schedules an event that runs after any microtasks that have
+      // been scheduled.
+      Timer.run(() {
+        if (batch.isEmpty) return;
+        sink.add(batch.toList());
+        batch.clear();
+      });
+    }, handleDone: (sink) {
+      if (batch.isNotEmpty) {
+        sink.add(batch.toList());
+        batch.clear();
+      }
+      sink.close();
+    }).bind(input);
+  }
+}
diff --git a/pkgs/watcher/lib/watcher.dart b/pkgs/watcher/lib/watcher.dart
index c4824b8..88531f2 100644
--- a/pkgs/watcher/lib/watcher.dart
+++ b/pkgs/watcher/lib/watcher.dart
@@ -6,3 +6,4 @@
 
 export 'src/watch_event.dart';
 export 'src/directory_watcher.dart';
+export 'src/directory_watcher/polling.dart';
diff --git a/pkgs/watcher/test/directory_watcher/linux_test.dart b/pkgs/watcher/test/directory_watcher/linux_test.dart
new file mode 100644
index 0000000..ba69569
--- /dev/null
+++ b/pkgs/watcher/test/directory_watcher/linux_test.dart
@@ -0,0 +1,62 @@
+// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/src/directory_watcher/linux.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+  initConfig();
+
+  watcherFactory = (dir) => new LinuxDirectoryWatcher(dir);
+
+  setUp(() {
+    // Increase the timeout because closing a [Directory.watch] stream blocks
+    // the main isolate for a very long time on Goobuntu, as of kernel
+    // 3.2.5-gg1336 (see issue 14606).
+    currentSchedule.timeout *= 3;
+
+    createSandbox();
+  });
+
+  sharedTests();
+
+  test('DirectoryWatcher creates a LinuxDirectoryWatcher on Linux', () {
+    expect(new DirectoryWatcher('.'),
+        new isInstanceOf<LinuxDirectoryWatcher>());
+  });
+
+  test('notifies even if the file contents are unchanged', () {
+    writeFile("a.txt", contents: "same");
+    writeFile("b.txt", contents: "before");
+    startWatcher();
+    writeFile("a.txt", contents: "same");
+    writeFile("b.txt", contents: "after");
+    expectModifyEvent("a.txt");
+    expectModifyEvent("b.txt");
+  });
+
+  test('emits events for many nested files moved out then immediately back in',
+      () {
+    withPermutations((i, j, k) =>
+        writeFile("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+    startWatcher(dir: "dir");
+
+    renameDir("dir/sub", "sub");
+    renameDir("sub", "dir/sub");
+
+    inAnyOrder(() {
+      withPermutations((i, j, k) =>
+          expectRemoveEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+    });
+
+    inAnyOrder(() {
+      withPermutations((i, j, k) =>
+          expectAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+    });
+  });
+}
diff --git a/pkgs/watcher/test/directory_watcher/polling_test.dart b/pkgs/watcher/test/directory_watcher/polling_test.dart
new file mode 100644
index 0000000..02ed5d2
--- /dev/null
+++ b/pkgs/watcher/test/directory_watcher/polling_test.dart
@@ -0,0 +1,39 @@
+// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+  initConfig();
+
+  // Use a short delay to make the tests run quickly.
+  watcherFactory = (dir) => new PollingDirectoryWatcher(dir,
+      pollingDelay: new Duration(milliseconds: 100));
+
+  setUp(createSandbox);
+
+  sharedTests();
+
+  test('does not notify if the file contents are unchanged', () {
+    writeFile("a.txt", contents: "same");
+    writeFile("b.txt", contents: "before");
+    startWatcher();
+    writeFile("a.txt", contents: "same");
+    writeFile("b.txt", contents: "after");
+    expectModifyEvent("b.txt");
+  });
+
+  test('does not notify if the modification time did not change', () {
+    writeFile("a.txt", contents: "before");
+    writeFile("b.txt", contents: "before");
+    startWatcher();
+    writeFile("a.txt", contents: "after", updateModified: false);
+    writeFile("b.txt", contents: "after");
+    expectModifyEvent("b.txt");
+  });
+}
diff --git a/pkgs/watcher/test/directory_watcher/shared.dart b/pkgs/watcher/test/directory_watcher/shared.dart
new file mode 100644
index 0000000..eed6069
--- /dev/null
+++ b/pkgs/watcher/test/directory_watcher/shared.dart
@@ -0,0 +1,222 @@
+// Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+
+import '../utils.dart';
+
+sharedTests() {
+  test('does not notify for files that already exist when started', () {
+    // Make some pre-existing files.
+    writeFile("a.txt");
+    writeFile("b.txt");
+
+    startWatcher();
+
+    // Change one after the watcher is running.
+    writeFile("b.txt", contents: "modified");
+
+    // We should get a modify event for the changed file, but no add events
+    // for them before this.
+    expectModifyEvent("b.txt");
+  });
+
+  test('notifies when a file is added', () {
+    startWatcher();
+    writeFile("file.txt");
+    expectAddEvent("file.txt");
+  });
+
+  test('notifies when a file is modified', () {
+    writeFile("file.txt");
+    startWatcher();
+    writeFile("file.txt", contents: "modified");
+    expectModifyEvent("file.txt");
+  });
+
+  test('notifies when a file is removed', () {
+    writeFile("file.txt");
+    startWatcher();
+    deleteFile("file.txt");
+    expectRemoveEvent("file.txt");
+  });
+
+  test('notifies when a file is modified multiple times', () {
+    writeFile("file.txt");
+    startWatcher();
+    writeFile("file.txt", contents: "modified");
+    expectModifyEvent("file.txt");
+    writeFile("file.txt", contents: "modified again");
+    expectModifyEvent("file.txt");
+  });
+
+  test('when the watched directory is deleted, removes all files', () {
+    writeFile("dir/a.txt");
+    writeFile("dir/b.txt");
+
+    startWatcher(dir: "dir");
+
+    deleteDir("dir");
+    inAnyOrder(() {
+      expectRemoveEvent("dir/a.txt");
+      expectRemoveEvent("dir/b.txt");
+    });
+  });
+
+  group("moves", () {
+    test('notifies when a file is moved within the watched directory', () {
+      writeFile("old.txt");
+      startWatcher();
+      renameFile("old.txt", "new.txt");
+
+      inAnyOrder(() {
+        expectAddEvent("new.txt");
+        expectRemoveEvent("old.txt");
+      });
+    });
+
+    test('notifies when a file is moved from outside the watched directory',
+        () {
+      writeFile("old.txt");
+      createDir("dir");
+      startWatcher(dir: "dir");
+
+      renameFile("old.txt", "dir/new.txt");
+      expectAddEvent("dir/new.txt");
+    });
+
+    test('notifies when a file is moved outside the watched directory', () {
+      writeFile("dir/old.txt");
+      startWatcher(dir: "dir");
+
+      renameFile("dir/old.txt", "new.txt");
+      expectRemoveEvent("dir/old.txt");
+    });
+  });
+
+  group("clustered changes", () {
+    test("doesn't notify when a file is created and then immediately removed",
+        () {
+      startWatcher();
+      writeFile("file.txt");
+      deleteFile("file.txt");
+
+      // [startWatcher] will assert that no events were fired.
+    });
+
+    test("reports a modification when a file is deleted and then immediately "
+        "recreated", () {
+      writeFile("file.txt");
+      startWatcher();
+
+      deleteFile("file.txt");
+      writeFile("file.txt", contents: "re-created");
+      expectModifyEvent("file.txt");
+    });
+
+    test("reports a modification when a file is moved and then immediately "
+        "recreated", () {
+      writeFile("old.txt");
+      startWatcher();
+
+      renameFile("old.txt", "new.txt");
+      writeFile("old.txt", contents: "re-created");
+      inAnyOrder(() {
+        expectModifyEvent("old.txt");
+        expectAddEvent("new.txt");
+      });
+    });
+
+    test("reports a removal when a file is modified and then immediately "
+        "removed", () {
+      writeFile("file.txt");
+      startWatcher();
+
+      writeFile("file.txt", contents: "modified");
+      deleteFile("file.txt");
+      expectRemoveEvent("file.txt");
+    });
+
+    test("reports an add when a file is added and then immediately modified",
+        () {
+      startWatcher();
+
+      writeFile("file.txt");
+      writeFile("file.txt", contents: "modified");
+      expectAddEvent("file.txt");
+    });
+  });
+
+  group("subdirectories", () {
+    test('watches files in subdirectories', () {
+      startWatcher();
+      writeFile("a/b/c/d/file.txt");
+      expectAddEvent("a/b/c/d/file.txt");
+    });
+
+    test('notifies when a subdirectory is moved within the watched directory '
+        'and then its contents are modified', () {
+      writeFile("old/file.txt");
+      startWatcher();
+
+      renameDir("old", "new");
+      inAnyOrder(() {
+        expectRemoveEvent("old/file.txt");
+        expectAddEvent("new/file.txt");
+      });
+
+      writeFile("new/file.txt", contents: "modified");
+      expectModifyEvent("new/file.txt");
+    });
+
+    test('emits events for many nested files added at once', () {
+      withPermutations((i, j, k) =>
+          writeFile("sub/sub-$i/sub-$j/file-$k.txt"));
+
+      createDir("dir");
+      startWatcher(dir: "dir");
+      renameDir("sub", "dir/sub");
+
+      inAnyOrder(() {
+        withPermutations((i, j, k)  =>
+            expectAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+      });
+    });
+
+    test('emits events for many nested files removed at once', () {
+      withPermutations((i, j, k) =>
+          writeFile("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+
+      createDir("dir");
+      startWatcher(dir: "dir");
+
+      // Rename the directory rather than deleting it because native watchers
+      // report a rename as a single DELETE event for the directory, whereas
+      // they report recursive deletion with DELETE events for every file in the
+      // directory.
+      renameDir("dir/sub", "sub");
+
+      inAnyOrder(() {
+        withPermutations((i, j, k) =>
+            expectRemoveEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+      });
+    });
+
+    test('emits events for many nested files moved at once', () {
+      withPermutations((i, j, k) =>
+          writeFile("dir/old/sub-$i/sub-$j/file-$k.txt"));
+
+      createDir("dir");
+      startWatcher(dir: "dir");
+      renameDir("dir/old", "dir/new");
+
+      inAnyOrder(() {
+        withPermutations((i, j, k) {
+          expectRemoveEvent("dir/old/sub-$i/sub-$j/file-$k.txt");
+          expectAddEvent("dir/new/sub-$i/sub-$j/file-$k.txt");
+        });
+      });
+    });
+  });
+}
diff --git a/pkgs/watcher/test/directory_watcher_test.dart b/pkgs/watcher/test/directory_watcher_test.dart
deleted file mode 100644
index 841dd08..0000000
--- a/pkgs/watcher/test/directory_watcher_test.dart
+++ /dev/null
@@ -1,108 +0,0 @@
-// Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'package:scheduled_test/scheduled_test.dart';
-
-import 'utils.dart';
-
-main() {
-  initConfig();
-
-  setUp(createSandbox);
-
-  test('does not notify for files that already exist when started', () {
-    // Make some pre-existing files.
-    writeFile("a.txt");
-    writeFile("b.txt");
-
-    createWatcher();
-
-    // Change one after the watcher is running.
-    writeFile("b.txt", contents: "modified");
-
-    // We should get a modify event for the changed file, but no add events
-    // for them before this.
-    expectModifyEvent("b.txt");
-  });
-
-  test('notifies when a file is added', () {
-    createWatcher();
-    writeFile("file.txt");
-    expectAddEvent("file.txt");
-  });
-
-  test('notifies when a file is modified', () {
-    writeFile("file.txt");
-    createWatcher();
-    writeFile("file.txt", contents: "modified");
-    expectModifyEvent("file.txt");
-  });
-
-  test('notifies when a file is removed', () {
-    writeFile("file.txt");
-    createWatcher();
-    deleteFile("file.txt");
-    expectRemoveEvent("file.txt");
-  });
-
-  test('notifies when a file is moved', () {
-    writeFile("old.txt");
-    createWatcher();
-    renameFile("old.txt", "new.txt");
-    expectAddEvent("new.txt");
-    expectRemoveEvent("old.txt");
-  });
-
-  test('notifies when a file is modified multiple times', () {
-    writeFile("file.txt");
-    createWatcher();
-    writeFile("file.txt", contents: "modified");
-    expectModifyEvent("file.txt");
-    writeFile("file.txt", contents: "modified again");
-    expectModifyEvent("file.txt");
-  });
-
-  test('does not notify if the file contents are unchanged', () {
-    writeFile("a.txt", contents: "same");
-    writeFile("b.txt", contents: "before");
-    createWatcher();
-    writeFile("a.txt", contents: "same");
-    writeFile("b.txt", contents: "after");
-    expectModifyEvent("b.txt");
-  });
-
-  test('does not notify if the modification time did not change', () {
-    writeFile("a.txt", contents: "before");
-    writeFile("b.txt", contents: "before");
-    createWatcher();
-    writeFile("a.txt", contents: "after", updateModified: false);
-    writeFile("b.txt", contents: "after");
-    expectModifyEvent("b.txt");
-  });
-
-  test('watches files in subdirectories', () {
-    createWatcher();
-    writeFile("a/b/c/d/file.txt");
-    expectAddEvent("a/b/c/d/file.txt");
-  });
-
-  test('watches a directory created after the watcher', () {
-    // Watch a subdirectory that doesn't exist yet.
-    createWatcher(dir: "a");
-
-    // This implicity creates it.
-    writeFile("a/b/c/d/file.txt");
-    expectAddEvent("a/b/c/d/file.txt");
-  });
-
-  test('when the watched directory is deleted, removes all files', () {
-    writeFile("dir/a.txt");
-    writeFile("dir/b.txt");
-
-    createWatcher(dir: "dir");
-
-    deleteDir("dir");
-    expectRemoveEvents(["dir/a.txt", "dir/b.txt"]);
-  });
-}
diff --git a/pkgs/watcher/test/no_subscription/linux_test.dart b/pkgs/watcher/test/no_subscription/linux_test.dart
new file mode 100644
index 0000000..7978830
--- /dev/null
+++ b/pkgs/watcher/test/no_subscription/linux_test.dart
@@ -0,0 +1,20 @@
+// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/src/directory_watcher/linux.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+  initConfig();
+
+  watcherFactory = (dir) => new LinuxDirectoryWatcher(dir);
+
+  setUp(createSandbox);
+
+  sharedTests();
+}
\ No newline at end of file
diff --git a/pkgs/watcher/test/no_subscription/polling_test.dart b/pkgs/watcher/test/no_subscription/polling_test.dart
new file mode 100644
index 0000000..fa4f0cb
--- /dev/null
+++ b/pkgs/watcher/test/no_subscription/polling_test.dart
@@ -0,0 +1,19 @@
+// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+  initConfig();
+
+  watcherFactory = (dir) => new PollingDirectoryWatcher(dir);
+
+  setUp(createSandbox);
+
+  sharedTests();
+}
\ No newline at end of file
diff --git a/pkgs/watcher/test/no_subscription_test.dart b/pkgs/watcher/test/no_subscription/shared.dart
similarity index 73%
rename from pkgs/watcher/test/no_subscription_test.dart
rename to pkgs/watcher/test/no_subscription/shared.dart
index 2e7b6d3..cd279e1 100644
--- a/pkgs/watcher/test/no_subscription_test.dart
+++ b/pkgs/watcher/test/no_subscription/shared.dart
@@ -7,14 +7,10 @@
 import 'package:scheduled_test/scheduled_test.dart';
 import 'package:watcher/watcher.dart';
 
-import 'utils.dart';
+import '../utils.dart';
 
-main() {
-  initConfig();
-
-  setUp(createSandbox);
-
-  test('does not notify for changes when there were no subscribers', () {
+sharedTests() {
+  test('does not notify for changes when there are no subscribers', () {
     // Note that this test doesn't rely as heavily on the test functions in
     // utils.dart because it needs to be very explicit about when the event
     // stream is and is not subscribed.
@@ -51,16 +47,10 @@
         expect(event.path, endsWith("added.txt"));
         completer.complete();
       }));
-    });
 
-    // The watcher will have been cancelled and then resumed in the middle of
-    // its pause between polling loops. That means the second scan to skip
-    // what changed while we were unsubscribed won't happen until after that
-    // delay is done. Wait long enough for that to happen.
-    //
-    // We're doing * 4 here because that seems to give the slower bots enough
-    // time for this to complete.
-    schedule(() => new Future.delayed(watcher.pollingDelay * 4));
+      // Wait until the watcher is ready to dispatch events again.
+      return watcher.ready;
+    });
 
     // And add a third file.
     writeFile("added.txt");
diff --git a/pkgs/watcher/test/ready/linux_test.dart b/pkgs/watcher/test/ready/linux_test.dart
new file mode 100644
index 0000000..7978830
--- /dev/null
+++ b/pkgs/watcher/test/ready/linux_test.dart
@@ -0,0 +1,20 @@
+// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/src/directory_watcher/linux.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+  initConfig();
+
+  watcherFactory = (dir) => new LinuxDirectoryWatcher(dir);
+
+  setUp(createSandbox);
+
+  sharedTests();
+}
\ No newline at end of file
diff --git a/pkgs/watcher/test/ready/polling_test.dart b/pkgs/watcher/test/ready/polling_test.dart
new file mode 100644
index 0000000..fa4f0cb
--- /dev/null
+++ b/pkgs/watcher/test/ready/polling_test.dart
@@ -0,0 +1,19 @@
+// Copyright (c) 2013, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+  initConfig();
+
+  watcherFactory = (dir) => new PollingDirectoryWatcher(dir);
+
+  setUp(createSandbox);
+
+  sharedTests();
+}
\ No newline at end of file
diff --git a/pkgs/watcher/test/ready_test.dart b/pkgs/watcher/test/ready/shared.dart
similarity index 95%
rename from pkgs/watcher/test/ready_test.dart
rename to pkgs/watcher/test/ready/shared.dart
index 11b77e0..af1b58f 100644
--- a/pkgs/watcher/test/ready_test.dart
+++ b/pkgs/watcher/test/ready/shared.dart
@@ -2,17 +2,11 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
-import 'dart:async';
-
 import 'package:scheduled_test/scheduled_test.dart';
 
-import 'utils.dart';
+import '../utils.dart';
 
-main() {
-  initConfig();
-
-  setUp(createSandbox);
-
+sharedTests() {
   test('ready does not complete until after subscription', () {
     var watcher = createWatcher(waitForReady: false);
 
diff --git a/pkgs/watcher/test/utils.dart b/pkgs/watcher/test/utils.dart
index 28d57b0..d1b9f94 100644
--- a/pkgs/watcher/test/utils.dart
+++ b/pkgs/watcher/test/utils.dart
@@ -5,6 +5,7 @@
 library watcher.test.utils;
 
 import 'dart:async';
+import 'dart:collection';
 import 'dart:io';
 
 import 'package:path/path.dart' as p;
@@ -12,6 +13,7 @@
 import 'package:unittest/compact_vm_config.dart';
 import 'package:watcher/watcher.dart';
 import 'package:watcher/src/stat.dart';
+import 'package:watcher/src/utils.dart';
 
 /// The path to the temporary sandbox created for each test. All file
 /// operations are implicitly relative to this directory.
@@ -36,6 +38,14 @@
 /// increment the mod time for that file instantly.
 Map<String, int> _mockFileModificationTimes;
 
+typedef DirectoryWatcher WatcherFactory(String directory);
+
+/// Sets the function used to create the directory watcher.
+set watcherFactory(WatcherFactory factory) {
+  _watcherFactory = factory;
+}
+WatcherFactory _watcherFactory;
+
 void initConfig() {
   useCompactVMConfiguration();
   filterStacks = true;
@@ -54,10 +64,10 @@
     path = p.normalize(p.relative(path, from: _sandboxDir));
 
     // Make sure we got a path in the sandbox.
-    assert(p.isRelative(path)  && !path.startsWith(".."));
+    assert(p.isRelative(path) && !path.startsWith(".."));
 
-    return new DateTime.fromMillisecondsSinceEpoch(
-        _mockFileModificationTimes[path]);
+    var mtime = _mockFileModificationTimes[path];
+    return new DateTime.fromMillisecondsSinceEpoch(mtime == null ? 0 : mtime);
   });
 
   // Delete the sandbox when done.
@@ -86,68 +96,113 @@
     dir = p.join(_sandboxDir, dir);
   }
 
-  // Use a short delay to make the tests run quickly.
-  _watcher = new DirectoryWatcher(dir,
-      pollingDelay: new Duration(milliseconds: 100));
+  var watcher = _watcherFactory(dir);
 
   // Wait until the scan is finished so that we don't miss changes to files
   // that could occur before the scan completes.
   if (waitForReady != false) {
-    schedule(() => _watcher.ready, "wait for watcher to be ready");
+    schedule(() => watcher.ready, "wait for watcher to be ready");
   }
 
-  currentSchedule.onComplete.schedule(() {
-    _nextEvent = 0;
-    _watcher = null;
-  }, "reset watcher");
-
-  return _watcher;
+  return watcher;
 }
 
-/// Expects that the next set of events will all be changes of [type] on
-/// [paths].
+/// The stream of events from the watcher started with [startWatcher].
+Stream _watcherEvents;
+
+/// Creates a new [DirectoryWatcher] that watches a temporary directory and
+/// starts monitoring it for events.
 ///
-/// Validates that events are delivered for all paths in [paths], but allows
-/// them in any order.
-void expectEvents(ChangeType type, Iterable<String> paths) {
-  var pathSet = paths
-      .map((path) => p.join(_sandboxDir, path))
-      .map(p.normalize)
-      .toSet();
+/// If [dir] is provided, watches a subdirectory in the sandbox with that name.
+void startWatcher({String dir}) {
+  // We want to wait until we're ready *after* we subscribe to the watcher's
+  // events.
+  _watcher = createWatcher(dir: dir, waitForReady: false);
 
-  // Create an expectation for as many paths as we have.
-  var futures = [];
+  // Schedule [_watcher.events.listen] so that the watcher doesn't start
+  // watching [dir] before it exists. Expose [_watcherEvents] immediately so
+  // that it can be accessed synchronously after this.
+  _watcherEvents = futureStream(schedule(() {
+    var allEvents = new Queue();
+    var subscription = _watcher.events.listen(allEvents.add,
+        onError: currentSchedule.signalError);
 
-  for (var i = 0; i < paths.length; i++) {
-    // Immediately create the futures. This ensures we don't register too
-    // late and drop the event before we receive it.
-    var future = _watcher.events.elementAt(_nextEvent++).then((event) {
-      expect(event.type, equals(type));
-      expect(pathSet, contains(event.path));
+    currentSchedule.onComplete.schedule(() {
+      var numEvents = _nextEvent;
+      subscription.cancel();
+      _nextEvent = 0;
+      _watcher = null;
 
-      pathSet.remove(event.path);
-    });
+      // If there are already errors, don't add this to the output and make
+      // people think it might be the root cause.
+      if (currentSchedule.errors.isEmpty) {
+        expect(allEvents, hasLength(numEvents));
+      }
+    }, "reset watcher");
 
-    // Make sure the schedule is watching it in case it fails.
-    currentSchedule.wrapFuture(future);
+    return _watcher.events;
+  }, "create watcher")).asBroadcastStream();
 
-    futures.add(future);
+  schedule(() => _watcher.ready, "wait for watcher to be ready");
+}
+
+/// A future set by [inAnyOrder] that will complete to the set of events that
+/// occur in the [inAnyOrder] block.
+Future<Set<WatchEvent>> _unorderedEventFuture;
+
+/// Runs [block] and allows multiple [expectEvent] calls in that block to match
+/// events in any order.
+void inAnyOrder(block()) {
+  var oldFuture = _unorderedEventFuture;
+  try {
+    var firstEvent = _nextEvent;
+    var completer = new Completer();
+    _unorderedEventFuture = completer.future;
+    block();
+
+    _watcherEvents.skip(firstEvent).take(_nextEvent - firstEvent).toSet()
+        .then(completer.complete, onError: completer.completeError);
+    currentSchedule.wrapFuture(_unorderedEventFuture,
+        "waiting for ${_nextEvent - firstEvent} events");
+  } finally {
+    _unorderedEventFuture = oldFuture;
   }
-
-  // Schedule it so that later file modifications don't occur until after this
-  // event is received.
-  schedule(() => Future.wait(futures),
-      "wait for $type events on ${paths.join(', ')}");
 }
 
-void expectAddEvent(String path) => expectEvents(ChangeType.ADD, [path]);
-void expectModifyEvent(String path) => expectEvents(ChangeType.MODIFY, [path]);
-void expectRemoveEvent(String path) => expectEvents(ChangeType.REMOVE, [path]);
+/// Expects that the next set of event will be a change of [type] on [path].
+///
+/// Multiple calls to [expectEvent] require that the events are received in that
+/// order unless they're called in an [inAnyOrder] block.
+void expectEvent(ChangeType type, String path) {
+  var matcher = predicate((e) {
+    return e is WatchEvent && e.type == type &&
+        e.path == p.join(_sandboxDir, path);
+  }, "is $type $path");
 
-void expectRemoveEvents(Iterable<String> paths) {
-  expectEvents(ChangeType.REMOVE, paths);
+  if (_unorderedEventFuture != null) {
+    // Assign this to a local variable since it will be un-assigned by the time
+    // the scheduled callback runs.
+    var future = _unorderedEventFuture;
+
+    expect(
+        schedule(() => future, "should fire $type event on $path"),
+        completion(contains(matcher)));
+  } else {
+    var future = currentSchedule.wrapFuture(
+        _watcherEvents.elementAt(_nextEvent),
+        "waiting for $type event on $path");
+
+    expect(
+        schedule(() => future, "should fire $type event on $path"),
+        completion(matcher));
+  }
+  _nextEvent++;
 }
 
+void expectAddEvent(String path) => expectEvent(ChangeType.ADD, path);
+void expectModifyEvent(String path) => expectEvent(ChangeType.MODIFY, path);
+void expectRemoveEvent(String path) => expectEvent(ChangeType.REMOVE, path);
+
 /// Schedules writing a file in the sandbox at [path] with [contents].
 ///
 /// If [contents] is omitted, creates an empty file. If [updatedModified] is
@@ -201,6 +256,21 @@
   }, "rename file $from to $to");
 }
 
+/// Schedules creating a directory in the sandbox at [path].
+void createDir(String path) {
+  schedule(() {
+    new Directory(p.join(_sandboxDir, path)).createSync();
+  }, "create directory $path");
+}
+
+/// Schedules renaming a directory in the sandbox from [from] to [to].
+void renameDir(String from, String to) {
+  schedule(() {
+    new Directory(p.join(_sandboxDir, from))
+        .renameSync(p.join(_sandboxDir, to));
+  }, "rename directory $from to $to");
+}
+
 /// Schedules deleting a directory in the sandbox at [path].
 void deleteDir(String path) {
   schedule(() {
@@ -208,22 +278,17 @@
   }, "delete directory $path");
 }
 
-/// A [Matcher] for [WatchEvent]s.
-class _ChangeMatcher extends Matcher {
-  /// The expected change.
-  final ChangeType type;
-
-  /// The expected path.
-  final String path;
-
-  _ChangeMatcher(this.type, this.path);
-
-  Description describe(Description description) {
-    description.add("$type $path");
+/// Runs [callback] with every permutation of non-negative [i], [j], and [k]
+/// less than [limit].
+///
+/// [limit] defaults to 3.
+void withPermutations(callback(int i, int j, int k), {int limit}) {
+  if (limit == null) limit = 3;
+  for (var i = 0; i < limit; i++) {
+    for (var j = 0; j < limit; j++) {
+      for (var k = 0; k < limit; k++) {
+        callback(i, j, k);
+      }
+    }
   }
-
-  bool matches(item, Map matchState) =>
-      item is WatchEvent &&
-      item.type == type &&
-      p.normalize(item.path) == p.normalize(path);
 }