Use more async/await (#70)

* Use more async/await

- Change some bare `Future` into `Future<void>` to make sure we aren't
  silently passing state where we don't expect.
- Change some methods with `.then` calls inside to `async`.

* Add another await
diff --git a/lib/src/async_queue.dart b/lib/src/async_queue.dart
index d864b1b..9f8bedf 100644
--- a/lib/src/async_queue.dart
+++ b/lib/src/async_queue.dart
@@ -5,7 +5,7 @@
 import 'dart:async';
 import 'dart:collection';
 
-typedef ItemProcessor<T> = Future Function(T item);
+typedef ItemProcessor<T> = Future<void> Function(T item);
 
 /// A queue of items that are sequentially, asynchronously processed.
 ///
@@ -57,15 +57,14 @@
   ///
   /// When complete, recursively calls itself to continue processing unless
   /// the process was cancelled.
-  Future _processNextItem() {
+  Future<void> _processNextItem() async {
     var item = _items.removeFirst();
-    return _processor(item).then((_) async {
-      if (_items.isNotEmpty) return await _processNextItem();
+    await _processor(item);
+    if (_items.isNotEmpty) return _processNextItem();
 
-      // We have drained the queue, stop processing and wait until something
-      // has been enqueued.
-      _isProcessing = false;
-      return null;
-    });
+    // We have drained the queue, stop processing and wait until something
+    // has been enqueued.
+    _isProcessing = false;
+    return null;
   }
 }
diff --git a/lib/src/directory_watcher/polling.dart b/lib/src/directory_watcher/polling.dart
index 735a807..f21a239 100644
--- a/lib/src/directory_watcher/polling.dart
+++ b/lib/src/directory_watcher/polling.dart
@@ -40,8 +40,8 @@
 
   bool get isReady => _ready.isCompleted;
 
-  Future get ready => _ready.future;
-  final _ready = Completer();
+  Future<void> get ready => _ready.future;
+  final _ready = Completer<void>();
 
   /// The amount of time the watcher pauses between successive polls of the
   /// directory contents.
@@ -129,38 +129,41 @@
 
   /// Processes [file] to determine if it has been modified since the last
   /// time it was scanned.
-  Future _processFile(String file) {
+  Future<void> _processFile(String file) async {
     // `null` is the sentinel which means the directory listing is complete.
-    if (file == null) return _completePoll();
+    if (file == null) {
+      await _completePoll();
+      return;
+    }
 
-    return getModificationTime(file).then((modified) {
-      if (_events.isClosed) return null;
+    final modified = await modificationTime(file);
 
-      var lastModified = _lastModifieds[file];
+    if (_events.isClosed) return;
 
-      // If its modification time hasn't changed, assume the file is unchanged.
-      if (lastModified != null && lastModified == modified) {
-        // The file is still here.
-        _polledFiles.add(file);
-        return null;
-      }
+    var lastModified = _lastModifieds[file];
 
-      if (_events.isClosed) return null;
-
-      _lastModifieds[file] = modified;
+    // If its modification time hasn't changed, assume the file is unchanged.
+    if (lastModified != null && lastModified == modified) {
+      // The file is still here.
       _polledFiles.add(file);
+      return;
+    }
 
-      // Only notify if we're ready to emit events.
-      if (!isReady) return null;
+    if (_events.isClosed) return;
 
-      var type = lastModified == null ? ChangeType.ADD : ChangeType.MODIFY;
-      _events.add(WatchEvent(type, file));
-    });
+    _lastModifieds[file] = modified;
+    _polledFiles.add(file);
+
+    // Only notify if we're ready to emit events.
+    if (!isReady) return;
+
+    var type = lastModified == null ? ChangeType.ADD : ChangeType.MODIFY;
+    _events.add(WatchEvent(type, file));
   }
 
   /// After the directory listing is complete, this determines which files were
   /// removed and then restarts the next poll.
-  Future _completePoll() {
+  Future<void> _completePoll() async {
     // Any files that were not seen in the last poll but that we have a
     // status for must have been removed.
     var removedFiles = _lastModifieds.keys.toSet().difference(_polledFiles);
@@ -172,9 +175,8 @@
     if (!isReady) _ready.complete();
 
     // Wait and then poll again.
-    return Future.delayed(_pollingDelay).then((_) {
-      if (_events.isClosed) return;
-      _poll();
-    });
+    await Future.delayed(_pollingDelay);
+    if (_events.isClosed) return;
+    _poll();
   }
 }
diff --git a/lib/src/directory_watcher/windows.dart b/lib/src/directory_watcher/windows.dart
index 7294c17..85fef5f 100644
--- a/lib/src/directory_watcher/windows.dart
+++ b/lib/src/directory_watcher/windows.dart
@@ -52,7 +52,7 @@
 
   bool get isReady => _readyCompleter.isCompleted;
 
-  Future get ready => _readyCompleter.future;
+  Future<void> get ready => _readyCompleter.future;
   final _readyCompleter = Completer();
 
   final Map<String, _EventBatcher> _eventBatchers =
@@ -380,7 +380,7 @@
 
   /// Starts or restarts listing the watched directory to get an initial picture
   /// of its state.
-  Future _listDir() {
+  Future<void> _listDir() {
     assert(!isReady);
     if (_initialListSubscription != null) _initialListSubscription.cancel();
 
diff --git a/lib/src/file_watcher/polling.dart b/lib/src/file_watcher/polling.dart
index a0466b5..e2bf5dd 100644
--- a/lib/src/file_watcher/polling.dart
+++ b/lib/src/file_watcher/polling.dart
@@ -62,7 +62,7 @@
 
     DateTime modified;
     try {
-      modified = await getModificationTime(path);
+      modified = await modificationTime(path);
     } on FileSystemException catch (error, stackTrace) {
       if (!_eventsController.isClosed) {
         _eventsController.addError(error, stackTrace);
diff --git a/lib/src/resubscribable.dart b/lib/src/resubscribable.dart
index e00ebb0..8de3dfb 100644
--- a/lib/src/resubscribable.dart
+++ b/lib/src/resubscribable.dart
@@ -31,8 +31,8 @@
 
   bool get isReady => _readyCompleter.isCompleted;
 
-  Future get ready => _readyCompleter.future;
-  var _readyCompleter = Completer();
+  Future<void> get ready => _readyCompleter.future;
+  var _readyCompleter = Completer<void>();
 
   /// Creates a new [ResubscribableWatcher] wrapping the watchers
   /// emitted by [_factory].
@@ -41,16 +41,17 @@
     StreamSubscription subscription;
 
     _eventsController = StreamController<WatchEvent>.broadcast(
-        onListen: () {
+        onListen: () async {
           watcher = _factory();
           subscription = watcher.events.listen(_eventsController.add,
               onError: _eventsController.addError,
               onDone: _eventsController.close);
 
-          // It's important that we complete the value of [_readyCompleter] at the
-          // time [onListen] is called, as opposed to the value when [watcher.ready]
-          // fires. A new completer may be created by that time.
-          watcher.ready.then(_readyCompleter.complete);
+          // It's important that we complete the value of [_readyCompleter] at
+          // the time [onListen] is called, as opposed to the value when
+          // [watcher.ready] fires. A new completer may be created by that time.
+          await watcher.ready;
+          _readyCompleter.complete();
         },
         onCancel: () {
           // Cancel the subscription before closing the watcher so that the
diff --git a/lib/src/stat.dart b/lib/src/stat.dart
index a569208..6430d0b 100644
--- a/lib/src/stat.dart
+++ b/lib/src/stat.dart
@@ -22,10 +22,11 @@
 }
 
 /// Gets the modification time for the file at [path].
-Future<DateTime> getModificationTime(String path) {
+Future<DateTime> modificationTime(String path) async {
   if (_mockTimeCallback != null) {
-    return Future.value(_mockTimeCallback(path));
+    return _mockTimeCallback(path);
   }
 
-  return FileStat.stat(path).then((stat) => stat.modified);
+  final stat = await FileStat.stat(path);
+  return stat.modified;
 }
diff --git a/test/utils.dart b/test/utils.dart
index 57b4563..2e0ad01 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -79,8 +79,9 @@
 /// at the end of a test. Otherwise, if they don't occur, the test will wait
 /// indefinitely because they might in the future and because the watcher is
 /// normally only closed after the test completes.
-void startClosingEventStream() {
-  pumpEventQueue().then((_) => _watcherEvents.cancel(immediate: true));
+void startClosingEventStream() async {
+  await pumpEventQueue();
+  await _watcherEvents.cancel(immediate: true);
 }
 
 /// A list of [StreamMatcher]s that have been collected using