Refactor to remove StreamTransformer instantiation (dart-lang/stream_transform#120)
Directly implement the behavior from `bind` which is the `Stream` to
`Stream` implementation in the extension methods rather than in separate
`StreamTransformer` classes. This pattern has fewer potential issues
around generics since it's fully based on function calls which are
type checked appropriately unlike the `StreamTransformer` interface
which suffers from the incorrect covariant subtype relationships. It
also may avoid some unnecessary allocations as improve stack traces.
For the transforms that were changed, aim for better consistency around
generic type arguments. `T` is now more consistently the source stream
type, while `S` is the output stream type.
diff --git a/pkgs/stream_transform/lib/src/aggregate_sample.dart b/pkgs/stream_transform/lib/src/aggregate_sample.dart
index 3a967d4..cd0a7d0 100644
--- a/pkgs/stream_transform/lib/src/aggregate_sample.dart
+++ b/pkgs/stream_transform/lib/src/aggregate_sample.dart
@@ -4,32 +4,27 @@
import 'dart:async';
-/// A StreamTransformer which aggregates values and emits when it sees a value
-/// on [_trigger].
-///
-/// If there are no pending values when [_trigger] emits the first value on the
-/// source Stream will immediately flow through. Otherwise, the pending values
-/// and released when [_trigger] emits.
-///
-/// Errors from the source stream or the trigger are immediately forwarded to
-/// the output.
-class AggregateSample<S, T> extends StreamTransformerBase<S, T> {
- final Stream<void> _trigger;
- final T Function(S, T?) _aggregate;
+extension AggregateSample<T> on Stream<T> {
+ /// Aggregates values and emits when it sees a value on [trigger].
+ ///
+ /// If there are no pending values when [trigger] emits, the next value on the
+ /// source Stream will be passed to [aggregate] and emitted on the result
+ /// stream immediately. Otherwise, the pending values are released when
+ /// [trigger] emits.
+ ///
+ /// Errors from the source stream or the trigger are immediately forwarded to
+ /// the output.
+ Stream<S> aggregateSample<S>(
+ Stream<void> trigger, S Function(T, S?) aggregate) {
+ var controller = isBroadcast
+ ? StreamController<S>.broadcast(sync: true)
+ : StreamController<S>(sync: true);
- AggregateSample(this._trigger, this._aggregate);
-
- @override
- Stream<T> bind(Stream<S> values) {
- var controller = values.isBroadcast
- ? StreamController<T>.broadcast(sync: true)
- : StreamController<T>(sync: true);
-
- T? currentResults;
+ S? currentResults;
var waitingForTrigger = true;
var isTriggerDone = false;
var isValueDone = false;
- StreamSubscription<S>? valueSub;
+ StreamSubscription<T>? valueSub;
StreamSubscription<void>? triggerSub;
void emit() {
@@ -38,8 +33,8 @@
waitingForTrigger = true;
}
- void onValue(S value) {
- currentResults = _aggregate(value, currentResults);
+ void onValue(T value) {
+ currentResults = aggregate(value, currentResults);
if (!waitingForTrigger) emit();
@@ -78,16 +73,16 @@
controller.onListen = () {
assert(valueSub == null);
- valueSub = values.listen(onValue,
- onError: controller.addError, onDone: onValuesDone);
+ valueSub =
+ listen(onValue, onError: controller.addError, onDone: onValuesDone);
final priorTriggerSub = triggerSub;
if (priorTriggerSub != null) {
if (priorTriggerSub.isPaused) priorTriggerSub.resume();
} else {
- triggerSub = _trigger.listen(onTrigger,
+ triggerSub = trigger.listen(onTrigger,
onError: controller.addError, onDone: onTriggerDone);
}
- if (!values.isBroadcast) {
+ if (!isBroadcast) {
controller
..onPause = () {
valueSub?.pause();
@@ -101,7 +96,7 @@
controller.onCancel = () {
var cancels = <Future<void>>[if (!isValueDone) valueSub!.cancel()];
valueSub = null;
- if (_trigger.isBroadcast || !values.isBroadcast) {
+ if (trigger.isBroadcast || !isBroadcast) {
if (!isTriggerDone) cancels.add(triggerSub!.cancel());
triggerSub = null;
} else {
diff --git a/pkgs/stream_transform/lib/src/async_map.dart b/pkgs/stream_transform/lib/src/async_map.dart
index 7b406c7..f602252 100644
--- a/pkgs/stream_transform/lib/src/async_map.dart
+++ b/pkgs/stream_transform/lib/src/async_map.dart
@@ -41,8 +41,7 @@
var workFinished = StreamController<void>()
// Let the first event through.
..add(null);
- return buffer(workFinished.stream)
- .transform(_asyncMapThen(convert, workFinished.add));
+ return buffer(workFinished.stream)._asyncMapThen(convert, workFinished.add);
}
/// Like [asyncMap] but events are discarded while work is happening in
@@ -68,8 +67,8 @@
var workFinished = StreamController<void>()
// Let the first event through.
..add(null);
- return transform(AggregateSample(workFinished.stream, _dropPrevious))
- .transform(_asyncMapThen(convert, workFinished.add));
+ return aggregateSample(workFinished.stream, _dropPrevious)
+ ._asyncMapThen(convert, workFinished.add);
}
/// Like [asyncMap] but the [convert] callback may be called for an element
@@ -92,7 +91,7 @@
Stream<S> concurrentAsyncMap<S>(FutureOr<S> Function(T) convert) {
var valuesWaiting = 0;
var sourceDone = false;
- return transform(fromHandlers(handleData: (element, sink) {
+ return transformByHandlers(onData: (element, sink) {
valuesWaiting++;
() async {
try {
@@ -103,29 +102,29 @@
valuesWaiting--;
if (valuesWaiting <= 0 && sourceDone) sink.close();
}();
- }, handleDone: (sink) {
+ }, onDone: (sink) {
sourceDone = true;
if (valuesWaiting <= 0) sink.close();
- }));
+ });
+ }
+
+ /// Like [Stream.asyncMap] but the [convert] is only called once per event,
+ /// rather than once per listener, and [then] is called after completing the
+ /// work.
+ Stream<S> _asyncMapThen<S>(
+ Future<S> Function(T) convert, void Function(void) then) {
+ Future<void>? pendingEvent;
+ return transformByHandlers(onData: (event, sink) {
+ pendingEvent =
+ convert(event).then(sink.add).catchError(sink.addError).then(then);
+ }, onDone: (sink) {
+ if (pendingEvent != null) {
+ pendingEvent!.then((_) => sink.close());
+ } else {
+ sink.close();
+ }
+ });
}
}
T _dropPrevious<T>(T event, _) => event;
-
-/// Like [Stream.asyncMap] but the [convert] is only called once per event,
-/// rather than once per listener, and [then] is called after completing the
-/// work.
-StreamTransformer<S, T> _asyncMapThen<S, T>(
- Future<T> Function(S) convert, void Function(void) then) {
- Future<void>? pendingEvent;
- return fromHandlers(handleData: (event, sink) {
- pendingEvent =
- convert(event).then(sink.add).catchError(sink.addError).then(then);
- }, handleDone: (sink) {
- if (pendingEvent != null) {
- pendingEvent!.then((_) => sink.close());
- } else {
- sink.close();
- }
- });
-}
diff --git a/pkgs/stream_transform/lib/src/combine_latest.dart b/pkgs/stream_transform/lib/src/combine_latest.dart
index 6e268e3..dcaf672 100644
--- a/pkgs/stream_transform/lib/src/combine_latest.dart
+++ b/pkgs/stream_transform/lib/src/combine_latest.dart
@@ -34,8 +34,110 @@
/// well, regardless of [other]'s type. If a single subscription stream is
/// combined with a broadcast stream it may never be canceled.
Stream<S> combineLatest<T2, S>(
- Stream<T2> other, FutureOr<S> Function(T, T2) combine) =>
- transform(_CombineLatest(other, combine));
+ Stream<T2> other, FutureOr<S> Function(T, T2) combine) {
+ final controller = isBroadcast
+ ? StreamController<S>.broadcast(sync: true)
+ : StreamController<S>(sync: true);
+
+ other =
+ (isBroadcast && !other.isBroadcast) ? other.asBroadcastStream() : other;
+
+ StreamSubscription<T>? sourceSubscription;
+ StreamSubscription<T2>? otherSubscription;
+
+ var sourceDone = false;
+ var otherDone = false;
+
+ late T latestSource;
+ late T2 latestOther;
+
+ var sourceStarted = false;
+ var otherStarted = false;
+
+ void emitCombined() {
+ if (!sourceStarted || !otherStarted) return;
+ FutureOr<S> result;
+ try {
+ result = combine(latestSource, latestOther);
+ } catch (e, s) {
+ controller.addError(e, s);
+ return;
+ }
+ if (result is Future<S>) {
+ sourceSubscription!.pause();
+ otherSubscription!.pause();
+ result
+ .then(controller.add, onError: controller.addError)
+ .whenComplete(() {
+ sourceSubscription!.resume();
+ otherSubscription!.resume();
+ });
+ } else {
+ controller.add(result);
+ }
+ }
+
+ controller.onListen = () {
+ assert(sourceSubscription == null);
+ sourceSubscription = listen(
+ (s) {
+ sourceStarted = true;
+ latestSource = s;
+ emitCombined();
+ },
+ onError: controller.addError,
+ onDone: () {
+ sourceDone = true;
+ if (otherDone) {
+ controller.close();
+ } else if (!sourceStarted) {
+ // Nothing can ever be emitted
+ otherSubscription!.cancel();
+ controller.close();
+ }
+ });
+ otherSubscription = other.listen(
+ (o) {
+ otherStarted = true;
+ latestOther = o;
+ emitCombined();
+ },
+ onError: controller.addError,
+ onDone: () {
+ otherDone = true;
+ if (sourceDone) {
+ controller.close();
+ } else if (!otherStarted) {
+ // Nothing can ever be emitted
+ sourceSubscription!.cancel();
+ controller.close();
+ }
+ });
+ if (!isBroadcast) {
+ controller
+ ..onPause = () {
+ sourceSubscription!.pause();
+ otherSubscription!.pause();
+ }
+ ..onResume = () {
+ sourceSubscription!.resume();
+ otherSubscription!.resume();
+ };
+ }
+ controller.onCancel = () {
+ var cancels = [
+ sourceSubscription!.cancel(),
+ otherSubscription!.cancel()
+ ]
+ // Handle opt-out nulls
+ ..removeWhere((Object? f) => f == null);
+ sourceSubscription = null;
+ otherSubscription = null;
+ return Future.wait(cancels).then((_) => null);
+ };
+ };
+ return controller.stream;
+ }
/// Combine the latest value emitted from the source stream with the latest
/// values emitted from [others].
@@ -72,141 +174,15 @@
/// If the source stream is a broadcast stream, the result stream will be as
/// well, regardless of the types of [others]. If a single subscription stream
/// is combined with a broadcast source stream, it may never be canceled.
- Stream<List<T>> combineLatestAll(Iterable<Stream<T>> others) =>
- transform(_CombineLatestAll<T>(others));
-}
-
-class _CombineLatest<S, T, R> extends StreamTransformerBase<S, R> {
- final Stream<T> _other;
- final FutureOr<R> Function(S, T) _combine;
-
- _CombineLatest(this._other, this._combine);
-
- @override
- Stream<R> bind(Stream<S> source) {
- final controller = source.isBroadcast
- ? StreamController<R>.broadcast(sync: true)
- : StreamController<R>(sync: true);
-
- final other = (source.isBroadcast && !_other.isBroadcast)
- ? _other.asBroadcastStream()
- : _other;
-
- StreamSubscription<S>? sourceSubscription;
- StreamSubscription<T>? otherSubscription;
-
- var sourceDone = false;
- var otherDone = false;
-
- late S latestSource;
- late T latestOther;
-
- var sourceStarted = false;
- var otherStarted = false;
-
- void emitCombined() {
- if (!sourceStarted || !otherStarted) return;
- FutureOr<R> result;
- try {
- result = _combine(latestSource, latestOther);
- } catch (e, s) {
- controller.addError(e, s);
- return;
- }
- if (result is Future<R>) {
- sourceSubscription!.pause();
- otherSubscription!.pause();
- result
- .then(controller.add, onError: controller.addError)
- .whenComplete(() {
- sourceSubscription!.resume();
- otherSubscription!.resume();
- });
- } else {
- controller.add(result);
- }
- }
-
- controller.onListen = () {
- assert(sourceSubscription == null);
- sourceSubscription = source.listen(
- (s) {
- sourceStarted = true;
- latestSource = s;
- emitCombined();
- },
- onError: controller.addError,
- onDone: () {
- sourceDone = true;
- if (otherDone) {
- controller.close();
- } else if (!sourceStarted) {
- // Nothing can ever be emitted
- otherSubscription!.cancel();
- controller.close();
- }
- });
- otherSubscription = other.listen(
- (o) {
- otherStarted = true;
- latestOther = o;
- emitCombined();
- },
- onError: controller.addError,
- onDone: () {
- otherDone = true;
- if (sourceDone) {
- controller.close();
- } else if (!otherStarted) {
- // Nothing can ever be emitted
- sourceSubscription!.cancel();
- controller.close();
- }
- });
- if (!source.isBroadcast) {
- controller
- ..onPause = () {
- sourceSubscription!.pause();
- otherSubscription!.pause();
- }
- ..onResume = () {
- sourceSubscription!.resume();
- otherSubscription!.resume();
- };
- }
- controller.onCancel = () {
- var cancels = [
- sourceSubscription!.cancel(),
- otherSubscription!.cancel()
- ]
- // Handle opt-out nulls
- ..removeWhere((Object? f) => f == null);
- sourceSubscription = null;
- otherSubscription = null;
- return Future.wait(cancels).then((_) => null);
- };
- };
- return controller.stream;
- }
-}
-
-class _CombineLatestAll<T> extends StreamTransformerBase<T, List<T>> {
- final Iterable<Stream<T>> _others;
-
- _CombineLatestAll(this._others);
-
- @override
- Stream<List<T>> bind(Stream<T> first) {
- final controller = first.isBroadcast
+ Stream<List<T>> combineLatestAll(Iterable<Stream<T>> others) {
+ final controller = isBroadcast
? StreamController<List<T>>.broadcast(sync: true)
: StreamController<List<T>>(sync: true);
final allStreams = [
- first,
- for (final other in _others)
- !first.isBroadcast || other.isBroadcast
- ? other
- : other.asBroadcastStream(),
+ this,
+ for (final other in others)
+ !isBroadcast || other.isBroadcast ? other : other.asBroadcastStream(),
];
controller.onListen = () {
@@ -239,7 +215,7 @@
streamId++;
}
- if (!first.isBroadcast) {
+ if (!isBroadcast) {
controller
..onPause = () {
for (final subscription in subscriptions) {
diff --git a/pkgs/stream_transform/lib/src/concatenate.dart b/pkgs/stream_transform/lib/src/concatenate.dart
index 402d8a0..37a4d96 100644
--- a/pkgs/stream_transform/lib/src/concatenate.dart
+++ b/pkgs/stream_transform/lib/src/concatenate.dart
@@ -17,11 +17,67 @@
/// to and never canceled since there may be broadcast listeners added later.
///
/// If a broadcast stream follows any other stream it will miss any events or
- /// errors which occur before the first stream is done. If a broadcast stream
- /// follows a single-subscription stream, pausing the stream while it is
- /// listening to the second stream will cause events to be dropped rather than
- /// buffered.
- Stream<T> followedBy(Stream<T> next) => transform(_FollowedBy(next));
+ /// errors which occur before the original stream is done. If a broadcast
+ /// stream follows a single-subscription stream, pausing the stream while it
+ /// is listening to the second stream will cause events to be dropped rather
+ /// than buffered.
+ Stream<T> followedBy(Stream<T> next) {
+ var controller = isBroadcast
+ ? StreamController<T>.broadcast(sync: true)
+ : StreamController<T>(sync: true);
+
+ next = isBroadcast && !next.isBroadcast ? next.asBroadcastStream() : next;
+
+ StreamSubscription<T>? subscription;
+ var currentStream = this;
+ var thisDone = false;
+ var secondDone = false;
+
+ late void Function() currentDoneHandler;
+
+ void listen() {
+ subscription = currentStream.listen(controller.add,
+ onError: controller.addError, onDone: () => currentDoneHandler());
+ }
+
+ void onSecondDone() {
+ secondDone = true;
+ controller.close();
+ }
+
+ void onThisDone() {
+ thisDone = true;
+ currentStream = next;
+ currentDoneHandler = onSecondDone;
+ listen();
+ }
+
+ currentDoneHandler = onThisDone;
+
+ controller.onListen = () {
+ assert(subscription == null);
+ listen();
+ if (!isBroadcast) {
+ controller
+ ..onPause = () {
+ if (!thisDone || !next.isBroadcast) return subscription!.pause();
+ subscription!.cancel();
+ subscription = null;
+ }
+ ..onResume = () {
+ if (!thisDone || !next.isBroadcast) return subscription!.resume();
+ listen();
+ };
+ }
+ controller.onCancel = () {
+ if (secondDone) return null;
+ var toCancel = subscription!;
+ subscription = null;
+ return toCancel.cancel();
+ };
+ };
+ return controller.stream;
+ }
/// Returns a stream which emits [initial] before any values from the original
/// stream.
@@ -52,70 +108,3 @@
return initial.followedBy(this);
}
}
-
-class _FollowedBy<T> extends StreamTransformerBase<T, T> {
- final Stream<T> _next;
-
- _FollowedBy(this._next);
-
- @override
- Stream<T> bind(Stream<T> first) {
- var controller = first.isBroadcast
- ? StreamController<T>.broadcast(sync: true)
- : StreamController<T>(sync: true);
-
- var next = first.isBroadcast && !_next.isBroadcast
- ? _next.asBroadcastStream()
- : _next;
-
- StreamSubscription<T>? subscription;
- var currentStream = first;
- var firstDone = false;
- var secondDone = false;
-
- late void Function() currentDoneHandler;
-
- void listen() {
- subscription = currentStream.listen(controller.add,
- onError: controller.addError, onDone: () => currentDoneHandler());
- }
-
- void onSecondDone() {
- secondDone = true;
- controller.close();
- }
-
- void onFirstDone() {
- firstDone = true;
- currentStream = next;
- currentDoneHandler = onSecondDone;
- listen();
- }
-
- currentDoneHandler = onFirstDone;
-
- controller.onListen = () {
- assert(subscription == null);
- listen();
- if (!first.isBroadcast) {
- controller
- ..onPause = () {
- if (!firstDone || !next.isBroadcast) return subscription!.pause();
- subscription!.cancel();
- subscription = null;
- }
- ..onResume = () {
- if (!firstDone || !next.isBroadcast) return subscription!.resume();
- listen();
- };
- }
- controller.onCancel = () {
- if (secondDone) return null;
- var toCancel = subscription!;
- subscription = null;
- return toCancel.cancel();
- };
- };
- return controller.stream;
- }
-}
diff --git a/pkgs/stream_transform/lib/src/from_handlers.dart b/pkgs/stream_transform/lib/src/from_handlers.dart
index c7c9332..f5c4b84 100644
--- a/pkgs/stream_transform/lib/src/from_handlers.dart
+++ b/pkgs/stream_transform/lib/src/from_handlers.dart
@@ -4,46 +4,21 @@
import 'dart:async';
-/// Like [new StreamTransformer.fromHandlers] but the handlers are called once
-/// per event rather than once per listener for broadcast streams.
-StreamTransformer<S, T> fromHandlers<S, T>(
- {void Function(S, EventSink<T>)? handleData,
- void Function(Object, StackTrace, EventSink<T>)? handleError,
- void Function(EventSink<T>)? handleDone}) =>
- _StreamTransformer(
- handleData: handleData,
- handleError: handleError,
- handleDone: handleDone);
+extension TransformByHandlers<S> on Stream<S> {
+ /// Transform a stream by callbacks.
+ ///
+ /// This is similar to `transform(StreamTransformer.fromHandler(...))` except
+ /// that the handlers are called once per event rather than called for the
+ /// same event for each listener on a broadcast stream.
+ Stream<T> transformByHandlers<T>(
+ {void Function(S, EventSink<T>)? onData,
+ void Function(Object, StackTrace, EventSink<T>)? onError,
+ void Function(EventSink<T>)? onDone}) {
+ final handleData = onData ?? _defaultHandleData;
+ final handleError = onError ?? _defaultHandleError;
+ final handleDone = onDone ?? _defaultHandleDone;
-class _StreamTransformer<S, T> extends StreamTransformerBase<S, T> {
- final void Function(S, EventSink<T>) _handleData;
- final void Function(EventSink<T>) _handleDone;
- final void Function(Object, StackTrace, EventSink<T>) _handleError;
-
- _StreamTransformer(
- {void Function(S, EventSink<T>)? handleData,
- void Function(Object, StackTrace, EventSink<T>)? handleError,
- void Function(EventSink<T>)? handleDone})
- : _handleData = handleData ?? _defaultHandleData,
- _handleError = handleError ?? _defaultHandleError,
- _handleDone = handleDone ?? _defaultHandleDone;
-
- static void _defaultHandleData<S, T>(S value, EventSink<T> sink) {
- sink.add(value as T);
- }
-
- static void _defaultHandleError<T>(
- Object error, StackTrace stackTrace, EventSink<T> sink) {
- sink.addError(error, stackTrace);
- }
-
- static void _defaultHandleDone<T>(EventSink<T> sink) {
- sink.close();
- }
-
- @override
- Stream<T> bind(Stream<S> values) {
- var controller = values.isBroadcast
+ var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
@@ -51,14 +26,14 @@
controller.onListen = () {
assert(subscription == null);
var valuesDone = false;
- subscription = values.listen((value) => _handleData(value, controller),
+ subscription = listen((value) => handleData(value, controller),
onError: (Object error, StackTrace stackTrace) {
- _handleError(error, stackTrace, controller);
+ handleError(error, stackTrace, controller);
}, onDone: () {
valuesDone = true;
- _handleDone(controller);
+ handleDone(controller);
});
- if (!values.isBroadcast) {
+ if (!isBroadcast) {
controller
..onPause = subscription!.pause
..onResume = subscription!.resume;
@@ -72,4 +47,17 @@
};
return controller.stream;
}
+
+ static void _defaultHandleData<S, T>(S value, EventSink<T> sink) {
+ sink.add(value as T);
+ }
+
+ static void _defaultHandleError<T>(
+ Object error, StackTrace stackTrace, EventSink<T> sink) {
+ sink.addError(error, stackTrace);
+ }
+
+ static void _defaultHandleDone<T>(EventSink<T> sink) {
+ sink.close();
+ }
}
diff --git a/pkgs/stream_transform/lib/src/merge.dart b/pkgs/stream_transform/lib/src/merge.dart
index 61b5e21..55fe409 100644
--- a/pkgs/stream_transform/lib/src/merge.dart
+++ b/pkgs/stream_transform/lib/src/merge.dart
@@ -30,7 +30,7 @@
/// If a broadcast stream is merged into a single-subscription stream any
/// events emitted by [other] before the result stream has a subscriber will
/// be discarded.
- Stream<T> merge(Stream<T> other) => transform(_Merge([other]));
+ Stream<T> merge(Stream<T> other) => mergeAll([other]);
/// Returns a stream which emits values and errors from the source stream and
/// any stream in [others] in any order as they arrive.
@@ -55,7 +55,52 @@
/// If a broadcast stream is merged into a single-subscription stream any
/// events emitted by that stream before the result stream has a subscriber
/// will be discarded.
- Stream<T> mergeAll(Iterable<Stream<T>> others) => transform(_Merge(others));
+ Stream<T> mergeAll(Iterable<Stream<T>> others) {
+ final controller = isBroadcast
+ ? StreamController<T>.broadcast(sync: true)
+ : StreamController<T>(sync: true);
+
+ final allStreams = [
+ this,
+ for (final other in others)
+ !isBroadcast || other.isBroadcast ? other : other.asBroadcastStream(),
+ ];
+
+ controller.onListen = () {
+ final subscriptions = <StreamSubscription<T>>[];
+ for (final stream in allStreams) {
+ final subscription =
+ stream.listen(controller.add, onError: controller.addError);
+ subscription.onDone(() {
+ subscriptions.remove(subscription);
+ if (subscriptions.isEmpty) controller.close();
+ });
+ subscriptions.add(subscription);
+ }
+ if (!isBroadcast) {
+ controller
+ ..onPause = () {
+ for (final subscription in subscriptions) {
+ subscription.pause();
+ }
+ }
+ ..onResume = () {
+ for (final subscription in subscriptions) {
+ subscription.resume();
+ }
+ };
+ }
+ controller.onCancel = () {
+ if (subscriptions.isEmpty) return null;
+ var cancels = [for (var s in subscriptions) s.cancel()]
+ // Handle opt-out nulls
+ ..removeWhere((Object? f) => f == null);
+ if (cancels.isEmpty) return null;
+ return Future.wait(cancels).then((_) => null);
+ };
+ };
+ return controller.stream;
+ }
/// Like [asyncExpand] but the [convert] callback may be called for an element
/// before the [Stream] emitted by the previous element has closed.
@@ -84,76 +129,19 @@
/// * [switchMap], which cancels subscriptions to the previous sub
/// stream instead of concurrently emitting events from all sub streams.
Stream<S> concurrentAsyncExpand<S>(Stream<S> Function(T) convert) =>
- map(convert).transform(_MergeExpanded());
+ map(convert).mergeExpanded();
}
-class _Merge<T> extends StreamTransformerBase<T, T> {
- final Iterable<Stream<T>> _others;
-
- _Merge(this._others);
-
- @override
- Stream<T> bind(Stream<T> first) {
- final controller = first.isBroadcast
- ? StreamController<T>.broadcast(sync: true)
- : StreamController<T>(sync: true);
-
- final allStreams = [
- first,
- for (final other in _others)
- !first.isBroadcast || other.isBroadcast
- ? other
- : other.asBroadcastStream(),
- ];
-
- controller.onListen = () {
- final subscriptions = <StreamSubscription<T>>[];
- for (final stream in allStreams) {
- final subscription =
- stream.listen(controller.add, onError: controller.addError);
- subscription.onDone(() {
- subscriptions.remove(subscription);
- if (subscriptions.isEmpty) controller.close();
- });
- subscriptions.add(subscription);
- }
- if (!first.isBroadcast) {
- controller
- ..onPause = () {
- for (final subscription in subscriptions) {
- subscription.pause();
- }
- }
- ..onResume = () {
- for (final subscription in subscriptions) {
- subscription.resume();
- }
- };
- }
- controller.onCancel = () {
- if (subscriptions.isEmpty) return null;
- var cancels = [for (var s in subscriptions) s.cancel()]
- // Handle opt-out nulls
- ..removeWhere((Object? f) => f == null);
- if (cancels.isEmpty) return null;
- return Future.wait(cancels).then((_) => null);
- };
- };
- return controller.stream;
- }
-}
-
-class _MergeExpanded<T> extends StreamTransformerBase<Stream<T>, T> {
- @override
- Stream<T> bind(Stream<Stream<T>> streams) {
- final controller = streams.isBroadcast
+extension _MergeExpanded<T> on Stream<Stream<T>> {
+ Stream<T> mergeExpanded() {
+ final controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
controller.onListen = () {
final subscriptions = <StreamSubscription<dynamic>>[];
- final outerSubscription = streams.listen((inner) {
- if (streams.isBroadcast && !inner.isBroadcast) {
+ final outerSubscription = listen((inner) {
+ if (isBroadcast && !inner.isBroadcast) {
inner = inner.asBroadcastStream();
}
final subscription =
@@ -169,7 +157,7 @@
if (subscriptions.isEmpty) controller.close();
});
subscriptions.add(outerSubscription);
- if (!streams.isBroadcast) {
+ if (!isBroadcast) {
controller
..onPause = () {
for (final subscription in subscriptions) {
diff --git a/pkgs/stream_transform/lib/src/rate_limit.dart b/pkgs/stream_transform/lib/src/rate_limit.dart
index 23dfbc5..d558f61 100644
--- a/pkgs/stream_transform/lib/src/rate_limit.dart
+++ b/pkgs/stream_transform/lib/src/rate_limit.dart
@@ -56,8 +56,8 @@
/// To collect values emitted during the debounce period see [debounceBuffer].
Stream<T> debounce(Duration duration,
{bool leading = false, bool trailing = true}) =>
- transform(_debounceAggregate(duration, _dropPrevious,
- leading: leading, trailing: trailing));
+ _debounceAggregate(duration, _dropPrevious,
+ leading: leading, trailing: trailing);
/// Returns a Stream which collects values until the source stream does not
/// emit for [duration] then emits the collected values.
@@ -76,8 +76,7 @@
/// To keep only the most recent event during the debounce perios see
/// [debounce].
Stream<List<T>> debounceBuffer(Duration duration) =>
- transform(_debounceAggregate(duration, _collectToList,
- leading: false, trailing: true));
+ _debounceAggregate(duration, _collect, leading: false, trailing: true);
/// Returns a stream which only emits once per [duration], at the beginning of
/// the period.
@@ -87,14 +86,14 @@
Stream<T> throttle(Duration duration) {
Timer? timer;
- return transform(fromHandlers(handleData: (data, sink) {
+ return transformByHandlers(onData: (data, sink) {
if (timer == null) {
sink.add(data);
timer = Timer(duration, () {
timer = null;
});
}
- }));
+ });
}
/// Returns a Stream which only emits once per [duration], at the end of the
@@ -129,7 +128,7 @@
var shouldClose = false;
T recentData;
- return transform(fromHandlers(handleData: (T data, EventSink<T> sink) {
+ return transformByHandlers(onData: (data, sink) {
recentData = data;
timer ??= Timer(duration, () {
sink.add(recentData);
@@ -138,13 +137,13 @@
sink.close();
}
});
- }, handleDone: (EventSink<T> sink) {
+ }, onDone: (sink) {
if (timer != null) {
shouldClose = true;
} else {
sink.close();
}
- }));
+ });
}
/// Returns a Stream which collects values and emits when it sees a value on
@@ -158,51 +157,44 @@
/// Errors from the source stream or the trigger are immediately forwarded to
/// the output.
Stream<List<T>> buffer(Stream<void> trigger) =>
- transform(AggregateSample<T, List<T>>(trigger, _collect));
-}
+ aggregateSample<List<T>>(trigger, _collect);
-List<T> _collectToList<T>(T element, List<T>? soFar) {
- soFar ??= <T>[];
- soFar.add(element);
- return soFar;
+ /// Aggregates values until the source stream does not emit for [duration],
+ /// then emits the aggregated values.
+ Stream<S> _debounceAggregate<S>(
+ Duration duration, S Function(T element, S? soFar) collect,
+ {required bool leading, required bool trailing}) {
+ Timer? timer;
+ S? soFar;
+ var shouldClose = false;
+ var emittedLatestAsLeading = false;
+ return transformByHandlers(onData: (value, sink) {
+ timer?.cancel();
+ soFar = collect(value, soFar);
+ if (timer == null && leading) {
+ emittedLatestAsLeading = true;
+ sink.add(soFar as S);
+ } else {
+ emittedLatestAsLeading = false;
+ }
+ timer = Timer(duration, () {
+ if (trailing && !emittedLatestAsLeading) sink.add(soFar as S);
+ if (shouldClose) {
+ sink.close();
+ }
+ soFar = null;
+ timer = null;
+ });
+ }, onDone: (EventSink<S> sink) {
+ if (soFar != null && trailing) {
+ shouldClose = true;
+ } else {
+ timer?.cancel();
+ sink.close();
+ }
+ });
+ }
}
T _dropPrevious<T>(T element, _) => element;
-
-/// Creates a StreamTransformer which aggregates values until the source stream
-/// does not emit for [duration], then emits the aggregated values.
-StreamTransformer<T, R> _debounceAggregate<T, R>(
- Duration duration, R Function(T element, R? soFar) collect,
- {required bool leading, required bool trailing}) {
- Timer? timer;
- R? soFar;
- var shouldClose = false;
- var emittedLatestAsLeading = false;
- return fromHandlers(handleData: (T value, EventSink<R> sink) {
- timer?.cancel();
- soFar = collect(value, soFar);
- if (timer == null && leading) {
- emittedLatestAsLeading = true;
- sink.add(soFar as R);
- } else {
- emittedLatestAsLeading = false;
- }
- timer = Timer(duration, () {
- if (trailing && !emittedLatestAsLeading) sink.add(soFar as R);
- if (shouldClose) {
- sink.close();
- }
- soFar = null;
- timer = null;
- });
- }, handleDone: (EventSink<R> sink) {
- if (soFar != null && trailing) {
- shouldClose = true;
- } else {
- timer?.cancel();
- sink.close();
- }
- });
-}
-
List<T> _collect<T>(T event, List<T>? soFar) => (soFar ?? <T>[])..add(event);
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart
index df7f1b8..17fbb81 100644
--- a/pkgs/stream_transform/lib/src/switch.dart
+++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -52,15 +52,8 @@
///
/// If the source stream is a broadcast stream, the result stream will be as
/// well, regardless of the types of streams emitted.
- Stream<T> switchLatest() => transform(_SwitchTransformer<T>());
-}
-
-class _SwitchTransformer<T> extends StreamTransformerBase<Stream<T>, T> {
- const _SwitchTransformer();
-
- @override
- Stream<T> bind(Stream<Stream<T>> outer) {
- var controller = outer.isBroadcast
+ Stream<T> switchLatest() {
+ var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
@@ -68,7 +61,7 @@
StreamSubscription<T>? innerSubscription;
var outerStreamDone = false;
- final outerSubscription = outer.listen(
+ final outerSubscription = listen(
(innerStream) {
innerSubscription?.cancel();
innerSubscription = innerStream.listen(controller.add,
@@ -82,7 +75,7 @@
outerStreamDone = true;
if (innerSubscription == null) controller.close();
});
- if (!outer.isBroadcast) {
+ if (!isBroadcast) {
controller
..onPause = () {
innerSubscription?.pause();
diff --git a/pkgs/stream_transform/lib/src/take_until.dart b/pkgs/stream_transform/lib/src/take_until.dart
index 5420500..32366ed 100644
--- a/pkgs/stream_transform/lib/src/take_until.dart
+++ b/pkgs/stream_transform/lib/src/take_until.dart
@@ -13,23 +13,14 @@
/// which are emitted before the trigger, but have further asynchronous delays
/// in transformations following the takeUtil, will still go through.
/// Cancelling a subscription immediately stops values.
- Stream<T> takeUntil(Future<void> trigger) => transform(_TakeUntil(trigger));
-}
-
-class _TakeUntil<T> extends StreamTransformerBase<T, T> {
- final Future<void> _trigger;
-
- _TakeUntil(this._trigger);
-
- @override
- Stream<T> bind(Stream<T> values) {
- var controller = values.isBroadcast
+ Stream<T> takeUntil(Future<void> trigger) {
+ var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
StreamSubscription<T>? subscription;
var isDone = false;
- _trigger.then((_) {
+ trigger.then((_) {
if (isDone) return;
isDone = true;
subscription?.cancel();
@@ -38,13 +29,13 @@
controller.onListen = () {
if (isDone) return;
- subscription = values.listen(controller.add, onError: controller.addError,
- onDone: () {
+ subscription =
+ listen(controller.add, onError: controller.addError, onDone: () {
if (isDone) return;
isDone = true;
controller.close();
});
- if (!values.isBroadcast) {
+ if (!isBroadcast) {
controller
..onPause = subscription!.pause
..onResume = subscription!.resume;
diff --git a/pkgs/stream_transform/lib/src/tap.dart b/pkgs/stream_transform/lib/src/tap.dart
index b7e0321..05ab32f 100644
--- a/pkgs/stream_transform/lib/src/tap.dart
+++ b/pkgs/stream_transform/lib/src/tap.dart
@@ -25,20 +25,20 @@
Stream<T> tap(void Function(T)? onValue,
{void Function(Object, StackTrace)? onError,
void Function()? onDone}) =>
- transform(fromHandlers(handleData: (value, sink) {
+ transformByHandlers(onData: (value, sink) {
try {
onValue?.call(value);
} catch (_) {/*Ignore*/}
sink.add(value);
- }, handleError: (error, stackTrace, sink) {
+ }, onError: (error, stackTrace, sink) {
try {
onError?.call(error, stackTrace);
} catch (_) {/*Ignore*/}
sink.addError(error, stackTrace);
- }, handleDone: (sink) {
+ }, onDone: (sink) {
try {
onDone?.call();
} catch (_) {/*Ignore*/}
sink.close();
- }));
+ });
}
diff --git a/pkgs/stream_transform/lib/src/where.dart b/pkgs/stream_transform/lib/src/where.dart
index 97ec57b..ed3532e 100644
--- a/pkgs/stream_transform/lib/src/where.dart
+++ b/pkgs/stream_transform/lib/src/where.dart
@@ -17,10 +17,9 @@
/// [S] should be a subtype of the stream's generic type, otherwise nothing of
/// type [S] could possibly be emitted, however there is no static or runtime
/// checking that this is the case.
- Stream<S> whereType<S>() =>
- transform(StreamTransformer.fromHandlers(handleData: (event, sink) {
+ Stream<S> whereType<S>() => transformByHandlers(onData: (event, sink) {
if (event is S) sink.add(event);
- }));
+ });
/// Like [where] but allows the [test] to return a [Future].
///
@@ -40,7 +39,7 @@
Stream<T> asyncWhere(FutureOr<bool> Function(T) test) {
var valuesWaiting = 0;
var sourceDone = false;
- return transform(fromHandlers(handleData: (element, sink) {
+ return transformByHandlers(onData: (element, sink) {
valuesWaiting++;
() async {
try {
@@ -51,9 +50,9 @@
valuesWaiting--;
if (valuesWaiting <= 0 && sourceDone) sink.close();
}();
- }, handleDone: (sink) {
+ }, onDone: (sink) {
sourceDone = true;
if (valuesWaiting <= 0) sink.close();
- }));
+ });
}
}
diff --git a/pkgs/stream_transform/test/from_handlers_test.dart b/pkgs/stream_transform/test/from_handlers_test.dart
index 206acc8..7ae3368 100644
--- a/pkgs/stream_transform/test/from_handlers_test.dart
+++ b/pkgs/stream_transform/test/from_handlers_test.dart
@@ -18,7 +18,7 @@
late StreamSubscription<int> subscription;
void setUpForController(StreamController<int> controller,
- StreamTransformer<int, int> transformer) {
+ Stream<int> Function(Stream<int>) transform) {
valuesCanceled = false;
values = controller
..onCancel = () {
@@ -27,7 +27,7 @@
emittedValues = [];
errors = [];
isDone = false;
- transformed = values.stream.transform(transformer);
+ transformed = transform(values.stream);
subscription =
transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
isDone = true;
@@ -37,7 +37,7 @@
group('default from_handlers', () {
group('Single subscription stream', () {
setUp(() {
- setUpForController(StreamController(), fromHandlers());
+ setUpForController(StreamController(), (s) => s.transformByHandlers());
});
test('has correct stream type', () {
@@ -74,7 +74,8 @@
late StreamSubscription<int> subscription2;
setUp(() {
- setUpForController(StreamController.broadcast(), fromHandlers());
+ setUpForController(
+ StreamController.broadcast(), (s) => s.transformByHandlers());
emittedValues2 = [];
errors2 = [];
isDone2 = false;
@@ -120,10 +121,11 @@
group('custom handlers', () {
group('single subscription', () {
setUp(() async {
- setUpForController(StreamController(),
- fromHandlers(handleData: (value, sink) {
- sink.add(value + 1);
- }));
+ setUpForController(
+ StreamController(),
+ (s) => s.transformByHandlers(onData: (value, sink) {
+ sink.add(value + 1);
+ }));
});
test('uses transform from handleData', () async {
values..add(1)..add(2);
@@ -143,14 +145,14 @@
errorCallCount = 0;
setUpForController(
StreamController.broadcast(),
- fromHandlers(handleData: (value, sink) {
- dataCallCount++;
- }, handleError: (error, stackTrace, sink) {
- errorCallCount++;
- sink.addError(error, stackTrace);
- }, handleDone: (sink) {
- doneCallCount++;
- }));
+ (s) => s.transformByHandlers(onData: (value, sink) {
+ dataCallCount++;
+ }, onError: (error, stackTrace, sink) {
+ errorCallCount++;
+ sink.addError(error, stackTrace);
+ }, onDone: (sink) {
+ doneCallCount++;
+ }));
transformed.listen((_) {}, onError: (_, __) {});
});