Implement `sample`, add `longPoll` to `buffer` (dart-lang/stream_transform#147)
Closes dart-lang/stream_transform#145
Add `longPoll` support to `aggregateSample`. This method is a package
private implementation detail.
- Refactor to named arguments to make changes smoother.
- Add `longPoll` boolean argument and conditional behavior in each
stream callback.
- Add `onEmpty` callback to let the caller decide whether to emit some
default (empty list in the `buffer` case) or not emit at all (in the
`sample` case) for triggers which do no set up a long poll and have
no prior aggregated events.
- Switch `waitingForTrigger` to the inverted condition `activeLongPoll`
since it is a closer match to how the behavior is described in the docs.
Add `longPoll` argument to `buffer` and update the doc to describe the
conditional behavior. Add a section in the docs discussing the end of
stream behavior.
Add a `sample` method which is similar to `buffer`. Cross reference each
method in the docs.
Update the comparison table with Rx. Show that `buffer` is equivalent to
`buffer(longPoll: false)`, that `sample` is `sampel(longPoll: false)`. I
cannot find an equivalent Rx method to the `longPoll: true` behavior.
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index d3c71c6..ee0ac2e 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -89,13 +89,13 @@
Rx Operator Category | variation | `stream_transform`
------------------------- | ------------------------------------------------------ | ------------------
-[`sample`][rx_sample] | `sample/throttleLast(Duration)` | No equivalent
+[`sample`][rx_sample] | `sample/throttleLast(Duration)` | `sample(Stream.periodic(Duration), longPoll: false)`
​ | `throttleFirst(Duration)` | [`throttle`][throttle]
-​ | `sample(Observable)` | No equivalent
+​ | `sample(Observable)` | `sample(trigger, longPoll: false)`
[`debounce`][rx_debounce] | `debounce/throttleWithTimeout(Duration)` | [`debounce`][debounce]
​ | `debounce(Observable)` | No equivalent
[`buffer`][rx_buffer] | `buffer(boundary)`, `bufferWithTime`,`bufferWithCount` | No equivalent
-​ | `buffer(boundaryClosingSelector)` | No equivalent[^1]
+​ | `buffer(boundaryClosingSelector)` | `buffer(trigger, longPoll: false)`
RxJs extensions | [`audit(callback)`][rxjs_audit] | No equivalent
​ | [`auditTime(Duration)`][rxjs_auditTime] | [`audit`][audit]
​ | [`exhaustMap`][rxjs_exhaustMap] | No equivalent
@@ -103,7 +103,8 @@
​ | `throttleTime(leading: false, trailing: true)` | No equivalent
No equivalent? | | [`asyncMapBuffer`][asyncMapBuffer]
​ | | [`asyncMapSample`][asyncMapSample]
-​ | | [`buffer`][buffer][^1]
+​ | | [`buffer`][buffer]
+​ | | [`sample`][sample]
​ | | [`debounceBuffer`][debounceBuffer]
​ | | `debounce(leading: true, trailing: false)`
​ | | `debounce(leading: true, trailing: true)`
@@ -119,15 +120,10 @@
[asyncMapSample]:https://pub.dev/documentation/stream_transform/latest/stream_transform/AsyncMap/asyncMapSample.html
[audit]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/audit.html
[buffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/buffer.html
+[sample]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/sample.html
[debounceBuffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/debounceBuffer.html
[debounce]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/debounce.html
[throttle]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/throttle.html
-[^1]: `stream_transform` `buffer` is closest to
- [`buffer(bufferClosingSelector)`][rx_buffer], except where the trigger
- emits while no events are buffered. ReactiveX will immediately emit and
- empty list, while `stream_transform` will wait and emit a single element
- list when the next event occurs. You can think of it like
- `stream_transform` implemeting the "long polling" version of `buffer`.
# Getting a `StreamTransformer` instance
diff --git a/pkgs/stream_transform/lib/src/aggregate_sample.dart b/pkgs/stream_transform/lib/src/aggregate_sample.dart
index 0eea768..aa47c1e 100644
--- a/pkgs/stream_transform/lib/src/aggregate_sample.dart
+++ b/pkgs/stream_transform/lib/src/aggregate_sample.dart
@@ -5,41 +5,67 @@
import 'dart:async';
extension AggregateSample<T> on Stream<T> {
- /// Aggregates values and emits when it sees a value on [trigger].
+ /// Computes a value based on sequences of events, then emits that value when
+ /// [trigger] emits an event.
///
- /// If there are no pending values when [trigger] emits, the next value on the
- /// source Stream will be passed to [aggregate] and emitted on the result
- /// stream immediately. Otherwise, the pending values are released when
- /// [trigger] emits.
+ /// Every time this stream emits an event, an intermediate value is created
+ /// by combining the new event with the previous intermediate value, or with
+ /// `null` if there is no previous value, using the [aggregate] function.
///
+ /// When [trigger] emits value, the returned stream emits the current
+ /// intermediate value and clears it.
+ ///
+ /// If [longPoll] is `false`, if there is no intermediate value when [trigger]
+ /// emits an event, the [onEmpty] function is called with a [Sink] which can
+ /// add events to the returned stream.
+ ///
+ /// If [longPoll] is `true`, and there is no intermediate value when [trigger]
+ /// emits one or more events, then the *next* event from this stream is
+ /// immediately put through [aggregate] and emitted on the returned stream.
+ /// Subsequent events on [trigger] while there have been no events on this
+ /// stream are ignored.
+ /// In that case, [onEmpty] is never used.
+ ///
+ /// The result stream will close as soon as there is a guarantee it will not
+ /// emit any more events. There will not be any more events emitted if:
+ /// - [trigger] is closed and there is no waiting long poll.
+ /// - Or, the source stream is closed and there are no buffered events.
+ ///
+ /// If the source stream is a broadcast stream, the result will be as well.
/// Errors from the source stream or the trigger are immediately forwarded to
/// the output.
Stream<S> aggregateSample<S>(
- Stream<void> trigger, S Function(T, S?) aggregate) {
+ {required Stream<void> trigger,
+ required S Function(T, S?) aggregate,
+ required bool longPoll,
+ required void Function(Sink<S>) onEmpty}) {
var controller = isBroadcast
? StreamController<S>.broadcast(sync: true)
: StreamController<S>(sync: true);
S? currentResults;
var hasCurrentResults = false;
- var waitingForTrigger = true;
+ var activeLongPoll = false;
var isTriggerDone = false;
var isValueDone = false;
StreamSubscription<T>? valueSub;
StreamSubscription<void>? triggerSub;
- void emit() {
- controller.add(currentResults as S);
+ void emit(S results) {
currentResults = null;
hasCurrentResults = false;
- waitingForTrigger = true;
+ controller.add(results);
}
void onValue(T value) {
currentResults = aggregate(value, currentResults);
hasCurrentResults = true;
+ if (!longPoll) return;
- if (!waitingForTrigger) emit();
+ if (activeLongPoll) {
+ activeLongPoll = false;
+ emit(currentResults as S);
+ }
if (isTriggerDone) {
valueSub!.cancel();
@@ -56,9 +82,13 @@
}
void onTrigger(_) {
- waitingForTrigger = false;
-
- if (hasCurrentResults) emit();
+ if (hasCurrentResults) {
+ emit(currentResults as S);
+ } else if (longPoll) {
+ activeLongPoll = true;
+ } else {
+ onEmpty(controller);
+ }
if (isValueDone) {
triggerSub!.cancel();
@@ -68,7 +98,7 @@
void onTriggerDone() {
isTriggerDone = true;
- if (waitingForTrigger) {
+ if (!activeLongPoll) {
valueSub?.cancel();
controller.close();
}
diff --git a/pkgs/stream_transform/lib/src/async_map.dart b/pkgs/stream_transform/lib/src/async_map.dart
index f602252..c789827 100644
--- a/pkgs/stream_transform/lib/src/async_map.dart
+++ b/pkgs/stream_transform/lib/src/async_map.dart
@@ -67,7 +67,11 @@
var workFinished = StreamController<void>()
// Let the first event through.
..add(null);
- return aggregateSample(workFinished.stream, _dropPrevious)
+ return aggregateSample(
+ trigger: workFinished.stream,
+ aggregate: _dropPrevious,
+ longPoll: true,
+ onEmpty: _ignore)
._asyncMapThen(convert, workFinished.add);
}
@@ -128,3 +132,4 @@
}
T _dropPrevious<T>(T event, _) => event;
+void _ignore<T>(Sink<T> sink) {}
diff --git a/pkgs/stream_transform/lib/src/rate_limit.dart b/pkgs/stream_transform/lib/src/rate_limit.dart
index b29f4f5..1d31339 100644
--- a/pkgs/stream_transform/lib/src/rate_limit.dart
+++ b/pkgs/stream_transform/lib/src/rate_limit.dart
@@ -221,18 +221,75 @@
});
}
- /// Returns a Stream which collects values and emits when it sees a value on
- /// [trigger].
+ /// Buffers the values emitted on this stream and emits them when [trigger]
+ /// emits an event.
///
- /// If there are no pending values when [trigger] emits, the next value on the
- /// source Stream will immediately flow through. Otherwise, the pending values
- /// are released when [trigger] emits.
+ /// If [longPoll] is `false`, if there are no buffered values when [trigger]
+ /// emits an empty list is immediately emitted.
+ ///
+ /// If [longPoll] is `true`, and there are no buffered values when [trigger]
+ /// emits one or more events, then the *next* value from this stream is
+ /// immediately emitted on the returned stream as a single element list.
+ /// Subsequent events on [trigger] while there have been no events on this
+ /// stream are ignored.
+ ///
+ /// The result stream will close as soon as there is a guarantee it will not
+ /// emit any more events. There will not be any more events emitted if:
+ /// - [trigger] is closed and there is no waiting long poll.
+ /// - Or, the source stream is closed and previously buffered events have been
+ /// delivered.
///
/// If the source stream is a broadcast stream, the result will be as well.
/// Errors from the source stream or the trigger are immediately forwarded to
/// the output.
- Stream<List<T>> buffer(Stream<void> trigger) =>
- aggregateSample<List<T>>(trigger, _collect);
+ ///
+ /// See also:
+ /// - [sample] which use a [trigger] stream in the same way, but keeps only
+ /// the most recent source event.
+ Stream<List<T>> buffer(Stream<void> trigger, {bool longPoll = true}) =>
+ aggregateSample(
+ trigger: trigger,
+ aggregate: _collect,
+ longPoll: longPoll,
+ onEmpty: _empty);
+
+ /// Creates a stream which emits the most recent new value from the source
+ /// stream when it sees a value on [trigger].
+ ///
+ /// If [longPoll] is `false`, then an event on [trigger] when there is no
+ /// pending source event will be ignored.
+ /// If [longPoll] is `true` (the default), then an event on [trigger] when
+ /// there is no pending source event will cause the next source event
+ /// to immediately flow to the result stream.
+ ///
+ /// If [longPoll] is `false`, if there is no pending source event when
+ /// [trigger] emits, then the trigger event will be ignored.
+ ///
+ /// If [longPoll] is `true`, and there are no buffered values when [trigger]
+ /// emits one or more events, then the *next* value from this stream is
+ /// immediately emitted on the returned stream as a single element list.
+ /// Subsequent events on [trigger] while there have been no events on this
+ /// stream are ignored.
+ ///
+ /// The result stream will close as soon as there is a guarantee it will not
+ /// emit any more events. There will not be any more events emitted if:
+ /// - [trigger] is closed and there is no waiting long poll.
+ /// - Or, the source stream is closed and any pending source event has been
+ /// delivered.
+ ///
+ /// If the source stream is a broadcast stream, the result will be as well.
+ /// Errors from the source stream or the trigger are immediately forwarded to
+ /// the output.
+ ///
+ /// See also:
+ /// - [buffer] which use [trigger] stream in the same way, but keeps a list of
+ /// pending source events.
+ Stream<T> sample(Stream<void> trigger, {bool longPoll = true}) =>
+ aggregateSample(
+ trigger: trigger,
+ aggregate: _dropPrevious,
+ longPoll: longPoll,
+ onEmpty: _ignore);
/// Aggregates values until the source stream does not emit for [duration],
/// then emits the aggregated values.
@@ -279,3 +336,5 @@
T _dropPrevious<T>(T element, _) => element;
List<T> _collect<T>(T event, List<T>? soFar) => (soFar ?? <T>[])..add(event);
+void _empty<T>(Sink<List<T>> sink) => sink.add([]);
+void _ignore<T>(Sink<T> sink) {}
diff --git a/pkgs/stream_transform/test/buffer_test.dart b/pkgs/stream_transform/test/buffer_test.dart
index fc17f4b..1675c28 100644
--- a/pkgs/stream_transform/test/buffer_test.dart
+++ b/pkgs/stream_transform/test/buffer_test.dart
@@ -20,7 +20,8 @@
late Stream<List<int>> transformed;
late StreamSubscription<List<int>> subscription;
- void setUpForStreamTypes(String triggerType, String valuesType) {
+ void setUpForStreamTypes(String triggerType, String valuesType,
+ {required bool longPoll}) {
valuesCanceled = false;
triggerCanceled = false;
triggerPaused = false;
@@ -40,7 +41,7 @@
emittedValues = [];
errors = [];
isDone = false;
- transformed = values.stream.buffer(trigger.stream);
+ transformed = values.stream.buffer(trigger.stream, longPoll: longPoll);
subscription =
transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
isDone = true;
@@ -50,182 +51,227 @@
for (var triggerType in streamTypes) {
for (var valuesType in streamTypes) {
group('Trigger type: [$triggerType], Values type: [$valuesType]', () {
- setUp(() {
- setUpForStreamTypes(triggerType, valuesType);
+ group('general behavior', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+ });
+
+ test('does not emit before `trigger`', () async {
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, isEmpty);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ });
+
+ test('groups values between trigger', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger.add(null);
+ values
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1, 2],
+ [3, 4]
+ ]);
+ });
+
+ test('cancels value subscription when output canceled', () async {
+ expect(valuesCanceled, false);
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('closes when trigger ends', () async {
+ expect(isDone, false);
+ await trigger.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('closes after outputting final values when source closes',
+ () async {
+ expect(isDone, false);
+ values.add(1);
+ await values.close();
+ expect(isDone, false);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ expect(isDone, true);
+ });
+
+ test('closes when source closes and there are no buffered', () async {
+ expect(isDone, false);
+ await values.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('forwards errors from trigger', () async {
+ trigger.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('forwards errors from values', () async {
+ values.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
});
- test('does not emit before `trigger`', () async {
- values.add(1);
- await Future(() {});
- expect(emittedValues, isEmpty);
- trigger.add(null);
- await Future(() {});
- expect(emittedValues, [
- [1]
- ]);
+ group('long polling', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+ });
+
+ test('emits immediately if trigger emits before a value', () async {
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, isEmpty);
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ });
+
+ test('two triggers in a row - emit buffere then emit next value',
+ () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ values.add(3);
+ await Future(() {});
+ expect(emittedValues, [
+ [1, 2],
+ [3]
+ ]);
+ });
+
+ test('pre-emptive trigger then trigger after values', () async {
+ trigger.add(null);
+ await Future(() {});
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1],
+ [2]
+ ]);
+ });
+
+ test('multiple pre-emptive triggers, only emits first value',
+ () async {
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ });
+
+ test('closes if there is no waiting long poll when source closes',
+ () async {
+ expect(isDone, false);
+ values.add(1);
+ trigger.add(null);
+ await values.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('waits to emit if there waiting long poll when trigger closes',
+ () async {
+ trigger.add(null);
+ await trigger.close();
+ expect(isDone, false);
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ expect(isDone, true);
+ });
});
- test('emits immediately if trigger emits before a value', () async {
- trigger.add(null);
- await Future(() {});
- expect(emittedValues, isEmpty);
- values.add(1);
- await Future(() {});
- expect(emittedValues, [
- [1]
- ]);
- });
+ group('immediate polling', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: false);
+ });
- test('two triggers in a row - emit then emit next value', () async {
- values
- ..add(1)
- ..add(2);
- await Future(() {});
- trigger
- ..add(null)
- ..add(null);
- await Future(() {});
- values.add(3);
- await Future(() {});
- expect(emittedValues, [
- [1, 2],
- [3]
- ]);
- });
+ test('emits empty list before values', () async {
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [[]]);
+ });
- test('pre-emptive trigger then trigger after values', () async {
- trigger.add(null);
- await Future(() {});
- values
- ..add(1)
- ..add(2);
- await Future(() {});
- trigger.add(null);
- await Future(() {});
- expect(emittedValues, [
- [1],
- [2]
- ]);
- });
-
- test('multiple pre-emptive triggers, only emits first value', () async {
- trigger
- ..add(null)
- ..add(null);
- await Future(() {});
- values
- ..add(1)
- ..add(2);
- await Future(() {});
- expect(emittedValues, [
- [1]
- ]);
- });
-
- test('groups values between trigger', () async {
- values
- ..add(1)
- ..add(2);
- await Future(() {});
- trigger.add(null);
- values
- ..add(3)
- ..add(4);
- await Future(() {});
- trigger.add(null);
- await Future(() {});
- expect(emittedValues, [
- [1, 2],
- [3, 4]
- ]);
- });
-
- test('cancels value subscription when output canceled', () async {
- expect(valuesCanceled, false);
- await subscription.cancel();
- expect(valuesCanceled, true);
- });
-
- test('closes when trigger ends', () async {
- expect(isDone, false);
- await trigger.close();
- await Future(() {});
- expect(isDone, true);
- });
-
- test('closes after outputting final values when source closes',
- () async {
- expect(isDone, false);
- values.add(1);
- await values.close();
- expect(isDone, false);
- trigger.add(null);
- await Future(() {});
- expect(emittedValues, [
- [1]
- ]);
- expect(isDone, true);
- });
-
- test('closes if there are no pending values when source closes',
- () async {
- expect(isDone, false);
- values.add(1);
- trigger.add(null);
- await values.close();
- await Future(() {});
- expect(isDone, true);
- });
-
- test('waits to emit if there is a pending trigger when trigger closes',
- () async {
- trigger.add(null);
- await trigger.close();
- expect(isDone, false);
- values.add(1);
- await Future(() {});
- expect(emittedValues, [
- [1]
- ]);
- expect(isDone, true);
- });
-
- test('forwards errors from trigger', () async {
- trigger.addError('error');
- await Future(() {});
- expect(errors, ['error']);
- });
-
- test('forwards errors from values', () async {
- values.addError('error');
- await Future(() {});
- expect(errors, ['error']);
+ test('emits empty list after emitting values', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1, 2],
+ []
+ ]);
+ });
});
});
}
}
test('always cancels trigger if values is singlesubscription', () async {
- setUpForStreamTypes('broadcast', 'single subscription');
+ setUpForStreamTypes('broadcast', 'single subscription', longPoll: true);
expect(triggerCanceled, false);
await subscription.cancel();
expect(triggerCanceled, true);
- setUpForStreamTypes('single subscription', 'single subscription');
+ setUpForStreamTypes('single subscription', 'single subscription',
+ longPoll: true);
expect(triggerCanceled, false);
await subscription.cancel();
expect(triggerCanceled, true);
});
test('cancels trigger if trigger is broadcast', () async {
- setUpForStreamTypes('broadcast', 'broadcast');
+ setUpForStreamTypes('broadcast', 'broadcast', longPoll: true);
expect(triggerCanceled, false);
await subscription.cancel();
expect(triggerCanceled, true);
});
test('pauses single subscription trigger for broadcast values', () async {
- setUpForStreamTypes('single subscription', 'broadcast');
+ setUpForStreamTypes('single subscription', 'broadcast', longPoll: true);
expect(triggerCanceled, false);
expect(triggerPaused, false);
await subscription.cancel();
@@ -235,7 +281,7 @@
for (var triggerType in streamTypes) {
test('cancel and relisten with [$triggerType] trigger', () async {
- setUpForStreamTypes(triggerType, 'broadcast');
+ setUpForStreamTypes(triggerType, 'broadcast', longPoll: true);
values.add(1);
trigger.add(null);
await Future(() {});
diff --git a/pkgs/stream_transform/test/sample_test.dart b/pkgs/stream_transform/test/sample_test.dart
new file mode 100644
index 0000000..66ca09d
--- /dev/null
+++ b/pkgs/stream_transform/test/sample_test.dart
@@ -0,0 +1,291 @@
+// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ late StreamController<void> trigger;
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool triggerCanceled;
+ late bool triggerPaused;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
+
+ void setUpForStreamTypes(String triggerType, String valuesType,
+ {required bool longPoll}) {
+ valuesCanceled = false;
+ triggerCanceled = false;
+ triggerPaused = false;
+ trigger = createController(triggerType)
+ ..onCancel = () {
+ triggerCanceled = true;
+ };
+ if (triggerType == 'single subscription') {
+ trigger.onPause = () {
+ triggerPaused = true;
+ };
+ }
+ values = createController(valuesType)
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ transformed = values.stream.sample(trigger.stream, longPoll: longPoll);
+ subscription =
+ transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ for (var triggerType in streamTypes) {
+ for (var valuesType in streamTypes) {
+ group('Trigger type: [$triggerType], Values type: [$valuesType]', () {
+ group('general behavior', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+ });
+
+ test('does not emit before `trigger`', () async {
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, isEmpty);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ });
+
+ test('keeps most recent event between triggers', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger.add(null);
+ values
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [2, 4]);
+ });
+
+ test('cancels value subscription when output canceled', () async {
+ expect(valuesCanceled, false);
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('closes when trigger ends', () async {
+ expect(isDone, false);
+ await trigger.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('closes after outputting final values when source closes',
+ () async {
+ expect(isDone, false);
+ values.add(1);
+ await values.close();
+ expect(isDone, false);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ expect(isDone, true);
+ });
+
+ test('closes when source closes and there is no pending', () async {
+ expect(isDone, false);
+ await values.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('forwards errors from trigger', () async {
+ trigger.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('forwards errors from values', () async {
+ values.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+ });
+
+ group('long polling', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+ });
+
+ test('emits immediately if trigger emits before a value', () async {
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, isEmpty);
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ });
+
+ test('two triggers in a row - emit buffere then emit next value',
+ () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ values.add(3);
+ await Future(() {});
+ expect(emittedValues, [2, 3]);
+ });
+
+ test('pre-emptive trigger then trigger after values', () async {
+ trigger.add(null);
+ await Future(() {});
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [1, 2]);
+ });
+
+ test('multiple pre-emptive triggers, only emits first value',
+ () async {
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ });
+
+ test('closes if there is no waiting long poll when source closes',
+ () async {
+ expect(isDone, false);
+ values.add(1);
+ trigger.add(null);
+ await values.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('waits to emit if there waiting long poll when trigger closes',
+ () async {
+ trigger.add(null);
+ await trigger.close();
+ expect(isDone, false);
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ expect(isDone, true);
+ });
+ });
+
+ group('immediate polling', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: false);
+ });
+
+ test('ignores trigger before values', () async {
+ trigger.add(null);
+ await Future(() {});
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [2]);
+ });
+
+ test('ignores trigger if no pending values', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ values
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [2, 4]);
+ });
+ });
+ });
+ }
+ }
+
+ test('always cancels trigger if values is singlesubscription', () async {
+ setUpForStreamTypes('broadcast', 'single subscription', longPoll: true);
+ expect(triggerCanceled, false);
+ await subscription.cancel();
+ expect(triggerCanceled, true);
+
+ setUpForStreamTypes('single subscription', 'single subscription',
+ longPoll: true);
+ expect(triggerCanceled, false);
+ await subscription.cancel();
+ expect(triggerCanceled, true);
+ });
+
+ test('cancels trigger if trigger is broadcast', () async {
+ setUpForStreamTypes('broadcast', 'broadcast', longPoll: true);
+ expect(triggerCanceled, false);
+ await subscription.cancel();
+ expect(triggerCanceled, true);
+ });
+
+ test('pauses single subscription trigger for broadcast values', () async {
+ setUpForStreamTypes('single subscription', 'broadcast', longPoll: true);
+ expect(triggerCanceled, false);
+ expect(triggerPaused, false);
+ await subscription.cancel();
+ expect(triggerCanceled, false);
+ expect(triggerPaused, true);
+ });
+
+ for (var triggerType in streamTypes) {
+ test('cancel and relisten with [$triggerType] trigger', () async {
+ setUpForStreamTypes(triggerType, 'broadcast', longPoll: true);
+ values.add(1);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ await subscription.cancel();
+ values.add(2);
+ trigger.add(null);
+ await Future(() {});
+ subscription = transformed.listen(emittedValues.add);
+ values.add(3);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [1, 3]);
+ });
+ }
+}