Add leading and trailing support to debounce (dart-lang/stream_transform#103)

Closes dart-lang/stream_transform#100

Add support for immediately emitting the leading event of a series of
closely spaced events. By default continue to only emit the trailing
events.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 3cada22..f235df1 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,4 +1,6 @@
-## 1.1.2-dev
+## 1.2.0-dev
+
+-  Add support for emitting the "leading" event in `debounce`.
 
 ## 1.1.1
 
diff --git a/pkgs/stream_transform/lib/src/rate_limit.dart b/pkgs/stream_transform/lib/src/rate_limit.dart
index 40b77ef..6341c3e 100644
--- a/pkgs/stream_transform/lib/src/rate_limit.dart
+++ b/pkgs/stream_transform/lib/src/rate_limit.dart
@@ -9,7 +9,7 @@
 
 /// Utilities to rate limit events.
 ///
-/// - [debounce] - emit the _last_ event at the _end_ of a series of closely
+/// - [debounce] - emit the the _first_ or _last_ event of a series of closely
 ///   spaced events.
 /// - [debounceBuffer] - emit _all_ events at the _end_ of a series of closely
 ///   spaced events.
@@ -17,24 +17,47 @@
 /// - [audit] - emit the _last_ event at the _end_ of the period.
 /// - [buffer] - emit _all_ events on a _trigger_.
 extension RateLimit<T> on Stream<T> {
-  /// Returns a Stream which only emits when the source stream does not emit for
-  /// [duration].
+  /// Returns a Stream which suppresses events with less inter-event spacing
+  /// than [duration].
   ///
-  /// Values will always be delayed by at least [duration], and values which
-  /// come within this time will replace the old values, only the most
-  /// recent value will be emitted.
+  /// Events which are emitted with less than [duration] elapsed between them
+  /// are considered to be part of the same "series". If [leading] is `true`,
+  /// the first event of this series is emitted immediately. If [trailing] is
+  /// `true` the last event of this series is emitted with a delay of at least
+  /// [duration]. By default only trailing events are emitted, both arguments
+  /// must be specified with `leading: true, trailing: false` to emit only
+  /// leading events.
   ///
   /// If the source stream is a broadcast stream, the result will be as well.
   /// Errors are forwarded immediately.
   ///
-  /// If there is an event waiting during the debounce period when the source
-  /// stream closes the returned stream will wait to emit it following the
-  /// debounce period before closing. If there is no pending debounced event
+  /// If there is a trailing event waiting during the debounce period when the
+  /// source stream closes the returned stream will wait to emit it following
+  /// the debounce period before closing. If there is no pending debounced event
   /// when the source stream closes the returned stream will close immediately.
   ///
+  /// For example:
+  ///
+  ///     source.debouce(Duration(seconds: 1));
+  ///
+  ///     source: 1-2-3---4---5-6-|
+  ///     result: ------3---4-----6|
+  ///
+  ///     source.debouce(Duration(seconds: 1), leading: true, trailing: false);
+  ///
+  ///     source: 1-2-3---4---5-6-|
+  ///     result: 1-------4---5---|
+  ///
+  ///     source.debouce(Duration(seconds: 1), leading: true);
+  ///
+  ///     source: 1-2-3---4---5-6-|
+  ///     result: 1-----3-4---5---6|
+  ///
   /// To collect values emitted during the debounce period see [debounceBuffer].
-  Stream<T> debounce(Duration duration) =>
-      transform(_debounceAggregate(duration, _dropPrevious));
+  Stream<T> debounce(Duration duration,
+          {bool leading = false, bool trailing = true}) =>
+      transform(_debounceAggregate(duration, _dropPrevious,
+          leading: leading, trailing: trailing));
 
   /// Returns a Stream which collects values until the source stream does not
   /// emit for [duration] then emits the collected values.
@@ -53,7 +76,8 @@
   /// To keep only the most recent event during the debounce perios see
   /// [debounce].
   Stream<List<T>> debounceBuffer(Duration duration) =>
-      transform(_debounceAggregate(duration, _collectToList));
+      transform(_debounceAggregate(duration, _collectToList,
+          leading: false, trailing: true));
 
   /// Returns a stream which only emits once per [duration], at the beginning of
   /// the period.
@@ -148,25 +172,34 @@
 /// Creates a StreamTransformer which aggregates values until the source stream
 /// does not emit for [duration], then emits the aggregated values.
 StreamTransformer<T, R> _debounceAggregate<T, R>(
-    Duration duration, R Function(T element, R soFar) collect) {
+    Duration duration, R Function(T element, R soFar) collect,
+    {bool leading, bool trailing}) {
   Timer timer;
   R soFar;
   var shouldClose = false;
+  var emittedLatestAsLeading = false;
   return fromHandlers(handleData: (T value, EventSink<R> sink) {
     timer?.cancel();
-    timer = Timer(duration, () {
+    soFar = collect(value, soFar);
+    if (timer == null && leading) {
+      emittedLatestAsLeading = true;
       sink.add(soFar);
+    } else {
+      emittedLatestAsLeading = false;
+    }
+    timer = Timer(duration, () {
+      if (trailing && !emittedLatestAsLeading) sink.add(soFar);
       if (shouldClose) {
         sink.close();
       }
       soFar = null;
       timer = null;
     });
-    soFar = collect(value, soFar);
   }, handleDone: (EventSink<R> sink) {
-    if (soFar != null) {
+    if (soFar != null && trailing) {
       shouldClose = true;
     } else {
+      timer?.cancel();
       sink.close();
     }
   });
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 9f98a71..c1d6298 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -1,7 +1,7 @@
 name: stream_transform
 description: A collection of utilities to transform and manipulate streams.
 homepage: https://www.github.com/dart-lang/stream_transform
-version: 1.1.2-dev
+version: 1.2.0-dev
 
 environment:
   sdk: ">=2.6.0 <3.0.0"
diff --git a/pkgs/stream_transform/test/debounce_test.dart b/pkgs/stream_transform/test/debounce_test.dart
index cb027c5..de6b740 100644
--- a/pkgs/stream_transform/test/debounce_test.dart
+++ b/pkgs/stream_transform/test/debounce_test.dart
@@ -12,7 +12,7 @@
 void main() {
   for (var streamType in streamTypes) {
     group('Stream type [$streamType]', () {
-      group('debounce', () {
+      group('debounce - trailing', () {
         StreamController<int> values;
         List<int> emittedValues;
         bool valuesCanceled;
@@ -88,6 +88,96 @@
         }
       });
 
+      group('debounce - leading', () {
+        StreamController<int> values;
+        List<int> emittedValues;
+        Stream<int> transformed;
+        bool isDone;
+
+        setUp(() async {
+          values = createController(streamType);
+          emittedValues = [];
+          isDone = false;
+          transformed = values.stream.debounce(const Duration(milliseconds: 5),
+              leading: true, trailing: false)
+            ..listen(emittedValues.add, onDone: () {
+              isDone = true;
+            });
+        });
+
+        test('swallows values that come faster than duration', () async {
+          values..add(1)..add(2);
+          await values.close();
+          expect(emittedValues, [1]);
+        });
+
+        test('outputs multiple values spaced further than duration', () async {
+          values.add(1);
+          await waitForTimer(5);
+          values.add(2);
+          await waitForTimer(5);
+          expect(emittedValues, [1, 2]);
+        });
+
+        if (streamType == 'broadcast') {
+          test('multiple listeners all get values', () async {
+            var otherValues = [];
+            transformed.listen(otherValues.add);
+            values..add(1)..add(2);
+            await waitForTimer(5);
+            expect(emittedValues, [1]);
+            expect(otherValues, [1]);
+          });
+        }
+
+        test('closes output immediately if not waiting for trailing value',
+            () async {
+          values.add(1);
+          await values.close();
+          expect(isDone, true);
+        });
+      });
+
+      group('debounce - leading and trailing', () {
+        StreamController<int> values;
+        List<int> emittedValues;
+        Stream<int> transformed;
+
+        setUp(() async {
+          values = createController(streamType);
+          emittedValues = [];
+          transformed = values.stream.debounce(const Duration(milliseconds: 5),
+              leading: true, trailing: true)
+            ..listen(emittedValues.add);
+        });
+
+        test('swallows values that come faster than duration', () async {
+          values..add(1)..add(2)..add(3);
+          await values.close();
+          await waitForTimer(5);
+          expect(emittedValues, [1, 3]);
+        });
+
+        test('outputs multiple values spaced further than duration', () async {
+          values.add(1);
+          await waitForTimer(5);
+          values.add(2);
+          await waitForTimer(5);
+          expect(emittedValues, [1, 2]);
+        });
+
+        if (streamType == 'broadcast') {
+          test('multiple listeners all get values', () async {
+            var otherValues = [];
+            transformed.listen(otherValues.add);
+            values..add(1)..add(2);
+            await waitForTimer(5);
+            expect(emittedValues, [1, 2]);
+            expect(otherValues, [1, 2]);
+          });
+        }
+      });
+
       group('debounceBuffer', () {
         StreamController<int> values;
         List<List<int>> emittedValues;