Rewrite a StreamTransformer as an extension (dart-lang/watcher#102)
diff --git a/pkgs/watcher/lib/src/directory_watcher/linux.dart b/pkgs/watcher/lib/src/directory_watcher/linux.dart index 06d3508..1bf5efd 100644 --- a/pkgs/watcher/lib/src/directory_watcher/linux.dart +++ b/pkgs/watcher/lib/src/directory_watcher/linux.dart
@@ -81,8 +81,7 @@ }))); // Batch the inotify changes together so that we can dedup events. - var innerStream = _nativeEvents.stream - .transform(BatchedStreamTransformer<FileSystemEvent>()); + var innerStream = _nativeEvents.stream.batchEvents(); _listen(innerStream, _onBatch, onError: _eventsController.addError); _listen(Directory(path).list(recursive: true), (FileSystemEntity entity) {
diff --git a/pkgs/watcher/lib/src/directory_watcher/mac_os.dart b/pkgs/watcher/lib/src/directory_watcher/mac_os.dart index 7dc6e7c..4ad94d4 100644 --- a/pkgs/watcher/lib/src/directory_watcher/mac_os.dart +++ b/pkgs/watcher/lib/src/directory_watcher/mac_os.dart
@@ -359,9 +359,7 @@ /// Start or restart the underlying [Directory.watch] stream. void _startWatch() { // Batch the FSEvent changes together so that we can dedup events. - var innerStream = Directory(path) - .watch(recursive: true) - .transform(BatchedStreamTransformer<FileSystemEvent>()); + var innerStream = Directory(path).watch(recursive: true).batchEvents(); _watchSubscription = innerStream.listen(_onBatch, onError: _eventsController.addError, onDone: _onDone); }
diff --git a/pkgs/watcher/lib/src/file_watcher/native.dart b/pkgs/watcher/lib/src/file_watcher/native.dart index f7d92d4..48f12e6 100644 --- a/pkgs/watcher/lib/src/file_watcher/native.dart +++ b/pkgs/watcher/lib/src/file_watcher/native.dart
@@ -47,7 +47,7 @@ // Batch the events together so that we can dedup them. _subscription = File(path) .watch() - .transform(BatchedStreamTransformer<FileSystemEvent>()) + .batchEvents() .listen(_onBatch, onError: _eventsController.addError, onDone: _onDone); }
diff --git a/pkgs/watcher/lib/src/utils.dart b/pkgs/watcher/lib/src/utils.dart index 66c59d3..ecf4e10 100644 --- a/pkgs/watcher/lib/src/utils.dart +++ b/pkgs/watcher/lib/src/utils.dart
@@ -20,16 +20,15 @@ Set<T> unionAll<T>(Iterable<Set<T>> sets) => sets.fold(<T>{}, (union, set) => union.union(set)); -/// A stream transformer that batches all events that are sent at the same time. -/// -/// When multiple events are synchronously added to a stream controller, the -/// [StreamController] implementation uses [scheduleMicrotask] to schedule the -/// asynchronous firing of each event. In order to recreate the synchronous -/// batches, this collates all the events that are received in "nearby" -/// microtasks. -class BatchedStreamTransformer<T> extends StreamTransformerBase<T, List<T>> { - @override - Stream<List<T>> bind(Stream<T> input) { +extension BatchEvents<T> on Stream<T> { + /// Batches all events that are sent at the same time. + /// + /// When multiple events are synchronously added to a stream controller, the + /// [StreamController] implementation uses [scheduleMicrotask] to schedule the + /// asynchronous firing of each event. In order to recreate the synchronous + /// batches, this collates all the events that are received in "nearby" + /// microtasks. + Stream<List<T>> batchEvents() { var batch = Queue<T>(); return StreamTransformer<T, List<T>>.fromHandlers( handleData: (event, sink) { @@ -48,6 +47,6 @@ batch.clear(); } sink.close(); - }).bind(input); + }).bind(this); } }