Merge pull request #1657 from dart-lang/merge-stream_transform-package
Merge `package:stream_transform`
diff --git a/.github/ISSUE_TEMPLATE/stream_transform.md b/.github/ISSUE_TEMPLATE/stream_transform.md
new file mode 100644
index 0000000..475bd83
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/stream_transform.md
@@ -0,0 +1,5 @@
+---
+name: "package:stream_transform"
+about: "Create a bug or file a feature request against package:stream_transform."
+labels: "package:stream_transform"
+---
\ No newline at end of file
diff --git a/.github/labeler.yml b/.github/labeler.yml
index c502a11..bfef316 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -108,6 +108,10 @@
- changed-files:
- any-glob-to-any-file: 'pkgs/sse/**'
+'package:stream_transform':
+ - changed-files:
+ - any-glob-to-any-file: 'pkgs/stream_transform/**'
+
'package:term_glyph':
- changed-files:
- any-glob-to-any-file: 'pkgs/term_glyph/**'
diff --git a/.github/workflows/clock.yaml b/.github/workflows/clock.yaml
index aef0895..a09a601 100644
--- a/.github/workflows/clock.yaml
+++ b/.github/workflows/clock.yaml
@@ -5,12 +5,12 @@
push:
branches: [ main ]
paths:
- - '.github/workflows/clock.yml'
+ - '.github/workflows/clock.yaml'
- 'pkgs/clock/**'
pull_request:
branches: [ main ]
paths:
- - '.github/workflows/clock.yml'
+ - '.github/workflows/clock.yaml'
- 'pkgs/clock/**'
schedule:
- cron: "0 0 * * 0"
diff --git a/.github/workflows/stream_transform.yaml b/.github/workflows/stream_transform.yaml
new file mode 100644
index 0000000..a36a776
--- /dev/null
+++ b/.github/workflows/stream_transform.yaml
@@ -0,0 +1,73 @@
+name: package:stream_transform
+
+on:
+ # Run on PRs and pushes to the default branch.
+ push:
+ branches: [ main ]
+ paths:
+ - '.github/workflows/stream_transform.yaml'
+ - 'pkgs/stream_transform/**'
+ pull_request:
+ branches: [ main ]
+ paths:
+ - '.github/workflows/stream_transform.yaml'
+ - 'pkgs/stream_transform/**'
+ schedule:
+ - cron: "0 0 * * 0"
+
+env:
+ PUB_ENVIRONMENT: bot.github
+
+
+defaults:
+ run:
+ working-directory: pkgs/stream_transform/
+
+jobs:
+ # Check code formatting and static analysis on a single OS (linux)
+ # against Dart dev.
+ analyze:
+ runs-on: ubuntu-latest
+ strategy:
+ fail-fast: false
+ matrix:
+ sdk: [dev]
+ steps:
+ - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
+ - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94
+ with:
+ sdk: ${{ matrix.sdk }}
+ - id: install
+ name: Install dependencies
+ run: dart pub get
+ - name: Check formatting
+ run: dart format --output=none --set-exit-if-changed .
+ if: always() && steps.install.outcome == 'success'
+ - name: Analyze code
+ run: dart analyze --fatal-infos
+ if: always() && steps.install.outcome == 'success'
+
+ # Run tests on a matrix consisting of two dimensions:
+ # 1. OS: ubuntu-latest, (macos-latest, windows-latest)
+ # 2. release channel: dev
+ test:
+ needs: analyze
+ runs-on: ${{ matrix.os }}
+ strategy:
+ fail-fast: false
+ matrix:
+ # Add macos-latest and/or windows-latest if relevant for this package.
+ os: [ubuntu-latest]
+ # Bump SDK for Legacy tests when changing min SDK.
+ sdk: [3.1, dev]
+ steps:
+ - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
+ - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94
+ with:
+ sdk: ${{ matrix.sdk }}
+ - id: install
+ name: Install dependencies
+ run: dart pub get
+ - name: Run tests
+ run: dart test -p chrome,vm --test-randomize-ordering-seed=random
+ if: always() && steps.install.outcome == 'success'
diff --git a/README.md b/README.md
index 7b742de..d1a1d04 100644
--- a/README.md
+++ b/README.md
@@ -40,6 +40,7 @@
| [source_maps](pkgs/source_maps/) | A library to programmatically manipulate source map files. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asource_maps) | [](https://pub.dev/packages/source_maps) |
| [source_span](pkgs/source_span/) | Provides a standard representation for source code locations and spans. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asource_span) | [](https://pub.dev/packages/source_span) |
| [sse](pkgs/sse/) | Provides client and server functionality for setting up bi-directional communication through Server Sent Events (SSE) and corresponding POST requests. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Asse) | [](https://pub.dev/packages/sse) |
+| [stream_transform](pkgs/stream_transform/) | A collection of utilities to transform and manipulate streams. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Astream_transform) | [](https://pub.dev/packages/stream_transform) |
| [term_glyph](pkgs/term_glyph/) | Useful Unicode glyphs and ASCII substitutes. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aterm_glyph) | [](https://pub.dev/packages/term_glyph) |
| [test_reflective_loader](pkgs/test_reflective_loader/) | Support for discovering tests and test suites using reflection. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Atest_reflective_loader) | [](https://pub.dev/packages/test_reflective_loader) |
| [timing](pkgs/timing/) | A simple package for tracking the performance of synchronous and asynchronous actions. | [](https://github.com/dart-lang/tools/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Atiming) | [](https://pub.dev/packages/timing) |
diff --git a/pkgs/stream_transform/.gitignore b/pkgs/stream_transform/.gitignore
new file mode 100644
index 0000000..bfffcc6
--- /dev/null
+++ b/pkgs/stream_transform/.gitignore
@@ -0,0 +1,6 @@
+.pub/
+.dart_tool/
+build/
+packages
+pubspec.lock
+.packages
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
new file mode 100644
index 0000000..a71b2fb
--- /dev/null
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -0,0 +1,185 @@
+## 2.1.1
+
+- Require Dart 3.1 or greater
+- Forward errors from the `trigger` future through to the result stream in
+ `takeUntil`. Previously an error would have not closed the stream, and instead
+ raised as an unhandled async error.
+- Move to `dart-lang/tools` monorepo.
+
+## 2.1.0
+
+- Add `whereNotNull`.
+
+## 2.0.1
+
+- Require Dart 2.14 or greater.
+- Wait for the future returned from `StreamSubscription.cancel()` before
+ listening to the subsequent stream in `switchLatest` and `switchMap`.
+
+## 2.0.0
+
+- Migrate to null safety.
+- Improve tests of `switchMap` and improve documentation with links and
+ clarification.
+- Add `trailing` argument to `throttle`.
+
+## 1.2.0
+
+- Add support for emitting the "leading" event in `debounce`.
+
+## 1.1.1
+
+- Fix a bug in `asyncMapSample`, `buffer`, `combineLatest`,
+ `combineLatestAll`, `merge`, and `mergeAll` which would cause an exception
+ when cancelling a subscription after using the transformer if the original
+ stream(s) returned `null` from cancelling their subscriptions.
+
+## 1.1.0
+
+- Add `concurrentAsyncExpand` to interleave events emitted by multiple sub
+ streams created by a callback.
+
+## 1.0.0
+
+- Remove the top level methods and retain the extensions only.
+
+## 0.0.20
+
+- Add extension methods for most transformers. These should be used in place
+ of the current methods. All current implementations are deprecated and will
+ be removed in the next major version bump.
+ - Migrating typical use: Instead of
+ `stream.transform(debounce(Duration(seconds: 1)))` use
+ `stream.debounce(Duration(seconds: 1))`.
+ - To migrate a usage where a `StreamTransformer` instance is stored or
+ passed see "Getting a StreamTransformer instance" on the README.
+- The `map` and `chainTransformers` utilities are no longer useful with the
+ new patterns so they are deprecated without a replacement. If you still have
+ a need for them they can be replicated with `StreamTransformer.fromBind`:
+
+ ```
+ // Replace `map(convert)`
+ StreamTransformer.fromBind((s) => s.map(convert));
+
+ // Replace `chainTransformers(first, second)`
+ StreamTransformer.fromBind((s) => s.transform(first).transform(second));
+ ```
+
+## 0.0.19
+
+- Add `asyncMapSample` transform.
+
+## 0.0.18
+
+- Internal cleanup. Passed "trigger" streams or futures now allow `<void>`
+ generic type rather than an implicit `dynamic>`
+
+## 0.0.17
+
+- Add concrete types to the `onError` callback in `tap`.
+
+## 0.0.16+1
+
+- Remove usage of Set literal which is not available before Dart 2.2.0
+
+## 0.0.16
+
+- Allow a `combine` callback to return a `FutureOr<T>` in `scan`. There are no
+ behavior changes for synchronous callbacks. **Potential breaking change** In
+ the unlikely situation where `scan` was used to produce a `Stream<Future>`
+ inference may now fail and require explicit generic type arguments.
+- Add `combineLatest`.
+- Add `combineLatestAll`.
+
+## 0.0.15
+
+- Add `whereType`.
+
+## 0.0.14+1
+
+- Allow using non-dev Dart 2 SDK.
+
+## 0.0.14
+
+- `asyncWhere` will now forward exceptions thrown by the callback through the
+ result Stream.
+- Added `concurrentAsyncMap`.
+
+## 0.0.13
+
+- `mergeAll` now accepts an `Iterable<Stream>` instead of only `List<Stream>`.
+
+## 0.0.12
+
+- Add `chainTransformers` and `map` for use cases where `StreamTransformer`
+ instances are stored as variables or passed to methods other than `transform`.
+
+## 0.0.11
+
+- Renamed `concat` as `followedBy` to match the naming of `Iterable.followedBy`.
+ `concat` is now deprecated.
+
+## 0.0.10
+
+- Updates to support Dart 2.0 core library changes (wave
+ 2.2). See [issue 31847][sdk#31847] for details.
+
+ [sdk#31847]: https://github.com/dart-lang/sdk/issues/31847
+
+## 0.0.9
+
+- Add `asyncMapBuffer`.
+
+## 0.0.8
+
+- Add `takeUntil`.
+
+## 0.0.7
+
+- Bug Fix: Streams produced with `scan` and `switchMap` now correctly report
+ `isBroadcast`.
+- Add `startWith`, `startWithMany`, and `startWithStream`.
+
+## 0.0.6
+
+- Bug Fix: Some transformers did not correctly add data to all listeners on
+ broadcast streams. Fixed for `throttle`, `debounce`, `asyncWhere` and `audit`.
+- Bug Fix: Only call the `tap` data callback once per event rather than once per
+ listener.
+- Bug Fix: Allow canceling and re-listening to broadcast streams after a
+ `merge` transform.
+- Bug Fix: Broadcast streams which are buffered using a single-subscription
+ trigger can be canceled and re-listened.
+- Bug Fix: Buffer outputs one more value if there is a pending trigger before
+ the trigger closes.
+- Bug Fix: Single-subscription streams concatted after broadcast streams are
+ handled correctly.
+- Use sync `StreamControllers` for forwarding where possible.
+
+## 0.0.5
+
+- Bug Fix: Allow compiling switchLatest with Dart2Js.
+- Add `asyncWhere`: Like `where` but allows an asynchronous predicate.
+
+## 0.0.4
+- Add `scan`: fold which returns intermediate values
+- Add `throttle`: block events for a duration after emitting a value
+- Add `audit`: emits the last event received after a duration
+
+## 0.0.3
+
+- Add `tap`: React to values as they pass without being a subscriber on a stream
+- Add `switchMap` and `switchLatest`: Flatten a Stream of Streams into a Stream
+ which forwards values from the most recent Stream
+
+## 0.0.2
+
+- Add `concat`: Appends streams in series
+- Add `merge` and `mergeAll`: Interleaves streams
+
+## 0.0.1
+
+- Initial release with the following utilities:
+ - `buffer`: Collects events in a `List` until a `trigger` stream fires.
+ - `debounce`, `debounceBuffer`: Collect or drop events which occur closer in
+ time than a given duration.
diff --git a/pkgs/stream_transform/LICENSE b/pkgs/stream_transform/LICENSE
new file mode 100644
index 0000000..03af64a
--- /dev/null
+++ b/pkgs/stream_transform/LICENSE
@@ -0,0 +1,27 @@
+Copyright 2017, the Dart project authors.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+ * Neither the name of Google LLC nor the names of its
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
new file mode 100644
index 0000000..e7049bd
--- /dev/null
+++ b/pkgs/stream_transform/README.md
@@ -0,0 +1,141 @@
+[](https://github.com/dart-lang/tools/actions/workflows/stream_transform.yaml)
+[](https://pub.dev/packages/stream_transform)
+[](https://pub.dev/packages/stream_transform/publisher)
+
+Extension methods on `Stream` adding common transform operators.
+
+## Operators
+
+### asyncMapBuffer, asyncMapSample, concurrentAsyncMap
+
+Alternatives to `asyncMap`. `asyncMapBuffer` prevents the callback from
+overlapping execution and collects events while it is executing.
+`asyncMapSample` prevents overlapping execution and discards events while it is
+executing. `concurrentAsyncMap` allows overlap and removes ordering guarantees
+for higher throughput.
+
+Like `asyncMap` but events are buffered in a List until previous events have
+been processed rather than being called for each element individually.
+
+### asyncWhere
+
+Like `where` but allows an asynchronous predicate.
+
+### audit
+
+Waits for a period of time after receiving a value and then only emits the most
+recent value.
+
+### buffer
+
+Collects values from a source stream until a `trigger` stream fires and the
+collected values are emitted.
+
+### combineLatest, combineLatestAll
+
+Combine the most recent event from multiple streams through a callback or into a
+list.
+
+### debounce, debounceBuffer
+
+Prevents a source stream from emitting too frequently by dropping or collecting
+values that occur within a given duration.
+
+### followedBy
+
+Appends the values of a stream after another stream finishes.
+
+### merge, mergeAll, concurrentAsyncExpand
+
+Interleaves events from multiple streams into a single stream.
+
+### scan
+
+Scan is like fold, but instead of producing a single value it yields each
+intermediate accumulation.
+
+### startWith, startWithMany, startWithStream
+
+Prepend a value, an iterable, or a stream to the beginning of another stream.
+
+### switchMap, switchLatest
+
+Flatten a Stream of Streams into a Stream which forwards values from the most
+recent Stream
+
+### takeUntil
+
+Let values through until a Future fires.
+
+### tap
+
+Taps into a single-subscriber stream to react to values as they pass, without
+being a real subscriber.
+
+### throttle
+
+Blocks events for a duration after an event is successfully emitted.
+
+### whereType
+
+Like `Iterable.whereType` for a stream.
+
+## Comparison to Rx Operators
+
+The semantics and naming in this package have some overlap, and some conflict,
+with the [ReactiveX](https://reactivex.io/) suite of libraries. Some of the
+conflict is intentional - Dart `Stream` predates `Observable` and coherence with
+the Dart ecosystem semantics and naming is a strictly higher priority than
+consistency with ReactiveX.
+
+Rx Operator Category | variation | `stream_transform`
+------------------------- | ------------------------------------------------------ | ------------------
+[`sample`][rx_sample] | `sample/throttleLast(Duration)` | `sample(Stream.periodic(Duration), longPoll: false)`
+​ | `throttleFirst(Duration)` | [`throttle`][throttle]
+​ | `sample(Observable)` | `sample(trigger, longPoll: false)`
+[`debounce`][rx_debounce] | `debounce/throttleWithTimeout(Duration)` | [`debounce`][debounce]
+​ | `debounce(Observable)` | No equivalent
+[`buffer`][rx_buffer] | `buffer(boundary)`, `bufferWithTime`,`bufferWithCount` | No equivalent
+​ | `buffer(boundaryClosingSelector)` | `buffer(trigger, longPoll: false)`
+RxJs extensions | [`audit(callback)`][rxjs_audit] | No equivalent
+​ | [`auditTime(Duration)`][rxjs_auditTime] | [`audit`][audit]
+​ | [`exhaustMap`][rxjs_exhaustMap] | No equivalent
+​ | [`throttleTime(trailing: true)`][rxjs_throttleTime] | `throttle(trailing: true)`
+​ | `throttleTime(leading: false, trailing: true)` | No equivalent
+No equivalent? | | [`asyncMapBuffer`][asyncMapBuffer]
+​ | | [`asyncMapSample`][asyncMapSample]
+​ | | [`buffer`][buffer]
+​ | | [`sample`][sample]
+​ | | [`debounceBuffer`][debounceBuffer]
+​ | | `debounce(leading: true, trailing: false)`
+​ | | `debounce(leading: true, trailing: true)`
+
+[rx_sample]:https://reactivex.io/documentation/operators/sample.html
+[rx_debounce]:https://reactivex.io/documentation/operators/debounce.html
+[rx_buffer]:https://reactivex.io/documentation/operators/buffer.html
+[rxjs_audit]:https://rxjs.dev/api/operators/audit
+[rxjs_auditTime]:https://rxjs.dev/api/operators/auditTime
+[rxjs_throttleTime]:https://rxjs.dev/api/operators/throttleTime
+[rxjs_exhaustMap]:https://rxjs.dev/api/operators/exhaustMap
+[asyncMapBuffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/AsyncMap/asyncMapBuffer.html
+[asyncMapSample]:https://pub.dev/documentation/stream_transform/latest/stream_transform/AsyncMap/asyncMapSample.html
+[audit]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/audit.html
+[buffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/buffer.html
+[sample]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/sample.html
+[debounceBuffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/debounceBuffer.html
+[debounce]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/debounce.html
+[throttle]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/throttle.html
+
+## Getting a `StreamTransformer` instance
+
+It may be useful to pass an instance of `StreamTransformer` so that it can be
+used with `stream.transform` calls rather than reference the specific operator
+in place. Any operator on `Stream` that returns a `Stream` can be modeled as a
+`StreamTransformer` using the [`fromBind` constructor][fromBind].
+
+```dart
+final debounce = StreamTransformer.fromBind(
+ (s) => s.debounce(const Duration(milliseconds: 100)));
+```
+
+[fromBind]: https://api.dart.dev/stable/dart-async/StreamTransformer/StreamTransformer.fromBind.html
diff --git a/pkgs/stream_transform/analysis_options.yaml b/pkgs/stream_transform/analysis_options.yaml
new file mode 100644
index 0000000..05f1af1
--- /dev/null
+++ b/pkgs/stream_transform/analysis_options.yaml
@@ -0,0 +1,16 @@
+include: package:dart_flutter_team_lints/analysis_options.yaml
+
+analyzer:
+ language:
+ strict-casts: true
+ strict-raw-types: true
+
+linter:
+ rules:
+ - avoid_bool_literals_in_conditional_expressions
+ - avoid_classes_with_only_static_members
+ - avoid_returning_this
+ - avoid_unused_constructor_parameters
+ - cascade_invocations
+ - join_return_with_assignment
+ - no_adjacent_strings_in_list
diff --git a/pkgs/stream_transform/example/index.html b/pkgs/stream_transform/example/index.html
new file mode 100644
index 0000000..aecdc09
--- /dev/null
+++ b/pkgs/stream_transform/example/index.html
@@ -0,0 +1,11 @@
+<html>
+ <head>
+ <script defer src="main.dart.js" type="application/javascript"></script>
+ </head>
+ <body>
+ <input id="first_input"><br>
+ <input id="second_input"><br>
+ <p id="output">
+ </p>
+ </body>
+</html>
diff --git a/pkgs/stream_transform/example/main.dart b/pkgs/stream_transform/example/main.dart
new file mode 100644
index 0000000..70b3e7f
--- /dev/null
+++ b/pkgs/stream_transform/example/main.dart
@@ -0,0 +1,26 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:html';
+
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+ var firstInput = document.querySelector('#first_input') as InputElement;
+ var secondInput = document.querySelector('#second_input') as InputElement;
+ var output = document.querySelector('#output')!;
+
+ _inputValues(firstInput)
+ .combineLatest(_inputValues(secondInput),
+ (first, second) => 'First: $first, Second: $second')
+ .tap((v) {
+ print('Saw: $v');
+ }).forEach((v) {
+ output.text = v;
+ });
+}
+
+Stream<String?> _inputValues(InputElement element) => element.onKeyUp
+ .debounce(const Duration(milliseconds: 100))
+ .map((_) => element.value);
diff --git a/pkgs/stream_transform/lib/src/aggregate_sample.dart b/pkgs/stream_transform/lib/src/aggregate_sample.dart
new file mode 100644
index 0000000..f2ff8ed
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/aggregate_sample.dart
@@ -0,0 +1,146 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'common_callbacks.dart';
+
+extension AggregateSample<T> on Stream<T> {
+ /// Computes a value based on sequences of events, then emits that value when
+ /// [trigger] emits an event.
+ ///
+ /// Every time this stream emits an event, an intermediate value is created
+ /// by combining the new event with the previous intermediate value, or with
+ /// `null` if there is no previous value, using the [aggregate] function.
+ ///
+ /// When [trigger] emits value, the returned stream emits the current
+ /// intermediate value and clears it.
+ ///
+ /// If [longPoll] is `false`, if there is no intermediate value when [trigger]
+ /// emits an event, the [onEmpty] function is called with a [Sink] which can
+ /// add events to the returned stream.
+ ///
+ /// If [longPoll] is `true`, and there is no intermediate value when [trigger]
+ /// emits one or more events, then the *next* event from this stream is
+ /// immediately put through [aggregate] and emitted on the returned stream.
+ /// Subsequent events on [trigger] while there have been no events on this
+ /// stream are ignored.
+ /// In that case, [onEmpty] is never used.
+ ///
+ /// The result stream will close as soon as there is a guarantee it will not
+ /// emit any more events. There will not be any more events emitted if:
+ /// - [trigger] is closed and there is no waiting long poll.
+ /// - Or, the source stream is closed and there are no buffered events.
+ ///
+ /// If the source stream is a broadcast stream, the result will be as well.
+ /// Errors from the source stream or the trigger are immediately forwarded to
+ /// the output.
+ Stream<S> aggregateSample<S>(
+ {required Stream<void> trigger,
+ required S Function(T, S?) aggregate,
+ required bool longPoll,
+ required void Function(Sink<S>) onEmpty}) {
+ var controller = isBroadcast
+ ? StreamController<S>.broadcast(sync: true)
+ : StreamController<S>(sync: true);
+
+ S? currentResults;
+ var hasCurrentResults = false;
+ var activeLongPoll = false;
+ var isTriggerDone = false;
+ var isValueDone = false;
+ StreamSubscription<T>? valueSub;
+ StreamSubscription<void>? triggerSub;
+
+ void emit(S results) {
+ currentResults = null;
+ hasCurrentResults = false;
+ controller.add(results);
+ }
+
+ void onValue(T value) {
+ currentResults = aggregate(value, currentResults);
+ hasCurrentResults = true;
+ if (!longPoll) return;
+
+ if (activeLongPoll) {
+ activeLongPoll = false;
+ emit(currentResults as S);
+ }
+
+ if (isTriggerDone) {
+ valueSub!.cancel();
+ controller.close();
+ }
+ }
+
+ void onValuesDone() {
+ isValueDone = true;
+ if (!hasCurrentResults) {
+ triggerSub?.cancel();
+ controller.close();
+ }
+ }
+
+ void onTrigger(_) {
+ if (hasCurrentResults) {
+ emit(currentResults as S);
+ } else if (longPoll) {
+ activeLongPoll = true;
+ } else {
+ onEmpty(controller);
+ }
+
+ if (isValueDone) {
+ triggerSub!.cancel();
+ controller.close();
+ }
+ }
+
+ void onTriggerDone() {
+ isTriggerDone = true;
+ if (!activeLongPoll) {
+ valueSub?.cancel();
+ controller.close();
+ }
+ }
+
+ controller.onListen = () {
+ assert(valueSub == null);
+ valueSub =
+ listen(onValue, onError: controller.addError, onDone: onValuesDone);
+ final priorTriggerSub = triggerSub;
+ if (priorTriggerSub != null) {
+ if (priorTriggerSub.isPaused) priorTriggerSub.resume();
+ } else {
+ triggerSub = trigger.listen(onTrigger,
+ onError: controller.addError, onDone: onTriggerDone);
+ }
+ if (!isBroadcast) {
+ controller
+ ..onPause = () {
+ valueSub?.pause();
+ triggerSub?.pause();
+ }
+ ..onResume = () {
+ valueSub?.resume();
+ triggerSub?.resume();
+ };
+ }
+ controller.onCancel = () {
+ var cancels = <Future<void>>[if (!isValueDone) valueSub!.cancel()];
+ valueSub = null;
+ if (trigger.isBroadcast || !isBroadcast) {
+ if (!isTriggerDone) cancels.add(triggerSub!.cancel());
+ triggerSub = null;
+ } else {
+ triggerSub!.pause();
+ }
+ if (cancels.isEmpty) return null;
+ return cancels.wait.then(ignoreArgument);
+ };
+ };
+ return controller.stream;
+ }
+}
diff --git a/pkgs/stream_transform/lib/src/async_expand.dart b/pkgs/stream_transform/lib/src/async_expand.dart
new file mode 100644
index 0000000..28d2f40
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/async_expand.dart
@@ -0,0 +1,89 @@
+// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'common_callbacks.dart';
+import 'switch.dart';
+
+/// Alternatives to [asyncExpand].
+///
+/// The built in [asyncExpand] will not overlap the inner streams and every
+/// event will be sent to the callback individually.
+///
+/// - [concurrentAsyncExpand] allow overlap and merges inner streams without
+/// ordering guarantees.
+extension AsyncExpand<T> on Stream<T> {
+ /// Like [asyncExpand] but the [convert] callback may be called for an element
+ /// before the [Stream] emitted by the previous element has closed.
+ ///
+ /// Events on the result stream will be emitted in the order they are emitted
+ /// by the sub streams, which may not match the order of this stream.
+ ///
+ /// Errors from [convert], the source stream, or any of the sub streams are
+ /// forwarded to the result stream.
+ ///
+ /// The result stream will not close until the source stream closes and all
+ /// sub streams have closed.
+ ///
+ /// If the source stream is a broadcast stream, the result will be as well,
+ /// regardless of the types of streams created by [convert]. In this case,
+ /// some care should be taken:
+ /// - If [convert] returns a single subscription stream it may be listened to
+ /// and never canceled.
+ /// - For any period of time where there are no listeners on the result
+ /// stream, any sub streams from previously emitted events will be ignored,
+ /// regardless of whether they emit further events after a listener is added
+ /// back.
+ ///
+ /// See also:
+ /// - [switchMap], which cancels subscriptions to the previous sub stream
+ /// instead of concurrently emitting events from all sub streams.
+ Stream<S> concurrentAsyncExpand<S>(Stream<S> Function(T) convert) {
+ final controller = isBroadcast
+ ? StreamController<S>.broadcast(sync: true)
+ : StreamController<S>(sync: true);
+
+ controller.onListen = () {
+ final subscriptions = <StreamSubscription<dynamic>>[];
+ final outerSubscription = map(convert).listen((inner) {
+ if (isBroadcast && !inner.isBroadcast) {
+ inner = inner.asBroadcastStream();
+ }
+ final subscription =
+ inner.listen(controller.add, onError: controller.addError);
+ subscription.onDone(() {
+ subscriptions.remove(subscription);
+ if (subscriptions.isEmpty) controller.close();
+ });
+ subscriptions.add(subscription);
+ }, onError: controller.addError);
+ outerSubscription.onDone(() {
+ subscriptions.remove(outerSubscription);
+ if (subscriptions.isEmpty) controller.close();
+ });
+ subscriptions.add(outerSubscription);
+ if (!isBroadcast) {
+ controller
+ ..onPause = () {
+ for (final subscription in subscriptions) {
+ subscription.pause();
+ }
+ }
+ ..onResume = () {
+ for (final subscription in subscriptions) {
+ subscription.resume();
+ }
+ };
+ }
+ controller.onCancel = () {
+ if (subscriptions.isEmpty) return null;
+ return [for (var s in subscriptions) s.cancel()]
+ .wait
+ .then(ignoreArgument);
+ };
+ };
+ return controller.stream;
+ }
+}
diff --git a/pkgs/stream_transform/lib/src/async_map.dart b/pkgs/stream_transform/lib/src/async_map.dart
new file mode 100644
index 0000000..094df9c
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/async_map.dart
@@ -0,0 +1,136 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'aggregate_sample.dart';
+import 'common_callbacks.dart';
+import 'from_handlers.dart';
+import 'rate_limit.dart';
+
+/// Alternatives to [asyncMap].
+///
+/// The built in [asyncMap] will not overlap execution of the passed callback,
+/// and every event will be sent to the callback individually.
+///
+/// - [asyncMapBuffer] prevents the callback from overlapping execution and
+/// collects events while it is executing to process in batches.
+/// - [asyncMapSample] prevents overlapping execution and discards events while
+/// it is executing.
+/// - [concurrentAsyncMap] allows overlap and removes ordering guarantees.
+extension AsyncMap<T> on Stream<T> {
+ /// Like [asyncMap] but events are buffered until previous events have been
+ /// processed by [convert].
+ ///
+ /// If this stream is a broadcast stream the result will be as well.
+ /// When used with a broadcast stream behavior also differs from [asyncMap] in
+ /// that the [convert] function is only called once per event, rather than
+ /// once per listener per event.
+ ///
+ /// The first event from this stream is always passed to [convert] as a
+ /// list with a single element.
+ /// After that, events are buffered until the previous Future returned from
+ /// [convert] has completed.
+ ///
+ /// Errors from this stream are forwarded directly to the result stream.
+ /// Errors during the conversion are also forwarded to the result stream and
+ /// are considered completing work so the next values are let through.
+ ///
+ /// The result stream will not close until this stream closes and all pending
+ /// conversions have finished.
+ Stream<S> asyncMapBuffer<S>(Future<S> Function(List<T>) convert) {
+ var workFinished = StreamController<void>()
+ // Let the first event through.
+ ..add(null);
+ return buffer(workFinished.stream)._asyncMapThen(convert, workFinished.add);
+ }
+
+ /// Like [asyncMap] but events are discarded while work is happening in
+ /// [convert].
+ ///
+ /// If this stream is a broadcast stream the result will be as well.
+ /// When used with a broadcast stream behavior also differs from [asyncMap] in
+ /// that the [convert] function is only called once per event, rather than
+ /// once per listener per event.
+ ///
+ /// If no work is happening when an event is emitted it will be immediately
+ /// passed to [convert]. If there is ongoing work when an event is emitted it
+ /// will be held until the work is finished. New events emitted will replace a
+ /// pending event.
+ ///
+ /// Errors from this stream are forwarded directly to the result stream.
+ /// Errors during the conversion are also forwarded to the result stream and
+ /// are considered completing work so the next values are let through.
+ ///
+ /// The result stream will not close until this stream closes and all pending
+ /// conversions have finished.
+ Stream<S> asyncMapSample<S>(Future<S> Function(T) convert) {
+ var workFinished = StreamController<void>()
+ // Let the first event through.
+ ..add(null);
+ return aggregateSample(
+ trigger: workFinished.stream,
+ aggregate: _dropPrevious,
+ longPoll: true,
+ onEmpty: ignoreArgument)
+ ._asyncMapThen(convert, workFinished.add);
+ }
+
+ /// Like [asyncMap] but the [convert] callback may be called for an element
+ /// before processing for the previous element is finished.
+ ///
+ /// Events on the result stream will be emitted in the order that [convert]
+ /// completed which may not match the order of this stream.
+ ///
+ /// If this stream is a broadcast stream the result will be as well.
+ /// When used with a broadcast stream behavior also differs from [asyncMap] in
+ /// that the [convert] function is only called once per event, rather than
+ /// once per listener per event. The [convert] callback won't be called for
+ /// events while a broadcast stream has no listener.
+ ///
+ /// Errors from [convert] or this stream are forwarded directly to the
+ /// result stream.
+ ///
+ /// The result stream will not close until this stream closes and all pending
+ /// conversions have finished.
+ Stream<S> concurrentAsyncMap<S>(FutureOr<S> Function(T) convert) {
+ var valuesWaiting = 0;
+ var sourceDone = false;
+ return transformByHandlers(onData: (element, sink) {
+ valuesWaiting++;
+ () async {
+ try {
+ sink.add(await convert(element));
+ } catch (e, st) {
+ sink.addError(e, st);
+ }
+ valuesWaiting--;
+ if (valuesWaiting <= 0 && sourceDone) sink.close();
+ }();
+ }, onDone: (sink) {
+ sourceDone = true;
+ if (valuesWaiting <= 0) sink.close();
+ });
+ }
+
+ /// Like [Stream.asyncMap] but the [convert] is only called once per event,
+ /// rather than once per listener, and [then] is called after completing the
+ /// work.
+ Stream<S> _asyncMapThen<S>(
+ Future<S> Function(T) convert, void Function(void) then) {
+ Future<void>? pendingEvent;
+ return transformByHandlers(onData: (event, sink) {
+ pendingEvent =
+ convert(event).then(sink.add).catchError(sink.addError).then(then);
+ }, onDone: (sink) {
+ if (pendingEvent != null) {
+ pendingEvent!.then((_) => sink.close());
+ } else {
+ sink.close();
+ }
+ });
+ }
+}
+
+T _dropPrevious<T>(T event, _) => event;
diff --git a/pkgs/stream_transform/lib/src/combine_latest.dart b/pkgs/stream_transform/lib/src/combine_latest.dart
new file mode 100644
index 0000000..f02a19e
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/combine_latest.dart
@@ -0,0 +1,240 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'common_callbacks.dart';
+
+/// Utilities to combine events from multiple streams through a callback or into
+/// a list.
+extension CombineLatest<T> on Stream<T> {
+ /// Combines the latest values from this stream with the latest values from
+ /// [other] using [combine].
+ ///
+ /// No event will be emitted until both the source stream and [other] have
+ /// each emitted at least one event. If either the source stream or [other]
+ /// emit multiple events before the other emits the first event, all but the
+ /// last value will be discarded. Once both streams have emitted at least
+ /// once, the result stream will emit any time either input stream emits.
+ ///
+ /// The result stream will not close until both the source stream and [other]
+ /// have closed.
+ ///
+ /// For example:
+ ///
+ /// source.combineLatest(other, (a, b) => a + b);
+ ///
+ /// source: --1--2--------4--|
+ /// other: -------3--|
+ /// result: -------5------7--|
+ ///
+ /// Errors thrown by [combine], along with any errors on the source stream or
+ /// [other], are forwarded to the result stream.
+ ///
+ /// If the source stream is a broadcast stream, the result stream will be as
+ /// well, regardless of [other]'s type. If a single subscription stream is
+ /// combined with a broadcast stream it may never be canceled.
+ Stream<S> combineLatest<T2, S>(
+ Stream<T2> other, FutureOr<S> Function(T, T2) combine) {
+ final controller = isBroadcast
+ ? StreamController<S>.broadcast(sync: true)
+ : StreamController<S>(sync: true);
+
+ other =
+ (isBroadcast && !other.isBroadcast) ? other.asBroadcastStream() : other;
+
+ StreamSubscription<T>? sourceSubscription;
+ StreamSubscription<T2>? otherSubscription;
+
+ var sourceDone = false;
+ var otherDone = false;
+
+ late T latestSource;
+ late T2 latestOther;
+
+ var sourceStarted = false;
+ var otherStarted = false;
+
+ void emitCombined() {
+ if (!sourceStarted || !otherStarted) return;
+ FutureOr<S> result;
+ try {
+ result = combine(latestSource, latestOther);
+ } catch (e, s) {
+ controller.addError(e, s);
+ return;
+ }
+ if (result is Future<S>) {
+ sourceSubscription!.pause();
+ otherSubscription!.pause();
+ result
+ .then(controller.add, onError: controller.addError)
+ .whenComplete(() {
+ sourceSubscription!.resume();
+ otherSubscription!.resume();
+ });
+ } else {
+ controller.add(result);
+ }
+ }
+
+ controller.onListen = () {
+ assert(sourceSubscription == null);
+ sourceSubscription = listen(
+ (s) {
+ sourceStarted = true;
+ latestSource = s;
+ emitCombined();
+ },
+ onError: controller.addError,
+ onDone: () {
+ sourceDone = true;
+ if (otherDone) {
+ controller.close();
+ } else if (!sourceStarted) {
+ // Nothing can ever be emitted
+ otherSubscription!.cancel();
+ controller.close();
+ }
+ });
+ otherSubscription = other.listen(
+ (o) {
+ otherStarted = true;
+ latestOther = o;
+ emitCombined();
+ },
+ onError: controller.addError,
+ onDone: () {
+ otherDone = true;
+ if (sourceDone) {
+ controller.close();
+ } else if (!otherStarted) {
+ // Nothing can ever be emitted
+ sourceSubscription!.cancel();
+ controller.close();
+ }
+ });
+ if (!isBroadcast) {
+ controller
+ ..onPause = () {
+ sourceSubscription!.pause();
+ otherSubscription!.pause();
+ }
+ ..onResume = () {
+ sourceSubscription!.resume();
+ otherSubscription!.resume();
+ };
+ }
+ controller.onCancel = () {
+ var cancels = [
+ sourceSubscription!.cancel(),
+ otherSubscription!.cancel()
+ ];
+ sourceSubscription = null;
+ otherSubscription = null;
+ return cancels.wait.then(ignoreArgument);
+ };
+ };
+ return controller.stream;
+ }
+
+ /// Combine the latest value emitted from the source stream with the latest
+ /// values emitted from [others].
+ ///
+ /// [combineLatestAll] subscribes to the source stream and [others] and when
+ /// any one of the streams emits, the result stream will emit a [List<T>] of
+ /// the latest values emitted from all streams.
+ ///
+ /// No event will be emitted until all source streams emit at least once. If a
+ /// source stream emits multiple values before another starts emitting, all
+ /// but the last value will be discarded. Once all source streams have emitted
+ /// at least once, the result stream will emit any time any source stream
+ /// emits.
+ ///
+ /// The result stream will not close until all source streams have closed.
+ /// When a source stream closes, the result stream will continue to emit the
+ /// last value from the closed stream when the other source streams emit until
+ /// the result stream has closed. If a source stream closes without emitting
+ /// any value, the result stream will close as well.
+ ///
+ /// For example:
+ ///
+ /// final combined = first
+ /// .combineLatestAll([second, third])
+ /// .map((data) => data.join());
+ ///
+ /// first: a----b------------------c--------d---|
+ /// second: --1---------2-----------------|
+ /// third: -------&----------%---|
+ /// combined: -------b1&--b2&---b2%---c2%------d2%-|
+ ///
+ /// Errors thrown by any source stream will be forwarded to the result stream.
+ ///
+ /// If the source stream is a broadcast stream, the result stream will be as
+ /// well, regardless of the types of [others]. If a single subscription stream
+ /// is combined with a broadcast source stream, it may never be canceled.
+ Stream<List<T>> combineLatestAll(Iterable<Stream<T>> others) {
+ final controller = isBroadcast
+ ? StreamController<List<T>>.broadcast(sync: true)
+ : StreamController<List<T>>(sync: true);
+
+ final allStreams = [
+ this,
+ for (final other in others)
+ !isBroadcast || other.isBroadcast ? other : other.asBroadcastStream(),
+ ];
+
+ controller.onListen = () {
+ final subscriptions = <StreamSubscription<T>>[];
+
+ final latestData = List<T?>.filled(allStreams.length, null);
+ final hasEmitted = <int>{};
+ void handleData(int index, T data) {
+ latestData[index] = data;
+ hasEmitted.add(index);
+ if (hasEmitted.length == allStreams.length) {
+ controller.add(List.from(latestData));
+ }
+ }
+
+ var streamId = 0;
+ for (final stream in allStreams) {
+ final index = streamId;
+
+ final subscription = stream.listen((data) => handleData(index, data),
+ onError: controller.addError);
+ subscription.onDone(() {
+ assert(subscriptions.contains(subscription));
+ subscriptions.remove(subscription);
+ if (subscriptions.isEmpty || !hasEmitted.contains(index)) {
+ controller.close();
+ }
+ });
+ subscriptions.add(subscription);
+
+ streamId++;
+ }
+ if (!isBroadcast) {
+ controller
+ ..onPause = () {
+ for (final subscription in subscriptions) {
+ subscription.pause();
+ }
+ }
+ ..onResume = () {
+ for (final subscription in subscriptions) {
+ subscription.resume();
+ }
+ };
+ }
+ controller.onCancel = () {
+ if (subscriptions.isEmpty) return null;
+ return [for (var s in subscriptions) s.cancel()]
+ .wait
+ .then(ignoreArgument);
+ };
+ };
+ return controller.stream;
+ }
+}
diff --git a/pkgs/stream_transform/lib/src/common_callbacks.dart b/pkgs/stream_transform/lib/src/common_callbacks.dart
new file mode 100644
index 0000000..c239220
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/common_callbacks.dart
@@ -0,0 +1,5 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+void ignoreArgument(_) {}
diff --git a/pkgs/stream_transform/lib/src/concatenate.dart b/pkgs/stream_transform/lib/src/concatenate.dart
new file mode 100644
index 0000000..0330dd7
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/concatenate.dart
@@ -0,0 +1,112 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// Utilities to append or prepend to a stream.
+extension Concatenate<T> on Stream<T> {
+ /// Emits all values and errors from [next] following all values and errors
+ /// from this stream.
+ ///
+ /// If this stream never finishes, the [next] stream will never get a
+ /// listener.
+ ///
+ /// If this stream is a broadcast stream, the result will be as well.
+ /// If a single-subscription follows a broadcast stream it may be listened
+ /// to and never canceled since there may be broadcast listeners added later.
+ ///
+ /// If a broadcast stream follows any other stream it will miss any events or
+ /// errors which occur before this stream is done.
+ /// If a broadcast stream follows a single-subscription stream, pausing the
+ /// stream while it is listening to the second stream will cause events to be
+ /// dropped rather than buffered.
+ Stream<T> followedBy(Stream<T> next) {
+ var controller = isBroadcast
+ ? StreamController<T>.broadcast(sync: true)
+ : StreamController<T>(sync: true);
+
+ next = isBroadcast && !next.isBroadcast ? next.asBroadcastStream() : next;
+
+ StreamSubscription<T>? subscription;
+ var currentStream = this;
+ var thisDone = false;
+ var secondDone = false;
+
+ late void Function() currentDoneHandler;
+
+ void listen() {
+ subscription = currentStream.listen(controller.add,
+ onError: controller.addError, onDone: () => currentDoneHandler());
+ }
+
+ void onSecondDone() {
+ secondDone = true;
+ controller.close();
+ }
+
+ void onThisDone() {
+ thisDone = true;
+ currentStream = next;
+ currentDoneHandler = onSecondDone;
+ listen();
+ }
+
+ currentDoneHandler = onThisDone;
+
+ controller.onListen = () {
+ assert(subscription == null);
+ listen();
+ if (!isBroadcast) {
+ controller
+ ..onPause = () {
+ if (!thisDone || !next.isBroadcast) return subscription!.pause();
+ subscription!.cancel();
+ subscription = null;
+ }
+ ..onResume = () {
+ if (!thisDone || !next.isBroadcast) return subscription!.resume();
+ listen();
+ };
+ }
+ controller.onCancel = () {
+ if (secondDone) return null;
+ var toCancel = subscription!;
+ subscription = null;
+ return toCancel.cancel();
+ };
+ };
+ return controller.stream;
+ }
+
+ /// Emits [initial] before any values or errors from the this stream.
+ ///
+ /// If this stream is a broadcast stream the result will be as well.
+ /// If this stream is a broadcast stream, the returned stream will only
+ /// contain events of this stream that are emitted after the [initial] value
+ /// has been emitted on the returned stream.
+ Stream<T> startWith(T initial) =>
+ startWithStream(Future.value(initial).asStream());
+
+ /// Emits all values in [initial] before any values or errors from this
+ /// stream.
+ ///
+ /// If this stream is a broadcast stream the result will be as well.
+ /// If this stream is a broadcast stream it will miss any events which
+ /// occur before the initial values are all emitted.
+ Stream<T> startWithMany(Iterable<T> initial) =>
+ startWithStream(Stream.fromIterable(initial));
+
+ /// Emits all values and errors in [initial] before any values or errors from
+ /// this stream.
+ ///
+ /// If this stream is a broadcast stream the result will be as well.
+ /// If this stream is a broadcast stream it will miss any events which occur
+ /// before [initial] closes.
+ Stream<T> startWithStream(Stream<T> initial) {
+ if (isBroadcast && !initial.isBroadcast) {
+ initial = initial.asBroadcastStream();
+ }
+ return initial.followedBy(this);
+ }
+}
diff --git a/pkgs/stream_transform/lib/src/from_handlers.dart b/pkgs/stream_transform/lib/src/from_handlers.dart
new file mode 100644
index 0000000..1146a13
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/from_handlers.dart
@@ -0,0 +1,58 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+extension TransformByHandlers<S> on Stream<S> {
+ /// Transform a stream by callbacks.
+ ///
+ /// This is similar to `transform(StreamTransformer.fromHandler(...))` except
+ /// that the handlers are called once per event rather than called for the
+ /// same event for each listener on a broadcast stream.
+ Stream<T> transformByHandlers<T>(
+ {required void Function(S, EventSink<T>) onData,
+ void Function(Object, StackTrace, EventSink<T>)? onError,
+ void Function(EventSink<T>)? onDone}) {
+ final handleError = onError ?? _defaultHandleError;
+ final handleDone = onDone ?? _defaultHandleDone;
+
+ var controller = isBroadcast
+ ? StreamController<T>.broadcast(sync: true)
+ : StreamController<T>(sync: true);
+
+ StreamSubscription<S>? subscription;
+ controller.onListen = () {
+ assert(subscription == null);
+ var valuesDone = false;
+ subscription = listen((value) => onData(value, controller),
+ onError: (Object error, StackTrace stackTrace) {
+ handleError(error, stackTrace, controller);
+ }, onDone: () {
+ valuesDone = true;
+ handleDone(controller);
+ });
+ if (!isBroadcast) {
+ controller
+ ..onPause = subscription!.pause
+ ..onResume = subscription!.resume;
+ }
+ controller.onCancel = () {
+ var toCancel = subscription;
+ subscription = null;
+ if (!valuesDone) return toCancel!.cancel();
+ return null;
+ };
+ };
+ return controller.stream;
+ }
+
+ static void _defaultHandleError<T>(
+ Object error, StackTrace stackTrace, EventSink<T> sink) {
+ sink.addError(error, stackTrace);
+ }
+
+ static void _defaultHandleDone<T>(EventSink<T> sink) {
+ sink.close();
+ }
+}
diff --git a/pkgs/stream_transform/lib/src/merge.dart b/pkgs/stream_transform/lib/src/merge.dart
new file mode 100644
index 0000000..3bfe06c
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/merge.dart
@@ -0,0 +1,102 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'common_callbacks.dart';
+
+/// Utilities to interleave events from multiple streams.
+extension Merge<T> on Stream<T> {
+ /// Merges values and errors from this stream and [other] in any order as they
+ /// arrive.
+ ///
+ /// The result stream will not close until both this stream and [other] have
+ /// closed.
+ ///
+ /// For example:
+ ///
+ /// final result = source.merge(other);
+ ///
+ /// source: 1--2-----3--|
+ /// other: ------4-------5--|
+ /// result: 1--2--4--3----5--|
+ ///
+ /// If this stream is a broadcast stream, the result stream will be as
+ /// well, regardless of [other]'s type. If a single subscription stream is
+ /// merged into a broadcast stream it may never be canceled since there may be
+ /// broadcast listeners added later.
+ ///
+ /// If a broadcast stream is merged into a single-subscription stream any
+ /// events emitted by [other] before the result stream has a subscriber will
+ /// be discarded.
+ Stream<T> merge(Stream<T> other) => mergeAll([other]);
+
+ /// Merges values and errors from this stream and any stream in [others] in
+ /// any order as they arrive.
+ ///
+ /// The result stream will not close until this stream and all streams
+ /// in [others] have closed.
+ ///
+ /// For example:
+ ///
+ /// final result = first.mergeAll([second, third]);
+ ///
+ /// first: 1--2--------3--|
+ /// second: ---------4-------5--|
+ /// third: ------6---------------7--|
+ /// result: 1--2--6--4--3----5----7--|
+ ///
+ /// If this stream is a broadcast stream, the result stream will be as
+ /// well, regardless the types of streams in [others]. If a single
+ /// subscription stream is merged into a broadcast stream it may never be
+ /// canceled since there may be broadcast listeners added later.
+ ///
+ /// If a broadcast stream is merged into a single-subscription stream any
+ /// events emitted by that stream before the result stream has a subscriber
+ /// will be discarded.
+ Stream<T> mergeAll(Iterable<Stream<T>> others) {
+ final controller = isBroadcast
+ ? StreamController<T>.broadcast(sync: true)
+ : StreamController<T>(sync: true);
+
+ final allStreams = [
+ this,
+ for (final other in others)
+ !isBroadcast || other.isBroadcast ? other : other.asBroadcastStream(),
+ ];
+
+ controller.onListen = () {
+ final subscriptions = <StreamSubscription<T>>[];
+ for (final stream in allStreams) {
+ final subscription =
+ stream.listen(controller.add, onError: controller.addError);
+ subscription.onDone(() {
+ subscriptions.remove(subscription);
+ if (subscriptions.isEmpty) controller.close();
+ });
+ subscriptions.add(subscription);
+ }
+ if (!isBroadcast) {
+ controller
+ ..onPause = () {
+ for (final subscription in subscriptions) {
+ subscription.pause();
+ }
+ }
+ ..onResume = () {
+ for (final subscription in subscriptions) {
+ subscription.resume();
+ }
+ };
+ }
+ controller.onCancel = () {
+ if (subscriptions.isEmpty) return null;
+ return [for (var s in subscriptions) s.cancel()]
+ .wait
+ .then(ignoreArgument);
+ };
+ };
+ return controller.stream;
+ }
+}
diff --git a/pkgs/stream_transform/lib/src/rate_limit.dart b/pkgs/stream_transform/lib/src/rate_limit.dart
new file mode 100644
index 0000000..299c230
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/rate_limit.dart
@@ -0,0 +1,356 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'aggregate_sample.dart';
+import 'common_callbacks.dart';
+import 'from_handlers.dart';
+
+/// Utilities to rate limit events.
+///
+/// - [debounce] - emit the the _first_ or _last_ event of a series of closely
+/// spaced events.
+/// - [debounceBuffer] - emit _all_ events at the _end_ of a series of closely
+/// spaced events.
+/// - [throttle] - emit the _first_ event at the _beginning_ of the period.
+/// - [audit] - emit the _last_ event at the _end_ of the period.
+/// - [buffer] - emit _all_ events on a _trigger_.
+extension RateLimit<T> on Stream<T> {
+ /// Suppresses events with less inter-event spacing than [duration].
+ ///
+ /// Events which are emitted with less than [duration] elapsed between them
+ /// are considered to be part of the same "series". If [leading] is `true`,
+ /// the first event of this series is emitted immediately. If [trailing] is
+ /// `true` the last event of this series is emitted with a delay of at least
+ /// [duration]. By default only trailing events are emitted, both arguments
+ /// must be specified with `leading: true, trailing: false` to emit only
+ /// leading events.
+ ///
+ /// If this stream is a broadcast stream, the result will be as well.
+ /// Errors are forwarded immediately.
+ ///
+ /// If there is a trailing event waiting during the debounce period when the
+ /// source stream closes the returned stream will wait to emit it following
+ /// the debounce period before closing. If there is no pending debounced event
+ /// when this stream closes the returned stream will close immediately.
+ ///
+ /// For example:
+ ///
+ /// source.debounce(Duration(seconds: 1));
+ ///
+ /// source: 1-2-3---4---5-6-|
+ /// result: ------3---4-----6|
+ ///
+ /// source.debounce(Duration(seconds: 1), leading: true, trailing: false);
+ ///
+ /// source: 1-2-3---4---5-6-|
+ /// result: 1-------4---5---|
+ ///
+ /// source.debounce(Duration(seconds: 1), leading: true);
+ ///
+ /// source: 1-2-3---4---5-6-|
+ /// result: 1-----3-4---5---6|
+ ///
+ /// To collect values emitted during the debounce period see [debounceBuffer].
+ Stream<T> debounce(Duration duration,
+ {bool leading = false, bool trailing = true}) =>
+ _debounceAggregate(duration, _dropPrevious,
+ leading: leading, trailing: trailing);
+
+ /// Buffers values until this stream does not emit for [duration] then emits
+ /// the collected values.
+ ///
+ /// Values will always be delayed by at least [duration], and values which
+ /// come within this time will be aggregated into the same list.
+ ///
+ /// If this stream is a broadcast stream, the result will be as well.
+ /// Errors are forwarded immediately.
+ ///
+ /// If there are events waiting during the debounce period when this stream
+ /// closes the returned stream will wait to emit them following the debounce
+ /// period before closing. If there are no pending debounced events when this
+ /// stream closes the returned stream will close immediately.
+ ///
+ /// To keep only the most recent event during the debounce period see
+ /// [debounce].
+ Stream<List<T>> debounceBuffer(Duration duration) =>
+ _debounceAggregate(duration, _collect, leading: false, trailing: true);
+
+ /// Reduces the rate that events are emitted to at most once per [duration].
+ ///
+ /// No events will ever be emitted within [duration] of another event on the
+ /// result stream.
+ /// If this stream is a broadcast stream, the result will be as well.
+ /// Errors are forwarded immediately.
+ ///
+ /// If [trailing] is `false`, source events emitted during the [duration]
+ /// period following a result event are discarded.
+ /// The result stream will not emit an event until this stream emits an event
+ /// following the throttled period.
+ /// If this stream is consistently emitting events with less than
+ /// [duration] between events, the time between events on the result stream
+ /// may still be more than [duration].
+ /// The result stream will close immediately when this stream closes.
+ ///
+ /// If [trailing] is `true`, the latest source event emitted during the
+ /// [duration] period following an result event is held and emitted following
+ /// the period.
+ /// If this stream is consistently emitting events with less than [duration]
+ /// between events, the time between events on the result stream will be
+ /// [duration].
+ /// If this stream closes the result stream will wait to emit a pending event
+ /// before closing.
+ ///
+ /// For example:
+ ///
+ /// source.throttle(Duration(seconds: 6));
+ ///
+ /// source: 1-2-3---4-5-6---7-8-|
+ /// result: 1-------4-------7---|
+ ///
+ /// source.throttle(Duration(seconds: 6), trailing: true);
+ ///
+ /// source: 1-2-3---4-5----6--|
+ /// result: 1-----3-----5-----6|
+ ///
+ /// source.throttle(Duration(seconds: 6), trailing: true);
+ ///
+ /// source: 1-2-----------3|
+ /// result: 1-----2-------3|
+ ///
+ /// See also:
+ /// - [audit], which emits the most recent event at the end of the period.
+ /// Compared to `audit`, `throttle` will not introduce delay to forwarded
+ /// elements, except for the [trailing] events.
+ /// - [debounce], which uses inter-event spacing instead of a fixed period
+ /// from the first event in a window. Compared to `debouce`, `throttle` cannot
+ /// be starved by having events emitted continuously within [duration].
+ Stream<T> throttle(Duration duration, {bool trailing = false}) =>
+ trailing ? _throttleTrailing(duration) : _throttle(duration);
+
+ Stream<T> _throttle(Duration duration) {
+ Timer? timer;
+
+ return transformByHandlers(onData: (data, sink) {
+ if (timer == null) {
+ sink.add(data);
+ timer = Timer(duration, () {
+ timer = null;
+ });
+ }
+ });
+ }
+
+ Stream<T> _throttleTrailing(Duration duration) {
+ Timer? timer;
+ T? pending;
+ var hasPending = false;
+ var isDone = false;
+
+ return transformByHandlers(onData: (data, sink) {
+ void onTimer() {
+ if (hasPending) {
+ sink.add(pending as T);
+ if (isDone) {
+ sink.close();
+ } else {
+ timer = Timer(duration, onTimer);
+ hasPending = false;
+ pending = null;
+ }
+ } else {
+ timer = null;
+ }
+ }
+
+ if (timer == null) {
+ sink.add(data);
+ timer = Timer(duration, onTimer);
+ } else {
+ hasPending = true;
+ pending = data;
+ }
+ }, onDone: (sink) {
+ isDone = true;
+ if (hasPending) return; // Will be closed by timer.
+ sink.close();
+ timer?.cancel();
+ timer = null;
+ });
+ }
+
+ /// Audit a single event from each [duration] length period where there are
+ /// events on this stream.
+ ///
+ /// No events will ever be emitted within [duration] of another event on the
+ /// result stream.
+ /// If this stream is a broadcast stream, the result will be as well.
+ /// Errors are forwarded immediately.
+ ///
+ /// The first event will begin the audit period. At the end of the audit
+ /// period the most recent event is emitted, and the next event restarts the
+ /// audit period.
+ ///
+ /// If the event that started the period is the one that is emitted it will be
+ /// delayed by [duration]. If a later event comes in within the period it's
+ /// delay will be shorter by the difference in arrival times.
+ ///
+ /// If there is no pending event when this stream closes the output
+ /// stream will close immediately. If there is a pending event the output
+ /// stream will wait to emit it before closing.
+ ///
+ /// For example:
+ ///
+ /// source.audit(Duration(seconds: 5));
+ ///
+ /// source: a------b--c----d--|
+ /// output: -----a------c--------d|
+ ///
+ /// See also:
+ /// - [throttle], which emits the _first_ event during the window, instead of
+ /// the last event in the window. Compared to `throttle`, `audit` will
+ /// introduce delay to forwarded events.
+ /// - [debounce], which only emits after the stream has not emitted for some
+ /// period. Compared to `debouce`, `audit` cannot be starved by having events
+ /// emitted continuously within [duration].
+ Stream<T> audit(Duration duration) {
+ Timer? timer;
+ var shouldClose = false;
+ T recentData;
+
+ return transformByHandlers(onData: (data, sink) {
+ recentData = data;
+ timer ??= Timer(duration, () {
+ sink.add(recentData);
+ timer = null;
+ if (shouldClose) {
+ sink.close();
+ }
+ });
+ }, onDone: (sink) {
+ if (timer != null) {
+ shouldClose = true;
+ } else {
+ sink.close();
+ }
+ });
+ }
+
+ /// Buffers the values emitted on this stream and emits them when [trigger]
+ /// emits an event.
+ ///
+ /// If [longPoll] is `false`, if there are no buffered values when [trigger]
+ /// emits an empty list is immediately emitted.
+ ///
+ /// If [longPoll] is `true`, and there are no buffered values when [trigger]
+ /// emits one or more events, then the *next* value from this stream is
+ /// immediately emitted on the returned stream as a single element list.
+ /// Subsequent events on [trigger] while there have been no events on this
+ /// stream are ignored.
+ ///
+ /// The result stream will close as soon as there is a guarantee it will not
+ /// emit any more events. There will not be any more events emitted if:
+ /// - [trigger] is closed and there is no waiting long poll.
+ /// - Or, this stream is closed and previously buffered events have been
+ /// delivered.
+ ///
+ /// If this stream is a broadcast stream, the result will be as well.
+ /// Errors from this stream or the trigger are immediately forwarded to the
+ /// output.
+ ///
+ /// See also:
+ /// - [sample] which use a [trigger] stream in the same way, but keeps only
+ /// the most recent source event.
+ Stream<List<T>> buffer(Stream<void> trigger, {bool longPoll = true}) =>
+ aggregateSample(
+ trigger: trigger,
+ aggregate: _collect,
+ longPoll: longPoll,
+ onEmpty: _empty);
+
+ /// Emits the most recent new value from this stream when [trigger] emits an
+ /// event.
+ ///
+ /// If [longPoll] is `false`, then an event on [trigger] when there is no
+ /// pending source event will be ignored.
+ /// If [longPoll] is `true` (the default), then an event on [trigger] when
+ /// there is no pending source event will cause the next source event
+ /// to immediately flow to the result stream.
+ ///
+ /// If [longPoll] is `false`, if there is no pending source event when
+ /// [trigger] emits, then the trigger event will be ignored.
+ ///
+ /// If [longPoll] is `true`, and there are no buffered values when [trigger]
+ /// emits one or more events, then the *next* value from this stream is
+ /// immediately emitted on the returned stream as a single element list.
+ /// Subsequent events on [trigger] while there have been no events on this
+ /// stream are ignored.
+ ///
+ /// The result stream will close as soon as there is a guarantee it will not
+ /// emit any more events. There will not be any more events emitted if:
+ /// - [trigger] is closed and there is no waiting long poll.
+ /// - Or, this source stream is closed and any pending source event has been
+ /// delivered.
+ ///
+ /// If this source stream is a broadcast stream, the result will be as well.
+ /// Errors from this source stream or the trigger are immediately forwarded to
+ /// the output.
+ ///
+ /// See also:
+ /// - [buffer] which use [trigger] stream in the same way, but keeps a list of
+ /// pending source events.
+ Stream<T> sample(Stream<void> trigger, {bool longPoll = true}) =>
+ aggregateSample(
+ trigger: trigger,
+ aggregate: _dropPrevious,
+ longPoll: longPoll,
+ onEmpty: ignoreArgument);
+
+ /// Aggregates values until this source stream does not emit for [duration],
+ /// then emits the aggregated values.
+ Stream<S> _debounceAggregate<S>(
+ Duration duration, S Function(T element, S? soFar) collect,
+ {required bool leading, required bool trailing}) {
+ Timer? timer;
+ S? soFar;
+ var hasPending = false;
+ var shouldClose = false;
+ var emittedLatestAsLeading = false;
+
+ return transformByHandlers(onData: (value, sink) {
+ void emit() {
+ sink.add(soFar as S);
+ soFar = null;
+ hasPending = false;
+ }
+
+ timer?.cancel();
+ soFar = collect(value, soFar);
+ hasPending = true;
+ if (timer == null && leading) {
+ emittedLatestAsLeading = true;
+ emit();
+ } else {
+ emittedLatestAsLeading = false;
+ }
+ timer = Timer(duration, () {
+ if (trailing && !emittedLatestAsLeading) emit();
+ if (shouldClose) sink.close();
+ timer = null;
+ });
+ }, onDone: (EventSink<S> sink) {
+ if (hasPending && trailing) {
+ shouldClose = true;
+ } else {
+ timer?.cancel();
+ sink.close();
+ }
+ });
+ }
+}
+
+T _dropPrevious<T>(T element, _) => element;
+List<T> _collect<T>(T event, List<T>? soFar) => (soFar ?? <T>[])..add(event);
+void _empty<T>(Sink<List<T>> sink) => sink.add([]);
diff --git a/pkgs/stream_transform/lib/src/scan.dart b/pkgs/stream_transform/lib/src/scan.dart
new file mode 100644
index 0000000..acd3c76
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/scan.dart
@@ -0,0 +1,31 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// A utility similar to [fold] which emits intermediate accumulations.
+extension Scan<T> on Stream<T> {
+ /// Emits a sequence of the accumulated values from repeatedly applying
+ /// [combine].
+ ///
+ /// Like [fold], but instead of producing a single value it yields each
+ /// intermediate result.
+ ///
+ /// If [combine] returns a future it will not be called again for subsequent
+ /// events from the source until it completes, therefore [combine] is always
+ /// called for elements in order, and the result stream always maintains the
+ /// same order as this stream.
+ Stream<S> scan<S>(
+ S initialValue, FutureOr<S> Function(S soFar, T element) combine) {
+ var accumulated = initialValue;
+ return asyncMap((value) {
+ var result = combine(accumulated, value);
+ if (result is Future<S>) {
+ return result.then((r) => accumulated = r);
+ } else {
+ return accumulated = result;
+ }
+ });
+ }
+}
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart
new file mode 100644
index 0000000..546036e
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -0,0 +1,135 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'async_expand.dart';
+import 'common_callbacks.dart';
+
+/// A utility to take events from the most recent sub stream returned by a
+/// callback.
+extension Switch<T> on Stream<T> {
+ /// Maps events to a Stream and emits values from the most recently created
+ /// Stream.
+ ///
+ /// When the source emits a value it will be converted to a [Stream] using
+ /// [convert] and the output will switch to emitting events from that result.
+ /// Like [asyncExpand] but the [Stream] emitted by a previous element
+ /// will be ignored as soon as the source stream emits a new event.
+ ///
+ /// This means that the source stream is not paused until a sub stream
+ /// returned from the [convert] callback is done. Instead, the subscription
+ /// to the sub stream is canceled as soon as the source stream emits a new
+ /// event.
+ ///
+ /// Errors from [convert], the source stream, or any of the sub streams are
+ /// forwarded to the result stream.
+ ///
+ /// The result stream will not close until the source stream closes and
+ /// the current sub stream have closed.
+ ///
+ /// If the source stream is a broadcast stream, the result will be as well,
+ /// regardless of the types of streams created by [convert]. In this case,
+ /// some care should be taken:
+ ///
+ /// * If [convert] returns a single subscription stream it may be listened to
+ /// and never canceled.
+ ///
+ /// See also:
+ /// - [concurrentAsyncExpand], which emits events from all sub streams
+ /// concurrently instead of cancelling subscriptions to previous subs
+ /// streams.
+ Stream<S> switchMap<S>(Stream<S> Function(T) convert) {
+ return map(convert).switchLatest();
+ }
+}
+
+/// A utility to take events from the most recent sub stream.
+extension SwitchLatest<T> on Stream<Stream<T>> {
+ /// Emits values from the most recently emitted Stream.
+ ///
+ /// When the source emits a stream, the output will switch to emitting events
+ /// from that stream.
+ ///
+ /// Whether the source stream is a single-subscription stream or a
+ /// broadcast stream, the result stream will be the same kind of stream,
+ /// regardless of the types of streams emitted.
+ Stream<T> switchLatest() {
+ var controller = isBroadcast
+ ? StreamController<T>.broadcast(sync: true)
+ : StreamController<T>(sync: true);
+
+ controller.onListen = () {
+ StreamSubscription<T>? innerSubscription;
+ var outerStreamDone = false;
+
+ void listenToInnerStream(Stream<T> innerStream) {
+ assert(innerSubscription == null);
+ var subscription = innerStream
+ .listen(controller.add, onError: controller.addError, onDone: () {
+ innerSubscription = null;
+ if (outerStreamDone) controller.close();
+ });
+ // If a pause happens during an innerSubscription.cancel,
+ // we still listen to the next stream when the cancel is done.
+ // Then we immediately pause it again here.
+ if (controller.isPaused) subscription.pause();
+ innerSubscription = subscription;
+ }
+
+ var addError = controller.addError;
+ final outerSubscription = listen(null, onError: addError, onDone: () {
+ outerStreamDone = true;
+ if (innerSubscription == null) controller.close();
+ });
+ outerSubscription.onData((innerStream) async {
+ var currentSubscription = innerSubscription;
+ if (currentSubscription == null) {
+ listenToInnerStream(innerStream);
+ return;
+ }
+ innerSubscription = null;
+ outerSubscription.pause();
+ try {
+ await currentSubscription.cancel();
+ } catch (error, stack) {
+ controller.addError(error, stack);
+ } finally {
+ if (!isBroadcast && !controller.hasListener) {
+ // Result single-subscription stream subscription was cancelled
+ // while waiting for previous innerStream cancel.
+ //
+ // Ensure that the last received stream is also listened to and
+ // cancelled, then do nothing further.
+ innerStream.listen(null).cancel().ignore();
+ } else {
+ outerSubscription.resume();
+ listenToInnerStream(innerStream);
+ }
+ }
+ });
+ if (!isBroadcast) {
+ controller
+ ..onPause = () {
+ innerSubscription?.pause();
+ outerSubscription.pause();
+ }
+ ..onResume = () {
+ innerSubscription?.resume();
+ outerSubscription.resume();
+ };
+ }
+ controller.onCancel = () {
+ var sub = innerSubscription;
+ var cancels = [
+ if (!outerStreamDone) outerSubscription.cancel(),
+ if (sub != null) sub.cancel(),
+ ];
+ if (cancels.isEmpty) return null;
+ return cancels.wait.then(ignoreArgument);
+ };
+ };
+ return controller.stream;
+ }
+}
diff --git a/pkgs/stream_transform/lib/src/take_until.dart b/pkgs/stream_transform/lib/src/take_until.dart
new file mode 100644
index 0000000..e6deaa1
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/take_until.dart
@@ -0,0 +1,64 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// A utility to end a stream based on an external trigger.
+extension TakeUntil<T> on Stream<T> {
+ /// Takes values from this stream which are emitted before [trigger]
+ /// completes.
+ ///
+ /// Completing [trigger] differs from canceling a subscription in that values
+ /// which are emitted before the trigger, but have further asynchronous delays
+ /// in transformations following the takeUtil, will still go through.
+ /// Cancelling a subscription immediately stops values.
+ ///
+ /// If [trigger] completes as an error, the error will be forwarded through
+ /// the result stream before the result stream closes.
+ ///
+ /// If [trigger] completes as a value or as an error after this stream has
+ /// already ended, the completion will be ignored.
+ Stream<T> takeUntil(Future<void> trigger) {
+ var controller = isBroadcast
+ ? StreamController<T>.broadcast(sync: true)
+ : StreamController<T>(sync: true);
+
+ StreamSubscription<T>? subscription;
+ var isDone = false;
+ trigger.then((_) {
+ if (isDone) return;
+ isDone = true;
+ subscription?.cancel();
+ controller.close();
+ }, onError: (Object error, StackTrace stackTrace) {
+ if (isDone) return;
+ isDone = true;
+ controller
+ ..addError(error, stackTrace)
+ ..close();
+ });
+
+ controller.onListen = () {
+ if (isDone) return;
+ subscription =
+ listen(controller.add, onError: controller.addError, onDone: () {
+ if (isDone) return;
+ isDone = true;
+ controller.close();
+ });
+ if (!isBroadcast) {
+ controller
+ ..onPause = subscription!.pause
+ ..onResume = subscription!.resume;
+ }
+ controller.onCancel = () {
+ if (isDone) return null;
+ var toCancel = subscription!;
+ subscription = null;
+ return toCancel.cancel();
+ };
+ };
+ return controller.stream;
+ }
+}
diff --git a/pkgs/stream_transform/lib/src/tap.dart b/pkgs/stream_transform/lib/src/tap.dart
new file mode 100644
index 0000000..4b16ab5
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/tap.dart
@@ -0,0 +1,44 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'from_handlers.dart';
+
+/// A utility to chain extra behavior on a stream.
+extension Tap<T> on Stream<T> {
+ /// Taps into this stream to allow additional handling on a single-subscriber
+ /// stream without first wrapping as a broadcast stream.
+ ///
+ /// The [onValue] callback will be called with every value from this stream
+ /// before it is forwarded to listeners on the resulting stream.
+ /// May be null if only [onError] or [onDone] callbacks are needed.
+ ///
+ /// The [onError] callback will be called with every error from this stream
+ /// before it is forwarded to listeners on the resulting stream.
+ ///
+ /// The [onDone] callback will be called after this stream closes and before
+ /// the resulting stream is closed.
+ ///
+ /// Errors from any of the callbacks are caught and ignored.
+ ///
+ /// The callbacks may not be called until the tapped stream has a listener,
+ /// and may not be called after the listener has canceled the subscription.
+ Stream<T> tap(void Function(T)? onValue,
+ {void Function(Object, StackTrace)? onError,
+ void Function()? onDone}) =>
+ transformByHandlers(onData: (value, sink) {
+ try {
+ onValue?.call(value);
+ } catch (_) {/*Ignore*/}
+ sink.add(value);
+ }, onError: (error, stackTrace, sink) {
+ try {
+ onError?.call(error, stackTrace);
+ } catch (_) {/*Ignore*/}
+ sink.addError(error, stackTrace);
+ }, onDone: (sink) {
+ try {
+ onDone?.call();
+ } catch (_) {/*Ignore*/}
+ sink.close();
+ });
+}
diff --git a/pkgs/stream_transform/lib/src/where.dart b/pkgs/stream_transform/lib/src/where.dart
new file mode 100644
index 0000000..76aa28a
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/where.dart
@@ -0,0 +1,71 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'from_handlers.dart';
+
+/// Utilities to filter events.
+extension Where<T> on Stream<T> {
+ /// Discards events from this stream that are not of type [S].
+ ///
+ /// If the source stream is a broadcast stream the result will be as well.
+ ///
+ /// Errors from the source stream are forwarded directly to the result stream.
+ ///
+ /// [S] should be a subtype of the stream's generic type, otherwise nothing of
+ /// type [S] could possibly be emitted, however there is no static or runtime
+ /// checking that this is the case.
+ Stream<S> whereType<S>() => transformByHandlers(onData: (event, sink) {
+ if (event is S) sink.add(event);
+ });
+
+ /// Discards events from this stream based on an asynchronous [test] callback.
+ ///
+ /// Like [where] but allows the [test] to return a [Future].
+ ///
+ /// Events on the result stream will be emitted in the order that [test]
+ /// completes which may not match the order of this stream.
+ ///
+ /// If the source stream is a broadcast stream the result will be as well.
+ /// When used with a broadcast stream behavior also differs from [where] in
+ /// that the [test] function is only called once per event, rather than once
+ /// per listener per event.
+ ///
+ /// Errors from the source stream are forwarded directly to the result stream.
+ /// Errors from [test] are also forwarded to the result stream.
+ ///
+ /// The result stream will not close until the source stream closes and all
+ /// pending [test] calls have finished.
+ Stream<T> asyncWhere(FutureOr<bool> Function(T) test) {
+ var valuesWaiting = 0;
+ var sourceDone = false;
+ return transformByHandlers(onData: (element, sink) {
+ valuesWaiting++;
+ () async {
+ try {
+ if (await test(element)) sink.add(element);
+ } catch (e, st) {
+ sink.addError(e, st);
+ }
+ valuesWaiting--;
+ if (valuesWaiting <= 0 && sourceDone) sink.close();
+ }();
+ }, onDone: (sink) {
+ sourceDone = true;
+ if (valuesWaiting <= 0) sink.close();
+ });
+ }
+}
+
+extension WhereNotNull<T extends Object> on Stream<T?> {
+ /// Discards `null` events from this stream.
+ ///
+ /// If the source stream is a broadcast stream the result will be as well.
+ ///
+ /// Errors from the source stream are forwarded directly to the result stream.
+ Stream<T> whereNotNull() => transformByHandlers(onData: (event, sink) {
+ if (event != null) sink.add(event);
+ });
+}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart
new file mode 100644
index 0000000..edf4df9
--- /dev/null
+++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -0,0 +1,15 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+export 'src/async_expand.dart';
+export 'src/async_map.dart';
+export 'src/combine_latest.dart';
+export 'src/concatenate.dart';
+export 'src/merge.dart';
+export 'src/rate_limit.dart';
+export 'src/scan.dart';
+export 'src/switch.dart';
+export 'src/take_until.dart';
+export 'src/tap.dart';
+export 'src/where.dart';
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
new file mode 100644
index 0000000..1e2298a
--- /dev/null
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -0,0 +1,13 @@
+name: stream_transform
+version: 2.1.1
+description: A collection of utilities to transform and manipulate streams.
+repository: https://github.com/dart-lang/tools/tree/main/pkgs/stream_transform
+
+environment:
+ sdk: ^3.1.0
+
+dev_dependencies:
+ async: ^2.5.0
+ dart_flutter_team_lints: ^2.0.0
+ fake_async: ^1.3.0
+ test: ^1.16.0
diff --git a/pkgs/stream_transform/test/async_expand_test.dart b/pkgs/stream_transform/test/async_expand_test.dart
new file mode 100644
index 0000000..8d84300
--- /dev/null
+++ b/pkgs/stream_transform/test/async_expand_test.dart
@@ -0,0 +1,195 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ test('forwards errors from the convert callback', () async {
+ var errors = <String>[];
+ var source = Stream.fromIterable([1, 2, 3]);
+ source.concurrentAsyncExpand<void>((i) {
+ // ignore: only_throw_errors
+ throw 'Error: $i';
+ }).listen((_) {}, onError: errors.add);
+ await Future<void>(() {});
+ expect(errors, ['Error: 1', 'Error: 2', 'Error: 3']);
+ });
+
+ for (var outerType in streamTypes) {
+ for (var innerType in streamTypes) {
+ group('concurrentAsyncExpand $outerType to $innerType', () {
+ late StreamController<int> outerController;
+ late bool outerCanceled;
+ late List<StreamController<String>> innerControllers;
+ late List<bool> innerCanceled;
+ late List<String> emittedValues;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<String> transformed;
+ late StreamSubscription<String> subscription;
+
+ setUp(() {
+ outerController = createController(outerType)
+ ..onCancel = () {
+ outerCanceled = true;
+ };
+ outerCanceled = false;
+ innerControllers = [];
+ innerCanceled = [];
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ transformed = outerController.stream.concurrentAsyncExpand((i) {
+ var index = innerControllers.length;
+ innerCanceled.add(false);
+ innerControllers.add(createController<String>(innerType)
+ ..onCancel = () {
+ innerCanceled[index] = true;
+ });
+ return innerControllers.last.stream;
+ });
+ subscription = transformed
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ });
+
+ test('interleaves events from sub streams', () async {
+ outerController
+ ..add(1)
+ ..add(2);
+ await Future<void>(() {});
+ expect(emittedValues, isEmpty);
+ expect(innerControllers, hasLength(2));
+ innerControllers[0].add('First');
+ innerControllers[1].add('Second');
+ innerControllers[0].add('First again');
+ await Future<void>(() {});
+ expect(emittedValues, ['First', 'Second', 'First again']);
+ });
+
+ test('forwards errors from outer stream', () async {
+ outerController.addError('Error');
+ await Future<void>(() {});
+ expect(errors, ['Error']);
+ });
+
+ test('forwards errors from inner streams', () async {
+ outerController
+ ..add(1)
+ ..add(2);
+ await Future<void>(() {});
+ innerControllers[0].addError('Error 1');
+ innerControllers[1].addError('Error 2');
+ await Future<void>(() {});
+ expect(errors, ['Error 1', 'Error 2']);
+ });
+
+ test('can continue handling events after an error in outer stream',
+ () async {
+ outerController
+ ..addError('Error')
+ ..add(1);
+ await Future<void>(() {});
+ innerControllers[0].add('First');
+ await Future<void>(() {});
+ expect(emittedValues, ['First']);
+ expect(errors, ['Error']);
+ });
+
+ test('cancels outer subscription if output canceled', () async {
+ await subscription.cancel();
+ expect(outerCanceled, true);
+ });
+
+ if (outerType != 'broadcast' || innerType != 'single subscription') {
+ // A single subscription inner stream in a broadcast outer stream is
+ // not canceled.
+ test('cancels inner subscriptions if output canceled', () async {
+ outerController
+ ..add(1)
+ ..add(2);
+ await Future<void>(() {});
+ await subscription.cancel();
+ expect(innerCanceled, [true, true]);
+ });
+ }
+
+ test('stays open if any inner stream is still open', () async {
+ outerController.add(1);
+ await outerController.close();
+ await Future<void>(() {});
+ expect(isDone, false);
+ });
+
+ test('stays open if outer stream is still open', () async {
+ outerController.add(1);
+ await Future<void>(() {});
+ await innerControllers[0].close();
+ await Future<void>(() {});
+ expect(isDone, false);
+ });
+
+ test('closes after all inner streams and outer stream close', () async {
+ outerController.add(1);
+ await Future<void>(() {});
+ await innerControllers[0].close();
+ await outerController.close();
+ await Future<void>(() {});
+ expect(isDone, true);
+ });
+
+ if (outerType == 'broadcast') {
+ test('multiple listerns all get values', () async {
+ var otherValues = <String>[];
+ transformed.listen(otherValues.add);
+ outerController.add(1);
+ await Future<void>(() {});
+ innerControllers[0].add('First');
+ await Future<void>(() {});
+ expect(emittedValues, ['First']);
+ expect(otherValues, ['First']);
+ });
+
+ test('multiple listeners get closed', () async {
+ var otherDone = false;
+ transformed.listen(null, onDone: () => otherDone = true);
+ outerController.add(1);
+ await Future<void>(() {});
+ await innerControllers[0].close();
+ await outerController.close();
+ await Future<void>(() {});
+ expect(isDone, true);
+ expect(otherDone, true);
+ });
+
+ test('can cancel and relisten', () async {
+ outerController
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ innerControllers[0].add('First');
+ innerControllers[1].add('Second');
+ await Future(() {});
+ await subscription.cancel();
+ innerControllers[0].add('Ignored');
+ await Future(() {});
+ subscription = transformed.listen(emittedValues.add);
+ innerControllers[0].add('Also ignored');
+ outerController.add(3);
+ await Future(() {});
+ innerControllers[2].add('More');
+ await Future(() {});
+ expect(emittedValues, ['First', 'Second', 'More']);
+ });
+ }
+ });
+ }
+ }
+}
diff --git a/pkgs/stream_transform/test/async_map_buffer_test.dart b/pkgs/stream_transform/test/async_map_buffer_test.dart
new file mode 100644
index 0000000..2386217
--- /dev/null
+++ b/pkgs/stream_transform/test/async_map_buffer_test.dart
@@ -0,0 +1,204 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ late StreamController<int> values;
+ late List<String> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<String> transformed;
+ late StreamSubscription<String> subscription;
+
+ Completer<String>? finishWork;
+ List<int>? workArgument;
+
+ /// Represents the async `convert` function and asserts that is is only called
+ /// after the previous iteration has completed.
+ Future<String> work(List<int> values) {
+ expect(finishWork, isNull,
+ reason: 'See $values befor previous work is complete');
+ workArgument = values;
+ finishWork = Completer()
+ ..future.then((_) {
+ workArgument = null;
+ finishWork = null;
+ }).catchError((_) {
+ workArgument = null;
+ finishWork = null;
+ });
+ return finishWork!.future;
+ }
+
+ for (var streamType in streamTypes) {
+ group('asyncMapBuffer for stream type: [$streamType]', () {
+ setUp(() {
+ valuesCanceled = false;
+ values = createController(streamType)
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ finishWork = null;
+ workArgument = null;
+ transformed = values.stream.asyncMapBuffer(work);
+ subscription = transformed
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ });
+
+ test('does not emit before work finishes', () async {
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, isEmpty);
+ expect(workArgument, [1]);
+ finishWork!.complete('result');
+ await Future(() {});
+ expect(emittedValues, ['result']);
+ });
+
+ test('buffers values while work is ongoing', () async {
+ values.add(1);
+ await Future(() {});
+ values
+ ..add(2)
+ ..add(3);
+ await Future(() {});
+ finishWork!.complete('');
+ await Future(() {});
+ expect(workArgument, [2, 3]);
+ });
+
+ test('forwards errors without waiting for work', () async {
+ values.add(1);
+ await Future(() {});
+ values.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('forwards errors which occur during the work', () async {
+ values.add(1);
+ await Future(() {});
+ finishWork!.completeError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('can continue handling events after an error', () async {
+ values.add(1);
+ await Future(() {});
+ finishWork!.completeError('error');
+ values.add(2);
+ await Future(() {});
+ expect(workArgument, [2]);
+ finishWork!.completeError('another');
+ await Future(() {});
+ expect(errors, ['error', 'another']);
+ });
+
+ test('does not start next work early due to an error in values',
+ () async {
+ values.add(1);
+ await Future(() {});
+ values
+ ..addError('error')
+ ..add(2);
+ await Future(() {});
+ expect(errors, ['error']);
+ // [work] will assert that the second iteration is not called because
+ // the first has not completed.
+ });
+
+ test('cancels value subscription when output canceled', () async {
+ expect(valuesCanceled, false);
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('closes when values end if no work is pending', () async {
+ expect(isDone, false);
+ await values.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('waits for pending work when values close', () async {
+ values.add(1);
+ await Future(() {});
+ expect(isDone, false);
+ values.add(2);
+ await values.close();
+ expect(isDone, false);
+ finishWork!.complete('');
+ await Future(() {});
+ // Still a pending value
+ expect(isDone, false);
+ finishWork!.complete('');
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('forwards errors from values', () async {
+ values.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () async {
+ var otherValues = <String>[];
+ transformed.listen(otherValues.add);
+ values.add(1);
+ await Future(() {});
+ finishWork!.complete('result');
+ await Future(() {});
+ expect(emittedValues, ['result']);
+ expect(otherValues, ['result']);
+ });
+
+ test('multiple listeners get done when values end', () async {
+ var otherDone = false;
+ transformed.listen(null, onDone: () => otherDone = true);
+ values.add(1);
+ await Future(() {});
+ await values.close();
+ expect(isDone, false);
+ expect(otherDone, false);
+ finishWork!.complete('');
+ await Future(() {});
+ expect(isDone, true);
+ expect(otherDone, true);
+ });
+
+ test('can cancel and relisten', () async {
+ values.add(1);
+ await Future(() {});
+ finishWork!.complete('first');
+ await Future(() {});
+ await subscription.cancel();
+ values.add(2);
+ await Future(() {});
+ subscription = transformed.listen(emittedValues.add);
+ values.add(3);
+ await Future(() {});
+ expect(workArgument, [3]);
+ finishWork!.complete('second');
+ await Future(() {});
+ expect(emittedValues, ['first', 'second']);
+ });
+ }
+ });
+ }
+}
diff --git a/pkgs/stream_transform/test/async_map_sample_test.dart b/pkgs/stream_transform/test/async_map_sample_test.dart
new file mode 100644
index 0000000..62b1b92
--- /dev/null
+++ b/pkgs/stream_transform/test/async_map_sample_test.dart
@@ -0,0 +1,209 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ late StreamController<int> values;
+ late List<String> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<String> transformed;
+ late StreamSubscription<String> subscription;
+
+ Completer<String>? finishWork;
+ int? workArgument;
+
+ /// Represents the async `convert` function and asserts that is is only called
+ /// after the previous iteration has completed.
+ Future<String> work(int value) {
+ expect(finishWork, isNull,
+ reason: 'See $values befor previous work is complete');
+ workArgument = value;
+ finishWork = Completer()
+ ..future.then((_) {
+ workArgument = null;
+ finishWork = null;
+ }).catchError((_) {
+ workArgument = null;
+ finishWork = null;
+ });
+ return finishWork!.future;
+ }
+
+ for (var streamType in streamTypes) {
+ group('asyncMapSample for stream type: [$streamType]', () {
+ setUp(() {
+ valuesCanceled = false;
+ values = createController(streamType)
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ finishWork = null;
+ workArgument = null;
+ transformed = values.stream.asyncMapSample(work);
+ subscription = transformed
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ });
+
+ test('does not emit before work finishes', () async {
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, isEmpty);
+ expect(workArgument, 1);
+ finishWork!.complete('result');
+ await Future(() {});
+ expect(emittedValues, ['result']);
+ });
+
+ test('buffers values while work is ongoing', () async {
+ values.add(1);
+ await Future(() {});
+ values
+ ..add(2)
+ ..add(3);
+ await Future(() {});
+ finishWork!.complete('');
+ await Future(() {});
+ expect(workArgument, 3);
+ });
+
+ test('forwards errors without waiting for work', () async {
+ values.add(1);
+ await Future(() {});
+ values.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('forwards errors which occur during the work', () async {
+ values.add(1);
+ await Future(() {});
+ finishWork!.completeError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('can continue handling events after an error', () async {
+ values.add(1);
+ await Future(() {});
+ finishWork!.completeError('error');
+ values.add(2);
+ await Future(() {});
+ expect(workArgument, 2);
+ finishWork!.completeError('another');
+ await Future(() {});
+ expect(errors, ['error', 'another']);
+ });
+
+ test('does not start next work early due to an error in values',
+ () async {
+ values.add(1);
+ await Future(() {});
+ values
+ ..addError('error')
+ ..add(2);
+ await Future(() {});
+ expect(errors, ['error']);
+ // [work] will assert that the second iteration is not called because
+ // the first has not completed.
+ });
+
+ test('cancels value subscription when output canceled', () async {
+ expect(valuesCanceled, false);
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('closes when values end if no work is pending', () async {
+ expect(isDone, false);
+ await values.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('waits for pending work when values close', () async {
+ values.add(1);
+ await Future(() {});
+ expect(isDone, false);
+ values.add(2);
+ await values.close();
+ expect(isDone, false);
+ finishWork!.complete('');
+ await Future(() {});
+ // Still a pending value
+ expect(isDone, false);
+ finishWork!.complete('');
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('forwards errors from values', () async {
+ values.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () async {
+ var otherValues = <String>[];
+ transformed.listen(otherValues.add);
+ values.add(1);
+ await Future(() {});
+ finishWork!.complete('result');
+ await Future(() {});
+ expect(emittedValues, ['result']);
+ expect(otherValues, ['result']);
+ });
+
+ test('multiple listeners get done when values end', () async {
+ var otherDone = false;
+ transformed.listen(null, onDone: () => otherDone = true);
+ values.add(1);
+ await Future(() {});
+ await values.close();
+ expect(isDone, false);
+ expect(otherDone, false);
+ finishWork!.complete('');
+ await Future(() {});
+ expect(isDone, true);
+ expect(otherDone, true);
+ });
+
+ test('can cancel and relisten', () async {
+ values.add(1);
+ await Future(() {});
+ finishWork!.complete('first');
+ await Future(() {});
+ await subscription.cancel();
+ values.add(2);
+ await Future(() {});
+ subscription = transformed.listen(emittedValues.add);
+ values.add(3);
+ await Future(() {});
+ expect(workArgument, 3);
+ finishWork!.complete('second');
+ await Future(() {});
+ expect(emittedValues, ['first', 'second']);
+ });
+ }
+ });
+ }
+
+ test('allows nulls', () async {
+ var stream = Stream<int?>.value(null);
+ await stream.asyncMapSample(expectAsync1((_) async {})).drain<void>();
+ });
+}
diff --git a/pkgs/stream_transform/test/async_where_test.dart b/pkgs/stream_transform/test/async_where_test.dart
new file mode 100644
index 0000000..6ea4e76
--- /dev/null
+++ b/pkgs/stream_transform/test/async_where_test.dart
@@ -0,0 +1,90 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+void main() {
+ test('forwards only events that pass the predicate', () async {
+ var values = Stream.fromIterable([1, 2, 3, 4]);
+ var filtered = values.asyncWhere((e) async => e > 2);
+ expect(await filtered.toList(), [3, 4]);
+ });
+
+ test('allows predicates that go through event loop', () async {
+ var values = Stream.fromIterable([1, 2, 3, 4]);
+ var filtered = values.asyncWhere((e) async {
+ await Future(() {});
+ return e > 2;
+ });
+ expect(await filtered.toList(), [3, 4]);
+ });
+
+ test('allows synchronous predicate', () async {
+ var values = Stream.fromIterable([1, 2, 3, 4]);
+ var filtered = values.asyncWhere((e) => e > 2);
+ expect(await filtered.toList(), [3, 4]);
+ });
+
+ test('can result in empty stream', () async {
+ var values = Stream.fromIterable([1, 2, 3, 4]);
+ var filtered = values.asyncWhere((e) => e > 4);
+ expect(await filtered.isEmpty, true);
+ });
+
+ test('forwards values to multiple listeners', () async {
+ var values = StreamController<int>.broadcast();
+ var filtered = values.stream.asyncWhere((e) async => e > 2);
+ var firstValues = <int>[];
+ var secondValues = <int>[];
+ filtered
+ ..listen(firstValues.add)
+ ..listen(secondValues.add);
+ values
+ ..add(1)
+ ..add(2)
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+ expect(firstValues, [3, 4]);
+ expect(secondValues, [3, 4]);
+ });
+
+ test('closes streams with multiple listeners', () async {
+ var values = StreamController<int>.broadcast();
+ var predicate = Completer<bool>();
+ var filtered = values.stream.asyncWhere((_) => predicate.future);
+ var firstDone = false;
+ var secondDone = false;
+ filtered
+ ..listen(null, onDone: () => firstDone = true)
+ ..listen(null, onDone: () => secondDone = true);
+ values.add(1);
+ await values.close();
+ expect(firstDone, false);
+ expect(secondDone, false);
+
+ predicate.complete(true);
+ await Future(() {});
+ expect(firstDone, true);
+ expect(secondDone, true);
+ });
+
+ test('forwards errors emitted by the test callback', () async {
+ var errors = <Object>[];
+ var emitted = <Object>[];
+ var values = Stream.fromIterable([1, 2, 3, 4]);
+ var filtered = values.asyncWhere((e) async {
+ await Future(() {});
+ if (e.isEven) throw Exception('$e');
+ return true;
+ });
+ var done = Completer<Object?>();
+ filtered.listen(emitted.add, onError: errors.add, onDone: done.complete);
+ await done.future;
+ expect(emitted, [1, 3]);
+ expect(errors.map((e) => '$e'), ['Exception: 2', 'Exception: 4']);
+ });
+}
diff --git a/pkgs/stream_transform/test/audit_test.dart b/pkgs/stream_transform/test/audit_test.dart
new file mode 100644
index 0000000..28537db
--- /dev/null
+++ b/pkgs/stream_transform/test/audit_test.dart
@@ -0,0 +1,140 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:fake_async/fake_async.dart';
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ for (var streamType in streamTypes) {
+ group('Stream type [$streamType]', () {
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
+
+ group('audit', () {
+ setUp(() {
+ valuesCanceled = false;
+ values = createController(streamType)
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ transformed = values.stream.audit(const Duration(milliseconds: 6));
+ });
+
+ void listen() {
+ subscription = transformed
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ test('cancels values', () async {
+ listen();
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('swallows values that come faster than duration', () {
+ fakeAsync((async) {
+ listen();
+ values
+ ..add(1)
+ ..add(2)
+ ..close();
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [2]);
+ });
+ });
+
+ test('outputs multiple values spaced further than duration', () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 6));
+ values.add(2);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1, 2]);
+ });
+ });
+
+ test('waits for pending value to close', () {
+ fakeAsync((async) {
+ listen();
+ values
+ ..add(1)
+ ..close();
+ expect(isDone, false);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(isDone, true);
+ });
+ });
+
+ test('closes output if there are no pending values', () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 6));
+ values
+ ..add(2)
+ ..close();
+ expect(isDone, false);
+ expect(emittedValues, [1]);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(isDone, true);
+ expect(emittedValues, [1, 2]);
+ });
+ });
+
+ test('does not starve output if many values come closer than duration',
+ () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 3));
+ values.add(2);
+ async.elapse(const Duration(milliseconds: 3));
+ values.add(3);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [2, 3]);
+ });
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get the values', () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 3));
+ values.add(2);
+ var otherValues = <int>[];
+ transformed.listen(otherValues.add);
+ values.add(3);
+ async.elapse(const Duration(milliseconds: 3));
+ values.add(4);
+ async.elapse(const Duration(milliseconds: 3));
+ values
+ ..add(5)
+ ..close();
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [3, 5]);
+ expect(otherValues, [3, 5]);
+ });
+ });
+ }
+ });
+ });
+ }
+}
diff --git a/pkgs/stream_transform/test/buffer_test.dart b/pkgs/stream_transform/test/buffer_test.dart
new file mode 100644
index 0000000..830f555
--- /dev/null
+++ b/pkgs/stream_transform/test/buffer_test.dart
@@ -0,0 +1,305 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ late StreamController<void> trigger;
+ late StreamController<int> values;
+ late List<List<int>> emittedValues;
+ late bool valuesCanceled;
+ late bool triggerCanceled;
+ late bool triggerPaused;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<List<int>> transformed;
+ late StreamSubscription<List<int>> subscription;
+
+ void setUpForStreamTypes(String triggerType, String valuesType,
+ {required bool longPoll}) {
+ valuesCanceled = false;
+ triggerCanceled = false;
+ triggerPaused = false;
+ trigger = createController(triggerType)
+ ..onCancel = () {
+ triggerCanceled = true;
+ };
+ if (triggerType == 'single subscription') {
+ trigger.onPause = () {
+ triggerPaused = true;
+ };
+ }
+ values = createController(valuesType)
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ transformed = values.stream.buffer(trigger.stream, longPoll: longPoll);
+ subscription =
+ transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ for (var triggerType in streamTypes) {
+ for (var valuesType in streamTypes) {
+ group('Trigger type: [$triggerType], Values type: [$valuesType]', () {
+ group('general behavior', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+ });
+
+ test('does not emit before `trigger`', () async {
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, isEmpty);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ });
+
+ test('groups values between trigger', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger.add(null);
+ values
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1, 2],
+ [3, 4]
+ ]);
+ });
+
+ test('cancels value subscription when output canceled', () async {
+ expect(valuesCanceled, false);
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('closes when trigger ends', () async {
+ expect(isDone, false);
+ await trigger.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('closes after outputting final values when source closes',
+ () async {
+ expect(isDone, false);
+ values.add(1);
+ await values.close();
+ expect(isDone, false);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ expect(isDone, true);
+ });
+
+ test('closes when source closes and there are no buffered', () async {
+ expect(isDone, false);
+ await values.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('forwards errors from trigger', () async {
+ trigger.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('forwards errors from values', () async {
+ values.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+ });
+
+ group('long polling', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+ });
+
+ test('emits immediately if trigger emits before a value', () async {
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, isEmpty);
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ });
+
+ test('two triggers in a row - emit buffere then emit next value',
+ () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ values.add(3);
+ await Future(() {});
+ expect(emittedValues, [
+ [1, 2],
+ [3]
+ ]);
+ });
+
+ test('pre-emptive trigger then trigger after values', () async {
+ trigger.add(null);
+ await Future(() {});
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1],
+ [2]
+ ]);
+ });
+
+ test('multiple pre-emptive triggers, only emits first value',
+ () async {
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ });
+
+ test('closes if there is no waiting long poll when source closes',
+ () async {
+ expect(isDone, false);
+ values.add(1);
+ trigger.add(null);
+ await values.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('waits to emit if there waiting long poll when trigger closes',
+ () async {
+ trigger.add(null);
+ await trigger.close();
+ expect(isDone, false);
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ expect(isDone, true);
+ });
+ });
+
+ group('immediate polling', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: false);
+ });
+
+ test('emits empty list before values', () async {
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [<int>[]]);
+ });
+
+ test('emits empty list after emitting values', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1, 2],
+ <int>[]
+ ]);
+ });
+ });
+ });
+ }
+ }
+
+ test('always cancels trigger if values is singlesubscription', () async {
+ setUpForStreamTypes('broadcast', 'single subscription', longPoll: true);
+ expect(triggerCanceled, false);
+ await subscription.cancel();
+ expect(triggerCanceled, true);
+
+ setUpForStreamTypes('single subscription', 'single subscription',
+ longPoll: true);
+ expect(triggerCanceled, false);
+ await subscription.cancel();
+ expect(triggerCanceled, true);
+ });
+
+ test('cancels trigger if trigger is broadcast', () async {
+ setUpForStreamTypes('broadcast', 'broadcast', longPoll: true);
+ expect(triggerCanceled, false);
+ await subscription.cancel();
+ expect(triggerCanceled, true);
+ });
+
+ test('pauses single subscription trigger for broadcast values', () async {
+ setUpForStreamTypes('single subscription', 'broadcast', longPoll: true);
+ expect(triggerCanceled, false);
+ expect(triggerPaused, false);
+ await subscription.cancel();
+ expect(triggerCanceled, false);
+ expect(triggerPaused, true);
+ });
+
+ for (var triggerType in streamTypes) {
+ test('cancel and relisten with [$triggerType] trigger', () async {
+ setUpForStreamTypes(triggerType, 'broadcast', longPoll: true);
+ values.add(1);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1]
+ ]);
+ await subscription.cancel();
+ values.add(2);
+ trigger.add(null);
+ await Future(() {});
+ subscription = transformed.listen(emittedValues.add);
+ values.add(3);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [
+ [1],
+ [3]
+ ]);
+ });
+ }
+}
diff --git a/pkgs/stream_transform/test/combine_latest_all_test.dart b/pkgs/stream_transform/test/combine_latest_all_test.dart
new file mode 100644
index 0000000..f4b719c
--- /dev/null
+++ b/pkgs/stream_transform/test/combine_latest_all_test.dart
@@ -0,0 +1,166 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+Future<void> tick() => Future(() {});
+
+void main() {
+ group('combineLatestAll', () {
+ test('emits latest values', () async {
+ final first = StreamController<String>();
+ final second = StreamController<String>();
+ final third = StreamController<String>();
+ final combined = first.stream.combineLatestAll(
+ [second.stream, third.stream]).map((data) => data.join());
+
+ // first: a----b------------------c--------d---|
+ // second: --1---------2-----------------|
+ // third: -------&----------%---|
+ // combined: -------b1&--b2&---b2%---c2%------d2%-|
+
+ expect(combined,
+ emitsInOrder(['b1&', 'b2&', 'b2%', 'c2%', 'd2%', emitsDone]));
+
+ first.add('a');
+ await tick();
+ second.add('1');
+ await tick();
+ first.add('b');
+ await tick();
+ third.add('&');
+ await tick();
+ second.add('2');
+ await tick();
+ third.add('%');
+ await tick();
+ await third.close();
+ await tick();
+ first.add('c');
+ await tick();
+ await second.close();
+ await tick();
+ first.add('d');
+ await tick();
+ await first.close();
+ });
+
+ test('ends if a Stream closes without ever emitting a value', () async {
+ final first = StreamController<String>();
+ final second = StreamController<String>();
+ final combined = first.stream.combineLatestAll([second.stream]);
+
+ // first: -a------b-------|
+ // second: -----|
+ // combined: -----|
+
+ expect(combined, emits(emitsDone));
+
+ first.add('a');
+ await tick();
+ await second.close();
+ await tick();
+ first.add('b');
+ });
+
+ test('forwards errors', () async {
+ final first = StreamController<String>();
+ final second = StreamController<String>();
+ final combined = first.stream
+ .combineLatestAll([second.stream]).map((data) => data.join());
+
+ // first: -a---------|
+ // second: ----1---#
+ // combined: ----a1--#
+
+ expect(combined, emitsThrough(emitsError('doh')));
+
+ first.add('a');
+ await tick();
+ second.add('1');
+ await tick();
+ second.addError('doh');
+ });
+
+ test('ends after both streams have ended', () async {
+ final first = StreamController<String>();
+ final second = StreamController<String>();
+
+ var done = false;
+ first.stream.combineLatestAll([second.stream]).listen(null,
+ onDone: () => done = true);
+
+ // first: -a---|
+ // second: --------1--|
+ // combined: --------a1-|
+
+ first.add('a');
+ await tick();
+ await first.close();
+ await tick();
+
+ expect(done, isFalse);
+
+ second.add('1');
+ await tick();
+ await second.close();
+ await tick();
+
+ expect(done, isTrue);
+ });
+
+ group('broadcast source', () {
+ test('can cancel and relisten to broadcast stream', () async {
+ final first = StreamController<String>.broadcast();
+ final second = StreamController<String>.broadcast();
+ final combined = first.stream
+ .combineLatestAll([second.stream]).map((data) => data.join());
+
+ // first: a------b----------------c------d----e---|
+ // second: --1---------2---3---4------5-|
+ // combined: --a1---b1---b2--b3--b4-----c5--d5---e5--|
+ // sub1: ^-----------------!
+ // sub2: ----------------------^-----------------|
+
+ expect(combined.take(4), emitsInOrder(['a1', 'b1', 'b2', 'b3']));
+
+ first.add('a');
+ await tick();
+ second.add('1');
+ await tick();
+ first.add('b');
+ await tick();
+ second.add('2');
+ await tick();
+ second.add('3');
+ await tick();
+
+ // First subscription is canceled here by .take(4)
+ expect(first.hasListener, isFalse);
+ expect(second.hasListener, isFalse);
+
+ // This emit is thrown away because there are no subscribers
+ second.add('4');
+ await tick();
+
+ expect(combined, emitsInOrder(['c5', 'd5', 'e5', emitsDone]));
+
+ first.add('c');
+ await tick();
+ second.add('5');
+ await tick();
+ await second.close();
+ await tick();
+ first.add('d');
+ await tick();
+ first.add('e');
+ await tick();
+ await first.close();
+ });
+ });
+ });
+}
diff --git a/pkgs/stream_transform/test/combine_latest_test.dart b/pkgs/stream_transform/test/combine_latest_test.dart
new file mode 100644
index 0000000..1985c75
--- /dev/null
+++ b/pkgs/stream_transform/test/combine_latest_test.dart
@@ -0,0 +1,179 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+void main() {
+ group('combineLatest', () {
+ test('flows through combine callback', () async {
+ var source = StreamController<int>();
+ var other = StreamController<int>();
+ int sum(int a, int b) => a + b;
+
+ var results = <int>[];
+ unawaited(
+ source.stream.combineLatest(other.stream, sum).forEach(results.add));
+
+ source.add(1);
+ await Future(() {});
+ expect(results, isEmpty);
+
+ other.add(2);
+ await Future(() {});
+ expect(results, [3]);
+
+ source.add(3);
+ await Future(() {});
+ expect(results, [3, 5]);
+
+ source.add(4);
+ await Future(() {});
+ expect(results, [3, 5, 6]);
+
+ other.add(5);
+ await Future(() {});
+ expect(results, [3, 5, 6, 9]);
+ });
+
+ test('can combine different typed streams', () async {
+ var source = StreamController<String>();
+ var other = StreamController<int>();
+ String times(String a, int b) => a * b;
+
+ var results = <String>[];
+ unawaited(source.stream
+ .combineLatest(other.stream, times)
+ .forEach(results.add));
+
+ source
+ ..add('a')
+ ..add('b');
+ await Future(() {});
+ expect(results, isEmpty);
+
+ other.add(2);
+ await Future(() {});
+ expect(results, ['bb']);
+
+ other.add(3);
+ await Future(() {});
+ expect(results, ['bb', 'bbb']);
+
+ source.add('c');
+ await Future(() {});
+ expect(results, ['bb', 'bbb', 'ccc']);
+ });
+
+ test('ends after both streams have ended', () async {
+ var source = StreamController<int>();
+ var other = StreamController<int>();
+ int sum(int a, int b) => a + b;
+
+ var done = false;
+ source.stream
+ .combineLatest(other.stream, sum)
+ .listen(null, onDone: () => done = true);
+
+ source.add(1);
+
+ await source.close();
+ await Future(() {});
+ expect(done, false);
+
+ await other.close();
+ await Future(() {});
+ expect(done, true);
+ });
+
+ test('ends if source stream closes without ever emitting a value',
+ () async {
+ var source = const Stream<int>.empty();
+ var other = StreamController<int>();
+
+ int sum(int a, int b) => a + b;
+
+ var done = false;
+ source
+ .combineLatest(other.stream, sum)
+ .listen(null, onDone: () => done = true);
+
+ await Future(() {});
+ // Nothing can ever be emitted on the result, may as well close.
+ expect(done, true);
+ });
+
+ test('ends if other stream closes without ever emitting a value', () async {
+ var source = StreamController<int>();
+ var other = const Stream<int>.empty();
+
+ int sum(int a, int b) => a + b;
+
+ var done = false;
+ source.stream
+ .combineLatest(other, sum)
+ .listen(null, onDone: () => done = true);
+
+ await Future(() {});
+ // Nothing can ever be emitted on the result, may as well close.
+ expect(done, true);
+ });
+
+ test('forwards errors', () async {
+ var source = StreamController<int>();
+ var other = StreamController<int>();
+ int sum(int a, int b) => throw _NumberedException(3);
+
+ var errors = <Object>[];
+ source.stream
+ .combineLatest(other.stream, sum)
+ .listen(null, onError: errors.add);
+
+ source.addError(_NumberedException(1));
+ other.addError(_NumberedException(2));
+
+ source.add(1);
+ other.add(2);
+
+ await Future(() {});
+
+ expect(errors, [_isException(1), _isException(2), _isException(3)]);
+ });
+
+ group('broadcast source', () {
+ test('can cancel and relisten to broadcast stream', () async {
+ var source = StreamController<int>.broadcast();
+ var other = StreamController<int>();
+ int combine(int a, int b) => a + b;
+
+ var emittedValues = <int>[];
+ var transformed = source.stream.combineLatest(other.stream, combine);
+
+ var subscription = transformed.listen(emittedValues.add);
+
+ source.add(1);
+ other.add(2);
+ await Future(() {});
+ expect(emittedValues, [3]);
+
+ await subscription.cancel();
+
+ subscription = transformed.listen(emittedValues.add);
+ source.add(3);
+ await Future(() {});
+ expect(emittedValues, [3, 5]);
+ });
+ });
+ });
+}
+
+class _NumberedException implements Exception {
+ final int id;
+ _NumberedException(this.id);
+}
+
+Matcher _isException(int id) =>
+ const TypeMatcher<_NumberedException>().having((n) => n.id, 'id', id);
diff --git a/pkgs/stream_transform/test/concurrent_async_map_test.dart b/pkgs/stream_transform/test/concurrent_async_map_test.dart
new file mode 100644
index 0000000..1807f9f
--- /dev/null
+++ b/pkgs/stream_transform/test/concurrent_async_map_test.dart
@@ -0,0 +1,157 @@
+// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ late StreamController<int> controller;
+ late List<String> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<String> transformed;
+ late StreamSubscription<String> subscription;
+
+ late List<Completer<String>> finishWork;
+ late List<dynamic> values;
+
+ Future<String> convert(int value) {
+ values.add(value);
+ var completer = Completer<String>();
+ finishWork.add(completer);
+ return completer.future;
+ }
+
+ for (var streamType in streamTypes) {
+ group('concurrentAsyncMap for stream type: [$streamType]', () {
+ setUp(() {
+ valuesCanceled = false;
+ controller = createController(streamType)
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ finishWork = [];
+ values = [];
+ transformed = controller.stream.concurrentAsyncMap(convert);
+ subscription = transformed
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ });
+
+ test('does not emit before convert finishes', () async {
+ controller.add(1);
+ await Future(() {});
+ expect(emittedValues, isEmpty);
+ expect(values, [1]);
+ finishWork.first.complete('result');
+ await Future(() {});
+ expect(emittedValues, ['result']);
+ });
+
+ test('allows calls to convert before the last one finished', () async {
+ controller
+ ..add(1)
+ ..add(2)
+ ..add(3);
+ await Future(() {});
+ expect(values, [1, 2, 3]);
+ });
+
+ test('forwards errors directly without waiting for previous convert',
+ () async {
+ controller.add(1);
+ await Future(() {});
+ controller.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('forwards errors which occur during the convert', () async {
+ controller.add(1);
+ await Future(() {});
+ finishWork.first.completeError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('can continue handling events after an error', () async {
+ controller.add(1);
+ await Future(() {});
+ finishWork[0].completeError('error');
+ controller.add(2);
+ await Future(() {});
+ expect(values, [1, 2]);
+ finishWork[1].completeError('another');
+ await Future(() {});
+ expect(errors, ['error', 'another']);
+ });
+
+ test('cancels value subscription when output canceled', () async {
+ expect(valuesCanceled, false);
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('closes when values end if no conversion is pending', () async {
+ expect(isDone, false);
+ await controller.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () async {
+ var otherValues = <String>[];
+ transformed.listen(otherValues.add);
+ controller.add(1);
+ await Future(() {});
+ finishWork.first.complete('result');
+ await Future(() {});
+ expect(emittedValues, ['result']);
+ expect(otherValues, ['result']);
+ });
+
+ test('multiple listeners get done when values end', () async {
+ var otherDone = false;
+ transformed.listen(null, onDone: () => otherDone = true);
+ controller.add(1);
+ await Future(() {});
+ await controller.close();
+ expect(isDone, false);
+ expect(otherDone, false);
+ finishWork.first.complete('');
+ await Future(() {});
+ expect(isDone, true);
+ expect(otherDone, true);
+ });
+
+ test('can cancel and relisten', () async {
+ controller.add(1);
+ await Future(() {});
+ finishWork.first.complete('first');
+ await Future(() {});
+ await subscription.cancel();
+ controller.add(2);
+ await Future(() {});
+ subscription = transformed.listen(emittedValues.add);
+ controller.add(3);
+ await Future(() {});
+ expect(values, [1, 3]);
+ finishWork[1].complete('second');
+ await Future(() {});
+ expect(emittedValues, ['first', 'second']);
+ });
+ }
+ });
+ }
+}
diff --git a/pkgs/stream_transform/test/debounce_test.dart b/pkgs/stream_transform/test/debounce_test.dart
new file mode 100644
index 0000000..19de055
--- /dev/null
+++ b/pkgs/stream_transform/test/debounce_test.dart
@@ -0,0 +1,310 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:fake_async/fake_async.dart';
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ for (var streamType in streamTypes) {
+ group('Stream type [$streamType]', () {
+ group('debounce - trailing', () {
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late StreamSubscription<int> subscription;
+ late Stream<int> transformed;
+
+ setUp(() async {
+ valuesCanceled = false;
+ values = createController(streamType)
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ transformed = values.stream.debounce(const Duration(milliseconds: 5));
+ });
+
+ void listen() {
+ subscription = transformed
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ test('cancels values', () async {
+ listen();
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('swallows values that come faster than duration', () {
+ fakeAsync((async) {
+ listen();
+ values
+ ..add(1)
+ ..add(2)
+ ..close();
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [2]);
+ });
+ });
+
+ test('outputs multiple values spaced further than duration', () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 6));
+ values.add(2);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1, 2]);
+ });
+ });
+
+ test('waits for pending value to close', () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 6));
+ values.close();
+ async.flushMicrotasks();
+ expect(isDone, true);
+ });
+ });
+
+ test('closes output if there are no pending values', () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 6));
+ values
+ ..add(2)
+ ..close();
+ async.flushMicrotasks();
+ expect(isDone, false);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(isDone, true);
+ });
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () {
+ fakeAsync((async) {
+ listen();
+ var otherValues = <int>[];
+ transformed.listen(otherValues.add);
+ values
+ ..add(1)
+ ..add(2);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [2]);
+ expect(otherValues, [2]);
+ });
+ });
+ }
+ });
+
+ group('debounce - leading', () {
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late Stream<int> transformed;
+ late bool isDone;
+
+ setUp(() async {
+ values = createController(streamType);
+ emittedValues = [];
+ isDone = false;
+ transformed = values.stream.debounce(const Duration(milliseconds: 5),
+ leading: true, trailing: false);
+ });
+
+ void listen() {
+ transformed.listen(emittedValues.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ test('swallows values that come faster than duration', () async {
+ listen();
+ values
+ ..add(1)
+ ..add(2);
+ await values.close();
+ expect(emittedValues, [1]);
+ });
+
+ test('outputs multiple values spaced further than duration', () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 6));
+ values.add(2);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1, 2]);
+ });
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () {
+ fakeAsync((async) {
+ listen();
+ var otherValues = <int>[];
+ transformed.listen(otherValues.add);
+ values
+ ..add(1)
+ ..add(2);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1]);
+ expect(otherValues, [1]);
+ });
+ });
+ }
+
+ test('closes output immediately if not waiting for trailing value',
+ () async {
+ listen();
+ values.add(1);
+ await values.close();
+ expect(isDone, true);
+ });
+ });
+
+ group('debounce - leading and trailing', () {
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late Stream<int> transformed;
+
+ setUp(() async {
+ values = createController(streamType);
+ emittedValues = [];
+ transformed = values.stream.debounce(const Duration(milliseconds: 5),
+ leading: true, trailing: true);
+ });
+ void listen() {
+ transformed.listen(emittedValues.add);
+ }
+
+ test('swallows values that come faster than duration', () {
+ fakeAsync((async) {
+ listen();
+ values
+ ..add(1)
+ ..add(2)
+ ..add(3)
+ ..close();
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1, 3]);
+ });
+ });
+
+ test('outputs multiple values spaced further than duration', () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 6));
+ values.add(2);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1, 2]);
+ });
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () {
+ fakeAsync((async) {
+ listen();
+ var otherValues = <int>[];
+ transformed.listen(otherValues.add);
+ values
+ ..add(1)
+ ..add(2);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1, 2]);
+ expect(otherValues, [1, 2]);
+ });
+ });
+ }
+ });
+
+ group('debounceBuffer', () {
+ late StreamController<int> values;
+ late List<List<int>> emittedValues;
+ late List<String> errors;
+ late Stream<List<int>> transformed;
+
+ setUp(() async {
+ values = createController(streamType);
+ emittedValues = [];
+ errors = [];
+ transformed =
+ values.stream.debounceBuffer(const Duration(milliseconds: 5));
+ });
+ void listen() {
+ transformed.listen(emittedValues.add, onError: errors.add);
+ }
+
+ test('Emits all values as a list', () {
+ fakeAsync((async) {
+ listen();
+ values
+ ..add(1)
+ ..add(2)
+ ..close();
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [
+ [1, 2]
+ ]);
+ });
+ });
+
+ test('separate lists for multiple values spaced further than duration',
+ () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 6));
+ values.add(2);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [
+ [1],
+ [2]
+ ]);
+ });
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () {
+ fakeAsync((async) {
+ listen();
+ var otherValues = <List<int>>[];
+ transformed.listen(otherValues.add);
+ values
+ ..add(1)
+ ..add(2);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [
+ [1, 2]
+ ]);
+ expect(otherValues, [
+ [1, 2]
+ ]);
+ });
+ });
+ }
+ });
+ });
+ }
+ test('allows nulls', () async {
+ final values = Stream<int?>.fromIterable([null]);
+ final transformed = values.debounce(const Duration(milliseconds: 1));
+ expect(await transformed.toList(), [null]);
+ });
+}
diff --git a/pkgs/stream_transform/test/followd_by_test.dart b/pkgs/stream_transform/test/followd_by_test.dart
new file mode 100644
index 0000000..d600d13
--- /dev/null
+++ b/pkgs/stream_transform/test/followd_by_test.dart
@@ -0,0 +1,159 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ for (var firstType in streamTypes) {
+ for (var secondType in streamTypes) {
+ group('followedBy [$firstType] with [$secondType]', () {
+ late StreamController<int> first;
+ late StreamController<int> second;
+
+ late List<int> emittedValues;
+ late bool firstCanceled;
+ late bool secondCanceled;
+ late bool secondListened;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
+
+ setUp(() async {
+ firstCanceled = false;
+ secondCanceled = false;
+ secondListened = false;
+ first = createController(firstType)
+ ..onCancel = () {
+ firstCanceled = true;
+ };
+ second = createController(secondType)
+ ..onCancel = () {
+ secondCanceled = true;
+ }
+ ..onListen = () {
+ secondListened = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ transformed = first.stream.followedBy(second.stream);
+ subscription = transformed
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ });
+
+ test('adds all values from both streams', () async {
+ first
+ ..add(1)
+ ..add(2);
+ await first.close();
+ await Future(() {});
+ second
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+ expect(emittedValues, [1, 2, 3, 4]);
+ });
+
+ test('Does not listen to second stream before first stream finishes',
+ () async {
+ expect(secondListened, false);
+ await first.close();
+ expect(secondListened, true);
+ });
+
+ test('closes stream after both inputs close', () async {
+ await first.close();
+ await second.close();
+ expect(isDone, true);
+ });
+
+ test('cancels any type of first stream on cancel', () async {
+ await subscription.cancel();
+ expect(firstCanceled, true);
+ });
+
+ if (firstType == 'single subscription') {
+ test(
+ 'cancels any type of second stream on cancel if first is '
+ 'broadcast', () async {
+ await first.close();
+ await subscription.cancel();
+ expect(secondCanceled, true);
+ });
+
+ if (secondType == 'broadcast') {
+ test('can pause and resume during second stream - dropping values',
+ () async {
+ await first.close();
+ subscription.pause();
+ second.add(1);
+ await Future(() {});
+ subscription.resume();
+ second.add(2);
+ await Future(() {});
+ expect(emittedValues, [2]);
+ });
+ } else {
+ test('can pause and resume during second stream - buffering values',
+ () async {
+ await first.close();
+ subscription.pause();
+ second.add(1);
+ await Future(() {});
+ subscription.resume();
+ second.add(2);
+ await Future(() {});
+ expect(emittedValues, [1, 2]);
+ });
+ }
+ }
+
+ if (firstType == 'broadcast') {
+ test('can cancel and relisten during first stream', () async {
+ await subscription.cancel();
+ first.add(1);
+ subscription = transformed.listen(emittedValues.add);
+ first.add(2);
+ await Future(() {});
+ expect(emittedValues, [2]);
+ });
+
+ test('can cancel and relisten during second stream', () async {
+ await first.close();
+ await subscription.cancel();
+ second.add(2);
+ await Future(() {});
+ subscription = transformed.listen(emittedValues.add);
+ second.add(3);
+ await Future(() {});
+ expect(emittedValues, [3]);
+ });
+
+ test('forwards values to multiple listeners', () async {
+ var otherValues = <int>[];
+ transformed.listen(otherValues.add);
+ first.add(1);
+ await first.close();
+ second.add(2);
+ await Future(() {});
+ var thirdValues = <int>[];
+ transformed.listen(thirdValues.add);
+ second.add(3);
+ await Future(() {});
+ expect(emittedValues, [1, 2, 3]);
+ expect(otherValues, [1, 2, 3]);
+ expect(thirdValues, [3]);
+ });
+ }
+ });
+ }
+ }
+}
diff --git a/pkgs/stream_transform/test/from_handlers_test.dart b/pkgs/stream_transform/test/from_handlers_test.dart
new file mode 100644
index 0000000..694199c
--- /dev/null
+++ b/pkgs/stream_transform/test/from_handlers_test.dart
@@ -0,0 +1,183 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/src/from_handlers.dart';
+import 'package:test/test.dart';
+
+void main() {
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
+
+ void setUpForController(StreamController<int> controller,
+ Stream<int> Function(Stream<int>) transform) {
+ valuesCanceled = false;
+ values = controller
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ transformed = transform(values.stream);
+ subscription =
+ transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ group('default from_handlers', () {
+ group('Single subscription stream', () {
+ setUp(() {
+ setUpForController(StreamController(),
+ (s) => s.transformByHandlers(onData: (e, sink) => sink.add(e)));
+ });
+
+ test('has correct stream type', () {
+ expect(transformed.isBroadcast, false);
+ });
+
+ test('forwards values', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ expect(emittedValues, [1, 2]);
+ });
+
+ test('forwards errors', () async {
+ values.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('forwards done', () async {
+ await values.close();
+ expect(isDone, true);
+ });
+
+ test('forwards cancel', () async {
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+ });
+
+ group('broadcast stream with muliple listeners', () {
+ late List<int> emittedValues2;
+ late List<String> errors2;
+ late bool isDone2;
+ late StreamSubscription<int> subscription2;
+
+ setUp(() {
+ setUpForController(StreamController.broadcast(),
+ (s) => s.transformByHandlers(onData: (e, sink) => sink.add(e)));
+ emittedValues2 = [];
+ errors2 = [];
+ isDone2 = false;
+ subscription2 = transformed
+ .listen(emittedValues2.add, onError: errors2.add, onDone: () {
+ isDone2 = true;
+ });
+ });
+
+ test('has correct stream type', () {
+ expect(transformed.isBroadcast, true);
+ });
+
+ test('forwards values', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ expect(emittedValues, [1, 2]);
+ expect(emittedValues2, [1, 2]);
+ });
+
+ test('forwards errors', () async {
+ values.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ expect(errors2, ['error']);
+ });
+
+ test('forwards done', () async {
+ await values.close();
+ expect(isDone, true);
+ expect(isDone2, true);
+ });
+
+ test('forwards cancel', () async {
+ await subscription.cancel();
+ expect(valuesCanceled, false);
+ await subscription2.cancel();
+ expect(valuesCanceled, true);
+ });
+ });
+ });
+
+ group('custom handlers', () {
+ group('single subscription', () {
+ setUp(() async {
+ setUpForController(
+ StreamController(),
+ (s) => s.transformByHandlers(onData: (value, sink) {
+ sink.add(value + 1);
+ }));
+ });
+ test('uses transform from handleData', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ expect(emittedValues, [2, 3]);
+ });
+ });
+
+ group('broadcast stream with multiple listeners', () {
+ late int dataCallCount;
+ late int doneCallCount;
+ late int errorCallCount;
+
+ setUp(() async {
+ dataCallCount = 0;
+ doneCallCount = 0;
+ errorCallCount = 0;
+ setUpForController(
+ StreamController.broadcast(),
+ (s) => s.transformByHandlers(onData: (value, sink) {
+ dataCallCount++;
+ }, onError: (error, stackTrace, sink) {
+ errorCallCount++;
+ sink.addError(error, stackTrace);
+ }, onDone: (sink) {
+ doneCallCount++;
+ }));
+ transformed.listen((_) {}, onError: (_, __) {});
+ });
+
+ test('handles data once', () async {
+ values.add(1);
+ await Future(() {});
+ expect(dataCallCount, 1);
+ });
+
+ test('handles done once', () async {
+ await values.close();
+ expect(doneCallCount, 1);
+ });
+
+ test('handles errors once', () async {
+ values.addError('error');
+ await Future(() {});
+ expect(errorCallCount, 1);
+ });
+ });
+ });
+}
diff --git a/pkgs/stream_transform/test/merge_test.dart b/pkgs/stream_transform/test/merge_test.dart
new file mode 100644
index 0000000..ecbf97f
--- /dev/null
+++ b/pkgs/stream_transform/test/merge_test.dart
@@ -0,0 +1,140 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+void main() {
+ group('merge', () {
+ test('includes all values', () async {
+ var first = Stream.fromIterable([1, 2, 3]);
+ var second = Stream.fromIterable([4, 5, 6]);
+ var allValues = await first.merge(second).toList();
+ expect(allValues, containsAllInOrder([1, 2, 3]));
+ expect(allValues, containsAllInOrder([4, 5, 6]));
+ expect(allValues, hasLength(6));
+ });
+
+ test('cancels both sources', () async {
+ var firstCanceled = false;
+ var first = StreamController<int>()
+ ..onCancel = () {
+ firstCanceled = true;
+ };
+ var secondCanceled = false;
+ var second = StreamController<int>()
+ ..onCancel = () {
+ secondCanceled = true;
+ };
+ var subscription = first.stream.merge(second.stream).listen((_) {});
+ await subscription.cancel();
+ expect(firstCanceled, true);
+ expect(secondCanceled, true);
+ });
+
+ test('completes when both sources complete', () async {
+ var first = StreamController<int>();
+ var second = StreamController<int>();
+ var isDone = false;
+ first.stream.merge(second.stream).listen((_) {}, onDone: () {
+ isDone = true;
+ });
+ await first.close();
+ expect(isDone, false);
+ await second.close();
+ expect(isDone, true);
+ });
+
+ test('can cancel and relisten to broadcast stream', () async {
+ var first = StreamController<int>.broadcast();
+ var second = StreamController<int>();
+ var emittedValues = <int>[];
+ var transformed = first.stream.merge(second.stream);
+ var subscription = transformed.listen(emittedValues.add);
+ first.add(1);
+ second.add(2);
+ await Future(() {});
+ expect(emittedValues, contains(1));
+ expect(emittedValues, contains(2));
+ await subscription.cancel();
+ emittedValues = [];
+ subscription = transformed.listen(emittedValues.add);
+ first.add(3);
+ second.add(4);
+ await Future(() {});
+ expect(emittedValues, contains(3));
+ expect(emittedValues, contains(4));
+ });
+ });
+
+ group('mergeAll', () {
+ test('includes all values', () async {
+ var first = Stream.fromIterable([1, 2, 3]);
+ var second = Stream.fromIterable([4, 5, 6]);
+ var third = Stream.fromIterable([7, 8, 9]);
+ var allValues = await first.mergeAll([second, third]).toList();
+ expect(allValues, containsAllInOrder([1, 2, 3]));
+ expect(allValues, containsAllInOrder([4, 5, 6]));
+ expect(allValues, containsAllInOrder([7, 8, 9]));
+ expect(allValues, hasLength(9));
+ });
+
+ test('handles mix of broadcast and single-subscription', () async {
+ var firstCanceled = false;
+ var first = StreamController<int>.broadcast()
+ ..onCancel = () {
+ firstCanceled = true;
+ };
+ var secondBroadcastCanceled = false;
+ var secondBroadcast = StreamController<int>.broadcast()
+ ..onCancel = () {
+ secondBroadcastCanceled = true;
+ };
+ var secondSingleCanceled = false;
+ var secondSingle = StreamController<int>()
+ ..onCancel = () {
+ secondSingleCanceled = true;
+ };
+
+ var merged =
+ first.stream.mergeAll([secondBroadcast.stream, secondSingle.stream]);
+
+ var firstListenerValues = <int>[];
+ var secondListenerValues = <int>[];
+
+ var firstSubscription = merged.listen(firstListenerValues.add);
+ var secondSubscription = merged.listen(secondListenerValues.add);
+
+ first.add(1);
+ secondBroadcast.add(2);
+ secondSingle.add(3);
+
+ await Future(() {});
+ await firstSubscription.cancel();
+
+ expect(firstCanceled, false);
+ expect(secondBroadcastCanceled, false);
+ expect(secondSingleCanceled, false);
+
+ first.add(4);
+ secondBroadcast.add(5);
+ secondSingle.add(6);
+
+ await Future(() {});
+ await secondSubscription.cancel();
+
+ await Future(() {});
+ expect(firstCanceled, true);
+ expect(secondBroadcastCanceled, true);
+ expect(secondSingleCanceled, false,
+ reason: 'Single subscription streams merged into broadcast streams '
+ 'are not canceled');
+
+ expect(firstListenerValues, [1, 2, 3]);
+ expect(secondListenerValues, [1, 2, 3, 4, 5, 6]);
+ });
+ });
+}
diff --git a/pkgs/stream_transform/test/sample_test.dart b/pkgs/stream_transform/test/sample_test.dart
new file mode 100644
index 0000000..66ca09d
--- /dev/null
+++ b/pkgs/stream_transform/test/sample_test.dart
@@ -0,0 +1,291 @@
+// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ late StreamController<void> trigger;
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool triggerCanceled;
+ late bool triggerPaused;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
+
+ void setUpForStreamTypes(String triggerType, String valuesType,
+ {required bool longPoll}) {
+ valuesCanceled = false;
+ triggerCanceled = false;
+ triggerPaused = false;
+ trigger = createController(triggerType)
+ ..onCancel = () {
+ triggerCanceled = true;
+ };
+ if (triggerType == 'single subscription') {
+ trigger.onPause = () {
+ triggerPaused = true;
+ };
+ }
+ values = createController(valuesType)
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ transformed = values.stream.sample(trigger.stream, longPoll: longPoll);
+ subscription =
+ transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ for (var triggerType in streamTypes) {
+ for (var valuesType in streamTypes) {
+ group('Trigger type: [$triggerType], Values type: [$valuesType]', () {
+ group('general behavior', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+ });
+
+ test('does not emit before `trigger`', () async {
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, isEmpty);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ });
+
+ test('keeps most recent event between triggers', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger.add(null);
+ values
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [2, 4]);
+ });
+
+ test('cancels value subscription when output canceled', () async {
+ expect(valuesCanceled, false);
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('closes when trigger ends', () async {
+ expect(isDone, false);
+ await trigger.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('closes after outputting final values when source closes',
+ () async {
+ expect(isDone, false);
+ values.add(1);
+ await values.close();
+ expect(isDone, false);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ expect(isDone, true);
+ });
+
+ test('closes when source closes and there is no pending', () async {
+ expect(isDone, false);
+ await values.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('forwards errors from trigger', () async {
+ trigger.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('forwards errors from values', () async {
+ values.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+ });
+
+ group('long polling', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: true);
+ });
+
+ test('emits immediately if trigger emits before a value', () async {
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, isEmpty);
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ });
+
+ test('two triggers in a row - emit buffere then emit next value',
+ () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ values.add(3);
+ await Future(() {});
+ expect(emittedValues, [2, 3]);
+ });
+
+ test('pre-emptive trigger then trigger after values', () async {
+ trigger.add(null);
+ await Future(() {});
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [1, 2]);
+ });
+
+ test('multiple pre-emptive triggers, only emits first value',
+ () async {
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ });
+
+ test('closes if there is no waiting long poll when source closes',
+ () async {
+ expect(isDone, false);
+ values.add(1);
+ trigger.add(null);
+ await values.close();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('waits to emit if there waiting long poll when trigger closes',
+ () async {
+ trigger.add(null);
+ await trigger.close();
+ expect(isDone, false);
+ values.add(1);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ expect(isDone, true);
+ });
+ });
+
+ group('immediate polling', () {
+ setUp(() {
+ setUpForStreamTypes(triggerType, valuesType, longPoll: false);
+ });
+
+ test('ignores trigger before values', () async {
+ trigger.add(null);
+ await Future(() {});
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [2]);
+ });
+
+ test('ignores trigger if no pending values', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ trigger
+ ..add(null)
+ ..add(null);
+ await Future(() {});
+ values
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [2, 4]);
+ });
+ });
+ });
+ }
+ }
+
+ test('always cancels trigger if values is singlesubscription', () async {
+ setUpForStreamTypes('broadcast', 'single subscription', longPoll: true);
+ expect(triggerCanceled, false);
+ await subscription.cancel();
+ expect(triggerCanceled, true);
+
+ setUpForStreamTypes('single subscription', 'single subscription',
+ longPoll: true);
+ expect(triggerCanceled, false);
+ await subscription.cancel();
+ expect(triggerCanceled, true);
+ });
+
+ test('cancels trigger if trigger is broadcast', () async {
+ setUpForStreamTypes('broadcast', 'broadcast', longPoll: true);
+ expect(triggerCanceled, false);
+ await subscription.cancel();
+ expect(triggerCanceled, true);
+ });
+
+ test('pauses single subscription trigger for broadcast values', () async {
+ setUpForStreamTypes('single subscription', 'broadcast', longPoll: true);
+ expect(triggerCanceled, false);
+ expect(triggerPaused, false);
+ await subscription.cancel();
+ expect(triggerCanceled, false);
+ expect(triggerPaused, true);
+ });
+
+ for (var triggerType in streamTypes) {
+ test('cancel and relisten with [$triggerType] trigger', () async {
+ setUpForStreamTypes(triggerType, 'broadcast', longPoll: true);
+ values.add(1);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [1]);
+ await subscription.cancel();
+ values.add(2);
+ trigger.add(null);
+ await Future(() {});
+ subscription = transformed.listen(emittedValues.add);
+ values.add(3);
+ trigger.add(null);
+ await Future(() {});
+ expect(emittedValues, [1, 3]);
+ });
+ }
+}
diff --git a/pkgs/stream_transform/test/scan_test.dart b/pkgs/stream_transform/test/scan_test.dart
new file mode 100644
index 0000000..3c749e7
--- /dev/null
+++ b/pkgs/stream_transform/test/scan_test.dart
@@ -0,0 +1,109 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+void main() {
+ group('Scan', () {
+ test('produces intermediate values', () async {
+ var source = Stream.fromIterable([1, 2, 3, 4]);
+ int sum(int x, int y) => x + y;
+ var result = await source.scan(0, sum).toList();
+
+ expect(result, [1, 3, 6, 10]);
+ });
+
+ test('can create a broadcast stream', () {
+ var source = StreamController<int>.broadcast();
+
+ var transformed = source.stream.scan(null, (_, __) {});
+
+ expect(transformed.isBroadcast, true);
+ });
+
+ test('forwards errors from source', () async {
+ var source = StreamController<int>();
+
+ int sum(int x, int y) => x + y;
+
+ var errors = <Object>[];
+
+ source.stream.scan(0, sum).listen(null, onError: errors.add);
+
+ source.addError(StateError('fail'));
+ await Future(() {});
+
+ expect(errors, [isStateError]);
+ });
+
+ group('with async combine', () {
+ test('returns a Stream of non-futures', () async {
+ var source = Stream.fromIterable([1, 2, 3, 4]);
+ Future<int> sum(int x, int y) async => x + y;
+ var result = await source.scan(0, sum).toList();
+
+ expect(result, [1, 3, 6, 10]);
+ });
+
+ test('can return a Stream of futures when specified', () async {
+ var source = Stream.fromIterable([1, 2]);
+ Future<int> sum(Future<int> x, int y) async => (await x) + y;
+ var result =
+ await source.scan<Future<int>>(Future.value(0), sum).toList();
+
+ expect(result, [
+ const TypeMatcher<Future<void>>(),
+ const TypeMatcher<Future<void>>()
+ ]);
+ expect(await result.wait, [1, 3]);
+ });
+
+ test('does not call for subsequent values while waiting', () async {
+ var source = StreamController<int>();
+
+ var calledWith = <int>[];
+ var block = Completer<void>();
+ Future<int> combine(int x, int y) async {
+ calledWith.add(y);
+ await block.future;
+ return x + y;
+ }
+
+ var results = <int>[];
+
+ unawaited(source.stream.scan(0, combine).forEach(results.add));
+
+ source
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ expect(calledWith, [1]);
+ expect(results, isEmpty);
+
+ block.complete();
+ await Future(() {});
+ expect(calledWith, [1, 2]);
+ expect(results, [1, 3]);
+ });
+
+ test('forwards async errors', () async {
+ var source = StreamController<int>();
+
+ Future<int> combine(int x, int y) async => throw StateError('fail');
+
+ var errors = <Object>[];
+
+ source.stream.scan(0, combine).listen(null, onError: errors.add);
+
+ source.add(1);
+ await Future(() {});
+
+ expect(errors, [isStateError]);
+ });
+ });
+ });
+}
diff --git a/pkgs/stream_transform/test/start_with_test.dart b/pkgs/stream_transform/test/start_with_test.dart
new file mode 100644
index 0000000..35f0330
--- /dev/null
+++ b/pkgs/stream_transform/test/start_with_test.dart
@@ -0,0 +1,167 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ late StreamController<int> values;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
+
+ late List<int> emittedValues;
+ late bool isDone;
+
+ void setupForStreamType(
+ String streamType, Stream<int> Function(Stream<int>) transform) {
+ emittedValues = [];
+ isDone = false;
+ values = createController(streamType);
+ transformed = transform(values.stream);
+ subscription =
+ transformed.listen(emittedValues.add, onDone: () => isDone = true);
+ }
+
+ for (var streamType in streamTypes) {
+ group('startWith then [$streamType]', () {
+ setUp(() => setupForStreamType(streamType, (s) => s.startWith(1)));
+
+ test('outputs all values', () async {
+ values
+ ..add(2)
+ ..add(3);
+ await Future(() {});
+ expect(emittedValues, [1, 2, 3]);
+ });
+
+ test('outputs initial when followed by empty stream', () async {
+ await values.close();
+ expect(emittedValues, [1]);
+ });
+
+ test('closes with values', () async {
+ expect(isDone, false);
+ await values.close();
+ expect(isDone, true);
+ });
+
+ if (streamType == 'broadcast') {
+ test('can cancel and relisten', () async {
+ values.add(2);
+ await Future(() {});
+ await subscription.cancel();
+ subscription = transformed.listen(emittedValues.add);
+ values.add(3);
+ await Future(() {});
+ await Future(() {});
+ expect(emittedValues, [1, 2, 3]);
+ });
+ }
+ });
+
+ group('startWithMany then [$streamType]', () {
+ setUp(() async {
+ setupForStreamType(streamType, (s) => s.startWithMany([1, 2]));
+ // Ensure all initial values go through
+ await Future(() {});
+ });
+
+ test('outputs all values', () async {
+ values
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+ expect(emittedValues, [1, 2, 3, 4]);
+ });
+
+ test('outputs initial when followed by empty stream', () async {
+ await values.close();
+ expect(emittedValues, [1, 2]);
+ });
+
+ test('closes with values', () async {
+ expect(isDone, false);
+ await values.close();
+ expect(isDone, true);
+ });
+
+ if (streamType == 'broadcast') {
+ test('can cancel and relisten', () async {
+ values.add(3);
+ await Future(() {});
+ await subscription.cancel();
+ subscription = transformed.listen(emittedValues.add);
+ values.add(4);
+ await Future(() {});
+ expect(emittedValues, [1, 2, 3, 4]);
+ });
+ }
+ });
+
+ for (var startingStreamType in streamTypes) {
+ group('startWithStream [$startingStreamType] then [$streamType]', () {
+ late StreamController<int> starting;
+ setUp(() async {
+ starting = createController(startingStreamType);
+ setupForStreamType(
+ streamType, (s) => s.startWithStream(starting.stream));
+ });
+
+ test('outputs all values', () async {
+ starting
+ ..add(1)
+ ..add(2);
+ await starting.close();
+ values
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+ expect(emittedValues, [1, 2, 3, 4]);
+ });
+
+ test('closes with values', () async {
+ expect(isDone, false);
+ await starting.close();
+ expect(isDone, false);
+ await values.close();
+ expect(isDone, true);
+ });
+
+ if (streamType == 'broadcast') {
+ test('can cancel and relisten during starting', () async {
+ starting.add(1);
+ await Future(() {});
+ await subscription.cancel();
+ subscription = transformed.listen(emittedValues.add);
+ starting.add(2);
+ await starting.close();
+ values
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+ expect(emittedValues, [1, 2, 3, 4]);
+ });
+
+ test('can cancel and relisten during values', () async {
+ starting
+ ..add(1)
+ ..add(2);
+ await starting.close();
+ values.add(3);
+ await Future(() {});
+ await subscription.cancel();
+ subscription = transformed.listen(emittedValues.add);
+ values.add(4);
+ await Future(() {});
+ expect(emittedValues, [1, 2, 3, 4]);
+ });
+ }
+ });
+ }
+ }
+}
diff --git a/pkgs/stream_transform/test/switch_test.dart b/pkgs/stream_transform/test/switch_test.dart
new file mode 100644
index 0000000..9e70c08
--- /dev/null
+++ b/pkgs/stream_transform/test/switch_test.dart
@@ -0,0 +1,229 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ for (var outerType in streamTypes) {
+ for (var innerType in streamTypes) {
+ group('Outer type: [$outerType], Inner type: [$innerType]', () {
+ late StreamController<int> first;
+ late StreamController<int> second;
+ late StreamController<int> third;
+ late StreamController<Stream<int>> outer;
+
+ late List<int> emittedValues;
+ late bool firstCanceled;
+ late bool outerCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late StreamSubscription<int> subscription;
+
+ setUp(() async {
+ firstCanceled = false;
+ outerCanceled = false;
+ outer = createController(outerType)
+ ..onCancel = () {
+ outerCanceled = true;
+ };
+ first = createController(innerType)
+ ..onCancel = () {
+ firstCanceled = true;
+ };
+ second = createController(innerType);
+ third = createController(innerType);
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ subscription = outer.stream
+ .switchLatest()
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ });
+
+ test('forwards events', () async {
+ outer.add(first.stream);
+ await Future(() {});
+ first
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+
+ outer.add(second.stream);
+ await Future(() {});
+ second
+ ..add(3)
+ ..add(4);
+ await Future(() {});
+
+ expect(emittedValues, [1, 2, 3, 4]);
+ });
+
+ test('forwards errors from outer Stream', () async {
+ outer.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('forwards errors from inner Stream', () async {
+ outer.add(first.stream);
+ await Future(() {});
+ first.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('closes when final stream is done', () async {
+ outer.add(first.stream);
+ await Future(() {});
+
+ outer.add(second.stream);
+ await Future(() {});
+
+ await outer.close();
+ expect(isDone, false);
+
+ await second.close();
+ expect(isDone, true);
+ });
+
+ test(
+ 'closes when outer stream closes if latest inner stream already '
+ 'closed', () async {
+ outer.add(first.stream);
+ await Future(() {});
+ await first.close();
+ expect(isDone, false);
+
+ await outer.close();
+ expect(isDone, true);
+ });
+
+ test('cancels listeners on previous streams', () async {
+ outer.add(first.stream);
+ await Future(() {});
+
+ outer.add(second.stream);
+ await Future(() {});
+ expect(firstCanceled, true);
+ });
+
+ if (innerType != 'broadcast') {
+ test('waits for cancel before listening to subsequent stream',
+ () async {
+ var cancelWork = Completer<void>();
+ first.onCancel = () => cancelWork.future;
+ outer.add(first.stream);
+ await Future(() {});
+
+ var cancelDone = false;
+ second.onListen = expectAsync0(() {
+ expect(cancelDone, true);
+ });
+ outer.add(second.stream);
+ await Future(() {});
+ cancelWork.complete();
+ cancelDone = true;
+ });
+
+ test('all streams are listened to, even while cancelling', () async {
+ var cancelWork = Completer<void>();
+ first.onCancel = () => cancelWork.future;
+ outer.add(first.stream);
+ await Future(() {});
+
+ var cancelDone = false;
+ second.onListen = expectAsync0(() {
+ expect(cancelDone, true);
+ });
+ third.onListen = expectAsync0(() {
+ expect(cancelDone, true);
+ });
+ outer
+ ..add(second.stream)
+ ..add(third.stream);
+ await Future(() {});
+ cancelWork.complete();
+ cancelDone = true;
+ });
+ }
+
+ if (outerType != 'broadcast' && innerType != 'broadcast') {
+ test('pausing while cancelling an inner stream is respected',
+ () async {
+ var cancelWork = Completer<void>();
+ first.onCancel = () => cancelWork.future;
+ outer.add(first.stream);
+ await Future(() {});
+
+ var cancelDone = false;
+ second.onListen = expectAsync0(() {
+ expect(cancelDone, true);
+ });
+ outer.add(second.stream);
+ await Future(() {});
+ subscription.pause();
+ cancelWork.complete();
+ cancelDone = true;
+ await Future(() {});
+ expect(second.isPaused, true);
+ subscription.resume();
+ });
+ }
+
+ test('cancels listener on current and outer stream on cancel',
+ () async {
+ outer.add(first.stream);
+ await Future(() {});
+ await subscription.cancel();
+
+ await Future(() {});
+ expect(outerCanceled, true);
+ expect(firstCanceled, true);
+ });
+ });
+ }
+ }
+
+ group('switchMap', () {
+ test('uses map function', () async {
+ var outer = StreamController<List<int>>();
+
+ var values = <int>[];
+ outer.stream.switchMap(Stream.fromIterable).listen(values.add);
+
+ outer.add([1, 2, 3]);
+ await Future(() {});
+ outer.add([4, 5, 6]);
+ await Future(() {});
+ expect(values, [1, 2, 3, 4, 5, 6]);
+ });
+
+ test('can create a broadcast stream', () async {
+ var outer = StreamController<int>.broadcast();
+
+ var transformed =
+ outer.stream.switchMap((_) => const Stream<int>.empty());
+
+ expect(transformed.isBroadcast, true);
+ });
+
+ test('forwards errors from the convert callback', () async {
+ var errors = <String>[];
+ var source = Stream.fromIterable([1, 2, 3]);
+ source.switchMap<int>((i) {
+ // ignore: only_throw_errors
+ throw 'Error: $i';
+ }).listen((_) {}, onError: errors.add);
+ await Future<void>(() {});
+ expect(errors, ['Error: 1', 'Error: 2', 'Error: 3']);
+ });
+ });
+}
diff --git a/pkgs/stream_transform/test/take_until_test.dart b/pkgs/stream_transform/test/take_until_test.dart
new file mode 100644
index 0000000..982b3da
--- /dev/null
+++ b/pkgs/stream_transform/test/take_until_test.dart
@@ -0,0 +1,135 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ for (var streamType in streamTypes) {
+ group('takeUntil on Stream type [$streamType]', () {
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
+ late Completer<void> closeTrigger;
+
+ setUp(() {
+ valuesCanceled = false;
+ values = createController(streamType)
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ closeTrigger = Completer();
+ transformed = values.stream.takeUntil(closeTrigger.future);
+ subscription = transformed
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ });
+
+ test('forwards cancellation', () async {
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('lets values through before trigger', () async {
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ expect(emittedValues, [1, 2]);
+ });
+
+ test('forwards errors', () async {
+ values.addError('error');
+ await Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('sends done if original strem ends', () async {
+ await values.close();
+ expect(isDone, true);
+ });
+
+ test('sends done when trigger fires', () async {
+ closeTrigger.complete();
+ await Future(() {});
+ expect(isDone, true);
+ });
+
+ test('forwards errors from the close trigger', () async {
+ closeTrigger.completeError('sad');
+ await Future(() {});
+ expect(errors, ['sad']);
+ expect(isDone, true);
+ });
+
+ test('ignores errors from the close trigger after stream closed',
+ () async {
+ await values.close();
+ closeTrigger.completeError('sad');
+ await Future(() {});
+ expect(errors, <Object>[]);
+ });
+
+ test('cancels value subscription when trigger fires', () async {
+ closeTrigger.complete();
+ await Future(() {});
+ expect(valuesCanceled, true);
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () async {
+ var otherValues = <Object>[];
+ transformed.listen(otherValues.add);
+ values
+ ..add(1)
+ ..add(2);
+ await Future(() {});
+ expect(emittedValues, [1, 2]);
+ expect(otherValues, [1, 2]);
+ });
+
+ test('multiple listeners get done when trigger fires', () async {
+ var otherDone = false;
+ transformed.listen(null, onDone: () => otherDone = true);
+ closeTrigger.complete();
+ await Future(() {});
+ expect(otherDone, true);
+ expect(isDone, true);
+ });
+
+ test('multiple listeners get done when values end', () async {
+ var otherDone = false;
+ transformed.listen(null, onDone: () => otherDone = true);
+ await values.close();
+ expect(otherDone, true);
+ expect(isDone, true);
+ });
+
+ test('can cancel and relisten before trigger fires', () async {
+ values.add(1);
+ await Future(() {});
+ await subscription.cancel();
+ values.add(2);
+ await Future(() {});
+ subscription = transformed.listen(emittedValues.add);
+ values.add(3);
+ await Future(() {});
+ expect(emittedValues, [1, 3]);
+ });
+ }
+ });
+ }
+}
diff --git a/pkgs/stream_transform/test/tap_test.dart b/pkgs/stream_transform/test/tap_test.dart
new file mode 100644
index 0000000..f2b4346
--- /dev/null
+++ b/pkgs/stream_transform/test/tap_test.dart
@@ -0,0 +1,116 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+void main() {
+ test('calls function for values', () async {
+ var valuesSeen = <int>[];
+ var stream = Stream.fromIterable([1, 2, 3]);
+ await stream.tap(valuesSeen.add).last;
+ expect(valuesSeen, [1, 2, 3]);
+ });
+
+ test('forwards values', () async {
+ var stream = Stream.fromIterable([1, 2, 3]);
+ var values = await stream.tap((_) {}).toList();
+ expect(values, [1, 2, 3]);
+ });
+
+ test('calls function for errors', () async {
+ dynamic error;
+ var source = StreamController<int>();
+ source.stream.tap((_) {}, onError: (e, st) {
+ error = e;
+ }).listen((_) {}, onError: (_) {});
+ source.addError('error');
+ await Future(() {});
+ expect(error, 'error');
+ });
+
+ test('forwards errors', () async {
+ dynamic error;
+ var source = StreamController<int>();
+ source.stream.tap((_) {}, onError: (e, st) {}).listen((_) {},
+ onError: (Object e) {
+ error = e;
+ });
+ source.addError('error');
+ await Future(() {});
+ expect(error, 'error');
+ });
+
+ test('calls function on done', () async {
+ var doneCalled = false;
+ var source = StreamController<int>();
+ source.stream.tap((_) {}, onDone: () {
+ doneCalled = true;
+ }).listen((_) {});
+ await source.close();
+ expect(doneCalled, true);
+ });
+
+ test('forwards only once with multiple listeners on a broadcast stream',
+ () async {
+ var dataCallCount = 0;
+ var source = StreamController<int>.broadcast();
+ source.stream.tap((_) {
+ dataCallCount++;
+ })
+ ..listen((_) {})
+ ..listen((_) {});
+ source.add(1);
+ await Future(() {});
+ expect(dataCallCount, 1);
+ });
+
+ test(
+ 'forwards errors only once with multiple listeners on a broadcast stream',
+ () async {
+ var errorCallCount = 0;
+ var source = StreamController<int>.broadcast();
+ source.stream.tap((_) {}, onError: (_, __) {
+ errorCallCount++;
+ })
+ ..listen((_) {}, onError: (_, __) {})
+ ..listen((_) {}, onError: (_, __) {});
+ source.addError('error');
+ await Future(() {});
+ expect(errorCallCount, 1);
+ });
+
+ test('calls onDone only once with multiple listeners on a broadcast stream',
+ () async {
+ var doneCallCount = 0;
+ var source = StreamController<int>.broadcast();
+ source.stream.tap((_) {}, onDone: () {
+ doneCallCount++;
+ })
+ ..listen((_) {})
+ ..listen((_) {});
+ await source.close();
+ expect(doneCallCount, 1);
+ });
+
+ test('forwards values to multiple listeners', () async {
+ var source = StreamController<int>.broadcast();
+ var emittedValues1 = <int>[];
+ var emittedValues2 = <int>[];
+ source.stream.tap((_) {})
+ ..listen(emittedValues1.add)
+ ..listen(emittedValues2.add);
+ source.add(1);
+ await Future(() {});
+ expect(emittedValues1, [1]);
+ expect(emittedValues2, [1]);
+ });
+
+ test('allows null callback', () async {
+ var stream = Stream.fromIterable([1, 2, 3]);
+ await stream.tap(null).last;
+ });
+}
diff --git a/pkgs/stream_transform/test/throttle_test.dart b/pkgs/stream_transform/test/throttle_test.dart
new file mode 100644
index 0000000..07f607a
--- /dev/null
+++ b/pkgs/stream_transform/test/throttle_test.dart
@@ -0,0 +1,193 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:fake_async/fake_async.dart';
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ for (var streamType in streamTypes) {
+ group('Stream type [$streamType]', () {
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
+
+ group('throttle - trailing: false', () {
+ setUp(() async {
+ valuesCanceled = false;
+ values = createController(streamType)
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ isDone = false;
+ transformed = values.stream.throttle(const Duration(milliseconds: 5));
+ });
+
+ void listen() {
+ subscription = transformed.listen(emittedValues.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ test('cancels values', () async {
+ listen();
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('swallows values that come faster than duration', () {
+ fakeAsync((async) {
+ listen();
+ values
+ ..add(1)
+ ..add(2)
+ ..close();
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1]);
+ });
+ });
+
+ test('outputs multiple values spaced further than duration', () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 6));
+ values.add(2);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1, 2]);
+ async.elapse(const Duration(milliseconds: 6));
+ });
+ });
+
+ test('closes output immediately', () {
+ fakeAsync((async) {
+ listen();
+ values.add(1);
+ async.elapse(const Duration(milliseconds: 6));
+ values
+ ..add(2)
+ ..close();
+ async.flushMicrotasks();
+ expect(isDone, true);
+ });
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () {
+ fakeAsync((async) {
+ listen();
+ var otherValues = <int>[];
+ transformed.listen(otherValues.add);
+ values.add(1);
+ async.flushMicrotasks();
+ expect(emittedValues, [1]);
+ expect(otherValues, [1]);
+ });
+ });
+ }
+ });
+
+ group('throttle - trailing: true', () {
+ setUp(() async {
+ valuesCanceled = false;
+ values = createController(streamType)
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ isDone = false;
+ transformed = values.stream
+ .throttle(const Duration(milliseconds: 5), trailing: true);
+ });
+ void listen() {
+ subscription = transformed.listen(emittedValues.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ test('emits both first and last in a period', () {
+ fakeAsync((async) {
+ listen();
+ values
+ ..add(1)
+ ..add(2)
+ ..close();
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1, 2]);
+ });
+ });
+
+ test('swallows values that are not the latest in a period', () {
+ fakeAsync((async) {
+ listen();
+ values
+ ..add(1)
+ ..add(2)
+ ..add(3)
+ ..close();
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1, 3]);
+ });
+ });
+
+ test('waits to output the last value even if the stream closes',
+ () async {
+ fakeAsync((async) {
+ listen();
+ values
+ ..add(1)
+ ..add(2)
+ ..close();
+ async.flushMicrotasks();
+ expect(isDone, false);
+ expect(emittedValues, [1],
+ reason: 'Should not be emitted until after duration');
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1, 2]);
+ expect(isDone, true);
+ async.elapse(const Duration(milliseconds: 6));
+ });
+ });
+
+ test('closes immediately if there is no pending value', () {
+ fakeAsync((async) {
+ listen();
+ values
+ ..add(1)
+ ..close();
+ async.flushMicrotasks();
+ expect(isDone, true);
+ });
+ });
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () {
+ fakeAsync((async) {
+ listen();
+ var otherValues = <int>[];
+ transformed.listen(otherValues.add);
+ values
+ ..add(1)
+ ..add(2);
+ async.flushMicrotasks();
+ expect(emittedValues, [1]);
+ expect(otherValues, [1]);
+ async.elapse(const Duration(milliseconds: 6));
+ expect(emittedValues, [1, 2]);
+ expect(otherValues, [1, 2]);
+ });
+ });
+ }
+ });
+ });
+ }
+}
diff --git a/pkgs/stream_transform/test/utils.dart b/pkgs/stream_transform/test/utils.dart
new file mode 100644
index 0000000..42d9613
--- /dev/null
+++ b/pkgs/stream_transform/test/utils.dart
@@ -0,0 +1,19 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+StreamController<T> createController<T>(String streamType) {
+ switch (streamType) {
+ case 'single subscription':
+ return StreamController<T>();
+ case 'broadcast':
+ return StreamController<T>.broadcast();
+ default:
+ throw ArgumentError.value(
+ streamType, 'streamType', 'Must be one of $streamTypes');
+ }
+}
+
+const streamTypes = ['single subscription', 'broadcast'];
diff --git a/pkgs/stream_transform/test/where_not_null_test.dart b/pkgs/stream_transform/test/where_not_null_test.dart
new file mode 100644
index 0000000..c9af794
--- /dev/null
+++ b/pkgs/stream_transform/test/where_not_null_test.dart
@@ -0,0 +1,56 @@
+// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+void main() {
+ test('forwards only events that match the type', () async {
+ var values = Stream.fromIterable([null, 'a', null, 'b']);
+ var filtered = values.whereNotNull();
+ expect(await filtered.toList(), ['a', 'b']);
+ });
+
+ test('can result in empty stream', () async {
+ var values = Stream<Object?>.fromIterable([null, null]);
+ var filtered = values.whereNotNull();
+ expect(await filtered.isEmpty, true);
+ });
+
+ test('forwards values to multiple listeners', () async {
+ var values = StreamController<Object?>.broadcast();
+ var filtered = values.stream.whereNotNull();
+ var firstValues = <Object>[];
+ var secondValues = <Object>[];
+ filtered
+ ..listen(firstValues.add)
+ ..listen(secondValues.add);
+ values
+ ..add(null)
+ ..add('a')
+ ..add(null)
+ ..add('b');
+ await Future(() {});
+ expect(firstValues, ['a', 'b']);
+ expect(secondValues, ['a', 'b']);
+ });
+
+ test('closes streams with multiple listeners', () async {
+ var values = StreamController<Object?>.broadcast();
+ var filtered = values.stream.whereNotNull();
+ var firstDone = false;
+ var secondDone = false;
+ filtered
+ ..listen(null, onDone: () => firstDone = true)
+ ..listen(null, onDone: () => secondDone = true);
+ values
+ ..add(null)
+ ..add('a');
+ await values.close();
+ expect(firstDone, true);
+ expect(secondDone, true);
+ });
+}
diff --git a/pkgs/stream_transform/test/where_type_test.dart b/pkgs/stream_transform/test/where_type_test.dart
new file mode 100644
index 0000000..4cbea37
--- /dev/null
+++ b/pkgs/stream_transform/test/where_type_test.dart
@@ -0,0 +1,56 @@
+// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+import 'package:test/test.dart';
+
+void main() {
+ test('forwards only events that match the type', () async {
+ var values = Stream.fromIterable([1, 'a', 2, 'b']);
+ var filtered = values.whereType<String>();
+ expect(await filtered.toList(), ['a', 'b']);
+ });
+
+ test('can result in empty stream', () async {
+ var values = Stream.fromIterable([1, 2, 3, 4]);
+ var filtered = values.whereType<String>();
+ expect(await filtered.isEmpty, true);
+ });
+
+ test('forwards values to multiple listeners', () async {
+ var values = StreamController<Object>.broadcast();
+ var filtered = values.stream.whereType<String>();
+ var firstValues = <Object>[];
+ var secondValues = <Object>[];
+ filtered
+ ..listen(firstValues.add)
+ ..listen(secondValues.add);
+ values
+ ..add(1)
+ ..add('a')
+ ..add(2)
+ ..add('b');
+ await Future(() {});
+ expect(firstValues, ['a', 'b']);
+ expect(secondValues, ['a', 'b']);
+ });
+
+ test('closes streams with multiple listeners', () async {
+ var values = StreamController<Object>.broadcast();
+ var filtered = values.stream.whereType<String>();
+ var firstDone = false;
+ var secondDone = false;
+ filtered
+ ..listen(null, onDone: () => firstDone = true)
+ ..listen(null, onDone: () => secondDone = true);
+ values
+ ..add(1)
+ ..add('a');
+ await values.close();
+ expect(firstDone, true);
+ expect(secondDone, true);
+ });
+}