blob: 347919bde7e9948007a39842346559d0810aeb9d [file] [log] [blame]
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'aggregate_sample.dart';
import 'from_handlers.dart';
/// Utilities to rate limit events.
///
/// - [debounce] - emit the _first_ or _last_ event of a series of closely
/// spaced events.
/// - [debounceBuffer] - emit _all_ events at the _end_ of a series of closely
/// spaced events.
/// - [throttle] - emit the _first_ event at the _beginning_ of the period.
/// - [audit] - emit the _last_ event at the _end_ of the period.
/// - [buffer] - emit _all_ events on a _trigger_.
extension RateLimit<T> on Stream<T> {
/// Emits only the first and/or last event of each series of closely spaced
/// events.
///
/// Two events belong to the same "series" when less than [duration] elapses
/// between them.
///
/// - If [leading] is `true`, the first event of a series is emitted
///   immediately.
/// - If [trailing] is `true`, the last event of a series is emitted after a
///   delay of at least [duration].
///
/// Only trailing events are emitted by default. To emit only leading events
/// both arguments must be given: `leading: true, trailing: false`.
///
/// If this stream is a broadcast stream, the result will be as well.
/// Errors are forwarded immediately.
///
/// If this stream closes while a trailing event is waiting out the debounce
/// period, the returned stream emits that event when the period ends and
/// only then closes. If no debounced event is pending when this stream
/// closes, the returned stream closes immediately.
///
/// For example:
///
///     source.debounce(Duration(seconds: 1));
///
///     source: 1-2-3---4---5-6-|
///     result: ------3---4-----6|
///
///     source.debounce(Duration(seconds: 1), leading: true, trailing: false);
///
///     source: 1-2-3---4---5-6-|
///     result: 1-------4---5---|
///
///     source.debounce(Duration(seconds: 1), leading: true);
///
///     source: 1-2-3---4---5-6-|
///     result: 1-----3-4---5---6|
///
/// To collect values emitted during the debounce period see [debounceBuffer].
Stream<T> debounce(Duration duration,
    {bool leading = false, bool trailing = true}) {
  // Keeping only the newest element of a series is all the aggregation
  // needed here.
  return _debounceAggregate<T>(duration, _dropPrevious,
      leading: leading, trailing: trailing);
}
/// Collects values until this stream has been silent for [duration], then
/// emits everything collected as a single list.
///
/// Every value is delayed by at least [duration]; values that arrive within
/// that time of each other are aggregated into the same list.
///
/// If this stream is a broadcast stream, the result will be as well.
/// Errors are forwarded immediately.
///
/// If this stream closes while events are waiting out the debounce period,
/// the returned stream emits them when the period ends and only then closes.
/// If no debounced events are pending when this stream closes, the returned
/// stream closes immediately.
///
/// To keep only the most recent event during the debounce period see
/// [debounce].
Stream<List<T>> debounceBuffer(Duration duration) {
  // Always trailing-only: a list can only be emitted once a series has ended.
  return _debounceAggregate<List<T>>(duration, _collect,
      leading: false, trailing: true);
}
/// Limits the result stream to at most one event per [duration].
///
/// No two events on the result stream are ever emitted within [duration] of
/// each other.
/// If this stream is a broadcast stream, the result will be as well.
/// Errors are forwarded immediately.
///
/// If [trailing] is `false` (the default), source events arriving during the
/// [duration] period that follows a result event are discarded, and the
/// result stream stays silent until this stream emits an event after the
/// throttled period.
/// If this stream keeps emitting events with less than [duration] between
/// them, the time between events on the result stream may still be more than
/// [duration].
/// The result stream closes immediately when this stream closes.
///
/// If [trailing] is `true`, the latest source event arriving during the
/// [duration] period that follows a result event is held and emitted once
/// the period ends.
/// If this stream keeps emitting events with less than [duration] between
/// them, the time between events on the result stream will be [duration].
/// If this stream closes while an event is held, the result stream emits
/// that event before closing.
///
/// For example:
///
///     source.throttle(Duration(seconds: 6));
///
///     source: 1-2-3---4-5-6---7-8-|
///     result: 1-------4-------7---|
///
///     source.throttle(Duration(seconds: 6), trailing: true);
///
///     source: 1-2-3---4-5----6--|
///     result: 1-----3-----5-----6|
///
///     source.throttle(Duration(seconds: 6), trailing: true);
///
///     source: 1-2-----------3|
///     result: 1-----2-------3|
///
/// See also:
/// - [audit], which emits the most recent event at the end of the period.
///   Compared to `audit`, `throttle` will not introduce delay to forwarded
///   elements, except for the [trailing] events.
/// - [debounce], which uses inter-event spacing instead of a fixed period
///   from the first event in a window. Compared to `debounce`, `throttle`
///   cannot be starved by having events emitted continuously within
///   [duration].
Stream<T> throttle(Duration duration, {bool trailing = false}) {
  if (trailing) return _throttleTrailing(duration);
  return _throttle(duration);
}
/// Leading-only throttle: forwards an event, then drops every event that
/// arrives before a [duration] timer started by that forward has fired.
Stream<T> _throttle(Duration duration) {
  // Non-null while a throttle period is running.
  Timer? cooldown;

  void endCooldown() {
    cooldown = null;
  }

  return transformByHandlers(onData: (event, sink) {
    // Inside a throttle period: the event is discarded.
    if (cooldown != null) return;

    sink.add(event);
    cooldown = Timer(duration, endCooldown);
  });
}
/// Throttle that also emits the latest held event at the end of each period.
///
/// The first event outside a period is forwarded at once and starts a
/// [duration] timer. Events arriving while the timer runs replace the held
/// event. When the timer fires, a held event is emitted and a new period
/// starts; with nothing held the period simply ends.
Stream<T> _throttleTrailing(Duration duration) {
  // Non-null while a throttle period is running.
  Timer? periodTimer;
  // Latest event received during the running period; only meaningful while
  // `isHolding` is true (a separate flag is needed because `T` may itself be
  // nullable).
  T? held;
  var isHolding = false;
  var sourceClosed = false;

  return transformByHandlers(onData: (event, sink) {
    void onPeriodEnd() {
      if (!isHolding) {
        // Nothing arrived during the period, so throttling stops.
        periodTimer = null;
        return;
      }

      sink.add(held as T);
      if (sourceClosed) {
        // The source finished while this event was held; closing was
        // deferred until it had been delivered.
        sink.close();
        return;
      }

      // The event just emitted opens a fresh throttle period.
      periodTimer = Timer(duration, onPeriodEnd);
      isHolding = false;
      held = null;
    }

    if (periodTimer != null) {
      // Inside a period: remember only the newest event.
      isHolding = true;
      held = event;
      return;
    }

    sink.add(event);
    periodTimer = Timer(duration, onPeriodEnd);
  }, onDone: (sink) {
    sourceClosed = true;
    if (!isHolding) {
      sink.close();
      periodTimer?.cancel();
      periodTimer = null;
    }
    // Otherwise the running timer emits the held event and closes the sink.
  });
}
/// Emits a single event, the most recent one, for each [duration] length
/// period in which this stream emits events.
///
/// No two events on the result stream are ever emitted within [duration] of
/// each other.
/// If this stream is a broadcast stream, the result will be as well.
/// Errors are forwarded immediately.
///
/// The first event begins the audit period. When the period ends the most
/// recent event is emitted, and the next event restarts the audit period.
///
/// If the event that started the period is also the one that is emitted, it
/// is delayed by [duration]. If a later event arrives within the period, its
/// delay is shorter by the difference in arrival times.
///
/// If there is no pending event when this stream closes, the output stream
/// closes immediately. If there is a pending event, the output stream waits
/// to emit it before closing.
///
/// For example:
///
///     source.audit(Duration(seconds: 5));
///
///     source: a------b--c----d--|
///     output: -----a------c--------d|
///
/// See also:
/// - [throttle], which emits the _first_ event during the window, instead of
///   the last event in the window. Compared to `throttle`, `audit` will
///   introduce delay to forwarded events.
/// - [debounce], which only emits after the stream has not emitted for some
///   period. Compared to `debounce`, `audit` cannot be starved by having
///   events emitted continuously within [duration].
Stream<T> audit(Duration duration) {
  // Non-null while an audit period is running.
  Timer? periodTimer;
  var closeAfterEmit = false;
  // Always assigned in the data handler before the timer callback reads it.
  T latest;

  return transformByHandlers(onData: (event, sink) {
    latest = event;
    // A period is already running; it will pick up `latest` when it ends.
    if (periodTimer != null) return;

    periodTimer = Timer(duration, () {
      sink.add(latest);
      periodTimer = null;
      if (closeAfterEmit) {
        sink.close();
      }
    });
  }, onDone: (sink) {
    if (periodTimer == null) {
      sink.close();
    } else {
      // Defer closing until the running period has emitted its event.
      closeAfterEmit = true;
    }
  });
}
/// Collects the values emitted on this stream and emits them as a list each
/// time [trigger] emits an event.
///
/// If [longPoll] is `false` and there are no buffered values when [trigger]
/// emits, an empty list is emitted immediately.
///
/// If [longPoll] is `true` and there are no buffered values when [trigger]
/// emits one or more events, the *next* value from this stream is emitted
/// immediately on the returned stream as a single element list.
/// Subsequent events on [trigger] while there have been no events on this
/// stream are ignored.
///
/// The result stream closes as soon as it is guaranteed not to emit any more
/// events. No more events will be emitted if:
/// - [trigger] is closed and there is no waiting long poll.
/// - Or, this stream is closed and previously buffered events have been
///   delivered.
///
/// If this stream is a broadcast stream, the result will be as well.
/// Errors from this stream or the trigger are immediately forwarded to the
/// output.
///
/// See also:
/// - [sample] which uses a [trigger] stream in the same way, but keeps only
///   the most recent source event.
Stream<List<T>> buffer(Stream<void> trigger, {bool longPoll = true}) {
  return aggregateSample<List<T>>(
      trigger: trigger,
      aggregate: _collect,
      longPoll: longPoll,
      onEmpty: _empty);
}
/// Emits the most recent new value from this stream each time [trigger]
/// emits an event.
///
/// If [longPoll] is `false`, an event on [trigger] while there is no pending
/// source event is ignored.
///
/// If [longPoll] is `true` (the default), an event on [trigger] while there
/// is no pending source event causes the *next* source event to flow
/// immediately to the result stream.
/// Subsequent events on [trigger] while there have been no events on this
/// stream are ignored.
///
/// The result stream closes as soon as it is guaranteed not to emit any more
/// events. No more events will be emitted if:
/// - [trigger] is closed and there is no waiting long poll.
/// - Or, this source stream is closed and any pending source event has been
///   delivered.
///
/// If this source stream is a broadcast stream, the result will be as well.
/// Errors from this source stream or the trigger are immediately forwarded
/// to the output.
///
/// See also:
/// - [buffer] which uses a [trigger] stream in the same way, but keeps a
///   list of pending source events.
Stream<T> sample(Stream<void> trigger, {bool longPoll = true}) {
  return aggregateSample<T>(
      trigger: trigger,
      aggregate: _dropPrevious,
      longPoll: longPoll,
      onEmpty: _ignore);
}
/// Aggregates values until this source stream does not emit for [duration],
/// then emits the aggregated values.
///
/// Each incoming value is folded into the running aggregate with [collect],
/// which receives `null` as `soFar` right after the aggregate was emitted.
/// Every value restarts the [duration] timer.
///
/// When [leading] is `true`, a value arriving while no timer is active is
/// emitted immediately. When [trailing] is `true`, the aggregate is emitted
/// when the timer fires, unless the latest value already went out as the
/// leading event.
Stream<S> _debounceAggregate<S>(
    Duration duration, S Function(T element, S? soFar) collect,
    {required bool leading, required bool trailing}) {
  // Non-null while a series of closely spaced events is in progress.
  Timer? quietTimer;
  S? aggregate;
  var hasPending = false;
  var closeAfterEmit = false;
  var latestWasLeading = false;

  return transformByHandlers(onData: (value, sink) {
    void flush() {
      sink.add(aggregate as S);
      aggregate = null;
      hasPending = false;
    }

    void onQuiet() {
      if (trailing && !latestWasLeading) flush();
      if (closeAfterEmit) sink.close();
      // The series is over; the next value starts a new one.
      quietTimer = null;
    }

    // Cancelling does not clear the variable, so the null check below still
    // tells whether a series was already in progress.
    quietTimer?.cancel();
    aggregate = collect(value, aggregate);
    hasPending = true;

    latestWasLeading = quietTimer == null && leading;
    if (latestWasLeading) flush();

    quietTimer = Timer(duration, onQuiet);
  }, onDone: (EventSink<S> sink) {
    if (!hasPending || !trailing) {
      // Nothing is left to deliver, so close right away.
      quietTimer?.cancel();
      sink.close();
      return;
    }
    // The running timer emits the pending aggregate and then closes.
    closeAfterEmit = true;
  });
}
}
/// Aggregator that keeps only the newest element, discarding whatever was
/// accumulated before it.
T _dropPrevious<T>(T element, _) {
  return element;
}
/// Aggregator that appends [event] to [soFar] and returns that same list,
/// creating a new list first when [soFar] is `null`.
List<T> _collect<T>(T event, List<T>? soFar) {
  final events = soFar ?? <T>[];
  events.add(event);
  return events;
}
/// `onEmpty` callback used by `buffer`: adds an empty list to [sink].
void _empty<T>(Sink<List<T>> sink) {
  sink.add(<T>[]);
}
/// `onEmpty` callback used by `sample`: leaves [sink] untouched.
void _ignore<T>(Sink<T> sink) {
  // Intentionally empty.
}