Implement `sample`, add `longPoll` to `buffer` (dart-lang/stream_transform#147)

Closes dart-lang/stream_transform#145

Add `longPoll` support to `aggregateSample`. This method is a package
private implementation detail.
- Refactor to named arguments to make changes smoother.
- Add `longPoll` boolean argument and conditional behavior in each
  stream callback.
- Add `onEmpty` callback to let the caller decide whether to emit some
  default (empty list in the `buffer` case) or not emit at all (in the
  `sample` case) for triggers which do no set up a long poll and have
  no prior aggregated events.
- Switch `waitingForTrigger` to the inverted condition `activeLongPoll`
  since it is a closer match to how the behavior is described in the docs.

Add `longPoll` argument to `buffer` and update the doc to describe the
conditional behavior. Add a section in the docs discussing the end of
stream behavior.

Add a `sample` method which is similar to `buffer`. Cross reference each
method in the docs.

Update the comparison table with Rx. Show that `buffer` is equivalent to
`buffer(longPoll: false)`, that `sample` is `sampel(longPoll: false)`. I
cannot find an equivalent Rx method to the `longPoll: true` behavior.
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index d3c71c6..ee0ac2e 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -89,13 +89,13 @@
 
 Rx Operator Category      | variation                                              | `stream_transform`
 ------------------------- | ------------------------------------------------------ | ------------------
-[`sample`][rx_sample]     | `sample/throttleLast(Duration)`                        | No equivalent
+[`sample`][rx_sample]     | `sample/throttleLast(Duration)`                        | `sample(Stream.periodic(Duration), longPoll: false)`
 ​                  | `throttleFirst(Duration)`                              | [`throttle`][throttle]
-​                  | `sample(Observable)`                                   | No equivalent
+​                  | `sample(Observable)`                                   | `sample(trigger, longPoll: false)`
 [`debounce`][rx_debounce] | `debounce/throttleWithTimeout(Duration)`               | [`debounce`][debounce]
 ​                  | `debounce(Observable)`                                 | No equivalent
 [`buffer`][rx_buffer]     | `buffer(boundary)`, `bufferWithTime`,`bufferWithCount` | No equivalent
-​                  | `buffer(boundaryClosingSelector)`                      | No equivalent[^1]
+​                  | `buffer(boundaryClosingSelector)`                      | `buffer(trigger, longPoll: false)`
 RxJs extensions           | [`audit(callback)`][rxjs_audit]                        | No equivalent
 ​                  | [`auditTime(Duration)`][rxjs_auditTime]                | [`audit`][audit]
 ​                  | [`exhaustMap`][rxjs_exhaustMap]                        | No equivalent
@@ -103,7 +103,8 @@
 ​                  | `throttleTime(leading: false, trailing: true)`         | No equivalent
 No equivalent?            |                                                        | [`asyncMapBuffer`][asyncMapBuffer]
 ​                  |                                                        | [`asyncMapSample`][asyncMapSample]
-​                  |                                                        | [`buffer`][buffer][^1]
+​                  |                                                        | [`buffer`][buffer]
+​                  |                                                        | [`sample`][sample]
 ​                  |                                                        | [`debounceBuffer`][debounceBuffer]
 ​                  |                                                        | `debounce(leading: true, trailing: false)`
 ​                  |                                                        | `debounce(leading: true, trailing: true)`
@@ -119,15 +120,10 @@
 [asyncMapSample]:https://pub.dev/documentation/stream_transform/latest/stream_transform/AsyncMap/asyncMapSample.html
 [audit]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/audit.html
 [buffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/buffer.html
+[sample]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/sample.html
 [debounceBuffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/debounceBuffer.html
 [debounce]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/debounce.html
 [throttle]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/throttle.html
-[^1]: `stream_transform` `buffer` is closest to
-    [`buffer(bufferClosingSelector)`][rx_buffer], except where the trigger
-    emits while no events are buffered. ReactiveX will immediately emit and
-    empty list, while `stream_transform` will wait and emit a single element
-    list when the next event occurs. You can think of it like
-    `stream_transform` implemeting the "long polling" version of `buffer`.
 
 # Getting a `StreamTransformer` instance
 
diff --git a/pkgs/stream_transform/lib/src/aggregate_sample.dart b/pkgs/stream_transform/lib/src/aggregate_sample.dart
index 0eea768..aa47c1e 100644
--- a/pkgs/stream_transform/lib/src/aggregate_sample.dart
+++ b/pkgs/stream_transform/lib/src/aggregate_sample.dart
@@ -5,41 +5,67 @@
 import 'dart:async';
 
 extension AggregateSample<T> on Stream<T> {
-  /// Aggregates values and emits when it sees a value on [trigger].
+  /// Computes a value based on sequences of events, then emits that value when
+  /// [trigger] emits an event.
   ///
-  /// If there are no pending values when [trigger] emits, the next value on the
-  /// source Stream will be passed to [aggregate] and emitted on the result
-  /// stream immediately. Otherwise, the pending values are released when
-  /// [trigger] emits.
+  /// Every time this stream emits an event, an intermediate value is created
+  /// by combining the new event with the previous intermediate value, or with
+  /// `null` if there is no previous value, using the [aggregate] function.
   ///
+  /// When [trigger] emits value, the returned stream emits the current
+  /// intermediate value and clears it.
+  ///
+  /// If [longPoll] is `false`, if there is no intermediate value when [trigger]
+  /// emits an event, the [onEmpty] function is called with a [Sink] which can
+  /// add events to the returned stream.
+  ///
+  /// If [longPoll] is `true`, and there is no intermediate value when [trigger]
+  /// emits one or more events, then the *next* event from this stream is
+  /// immediately put through [aggregate] and emitted on the returned stream.
+  /// Subsequent events on [trigger] while there have been no events on this
+  /// stream are ignored.
+  /// In that case, [onEmpty] is never used.
+  ///
+  /// The result stream will close as soon as there is a guarantee it will not
+  /// emit any more events. There will not be any more events emitted if:
+  /// - [trigger] is closed and there is no waiting long poll.
+  /// - Or, the source stream is closed and there are no buffered events.
+  ///
+  /// If the source stream is a broadcast stream, the result will be as well.
   /// Errors from the source stream or the trigger are immediately forwarded to
   /// the output.
   Stream<S> aggregateSample<S>(
-      Stream<void> trigger, S Function(T, S?) aggregate) {
+      {required Stream<void> trigger,
+      required S Function(T, S?) aggregate,
+      required bool longPoll,
+      required void Function(Sink<S>) onEmpty}) {
     var controller = isBroadcast
         ? StreamController<S>.broadcast(sync: true)
         : StreamController<S>(sync: true);
 
     S? currentResults;
     var hasCurrentResults = false;
-    var waitingForTrigger = true;
+    var activeLongPoll = false;
     var isTriggerDone = false;
     var isValueDone = false;
     StreamSubscription<T>? valueSub;
     StreamSubscription<void>? triggerSub;
 
-    void emit() {
-      controller.add(currentResults as S);
+    void emit(S results) {
       currentResults = null;
       hasCurrentResults = false;
-      waitingForTrigger = true;
+      controller.add(results);
     }
 
     void onValue(T value) {
       currentResults = aggregate(value, currentResults);
       hasCurrentResults = true;
+      if (!longPoll) return;
 
-      if (!waitingForTrigger) emit();
+      if (activeLongPoll) {
+        activeLongPoll = false;
+        emit(currentResults as S);
+      }
 
       if (isTriggerDone) {
         valueSub!.cancel();
@@ -56,9 +82,13 @@
     }
 
     void onTrigger(_) {
-      waitingForTrigger = false;
-
-      if (hasCurrentResults) emit();
+      if (hasCurrentResults) {
+        emit(currentResults as S);
+      } else if (longPoll) {
+        activeLongPoll = true;
+      } else {
+        onEmpty(controller);
+      }
 
       if (isValueDone) {
         triggerSub!.cancel();
@@ -68,7 +98,7 @@
 
     void onTriggerDone() {
       isTriggerDone = true;
-      if (waitingForTrigger) {
+      if (!activeLongPoll) {
         valueSub?.cancel();
         controller.close();
       }
diff --git a/pkgs/stream_transform/lib/src/async_map.dart b/pkgs/stream_transform/lib/src/async_map.dart
index f602252..c789827 100644
--- a/pkgs/stream_transform/lib/src/async_map.dart
+++ b/pkgs/stream_transform/lib/src/async_map.dart
@@ -67,7 +67,11 @@
     var workFinished = StreamController<void>()
       // Let the first event through.
       ..add(null);
-    return aggregateSample(workFinished.stream, _dropPrevious)
+    return aggregateSample(
+            trigger: workFinished.stream,
+            aggregate: _dropPrevious,
+            longPoll: true,
+            onEmpty: _ignore)
         ._asyncMapThen(convert, workFinished.add);
   }
 
@@ -128,3 +132,4 @@
 }
 
 T _dropPrevious<T>(T event, _) => event;
+void _ignore<T>(Sink<T> sink) {}
diff --git a/pkgs/stream_transform/lib/src/rate_limit.dart b/pkgs/stream_transform/lib/src/rate_limit.dart
index b29f4f5..1d31339 100644
--- a/pkgs/stream_transform/lib/src/rate_limit.dart
+++ b/pkgs/stream_transform/lib/src/rate_limit.dart
@@ -221,18 +221,75 @@
     });
   }
 
-  /// Returns a Stream  which collects values and emits when it sees a value on
-  /// [trigger].
+  /// Buffers the values emitted on this stream and emits them when [trigger]
+  /// emits an event.
   ///
-  /// If there are no pending values when [trigger] emits, the next value on the
-  /// source Stream will immediately flow through. Otherwise, the pending values
-  /// are released when [trigger] emits.
+  /// If [longPoll] is `false`, if there are no buffered values when [trigger]
+  /// emits an empty list is immediately emitted.
+  ///
+  /// If [longPoll] is `true`, and there are no buffered values when [trigger]
+  /// emits one or more events, then the *next* value from this stream is
+  /// immediately emitted on the returned stream as a single element list.
+  /// Subsequent events on [trigger] while there have been no events on this
+  /// stream are ignored.
+  ///
+  /// The result stream will close as soon as there is a guarantee it will not
+  /// emit any more events. There will not be any more events emitted if:
+  /// - [trigger] is closed and there is no waiting long poll.
+  /// - Or, the source stream is closed and previously buffered events have been
+  /// delivered.
   ///
   /// If the source stream is a broadcast stream, the result will be as well.
   /// Errors from the source stream or the trigger are immediately forwarded to
   /// the output.
-  Stream<List<T>> buffer(Stream<void> trigger) =>
-      aggregateSample<List<T>>(trigger, _collect);
+  ///
+  /// See also:
+  /// - [sample] which use a [trigger] stream in the same way, but keeps only
+  /// the most recent source event.
+  Stream<List<T>> buffer(Stream<void> trigger, {bool longPoll = true}) =>
+      aggregateSample(
+          trigger: trigger,
+          aggregate: _collect,
+          longPoll: longPoll,
+          onEmpty: _empty);
+
+  /// Creates a stream which emits the most recent new value from the source
+  /// stream when it sees a value on [trigger].
+  ///
+  /// If [longPoll] is `false`, then an event on [trigger] when there is no
+  /// pending source event will be ignored.
+  /// If [longPoll] is `true` (the default), then an event on [trigger] when
+  /// there is no pending source event will cause the next source event
+  /// to immediately flow to the result stream.
+  ///
+  /// If [longPoll] is `false`, if there is no pending source event when
+  /// [trigger] emits, then the trigger event will be ignored.
+  ///
+  /// If [longPoll] is `true`, and there are no buffered values when [trigger]
+  /// emits one or more events, then the *next* value from this stream is
+  /// immediately emitted on the returned stream as a single element list.
+  /// Subsequent events on [trigger] while there have been no events on this
+  /// stream are ignored.
+  ///
+  /// The result stream will close as soon as there is a guarantee it will not
+  /// emit any more events. There will not be any more events emitted if:
+  /// - [trigger] is closed and there is no waiting long poll.
+  /// - Or, the source stream is closed and any pending source event has been
+  /// delivered.
+  ///
+  /// If the source stream is a broadcast stream, the result will be as well.
+  /// Errors from the source stream or the trigger are immediately forwarded to
+  /// the output.
+  ///
+  /// See also:
+  /// - [buffer] which use [trigger] stream in the same way, but keeps a list of
+  /// pending source events.
+  Stream<T> sample(Stream<void> trigger, {bool longPoll = true}) =>
+      aggregateSample(
+          trigger: trigger,
+          aggregate: _dropPrevious,
+          longPoll: longPoll,
+          onEmpty: _ignore);
 
   /// Aggregates values until the source stream does not emit for [duration],
   /// then emits the aggregated values.
@@ -279,3 +336,5 @@
 
 T _dropPrevious<T>(T element, _) => element;
 List<T> _collect<T>(T event, List<T>? soFar) => (soFar ?? <T>[])..add(event);
+void _empty<T>(Sink<List<T>> sink) => sink.add([]);
+void _ignore<T>(Sink<T> sink) {}
diff --git a/pkgs/stream_transform/test/buffer_test.dart b/pkgs/stream_transform/test/buffer_test.dart
index fc17f4b..1675c28 100644
--- a/pkgs/stream_transform/test/buffer_test.dart
+++ b/pkgs/stream_transform/test/buffer_test.dart
@@ -20,7 +20,8 @@
   late Stream<List<int>> transformed;
   late StreamSubscription<List<int>> subscription;
 
-  void setUpForStreamTypes(String triggerType, String valuesType) {
+  void setUpForStreamTypes(String triggerType, String valuesType,
+      {required bool longPoll}) {
     valuesCanceled = false;
     triggerCanceled = false;
     triggerPaused = false;
@@ -40,7 +41,7 @@
     emittedValues = [];
     errors = [];
     isDone = false;
-    transformed = values.stream.buffer(trigger.stream);
+    transformed = values.stream.buffer(trigger.stream, longPoll: longPoll);
     subscription =
         transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
       isDone = true;
@@ -50,182 +51,227 @@
   for (var triggerType in streamTypes) {
     for (var valuesType in streamTypes) {
       group('Trigger type: [$triggerType], Values type: [$valuesType]', () {
-        setUp(() {
-          setUpForStreamTypes(triggerType, valuesType);
+        group('general behavior', () {
+          setUp(() {
+            setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+          });
+
+          test('does not emit before `trigger`', () async {
+            values.add(1);
+            await Future(() {});
+            expect(emittedValues, isEmpty);
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, [
+              [1]
+            ]);
+          });
+
+          test('groups values between trigger', () async {
+            values
+              ..add(1)
+              ..add(2);
+            await Future(() {});
+            trigger.add(null);
+            values
+              ..add(3)
+              ..add(4);
+            await Future(() {});
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, [
+              [1, 2],
+              [3, 4]
+            ]);
+          });
+
+          test('cancels value subscription when output canceled', () async {
+            expect(valuesCanceled, false);
+            await subscription.cancel();
+            expect(valuesCanceled, true);
+          });
+
+          test('closes when trigger ends', () async {
+            expect(isDone, false);
+            await trigger.close();
+            await Future(() {});
+            expect(isDone, true);
+          });
+
+          test('closes after outputting final values when source closes',
+              () async {
+            expect(isDone, false);
+            values.add(1);
+            await values.close();
+            expect(isDone, false);
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, [
+              [1]
+            ]);
+            expect(isDone, true);
+          });
+
+          test('closes when source closes and there are no buffered', () async {
+            expect(isDone, false);
+            await values.close();
+            await Future(() {});
+            expect(isDone, true);
+          });
+
+          test('forwards errors from trigger', () async {
+            trigger.addError('error');
+            await Future(() {});
+            expect(errors, ['error']);
+          });
+
+          test('forwards errors from values', () async {
+            values.addError('error');
+            await Future(() {});
+            expect(errors, ['error']);
+          });
         });
 
-        test('does not emit before `trigger`', () async {
-          values.add(1);
-          await Future(() {});
-          expect(emittedValues, isEmpty);
-          trigger.add(null);
-          await Future(() {});
-          expect(emittedValues, [
-            [1]
-          ]);
+        group('long polling', () {
+          setUp(() {
+            setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+          });
+
+          test('emits immediately if trigger emits before a value', () async {
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, isEmpty);
+            values.add(1);
+            await Future(() {});
+            expect(emittedValues, [
+              [1]
+            ]);
+          });
+
+          test('two triggers in a row - emit buffere then emit next value',
+              () async {
+            values
+              ..add(1)
+              ..add(2);
+            await Future(() {});
+            trigger
+              ..add(null)
+              ..add(null);
+            await Future(() {});
+            values.add(3);
+            await Future(() {});
+            expect(emittedValues, [
+              [1, 2],
+              [3]
+            ]);
+          });
+
+          test('pre-emptive trigger then trigger after values', () async {
+            trigger.add(null);
+            await Future(() {});
+            values
+              ..add(1)
+              ..add(2);
+            await Future(() {});
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, [
+              [1],
+              [2]
+            ]);
+          });
+
+          test('multiple pre-emptive triggers, only emits first value',
+              () async {
+            trigger
+              ..add(null)
+              ..add(null);
+            await Future(() {});
+            values
+              ..add(1)
+              ..add(2);
+            await Future(() {});
+            expect(emittedValues, [
+              [1]
+            ]);
+          });
+
+          test('closes if there is no waiting long poll when source closes',
+              () async {
+            expect(isDone, false);
+            values.add(1);
+            trigger.add(null);
+            await values.close();
+            await Future(() {});
+            expect(isDone, true);
+          });
+
+          test('waits to emit if there waiting long poll when trigger closes',
+              () async {
+            trigger.add(null);
+            await trigger.close();
+            expect(isDone, false);
+            values.add(1);
+            await Future(() {});
+            expect(emittedValues, [
+              [1]
+            ]);
+            expect(isDone, true);
+          });
         });
 
-        test('emits immediately if trigger emits before a value', () async {
-          trigger.add(null);
-          await Future(() {});
-          expect(emittedValues, isEmpty);
-          values.add(1);
-          await Future(() {});
-          expect(emittedValues, [
-            [1]
-          ]);
-        });
+        group('immediate polling', () {
+          setUp(() {
+            setUpForStreamTypes(triggerType, valuesType, longPoll: false);
+          });
 
-        test('two triggers in a row - emit then emit next value', () async {
-          values
-            ..add(1)
-            ..add(2);
-          await Future(() {});
-          trigger
-            ..add(null)
-            ..add(null);
-          await Future(() {});
-          values.add(3);
-          await Future(() {});
-          expect(emittedValues, [
-            [1, 2],
-            [3]
-          ]);
-        });
+          test('emits empty list before values', () async {
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, [[]]);
+          });
 
-        test('pre-emptive trigger then trigger after values', () async {
-          trigger.add(null);
-          await Future(() {});
-          values
-            ..add(1)
-            ..add(2);
-          await Future(() {});
-          trigger.add(null);
-          await Future(() {});
-          expect(emittedValues, [
-            [1],
-            [2]
-          ]);
-        });
-
-        test('multiple pre-emptive triggers, only emits first value', () async {
-          trigger
-            ..add(null)
-            ..add(null);
-          await Future(() {});
-          values
-            ..add(1)
-            ..add(2);
-          await Future(() {});
-          expect(emittedValues, [
-            [1]
-          ]);
-        });
-
-        test('groups values between trigger', () async {
-          values
-            ..add(1)
-            ..add(2);
-          await Future(() {});
-          trigger.add(null);
-          values
-            ..add(3)
-            ..add(4);
-          await Future(() {});
-          trigger.add(null);
-          await Future(() {});
-          expect(emittedValues, [
-            [1, 2],
-            [3, 4]
-          ]);
-        });
-
-        test('cancels value subscription when output canceled', () async {
-          expect(valuesCanceled, false);
-          await subscription.cancel();
-          expect(valuesCanceled, true);
-        });
-
-        test('closes when trigger ends', () async {
-          expect(isDone, false);
-          await trigger.close();
-          await Future(() {});
-          expect(isDone, true);
-        });
-
-        test('closes after outputting final values when source closes',
-            () async {
-          expect(isDone, false);
-          values.add(1);
-          await values.close();
-          expect(isDone, false);
-          trigger.add(null);
-          await Future(() {});
-          expect(emittedValues, [
-            [1]
-          ]);
-          expect(isDone, true);
-        });
-
-        test('closes if there are no pending values when source closes',
-            () async {
-          expect(isDone, false);
-          values.add(1);
-          trigger.add(null);
-          await values.close();
-          await Future(() {});
-          expect(isDone, true);
-        });
-
-        test('waits to emit if there is a pending trigger when trigger closes',
-            () async {
-          trigger.add(null);
-          await trigger.close();
-          expect(isDone, false);
-          values.add(1);
-          await Future(() {});
-          expect(emittedValues, [
-            [1]
-          ]);
-          expect(isDone, true);
-        });
-
-        test('forwards errors from trigger', () async {
-          trigger.addError('error');
-          await Future(() {});
-          expect(errors, ['error']);
-        });
-
-        test('forwards errors from values', () async {
-          values.addError('error');
-          await Future(() {});
-          expect(errors, ['error']);
+          test('emits empty list after emitting values', () async {
+            values
+              ..add(1)
+              ..add(2);
+            await Future(() {});
+            trigger
+              ..add(null)
+              ..add(null);
+            await Future(() {});
+            expect(emittedValues, [
+              [1, 2],
+              []
+            ]);
+          });
         });
       });
     }
   }
 
   test('always cancels trigger if values is singlesubscription', () async {
-    setUpForStreamTypes('broadcast', 'single subscription');
+    setUpForStreamTypes('broadcast', 'single subscription', longPoll: true);
     expect(triggerCanceled, false);
     await subscription.cancel();
     expect(triggerCanceled, true);
 
-    setUpForStreamTypes('single subscription', 'single subscription');
+    setUpForStreamTypes('single subscription', 'single subscription',
+        longPoll: true);
     expect(triggerCanceled, false);
     await subscription.cancel();
     expect(triggerCanceled, true);
   });
 
   test('cancels trigger if trigger is broadcast', () async {
-    setUpForStreamTypes('broadcast', 'broadcast');
+    setUpForStreamTypes('broadcast', 'broadcast', longPoll: true);
     expect(triggerCanceled, false);
     await subscription.cancel();
     expect(triggerCanceled, true);
   });
 
   test('pauses single subscription trigger for broadcast values', () async {
-    setUpForStreamTypes('single subscription', 'broadcast');
+    setUpForStreamTypes('single subscription', 'broadcast', longPoll: true);
     expect(triggerCanceled, false);
     expect(triggerPaused, false);
     await subscription.cancel();
@@ -235,7 +281,7 @@
 
   for (var triggerType in streamTypes) {
     test('cancel and relisten with [$triggerType] trigger', () async {
-      setUpForStreamTypes(triggerType, 'broadcast');
+      setUpForStreamTypes(triggerType, 'broadcast', longPoll: true);
       values.add(1);
       trigger.add(null);
       await Future(() {});
diff --git a/pkgs/stream_transform/test/sample_test.dart b/pkgs/stream_transform/test/sample_test.dart
new file mode 100644
index 0000000..66ca09d
--- /dev/null
+++ b/pkgs/stream_transform/test/sample_test.dart
@@ -0,0 +1,291 @@
+// Copyright (c) 2022, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+  late StreamController<void> trigger;
+  late StreamController<int> values;
+  late List<int> emittedValues;
+  late bool valuesCanceled;
+  late bool triggerCanceled;
+  late bool triggerPaused;
+  late bool isDone;
+  late List<String> errors;
+  late Stream<int> transformed;
+  late StreamSubscription<int> subscription;
+
+  void setUpForStreamTypes(String triggerType, String valuesType,
+      {required bool longPoll}) {
+    valuesCanceled = false;
+    triggerCanceled = false;
+    triggerPaused = false;
+    trigger = createController(triggerType)
+      ..onCancel = () {
+        triggerCanceled = true;
+      };
+    if (triggerType == 'single subscription') {
+      trigger.onPause = () {
+        triggerPaused = true;
+      };
+    }
+    values = createController(valuesType)
+      ..onCancel = () {
+        valuesCanceled = true;
+      };
+    emittedValues = [];
+    errors = [];
+    isDone = false;
+    transformed = values.stream.sample(trigger.stream, longPoll: longPoll);
+    subscription =
+        transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
+      isDone = true;
+    });
+  }
+
+  for (var triggerType in streamTypes) {
+    for (var valuesType in streamTypes) {
+      group('Trigger type: [$triggerType], Values type: [$valuesType]', () {
+        group('general behavior', () {
+          setUp(() {
+            setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+          });
+
+          test('does not emit before `trigger`', () async {
+            values.add(1);
+            await Future(() {});
+            expect(emittedValues, isEmpty);
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, [1]);
+          });
+
+          test('keeps most recent event between triggers', () async {
+            values
+              ..add(1)
+              ..add(2);
+            await Future(() {});
+            trigger.add(null);
+            values
+              ..add(3)
+              ..add(4);
+            await Future(() {});
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, [2, 4]);
+          });
+
+          test('cancels value subscription when output canceled', () async {
+            expect(valuesCanceled, false);
+            await subscription.cancel();
+            expect(valuesCanceled, true);
+          });
+
+          test('closes when trigger ends', () async {
+            expect(isDone, false);
+            await trigger.close();
+            await Future(() {});
+            expect(isDone, true);
+          });
+
+          test('closes after outputting final values when source closes',
+              () async {
+            expect(isDone, false);
+            values.add(1);
+            await values.close();
+            expect(isDone, false);
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, [1]);
+            expect(isDone, true);
+          });
+
+          test('closes when source closes and there is no pending', () async {
+            expect(isDone, false);
+            await values.close();
+            await Future(() {});
+            expect(isDone, true);
+          });
+
+          test('forwards errors from trigger', () async {
+            trigger.addError('error');
+            await Future(() {});
+            expect(errors, ['error']);
+          });
+
+          test('forwards errors from values', () async {
+            values.addError('error');
+            await Future(() {});
+            expect(errors, ['error']);
+          });
+        });
+
+        group('long polling', () {
+          setUp(() {
+            setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+          });
+
+          test('emits immediately if trigger emits before a value', () async {
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, isEmpty);
+            values.add(1);
+            await Future(() {});
+            expect(emittedValues, [1]);
+          });
+
+          test('two triggers in a row - emit buffere then emit next value',
+              () async {
+            values
+              ..add(1)
+              ..add(2);
+            await Future(() {});
+            trigger
+              ..add(null)
+              ..add(null);
+            await Future(() {});
+            values.add(3);
+            await Future(() {});
+            expect(emittedValues, [2, 3]);
+          });
+
+          test('pre-emptive trigger then trigger after values', () async {
+            trigger.add(null);
+            await Future(() {});
+            values
+              ..add(1)
+              ..add(2);
+            await Future(() {});
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, [1, 2]);
+          });
+
+          test('multiple pre-emptive triggers, only emits first value',
+              () async {
+            trigger
+              ..add(null)
+              ..add(null);
+            await Future(() {});
+            values
+              ..add(1)
+              ..add(2);
+            await Future(() {});
+            expect(emittedValues, [1]);
+          });
+
+          test('closes if there is no waiting long poll when source closes',
+              () async {
+            expect(isDone, false);
+            values.add(1);
+            trigger.add(null);
+            await values.close();
+            await Future(() {});
+            expect(isDone, true);
+          });
+
+          test('waits to emit if there waiting long poll when trigger closes',
+              () async {
+            trigger.add(null);
+            await trigger.close();
+            expect(isDone, false);
+            values.add(1);
+            await Future(() {});
+            expect(emittedValues, [1]);
+            expect(isDone, true);
+          });
+        });
+
+        group('immediate polling', () {
+          setUp(() {
+            setUpForStreamTypes(triggerType, valuesType, longPoll: false);
+          });
+
+          test('ignores trigger before values', () async {
+            trigger.add(null);
+            await Future(() {});
+            values
+              ..add(1)
+              ..add(2);
+            await Future(() {});
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, [2]);
+          });
+
+          test('ignores trigger if no pending values', () async {
+            values
+              ..add(1)
+              ..add(2);
+            await Future(() {});
+            trigger
+              ..add(null)
+              ..add(null);
+            await Future(() {});
+            values
+              ..add(3)
+              ..add(4);
+            await Future(() {});
+            trigger.add(null);
+            await Future(() {});
+            expect(emittedValues, [2, 4]);
+          });
+        });
+      });
+    }
+  }
+
+  test('always cancels trigger if values is singlesubscription', () async {
+    setUpForStreamTypes('broadcast', 'single subscription', longPoll: true);
+    expect(triggerCanceled, false);
+    await subscription.cancel();
+    expect(triggerCanceled, true);
+
+    setUpForStreamTypes('single subscription', 'single subscription',
+        longPoll: true);
+    expect(triggerCanceled, false);
+    await subscription.cancel();
+    expect(triggerCanceled, true);
+  });
+
+  test('cancels trigger if trigger is broadcast', () async {
+    setUpForStreamTypes('broadcast', 'broadcast', longPoll: true);
+    expect(triggerCanceled, false);
+    await subscription.cancel();
+    expect(triggerCanceled, true);
+  });
+
+  test('pauses single subscription trigger for broadcast values', () async {
+    setUpForStreamTypes('single subscription', 'broadcast', longPoll: true);
+    expect(triggerCanceled, false);
+    expect(triggerPaused, false);
+    await subscription.cancel();
+    expect(triggerCanceled, false);
+    expect(triggerPaused, true);
+  });
+
+  for (var triggerType in streamTypes) {
+    test('cancel and relisten with [$triggerType] trigger', () async {
+      setUpForStreamTypes(triggerType, 'broadcast', longPoll: true);
+      values.add(1);
+      trigger.add(null);
+      await Future(() {});
+      expect(emittedValues, [1]);
+      await subscription.cancel();
+      values.add(2);
+      trigger.add(null);
+      await Future(() {});
+      subscription = transformed.listen(emittedValues.add);
+      values.add(3);
+      trigger.add(null);
+      await Future(() {});
+      expect(emittedValues, [1, 3]);
+    });
+  }
+}