blob: b1040304c3ae15c4287632a7dfe822ef8f4ce6fb [file] [log] [blame]
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
extension AggregateSample<T> on Stream<T> {
  /// Computes a value based on sequences of events, then emits that value when
  /// [trigger] emits an event.
  ///
  /// Every time this stream emits an event, an intermediate value is created
  /// by combining the new event with the previous intermediate value, or with
  /// `null` if there is no previous value, using the [aggregate] function.
  ///
  /// When [trigger] emits an event, the returned stream emits the current
  /// intermediate value and clears it.
  ///
  /// If [longPoll] is `false`, if there is no intermediate value when [trigger]
  /// emits an event, the [onEmpty] function is called with a [Sink] which can
  /// add events to the returned stream.
  ///
  /// If [longPoll] is `true`, and there is no intermediate value when [trigger]
  /// emits one or more events, then the *next* event from this stream is
  /// immediately put through [aggregate] and emitted on the returned stream.
  /// Subsequent events on [trigger] while there have been no events on this
  /// stream are ignored.
  /// In that case, [onEmpty] is never used.
  ///
  /// The result stream will close as soon as there is a guarantee it will not
  /// emit any more events. There will not be any more events emitted if:
  /// - [trigger] is closed and there is no waiting long poll.
  /// - Or, the source stream is closed and there are no buffered events.
  ///
  /// If the source stream is a broadcast stream, the result will be as well.
  /// Errors from the source stream or the trigger are immediately forwarded to
  /// the output.
  ///
  /// A listener may cancel its subscription from inside its own data callback;
  /// the source and trigger subscriptions are then released without raising
  /// an error.
  Stream<S> aggregateSample<S>(
      {required Stream<void> trigger,
      required S Function(T, S?) aggregate,
      required bool longPoll,
      required void Function(Sink<S>) onEmpty}) {
    // The controller is synchronous: listeners run inside `controller.add`,
    // so they may cancel (and trigger `onCancel` below) before `add` returns.
    var controller = isBroadcast
        ? StreamController<S>.broadcast(sync: true)
        : StreamController<S>(sync: true);

    // The aggregate built since the last emit. `hasCurrentResults` is tracked
    // separately because `S` itself may be nullable.
    S? currentResults;
    var hasCurrentResults = false;
    // Whether a trigger fired with nothing buffered while [longPoll] is set,
    // so the next source event must be emitted right away.
    var activeLongPoll = false;
    var isTriggerDone = false;
    var isValueDone = false;
    StreamSubscription<T>? valueSub;
    StreamSubscription<void>? triggerSub;

    // Clears the buffered aggregate before forwarding it, so state is already
    // consistent if the listener reacts synchronously.
    void emit(S results) {
      currentResults = null;
      hasCurrentResults = false;
      controller.add(results);
    }

    void onValue(T value) {
      currentResults = aggregate(value, currentResults);
      hasCurrentResults = true;
      if (!longPoll) return;

      if (activeLongPoll) {
        activeLongPoll = false;
        emit(currentResults as S);
      }

      if (isTriggerDone) {
        // The trigger only stayed relevant for the pending long poll that was
        // just served. `valueSub` may already have been cleared by `onCancel`
        // if the listener cancelled while handling the emitted event.
        valueSub?.cancel();
        controller.close();
      }
    }

    void onValuesDone() {
      isValueDone = true;
      if (!hasCurrentResults) {
        // Nothing is buffered and nothing more can arrive.
        triggerSub?.cancel();
        controller.close();
      }
    }

    void onTrigger(_) {
      if (hasCurrentResults) {
        emit(currentResults as S);
      } else if (longPoll) {
        activeLongPoll = true;
      } else {
        onEmpty(controller);
      }

      if (isValueDone) {
        // The last buffered aggregate has been flushed. `triggerSub` may
        // already have been cleared by `onCancel` if the listener cancelled
        // while handling the emitted event.
        triggerSub?.cancel();
        controller.close();
      }
    }

    void onTriggerDone() {
      isTriggerDone = true;
      if (!activeLongPoll) {
        // Without a waiting long poll no further event can be emitted.
        valueSub?.cancel();
        controller.close();
      }
    }

    controller.onListen = () {
      assert(valueSub == null);
      valueSub =
          listen(onValue, onError: controller.addError, onDone: onValuesDone);
      final priorTriggerSub = triggerSub;
      if (priorTriggerSub != null) {
        // A previous listener left the trigger subscription paused (see
        // `onCancel`); pick it up again instead of listening a second time.
        if (priorTriggerSub.isPaused) priorTriggerSub.resume();
      } else {
        triggerSub = trigger.listen(onTrigger,
            onError: controller.addError, onDone: onTriggerDone);
      }
      if (!isBroadcast) {
        // Pause and resume are only propagated for single-subscription output.
        controller
          ..onPause = () {
            valueSub?.pause();
            triggerSub?.pause();
          }
          ..onResume = () {
            valueSub?.resume();
            triggerSub?.resume();
          };
      }
      controller.onCancel = () {
        var cancels = <Future<void>>[if (!isValueDone) valueSub!.cancel()];
        valueSub = null;
        if (trigger.isBroadcast || !isBroadcast) {
          if (!isTriggerDone) cancels.add(triggerSub!.cancel());
          triggerSub = null;
        } else {
          // Broadcast output with a single-subscription trigger: keep the
          // trigger subscription, paused, so a later listener can resume it.
          triggerSub!.pause();
        }
        if (cancels.isEmpty) return null;
        return cancels.wait.then((_) => null);
      };
    };
    return controller.stream;
  }
}