Add trailing argument to throttle (dart-lang/stream_transform#123)

Closes dart-lang/stream_transform#122

The existing rate limit utilities don't cover the case of wanting to
update a UI with some changing data with a limited frequency. In this
case the goal is to get the "latest" event with minimal lag time, while
maintaining at least a given duration in between events. Add a
`trailing` argument to `throttle` for an operator which will emit
immediately when possible (unlike `audit`), and does not get starved by a
long series of frequent events (unlike `debounce`).

Expand the doc with the same timeline diagrams as used for other
operators.

Split the implementation into two methods to avoid extra conditional
checks and unused variables for the common case of `trailing: false`.
Add tests for the behavior of emitting the trailing event, suppressing
intermediate events, and keeping the stream open until the final
trailing even is emitted.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 68f8859..2bae497 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -3,6 +3,7 @@
 - Migrate to null safety.
 - Improve tests of `switchMap` and improve documentation with links and
   clarification.
+- Add `trailing` argument to `throttle`.
 
 ## 1.2.0
 
diff --git a/pkgs/stream_transform/lib/src/rate_limit.dart b/pkgs/stream_transform/lib/src/rate_limit.dart
index d558f61..6a02703 100644
--- a/pkgs/stream_transform/lib/src/rate_limit.dart
+++ b/pkgs/stream_transform/lib/src/rate_limit.dart
@@ -73,7 +73,7 @@
   /// debounce period before closing. If there are no pending debounced events
   /// when the source stream closes the returned stream will close immediately.
   ///
-  /// To keep only the most recent event during the debounce perios see
+  /// To keep only the most recent event during the debounce period see
   /// [debounce].
   Stream<List<T>> debounceBuffer(Duration duration) =>
       _debounceAggregate(duration, _collect, leading: false, trailing: true);
@@ -81,9 +81,46 @@
   /// Returns a stream which only emits once per [duration], at the beginning of
   /// the period.
   ///
-  /// Events emitted by the source stream within [duration] following an emitted
-  /// event will be discarded. Errors are always forwarded immediately.
-  Stream<T> throttle(Duration duration) {
+  /// No events will ever be emitted within [duration] of another event on the
+  /// result stream.
+  /// If the source stream is a broadcast stream, the result will be as well.
+  /// Errors are forwarded immediately.
+  ///
+  /// If [trailing] is `false`, source events emitted during the [duration]
+  /// period following a result event are discarded. The result stream will not
+  /// emit an event until the source stream emits an event following the
+  /// throttled period. If the source stream is consistently emitting events
+  /// with less than [duration] between events, the time between events on the
+  /// result stream may still be more than [duration]. The result stream will
+  /// close immediately when the source stream closes.
+  ///
+  /// If [trailing] is `true`, the latest source event emitted during the
+  /// [duration] period following an result event is held and emitted following
+  /// the period. If the source stream is consistently emitting events with less
+  /// than [duration] between events, the time between events on the result
+  /// stream will be [duration]. If the source stream closes the result stream
+  /// will wait to emit a pending event before closing.
+  ///
+  /// For example:
+  ///
+  ///     source.throtte(Duration(seconds: 6));
+  ///
+  ///     source: 1-2-3---4-5-6---7-8-|
+  ///     result: 1-------4-------7---|
+  ///
+  ///     source.throttle(Duration(seconds: 6), trailing: true);
+  ///
+  ///     source: 1-2-3---4-5----6--|
+  ///     result: 1-----3-----5-----6|
+  ///
+  ///     source.throttle(Duration(seconds: 6), trailing: true);
+  ///
+  ///     source: 1-2-----------3|
+  ///     result: 1-----2-------3|
+  Stream<T> throttle(Duration duration, {bool trailing = false}) =>
+      trailing ? _throttleTrailing(duration) : _throttle(duration);
+
+  Stream<T> _throttle(Duration duration) {
     Timer? timer;
 
     return transformByHandlers(onData: (data, sink) {
@@ -96,6 +133,43 @@
     });
   }
 
+  Stream<T> _throttleTrailing(Duration duration) {
+    Timer? timer;
+    T? pending;
+    var hasPending = false;
+    var isDone = false;
+
+    return transformByHandlers(onData: (data, sink) {
+      void onTimer() {
+        if (hasPending) {
+          sink.add(pending as T);
+          if (isDone) {
+            sink.close();
+          } else {
+            timer = Timer(duration, onTimer);
+            hasPending = false;
+            pending = null;
+          }
+        } else {
+          timer = null;
+        }
+      }
+
+      if (timer == null) {
+        sink.add(data);
+        timer = Timer(duration, onTimer);
+      } else {
+        hasPending = true;
+        pending = data;
+      }
+    }, onDone: (sink) {
+      isDone = true;
+      if (hasPending) return; // Will be closed by timer.
+      timer?.cancel();
+      timer = null;
+    });
+  }
+
   /// Returns a Stream which only emits once per [duration], at the end of the
   /// period.
   ///
diff --git a/pkgs/stream_transform/test/throttle_test.dart b/pkgs/stream_transform/test/throttle_test.dart
index d84fdf4..27d9b11 100644
--- a/pkgs/stream_transform/test/throttle_test.dart
+++ b/pkgs/stream_transform/test/throttle_test.dart
@@ -19,7 +19,7 @@
       late Stream<int> transformed;
       late StreamSubscription<int> subscription;
 
-      group('throttle', () {
+      group('throttle - trailing: false', () {
         setUp(() async {
           valuesCanceled = false;
           values = createController(streamType)
@@ -64,7 +64,7 @@
 
         if (streamType == 'broadcast') {
           test('multiple listeners all get values', () async {
-            var otherValues = [];
+            var otherValues = <int>[];
             transformed.listen(otherValues.add);
             values.add(1);
             await Future(() {});
@@ -73,6 +73,64 @@
           });
         }
       });
+
+      group('throttle - trailing: true', () {
+        setUp(() async {
+          valuesCanceled = false;
+          values = createController(streamType)
+            ..onCancel = () {
+              valuesCanceled = true;
+            };
+          emittedValues = [];
+          isDone = false;
+          transformed = values.stream
+              .throttle(const Duration(milliseconds: 5), trailing: true);
+          subscription = transformed.listen(emittedValues.add, onDone: () {
+            isDone = true;
+          });
+        });
+
+        test('emits both first and last in a period', () async {
+          values..add(1)..add(2);
+          await values.close();
+          await waitForTimer(5);
+          expect(emittedValues, [1, 2]);
+        });
+
+        test('swallows values that are not the latest in a period', () async {
+          values..add(1)..add(2)..add(3);
+          await values.close();
+          await waitForTimer(5);
+          expect(emittedValues, [1, 3]);
+        });
+
+        test('waits to output the last value even if the stream closes',
+            () async {
+          values..add(1)..add(2);
+          await values.close();
+          await Future(() {});
+          expect(isDone, false);
+          expect(emittedValues, [1],
+              reason: 'Should not be emitted until after duration');
+          await waitForTimer(5);
+          expect(emittedValues, [1, 2]);
+          expect(isDone, true);
+        });
+
+        if (streamType == 'broadcast') {
+          test('multiple listeners all get values', () async {
+            var otherValues = <int>[];
+            transformed.listen(otherValues.add);
+            values..add(1)..add(2);
+            await Future(() {});
+            expect(emittedValues, [1]);
+            expect(otherValues, [1]);
+            await waitForTimer(5);
+            expect(emittedValues, [1, 2]);
+            expect(otherValues, [1, 2]);
+          });
+        }
+      });
     });
   }
 }