Add leading and trailing support to debounce (dart-lang/stream_transform#103)
Closes dart-lang/stream_transform#100
Add support for immediately emitting the leading event of a series of
closely spaced events. By default continue to only emit the trailing
events.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 3cada22..f235df1 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,4 +1,6 @@
-## 1.1.2-dev
+## 1.2.0-dev
+
+- Add support for emitting the "leading" event in `debounce`.
## 1.1.1
diff --git a/pkgs/stream_transform/lib/src/rate_limit.dart b/pkgs/stream_transform/lib/src/rate_limit.dart
index 40b77ef..6341c3e 100644
--- a/pkgs/stream_transform/lib/src/rate_limit.dart
+++ b/pkgs/stream_transform/lib/src/rate_limit.dart
@@ -9,7 +9,7 @@
/// Utilities to rate limit events.
///
-/// - [debounce] - emit the _last_ event at the _end_ of a series of closely
+/// - [debounce] - emit the the _first_ or _last_ event of a series of closely
/// spaced events.
/// - [debounceBuffer] - emit _all_ events at the _end_ of a series of closely
/// spaced events.
@@ -17,24 +17,47 @@
/// - [audit] - emit the _last_ event at the _end_ of the period.
/// - [buffer] - emit _all_ events on a _trigger_.
extension RateLimit<T> on Stream<T> {
- /// Returns a Stream which only emits when the source stream does not emit for
- /// [duration].
+ /// Returns a Stream which suppresses events with less inter-event spacing
+ /// than [duration].
///
- /// Values will always be delayed by at least [duration], and values which
- /// come within this time will replace the old values, only the most
- /// recent value will be emitted.
+ /// Events which are emitted with less than [duration] elapsed between them
+ /// are considered to be part of the same "series". If [leading] is `true`,
+ /// the first event of this series is emitted immediately. If [trailing] is
+ /// `true` the last event of this series is emitted with a delay of at least
+ /// [duration]. By default only trailing events are emitted, both arguments
+ /// must be specified with `leading: true, trailing: false` to emit only
+ /// leading events.
///
/// If the source stream is a broadcast stream, the result will be as well.
/// Errors are forwarded immediately.
///
- /// If there is an event waiting during the debounce period when the source
- /// stream closes the returned stream will wait to emit it following the
- /// debounce period before closing. If there is no pending debounced event
+ /// If there is a trailing event waiting during the debounce period when the
+ /// source stream closes the returned stream will wait to emit it following
+ /// the debounce period before closing. If there is no pending debounced event
/// when the source stream closes the returned stream will close immediately.
///
+ /// For example:
+ ///
+ /// source.debouce(Duration(seconds: 1));
+ ///
+ /// source: 1-2-3---4---5-6-|
+ /// result: ------3---4-----6|
+ ///
+ /// source.debouce(Duration(seconds: 1), leading: true, trailing: false);
+ ///
+ /// source: 1-2-3---4---5-6-|
+ /// result: 1-------4---5---|
+ ///
+ /// source.debouce(Duration(seconds: 1), leading: true);
+ ///
+ /// source: 1-2-3---4---5-6-|
+ /// result: 1-----3-4---5---6|
+ ///
/// To collect values emitted during the debounce period see [debounceBuffer].
- Stream<T> debounce(Duration duration) =>
- transform(_debounceAggregate(duration, _dropPrevious));
+ Stream<T> debounce(Duration duration,
+ {bool leading = false, bool trailing = true}) =>
+ transform(_debounceAggregate(duration, _dropPrevious,
+ leading: leading, trailing: trailing));
/// Returns a Stream which collects values until the source stream does not
/// emit for [duration] then emits the collected values.
@@ -53,7 +76,8 @@
/// To keep only the most recent event during the debounce perios see
/// [debounce].
Stream<List<T>> debounceBuffer(Duration duration) =>
- transform(_debounceAggregate(duration, _collectToList));
+ transform(_debounceAggregate(duration, _collectToList,
+ leading: false, trailing: true));
/// Returns a stream which only emits once per [duration], at the beginning of
/// the period.
@@ -148,25 +172,34 @@
/// Creates a StreamTransformer which aggregates values until the source stream
/// does not emit for [duration], then emits the aggregated values.
StreamTransformer<T, R> _debounceAggregate<T, R>(
- Duration duration, R Function(T element, R soFar) collect) {
+ Duration duration, R Function(T element, R soFar) collect,
+ {bool leading, bool trailing}) {
Timer timer;
R soFar;
var shouldClose = false;
+ var emittedLatestAsLeading = false;
return fromHandlers(handleData: (T value, EventSink<R> sink) {
timer?.cancel();
- timer = Timer(duration, () {
+ soFar = collect(value, soFar);
+ if (timer == null && leading) {
+ emittedLatestAsLeading = true;
sink.add(soFar);
+ } else {
+ emittedLatestAsLeading = false;
+ }
+ timer = Timer(duration, () {
+ if (trailing && !emittedLatestAsLeading) sink.add(soFar);
if (shouldClose) {
sink.close();
}
soFar = null;
timer = null;
});
- soFar = collect(value, soFar);
}, handleDone: (EventSink<R> sink) {
- if (soFar != null) {
+ if (soFar != null && trailing) {
shouldClose = true;
} else {
+ timer?.cancel();
sink.close();
}
});
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 9f98a71..c1d6298 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -1,7 +1,7 @@
name: stream_transform
description: A collection of utilities to transform and manipulate streams.
homepage: https://www.github.com/dart-lang/stream_transform
-version: 1.1.2-dev
+version: 1.2.0-dev
environment:
sdk: ">=2.6.0 <3.0.0"
diff --git a/pkgs/stream_transform/test/debounce_test.dart b/pkgs/stream_transform/test/debounce_test.dart
index cb027c5..de6b740 100644
--- a/pkgs/stream_transform/test/debounce_test.dart
+++ b/pkgs/stream_transform/test/debounce_test.dart
@@ -12,7 +12,7 @@
void main() {
for (var streamType in streamTypes) {
group('Stream type [$streamType]', () {
- group('debounce', () {
+ group('debounce - trailing', () {
StreamController<int> values;
List<int> emittedValues;
bool valuesCanceled;
@@ -88,6 +88,96 @@
}
});
+ group('debounce - leading', () {
+ StreamController<int> values;
+ List<int> emittedValues;
+ Stream<int> transformed;
+ bool isDone;
+
+ setUp(() async {
+ values = createController(streamType);
+ emittedValues = [];
+ isDone = false;
+ transformed = values.stream.debounce(const Duration(milliseconds: 5),
+ leading: true, trailing: false)
+ ..listen(emittedValues.add, onDone: () {
+ isDone = true;
+ });
+ });
+
+ test('swallows values that come faster than duration', () async {
+ values..add(1)..add(2);
+ await values.close();
+ expect(emittedValues, [1]);
+ });
+
+ test('outputs multiple values spaced further than duration', () async {
+ values.add(1);
+ await waitForTimer(5);
+ values.add(2);
+ await waitForTimer(5);
+ expect(emittedValues, [1, 2]);
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () async {
+ var otherValues = [];
+ transformed.listen(otherValues.add);
+ values..add(1)..add(2);
+ await waitForTimer(5);
+ expect(emittedValues, [1]);
+ expect(otherValues, [1]);
+ });
+ }
+
+ test('closes output immediately if not waiting for trailing value',
+ () async {
+ values.add(1);
+ await values.close();
+ expect(isDone, true);
+ });
+ });
+
+ group('debounce - leading and trailing', () {
+ StreamController<int> values;
+ List<int> emittedValues;
+ Stream<int> transformed;
+
+ setUp(() async {
+ values = createController(streamType);
+ emittedValues = [];
+ transformed = values.stream.debounce(const Duration(milliseconds: 5),
+ leading: true, trailing: true)
+ ..listen(emittedValues.add);
+ });
+
+ test('swallows values that come faster than duration', () async {
+ values..add(1)..add(2)..add(3);
+ await values.close();
+ await waitForTimer(5);
+ expect(emittedValues, [1, 3]);
+ });
+
+ test('outputs multiple values spaced further than duration', () async {
+ values.add(1);
+ await waitForTimer(5);
+ values.add(2);
+ await waitForTimer(5);
+ expect(emittedValues, [1, 2]);
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () async {
+ var otherValues = [];
+ transformed.listen(otherValues.add);
+ values..add(1)..add(2);
+ await waitForTimer(5);
+ expect(emittedValues, [1, 2]);
+ expect(otherValues, [1, 2]);
+ });
+ }
+ });
+
group('debounceBuffer', () {
StreamController<int> values;
List<List<int>> emittedValues;