Wrap Directory.watch on linux for the watcher package.
R=rnystrom@google.com
BUG=14428
Review URL: https://codereview.chromium.org//46843003
git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart/pkg/watcher@30081 260f80e4-7a28-3924-810f-c04153c831b5
diff --git a/pkgs/watcher/lib/src/directory_watcher.dart b/pkgs/watcher/lib/src/directory_watcher.dart
index 679e227..4484c2b 100644
--- a/pkgs/watcher/lib/src/directory_watcher.dart
+++ b/pkgs/watcher/lib/src/directory_watcher.dart
@@ -7,18 +7,15 @@
import 'dart:async';
import 'dart:io';
-import 'package:crypto/crypto.dart';
-
-import 'async_queue.dart';
-import 'stat.dart';
-import 'utils.dart';
import 'watch_event.dart';
+import 'directory_watcher/linux.dart';
+import 'directory_watcher/polling.dart';
/// Watches the contents of a directory and emits [WatchEvent]s when something
/// in the directory has changed.
-class DirectoryWatcher {
+abstract class DirectoryWatcher {
/// The directory whose contents are being monitored.
- final String directory;
+ String get directory;
/// The broadcast [Stream] of events that have occurred to files in
/// [directory].
@@ -26,10 +23,12 @@
/// Changes will only be monitored while this stream has subscribers. Any
/// file changes that occur during periods when there are no subscribers
/// will not be reported the next time a subscriber is added.
- Stream<WatchEvent> get events => _events.stream;
- StreamController<WatchEvent> _events;
+ Stream<WatchEvent> get events;
- _WatchState _state = _WatchState.UNSUBSCRIBED;
+ /// Whether the watcher is initialized and watching for file changes.
+ ///
+ /// This is true if and only if [ready] is complete.
+ bool get isReady;
/// A [Future] that completes when the watcher is initialized and watching
/// for file changes.
@@ -41,241 +40,20 @@
///
/// If the watcher is already monitoring, this returns an already complete
/// future.
- Future get ready => _ready.future;
- Completer _ready = new Completer();
-
- /// The amount of time the watcher pauses between successive polls of the
- /// directory contents.
- final Duration pollingDelay;
-
- /// The previous status of the files in the directory.
- ///
- /// Used to tell which files have been modified.
- final _statuses = new Map<String, _FileStatus>();
-
- /// The subscription used while [directory] is being listed.
- ///
- /// Will be `null` if a list is not currently happening.
- StreamSubscription<FileSystemEntity> _listSubscription;
-
- /// The queue of files waiting to be processed to see if they have been
- /// modified.
- ///
- /// Processing a file is asynchronous, as is listing the directory, so the
- /// queue exists to let each of those proceed at their own rate. The lister
- /// will enqueue files as quickly as it can. Meanwhile, files are dequeued
- /// and processed sequentially.
- AsyncQueue<String> _filesToProcess;
-
- /// The set of files that have been seen in the current directory listing.
- ///
- /// Used to tell which files have been removed: files that are in [_statuses]
- /// but not in here when a poll completes have been removed.
- final _polledFiles = new Set<String>();
+ Future get ready;
/// Creates a new [DirectoryWatcher] monitoring [directory].
///
- /// If [pollingDelay] is passed, it specifies the amount of time the watcher
- /// will pause between successive polls of the directory contents. Making
- /// this shorter will give more immediate feedback at the expense of doing
- /// more IO and higher CPU usage. Defaults to one second.
- DirectoryWatcher(this.directory, {Duration pollingDelay})
- : pollingDelay = pollingDelay != null ? pollingDelay :
- new Duration(seconds: 1) {
- _events = new StreamController<WatchEvent>.broadcast(
- onListen: _watch, onCancel: _cancel);
-
- _filesToProcess = new AsyncQueue<String>(_processFile,
- onError: _events.addError);
- }
-
- /// Scans to see which files were already present before the watcher was
- /// subscribed to, and then starts watching the directory for changes.
- void _watch() {
- assert(_state == _WatchState.UNSUBSCRIBED);
- _state = _WatchState.SCANNING;
- _poll();
- }
-
- /// Stops watching the directory when there are no more subscribers.
- void _cancel() {
- assert(_state != _WatchState.UNSUBSCRIBED);
- _state = _WatchState.UNSUBSCRIBED;
-
- // If we're in the middle of listing the directory, stop.
- if (_listSubscription != null) _listSubscription.cancel();
-
- // Don't process any remaining files.
- _filesToProcess.clear();
- _polledFiles.clear();
- _statuses.clear();
-
- _ready = new Completer();
- }
-
- /// Scans the contents of the directory once to see which files have been
- /// added, removed, and modified.
- void _poll() {
- _filesToProcess.clear();
- _polledFiles.clear();
-
- endListing() {
- assert(_state != _WatchState.UNSUBSCRIBED);
- _listSubscription = null;
-
- // Null tells the queue consumer that we're done listing.
- _filesToProcess.add(null);
- }
-
- var stream = new Directory(directory).list(recursive: true);
- _listSubscription = stream.listen((entity) {
- assert(_state != _WatchState.UNSUBSCRIBED);
-
- if (entity is! File) return;
- _filesToProcess.add(entity.path);
- }, onError: (error, StackTrace stackTrace) {
- if (!isDirectoryNotFoundException(error)) {
- // It's some unknown error. Pipe it over to the event stream so the
- // user can see it.
- _events.addError(error, stackTrace);
- }
-
- // When an error occurs, we end the listing normally, which has the
- // desired effect of marking all files that were in the directory as
- // being removed.
- endListing();
- }, onDone: endListing, cancelOnError: true);
- }
-
- /// Processes [file] to determine if it has been modified since the last
- /// time it was scanned.
- Future _processFile(String file) {
- assert(_state != _WatchState.UNSUBSCRIBED);
-
- // `null` is the sentinel which means the directory listing is complete.
- if (file == null) return _completePoll();
-
- return getModificationTime(file).then((modified) {
- if (_checkForCancel()) return null;
-
- var lastStatus = _statuses[file];
-
- // If its modification time hasn't changed, assume the file is unchanged.
- if (lastStatus != null && lastStatus.modified == modified) {
- // The file is still here.
- _polledFiles.add(file);
- return null;
- }
-
- return _hashFile(file).then((hash) {
- if (_checkForCancel()) return;
-
- var status = new _FileStatus(modified, hash);
- _statuses[file] = status;
- _polledFiles.add(file);
-
- // Only notify while in the watching state.
- if (_state != _WatchState.WATCHING) return;
-
- // And the file is different.
- var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash);
- if (!changed) return;
-
- var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
- _events.add(new WatchEvent(type, file));
- });
- });
- }
-
- /// After the directory listing is complete, this determines which files were
- /// removed and then restarts the next poll.
- Future _completePoll() {
- // Any files that were not seen in the last poll but that we have a
- // status for must have been removed.
- var removedFiles = _statuses.keys.toSet().difference(_polledFiles);
- for (var removed in removedFiles) {
- if (_state == _WatchState.WATCHING) {
- _events.add(new WatchEvent(ChangeType.REMOVE, removed));
- }
- _statuses.remove(removed);
- }
-
- if (_state == _WatchState.SCANNING) {
- _state = _WatchState.WATCHING;
- _ready.complete();
- }
-
- // Wait and then poll again.
- return new Future.delayed(pollingDelay).then((_) {
- if (_checkForCancel()) return;
- _poll();
- });
- }
-
- /// Returns `true` and clears the processing queue if the watcher has been
- /// unsubscribed.
- bool _checkForCancel() {
- if (_state != _WatchState.UNSUBSCRIBED) return false;
-
- // Don't process any more files.
- _filesToProcess.clear();
- return true;
- }
-
- /// Calculates the SHA-1 hash of the file at [path].
- Future<List<int>> _hashFile(String path) {
- return new File(path).readAsBytes().then((bytes) {
- var sha1 = new SHA1();
- sha1.add(bytes);
- return sha1.close();
- });
- }
-
- /// Returns `true` if [a] and [b] are the same hash value, i.e. the same
- /// series of byte values.
- bool _sameHash(List<int> a, List<int> b) {
- // Hashes should always be the same size.
- assert(a.length == b.length);
-
- for (var i = 0; i < a.length; i++) {
- if (a[i] != b[i]) return false;
- }
-
- return true;
- }
-}
-
-/// Enum class for the states that the [DirectoryWatcher] can be in.
-class _WatchState {
- /// There are no subscribers to the watcher's event stream and no watching
- /// is going on.
- static const UNSUBSCRIBED = const _WatchState("unsubscribed");
-
- /// There are subscribers and the watcher is doing an initial scan of the
- /// directory to see which files were already present before watching started.
+ /// If a native directory watcher is available for this platform, this will
+ /// use it. Otherwise, it will fall back to a [PollingDirectoryWatcher].
///
- /// The watcher does not send notifications for changes that occurred while
- /// there were no subscribers, or for files already present before watching.
- /// The initial scan is used to determine what "before watching" state of
- /// the file system was.
- static const SCANNING = const _WatchState("scanning");
-
- /// There are subscribers and the watcher is polling the directory to look
- /// for changes.
- static const WATCHING = const _WatchState("watching");
-
- /// The name of the state.
- final String name;
-
- const _WatchState(this.name);
+ /// If [_pollingDelay] is passed, it specifies the amount of time the watcher
+ /// will pause between successive polls of the directory contents. Making this
+ /// shorter will give more immediate feedback at the expense of doing more IO
+ /// and higher CPU usage. Defaults to one second. Ignored for non-polling
+ /// watchers.
+ factory DirectoryWatcher(String directory, {Duration pollingDelay}) {
+ if (Platform.isLinux) return new LinuxDirectoryWatcher(directory);
+ return new PollingDirectoryWatcher(directory, pollingDelay: pollingDelay);
+ }
}
-
-class _FileStatus {
- /// The last time the file was modified.
- DateTime modified;
-
- /// The SHA-1 hash of the contents of the file.
- List<int> hash;
-
- _FileStatus(this.modified, this.hash);
-}
\ No newline at end of file
diff --git a/pkgs/watcher/lib/src/directory_watcher/linux.dart b/pkgs/watcher/lib/src/directory_watcher/linux.dart
new file mode 100644
index 0000000..9acecf1
--- /dev/null
+++ b/pkgs/watcher/lib/src/directory_watcher/linux.dart
@@ -0,0 +1,298 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library watcher.directory_watcher.linux;
+
+import 'dart:async';
+import 'dart:io';
+
+import '../directory_watcher.dart';
+import '../utils.dart';
+import '../watch_event.dart';
+import 'resubscribable.dart';
+
+import 'package:stack_trace/stack_trace.dart';
+
+/// Uses the inotify subsystem to watch for filesystem events.
+///
+/// Inotify doesn't suport recursively watching subdirectories, nor does
+/// [Directory.watch] polyfill that functionality. This class polyfills it
+/// instead.
+///
+/// This class also compensates for the non-inotify-specific issues of
+/// [Directory.watch] producing multiple events for a single logical action
+/// (issue 14372) and providing insufficient information about move events
+/// (issue 14424).
+class LinuxDirectoryWatcher extends ResubscribableDirectoryWatcher {
+ LinuxDirectoryWatcher(String directory)
+ : super(directory, () => new _LinuxDirectoryWatcher(directory));
+}
+
+class _LinuxDirectoryWatcher implements ManuallyClosedDirectoryWatcher {
+ final String directory;
+
+ Stream<WatchEvent> get events => _eventsController.stream;
+ final _eventsController = new StreamController<WatchEvent>.broadcast();
+
+ bool get isReady => _readyCompleter.isCompleted;
+
+ Future get ready => _readyCompleter.future;
+ final _readyCompleter = new Completer();
+
+ /// The last known state for each entry in this directory.
+ ///
+ /// The keys in this map are the paths to the directory entries; the values
+ /// are [_EntryState]s indicating whether the entries are files or
+ /// directories.
+ final _entries = new Map<String, _EntryState>();
+
+ /// The watchers for subdirectories of [directory].
+ final _subWatchers = new Map<String, _LinuxDirectoryWatcher>();
+
+ /// A set of all subscriptions that this watcher subscribes to.
+ ///
+ /// These are gathered together so that they may all be canceled when the
+ /// watcher is closed.
+ final _subscriptions = new Set<StreamSubscription>();
+
+ _LinuxDirectoryWatcher(String directory)
+ : directory = directory {
+ // Batch the inotify changes together so that we can dedup events.
+ var innerStream = new Directory(directory).watch().transform(
+ new BatchedStreamTransformer<FileSystemEvent>());
+ _listen(innerStream, _onBatch,
+ onError: _eventsController.addError,
+ onDone: _onDone);
+
+ _listen(new Directory(directory).list(), (entity) {
+ _entries[entity.path] = new _EntryState(entity is Directory);
+ if (entity is! Directory) return;
+ _watchSubdir(entity.path);
+ }, onError: (error, stackTrace) {
+ _eventsController.addError(error, stackTrace);
+ close();
+ }, onDone: () {
+ _waitUntilReady().then((_) => _readyCompleter.complete());
+ }, cancelOnError: true);
+ }
+
+ /// Returns a [Future] that completes once all the subdirectory watchers are
+ /// fully initialized.
+ Future _waitUntilReady() {
+ return Future.wait(_subWatchers.values.map((watcher) => watcher.ready))
+ .then((_) {
+ if (_subWatchers.values.every((watcher) => watcher.isReady)) return;
+ return _waitUntilReady();
+ });
+ }
+
+ void close() {
+ for (var subscription in _subscriptions) {
+ subscription.cancel();
+ }
+ for (var watcher in _subWatchers.values) {
+ watcher.close();
+ }
+
+ _subWatchers.clear();
+ _subscriptions.clear();
+ _eventsController.close();
+ }
+
+ /// Returns all files (not directories) that this watcher knows of are
+ /// recursively in the watched directory.
+ Set<String> get _allFiles {
+ var files = new Set<String>();
+ _getAllFiles(files);
+ return files;
+ }
+
+ /// Helper function for [_allFiles].
+ ///
+ /// Adds all files that this watcher knows of to [files].
+ void _getAllFiles(Set<String> files) {
+ files.addAll(_entries.keys
+ .where((path) => _entries[path] == _EntryState.FILE).toSet());
+ for (var watcher in _subWatchers.values) {
+ watcher._getAllFiles(files);
+ }
+ }
+
+ /// Watch a subdirectory of [directory] for changes.
+ ///
+ /// If the subdirectory was added after [this] began emitting events, its
+ /// contents will be emitted as ADD events.
+ void _watchSubdir(String path) {
+ if (_subWatchers.containsKey(path)) return;
+ var watcher = new _LinuxDirectoryWatcher(path);
+ _subWatchers[path] = watcher;
+
+ // TODO(nweiz): Catch any errors here that indicate that the directory in
+ // question doesn't exist and silently stop watching it instead of
+ // propagating the errors.
+ _listen(watcher.events, (event) {
+ if (isReady) _eventsController.add(event);
+ }, onError: (error, stackTrace) {
+ _eventsController.addError(error, stackTrace);
+ _eventsController.close();
+ }, onDone: () {
+ if (_subWatchers[path] == watcher) _subWatchers.remove(path);
+
+ // It's possible that a directory was removed and recreated very quickly.
+ // If so, make sure we're still watching it.
+ if (new Directory(path).existsSync()) _watchSubdir(path);
+ });
+
+ // TODO(nweiz): Right now it's possible for the watcher to emit an event for
+ // a file before the directory list is complete. This could lead to the user
+ // seeing a MODIFY or REMOVE event for a file before they see an ADD event,
+ // which is bad. We should handle that.
+ //
+ // One possibility is to provide a general means (e.g.
+ // `DirectoryWatcher.eventsAndExistingFiles`) to tell a watcher to emit
+ // events for all the files that already exist. This would be useful for
+ // top-level clients such as barback as well, and could be implemented with
+ // a wrapper similar to how listening/canceling works now.
+
+ // If a directory is added after we're finished with the initial scan, emit
+ // an event for each entry in it. This gives the user consistently gets an
+ // event for every new file.
+ watcher.ready.then((_) {
+ if (!isReady || _eventsController.isClosed) return;
+ _listen(new Directory(path).list(recursive: true), (entry) {
+ if (entry is Directory) return;
+ _eventsController.add(new WatchEvent(ChangeType.ADD, entry.path));
+ }, onError: (error, stackTrace) {
+ // Ignore an exception caused by the dir not existing. It's fine if it
+ // was added and then quickly removed.
+ if (error is FileSystemException) return;
+
+ _eventsController.addError(error, stackTrace);
+ _eventsController.close();
+ }, cancelOnError: true);
+ });
+ }
+
+ /// The callback that's run when a batch of changes comes in.
+ void _onBatch(List<FileSystemEvent> batch) {
+ var changedEntries = new Set<String>();
+ var oldEntries = new Map.from(_entries);
+
+ // inotify event batches are ordered by occurrence, so we treat them as a
+ // log of what happened to a file.
+ for (var event in batch) {
+ // If the watched directory is deleted or moved, we'll get a deletion
+ // event for it. Ignore it; we handle closing [this] when the underlying
+ // stream is closed.
+ if (event.path == directory) continue;
+
+ changedEntries.add(event.path);
+
+ if (event is FileSystemMoveEvent) {
+ changedEntries.add(event.destination);
+ _changeEntryState(event.path, ChangeType.REMOVE, event.isDirectory);
+ _changeEntryState(event.destination, ChangeType.ADD, event.isDirectory);
+ } else {
+ _changeEntryState(event.path, _changeTypeFor(event), event.isDirectory);
+ }
+ }
+
+ for (var path in changedEntries) {
+ emitEvent(ChangeType type) {
+ if (isReady) _eventsController.add(new WatchEvent(type, path));
+ }
+
+ var oldState = oldEntries[path];
+ var newState = _entries[path];
+
+ if (oldState != _EntryState.FILE && newState == _EntryState.FILE) {
+ emitEvent(ChangeType.ADD);
+ } else if (oldState == _EntryState.FILE && newState == _EntryState.FILE) {
+ emitEvent(ChangeType.MODIFY);
+ } else if (oldState == _EntryState.FILE && newState != _EntryState.FILE) {
+ emitEvent(ChangeType.REMOVE);
+ }
+
+ if (oldState == _EntryState.DIRECTORY) {
+ var watcher = _subWatchers.remove(path);
+ if (watcher == null) return;
+ for (var path in watcher._allFiles) {
+ _eventsController.add(new WatchEvent(ChangeType.REMOVE, path));
+ }
+ watcher.close();
+ }
+
+ if (newState == _EntryState.DIRECTORY) _watchSubdir(path);
+ }
+ }
+
+ /// Changes the known state of the entry at [path] based on [change] and
+ /// [isDir].
+ void _changeEntryState(String path, ChangeType change, bool isDir) {
+ if (change == ChangeType.ADD || change == ChangeType.MODIFY) {
+ _entries[path] = new _EntryState(isDir);
+ } else {
+ assert(change == ChangeType.REMOVE);
+ _entries.remove(path);
+ }
+ }
+
+ /// Determines the [ChangeType] associated with [event].
+ ChangeType _changeTypeFor(FileSystemEvent event) {
+ if (event is FileSystemDeleteEvent) return ChangeType.REMOVE;
+ if (event is FileSystemCreateEvent) return ChangeType.ADD;
+
+ assert(event is FileSystemModifyEvent);
+ return ChangeType.MODIFY;
+ }
+
+ /// Handles the underlying event stream closing, indicating that the directory
+ /// being watched was removed.
+ void _onDone() {
+ // The parent directory often gets a close event before the subdirectories
+ // are done emitting events. We wait for them to finish before we close
+ // [events] so that we can be sure to emit a remove event for every file
+ // that used to exist.
+ Future.wait(_subWatchers.values.map((watcher) {
+ try {
+ return watcher.events.toList();
+ } on StateError catch (_) {
+ // It's possible that [watcher.events] is closed but the onDone event
+ // hasn't reached us yet. It's fine if so.
+ return new Future.value();
+ }
+ })).then((_) => close());
+ }
+
+ /// Like [Stream.listen], but automatically adds the subscription to
+ /// [_subscriptions] so that it can be canceled when [close] is called.
+ void _listen(Stream stream, void onData(event), {Function onError,
+ void onDone(), bool cancelOnError}) {
+ var subscription;
+ subscription = stream.listen(onData, onError: onError, onDone: () {
+ _subscriptions.remove(subscription);
+ if (onDone != null) onDone();
+ }, cancelOnError: cancelOnError);
+ _subscriptions.add(subscription);
+ }
+}
+
+/// An enum for the possible states of entries in a watched directory.
+class _EntryState {
+ final String _name;
+
+ /// The entry is a file.
+ static const FILE = const _EntryState._("file");
+
+ /// The entry is a directory.
+ static const DIRECTORY = const _EntryState._("directory");
+
+ const _EntryState._(this._name);
+
+ /// Returns [DIRECTORY] if [isDir] is true, and [FILE] otherwise.
+ factory _EntryState(bool isDir) =>
+ isDir ? _EntryState.DIRECTORY : _EntryState.FILE;
+
+ String toString() => _name;
+}
diff --git a/pkgs/watcher/lib/src/directory_watcher/polling.dart b/pkgs/watcher/lib/src/directory_watcher/polling.dart
new file mode 100644
index 0000000..91ca005
--- /dev/null
+++ b/pkgs/watcher/lib/src/directory_watcher/polling.dart
@@ -0,0 +1,217 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library watcher.directory_watcher.polling;
+
+import 'dart:async';
+import 'dart:io';
+
+import 'package:crypto/crypto.dart';
+
+import '../async_queue.dart';
+import '../directory_watcher.dart';
+import '../stat.dart';
+import '../utils.dart';
+import '../watch_event.dart';
+import 'resubscribable.dart';
+
+/// Periodically polls a directory for changes.
+class PollingDirectoryWatcher extends ResubscribableDirectoryWatcher {
+ /// Creates a new polling watcher monitoring [directory].
+ ///
+ /// If [_pollingDelay] is passed, it specifies the amount of time the watcher
+ /// will pause between successive polls of the directory contents. Making this
+ /// shorter will give more immediate feedback at the expense of doing more IO
+ /// and higher CPU usage. Defaults to one second.
+ PollingDirectoryWatcher(String directory, {Duration pollingDelay})
+ : super(directory, () {
+ return new _PollingDirectoryWatcher(directory,
+ pollingDelay != null ? pollingDelay : new Duration(seconds: 1));
+ });
+}
+
+class _PollingDirectoryWatcher implements ManuallyClosedDirectoryWatcher {
+ final String directory;
+
+ Stream<WatchEvent> get events => _events.stream;
+ final _events = new StreamController<WatchEvent>.broadcast();
+
+ bool get isReady => _ready.isCompleted;
+
+ Future get ready => _ready.future;
+ final _ready = new Completer();
+
+ /// The amount of time the watcher pauses between successive polls of the
+ /// directory contents.
+ final Duration _pollingDelay;
+
+ /// The previous status of the files in the directory.
+ ///
+ /// Used to tell which files have been modified.
+ final _statuses = new Map<String, _FileStatus>();
+
+ /// The subscription used while [directory] is being listed.
+ ///
+ /// Will be `null` if a list is not currently happening.
+ StreamSubscription<FileSystemEntity> _listSubscription;
+
+ /// The queue of files waiting to be processed to see if they have been
+ /// modified.
+ ///
+ /// Processing a file is asynchronous, as is listing the directory, so the
+ /// queue exists to let each of those proceed at their own rate. The lister
+ /// will enqueue files as quickly as it can. Meanwhile, files are dequeued
+ /// and processed sequentially.
+ AsyncQueue<String> _filesToProcess;
+
+ /// The set of files that have been seen in the current directory listing.
+ ///
+ /// Used to tell which files have been removed: files that are in [_statuses]
+ /// but not in here when a poll completes have been removed.
+ final _polledFiles = new Set<String>();
+
+ _PollingDirectoryWatcher(this.directory, this._pollingDelay) {
+ _filesToProcess = new AsyncQueue<String>(_processFile,
+ onError: _events.addError);
+
+ _poll();
+ }
+
+ void close() {
+ _events.close();
+
+ // If we're in the middle of listing the directory, stop.
+ if (_listSubscription != null) _listSubscription.cancel();
+
+ // Don't process any remaining files.
+ _filesToProcess.clear();
+ _polledFiles.clear();
+ _statuses.clear();
+ }
+
+ /// Scans the contents of the directory once to see which files have been
+ /// added, removed, and modified.
+ void _poll() {
+ _filesToProcess.clear();
+ _polledFiles.clear();
+
+ endListing() {
+ assert(!_events.isClosed);
+ _listSubscription = null;
+
+ // Null tells the queue consumer that we're done listing.
+ _filesToProcess.add(null);
+ }
+
+ var stream = new Directory(directory).list(recursive: true);
+ _listSubscription = stream.listen((entity) {
+ assert(!_events.isClosed);
+
+ if (entity is! File) return;
+ _filesToProcess.add(entity.path);
+ }, onError: (error, stackTrace) {
+ if (!isDirectoryNotFoundException(error)) {
+ // It's some unknown error. Pipe it over to the event stream so the
+ // user can see it.
+ _events.addError(error, stackTrace);
+ }
+
+ // When an error occurs, we end the listing normally, which has the
+ // desired effect of marking all files that were in the directory as
+ // being removed.
+ endListing();
+ }, onDone: endListing, cancelOnError: true);
+ }
+
+ /// Processes [file] to determine if it has been modified since the last
+ /// time it was scanned.
+ Future _processFile(String file) {
+ // `null` is the sentinel which means the directory listing is complete.
+ if (file == null) return _completePoll();
+
+ return getModificationTime(file).then((modified) {
+ if (_events.isClosed) return null;
+
+ var lastStatus = _statuses[file];
+
+ // If its modification time hasn't changed, assume the file is unchanged.
+ if (lastStatus != null && lastStatus.modified == modified) {
+ // The file is still here.
+ _polledFiles.add(file);
+ return null;
+ }
+
+ return _hashFile(file).then((hash) {
+ if (_events.isClosed) return;
+
+ var status = new _FileStatus(modified, hash);
+ _statuses[file] = status;
+ _polledFiles.add(file);
+
+ // Only notify if we're ready to emit events.
+ if (!isReady) return;
+
+ // And the file is different.
+ var changed = lastStatus == null || !_sameHash(lastStatus.hash, hash);
+ if (!changed) return;
+
+ var type = lastStatus == null ? ChangeType.ADD : ChangeType.MODIFY;
+ _events.add(new WatchEvent(type, file));
+ });
+ });
+ }
+
+ /// After the directory listing is complete, this determines which files were
+ /// removed and then restarts the next poll.
+ Future _completePoll() {
+ // Any files that were not seen in the last poll but that we have a
+ // status for must have been removed.
+ var removedFiles = _statuses.keys.toSet().difference(_polledFiles);
+ for (var removed in removedFiles) {
+ if (isReady) _events.add(new WatchEvent(ChangeType.REMOVE, removed));
+ _statuses.remove(removed);
+ }
+
+ if (!isReady) _ready.complete();
+
+ // Wait and then poll again.
+ return new Future.delayed(_pollingDelay).then((_) {
+ if (_events.isClosed) return;
+ _poll();
+ });
+ }
+
+ /// Calculates the SHA-1 hash of the file at [path].
+ Future<List<int>> _hashFile(String path) {
+ return new File(path).readAsBytes().then((bytes) {
+ var sha1 = new SHA1();
+ sha1.add(bytes);
+ return sha1.close();
+ });
+ }
+
+ /// Returns `true` if [a] and [b] are the same hash value, i.e. the same
+ /// series of byte values.
+ bool _sameHash(List<int> a, List<int> b) {
+ // Hashes should always be the same size.
+ assert(a.length == b.length);
+
+ for (var i = 0; i < a.length; i++) {
+ if (a[i] != b[i]) return false;
+ }
+
+ return true;
+ }
+}
+
+class _FileStatus {
+ /// The last time the file was modified.
+ DateTime modified;
+
+ /// The SHA-1 hash of the contents of the file.
+ List<int> hash;
+
+ _FileStatus(this.modified, this.hash);
+}
+
diff --git a/pkgs/watcher/lib/src/directory_watcher/resubscribable.dart b/pkgs/watcher/lib/src/directory_watcher/resubscribable.dart
new file mode 100644
index 0000000..daa813a
--- /dev/null
+++ b/pkgs/watcher/lib/src/directory_watcher/resubscribable.dart
@@ -0,0 +1,79 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library watcher.directory_watcher.resubscribable;
+
+import 'dart:async';
+import 'dart:io';
+
+import '../directory_watcher.dart';
+import '../utils.dart';
+import '../watch_event.dart';
+
+typedef ManuallyClosedDirectoryWatcher WatcherFactory();
+
+/// A wrapper for [ManuallyClosedDirectoryWatcher] that encapsulates support for
+/// closing the watcher when it has no subscribers and re-opening it when it's
+/// re-subscribed.
+///
+/// It's simpler to implement watchers without worrying about this behavior.
+/// This class wraps a watcher class which can be written with the simplifying
+/// assumption that it can continue emitting events until an explicit `close`
+/// method is called, at which point it will cease emitting events entirely. The
+/// [ManuallyClosedDirectoryWatcher] interface is used for these watchers.
+///
+/// This would be more cleanly implemented as a function that takes a class and
+/// emits a new class, but Dart doesn't support that sort of thing. Instead it
+/// takes a factory function that produces instances of the inner class.
+abstract class ResubscribableDirectoryWatcher implements DirectoryWatcher {
+ /// The factory function that produces instances of the inner class.
+ final WatcherFactory _factory;
+
+ final String directory;
+
+ Stream<WatchEvent> get events => _eventsController.stream;
+ StreamController<WatchEvent> _eventsController;
+
+ bool get isReady => _readyCompleter.isCompleted;
+
+ Future get ready => _readyCompleter.future;
+ var _readyCompleter = new Completer();
+
+ /// Creates a new [ResubscribableDirectoryWatcher] wrapping the watchers
+ /// emitted by [_factory].
+ ResubscribableDirectoryWatcher(this.directory, this._factory) {
+ var watcher;
+ var subscription;
+
+ _eventsController = new StreamController<WatchEvent>.broadcast(
+ onListen: () {
+ watcher = _factory();
+ subscription = watcher.events.listen(_eventsController.add,
+ onError: _eventsController.addError,
+ onDone: _eventsController.close);
+
+ // It's important that we complete the value of [_readyCompleter] at the
+ // time [onListen] is called, as opposed to the value when [watcher.ready]
+ // fires. A new completer may be created by that time.
+ watcher.ready.then(_readyCompleter.complete);
+ }, onCancel: () {
+ // Cancel the subscription before closing the watcher so that the
+ // watcher's `onDone` event doesn't close [events].
+ subscription.cancel();
+ watcher.close();
+ _readyCompleter = new Completer();
+ }, sync: true);
+ }
+}
+
+/// An interface for watchers with an explicit, manual [close] method.
+///
+/// See [ResubscribableDirectoryWatcher].
+abstract class ManuallyClosedDirectoryWatcher implements DirectoryWatcher {
+ /// Closes the watcher.
+ ///
+ /// Subclasses should close their [events] stream and release any internal
+ /// resources.
+ void close();
+}
diff --git a/pkgs/watcher/lib/src/utils.dart b/pkgs/watcher/lib/src/utils.dart
index 3d00c08..e4e4457 100644
--- a/pkgs/watcher/lib/src/utils.dart
+++ b/pkgs/watcher/lib/src/utils.dart
@@ -4,7 +4,9 @@
library watcher.utils;
+import 'dart:async';
import 'dart:io';
+import 'dart:collection';
/// Returns `true` if [error] is a [FileSystemException] for a missing
/// directory.
@@ -15,3 +17,57 @@
var notFoundCode = Platform.operatingSystem == "windows" ? 3 : 2;
return error.osError.errorCode == notFoundCode;
}
+
+/// Returns a buffered stream that will emit the same values as the stream
+/// returned by [future] once [future] completes.
+///
+/// If [future] completes to an error, the return value will emit that error and
+/// then close.
+Stream futureStream(Future<Stream> future) {
+ var controller = new StreamController(sync: true);
+ future.then((stream) {
+ stream.listen(
+ controller.add,
+ onError: controller.addError,
+ onDone: controller.close);
+ }).catchError((e, stackTrace) {
+ controller.addError(e, stackTrace);
+ controller.close();
+ });
+ return controller.stream;
+}
+
+/// Like [new Future], but avoids around issue 11911 by using [new Future.value]
+/// under the covers.
+Future newFuture(callback()) => new Future.value().then((_) => callback());
+
+/// A stream transformer that batches all events that are sent at the same time.
+///
+/// When multiple events are synchronously added to a stream controller, the
+/// [StreamController] implementation uses [scheduleMicrotask] to schedule the
+/// asynchronous firing of each event. In order to recreate the synchronous
+/// batches, this collates all the events that are received in "nearby"
+/// microtasks.
+class BatchedStreamTransformer<T> implements StreamTransformer<T, List<T>> {
+ Stream<List<T>> bind(Stream<T> input) {
+ var batch = new Queue();
+ return new StreamTransformer<T, List<T>>.fromHandlers(
+ handleData: (event, sink) {
+ batch.add(event);
+
+ // [Timer.run] schedules an event that runs after any microtasks that have
+ // been scheduled.
+ Timer.run(() {
+ if (batch.isEmpty) return;
+ sink.add(batch.toList());
+ batch.clear();
+ });
+ }, handleDone: (sink) {
+ if (batch.isNotEmpty) {
+ sink.add(batch.toList());
+ batch.clear();
+ }
+ sink.close();
+ }).bind(input);
+ }
+}
diff --git a/pkgs/watcher/lib/watcher.dart b/pkgs/watcher/lib/watcher.dart
index c4824b8..88531f2 100644
--- a/pkgs/watcher/lib/watcher.dart
+++ b/pkgs/watcher/lib/watcher.dart
@@ -6,3 +6,4 @@
export 'src/watch_event.dart';
export 'src/directory_watcher.dart';
+export 'src/directory_watcher/polling.dart';
diff --git a/pkgs/watcher/test/directory_watcher/linux_test.dart b/pkgs/watcher/test/directory_watcher/linux_test.dart
new file mode 100644
index 0000000..ba69569
--- /dev/null
+++ b/pkgs/watcher/test/directory_watcher/linux_test.dart
@@ -0,0 +1,62 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/src/directory_watcher/linux.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+ initConfig();
+
+ watcherFactory = (dir) => new LinuxDirectoryWatcher(dir);
+
+ setUp(() {
+ // Increase the timeout because closing a [Directory.watch] stream blocks
+ // the main isolate for a very long time on Goobuntu, as of kernel
+ // 3.2.5-gg1336 (see issue 14606).
+ currentSchedule.timeout *= 3;
+
+ createSandbox();
+ });
+
+ sharedTests();
+
+ test('DirectoryWatcher creates a LinuxDirectoryWatcher on Linux', () {
+ expect(new DirectoryWatcher('.'),
+ new isInstanceOf<LinuxDirectoryWatcher>());
+ });
+
+ test('notifies even if the file contents are unchanged', () {
+ writeFile("a.txt", contents: "same");
+ writeFile("b.txt", contents: "before");
+ startWatcher();
+ writeFile("a.txt", contents: "same");
+ writeFile("b.txt", contents: "after");
+ expectModifyEvent("a.txt");
+ expectModifyEvent("b.txt");
+ });
+
+ test('emits events for many nested files moved out then immediately back in',
+ () {
+ withPermutations((i, j, k) =>
+ writeFile("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+ startWatcher(dir: "dir");
+
+ renameDir("dir/sub", "sub");
+ renameDir("sub", "dir/sub");
+
+ inAnyOrder(() {
+ withPermutations((i, j, k) =>
+ expectRemoveEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+ });
+
+ inAnyOrder(() {
+ withPermutations((i, j, k) =>
+ expectAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+ });
+ });
+}
diff --git a/pkgs/watcher/test/directory_watcher/polling_test.dart b/pkgs/watcher/test/directory_watcher/polling_test.dart
new file mode 100644
index 0000000..02ed5d2
--- /dev/null
+++ b/pkgs/watcher/test/directory_watcher/polling_test.dart
@@ -0,0 +1,39 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+ initConfig();
+
+ // Use a short delay to make the tests run quickly.
+ watcherFactory = (dir) => new PollingDirectoryWatcher(dir,
+ pollingDelay: new Duration(milliseconds: 100));
+
+ setUp(createSandbox);
+
+ sharedTests();
+
+ test('does not notify if the file contents are unchanged', () {
+ writeFile("a.txt", contents: "same");
+ writeFile("b.txt", contents: "before");
+ startWatcher();
+ writeFile("a.txt", contents: "same");
+ writeFile("b.txt", contents: "after");
+ expectModifyEvent("b.txt");
+ });
+
+ test('does not notify if the modification time did not change', () {
+ writeFile("a.txt", contents: "before");
+ writeFile("b.txt", contents: "before");
+ startWatcher();
+ writeFile("a.txt", contents: "after", updateModified: false);
+ writeFile("b.txt", contents: "after");
+ expectModifyEvent("b.txt");
+ });
+}
diff --git a/pkgs/watcher/test/directory_watcher/shared.dart b/pkgs/watcher/test/directory_watcher/shared.dart
new file mode 100644
index 0000000..eed6069
--- /dev/null
+++ b/pkgs/watcher/test/directory_watcher/shared.dart
@@ -0,0 +1,222 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+
+import '../utils.dart';
+
+sharedTests() {
+ test('does not notify for files that already exist when started', () {
+ // Make some pre-existing files.
+ writeFile("a.txt");
+ writeFile("b.txt");
+
+ startWatcher();
+
+ // Change one after the watcher is running.
+ writeFile("b.txt", contents: "modified");
+
+ // We should get a modify event for the changed file, but no add events
+ // for them before this.
+ expectModifyEvent("b.txt");
+ });
+
+ test('notifies when a file is added', () {
+ startWatcher();
+ writeFile("file.txt");
+ expectAddEvent("file.txt");
+ });
+
+ test('notifies when a file is modified', () {
+ writeFile("file.txt");
+ startWatcher();
+ writeFile("file.txt", contents: "modified");
+ expectModifyEvent("file.txt");
+ });
+
+ test('notifies when a file is removed', () {
+ writeFile("file.txt");
+ startWatcher();
+ deleteFile("file.txt");
+ expectRemoveEvent("file.txt");
+ });
+
+ test('notifies when a file is modified multiple times', () {
+ writeFile("file.txt");
+ startWatcher();
+ writeFile("file.txt", contents: "modified");
+ expectModifyEvent("file.txt");
+ writeFile("file.txt", contents: "modified again");
+ expectModifyEvent("file.txt");
+ });
+
+ test('when the watched directory is deleted, removes all files', () {
+ writeFile("dir/a.txt");
+ writeFile("dir/b.txt");
+
+ startWatcher(dir: "dir");
+
+ deleteDir("dir");
+ inAnyOrder(() {
+ expectRemoveEvent("dir/a.txt");
+ expectRemoveEvent("dir/b.txt");
+ });
+ });
+
+ group("moves", () {
+ test('notifies when a file is moved within the watched directory', () {
+ writeFile("old.txt");
+ startWatcher();
+ renameFile("old.txt", "new.txt");
+
+ inAnyOrder(() {
+ expectAddEvent("new.txt");
+ expectRemoveEvent("old.txt");
+ });
+ });
+
+ test('notifies when a file is moved from outside the watched directory',
+ () {
+ writeFile("old.txt");
+ createDir("dir");
+ startWatcher(dir: "dir");
+
+ renameFile("old.txt", "dir/new.txt");
+ expectAddEvent("dir/new.txt");
+ });
+
+ test('notifies when a file is moved outside the watched directory', () {
+ writeFile("dir/old.txt");
+ startWatcher(dir: "dir");
+
+ renameFile("dir/old.txt", "new.txt");
+ expectRemoveEvent("dir/old.txt");
+ });
+ });
+
+ group("clustered changes", () {
+ test("doesn't notify when a file is created and then immediately removed",
+ () {
+ startWatcher();
+ writeFile("file.txt");
+ deleteFile("file.txt");
+
+ // [startWatcher] will assert that no events were fired.
+ });
+
+ test("reports a modification when a file is deleted and then immediately "
+ "recreated", () {
+ writeFile("file.txt");
+ startWatcher();
+
+ deleteFile("file.txt");
+ writeFile("file.txt", contents: "re-created");
+ expectModifyEvent("file.txt");
+ });
+
+ test("reports a modification when a file is moved and then immediately "
+ "recreated", () {
+ writeFile("old.txt");
+ startWatcher();
+
+ renameFile("old.txt", "new.txt");
+ writeFile("old.txt", contents: "re-created");
+ inAnyOrder(() {
+ expectModifyEvent("old.txt");
+ expectAddEvent("new.txt");
+ });
+ });
+
+ test("reports a removal when a file is modified and then immediately "
+ "removed", () {
+ writeFile("file.txt");
+ startWatcher();
+
+ writeFile("file.txt", contents: "modified");
+ deleteFile("file.txt");
+ expectRemoveEvent("file.txt");
+ });
+
+ test("reports an add when a file is added and then immediately modified",
+ () {
+ startWatcher();
+
+ writeFile("file.txt");
+ writeFile("file.txt", contents: "modified");
+ expectAddEvent("file.txt");
+ });
+ });
+
+ group("subdirectories", () {
+ test('watches files in subdirectories', () {
+ startWatcher();
+ writeFile("a/b/c/d/file.txt");
+ expectAddEvent("a/b/c/d/file.txt");
+ });
+
+ test('notifies when a subdirectory is moved within the watched directory '
+ 'and then its contents are modified', () {
+ writeFile("old/file.txt");
+ startWatcher();
+
+ renameDir("old", "new");
+ inAnyOrder(() {
+ expectRemoveEvent("old/file.txt");
+ expectAddEvent("new/file.txt");
+ });
+
+ writeFile("new/file.txt", contents: "modified");
+ expectModifyEvent("new/file.txt");
+ });
+
+ test('emits events for many nested files added at once', () {
+ withPermutations((i, j, k) =>
+ writeFile("sub/sub-$i/sub-$j/file-$k.txt"));
+
+ createDir("dir");
+ startWatcher(dir: "dir");
+ renameDir("sub", "dir/sub");
+
+ inAnyOrder(() {
+ withPermutations((i, j, k) =>
+ expectAddEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+ });
+ });
+
+ test('emits events for many nested files removed at once', () {
+ withPermutations((i, j, k) =>
+ writeFile("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+
+ createDir("dir");
+ startWatcher(dir: "dir");
+
+ // Rename the directory rather than deleting it because native watchers
+ // report a rename as a single DELETE event for the directory, whereas
+ // they report recursive deletion with DELETE events for every file in the
+ // directory.
+ renameDir("dir/sub", "sub");
+
+ inAnyOrder(() {
+ withPermutations((i, j, k) =>
+ expectRemoveEvent("dir/sub/sub-$i/sub-$j/file-$k.txt"));
+ });
+ });
+
+ test('emits events for many nested files moved at once', () {
+ withPermutations((i, j, k) =>
+ writeFile("dir/old/sub-$i/sub-$j/file-$k.txt"));
+
+ createDir("dir");
+ startWatcher(dir: "dir");
+ renameDir("dir/old", "dir/new");
+
+ inAnyOrder(() {
+ withPermutations((i, j, k) {
+ expectRemoveEvent("dir/old/sub-$i/sub-$j/file-$k.txt");
+ expectAddEvent("dir/new/sub-$i/sub-$j/file-$k.txt");
+ });
+ });
+ });
+ });
+}
diff --git a/pkgs/watcher/test/directory_watcher_test.dart b/pkgs/watcher/test/directory_watcher_test.dart
deleted file mode 100644
index 841dd08..0000000
--- a/pkgs/watcher/test/directory_watcher_test.dart
+++ /dev/null
@@ -1,108 +0,0 @@
-// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'package:scheduled_test/scheduled_test.dart';
-
-import 'utils.dart';
-
-main() {
- initConfig();
-
- setUp(createSandbox);
-
- test('does not notify for files that already exist when started', () {
- // Make some pre-existing files.
- writeFile("a.txt");
- writeFile("b.txt");
-
- createWatcher();
-
- // Change one after the watcher is running.
- writeFile("b.txt", contents: "modified");
-
- // We should get a modify event for the changed file, but no add events
- // for them before this.
- expectModifyEvent("b.txt");
- });
-
- test('notifies when a file is added', () {
- createWatcher();
- writeFile("file.txt");
- expectAddEvent("file.txt");
- });
-
- test('notifies when a file is modified', () {
- writeFile("file.txt");
- createWatcher();
- writeFile("file.txt", contents: "modified");
- expectModifyEvent("file.txt");
- });
-
- test('notifies when a file is removed', () {
- writeFile("file.txt");
- createWatcher();
- deleteFile("file.txt");
- expectRemoveEvent("file.txt");
- });
-
- test('notifies when a file is moved', () {
- writeFile("old.txt");
- createWatcher();
- renameFile("old.txt", "new.txt");
- expectAddEvent("new.txt");
- expectRemoveEvent("old.txt");
- });
-
- test('notifies when a file is modified multiple times', () {
- writeFile("file.txt");
- createWatcher();
- writeFile("file.txt", contents: "modified");
- expectModifyEvent("file.txt");
- writeFile("file.txt", contents: "modified again");
- expectModifyEvent("file.txt");
- });
-
- test('does not notify if the file contents are unchanged', () {
- writeFile("a.txt", contents: "same");
- writeFile("b.txt", contents: "before");
- createWatcher();
- writeFile("a.txt", contents: "same");
- writeFile("b.txt", contents: "after");
- expectModifyEvent("b.txt");
- });
-
- test('does not notify if the modification time did not change', () {
- writeFile("a.txt", contents: "before");
- writeFile("b.txt", contents: "before");
- createWatcher();
- writeFile("a.txt", contents: "after", updateModified: false);
- writeFile("b.txt", contents: "after");
- expectModifyEvent("b.txt");
- });
-
- test('watches files in subdirectories', () {
- createWatcher();
- writeFile("a/b/c/d/file.txt");
- expectAddEvent("a/b/c/d/file.txt");
- });
-
- test('watches a directory created after the watcher', () {
- // Watch a subdirectory that doesn't exist yet.
- createWatcher(dir: "a");
-
- // This implicity creates it.
- writeFile("a/b/c/d/file.txt");
- expectAddEvent("a/b/c/d/file.txt");
- });
-
- test('when the watched directory is deleted, removes all files', () {
- writeFile("dir/a.txt");
- writeFile("dir/b.txt");
-
- createWatcher(dir: "dir");
-
- deleteDir("dir");
- expectRemoveEvents(["dir/a.txt", "dir/b.txt"]);
- });
-}
diff --git a/pkgs/watcher/test/no_subscription/linux_test.dart b/pkgs/watcher/test/no_subscription/linux_test.dart
new file mode 100644
index 0000000..7978830
--- /dev/null
+++ b/pkgs/watcher/test/no_subscription/linux_test.dart
@@ -0,0 +1,20 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/src/directory_watcher/linux.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+ initConfig();
+
+ watcherFactory = (dir) => new LinuxDirectoryWatcher(dir);
+
+ setUp(createSandbox);
+
+ sharedTests();
+}
\ No newline at end of file
diff --git a/pkgs/watcher/test/no_subscription/polling_test.dart b/pkgs/watcher/test/no_subscription/polling_test.dart
new file mode 100644
index 0000000..fa4f0cb
--- /dev/null
+++ b/pkgs/watcher/test/no_subscription/polling_test.dart
@@ -0,0 +1,19 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+ initConfig();
+
+ watcherFactory = (dir) => new PollingDirectoryWatcher(dir);
+
+ setUp(createSandbox);
+
+ sharedTests();
+}
\ No newline at end of file
diff --git a/pkgs/watcher/test/no_subscription_test.dart b/pkgs/watcher/test/no_subscription/shared.dart
similarity index 73%
rename from pkgs/watcher/test/no_subscription_test.dart
rename to pkgs/watcher/test/no_subscription/shared.dart
index 2e7b6d3..cd279e1 100644
--- a/pkgs/watcher/test/no_subscription_test.dart
+++ b/pkgs/watcher/test/no_subscription/shared.dart
@@ -7,14 +7,10 @@
import 'package:scheduled_test/scheduled_test.dart';
import 'package:watcher/watcher.dart';
-import 'utils.dart';
+import '../utils.dart';
-main() {
- initConfig();
-
- setUp(createSandbox);
-
- test('does not notify for changes when there were no subscribers', () {
+sharedTests() {
+ test('does not notify for changes when there are no subscribers', () {
// Note that this test doesn't rely as heavily on the test functions in
// utils.dart because it needs to be very explicit about when the event
// stream is and is not subscribed.
@@ -51,16 +47,10 @@
expect(event.path, endsWith("added.txt"));
completer.complete();
}));
- });
- // The watcher will have been cancelled and then resumed in the middle of
- // its pause between polling loops. That means the second scan to skip
- // what changed while we were unsubscribed won't happen until after that
- // delay is done. Wait long enough for that to happen.
- //
- // We're doing * 4 here because that seems to give the slower bots enough
- // time for this to complete.
- schedule(() => new Future.delayed(watcher.pollingDelay * 4));
+ // Wait until the watcher is ready to dispatch events again.
+ return watcher.ready;
+ });
// And add a third file.
writeFile("added.txt");
diff --git a/pkgs/watcher/test/ready/linux_test.dart b/pkgs/watcher/test/ready/linux_test.dart
new file mode 100644
index 0000000..7978830
--- /dev/null
+++ b/pkgs/watcher/test/ready/linux_test.dart
@@ -0,0 +1,20 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/src/directory_watcher/linux.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+ initConfig();
+
+ watcherFactory = (dir) => new LinuxDirectoryWatcher(dir);
+
+ setUp(createSandbox);
+
+ sharedTests();
+}
\ No newline at end of file
diff --git a/pkgs/watcher/test/ready/polling_test.dart b/pkgs/watcher/test/ready/polling_test.dart
new file mode 100644
index 0000000..fa4f0cb
--- /dev/null
+++ b/pkgs/watcher/test/ready/polling_test.dart
@@ -0,0 +1,19 @@
+// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:scheduled_test/scheduled_test.dart';
+import 'package:watcher/watcher.dart';
+
+import 'shared.dart';
+import '../utils.dart';
+
+main() {
+ initConfig();
+
+ watcherFactory = (dir) => new PollingDirectoryWatcher(dir);
+
+ setUp(createSandbox);
+
+ sharedTests();
+}
\ No newline at end of file
diff --git a/pkgs/watcher/test/ready_test.dart b/pkgs/watcher/test/ready/shared.dart
similarity index 95%
rename from pkgs/watcher/test/ready_test.dart
rename to pkgs/watcher/test/ready/shared.dart
index 11b77e0..af1b58f 100644
--- a/pkgs/watcher/test/ready_test.dart
+++ b/pkgs/watcher/test/ready/shared.dart
@@ -2,17 +2,11 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-import 'dart:async';
-
import 'package:scheduled_test/scheduled_test.dart';
-import 'utils.dart';
+import '../utils.dart';
-main() {
- initConfig();
-
- setUp(createSandbox);
-
+sharedTests() {
test('ready does not complete until after subscription', () {
var watcher = createWatcher(waitForReady: false);
diff --git a/pkgs/watcher/test/utils.dart b/pkgs/watcher/test/utils.dart
index 28d57b0..d1b9f94 100644
--- a/pkgs/watcher/test/utils.dart
+++ b/pkgs/watcher/test/utils.dart
@@ -5,6 +5,7 @@
library watcher.test.utils;
import 'dart:async';
+import 'dart:collection';
import 'dart:io';
import 'package:path/path.dart' as p;
@@ -12,6 +13,7 @@
import 'package:unittest/compact_vm_config.dart';
import 'package:watcher/watcher.dart';
import 'package:watcher/src/stat.dart';
+import 'package:watcher/src/utils.dart';
/// The path to the temporary sandbox created for each test. All file
/// operations are implicitly relative to this directory.
@@ -36,6 +38,14 @@
/// increment the mod time for that file instantly.
Map<String, int> _mockFileModificationTimes;
+typedef DirectoryWatcher WatcherFactory(String directory);
+
+/// Sets the function used to create the directory watcher.
+set watcherFactory(WatcherFactory factory) {
+ _watcherFactory = factory;
+}
+WatcherFactory _watcherFactory;
+
void initConfig() {
useCompactVMConfiguration();
filterStacks = true;
@@ -54,10 +64,10 @@
path = p.normalize(p.relative(path, from: _sandboxDir));
// Make sure we got a path in the sandbox.
- assert(p.isRelative(path) && !path.startsWith(".."));
+ assert(p.isRelative(path) && !path.startsWith(".."));
- return new DateTime.fromMillisecondsSinceEpoch(
- _mockFileModificationTimes[path]);
+ var mtime = _mockFileModificationTimes[path];
+ return new DateTime.fromMillisecondsSinceEpoch(mtime == null ? 0 : mtime);
});
// Delete the sandbox when done.
@@ -86,68 +96,113 @@
dir = p.join(_sandboxDir, dir);
}
- // Use a short delay to make the tests run quickly.
- _watcher = new DirectoryWatcher(dir,
- pollingDelay: new Duration(milliseconds: 100));
+ var watcher = _watcherFactory(dir);
// Wait until the scan is finished so that we don't miss changes to files
// that could occur before the scan completes.
if (waitForReady != false) {
- schedule(() => _watcher.ready, "wait for watcher to be ready");
+ schedule(() => watcher.ready, "wait for watcher to be ready");
}
- currentSchedule.onComplete.schedule(() {
- _nextEvent = 0;
- _watcher = null;
- }, "reset watcher");
-
- return _watcher;
+ return watcher;
}
-/// Expects that the next set of events will all be changes of [type] on
-/// [paths].
+/// The stream of events from the watcher started with [startWatcher].
+Stream _watcherEvents;
+
+/// Creates a new [DirectoryWatcher] that watches a temporary directory and
+/// starts monitoring it for events.
///
-/// Validates that events are delivered for all paths in [paths], but allows
-/// them in any order.
-void expectEvents(ChangeType type, Iterable<String> paths) {
- var pathSet = paths
- .map((path) => p.join(_sandboxDir, path))
- .map(p.normalize)
- .toSet();
+/// If [dir] is provided, watches a subdirectory in the sandbox with that name.
+void startWatcher({String dir}) {
+ // We want to wait until we're ready *after* we subscribe to the watcher's
+ // events.
+ _watcher = createWatcher(dir: dir, waitForReady: false);
- // Create an expectation for as many paths as we have.
- var futures = [];
+ // Schedule [_watcher.events.listen] so that the watcher doesn't start
+ // watching [dir] before it exists. Expose [_watcherEvents] immediately so
+ // that it can be accessed synchronously after this.
+ _watcherEvents = futureStream(schedule(() {
+ var allEvents = new Queue();
+ var subscription = _watcher.events.listen(allEvents.add,
+ onError: currentSchedule.signalError);
- for (var i = 0; i < paths.length; i++) {
- // Immediately create the futures. This ensures we don't register too
- // late and drop the event before we receive it.
- var future = _watcher.events.elementAt(_nextEvent++).then((event) {
- expect(event.type, equals(type));
- expect(pathSet, contains(event.path));
+ currentSchedule.onComplete.schedule(() {
+ var numEvents = _nextEvent;
+ subscription.cancel();
+ _nextEvent = 0;
+ _watcher = null;
- pathSet.remove(event.path);
- });
+ // If there are already errors, don't add this to the output and make
+ // people think it might be the root cause.
+ if (currentSchedule.errors.isEmpty) {
+ expect(allEvents, hasLength(numEvents));
+ }
+ }, "reset watcher");
- // Make sure the schedule is watching it in case it fails.
- currentSchedule.wrapFuture(future);
+ return _watcher.events;
+ }, "create watcher")).asBroadcastStream();
- futures.add(future);
+ schedule(() => _watcher.ready, "wait for watcher to be ready");
+}
+
+/// A future set by [inAnyOrder] that will complete to the set of events that
+/// occur in the [inAnyOrder] block.
+Future<Set<WatchEvent>> _unorderedEventFuture;
+
+/// Runs [block] and allows multiple [expectEvent] calls in that block to match
+/// events in any order.
+void inAnyOrder(block()) {
+ var oldFuture = _unorderedEventFuture;
+ try {
+ var firstEvent = _nextEvent;
+ var completer = new Completer();
+ _unorderedEventFuture = completer.future;
+ block();
+
+ _watcherEvents.skip(firstEvent).take(_nextEvent - firstEvent).toSet()
+ .then(completer.complete, onError: completer.completeError);
+ currentSchedule.wrapFuture(_unorderedEventFuture,
+ "waiting for ${_nextEvent - firstEvent} events");
+ } finally {
+ _unorderedEventFuture = oldFuture;
}
-
- // Schedule it so that later file modifications don't occur until after this
- // event is received.
- schedule(() => Future.wait(futures),
- "wait for $type events on ${paths.join(', ')}");
}
-void expectAddEvent(String path) => expectEvents(ChangeType.ADD, [path]);
-void expectModifyEvent(String path) => expectEvents(ChangeType.MODIFY, [path]);
-void expectRemoveEvent(String path) => expectEvents(ChangeType.REMOVE, [path]);
+/// Expects that the next set of event will be a change of [type] on [path].
+///
+/// Multiple calls to [expectEvent] require that the events are received in that
+/// order unless they're called in an [inAnyOrder] block.
+void expectEvent(ChangeType type, String path) {
+ var matcher = predicate((e) {
+ return e is WatchEvent && e.type == type &&
+ e.path == p.join(_sandboxDir, path);
+ }, "is $type $path");
-void expectRemoveEvents(Iterable<String> paths) {
- expectEvents(ChangeType.REMOVE, paths);
+ if (_unorderedEventFuture != null) {
+ // Assign this to a local variable since it will be un-assigned by the time
+ // the scheduled callback runs.
+ var future = _unorderedEventFuture;
+
+ expect(
+ schedule(() => future, "should fire $type event on $path"),
+ completion(contains(matcher)));
+ } else {
+ var future = currentSchedule.wrapFuture(
+ _watcherEvents.elementAt(_nextEvent),
+ "waiting for $type event on $path");
+
+ expect(
+ schedule(() => future, "should fire $type event on $path"),
+ completion(matcher));
+ }
+ _nextEvent++;
}
+void expectAddEvent(String path) => expectEvent(ChangeType.ADD, path);
+void expectModifyEvent(String path) => expectEvent(ChangeType.MODIFY, path);
+void expectRemoveEvent(String path) => expectEvent(ChangeType.REMOVE, path);
+
/// Schedules writing a file in the sandbox at [path] with [contents].
///
/// If [contents] is omitted, creates an empty file. If [updatedModified] is
@@ -201,6 +256,21 @@
}, "rename file $from to $to");
}
+/// Schedules creating a directory in the sandbox at [path].
+void createDir(String path) {
+ schedule(() {
+ new Directory(p.join(_sandboxDir, path)).createSync();
+ }, "create directory $path");
+}
+
+/// Schedules renaming a directory in the sandbox from [from] to [to].
+void renameDir(String from, String to) {
+ schedule(() {
+ new Directory(p.join(_sandboxDir, from))
+ .renameSync(p.join(_sandboxDir, to));
+ }, "rename directory $from to $to");
+}
+
/// Schedules deleting a directory in the sandbox at [path].
void deleteDir(String path) {
schedule(() {
@@ -208,22 +278,17 @@
}, "delete directory $path");
}
-/// A [Matcher] for [WatchEvent]s.
-class _ChangeMatcher extends Matcher {
- /// The expected change.
- final ChangeType type;
-
- /// The expected path.
- final String path;
-
- _ChangeMatcher(this.type, this.path);
-
- Description describe(Description description) {
- description.add("$type $path");
+/// Runs [callback] with every permutation of non-negative [i], [j], and [k]
+/// less than [limit].
+///
+/// [limit] defaults to 3.
+void withPermutations(callback(int i, int j, int k), {int limit}) {
+ if (limit == null) limit = 3;
+ for (var i = 0; i < limit; i++) {
+ for (var j = 0; j < limit; j++) {
+ for (var k = 0; k < limit; k++) {
+ callback(i, j, k);
+ }
+ }
}
-
- bool matches(item, Map matchState) =>
- item is WatchEvent &&
- item.type == type &&
- p.normalize(item.path) == p.normalize(path);
}