Rewrite a StreamTransformer as an extension (dart-lang/watcher#102)

diff --git a/pkgs/watcher/lib/src/directory_watcher/linux.dart b/pkgs/watcher/lib/src/directory_watcher/linux.dart
index 06d3508..1bf5efd 100644
--- a/pkgs/watcher/lib/src/directory_watcher/linux.dart
+++ b/pkgs/watcher/lib/src/directory_watcher/linux.dart
@@ -81,8 +81,7 @@
     })));
 
     // Batch the inotify changes together so that we can dedup events.
-    var innerStream = _nativeEvents.stream
-        .transform(BatchedStreamTransformer<FileSystemEvent>());
+    var innerStream = _nativeEvents.stream.batchEvents();
     _listen(innerStream, _onBatch, onError: _eventsController.addError);
 
     _listen(Directory(path).list(recursive: true), (FileSystemEntity entity) {
diff --git a/pkgs/watcher/lib/src/directory_watcher/mac_os.dart b/pkgs/watcher/lib/src/directory_watcher/mac_os.dart
index 7dc6e7c..4ad94d4 100644
--- a/pkgs/watcher/lib/src/directory_watcher/mac_os.dart
+++ b/pkgs/watcher/lib/src/directory_watcher/mac_os.dart
@@ -359,9 +359,7 @@
   /// Start or restart the underlying [Directory.watch] stream.
   void _startWatch() {
     // Batch the FSEvent changes together so that we can dedup events.
-    var innerStream = Directory(path)
-        .watch(recursive: true)
-        .transform(BatchedStreamTransformer<FileSystemEvent>());
+    var innerStream = Directory(path).watch(recursive: true).batchEvents();
     _watchSubscription = innerStream.listen(_onBatch,
         onError: _eventsController.addError, onDone: _onDone);
   }
diff --git a/pkgs/watcher/lib/src/file_watcher/native.dart b/pkgs/watcher/lib/src/file_watcher/native.dart
index f7d92d4..48f12e6 100644
--- a/pkgs/watcher/lib/src/file_watcher/native.dart
+++ b/pkgs/watcher/lib/src/file_watcher/native.dart
@@ -47,7 +47,7 @@
     // Batch the events together so that we can dedup them.
     _subscription = File(path)
         .watch()
-        .transform(BatchedStreamTransformer<FileSystemEvent>())
+        .batchEvents()
         .listen(_onBatch, onError: _eventsController.addError, onDone: _onDone);
   }
 
diff --git a/pkgs/watcher/lib/src/utils.dart b/pkgs/watcher/lib/src/utils.dart
index 66c59d3..ecf4e10 100644
--- a/pkgs/watcher/lib/src/utils.dart
+++ b/pkgs/watcher/lib/src/utils.dart
@@ -20,16 +20,15 @@
 Set<T> unionAll<T>(Iterable<Set<T>> sets) =>
     sets.fold(<T>{}, (union, set) => union.union(set));
 
-/// A stream transformer that batches all events that are sent at the same time.
-///
-/// When multiple events are synchronously added to a stream controller, the
-/// [StreamController] implementation uses [scheduleMicrotask] to schedule the
-/// asynchronous firing of each event. In order to recreate the synchronous
-/// batches, this collates all the events that are received in "nearby"
-/// microtasks.
-class BatchedStreamTransformer<T> extends StreamTransformerBase<T, List<T>> {
-  @override
-  Stream<List<T>> bind(Stream<T> input) {
+extension BatchEvents<T> on Stream<T> {
+  /// Batches all events that are sent at the same time.
+  ///
+  /// When multiple events are synchronously added to a stream controller, the
+  /// [StreamController] implementation uses [scheduleMicrotask] to schedule the
+  /// asynchronous firing of each event. In order to recreate the synchronous
+  /// batches, this collates all the events that are received in "nearby"
+  /// microtasks.
+  Stream<List<T>> batchEvents() {
     var batch = Queue<T>();
     return StreamTransformer<T, List<T>>.fromHandlers(
         handleData: (event, sink) {
@@ -48,6 +47,6 @@
         batch.clear();
       }
       sink.close();
-    }).bind(input);
+    }).bind(this);
   }
 }