blob: e7049bdc574bbd589ddce15801dcc830812b3844 [file] [log] [blame] [view]
[![Build Status](https://github.com/dart-lang/tools/actions/workflows/stream_transform.yaml/badge.svg)](https://github.com/dart-lang/tools/actions/workflows/stream_transform.yaml)
[![pub package](https://img.shields.io/pub/v/stream_transform.svg)](https://pub.dev/packages/stream_transform)
[![package publisher](https://img.shields.io/pub/publisher/stream_transform.svg)](https://pub.dev/packages/stream_transform/publisher)
Extension methods on `Stream` adding common transform operators.
## Operators
### asyncMapBuffer, asyncMapSample, concurrentAsyncMap
Alternatives to `asyncMap`. `asyncMapBuffer` prevents the callback from
overlapping execution and collects events while it is executing.
`asyncMapSample` prevents overlapping execution and discards events while it is
executing. `concurrentAsyncMap` allows overlap and removes ordering guarantees
for higher throughput.
Like `asyncMap` but events are buffered in a List until previous events have
been processed rather than being called for each element individually.
### asyncWhere
Like `where` but allows an asynchronous predicate.
### audit
Waits for a period of time after receiving a value and then only emits the most
recent value.
### buffer
Collects values from a source stream until a `trigger` stream fires and the
collected values are emitted.
### combineLatest, combineLatestAll
Combine the most recent event from multiple streams through a callback or into a
list.
### debounce, debounceBuffer
Prevents a source stream from emitting too frequently by dropping or collecting
values that occur within a given duration.
### followedBy
Appends the values of a stream after another stream finishes.
### merge, mergeAll, concurrentAsyncExpand
Interleaves events from multiple streams into a single stream.
### scan
Scan is like fold, but instead of producing a single value it yields each
intermediate accumulation.
### startWith, startWithMany, startWithStream
Prepend a value, an iterable, or a stream to the beginning of another stream.
### switchMap, switchLatest
Flatten a Stream of Streams into a Stream which forwards values from the most
recent Stream
### takeUntil
Let values through until a Future fires.
### tap
Taps into a single-subscriber stream to react to values as they pass, without
being a real subscriber.
### throttle
Blocks events for a duration after an event is successfully emitted.
### whereType
Like `Iterable.whereType` for a stream.
## Comparison to Rx Operators
The semantics and naming in this package have some overlap, and some conflict,
with the [ReactiveX](https://reactivex.io/) suite of libraries. Some of the
conflict is intentional - Dart `Stream` predates `Observable` and coherence with
the Dart ecosystem semantics and naming is a strictly higher priority than
consistency with ReactiveX.
Rx Operator Category | variation | `stream_transform`
------------------------- | ------------------------------------------------------ | ------------------
[`sample`][rx_sample] | `sample/throttleLast(Duration)` | `sample(Stream.periodic(Duration), longPoll: false)`
​ | `throttleFirst(Duration)` | [`throttle`][throttle]
​ | `sample(Observable)` | `sample(trigger, longPoll: false)`
[`debounce`][rx_debounce] | `debounce/throttleWithTimeout(Duration)` | [`debounce`][debounce]
​ | `debounce(Observable)` | No equivalent
[`buffer`][rx_buffer] | `buffer(boundary)`, `bufferWithTime`,`bufferWithCount` | No equivalent
​ | `buffer(boundaryClosingSelector)` | `buffer(trigger, longPoll: false)`
RxJs extensions | [`audit(callback)`][rxjs_audit] | No equivalent
​ | [`auditTime(Duration)`][rxjs_auditTime] | [`audit`][audit]
​ | [`exhaustMap`][rxjs_exhaustMap] | No equivalent
​ | [`throttleTime(trailing: true)`][rxjs_throttleTime] | `throttle(trailing: true)`
​ | `throttleTime(leading: false, trailing: true)` | No equivalent
No equivalent? | | [`asyncMapBuffer`][asyncMapBuffer]
​ | | [`asyncMapSample`][asyncMapSample]
​ | | [`buffer`][buffer]
​ | | [`sample`][sample]
​ | | [`debounceBuffer`][debounceBuffer]
​ | | `debounce(leading: true, trailing: false)`
​ | | `debounce(leading: true, trailing: true)`
[rx_sample]:https://reactivex.io/documentation/operators/sample.html
[rx_debounce]:https://reactivex.io/documentation/operators/debounce.html
[rx_buffer]:https://reactivex.io/documentation/operators/buffer.html
[rxjs_audit]:https://rxjs.dev/api/operators/audit
[rxjs_auditTime]:https://rxjs.dev/api/operators/auditTime
[rxjs_throttleTime]:https://rxjs.dev/api/operators/throttleTime
[rxjs_exhaustMap]:https://rxjs.dev/api/operators/exhaustMap
[asyncMapBuffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/AsyncMap/asyncMapBuffer.html
[asyncMapSample]:https://pub.dev/documentation/stream_transform/latest/stream_transform/AsyncMap/asyncMapSample.html
[audit]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/audit.html
[buffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/buffer.html
[sample]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/sample.html
[debounceBuffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/debounceBuffer.html
[debounce]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/debounce.html
[throttle]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/throttle.html
## Getting a `StreamTransformer` instance
It may be useful to pass an instance of `StreamTransformer` so that it can be
used with `stream.transform` calls rather than reference the specific operator
in place. Any operator on `Stream` that returns a `Stream` can be modeled as a
`StreamTransformer` using the [`fromBind` constructor][fromBind].
```dart
final debounce = StreamTransformer.fromBind(
(s) => s.debounce(const Duration(milliseconds: 100)));
```
[fromBind]: https://api.dart.dev/stable/dart-async/StreamTransformer/StreamTransformer.fromBind.html