blob: 604376e8eb0eae529fb933b9f169869f13fb910e [file] [log] [blame]
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'event.dart';
import 'paths.dart';
import 'testing.dart';
/// Buffers [FileSystemEvent] streams into batches of events.
///
/// Two batching strategies are available: "nearby microtask" and "buffered by
/// path".
extension BatchEvents on Stream<FileSystemEvent> {
/// Batches all events that are sent at the same time.
///
/// When multiple events are synchronously added to a stream controller, the
/// [StreamController] implementation uses [scheduleMicrotask] to schedule the
/// asynchronous firing of each event. In order to recreate the synchronous
/// batches, this collates all the events that are received in "nearby"
/// microtasks.
///
/// Converts to [Event] using [Event.checkAndConvert], discarding events for
/// which it returns `null`.
Stream<List<Event>> batchNearbyMicrotasksAndConvertEvents() {
var batch = <Event>[];
return StreamTransformer<FileSystemEvent, List<Event>>.fromHandlers(
handleData: (event, sink) {
var convertedEvent = Event.checkAndConvert(event);
if (convertedEvent == null) return;
batch.add(convertedEvent);
// [Timer.run] schedules an event that runs after any microtasks that have
// been scheduled.
Timer.run(() {
if (batch.isEmpty) return;
sink.add(batch.toList());
batch.clear();
});
}, handleDone: (sink) {
if (batch.isNotEmpty) {
sink.add(batch.toList());
batch.clear();
}
sink.close();
}).bind(this);
}
/// Batches events by path.
///
/// For each path, events are emitted when they are at least [duration] old.
/// Rather than emitting split by path, all pending events are periodically
/// checked and all old-enough events are emitted in one batch.
Stream<List<Event>> batchBufferedByPathAndConvertEvents(
{required Duration duration}) {
final batcher = _PathBufferedBatcher(duration);
return StreamTransformer<FileSystemEvent, List<Event>>.fromHandlers(
handleData: batcher.handleData, handleDone: batcher.handleDone)
.bind(this);
}
}
class _PathBufferedBatcher {
final bufferedEvents = <AbsolutePath, _BufferedEvents>{};
final Duration duration;
bool checkAndEmitIsPending = false;
_PathBufferedBatcher(this.duration);
/// Adds events to [bufferedEvents].
///
/// Calls [maybeScheduleCheckAndEmit] to schedule a [checkAndEmit] if none is
/// already pending.
void handleData(FileSystemEvent event, Sink<List<Event>> sink) {
final convertedEvent = Event.checkAndConvert(event);
if (convertedEvent == null) return;
for (final splitEvent in convertedEvent.splitIfMove()) {
bufferedEvents
.putIfAbsent(splitEvent.absolutePath, _BufferedEvents.new)
.add(splitEvent);
}
maybeScheduleCheckAndEmit(sink);
}
/// If there is no timer running and there are events buffered, starts a timer
/// with delay [duration] that will call [checkAndEmit] on [sink].
void maybeScheduleCheckAndEmit(Sink<List<Event>> sink) {
if (checkAndEmitIsPending) return;
if (bufferedEvents.isEmpty) return;
checkAndEmitIsPending = true;
Timer(duration, () => checkAndEmit(sink));
}
/// Emits events older than [duration] to [sink].
///
/// If any events remain, calls [maybeScheduleCheckAndEmit] to schedule
/// another check.
void checkAndEmit(Sink<List<Event>> sink) {
checkAndEmitIsPending = false;
final events = <Event>[];
final sendEventsBefore = overridableDateTimeNow().subtract(duration);
for (var entry in bufferedEvents.entries.toList()) {
if (entry.value.lastUpdated.isBefore(sendEventsBefore)) {
events.addAll(entry.value.events);
bufferedEvents.remove(entry.key);
}
}
if (events.isNotEmpty) {
sink.add(events);
}
maybeScheduleCheckAndEmit(sink);
}
/// Flushes buffered events and closes the [sink].
void handleDone(Sink<List<Event>> sink) {
if (bufferedEvents.isNotEmpty) {
sink.add(bufferedEvents.values.expand((x) => x.events).toList());
bufferedEvents.clear();
}
sink.close();
}
}
class _BufferedEvents {
final List<Event> events = [];
DateTime _lastUpdated;
_BufferedEvents() : _lastUpdated = overridableDateTimeNow();
void add(Event event) {
events.add(event);
_lastUpdated = overridableDateTimeNow();
}
DateTime get lastUpdated => _lastUpdated;
}