Merge branch 'master' into race-condition
diff --git a/lib/src/async_queue.dart b/lib/src/async_queue.dart
index d864b1b..9f8bedf 100644
--- a/lib/src/async_queue.dart
+++ b/lib/src/async_queue.dart
@@ -5,7 +5,7 @@
 import 'dart:async';
 import 'dart:collection';
 
-typedef ItemProcessor<T> = Future Function(T item);
+typedef ItemProcessor<T> = Future<void> Function(T item);
 
 /// A queue of items that are sequentially, asynchronously processed.
 ///
@@ -57,15 +57,13 @@
   ///
   /// When complete, recursively calls itself to continue processing unless
   /// the process was cancelled.
-  Future _processNextItem() {
+  Future<void> _processNextItem() async {
     var item = _items.removeFirst();
-    return _processor(item).then((_) async {
-      if (_items.isNotEmpty) return await _processNextItem();
+    await _processor(item);
+    if (_items.isNotEmpty) return _processNextItem();
 
-      // We have drained the queue, stop processing and wait until something
-      // has been enqueued.
-      _isProcessing = false;
-      return null;
-    });
+    // We have drained the queue, stop processing and wait until something
+    // has been enqueued.
+    _isProcessing = false;
   }
 }
diff --git a/lib/src/directory_watcher/polling.dart b/lib/src/directory_watcher/polling.dart
index 8ed7590..f95deff 100644
--- a/lib/src/directory_watcher/polling.dart
+++ b/lib/src/directory_watcher/polling.dart
@@ -40,8 +40,8 @@
 
   bool get isReady => _ready.isCompleted;
 
-  Future get ready => _ready.future;
-  final _ready = Completer();
+  Future<void> get ready => _ready.future;
+  final _ready = Completer<void>();
 
   /// The amount of time the watcher pauses between successive polls of the
   /// directory contents.
@@ -129,14 +129,17 @@
 
   /// Processes [file] to determine if it has been modified since the last
   /// time it was scanned.
-  Future _processFile(String file) async {
+  Future<void> _processFile(String file) async {
     // `null` is the sentinel which means the directory listing is complete.
-    if (file == null) return _completePoll();
+    if (file == null) {
+      await _completePoll();
+      return;
+    }
 
     try {
-      var modified = await getModificationTime(file);
+      final modified = await modificationTime(file);
 
-      if (_events.isClosed) return null;
+      if (_events.isClosed) return;
 
       var lastModified = _lastModifieds[file];
 
@@ -144,16 +147,16 @@
       if (lastModified != null && lastModified == modified) {
         // The file is still here.
         _polledFiles.add(file);
-        return null;
+        return;
       }
 
-      if (_events.isClosed) return null;
+      if (_events.isClosed) return;
 
       _lastModifieds[file] = modified;
       _polledFiles.add(file);
 
       // Only notify if we're ready to emit events.
-      if (!isReady) return null;
+      if (!isReady) return;
 
       var type = lastModified == null ? ChangeType.ADD : ChangeType.MODIFY;
       _events.add(WatchEvent(type, file));
@@ -165,7 +168,7 @@
 
   /// After the directory listing is complete, this determines which files were
   /// removed and then restarts the next poll.
-  Future _completePoll() {
+  Future<void> _completePoll() async {
     // Any files that were not seen in the last poll but that we have a
     // status for must have been removed.
     var removedFiles = _lastModifieds.keys.toSet().difference(_polledFiles);
@@ -177,9 +180,8 @@
     if (!isReady) _ready.complete();
 
     // Wait and then poll again.
-    return Future.delayed(_pollingDelay).then((_) {
-      if (_events.isClosed) return;
-      _poll();
-    });
+    await Future.delayed(_pollingDelay);
+    if (_events.isClosed) return;
+    _poll();
   }
 }
diff --git a/lib/src/directory_watcher/windows.dart b/lib/src/directory_watcher/windows.dart
index 7294c17..85fef5f 100644
--- a/lib/src/directory_watcher/windows.dart
+++ b/lib/src/directory_watcher/windows.dart
@@ -52,7 +52,7 @@
 
   bool get isReady => _readyCompleter.isCompleted;
 
-  Future get ready => _readyCompleter.future;
+  Future<void> get ready => _readyCompleter.future;
   final _readyCompleter = Completer();
 
   final Map<String, _EventBatcher> _eventBatchers =
@@ -380,7 +380,7 @@
 
   /// Starts or restarts listing the watched directory to get an initial picture
   /// of its state.
-  Future _listDir() {
+  Future<void> _listDir() {
     assert(!isReady);
     if (_initialListSubscription != null) _initialListSubscription.cancel();
 
diff --git a/lib/src/file_watcher/polling.dart b/lib/src/file_watcher/polling.dart
index a0466b5..e2bf5dd 100644
--- a/lib/src/file_watcher/polling.dart
+++ b/lib/src/file_watcher/polling.dart
@@ -62,7 +62,7 @@
 
     DateTime modified;
     try {
-      modified = await getModificationTime(path);
+      modified = await modificationTime(path);
     } on FileSystemException catch (error, stackTrace) {
       if (!_eventsController.isClosed) {
         _eventsController.addError(error, stackTrace);
diff --git a/lib/src/resubscribable.dart b/lib/src/resubscribable.dart
index e00ebb0..8de3dfb 100644
--- a/lib/src/resubscribable.dart
+++ b/lib/src/resubscribable.dart
@@ -31,8 +31,8 @@
 
   bool get isReady => _readyCompleter.isCompleted;
 
-  Future get ready => _readyCompleter.future;
-  var _readyCompleter = Completer();
+  Future<void> get ready => _readyCompleter.future;
+  var _readyCompleter = Completer<void>();
 
   /// Creates a new [ResubscribableWatcher] wrapping the watchers
   /// emitted by [_factory].
@@ -41,16 +41,19 @@
     StreamSubscription subscription;
 
     _eventsController = StreamController<WatchEvent>.broadcast(
-        onListen: () {
+        onListen: () async {
           watcher = _factory();
           subscription = watcher.events.listen(_eventsController.add,
               onError: _eventsController.addError,
               onDone: _eventsController.close);
 
-          // It's important that we complete the value of [_readyCompleter] at the
-          // time [onListen] is called, as opposed to the value when [watcher.ready]
-          // fires. A new completer may be created by that time.
-          watcher.ready.then(_readyCompleter.complete);
+          // Complete the [_readyCompleter] that is current when [onListen] is
+          // called, not whichever one exists when [watcher.ready] fires: a new
+          // completer may have been created by then. Capture it before the
+          // await so the later read cannot pick up a replacement.
+          final readyCompleter = _readyCompleter;
+          await watcher.ready;
+          readyCompleter.complete();
         },
         onCancel: () {
           // Cancel the subscription before closing the watcher so that the
diff --git a/lib/src/stat.dart b/lib/src/stat.dart
index a569208..6430d0b 100644
--- a/lib/src/stat.dart
+++ b/lib/src/stat.dart
@@ -22,10 +22,11 @@
 }
 
 /// Gets the modification time for the file at [path].
-Future<DateTime> getModificationTime(String path) {
+Future<DateTime> modificationTime(String path) async {
   if (_mockTimeCallback != null) {
-    return Future.value(_mockTimeCallback(path));
+    return _mockTimeCallback(path);
   }
 
-  return FileStat.stat(path).then((stat) => stat.modified);
+  final stat = await FileStat.stat(path);
+  return stat.modified;
 }
diff --git a/test/utils.dart b/test/utils.dart
index 57b4563..2e0ad01 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -79,8 +79,9 @@
 /// at the end of a test. Otherwise, if they don't occur, the test will wait
 /// indefinitely because they might in the future and because the watcher is
 /// normally only closed after the test completes.
-void startClosingEventStream() {
-  pumpEventQueue().then((_) => _watcherEvents.cancel(immediate: true));
+void startClosingEventStream() async {
+  await pumpEventQueue();
+  await _watcherEvents.cancel(immediate: true);
 }
 
 /// A list of [StreamMatcher]s that have been collected using