Merge package:stream_transform into the tools monorepo
diff --git a/pkgs/stream_transform/.github/dependabot.yaml b/pkgs/stream_transform/.github/dependabot.yaml new file mode 100644 index 0000000..bf6b38a --- /dev/null +++ b/pkgs/stream_transform/.github/dependabot.yaml
@@ -0,0 +1,14 @@ +# Dependabot configuration file. +version: 2 + +updates: + - package-ecosystem: github-actions + directory: / + schedule: + interval: monthly + labels: + - autosubmit + groups: + github-actions: + patterns: + - "*"
diff --git a/pkgs/stream_transform/.github/workflows/test-package.yml b/pkgs/stream_transform/.github/workflows/test-package.yml new file mode 100644 index 0000000..6f545fb --- /dev/null +++ b/pkgs/stream_transform/.github/workflows/test-package.yml
@@ -0,0 +1,62 @@ +name: Dart CI + +on: + # Run on PRs and pushes to the default branch. + push: + branches: [ master ] + pull_request: + branches: [ master ] + schedule: + - cron: "0 0 * * 0" + +env: + PUB_ENVIRONMENT: bot.github + +jobs: + # Check code formatting and static analysis on a single OS (linux) + # against Dart dev. + analyze: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + sdk: [dev] + steps: + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 + - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94 + with: + sdk: ${{ matrix.sdk }} + - id: install + name: Install dependencies + run: dart pub get + - name: Check formatting + run: dart format --output=none --set-exit-if-changed . + if: always() && steps.install.outcome == 'success' + - name: Analyze code + run: dart analyze --fatal-infos + if: always() && steps.install.outcome == 'success' + + # Run tests on a matrix consisting of two dimensions: + # 1. OS: ubuntu-latest, (macos-latest, windows-latest) + # 2. release channel: dev + test: + needs: analyze + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + # Add macos-latest and/or windows-latest if relevant for this package. + os: [ubuntu-latest] + # Bump SDK for Legacy tests when changing min SDK. + sdk: [3.1, dev] + steps: + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 + - uses: dart-lang/setup-dart@e630b99d28a3b71860378cafdc2a067c71107f94 + with: + sdk: ${{ matrix.sdk }} + - id: install + name: Install dependencies + run: dart pub get + - name: Run tests + run: dart test -p chrome,vm --test-randomize-ordering-seed=random + if: always() && steps.install.outcome == 'success'
diff --git a/pkgs/stream_transform/.gitignore b/pkgs/stream_transform/.gitignore new file mode 100644 index 0000000..bfffcc6 --- /dev/null +++ b/pkgs/stream_transform/.gitignore
@@ -0,0 +1,6 @@ +.pub/ +.dart_tool/ +build/ +packages +pubspec.lock +.packages
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md new file mode 100644 index 0000000..c1e2c0e --- /dev/null +++ b/pkgs/stream_transform/CHANGELOG.md
@@ -0,0 +1,184 @@ +## 2.1.1-wip + +- Require Dart 3.1 or greater +- Forward errors from the `trigger` future through to the result stream in + `takeUntil`. Previously an error would have not closed the stream, and instead + raised as an unhandled async error. + +## 2.1.0 + +- Add `whereNotNull`. + +## 2.0.1 + +- Require Dart 2.14 or greater. +- Wait for the future returned from `StreamSubscription.cancel()` before + listening to the subsequent stream in `switchLatest` and `switchMap`. + +## 2.0.0 + +- Migrate to null safety. +- Improve tests of `switchMap` and improve documentation with links and + clarification. +- Add `trailing` argument to `throttle`. + +## 1.2.0 + +- Add support for emitting the "leading" event in `debounce`. + +## 1.1.1 + +- Fix a bug in `asyncMapSample`, `buffer`, `combineLatest`, + `combineLatestAll`, `merge`, and `mergeAll` which would cause an exception + when cancelling a subscription after using the transformer if the original + stream(s) returned `null` from cancelling their subscriptions. + +## 1.1.0 + +- Add `concurrentAsyncExpand` to interleave events emitted by multiple sub + streams created by a callback. + +## 1.0.0 + +- Remove the top level methods and retain the extensions only. + +## 0.0.20 + +- Add extension methods for most transformers. These should be used in place + of the current methods. All current implementations are deprecated and will + be removed in the next major version bump. + - Migrating typical use: Instead of + `stream.transform(debounce(Duration(seconds: 1)))` use + `stream.debounce(Duration(seconds: 1))`. + - To migrate a usage where a `StreamTransformer` instance is stored or + passed see "Getting a StreamTransformer instance" on the README. +- The `map` and `chainTransformers` utilities are no longer useful with the + new patterns so they are deprecated without a replacement. If you still have + a need for them they can be replicated with `StreamTransformer.fromBind`: + + ``` + // Replace `map(convert)` + StreamTransformer.fromBind((s) => s.map(convert)); + + // Replace `chainTransformers(first, second)` + StreamTransformer.fromBind((s) => s.transform(first).transform(second)); + ``` + +## 0.0.19 + +- Add `asyncMapSample` transform. + +## 0.0.18 + +- Internal cleanup. Passed "trigger" streams or futures now allow `<void>` + generic type rather than an implicit `dynamic>` + +## 0.0.17 + +- Add concrete types to the `onError` callback in `tap`. + +## 0.0.16+1 + +- Remove usage of Set literal which is not available before Dart 2.2.0 + +## 0.0.16 + +- Allow a `combine` callback to return a `FutureOr<T>` in `scan`. There are no + behavior changes for synchronous callbacks. **Potential breaking change** In + the unlikely situation where `scan` was used to produce a `Stream<Future>` + inference may now fail and require explicit generic type arguments. +- Add `combineLatest`. +- Add `combineLatestAll`. + +## 0.0.15 + +- Add `whereType`. + +## 0.0.14+1 + +- Allow using non-dev Dart 2 SDK. + +## 0.0.14 + +- `asyncWhere` will now forward exceptions thrown by the callback through the + result Stream. +- Added `concurrentAsyncMap`. + +## 0.0.13 + +- `mergeAll` now accepts an `Iterable<Stream>` instead of only `List<Stream>`. + +## 0.0.12 + +- Add `chainTransformers` and `map` for use cases where `StreamTransformer` + instances are stored as variables or passed to methods other than `transform`. + +## 0.0.11 + +- Renamed `concat` as `followedBy` to match the naming of `Iterable.followedBy`. + `concat` is now deprecated. + +## 0.0.10 + +- Updates to support Dart 2.0 core library changes (wave + 2.2). See [issue 31847][sdk#31847] for details. + + [sdk#31847]: https://github.com/dart-lang/sdk/issues/31847 + +## 0.0.9 + +- Add `asyncMapBuffer`. + +## 0.0.8 + +- Add `takeUntil`. + +## 0.0.7 + +- Bug Fix: Streams produced with `scan` and `switchMap` now correctly report + `isBroadcast`. +- Add `startWith`, `startWithMany`, and `startWithStream`. + +## 0.0.6 + +- Bug Fix: Some transformers did not correctly add data to all listeners on + broadcast streams. Fixed for `throttle`, `debounce`, `asyncWhere` and `audit`. +- Bug Fix: Only call the `tap` data callback once per event rather than once per + listener. +- Bug Fix: Allow canceling and re-listening to broadcast streams after a + `merge` transform. +- Bug Fix: Broadcast streams which are buffered using a single-subscription + trigger can be canceled and re-listened. +- Bug Fix: Buffer outputs one more value if there is a pending trigger before + the trigger closes. +- Bug Fix: Single-subscription streams concatted after broadcast streams are + handled correctly. +- Use sync `StreamControllers` for forwarding where possible. + +## 0.0.5 + +- Bug Fix: Allow compiling switchLatest with Dart2Js. +- Add `asyncWhere`: Like `where` but allows an asynchronous predicate. + +## 0.0.4 +- Add `scan`: fold which returns intermediate values +- Add `throttle`: block events for a duration after emitting a value +- Add `audit`: emits the last event received after a duration + +## 0.0.3 + +- Add `tap`: React to values as they pass without being a subscriber on a stream +- Add `switchMap` and `switchLatest`: Flatten a Stream of Streams into a Stream + which forwards values from the most recent Stream + +## 0.0.2 + +- Add `concat`: Appends streams in series +- Add `merge` and `mergeAll`: Interleaves streams + +## 0.0.1 + +- Initial release with the following utilities: + - `buffer`: Collects events in a `List` until a `trigger` stream fires. + - `debounce`, `debounceBuffer`: Collect or drop events which occur closer in + time than a given duration.
diff --git a/pkgs/stream_transform/LICENSE b/pkgs/stream_transform/LICENSE new file mode 100644 index 0000000..03af64a --- /dev/null +++ b/pkgs/stream_transform/LICENSE
@@ -0,0 +1,27 @@ +Copyright 2017, the Dart project authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + * Neither the name of Google LLC nor the names of its + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md new file mode 100644 index 0000000..68d84d9 --- /dev/null +++ b/pkgs/stream_transform/README.md
@@ -0,0 +1,141 @@ +[](https://github.com/dart-lang/stream_transform/actions/workflows/test-package.yml) +[](https://pub.dev/packages/stream_transform) +[](https://pub.dev/packages/stream_transform/publisher) + +Extension methods on `Stream` adding common transform operators. + +## Operators + +### asyncMapBuffer, asyncMapSample, concurrentAsyncMap + +Alternatives to `asyncMap`. `asyncMapBuffer` prevents the callback from +overlapping execution and collects events while it is executing. +`asyncMapSample` prevents overlapping execution and discards events while it is +executing. `concurrentAsyncMap` allows overlap and removes ordering guarantees +for higher throughput. + +Like `asyncMap` but events are buffered in a List until previous events have +been processed rather than being called for each element individually. + +### asyncWhere + +Like `where` but allows an asynchronous predicate. + +### audit + +Waits for a period of time after receiving a value and then only emits the most +recent value. + +### buffer + +Collects values from a source stream until a `trigger` stream fires and the +collected values are emitted. + +### combineLatest, combineLatestAll + +Combine the most recent event from multiple streams through a callback or into a +list. + +### debounce, debounceBuffer + +Prevents a source stream from emitting too frequently by dropping or collecting +values that occur within a given duration. + +### followedBy + +Appends the values of a stream after another stream finishes. + +### merge, mergeAll, concurrentAsyncExpand + +Interleaves events from multiple streams into a single stream. + +### scan + +Scan is like fold, but instead of producing a single value it yields each +intermediate accumulation. + +### startWith, startWithMany, startWithStream + +Prepend a value, an iterable, or a stream to the beginning of another stream. + +### switchMap, switchLatest + +Flatten a Stream of Streams into a Stream which forwards values from the most +recent Stream + +### takeUntil + +Let values through until a Future fires. + +### tap + +Taps into a single-subscriber stream to react to values as they pass, without +being a real subscriber. + +### throttle + +Blocks events for a duration after an event is successfully emitted. + +### whereType + +Like `Iterable.whereType` for a stream. + +## Comparison to Rx Operators + +The semantics and naming in this package have some overlap, and some conflict, +with the [ReactiveX](https://reactivex.io/) suite of libraries. Some of the +conflict is intentional - Dart `Stream` predates `Observable` and coherence with +the Dart ecosystem semantics and naming is a strictly higher priority than +consistency with ReactiveX. + +Rx Operator Category | variation | `stream_transform` +------------------------- | ------------------------------------------------------ | ------------------ +[`sample`][rx_sample] | `sample/throttleLast(Duration)` | `sample(Stream.periodic(Duration), longPoll: false)` +​ | `throttleFirst(Duration)` | [`throttle`][throttle] +​ | `sample(Observable)` | `sample(trigger, longPoll: false)` +[`debounce`][rx_debounce] | `debounce/throttleWithTimeout(Duration)` | [`debounce`][debounce] +​ | `debounce(Observable)` | No equivalent +[`buffer`][rx_buffer] | `buffer(boundary)`, `bufferWithTime`,`bufferWithCount` | No equivalent +​ | `buffer(boundaryClosingSelector)` | `buffer(trigger, longPoll: false)` +RxJs extensions | [`audit(callback)`][rxjs_audit] | No equivalent +​ | [`auditTime(Duration)`][rxjs_auditTime] | [`audit`][audit] +​ | [`exhaustMap`][rxjs_exhaustMap] | No equivalent +​ | [`throttleTime(trailing: true)`][rxjs_throttleTime] | `throttle(trailing: true)` +​ | `throttleTime(leading: false, trailing: true)` | No equivalent +No equivalent? | | [`asyncMapBuffer`][asyncMapBuffer] +​ | | [`asyncMapSample`][asyncMapSample] +​ | | [`buffer`][buffer] +​ | | [`sample`][sample] +​ | | [`debounceBuffer`][debounceBuffer] +​ | | `debounce(leading: true, trailing: false)` +​ | | `debounce(leading: true, trailing: true)` + +[rx_sample]:https://reactivex.io/documentation/operators/sample.html +[rx_debounce]:https://reactivex.io/documentation/operators/debounce.html +[rx_buffer]:https://reactivex.io/documentation/operators/buffer.html +[rxjs_audit]:https://rxjs.dev/api/operators/audit +[rxjs_auditTime]:https://rxjs.dev/api/operators/auditTime +[rxjs_throttleTime]:https://rxjs.dev/api/operators/throttleTime +[rxjs_exhaustMap]:https://rxjs.dev/api/operators/exhaustMap +[asyncMapBuffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/AsyncMap/asyncMapBuffer.html +[asyncMapSample]:https://pub.dev/documentation/stream_transform/latest/stream_transform/AsyncMap/asyncMapSample.html +[audit]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/audit.html +[buffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/buffer.html +[sample]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/sample.html +[debounceBuffer]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/debounceBuffer.html +[debounce]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/debounce.html +[throttle]:https://pub.dev/documentation/stream_transform/latest/stream_transform/RateLimit/throttle.html + +## Getting a `StreamTransformer` instance + +It may be useful to pass an instance of `StreamTransformer` so that it can be +used with `stream.transform` calls rather than reference the specific operator +in place. Any operator on `Stream` that returns a `Stream` can be modeled as a +`StreamTransformer` using the [`fromBind` constructor][fromBind]. + +```dart +final debounce = StreamTransformer.fromBind( + (s) => s.debounce(const Duration(milliseconds: 100))); +``` + +[fromBind]: https://api.dart.dev/stable/dart-async/StreamTransformer/StreamTransformer.fromBind.html
diff --git a/pkgs/stream_transform/analysis_options.yaml b/pkgs/stream_transform/analysis_options.yaml new file mode 100644 index 0000000..05f1af1 --- /dev/null +++ b/pkgs/stream_transform/analysis_options.yaml
@@ -0,0 +1,16 @@ +include: package:dart_flutter_team_lints/analysis_options.yaml + +analyzer: + language: + strict-casts: true + strict-raw-types: true + +linter: + rules: + - avoid_bool_literals_in_conditional_expressions + - avoid_classes_with_only_static_members + - avoid_returning_this + - avoid_unused_constructor_parameters + - cascade_invocations + - join_return_with_assignment + - no_adjacent_strings_in_list
diff --git a/pkgs/stream_transform/example/index.html b/pkgs/stream_transform/example/index.html new file mode 100644 index 0000000..aecdc09 --- /dev/null +++ b/pkgs/stream_transform/example/index.html
@@ -0,0 +1,11 @@ +<html> + <head> + <script defer src="main.dart.js" type="application/javascript"></script> + </head> + <body> + <input id="first_input"><br> + <input id="second_input"><br> + <p id="output"> + </p> + </body> +</html>
diff --git a/pkgs/stream_transform/example/main.dart b/pkgs/stream_transform/example/main.dart new file mode 100644 index 0000000..70b3e7f --- /dev/null +++ b/pkgs/stream_transform/example/main.dart
@@ -0,0 +1,26 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:html'; + +import 'package:stream_transform/stream_transform.dart'; + +void main() { + var firstInput = document.querySelector('#first_input') as InputElement; + var secondInput = document.querySelector('#second_input') as InputElement; + var output = document.querySelector('#output')!; + + _inputValues(firstInput) + .combineLatest(_inputValues(secondInput), + (first, second) => 'First: $first, Second: $second') + .tap((v) { + print('Saw: $v'); + }).forEach((v) { + output.text = v; + }); +} + +Stream<String?> _inputValues(InputElement element) => element.onKeyUp + .debounce(const Duration(milliseconds: 100)) + .map((_) => element.value);
diff --git a/pkgs/stream_transform/lib/src/aggregate_sample.dart b/pkgs/stream_transform/lib/src/aggregate_sample.dart new file mode 100644 index 0000000..f2ff8ed --- /dev/null +++ b/pkgs/stream_transform/lib/src/aggregate_sample.dart
@@ -0,0 +1,146 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'common_callbacks.dart'; + +extension AggregateSample<T> on Stream<T> { + /// Computes a value based on sequences of events, then emits that value when + /// [trigger] emits an event. + /// + /// Every time this stream emits an event, an intermediate value is created + /// by combining the new event with the previous intermediate value, or with + /// `null` if there is no previous value, using the [aggregate] function. + /// + /// When [trigger] emits value, the returned stream emits the current + /// intermediate value and clears it. + /// + /// If [longPoll] is `false`, if there is no intermediate value when [trigger] + /// emits an event, the [onEmpty] function is called with a [Sink] which can + /// add events to the returned stream. + /// + /// If [longPoll] is `true`, and there is no intermediate value when [trigger] + /// emits one or more events, then the *next* event from this stream is + /// immediately put through [aggregate] and emitted on the returned stream. + /// Subsequent events on [trigger] while there have been no events on this + /// stream are ignored. + /// In that case, [onEmpty] is never used. + /// + /// The result stream will close as soon as there is a guarantee it will not + /// emit any more events. There will not be any more events emitted if: + /// - [trigger] is closed and there is no waiting long poll. + /// - Or, the source stream is closed and there are no buffered events. + /// + /// If the source stream is a broadcast stream, the result will be as well. + /// Errors from the source stream or the trigger are immediately forwarded to + /// the output. + Stream<S> aggregateSample<S>( + {required Stream<void> trigger, + required S Function(T, S?) aggregate, + required bool longPoll, + required void Function(Sink<S>) onEmpty}) { + var controller = isBroadcast + ? StreamController<S>.broadcast(sync: true) + : StreamController<S>(sync: true); + + S? currentResults; + var hasCurrentResults = false; + var activeLongPoll = false; + var isTriggerDone = false; + var isValueDone = false; + StreamSubscription<T>? valueSub; + StreamSubscription<void>? triggerSub; + + void emit(S results) { + currentResults = null; + hasCurrentResults = false; + controller.add(results); + } + + void onValue(T value) { + currentResults = aggregate(value, currentResults); + hasCurrentResults = true; + if (!longPoll) return; + + if (activeLongPoll) { + activeLongPoll = false; + emit(currentResults as S); + } + + if (isTriggerDone) { + valueSub!.cancel(); + controller.close(); + } + } + + void onValuesDone() { + isValueDone = true; + if (!hasCurrentResults) { + triggerSub?.cancel(); + controller.close(); + } + } + + void onTrigger(_) { + if (hasCurrentResults) { + emit(currentResults as S); + } else if (longPoll) { + activeLongPoll = true; + } else { + onEmpty(controller); + } + + if (isValueDone) { + triggerSub!.cancel(); + controller.close(); + } + } + + void onTriggerDone() { + isTriggerDone = true; + if (!activeLongPoll) { + valueSub?.cancel(); + controller.close(); + } + } + + controller.onListen = () { + assert(valueSub == null); + valueSub = + listen(onValue, onError: controller.addError, onDone: onValuesDone); + final priorTriggerSub = triggerSub; + if (priorTriggerSub != null) { + if (priorTriggerSub.isPaused) priorTriggerSub.resume(); + } else { + triggerSub = trigger.listen(onTrigger, + onError: controller.addError, onDone: onTriggerDone); + } + if (!isBroadcast) { + controller + ..onPause = () { + valueSub?.pause(); + triggerSub?.pause(); + } + ..onResume = () { + valueSub?.resume(); + triggerSub?.resume(); + }; + } + controller.onCancel = () { + var cancels = <Future<void>>[if (!isValueDone) valueSub!.cancel()]; + valueSub = null; + if (trigger.isBroadcast || !isBroadcast) { + if (!isTriggerDone) cancels.add(triggerSub!.cancel()); + triggerSub = null; + } else { + triggerSub!.pause(); + } + if (cancels.isEmpty) return null; + return cancels.wait.then(ignoreArgument); + }; + }; + return controller.stream; + } +}
diff --git a/pkgs/stream_transform/lib/src/async_expand.dart b/pkgs/stream_transform/lib/src/async_expand.dart new file mode 100644 index 0000000..28d2f40 --- /dev/null +++ b/pkgs/stream_transform/lib/src/async_expand.dart
@@ -0,0 +1,89 @@ +// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'common_callbacks.dart'; +import 'switch.dart'; + +/// Alternatives to [asyncExpand]. +/// +/// The built in [asyncExpand] will not overlap the inner streams and every +/// event will be sent to the callback individually. +/// +/// - [concurrentAsyncExpand] allow overlap and merges inner streams without +/// ordering guarantees. +extension AsyncExpand<T> on Stream<T> { + /// Like [asyncExpand] but the [convert] callback may be called for an element + /// before the [Stream] emitted by the previous element has closed. + /// + /// Events on the result stream will be emitted in the order they are emitted + /// by the sub streams, which may not match the order of this stream. + /// + /// Errors from [convert], the source stream, or any of the sub streams are + /// forwarded to the result stream. + /// + /// The result stream will not close until the source stream closes and all + /// sub streams have closed. + /// + /// If the source stream is a broadcast stream, the result will be as well, + /// regardless of the types of streams created by [convert]. In this case, + /// some care should be taken: + /// - If [convert] returns a single subscription stream it may be listened to + /// and never canceled. + /// - For any period of time where there are no listeners on the result + /// stream, any sub streams from previously emitted events will be ignored, + /// regardless of whether they emit further events after a listener is added + /// back. + /// + /// See also: + /// - [switchMap], which cancels subscriptions to the previous sub stream + /// instead of concurrently emitting events from all sub streams. + Stream<S> concurrentAsyncExpand<S>(Stream<S> Function(T) convert) { + final controller = isBroadcast + ? StreamController<S>.broadcast(sync: true) + : StreamController<S>(sync: true); + + controller.onListen = () { + final subscriptions = <StreamSubscription<dynamic>>[]; + final outerSubscription = map(convert).listen((inner) { + if (isBroadcast && !inner.isBroadcast) { + inner = inner.asBroadcastStream(); + } + final subscription = + inner.listen(controller.add, onError: controller.addError); + subscription.onDone(() { + subscriptions.remove(subscription); + if (subscriptions.isEmpty) controller.close(); + }); + subscriptions.add(subscription); + }, onError: controller.addError); + outerSubscription.onDone(() { + subscriptions.remove(outerSubscription); + if (subscriptions.isEmpty) controller.close(); + }); + subscriptions.add(outerSubscription); + if (!isBroadcast) { + controller + ..onPause = () { + for (final subscription in subscriptions) { + subscription.pause(); + } + } + ..onResume = () { + for (final subscription in subscriptions) { + subscription.resume(); + } + }; + } + controller.onCancel = () { + if (subscriptions.isEmpty) return null; + return [for (var s in subscriptions) s.cancel()] + .wait + .then(ignoreArgument); + }; + }; + return controller.stream; + } +}
diff --git a/pkgs/stream_transform/lib/src/async_map.dart b/pkgs/stream_transform/lib/src/async_map.dart new file mode 100644 index 0000000..094df9c --- /dev/null +++ b/pkgs/stream_transform/lib/src/async_map.dart
@@ -0,0 +1,136 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'aggregate_sample.dart'; +import 'common_callbacks.dart'; +import 'from_handlers.dart'; +import 'rate_limit.dart'; + +/// Alternatives to [asyncMap]. +/// +/// The built in [asyncMap] will not overlap execution of the passed callback, +/// and every event will be sent to the callback individually. +/// +/// - [asyncMapBuffer] prevents the callback from overlapping execution and +/// collects events while it is executing to process in batches. +/// - [asyncMapSample] prevents overlapping execution and discards events while +/// it is executing. +/// - [concurrentAsyncMap] allows overlap and removes ordering guarantees. +extension AsyncMap<T> on Stream<T> { + /// Like [asyncMap] but events are buffered until previous events have been + /// processed by [convert]. + /// + /// If this stream is a broadcast stream the result will be as well. + /// When used with a broadcast stream behavior also differs from [asyncMap] in + /// that the [convert] function is only called once per event, rather than + /// once per listener per event. + /// + /// The first event from this stream is always passed to [convert] as a + /// list with a single element. + /// After that, events are buffered until the previous Future returned from + /// [convert] has completed. + /// + /// Errors from this stream are forwarded directly to the result stream. + /// Errors during the conversion are also forwarded to the result stream and + /// are considered completing work so the next values are let through. + /// + /// The result stream will not close until this stream closes and all pending + /// conversions have finished. + Stream<S> asyncMapBuffer<S>(Future<S> Function(List<T>) convert) { + var workFinished = StreamController<void>() + // Let the first event through. + ..add(null); + return buffer(workFinished.stream)._asyncMapThen(convert, workFinished.add); + } + + /// Like [asyncMap] but events are discarded while work is happening in + /// [convert]. + /// + /// If this stream is a broadcast stream the result will be as well. + /// When used with a broadcast stream behavior also differs from [asyncMap] in + /// that the [convert] function is only called once per event, rather than + /// once per listener per event. + /// + /// If no work is happening when an event is emitted it will be immediately + /// passed to [convert]. If there is ongoing work when an event is emitted it + /// will be held until the work is finished. New events emitted will replace a + /// pending event. + /// + /// Errors from this stream are forwarded directly to the result stream. + /// Errors during the conversion are also forwarded to the result stream and + /// are considered completing work so the next values are let through. + /// + /// The result stream will not close until this stream closes and all pending + /// conversions have finished. + Stream<S> asyncMapSample<S>(Future<S> Function(T) convert) { + var workFinished = StreamController<void>() + // Let the first event through. + ..add(null); + return aggregateSample( + trigger: workFinished.stream, + aggregate: _dropPrevious, + longPoll: true, + onEmpty: ignoreArgument) + ._asyncMapThen(convert, workFinished.add); + } + + /// Like [asyncMap] but the [convert] callback may be called for an element + /// before processing for the previous element is finished. + /// + /// Events on the result stream will be emitted in the order that [convert] + /// completed which may not match the order of this stream. + /// + /// If this stream is a broadcast stream the result will be as well. + /// When used with a broadcast stream behavior also differs from [asyncMap] in + /// that the [convert] function is only called once per event, rather than + /// once per listener per event. The [convert] callback won't be called for + /// events while a broadcast stream has no listener. + /// + /// Errors from [convert] or this stream are forwarded directly to the + /// result stream. + /// + /// The result stream will not close until this stream closes and all pending + /// conversions have finished. + Stream<S> concurrentAsyncMap<S>(FutureOr<S> Function(T) convert) { + var valuesWaiting = 0; + var sourceDone = false; + return transformByHandlers(onData: (element, sink) { + valuesWaiting++; + () async { + try { + sink.add(await convert(element)); + } catch (e, st) { + sink.addError(e, st); + } + valuesWaiting--; + if (valuesWaiting <= 0 && sourceDone) sink.close(); + }(); + }, onDone: (sink) { + sourceDone = true; + if (valuesWaiting <= 0) sink.close(); + }); + } + + /// Like [Stream.asyncMap] but the [convert] is only called once per event, + /// rather than once per listener, and [then] is called after completing the + /// work. + Stream<S> _asyncMapThen<S>( + Future<S> Function(T) convert, void Function(void) then) { + Future<void>? pendingEvent; + return transformByHandlers(onData: (event, sink) { + pendingEvent = + convert(event).then(sink.add).catchError(sink.addError).then(then); + }, onDone: (sink) { + if (pendingEvent != null) { + pendingEvent!.then((_) => sink.close()); + } else { + sink.close(); + } + }); + } +} + +T _dropPrevious<T>(T event, _) => event;
diff --git a/pkgs/stream_transform/lib/src/combine_latest.dart b/pkgs/stream_transform/lib/src/combine_latest.dart new file mode 100644 index 0000000..f02a19e --- /dev/null +++ b/pkgs/stream_transform/lib/src/combine_latest.dart
@@ -0,0 +1,240 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'common_callbacks.dart'; + +/// Utilities to combine events from multiple streams through a callback or into +/// a list. +extension CombineLatest<T> on Stream<T> { + /// Combines the latest values from this stream with the latest values from + /// [other] using [combine]. + /// + /// No event will be emitted until both the source stream and [other] have + /// each emitted at least one event. If either the source stream or [other] + /// emit multiple events before the other emits the first event, all but the + /// last value will be discarded. Once both streams have emitted at least + /// once, the result stream will emit any time either input stream emits. + /// + /// The result stream will not close until both the source stream and [other] + /// have closed. + /// + /// For example: + /// + /// source.combineLatest(other, (a, b) => a + b); + /// + /// source: --1--2--------4--| + /// other: -------3--| + /// result: -------5------7--| + /// + /// Errors thrown by [combine], along with any errors on the source stream or + /// [other], are forwarded to the result stream. + /// + /// If the source stream is a broadcast stream, the result stream will be as + /// well, regardless of [other]'s type. If a single subscription stream is + /// combined with a broadcast stream it may never be canceled. + Stream<S> combineLatest<T2, S>( + Stream<T2> other, FutureOr<S> Function(T, T2) combine) { + final controller = isBroadcast + ? StreamController<S>.broadcast(sync: true) + : StreamController<S>(sync: true); + + other = + (isBroadcast && !other.isBroadcast) ? other.asBroadcastStream() : other; + + StreamSubscription<T>? sourceSubscription; + StreamSubscription<T2>? otherSubscription; + + var sourceDone = false; + var otherDone = false; + + late T latestSource; + late T2 latestOther; + + var sourceStarted = false; + var otherStarted = false; + + void emitCombined() { + if (!sourceStarted || !otherStarted) return; + FutureOr<S> result; + try { + result = combine(latestSource, latestOther); + } catch (e, s) { + controller.addError(e, s); + return; + } + if (result is Future<S>) { + sourceSubscription!.pause(); + otherSubscription!.pause(); + result + .then(controller.add, onError: controller.addError) + .whenComplete(() { + sourceSubscription!.resume(); + otherSubscription!.resume(); + }); + } else { + controller.add(result); + } + } + + controller.onListen = () { + assert(sourceSubscription == null); + sourceSubscription = listen( + (s) { + sourceStarted = true; + latestSource = s; + emitCombined(); + }, + onError: controller.addError, + onDone: () { + sourceDone = true; + if (otherDone) { + controller.close(); + } else if (!sourceStarted) { + // Nothing can ever be emitted + otherSubscription!.cancel(); + controller.close(); + } + }); + otherSubscription = other.listen( + (o) { + otherStarted = true; + latestOther = o; + emitCombined(); + }, + onError: controller.addError, + onDone: () { + otherDone = true; + if (sourceDone) { + controller.close(); + } else if (!otherStarted) { + // Nothing can ever be emitted + sourceSubscription!.cancel(); + controller.close(); + } + }); + if (!isBroadcast) { + controller + ..onPause = () { + sourceSubscription!.pause(); + otherSubscription!.pause(); + } + ..onResume = () { + sourceSubscription!.resume(); + otherSubscription!.resume(); + }; + } + controller.onCancel = () { + var cancels = [ + sourceSubscription!.cancel(), + otherSubscription!.cancel() + ]; + sourceSubscription = null; + otherSubscription = null; + return cancels.wait.then(ignoreArgument); + }; + }; + return controller.stream; + } + + /// Combine the latest value emitted from the source stream with the latest + /// values emitted from [others]. + /// + /// [combineLatestAll] subscribes to the source stream and [others] and when + /// any one of the streams emits, the result stream will emit a [List<T>] of + /// the latest values emitted from all streams. + /// + /// No event will be emitted until all source streams emit at least once. If a + /// source stream emits multiple values before another starts emitting, all + /// but the last value will be discarded. Once all source streams have emitted + /// at least once, the result stream will emit any time any source stream + /// emits. + /// + /// The result stream will not close until all source streams have closed. + /// When a source stream closes, the result stream will continue to emit the + /// last value from the closed stream when the other source streams emit until + /// the result stream has closed. If a source stream closes without emitting + /// any value, the result stream will close as well. + /// + /// For example: + /// + /// final combined = first + /// .combineLatestAll([second, third]) + /// .map((data) => data.join()); + /// + /// first: a----b------------------c--------d---| + /// second: --1---------2-----------------| + /// third: -------&----------%---| + /// combined: -------b1&--b2&---b2%---c2%------d2%-| + /// + /// Errors thrown by any source stream will be forwarded to the result stream. + /// + /// If the source stream is a broadcast stream, the result stream will be as + /// well, regardless of the types of [others]. If a single subscription stream + /// is combined with a broadcast source stream, it may never be canceled. + Stream<List<T>> combineLatestAll(Iterable<Stream<T>> others) { + final controller = isBroadcast + ? StreamController<List<T>>.broadcast(sync: true) + : StreamController<List<T>>(sync: true); + + final allStreams = [ + this, + for (final other in others) + !isBroadcast || other.isBroadcast ? other : other.asBroadcastStream(), + ]; + + controller.onListen = () { + final subscriptions = <StreamSubscription<T>>[]; + + final latestData = List<T?>.filled(allStreams.length, null); + final hasEmitted = <int>{}; + void handleData(int index, T data) { + latestData[index] = data; + hasEmitted.add(index); + if (hasEmitted.length == allStreams.length) { + controller.add(List.from(latestData)); + } + } + + var streamId = 0; + for (final stream in allStreams) { + final index = streamId; + + final subscription = stream.listen((data) => handleData(index, data), + onError: controller.addError); + subscription.onDone(() { + assert(subscriptions.contains(subscription)); + subscriptions.remove(subscription); + if (subscriptions.isEmpty || !hasEmitted.contains(index)) { + controller.close(); + } + }); + subscriptions.add(subscription); + + streamId++; + } + if (!isBroadcast) { + controller + ..onPause = () { + for (final subscription in subscriptions) { + subscription.pause(); + } + } + ..onResume = () { + for (final subscription in subscriptions) { + subscription.resume(); + } + }; + } + controller.onCancel = () { + if (subscriptions.isEmpty) return null; + return [for (var s in subscriptions) s.cancel()] + .wait + .then(ignoreArgument); + }; + }; + return controller.stream; + } +}
diff --git a/pkgs/stream_transform/lib/src/common_callbacks.dart b/pkgs/stream_transform/lib/src/common_callbacks.dart new file mode 100644 index 0000000..c239220 --- /dev/null +++ b/pkgs/stream_transform/lib/src/common_callbacks.dart
@@ -0,0 +1,5 @@ +// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +void ignoreArgument(_) {}
diff --git a/pkgs/stream_transform/lib/src/concatenate.dart b/pkgs/stream_transform/lib/src/concatenate.dart new file mode 100644 index 0000000..0330dd7 --- /dev/null +++ b/pkgs/stream_transform/lib/src/concatenate.dart
@@ -0,0 +1,112 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +/// Utilities to append or prepend to a stream. +extension Concatenate<T> on Stream<T> { + /// Emits all values and errors from [next] following all values and errors + /// from this stream. + /// + /// If this stream never finishes, the [next] stream will never get a + /// listener. + /// + /// If this stream is a broadcast stream, the result will be as well. + /// If a single-subscription follows a broadcast stream it may be listened + /// to and never canceled since there may be broadcast listeners added later. + /// + /// If a broadcast stream follows any other stream it will miss any events or + /// errors which occur before this stream is done. + /// If a broadcast stream follows a single-subscription stream, pausing the + /// stream while it is listening to the second stream will cause events to be + /// dropped rather than buffered. + Stream<T> followedBy(Stream<T> next) { + var controller = isBroadcast + ? StreamController<T>.broadcast(sync: true) + : StreamController<T>(sync: true); + + next = isBroadcast && !next.isBroadcast ? next.asBroadcastStream() : next; + + StreamSubscription<T>? subscription; + var currentStream = this; + var thisDone = false; + var secondDone = false; + + late void Function() currentDoneHandler; + + void listen() { + subscription = currentStream.listen(controller.add, + onError: controller.addError, onDone: () => currentDoneHandler()); + } + + void onSecondDone() { + secondDone = true; + controller.close(); + } + + void onThisDone() { + thisDone = true; + currentStream = next; + currentDoneHandler = onSecondDone; + listen(); + } + + currentDoneHandler = onThisDone; + + controller.onListen = () { + assert(subscription == null); + listen(); + if (!isBroadcast) { + controller + ..onPause = () { + if (!thisDone || !next.isBroadcast) return subscription!.pause(); + subscription!.cancel(); + subscription = null; + } + ..onResume = () { + if (!thisDone || !next.isBroadcast) return subscription!.resume(); + listen(); + }; + } + controller.onCancel = () { + if (secondDone) return null; + var toCancel = subscription!; + subscription = null; + return toCancel.cancel(); + }; + }; + return controller.stream; + } + + /// Emits [initial] before any values or errors from the this stream. + /// + /// If this stream is a broadcast stream the result will be as well. + /// If this stream is a broadcast stream, the returned stream will only + /// contain events of this stream that are emitted after the [initial] value + /// has been emitted on the returned stream. + Stream<T> startWith(T initial) => + startWithStream(Future.value(initial).asStream()); + + /// Emits all values in [initial] before any values or errors from this + /// stream. + /// + /// If this stream is a broadcast stream the result will be as well. + /// If this stream is a broadcast stream it will miss any events which + /// occur before the initial values are all emitted. + Stream<T> startWithMany(Iterable<T> initial) => + startWithStream(Stream.fromIterable(initial)); + + /// Emits all values and errors in [initial] before any values or errors from + /// this stream. + /// + /// If this stream is a broadcast stream the result will be as well. + /// If this stream is a broadcast stream it will miss any events which occur + /// before [initial] closes. + Stream<T> startWithStream(Stream<T> initial) { + if (isBroadcast && !initial.isBroadcast) { + initial = initial.asBroadcastStream(); + } + return initial.followedBy(this); + } +}
diff --git a/pkgs/stream_transform/lib/src/from_handlers.dart b/pkgs/stream_transform/lib/src/from_handlers.dart new file mode 100644 index 0000000..1146a13 --- /dev/null +++ b/pkgs/stream_transform/lib/src/from_handlers.dart
@@ -0,0 +1,58 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +extension TransformByHandlers<S> on Stream<S> { + /// Transform a stream by callbacks. + /// + /// This is similar to `transform(StreamTransformer.fromHandler(...))` except + /// that the handlers are called once per event rather than called for the + /// same event for each listener on a broadcast stream. + Stream<T> transformByHandlers<T>( + {required void Function(S, EventSink<T>) onData, + void Function(Object, StackTrace, EventSink<T>)? onError, + void Function(EventSink<T>)? onDone}) { + final handleError = onError ?? _defaultHandleError; + final handleDone = onDone ?? _defaultHandleDone; + + var controller = isBroadcast + ? StreamController<T>.broadcast(sync: true) + : StreamController<T>(sync: true); + + StreamSubscription<S>? subscription; + controller.onListen = () { + assert(subscription == null); + var valuesDone = false; + subscription = listen((value) => onData(value, controller), + onError: (Object error, StackTrace stackTrace) { + handleError(error, stackTrace, controller); + }, onDone: () { + valuesDone = true; + handleDone(controller); + }); + if (!isBroadcast) { + controller + ..onPause = subscription!.pause + ..onResume = subscription!.resume; + } + controller.onCancel = () { + var toCancel = subscription; + subscription = null; + if (!valuesDone) return toCancel!.cancel(); + return null; + }; + }; + return controller.stream; + } + + static void _defaultHandleError<T>( + Object error, StackTrace stackTrace, EventSink<T> sink) { + sink.addError(error, stackTrace); + } + + static void _defaultHandleDone<T>(EventSink<T> sink) { + sink.close(); + } +}
diff --git a/pkgs/stream_transform/lib/src/merge.dart b/pkgs/stream_transform/lib/src/merge.dart new file mode 100644 index 0000000..3bfe06c --- /dev/null +++ b/pkgs/stream_transform/lib/src/merge.dart
@@ -0,0 +1,102 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'common_callbacks.dart'; + +/// Utilities to interleave events from multiple streams. +extension Merge<T> on Stream<T> { + /// Merges values and errors from this stream and [other] in any order as they + /// arrive. + /// + /// The result stream will not close until both this stream and [other] have + /// closed. + /// + /// For example: + /// + /// final result = source.merge(other); + /// + /// source: 1--2-----3--| + /// other: ------4-------5--| + /// result: 1--2--4--3----5--| + /// + /// If this stream is a broadcast stream, the result stream will be as + /// well, regardless of [other]'s type. If a single subscription stream is + /// merged into a broadcast stream it may never be canceled since there may be + /// broadcast listeners added later. + /// + /// If a broadcast stream is merged into a single-subscription stream any + /// events emitted by [other] before the result stream has a subscriber will + /// be discarded. + Stream<T> merge(Stream<T> other) => mergeAll([other]); + + /// Merges values and errors from this stream and any stream in [others] in + /// any order as they arrive. + /// + /// The result stream will not close until this stream and all streams + /// in [others] have closed. + /// + /// For example: + /// + /// final result = first.mergeAll([second, third]); + /// + /// first: 1--2--------3--| + /// second: ---------4-------5--| + /// third: ------6---------------7--| + /// result: 1--2--6--4--3----5----7--| + /// + /// If this stream is a broadcast stream, the result stream will be as + /// well, regardless the types of streams in [others]. If a single + /// subscription stream is merged into a broadcast stream it may never be + /// canceled since there may be broadcast listeners added later. + /// + /// If a broadcast stream is merged into a single-subscription stream any + /// events emitted by that stream before the result stream has a subscriber + /// will be discarded. + Stream<T> mergeAll(Iterable<Stream<T>> others) { + final controller = isBroadcast + ? StreamController<T>.broadcast(sync: true) + : StreamController<T>(sync: true); + + final allStreams = [ + this, + for (final other in others) + !isBroadcast || other.isBroadcast ? other : other.asBroadcastStream(), + ]; + + controller.onListen = () { + final subscriptions = <StreamSubscription<T>>[]; + for (final stream in allStreams) { + final subscription = + stream.listen(controller.add, onError: controller.addError); + subscription.onDone(() { + subscriptions.remove(subscription); + if (subscriptions.isEmpty) controller.close(); + }); + subscriptions.add(subscription); + } + if (!isBroadcast) { + controller + ..onPause = () { + for (final subscription in subscriptions) { + subscription.pause(); + } + } + ..onResume = () { + for (final subscription in subscriptions) { + subscription.resume(); + } + }; + } + controller.onCancel = () { + if (subscriptions.isEmpty) return null; + return [for (var s in subscriptions) s.cancel()] + .wait + .then(ignoreArgument); + }; + }; + return controller.stream; + } +}
diff --git a/pkgs/stream_transform/lib/src/rate_limit.dart b/pkgs/stream_transform/lib/src/rate_limit.dart new file mode 100644 index 0000000..299c230 --- /dev/null +++ b/pkgs/stream_transform/lib/src/rate_limit.dart
@@ -0,0 +1,356 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'aggregate_sample.dart'; +import 'common_callbacks.dart'; +import 'from_handlers.dart'; + +/// Utilities to rate limit events. +/// +/// - [debounce] - emit the the _first_ or _last_ event of a series of closely +/// spaced events. +/// - [debounceBuffer] - emit _all_ events at the _end_ of a series of closely +/// spaced events. +/// - [throttle] - emit the _first_ event at the _beginning_ of the period. +/// - [audit] - emit the _last_ event at the _end_ of the period. +/// - [buffer] - emit _all_ events on a _trigger_. +extension RateLimit<T> on Stream<T> { + /// Suppresses events with less inter-event spacing than [duration]. + /// + /// Events which are emitted with less than [duration] elapsed between them + /// are considered to be part of the same "series". If [leading] is `true`, + /// the first event of this series is emitted immediately. If [trailing] is + /// `true` the last event of this series is emitted with a delay of at least + /// [duration]. By default only trailing events are emitted, both arguments + /// must be specified with `leading: true, trailing: false` to emit only + /// leading events. + /// + /// If this stream is a broadcast stream, the result will be as well. + /// Errors are forwarded immediately. + /// + /// If there is a trailing event waiting during the debounce period when the + /// source stream closes the returned stream will wait to emit it following + /// the debounce period before closing. If there is no pending debounced event + /// when this stream closes the returned stream will close immediately. + /// + /// For example: + /// + /// source.debounce(Duration(seconds: 1)); + /// + /// source: 1-2-3---4---5-6-| + /// result: ------3---4-----6| + /// + /// source.debounce(Duration(seconds: 1), leading: true, trailing: false); + /// + /// source: 1-2-3---4---5-6-| + /// result: 1-------4---5---| + /// + /// source.debounce(Duration(seconds: 1), leading: true); + /// + /// source: 1-2-3---4---5-6-| + /// result: 1-----3-4---5---6| + /// + /// To collect values emitted during the debounce period see [debounceBuffer]. + Stream<T> debounce(Duration duration, + {bool leading = false, bool trailing = true}) => + _debounceAggregate(duration, _dropPrevious, + leading: leading, trailing: trailing); + + /// Buffers values until this stream does not emit for [duration] then emits + /// the collected values. + /// + /// Values will always be delayed by at least [duration], and values which + /// come within this time will be aggregated into the same list. + /// + /// If this stream is a broadcast stream, the result will be as well. + /// Errors are forwarded immediately. + /// + /// If there are events waiting during the debounce period when this stream + /// closes the returned stream will wait to emit them following the debounce + /// period before closing. If there are no pending debounced events when this + /// stream closes the returned stream will close immediately. + /// + /// To keep only the most recent event during the debounce period see + /// [debounce]. + Stream<List<T>> debounceBuffer(Duration duration) => + _debounceAggregate(duration, _collect, leading: false, trailing: true); + + /// Reduces the rate that events are emitted to at most once per [duration]. + /// + /// No events will ever be emitted within [duration] of another event on the + /// result stream. + /// If this stream is a broadcast stream, the result will be as well. + /// Errors are forwarded immediately. + /// + /// If [trailing] is `false`, source events emitted during the [duration] + /// period following a result event are discarded. + /// The result stream will not emit an event until this stream emits an event + /// following the throttled period. + /// If this stream is consistently emitting events with less than + /// [duration] between events, the time between events on the result stream + /// may still be more than [duration]. + /// The result stream will close immediately when this stream closes. + /// + /// If [trailing] is `true`, the latest source event emitted during the + /// [duration] period following an result event is held and emitted following + /// the period. + /// If this stream is consistently emitting events with less than [duration] + /// between events, the time between events on the result stream will be + /// [duration]. + /// If this stream closes the result stream will wait to emit a pending event + /// before closing. + /// + /// For example: + /// + /// source.throttle(Duration(seconds: 6)); + /// + /// source: 1-2-3---4-5-6---7-8-| + /// result: 1-------4-------7---| + /// + /// source.throttle(Duration(seconds: 6), trailing: true); + /// + /// source: 1-2-3---4-5----6--| + /// result: 1-----3-----5-----6| + /// + /// source.throttle(Duration(seconds: 6), trailing: true); + /// + /// source: 1-2-----------3| + /// result: 1-----2-------3| + /// + /// See also: + /// - [audit], which emits the most recent event at the end of the period. + /// Compared to `audit`, `throttle` will not introduce delay to forwarded + /// elements, except for the [trailing] events. + /// - [debounce], which uses inter-event spacing instead of a fixed period + /// from the first event in a window. Compared to `debouce`, `throttle` cannot + /// be starved by having events emitted continuously within [duration]. + Stream<T> throttle(Duration duration, {bool trailing = false}) => + trailing ? _throttleTrailing(duration) : _throttle(duration); + + Stream<T> _throttle(Duration duration) { + Timer? timer; + + return transformByHandlers(onData: (data, sink) { + if (timer == null) { + sink.add(data); + timer = Timer(duration, () { + timer = null; + }); + } + }); + } + + Stream<T> _throttleTrailing(Duration duration) { + Timer? timer; + T? pending; + var hasPending = false; + var isDone = false; + + return transformByHandlers(onData: (data, sink) { + void onTimer() { + if (hasPending) { + sink.add(pending as T); + if (isDone) { + sink.close(); + } else { + timer = Timer(duration, onTimer); + hasPending = false; + pending = null; + } + } else { + timer = null; + } + } + + if (timer == null) { + sink.add(data); + timer = Timer(duration, onTimer); + } else { + hasPending = true; + pending = data; + } + }, onDone: (sink) { + isDone = true; + if (hasPending) return; // Will be closed by timer. + sink.close(); + timer?.cancel(); + timer = null; + }); + } + + /// Audit a single event from each [duration] length period where there are + /// events on this stream. + /// + /// No events will ever be emitted within [duration] of another event on the + /// result stream. + /// If this stream is a broadcast stream, the result will be as well. + /// Errors are forwarded immediately. + /// + /// The first event will begin the audit period. At the end of the audit + /// period the most recent event is emitted, and the next event restarts the + /// audit period. + /// + /// If the event that started the period is the one that is emitted it will be + /// delayed by [duration]. If a later event comes in within the period it's + /// delay will be shorter by the difference in arrival times. + /// + /// If there is no pending event when this stream closes the output + /// stream will close immediately. If there is a pending event the output + /// stream will wait to emit it before closing. + /// + /// For example: + /// + /// source.audit(Duration(seconds: 5)); + /// + /// source: a------b--c----d--| + /// output: -----a------c--------d| + /// + /// See also: + /// - [throttle], which emits the _first_ event during the window, instead of + /// the last event in the window. Compared to `throttle`, `audit` will + /// introduce delay to forwarded events. + /// - [debounce], which only emits after the stream has not emitted for some + /// period. Compared to `debouce`, `audit` cannot be starved by having events + /// emitted continuously within [duration]. + Stream<T> audit(Duration duration) { + Timer? timer; + var shouldClose = false; + T recentData; + + return transformByHandlers(onData: (data, sink) { + recentData = data; + timer ??= Timer(duration, () { + sink.add(recentData); + timer = null; + if (shouldClose) { + sink.close(); + } + }); + }, onDone: (sink) { + if (timer != null) { + shouldClose = true; + } else { + sink.close(); + } + }); + } + + /// Buffers the values emitted on this stream and emits them when [trigger] + /// emits an event. + /// + /// If [longPoll] is `false`, if there are no buffered values when [trigger] + /// emits an empty list is immediately emitted. + /// + /// If [longPoll] is `true`, and there are no buffered values when [trigger] + /// emits one or more events, then the *next* value from this stream is + /// immediately emitted on the returned stream as a single element list. + /// Subsequent events on [trigger] while there have been no events on this + /// stream are ignored. + /// + /// The result stream will close as soon as there is a guarantee it will not + /// emit any more events. There will not be any more events emitted if: + /// - [trigger] is closed and there is no waiting long poll. + /// - Or, this stream is closed and previously buffered events have been + /// delivered. + /// + /// If this stream is a broadcast stream, the result will be as well. + /// Errors from this stream or the trigger are immediately forwarded to the + /// output. + /// + /// See also: + /// - [sample] which use a [trigger] stream in the same way, but keeps only + /// the most recent source event. + Stream<List<T>> buffer(Stream<void> trigger, {bool longPoll = true}) => + aggregateSample( + trigger: trigger, + aggregate: _collect, + longPoll: longPoll, + onEmpty: _empty); + + /// Emits the most recent new value from this stream when [trigger] emits an + /// event. + /// + /// If [longPoll] is `false`, then an event on [trigger] when there is no + /// pending source event will be ignored. + /// If [longPoll] is `true` (the default), then an event on [trigger] when + /// there is no pending source event will cause the next source event + /// to immediately flow to the result stream. + /// + /// If [longPoll] is `false`, if there is no pending source event when + /// [trigger] emits, then the trigger event will be ignored. + /// + /// If [longPoll] is `true`, and there are no buffered values when [trigger] + /// emits one or more events, then the *next* value from this stream is + /// immediately emitted on the returned stream as a single element list. + /// Subsequent events on [trigger] while there have been no events on this + /// stream are ignored. + /// + /// The result stream will close as soon as there is a guarantee it will not + /// emit any more events. There will not be any more events emitted if: + /// - [trigger] is closed and there is no waiting long poll. + /// - Or, this source stream is closed and any pending source event has been + /// delivered. + /// + /// If this source stream is a broadcast stream, the result will be as well. + /// Errors from this source stream or the trigger are immediately forwarded to + /// the output. + /// + /// See also: + /// - [buffer] which use [trigger] stream in the same way, but keeps a list of + /// pending source events. + Stream<T> sample(Stream<void> trigger, {bool longPoll = true}) => + aggregateSample( + trigger: trigger, + aggregate: _dropPrevious, + longPoll: longPoll, + onEmpty: ignoreArgument); + + /// Aggregates values until this source stream does not emit for [duration], + /// then emits the aggregated values. + Stream<S> _debounceAggregate<S>( + Duration duration, S Function(T element, S? soFar) collect, + {required bool leading, required bool trailing}) { + Timer? timer; + S? soFar; + var hasPending = false; + var shouldClose = false; + var emittedLatestAsLeading = false; + + return transformByHandlers(onData: (value, sink) { + void emit() { + sink.add(soFar as S); + soFar = null; + hasPending = false; + } + + timer?.cancel(); + soFar = collect(value, soFar); + hasPending = true; + if (timer == null && leading) { + emittedLatestAsLeading = true; + emit(); + } else { + emittedLatestAsLeading = false; + } + timer = Timer(duration, () { + if (trailing && !emittedLatestAsLeading) emit(); + if (shouldClose) sink.close(); + timer = null; + }); + }, onDone: (EventSink<S> sink) { + if (hasPending && trailing) { + shouldClose = true; + } else { + timer?.cancel(); + sink.close(); + } + }); + } +} + +T _dropPrevious<T>(T element, _) => element; +List<T> _collect<T>(T event, List<T>? soFar) => (soFar ?? <T>[])..add(event); +void _empty<T>(Sink<List<T>> sink) => sink.add([]);
diff --git a/pkgs/stream_transform/lib/src/scan.dart b/pkgs/stream_transform/lib/src/scan.dart new file mode 100644 index 0000000..acd3c76 --- /dev/null +++ b/pkgs/stream_transform/lib/src/scan.dart
@@ -0,0 +1,31 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +/// A utility similar to [fold] which emits intermediate accumulations. +extension Scan<T> on Stream<T> { + /// Emits a sequence of the accumulated values from repeatedly applying + /// [combine]. + /// + /// Like [fold], but instead of producing a single value it yields each + /// intermediate result. + /// + /// If [combine] returns a future it will not be called again for subsequent + /// events from the source until it completes, therefore [combine] is always + /// called for elements in order, and the result stream always maintains the + /// same order as this stream. + Stream<S> scan<S>( + S initialValue, FutureOr<S> Function(S soFar, T element) combine) { + var accumulated = initialValue; + return asyncMap((value) { + var result = combine(accumulated, value); + if (result is Future<S>) { + return result.then((r) => accumulated = r); + } else { + return accumulated = result; + } + }); + } +}
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart new file mode 100644 index 0000000..546036e --- /dev/null +++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -0,0 +1,135 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'async_expand.dart'; +import 'common_callbacks.dart'; + +/// A utility to take events from the most recent sub stream returned by a +/// callback. +extension Switch<T> on Stream<T> { + /// Maps events to a Stream and emits values from the most recently created + /// Stream. + /// + /// When the source emits a value it will be converted to a [Stream] using + /// [convert] and the output will switch to emitting events from that result. + /// Like [asyncExpand] but the [Stream] emitted by a previous element + /// will be ignored as soon as the source stream emits a new event. + /// + /// This means that the source stream is not paused until a sub stream + /// returned from the [convert] callback is done. Instead, the subscription + /// to the sub stream is canceled as soon as the source stream emits a new + /// event. + /// + /// Errors from [convert], the source stream, or any of the sub streams are + /// forwarded to the result stream. + /// + /// The result stream will not close until the source stream closes and + /// the current sub stream have closed. + /// + /// If the source stream is a broadcast stream, the result will be as well, + /// regardless of the types of streams created by [convert]. In this case, + /// some care should be taken: + /// + /// * If [convert] returns a single subscription stream it may be listened to + /// and never canceled. + /// + /// See also: + /// - [concurrentAsyncExpand], which emits events from all sub streams + /// concurrently instead of cancelling subscriptions to previous subs + /// streams. + Stream<S> switchMap<S>(Stream<S> Function(T) convert) { + return map(convert).switchLatest(); + } +} + +/// A utility to take events from the most recent sub stream. +extension SwitchLatest<T> on Stream<Stream<T>> { + /// Emits values from the most recently emitted Stream. + /// + /// When the source emits a stream, the output will switch to emitting events + /// from that stream. + /// + /// Whether the source stream is a single-subscription stream or a + /// broadcast stream, the result stream will be the same kind of stream, + /// regardless of the types of streams emitted. + Stream<T> switchLatest() { + var controller = isBroadcast + ? StreamController<T>.broadcast(sync: true) + : StreamController<T>(sync: true); + + controller.onListen = () { + StreamSubscription<T>? innerSubscription; + var outerStreamDone = false; + + void listenToInnerStream(Stream<T> innerStream) { + assert(innerSubscription == null); + var subscription = innerStream + .listen(controller.add, onError: controller.addError, onDone: () { + innerSubscription = null; + if (outerStreamDone) controller.close(); + }); + // If a pause happens during an innerSubscription.cancel, + // we still listen to the next stream when the cancel is done. + // Then we immediately pause it again here. + if (controller.isPaused) subscription.pause(); + innerSubscription = subscription; + } + + var addError = controller.addError; + final outerSubscription = listen(null, onError: addError, onDone: () { + outerStreamDone = true; + if (innerSubscription == null) controller.close(); + }); + outerSubscription.onData((innerStream) async { + var currentSubscription = innerSubscription; + if (currentSubscription == null) { + listenToInnerStream(innerStream); + return; + } + innerSubscription = null; + outerSubscription.pause(); + try { + await currentSubscription.cancel(); + } catch (error, stack) { + controller.addError(error, stack); + } finally { + if (!isBroadcast && !controller.hasListener) { + // Result single-subscription stream subscription was cancelled + // while waiting for previous innerStream cancel. + // + // Ensure that the last received stream is also listened to and + // cancelled, then do nothing further. + innerStream.listen(null).cancel().ignore(); + } else { + outerSubscription.resume(); + listenToInnerStream(innerStream); + } + } + }); + if (!isBroadcast) { + controller + ..onPause = () { + innerSubscription?.pause(); + outerSubscription.pause(); + } + ..onResume = () { + innerSubscription?.resume(); + outerSubscription.resume(); + }; + } + controller.onCancel = () { + var sub = innerSubscription; + var cancels = [ + if (!outerStreamDone) outerSubscription.cancel(), + if (sub != null) sub.cancel(), + ]; + if (cancels.isEmpty) return null; + return cancels.wait.then(ignoreArgument); + }; + }; + return controller.stream; + } +}
diff --git a/pkgs/stream_transform/lib/src/take_until.dart b/pkgs/stream_transform/lib/src/take_until.dart new file mode 100644 index 0000000..e6deaa1 --- /dev/null +++ b/pkgs/stream_transform/lib/src/take_until.dart
@@ -0,0 +1,64 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +/// A utility to end a stream based on an external trigger. +extension TakeUntil<T> on Stream<T> { + /// Takes values from this stream which are emitted before [trigger] + /// completes. + /// + /// Completing [trigger] differs from canceling a subscription in that values + /// which are emitted before the trigger, but have further asynchronous delays + /// in transformations following the takeUtil, will still go through. + /// Cancelling a subscription immediately stops values. + /// + /// If [trigger] completes as an error, the error will be forwarded through + /// the result stream before the result stream closes. + /// + /// If [trigger] completes as a value or as an error after this stream has + /// already ended, the completion will be ignored. + Stream<T> takeUntil(Future<void> trigger) { + var controller = isBroadcast + ? StreamController<T>.broadcast(sync: true) + : StreamController<T>(sync: true); + + StreamSubscription<T>? subscription; + var isDone = false; + trigger.then((_) { + if (isDone) return; + isDone = true; + subscription?.cancel(); + controller.close(); + }, onError: (Object error, StackTrace stackTrace) { + if (isDone) return; + isDone = true; + controller + ..addError(error, stackTrace) + ..close(); + }); + + controller.onListen = () { + if (isDone) return; + subscription = + listen(controller.add, onError: controller.addError, onDone: () { + if (isDone) return; + isDone = true; + controller.close(); + }); + if (!isBroadcast) { + controller + ..onPause = subscription!.pause + ..onResume = subscription!.resume; + } + controller.onCancel = () { + if (isDone) return null; + var toCancel = subscription!; + subscription = null; + return toCancel.cancel(); + }; + }; + return controller.stream; + } +}
diff --git a/pkgs/stream_transform/lib/src/tap.dart b/pkgs/stream_transform/lib/src/tap.dart new file mode 100644 index 0000000..4b16ab5 --- /dev/null +++ b/pkgs/stream_transform/lib/src/tap.dart
@@ -0,0 +1,44 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +import 'from_handlers.dart'; + +/// A utility to chain extra behavior on a stream. +extension Tap<T> on Stream<T> { + /// Taps into this stream to allow additional handling on a single-subscriber + /// stream without first wrapping as a broadcast stream. + /// + /// The [onValue] callback will be called with every value from this stream + /// before it is forwarded to listeners on the resulting stream. + /// May be null if only [onError] or [onDone] callbacks are needed. + /// + /// The [onError] callback will be called with every error from this stream + /// before it is forwarded to listeners on the resulting stream. + /// + /// The [onDone] callback will be called after this stream closes and before + /// the resulting stream is closed. + /// + /// Errors from any of the callbacks are caught and ignored. + /// + /// The callbacks may not be called until the tapped stream has a listener, + /// and may not be called after the listener has canceled the subscription. + Stream<T> tap(void Function(T)? onValue, + {void Function(Object, StackTrace)? onError, + void Function()? onDone}) => + transformByHandlers(onData: (value, sink) { + try { + onValue?.call(value); + } catch (_) {/*Ignore*/} + sink.add(value); + }, onError: (error, stackTrace, sink) { + try { + onError?.call(error, stackTrace); + } catch (_) {/*Ignore*/} + sink.addError(error, stackTrace); + }, onDone: (sink) { + try { + onDone?.call(); + } catch (_) {/*Ignore*/} + sink.close(); + }); +}
diff --git a/pkgs/stream_transform/lib/src/where.dart b/pkgs/stream_transform/lib/src/where.dart new file mode 100644 index 0000000..76aa28a --- /dev/null +++ b/pkgs/stream_transform/lib/src/where.dart
@@ -0,0 +1,71 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'from_handlers.dart'; + +/// Utilities to filter events. +extension Where<T> on Stream<T> { + /// Discards events from this stream that are not of type [S]. + /// + /// If the source stream is a broadcast stream the result will be as well. + /// + /// Errors from the source stream are forwarded directly to the result stream. + /// + /// [S] should be a subtype of the stream's generic type, otherwise nothing of + /// type [S] could possibly be emitted, however there is no static or runtime + /// checking that this is the case. + Stream<S> whereType<S>() => transformByHandlers(onData: (event, sink) { + if (event is S) sink.add(event); + }); + + /// Discards events from this stream based on an asynchronous [test] callback. + /// + /// Like [where] but allows the [test] to return a [Future]. + /// + /// Events on the result stream will be emitted in the order that [test] + /// completes which may not match the order of this stream. + /// + /// If the source stream is a broadcast stream the result will be as well. + /// When used with a broadcast stream behavior also differs from [where] in + /// that the [test] function is only called once per event, rather than once + /// per listener per event. + /// + /// Errors from the source stream are forwarded directly to the result stream. + /// Errors from [test] are also forwarded to the result stream. + /// + /// The result stream will not close until the source stream closes and all + /// pending [test] calls have finished. + Stream<T> asyncWhere(FutureOr<bool> Function(T) test) { + var valuesWaiting = 0; + var sourceDone = false; + return transformByHandlers(onData: (element, sink) { + valuesWaiting++; + () async { + try { + if (await test(element)) sink.add(element); + } catch (e, st) { + sink.addError(e, st); + } + valuesWaiting--; + if (valuesWaiting <= 0 && sourceDone) sink.close(); + }(); + }, onDone: (sink) { + sourceDone = true; + if (valuesWaiting <= 0) sink.close(); + }); + } +} + +extension WhereNotNull<T extends Object> on Stream<T?> { + /// Discards `null` events from this stream. + /// + /// If the source stream is a broadcast stream the result will be as well. + /// + /// Errors from the source stream are forwarded directly to the result stream. + Stream<T> whereNotNull() => transformByHandlers(onData: (event, sink) { + if (event != null) sink.add(event); + }); +}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart new file mode 100644 index 0000000..edf4df9 --- /dev/null +++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -0,0 +1,15 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +export 'src/async_expand.dart'; +export 'src/async_map.dart'; +export 'src/combine_latest.dart'; +export 'src/concatenate.dart'; +export 'src/merge.dart'; +export 'src/rate_limit.dart'; +export 'src/scan.dart'; +export 'src/switch.dart'; +export 'src/take_until.dart'; +export 'src/tap.dart'; +export 'src/where.dart';
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml new file mode 100644 index 0000000..9cd6584 --- /dev/null +++ b/pkgs/stream_transform/pubspec.yaml
@@ -0,0 +1,13 @@ +name: stream_transform +version: 2.1.1-wip +description: A collection of utilities to transform and manipulate streams. +repository: https://github.com/dart-lang/stream_transform + +environment: + sdk: ^3.1.0 + +dev_dependencies: + async: ^2.5.0 + dart_flutter_team_lints: ^2.0.0 + fake_async: ^1.3.0 + test: ^1.16.0
diff --git a/pkgs/stream_transform/test/async_expand_test.dart b/pkgs/stream_transform/test/async_expand_test.dart new file mode 100644 index 0000000..8d84300 --- /dev/null +++ b/pkgs/stream_transform/test/async_expand_test.dart
@@ -0,0 +1,195 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + test('forwards errors from the convert callback', () async { + var errors = <String>[]; + var source = Stream.fromIterable([1, 2, 3]); + source.concurrentAsyncExpand<void>((i) { + // ignore: only_throw_errors + throw 'Error: $i'; + }).listen((_) {}, onError: errors.add); + await Future<void>(() {}); + expect(errors, ['Error: 1', 'Error: 2', 'Error: 3']); + }); + + for (var outerType in streamTypes) { + for (var innerType in streamTypes) { + group('concurrentAsyncExpand $outerType to $innerType', () { + late StreamController<int> outerController; + late bool outerCanceled; + late List<StreamController<String>> innerControllers; + late List<bool> innerCanceled; + late List<String> emittedValues; + late bool isDone; + late List<String> errors; + late Stream<String> transformed; + late StreamSubscription<String> subscription; + + setUp(() { + outerController = createController(outerType) + ..onCancel = () { + outerCanceled = true; + }; + outerCanceled = false; + innerControllers = []; + innerCanceled = []; + emittedValues = []; + errors = []; + isDone = false; + transformed = outerController.stream.concurrentAsyncExpand((i) { + var index = innerControllers.length; + innerCanceled.add(false); + innerControllers.add(createController<String>(innerType) + ..onCancel = () { + innerCanceled[index] = true; + }); + return innerControllers.last.stream; + }); + subscription = transformed + .listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + }); + + test('interleaves events from sub streams', () async { + outerController + ..add(1) + ..add(2); + await Future<void>(() {}); + expect(emittedValues, isEmpty); + expect(innerControllers, hasLength(2)); + innerControllers[0].add('First'); + innerControllers[1].add('Second'); + innerControllers[0].add('First again'); + await Future<void>(() {}); + expect(emittedValues, ['First', 'Second', 'First again']); + }); + + test('forwards errors from outer stream', () async { + outerController.addError('Error'); + await Future<void>(() {}); + expect(errors, ['Error']); + }); + + test('forwards errors from inner streams', () async { + outerController + ..add(1) + ..add(2); + await Future<void>(() {}); + innerControllers[0].addError('Error 1'); + innerControllers[1].addError('Error 2'); + await Future<void>(() {}); + expect(errors, ['Error 1', 'Error 2']); + }); + + test('can continue handling events after an error in outer stream', + () async { + outerController + ..addError('Error') + ..add(1); + await Future<void>(() {}); + innerControllers[0].add('First'); + await Future<void>(() {}); + expect(emittedValues, ['First']); + expect(errors, ['Error']); + }); + + test('cancels outer subscription if output canceled', () async { + await subscription.cancel(); + expect(outerCanceled, true); + }); + + if (outerType != 'broadcast' || innerType != 'single subscription') { + // A single subscription inner stream in a broadcast outer stream is + // not canceled. + test('cancels inner subscriptions if output canceled', () async { + outerController + ..add(1) + ..add(2); + await Future<void>(() {}); + await subscription.cancel(); + expect(innerCanceled, [true, true]); + }); + } + + test('stays open if any inner stream is still open', () async { + outerController.add(1); + await outerController.close(); + await Future<void>(() {}); + expect(isDone, false); + }); + + test('stays open if outer stream is still open', () async { + outerController.add(1); + await Future<void>(() {}); + await innerControllers[0].close(); + await Future<void>(() {}); + expect(isDone, false); + }); + + test('closes after all inner streams and outer stream close', () async { + outerController.add(1); + await Future<void>(() {}); + await innerControllers[0].close(); + await outerController.close(); + await Future<void>(() {}); + expect(isDone, true); + }); + + if (outerType == 'broadcast') { + test('multiple listerns all get values', () async { + var otherValues = <String>[]; + transformed.listen(otherValues.add); + outerController.add(1); + await Future<void>(() {}); + innerControllers[0].add('First'); + await Future<void>(() {}); + expect(emittedValues, ['First']); + expect(otherValues, ['First']); + }); + + test('multiple listeners get closed', () async { + var otherDone = false; + transformed.listen(null, onDone: () => otherDone = true); + outerController.add(1); + await Future<void>(() {}); + await innerControllers[0].close(); + await outerController.close(); + await Future<void>(() {}); + expect(isDone, true); + expect(otherDone, true); + }); + + test('can cancel and relisten', () async { + outerController + ..add(1) + ..add(2); + await Future(() {}); + innerControllers[0].add('First'); + innerControllers[1].add('Second'); + await Future(() {}); + await subscription.cancel(); + innerControllers[0].add('Ignored'); + await Future(() {}); + subscription = transformed.listen(emittedValues.add); + innerControllers[0].add('Also ignored'); + outerController.add(3); + await Future(() {}); + innerControllers[2].add('More'); + await Future(() {}); + expect(emittedValues, ['First', 'Second', 'More']); + }); + } + }); + } + } +}
diff --git a/pkgs/stream_transform/test/async_map_buffer_test.dart b/pkgs/stream_transform/test/async_map_buffer_test.dart new file mode 100644 index 0000000..2386217 --- /dev/null +++ b/pkgs/stream_transform/test/async_map_buffer_test.dart
@@ -0,0 +1,204 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + late StreamController<int> values; + late List<String> emittedValues; + late bool valuesCanceled; + late bool isDone; + late List<String> errors; + late Stream<String> transformed; + late StreamSubscription<String> subscription; + + Completer<String>? finishWork; + List<int>? workArgument; + + /// Represents the async `convert` function and asserts that is is only called + /// after the previous iteration has completed. + Future<String> work(List<int> values) { + expect(finishWork, isNull, + reason: 'See $values befor previous work is complete'); + workArgument = values; + finishWork = Completer() + ..future.then((_) { + workArgument = null; + finishWork = null; + }).catchError((_) { + workArgument = null; + finishWork = null; + }); + return finishWork!.future; + } + + for (var streamType in streamTypes) { + group('asyncMapBuffer for stream type: [$streamType]', () { + setUp(() { + valuesCanceled = false; + values = createController(streamType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + errors = []; + isDone = false; + finishWork = null; + workArgument = null; + transformed = values.stream.asyncMapBuffer(work); + subscription = transformed + .listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + }); + + test('does not emit before work finishes', () async { + values.add(1); + await Future(() {}); + expect(emittedValues, isEmpty); + expect(workArgument, [1]); + finishWork!.complete('result'); + await Future(() {}); + expect(emittedValues, ['result']); + }); + + test('buffers values while work is ongoing', () async { + values.add(1); + await Future(() {}); + values + ..add(2) + ..add(3); + await Future(() {}); + finishWork!.complete(''); + await Future(() {}); + expect(workArgument, [2, 3]); + }); + + test('forwards errors without waiting for work', () async { + values.add(1); + await Future(() {}); + values.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('forwards errors which occur during the work', () async { + values.add(1); + await Future(() {}); + finishWork!.completeError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('can continue handling events after an error', () async { + values.add(1); + await Future(() {}); + finishWork!.completeError('error'); + values.add(2); + await Future(() {}); + expect(workArgument, [2]); + finishWork!.completeError('another'); + await Future(() {}); + expect(errors, ['error', 'another']); + }); + + test('does not start next work early due to an error in values', + () async { + values.add(1); + await Future(() {}); + values + ..addError('error') + ..add(2); + await Future(() {}); + expect(errors, ['error']); + // [work] will assert that the second iteration is not called because + // the first has not completed. + }); + + test('cancels value subscription when output canceled', () async { + expect(valuesCanceled, false); + await subscription.cancel(); + expect(valuesCanceled, true); + }); + + test('closes when values end if no work is pending', () async { + expect(isDone, false); + await values.close(); + await Future(() {}); + expect(isDone, true); + }); + + test('waits for pending work when values close', () async { + values.add(1); + await Future(() {}); + expect(isDone, false); + values.add(2); + await values.close(); + expect(isDone, false); + finishWork!.complete(''); + await Future(() {}); + // Still a pending value + expect(isDone, false); + finishWork!.complete(''); + await Future(() {}); + expect(isDone, true); + }); + + test('forwards errors from values', () async { + values.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get values', () async { + var otherValues = <String>[]; + transformed.listen(otherValues.add); + values.add(1); + await Future(() {}); + finishWork!.complete('result'); + await Future(() {}); + expect(emittedValues, ['result']); + expect(otherValues, ['result']); + }); + + test('multiple listeners get done when values end', () async { + var otherDone = false; + transformed.listen(null, onDone: () => otherDone = true); + values.add(1); + await Future(() {}); + await values.close(); + expect(isDone, false); + expect(otherDone, false); + finishWork!.complete(''); + await Future(() {}); + expect(isDone, true); + expect(otherDone, true); + }); + + test('can cancel and relisten', () async { + values.add(1); + await Future(() {}); + finishWork!.complete('first'); + await Future(() {}); + await subscription.cancel(); + values.add(2); + await Future(() {}); + subscription = transformed.listen(emittedValues.add); + values.add(3); + await Future(() {}); + expect(workArgument, [3]); + finishWork!.complete('second'); + await Future(() {}); + expect(emittedValues, ['first', 'second']); + }); + } + }); + } +}
diff --git a/pkgs/stream_transform/test/async_map_sample_test.dart b/pkgs/stream_transform/test/async_map_sample_test.dart new file mode 100644 index 0000000..62b1b92 --- /dev/null +++ b/pkgs/stream_transform/test/async_map_sample_test.dart
@@ -0,0 +1,209 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + late StreamController<int> values; + late List<String> emittedValues; + late bool valuesCanceled; + late bool isDone; + late List<String> errors; + late Stream<String> transformed; + late StreamSubscription<String> subscription; + + Completer<String>? finishWork; + int? workArgument; + + /// Represents the async `convert` function and asserts that is is only called + /// after the previous iteration has completed. + Future<String> work(int value) { + expect(finishWork, isNull, + reason: 'See $values befor previous work is complete'); + workArgument = value; + finishWork = Completer() + ..future.then((_) { + workArgument = null; + finishWork = null; + }).catchError((_) { + workArgument = null; + finishWork = null; + }); + return finishWork!.future; + } + + for (var streamType in streamTypes) { + group('asyncMapSample for stream type: [$streamType]', () { + setUp(() { + valuesCanceled = false; + values = createController(streamType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + errors = []; + isDone = false; + finishWork = null; + workArgument = null; + transformed = values.stream.asyncMapSample(work); + subscription = transformed + .listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + }); + + test('does not emit before work finishes', () async { + values.add(1); + await Future(() {}); + expect(emittedValues, isEmpty); + expect(workArgument, 1); + finishWork!.complete('result'); + await Future(() {}); + expect(emittedValues, ['result']); + }); + + test('buffers values while work is ongoing', () async { + values.add(1); + await Future(() {}); + values + ..add(2) + ..add(3); + await Future(() {}); + finishWork!.complete(''); + await Future(() {}); + expect(workArgument, 3); + }); + + test('forwards errors without waiting for work', () async { + values.add(1); + await Future(() {}); + values.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('forwards errors which occur during the work', () async { + values.add(1); + await Future(() {}); + finishWork!.completeError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('can continue handling events after an error', () async { + values.add(1); + await Future(() {}); + finishWork!.completeError('error'); + values.add(2); + await Future(() {}); + expect(workArgument, 2); + finishWork!.completeError('another'); + await Future(() {}); + expect(errors, ['error', 'another']); + }); + + test('does not start next work early due to an error in values', + () async { + values.add(1); + await Future(() {}); + values + ..addError('error') + ..add(2); + await Future(() {}); + expect(errors, ['error']); + // [work] will assert that the second iteration is not called because + // the first has not completed. + }); + + test('cancels value subscription when output canceled', () async { + expect(valuesCanceled, false); + await subscription.cancel(); + expect(valuesCanceled, true); + }); + + test('closes when values end if no work is pending', () async { + expect(isDone, false); + await values.close(); + await Future(() {}); + expect(isDone, true); + }); + + test('waits for pending work when values close', () async { + values.add(1); + await Future(() {}); + expect(isDone, false); + values.add(2); + await values.close(); + expect(isDone, false); + finishWork!.complete(''); + await Future(() {}); + // Still a pending value + expect(isDone, false); + finishWork!.complete(''); + await Future(() {}); + expect(isDone, true); + }); + + test('forwards errors from values', () async { + values.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get values', () async { + var otherValues = <String>[]; + transformed.listen(otherValues.add); + values.add(1); + await Future(() {}); + finishWork!.complete('result'); + await Future(() {}); + expect(emittedValues, ['result']); + expect(otherValues, ['result']); + }); + + test('multiple listeners get done when values end', () async { + var otherDone = false; + transformed.listen(null, onDone: () => otherDone = true); + values.add(1); + await Future(() {}); + await values.close(); + expect(isDone, false); + expect(otherDone, false); + finishWork!.complete(''); + await Future(() {}); + expect(isDone, true); + expect(otherDone, true); + }); + + test('can cancel and relisten', () async { + values.add(1); + await Future(() {}); + finishWork!.complete('first'); + await Future(() {}); + await subscription.cancel(); + values.add(2); + await Future(() {}); + subscription = transformed.listen(emittedValues.add); + values.add(3); + await Future(() {}); + expect(workArgument, 3); + finishWork!.complete('second'); + await Future(() {}); + expect(emittedValues, ['first', 'second']); + }); + } + }); + } + + test('allows nulls', () async { + var stream = Stream<int?>.value(null); + await stream.asyncMapSample(expectAsync1((_) async {})).drain<void>(); + }); +}
diff --git a/pkgs/stream_transform/test/async_where_test.dart b/pkgs/stream_transform/test/async_where_test.dart new file mode 100644 index 0000000..6ea4e76 --- /dev/null +++ b/pkgs/stream_transform/test/async_where_test.dart
@@ -0,0 +1,90 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +void main() { + test('forwards only events that pass the predicate', () async { + var values = Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.asyncWhere((e) async => e > 2); + expect(await filtered.toList(), [3, 4]); + }); + + test('allows predicates that go through event loop', () async { + var values = Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.asyncWhere((e) async { + await Future(() {}); + return e > 2; + }); + expect(await filtered.toList(), [3, 4]); + }); + + test('allows synchronous predicate', () async { + var values = Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.asyncWhere((e) => e > 2); + expect(await filtered.toList(), [3, 4]); + }); + + test('can result in empty stream', () async { + var values = Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.asyncWhere((e) => e > 4); + expect(await filtered.isEmpty, true); + }); + + test('forwards values to multiple listeners', () async { + var values = StreamController<int>.broadcast(); + var filtered = values.stream.asyncWhere((e) async => e > 2); + var firstValues = <int>[]; + var secondValues = <int>[]; + filtered + ..listen(firstValues.add) + ..listen(secondValues.add); + values + ..add(1) + ..add(2) + ..add(3) + ..add(4); + await Future(() {}); + expect(firstValues, [3, 4]); + expect(secondValues, [3, 4]); + }); + + test('closes streams with multiple listeners', () async { + var values = StreamController<int>.broadcast(); + var predicate = Completer<bool>(); + var filtered = values.stream.asyncWhere((_) => predicate.future); + var firstDone = false; + var secondDone = false; + filtered + ..listen(null, onDone: () => firstDone = true) + ..listen(null, onDone: () => secondDone = true); + values.add(1); + await values.close(); + expect(firstDone, false); + expect(secondDone, false); + + predicate.complete(true); + await Future(() {}); + expect(firstDone, true); + expect(secondDone, true); + }); + + test('forwards errors emitted by the test callback', () async { + var errors = <Object>[]; + var emitted = <Object>[]; + var values = Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.asyncWhere((e) async { + await Future(() {}); + if (e.isEven) throw Exception('$e'); + return true; + }); + var done = Completer<Object?>(); + filtered.listen(emitted.add, onError: errors.add, onDone: done.complete); + await done.future; + expect(emitted, [1, 3]); + expect(errors.map((e) => '$e'), ['Exception: 2', 'Exception: 4']); + }); +}
diff --git a/pkgs/stream_transform/test/audit_test.dart b/pkgs/stream_transform/test/audit_test.dart new file mode 100644 index 0000000..28537db --- /dev/null +++ b/pkgs/stream_transform/test/audit_test.dart
@@ -0,0 +1,140 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:fake_async/fake_async.dart'; +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + for (var streamType in streamTypes) { + group('Stream type [$streamType]', () { + late StreamController<int> values; + late List<int> emittedValues; + late bool valuesCanceled; + late bool isDone; + late List<String> errors; + late Stream<int> transformed; + late StreamSubscription<int> subscription; + + group('audit', () { + setUp(() { + valuesCanceled = false; + values = createController(streamType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + errors = []; + isDone = false; + transformed = values.stream.audit(const Duration(milliseconds: 6)); + }); + + void listen() { + subscription = transformed + .listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + } + + test('cancels values', () async { + listen(); + await subscription.cancel(); + expect(valuesCanceled, true); + }); + + test('swallows values that come faster than duration', () { + fakeAsync((async) { + listen(); + values + ..add(1) + ..add(2) + ..close(); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [2]); + }); + }); + + test('outputs multiple values spaced further than duration', () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 6)); + values.add(2); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1, 2]); + }); + }); + + test('waits for pending value to close', () { + fakeAsync((async) { + listen(); + values + ..add(1) + ..close(); + expect(isDone, false); + async.elapse(const Duration(milliseconds: 6)); + expect(isDone, true); + }); + }); + + test('closes output if there are no pending values', () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 6)); + values + ..add(2) + ..close(); + expect(isDone, false); + expect(emittedValues, [1]); + async.elapse(const Duration(milliseconds: 6)); + expect(isDone, true); + expect(emittedValues, [1, 2]); + }); + }); + + test('does not starve output if many values come closer than duration', + () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 3)); + values.add(2); + async.elapse(const Duration(milliseconds: 3)); + values.add(3); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [2, 3]); + }); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get the values', () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 3)); + values.add(2); + var otherValues = <int>[]; + transformed.listen(otherValues.add); + values.add(3); + async.elapse(const Duration(milliseconds: 3)); + values.add(4); + async.elapse(const Duration(milliseconds: 3)); + values + ..add(5) + ..close(); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [3, 5]); + expect(otherValues, [3, 5]); + }); + }); + } + }); + }); + } +}
diff --git a/pkgs/stream_transform/test/buffer_test.dart b/pkgs/stream_transform/test/buffer_test.dart new file mode 100644 index 0000000..830f555 --- /dev/null +++ b/pkgs/stream_transform/test/buffer_test.dart
@@ -0,0 +1,305 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + late StreamController<void> trigger; + late StreamController<int> values; + late List<List<int>> emittedValues; + late bool valuesCanceled; + late bool triggerCanceled; + late bool triggerPaused; + late bool isDone; + late List<String> errors; + late Stream<List<int>> transformed; + late StreamSubscription<List<int>> subscription; + + void setUpForStreamTypes(String triggerType, String valuesType, + {required bool longPoll}) { + valuesCanceled = false; + triggerCanceled = false; + triggerPaused = false; + trigger = createController(triggerType) + ..onCancel = () { + triggerCanceled = true; + }; + if (triggerType == 'single subscription') { + trigger.onPause = () { + triggerPaused = true; + }; + } + values = createController(valuesType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + errors = []; + isDone = false; + transformed = values.stream.buffer(trigger.stream, longPoll: longPoll); + subscription = + transformed.listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + } + + for (var triggerType in streamTypes) { + for (var valuesType in streamTypes) { + group('Trigger type: [$triggerType], Values type: [$valuesType]', () { + group('general behavior', () { + setUp(() { + setUpForStreamTypes(triggerType, valuesType, longPoll: true); + }); + + test('does not emit before `trigger`', () async { + values.add(1); + await Future(() {}); + expect(emittedValues, isEmpty); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [ + [1] + ]); + }); + + test('groups values between trigger', () async { + values + ..add(1) + ..add(2); + await Future(() {}); + trigger.add(null); + values + ..add(3) + ..add(4); + await Future(() {}); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [ + [1, 2], + [3, 4] + ]); + }); + + test('cancels value subscription when output canceled', () async { + expect(valuesCanceled, false); + await subscription.cancel(); + expect(valuesCanceled, true); + }); + + test('closes when trigger ends', () async { + expect(isDone, false); + await trigger.close(); + await Future(() {}); + expect(isDone, true); + }); + + test('closes after outputting final values when source closes', + () async { + expect(isDone, false); + values.add(1); + await values.close(); + expect(isDone, false); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [ + [1] + ]); + expect(isDone, true); + }); + + test('closes when source closes and there are no buffered', () async { + expect(isDone, false); + await values.close(); + await Future(() {}); + expect(isDone, true); + }); + + test('forwards errors from trigger', () async { + trigger.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('forwards errors from values', () async { + values.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + }); + + group('long polling', () { + setUp(() { + setUpForStreamTypes(triggerType, valuesType, longPoll: true); + }); + + test('emits immediately if trigger emits before a value', () async { + trigger.add(null); + await Future(() {}); + expect(emittedValues, isEmpty); + values.add(1); + await Future(() {}); + expect(emittedValues, [ + [1] + ]); + }); + + test('two triggers in a row - emit buffere then emit next value', + () async { + values + ..add(1) + ..add(2); + await Future(() {}); + trigger + ..add(null) + ..add(null); + await Future(() {}); + values.add(3); + await Future(() {}); + expect(emittedValues, [ + [1, 2], + [3] + ]); + }); + + test('pre-emptive trigger then trigger after values', () async { + trigger.add(null); + await Future(() {}); + values + ..add(1) + ..add(2); + await Future(() {}); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [ + [1], + [2] + ]); + }); + + test('multiple pre-emptive triggers, only emits first value', + () async { + trigger + ..add(null) + ..add(null); + await Future(() {}); + values + ..add(1) + ..add(2); + await Future(() {}); + expect(emittedValues, [ + [1] + ]); + }); + + test('closes if there is no waiting long poll when source closes', + () async { + expect(isDone, false); + values.add(1); + trigger.add(null); + await values.close(); + await Future(() {}); + expect(isDone, true); + }); + + test('waits to emit if there waiting long poll when trigger closes', + () async { + trigger.add(null); + await trigger.close(); + expect(isDone, false); + values.add(1); + await Future(() {}); + expect(emittedValues, [ + [1] + ]); + expect(isDone, true); + }); + }); + + group('immediate polling', () { + setUp(() { + setUpForStreamTypes(triggerType, valuesType, longPoll: false); + }); + + test('emits empty list before values', () async { + trigger.add(null); + await Future(() {}); + expect(emittedValues, [<int>[]]); + }); + + test('emits empty list after emitting values', () async { + values + ..add(1) + ..add(2); + await Future(() {}); + trigger + ..add(null) + ..add(null); + await Future(() {}); + expect(emittedValues, [ + [1, 2], + <int>[] + ]); + }); + }); + }); + } + } + + test('always cancels trigger if values is singlesubscription', () async { + setUpForStreamTypes('broadcast', 'single subscription', longPoll: true); + expect(triggerCanceled, false); + await subscription.cancel(); + expect(triggerCanceled, true); + + setUpForStreamTypes('single subscription', 'single subscription', + longPoll: true); + expect(triggerCanceled, false); + await subscription.cancel(); + expect(triggerCanceled, true); + }); + + test('cancels trigger if trigger is broadcast', () async { + setUpForStreamTypes('broadcast', 'broadcast', longPoll: true); + expect(triggerCanceled, false); + await subscription.cancel(); + expect(triggerCanceled, true); + }); + + test('pauses single subscription trigger for broadcast values', () async { + setUpForStreamTypes('single subscription', 'broadcast', longPoll: true); + expect(triggerCanceled, false); + expect(triggerPaused, false); + await subscription.cancel(); + expect(triggerCanceled, false); + expect(triggerPaused, true); + }); + + for (var triggerType in streamTypes) { + test('cancel and relisten with [$triggerType] trigger', () async { + setUpForStreamTypes(triggerType, 'broadcast', longPoll: true); + values.add(1); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [ + [1] + ]); + await subscription.cancel(); + values.add(2); + trigger.add(null); + await Future(() {}); + subscription = transformed.listen(emittedValues.add); + values.add(3); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [ + [1], + [3] + ]); + }); + } +}
diff --git a/pkgs/stream_transform/test/combine_latest_all_test.dart b/pkgs/stream_transform/test/combine_latest_all_test.dart new file mode 100644 index 0000000..f4b719c --- /dev/null +++ b/pkgs/stream_transform/test/combine_latest_all_test.dart
@@ -0,0 +1,166 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +Future<void> tick() => Future(() {}); + +void main() { + group('combineLatestAll', () { + test('emits latest values', () async { + final first = StreamController<String>(); + final second = StreamController<String>(); + final third = StreamController<String>(); + final combined = first.stream.combineLatestAll( + [second.stream, third.stream]).map((data) => data.join()); + + // first: a----b------------------c--------d---| + // second: --1---------2-----------------| + // third: -------&----------%---| + // combined: -------b1&--b2&---b2%---c2%------d2%-| + + expect(combined, + emitsInOrder(['b1&', 'b2&', 'b2%', 'c2%', 'd2%', emitsDone])); + + first.add('a'); + await tick(); + second.add('1'); + await tick(); + first.add('b'); + await tick(); + third.add('&'); + await tick(); + second.add('2'); + await tick(); + third.add('%'); + await tick(); + await third.close(); + await tick(); + first.add('c'); + await tick(); + await second.close(); + await tick(); + first.add('d'); + await tick(); + await first.close(); + }); + + test('ends if a Stream closes without ever emitting a value', () async { + final first = StreamController<String>(); + final second = StreamController<String>(); + final combined = first.stream.combineLatestAll([second.stream]); + + // first: -a------b-------| + // second: -----| + // combined: -----| + + expect(combined, emits(emitsDone)); + + first.add('a'); + await tick(); + await second.close(); + await tick(); + first.add('b'); + }); + + test('forwards errors', () async { + final first = StreamController<String>(); + final second = StreamController<String>(); + final combined = first.stream + .combineLatestAll([second.stream]).map((data) => data.join()); + + // first: -a---------| + // second: ----1---# + // combined: ----a1--# + + expect(combined, emitsThrough(emitsError('doh'))); + + first.add('a'); + await tick(); + second.add('1'); + await tick(); + second.addError('doh'); + }); + + test('ends after both streams have ended', () async { + final first = StreamController<String>(); + final second = StreamController<String>(); + + var done = false; + first.stream.combineLatestAll([second.stream]).listen(null, + onDone: () => done = true); + + // first: -a---| + // second: --------1--| + // combined: --------a1-| + + first.add('a'); + await tick(); + await first.close(); + await tick(); + + expect(done, isFalse); + + second.add('1'); + await tick(); + await second.close(); + await tick(); + + expect(done, isTrue); + }); + + group('broadcast source', () { + test('can cancel and relisten to broadcast stream', () async { + final first = StreamController<String>.broadcast(); + final second = StreamController<String>.broadcast(); + final combined = first.stream + .combineLatestAll([second.stream]).map((data) => data.join()); + + // first: a------b----------------c------d----e---| + // second: --1---------2---3---4------5-| + // combined: --a1---b1---b2--b3--b4-----c5--d5---e5--| + // sub1: ^-----------------! + // sub2: ----------------------^-----------------| + + expect(combined.take(4), emitsInOrder(['a1', 'b1', 'b2', 'b3'])); + + first.add('a'); + await tick(); + second.add('1'); + await tick(); + first.add('b'); + await tick(); + second.add('2'); + await tick(); + second.add('3'); + await tick(); + + // First subscription is canceled here by .take(4) + expect(first.hasListener, isFalse); + expect(second.hasListener, isFalse); + + // This emit is thrown away because there are no subscribers + second.add('4'); + await tick(); + + expect(combined, emitsInOrder(['c5', 'd5', 'e5', emitsDone])); + + first.add('c'); + await tick(); + second.add('5'); + await tick(); + await second.close(); + await tick(); + first.add('d'); + await tick(); + first.add('e'); + await tick(); + await first.close(); + }); + }); + }); +}
diff --git a/pkgs/stream_transform/test/combine_latest_test.dart b/pkgs/stream_transform/test/combine_latest_test.dart new file mode 100644 index 0000000..1985c75 --- /dev/null +++ b/pkgs/stream_transform/test/combine_latest_test.dart
@@ -0,0 +1,179 @@ +// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +void main() { + group('combineLatest', () { + test('flows through combine callback', () async { + var source = StreamController<int>(); + var other = StreamController<int>(); + int sum(int a, int b) => a + b; + + var results = <int>[]; + unawaited( + source.stream.combineLatest(other.stream, sum).forEach(results.add)); + + source.add(1); + await Future(() {}); + expect(results, isEmpty); + + other.add(2); + await Future(() {}); + expect(results, [3]); + + source.add(3); + await Future(() {}); + expect(results, [3, 5]); + + source.add(4); + await Future(() {}); + expect(results, [3, 5, 6]); + + other.add(5); + await Future(() {}); + expect(results, [3, 5, 6, 9]); + }); + + test('can combine different typed streams', () async { + var source = StreamController<String>(); + var other = StreamController<int>(); + String times(String a, int b) => a * b; + + var results = <String>[]; + unawaited(source.stream + .combineLatest(other.stream, times) + .forEach(results.add)); + + source + ..add('a') + ..add('b'); + await Future(() {}); + expect(results, isEmpty); + + other.add(2); + await Future(() {}); + expect(results, ['bb']); + + other.add(3); + await Future(() {}); + expect(results, ['bb', 'bbb']); + + source.add('c'); + await Future(() {}); + expect(results, ['bb', 'bbb', 'ccc']); + }); + + test('ends after both streams have ended', () async { + var source = StreamController<int>(); + var other = StreamController<int>(); + int sum(int a, int b) => a + b; + + var done = false; + source.stream + .combineLatest(other.stream, sum) + .listen(null, onDone: () => done = true); + + source.add(1); + + await source.close(); + await Future(() {}); + expect(done, false); + + await other.close(); + await Future(() {}); + expect(done, true); + }); + + test('ends if source stream closes without ever emitting a value', + () async { + var source = const Stream<int>.empty(); + var other = StreamController<int>(); + + int sum(int a, int b) => a + b; + + var done = false; + source + .combineLatest(other.stream, sum) + .listen(null, onDone: () => done = true); + + await Future(() {}); + // Nothing can ever be emitted on the result, may as well close. + expect(done, true); + }); + + test('ends if other stream closes without ever emitting a value', () async { + var source = StreamController<int>(); + var other = const Stream<int>.empty(); + + int sum(int a, int b) => a + b; + + var done = false; + source.stream + .combineLatest(other, sum) + .listen(null, onDone: () => done = true); + + await Future(() {}); + // Nothing can ever be emitted on the result, may as well close. + expect(done, true); + }); + + test('forwards errors', () async { + var source = StreamController<int>(); + var other = StreamController<int>(); + int sum(int a, int b) => throw _NumberedException(3); + + var errors = <Object>[]; + source.stream + .combineLatest(other.stream, sum) + .listen(null, onError: errors.add); + + source.addError(_NumberedException(1)); + other.addError(_NumberedException(2)); + + source.add(1); + other.add(2); + + await Future(() {}); + + expect(errors, [_isException(1), _isException(2), _isException(3)]); + }); + + group('broadcast source', () { + test('can cancel and relisten to broadcast stream', () async { + var source = StreamController<int>.broadcast(); + var other = StreamController<int>(); + int combine(int a, int b) => a + b; + + var emittedValues = <int>[]; + var transformed = source.stream.combineLatest(other.stream, combine); + + var subscription = transformed.listen(emittedValues.add); + + source.add(1); + other.add(2); + await Future(() {}); + expect(emittedValues, [3]); + + await subscription.cancel(); + + subscription = transformed.listen(emittedValues.add); + source.add(3); + await Future(() {}); + expect(emittedValues, [3, 5]); + }); + }); + }); +} + +class _NumberedException implements Exception { + final int id; + _NumberedException(this.id); +} + +Matcher _isException(int id) => + const TypeMatcher<_NumberedException>().having((n) => n.id, 'id', id);
diff --git a/pkgs/stream_transform/test/concurrent_async_map_test.dart b/pkgs/stream_transform/test/concurrent_async_map_test.dart new file mode 100644 index 0000000..1807f9f --- /dev/null +++ b/pkgs/stream_transform/test/concurrent_async_map_test.dart
@@ -0,0 +1,157 @@ +// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + late StreamController<int> controller; + late List<String> emittedValues; + late bool valuesCanceled; + late bool isDone; + late List<String> errors; + late Stream<String> transformed; + late StreamSubscription<String> subscription; + + late List<Completer<String>> finishWork; + late List<dynamic> values; + + Future<String> convert(int value) { + values.add(value); + var completer = Completer<String>(); + finishWork.add(completer); + return completer.future; + } + + for (var streamType in streamTypes) { + group('concurrentAsyncMap for stream type: [$streamType]', () { + setUp(() { + valuesCanceled = false; + controller = createController(streamType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + errors = []; + isDone = false; + finishWork = []; + values = []; + transformed = controller.stream.concurrentAsyncMap(convert); + subscription = transformed + .listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + }); + + test('does not emit before convert finishes', () async { + controller.add(1); + await Future(() {}); + expect(emittedValues, isEmpty); + expect(values, [1]); + finishWork.first.complete('result'); + await Future(() {}); + expect(emittedValues, ['result']); + }); + + test('allows calls to convert before the last one finished', () async { + controller + ..add(1) + ..add(2) + ..add(3); + await Future(() {}); + expect(values, [1, 2, 3]); + }); + + test('forwards errors directly without waiting for previous convert', + () async { + controller.add(1); + await Future(() {}); + controller.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('forwards errors which occur during the convert', () async { + controller.add(1); + await Future(() {}); + finishWork.first.completeError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('can continue handling events after an error', () async { + controller.add(1); + await Future(() {}); + finishWork[0].completeError('error'); + controller.add(2); + await Future(() {}); + expect(values, [1, 2]); + finishWork[1].completeError('another'); + await Future(() {}); + expect(errors, ['error', 'another']); + }); + + test('cancels value subscription when output canceled', () async { + expect(valuesCanceled, false); + await subscription.cancel(); + expect(valuesCanceled, true); + }); + + test('closes when values end if no conversion is pending', () async { + expect(isDone, false); + await controller.close(); + await Future(() {}); + expect(isDone, true); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get values', () async { + var otherValues = <String>[]; + transformed.listen(otherValues.add); + controller.add(1); + await Future(() {}); + finishWork.first.complete('result'); + await Future(() {}); + expect(emittedValues, ['result']); + expect(otherValues, ['result']); + }); + + test('multiple listeners get done when values end', () async { + var otherDone = false; + transformed.listen(null, onDone: () => otherDone = true); + controller.add(1); + await Future(() {}); + await controller.close(); + expect(isDone, false); + expect(otherDone, false); + finishWork.first.complete(''); + await Future(() {}); + expect(isDone, true); + expect(otherDone, true); + }); + + test('can cancel and relisten', () async { + controller.add(1); + await Future(() {}); + finishWork.first.complete('first'); + await Future(() {}); + await subscription.cancel(); + controller.add(2); + await Future(() {}); + subscription = transformed.listen(emittedValues.add); + controller.add(3); + await Future(() {}); + expect(values, [1, 3]); + finishWork[1].complete('second'); + await Future(() {}); + expect(emittedValues, ['first', 'second']); + }); + } + }); + } +}
diff --git a/pkgs/stream_transform/test/debounce_test.dart b/pkgs/stream_transform/test/debounce_test.dart new file mode 100644 index 0000000..19de055 --- /dev/null +++ b/pkgs/stream_transform/test/debounce_test.dart
@@ -0,0 +1,310 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:fake_async/fake_async.dart'; +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + for (var streamType in streamTypes) { + group('Stream type [$streamType]', () { + group('debounce - trailing', () { + late StreamController<int> values; + late List<int> emittedValues; + late bool valuesCanceled; + late bool isDone; + late List<String> errors; + late StreamSubscription<int> subscription; + late Stream<int> transformed; + + setUp(() async { + valuesCanceled = false; + values = createController(streamType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + errors = []; + isDone = false; + transformed = values.stream.debounce(const Duration(milliseconds: 5)); + }); + + void listen() { + subscription = transformed + .listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + } + + test('cancels values', () async { + listen(); + await subscription.cancel(); + expect(valuesCanceled, true); + }); + + test('swallows values that come faster than duration', () { + fakeAsync((async) { + listen(); + values + ..add(1) + ..add(2) + ..close(); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [2]); + }); + }); + + test('outputs multiple values spaced further than duration', () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 6)); + values.add(2); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1, 2]); + }); + }); + + test('waits for pending value to close', () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 6)); + values.close(); + async.flushMicrotasks(); + expect(isDone, true); + }); + }); + + test('closes output if there are no pending values', () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 6)); + values + ..add(2) + ..close(); + async.flushMicrotasks(); + expect(isDone, false); + async.elapse(const Duration(milliseconds: 6)); + expect(isDone, true); + }); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get values', () { + fakeAsync((async) { + listen(); + var otherValues = <int>[]; + transformed.listen(otherValues.add); + values + ..add(1) + ..add(2); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [2]); + expect(otherValues, [2]); + }); + }); + } + }); + + group('debounce - leading', () { + late StreamController<int> values; + late List<int> emittedValues; + late Stream<int> transformed; + late bool isDone; + + setUp(() async { + values = createController(streamType); + emittedValues = []; + isDone = false; + transformed = values.stream.debounce(const Duration(milliseconds: 5), + leading: true, trailing: false); + }); + + void listen() { + transformed.listen(emittedValues.add, onDone: () { + isDone = true; + }); + } + + test('swallows values that come faster than duration', () async { + listen(); + values + ..add(1) + ..add(2); + await values.close(); + expect(emittedValues, [1]); + }); + + test('outputs multiple values spaced further than duration', () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 6)); + values.add(2); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1, 2]); + }); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get values', () { + fakeAsync((async) { + listen(); + var otherValues = <int>[]; + transformed.listen(otherValues.add); + values + ..add(1) + ..add(2); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1]); + expect(otherValues, [1]); + }); + }); + } + + test('closes output immediately if not waiting for trailing value', + () async { + listen(); + values.add(1); + await values.close(); + expect(isDone, true); + }); + }); + + group('debounce - leading and trailing', () { + late StreamController<int> values; + late List<int> emittedValues; + late Stream<int> transformed; + + setUp(() async { + values = createController(streamType); + emittedValues = []; + transformed = values.stream.debounce(const Duration(milliseconds: 5), + leading: true, trailing: true); + }); + void listen() { + transformed.listen(emittedValues.add); + } + + test('swallows values that come faster than duration', () { + fakeAsync((async) { + listen(); + values + ..add(1) + ..add(2) + ..add(3) + ..close(); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1, 3]); + }); + }); + + test('outputs multiple values spaced further than duration', () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 6)); + values.add(2); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1, 2]); + }); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get values', () { + fakeAsync((async) { + listen(); + var otherValues = <int>[]; + transformed.listen(otherValues.add); + values + ..add(1) + ..add(2); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1, 2]); + expect(otherValues, [1, 2]); + }); + }); + } + }); + + group('debounceBuffer', () { + late StreamController<int> values; + late List<List<int>> emittedValues; + late List<String> errors; + late Stream<List<int>> transformed; + + setUp(() async { + values = createController(streamType); + emittedValues = []; + errors = []; + transformed = + values.stream.debounceBuffer(const Duration(milliseconds: 5)); + }); + void listen() { + transformed.listen(emittedValues.add, onError: errors.add); + } + + test('Emits all values as a list', () { + fakeAsync((async) { + listen(); + values + ..add(1) + ..add(2) + ..close(); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [ + [1, 2] + ]); + }); + }); + + test('separate lists for multiple values spaced further than duration', + () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 6)); + values.add(2); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [ + [1], + [2] + ]); + }); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get values', () { + fakeAsync((async) { + listen(); + var otherValues = <List<int>>[]; + transformed.listen(otherValues.add); + values + ..add(1) + ..add(2); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [ + [1, 2] + ]); + expect(otherValues, [ + [1, 2] + ]); + }); + }); + } + }); + }); + } + test('allows nulls', () async { + final values = Stream<int?>.fromIterable([null]); + final transformed = values.debounce(const Duration(milliseconds: 1)); + expect(await transformed.toList(), [null]); + }); +}
diff --git a/pkgs/stream_transform/test/followd_by_test.dart b/pkgs/stream_transform/test/followd_by_test.dart new file mode 100644 index 0000000..d600d13 --- /dev/null +++ b/pkgs/stream_transform/test/followd_by_test.dart
@@ -0,0 +1,159 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + for (var firstType in streamTypes) { + for (var secondType in streamTypes) { + group('followedBy [$firstType] with [$secondType]', () { + late StreamController<int> first; + late StreamController<int> second; + + late List<int> emittedValues; + late bool firstCanceled; + late bool secondCanceled; + late bool secondListened; + late bool isDone; + late List<String> errors; + late Stream<int> transformed; + late StreamSubscription<int> subscription; + + setUp(() async { + firstCanceled = false; + secondCanceled = false; + secondListened = false; + first = createController(firstType) + ..onCancel = () { + firstCanceled = true; + }; + second = createController(secondType) + ..onCancel = () { + secondCanceled = true; + } + ..onListen = () { + secondListened = true; + }; + emittedValues = []; + errors = []; + isDone = false; + transformed = first.stream.followedBy(second.stream); + subscription = transformed + .listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + }); + + test('adds all values from both streams', () async { + first + ..add(1) + ..add(2); + await first.close(); + await Future(() {}); + second + ..add(3) + ..add(4); + await Future(() {}); + expect(emittedValues, [1, 2, 3, 4]); + }); + + test('Does not listen to second stream before first stream finishes', + () async { + expect(secondListened, false); + await first.close(); + expect(secondListened, true); + }); + + test('closes stream after both inputs close', () async { + await first.close(); + await second.close(); + expect(isDone, true); + }); + + test('cancels any type of first stream on cancel', () async { + await subscription.cancel(); + expect(firstCanceled, true); + }); + + if (firstType == 'single subscription') { + test( + 'cancels any type of second stream on cancel if first is ' + 'broadcast', () async { + await first.close(); + await subscription.cancel(); + expect(secondCanceled, true); + }); + + if (secondType == 'broadcast') { + test('can pause and resume during second stream - dropping values', + () async { + await first.close(); + subscription.pause(); + second.add(1); + await Future(() {}); + subscription.resume(); + second.add(2); + await Future(() {}); + expect(emittedValues, [2]); + }); + } else { + test('can pause and resume during second stream - buffering values', + () async { + await first.close(); + subscription.pause(); + second.add(1); + await Future(() {}); + subscription.resume(); + second.add(2); + await Future(() {}); + expect(emittedValues, [1, 2]); + }); + } + } + + if (firstType == 'broadcast') { + test('can cancel and relisten during first stream', () async { + await subscription.cancel(); + first.add(1); + subscription = transformed.listen(emittedValues.add); + first.add(2); + await Future(() {}); + expect(emittedValues, [2]); + }); + + test('can cancel and relisten during second stream', () async { + await first.close(); + await subscription.cancel(); + second.add(2); + await Future(() {}); + subscription = transformed.listen(emittedValues.add); + second.add(3); + await Future(() {}); + expect(emittedValues, [3]); + }); + + test('forwards values to multiple listeners', () async { + var otherValues = <int>[]; + transformed.listen(otherValues.add); + first.add(1); + await first.close(); + second.add(2); + await Future(() {}); + var thirdValues = <int>[]; + transformed.listen(thirdValues.add); + second.add(3); + await Future(() {}); + expect(emittedValues, [1, 2, 3]); + expect(otherValues, [1, 2, 3]); + expect(thirdValues, [3]); + }); + } + }); + } + } +}
diff --git a/pkgs/stream_transform/test/from_handlers_test.dart b/pkgs/stream_transform/test/from_handlers_test.dart new file mode 100644 index 0000000..694199c --- /dev/null +++ b/pkgs/stream_transform/test/from_handlers_test.dart
@@ -0,0 +1,183 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/src/from_handlers.dart'; +import 'package:test/test.dart'; + +void main() { + late StreamController<int> values; + late List<int> emittedValues; + late bool valuesCanceled; + late bool isDone; + late List<String> errors; + late Stream<int> transformed; + late StreamSubscription<int> subscription; + + void setUpForController(StreamController<int> controller, + Stream<int> Function(Stream<int>) transform) { + valuesCanceled = false; + values = controller + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + errors = []; + isDone = false; + transformed = transform(values.stream); + subscription = + transformed.listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + } + + group('default from_handlers', () { + group('Single subscription stream', () { + setUp(() { + setUpForController(StreamController(), + (s) => s.transformByHandlers(onData: (e, sink) => sink.add(e))); + }); + + test('has correct stream type', () { + expect(transformed.isBroadcast, false); + }); + + test('forwards values', () async { + values + ..add(1) + ..add(2); + await Future(() {}); + expect(emittedValues, [1, 2]); + }); + + test('forwards errors', () async { + values.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('forwards done', () async { + await values.close(); + expect(isDone, true); + }); + + test('forwards cancel', () async { + await subscription.cancel(); + expect(valuesCanceled, true); + }); + }); + + group('broadcast stream with muliple listeners', () { + late List<int> emittedValues2; + late List<String> errors2; + late bool isDone2; + late StreamSubscription<int> subscription2; + + setUp(() { + setUpForController(StreamController.broadcast(), + (s) => s.transformByHandlers(onData: (e, sink) => sink.add(e))); + emittedValues2 = []; + errors2 = []; + isDone2 = false; + subscription2 = transformed + .listen(emittedValues2.add, onError: errors2.add, onDone: () { + isDone2 = true; + }); + }); + + test('has correct stream type', () { + expect(transformed.isBroadcast, true); + }); + + test('forwards values', () async { + values + ..add(1) + ..add(2); + await Future(() {}); + expect(emittedValues, [1, 2]); + expect(emittedValues2, [1, 2]); + }); + + test('forwards errors', () async { + values.addError('error'); + await Future(() {}); + expect(errors, ['error']); + expect(errors2, ['error']); + }); + + test('forwards done', () async { + await values.close(); + expect(isDone, true); + expect(isDone2, true); + }); + + test('forwards cancel', () async { + await subscription.cancel(); + expect(valuesCanceled, false); + await subscription2.cancel(); + expect(valuesCanceled, true); + }); + }); + }); + + group('custom handlers', () { + group('single subscription', () { + setUp(() async { + setUpForController( + StreamController(), + (s) => s.transformByHandlers(onData: (value, sink) { + sink.add(value + 1); + })); + }); + test('uses transform from handleData', () async { + values + ..add(1) + ..add(2); + await Future(() {}); + expect(emittedValues, [2, 3]); + }); + }); + + group('broadcast stream with multiple listeners', () { + late int dataCallCount; + late int doneCallCount; + late int errorCallCount; + + setUp(() async { + dataCallCount = 0; + doneCallCount = 0; + errorCallCount = 0; + setUpForController( + StreamController.broadcast(), + (s) => s.transformByHandlers(onData: (value, sink) { + dataCallCount++; + }, onError: (error, stackTrace, sink) { + errorCallCount++; + sink.addError(error, stackTrace); + }, onDone: (sink) { + doneCallCount++; + })); + transformed.listen((_) {}, onError: (_, __) {}); + }); + + test('handles data once', () async { + values.add(1); + await Future(() {}); + expect(dataCallCount, 1); + }); + + test('handles done once', () async { + await values.close(); + expect(doneCallCount, 1); + }); + + test('handles errors once', () async { + values.addError('error'); + await Future(() {}); + expect(errorCallCount, 1); + }); + }); + }); +}
diff --git a/pkgs/stream_transform/test/merge_test.dart b/pkgs/stream_transform/test/merge_test.dart new file mode 100644 index 0000000..ecbf97f --- /dev/null +++ b/pkgs/stream_transform/test/merge_test.dart
@@ -0,0 +1,140 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +void main() { + group('merge', () { + test('includes all values', () async { + var first = Stream.fromIterable([1, 2, 3]); + var second = Stream.fromIterable([4, 5, 6]); + var allValues = await first.merge(second).toList(); + expect(allValues, containsAllInOrder([1, 2, 3])); + expect(allValues, containsAllInOrder([4, 5, 6])); + expect(allValues, hasLength(6)); + }); + + test('cancels both sources', () async { + var firstCanceled = false; + var first = StreamController<int>() + ..onCancel = () { + firstCanceled = true; + }; + var secondCanceled = false; + var second = StreamController<int>() + ..onCancel = () { + secondCanceled = true; + }; + var subscription = first.stream.merge(second.stream).listen((_) {}); + await subscription.cancel(); + expect(firstCanceled, true); + expect(secondCanceled, true); + }); + + test('completes when both sources complete', () async { + var first = StreamController<int>(); + var second = StreamController<int>(); + var isDone = false; + first.stream.merge(second.stream).listen((_) {}, onDone: () { + isDone = true; + }); + await first.close(); + expect(isDone, false); + await second.close(); + expect(isDone, true); + }); + + test('can cancel and relisten to broadcast stream', () async { + var first = StreamController<int>.broadcast(); + var second = StreamController<int>(); + var emittedValues = <int>[]; + var transformed = first.stream.merge(second.stream); + var subscription = transformed.listen(emittedValues.add); + first.add(1); + second.add(2); + await Future(() {}); + expect(emittedValues, contains(1)); + expect(emittedValues, contains(2)); + await subscription.cancel(); + emittedValues = []; + subscription = transformed.listen(emittedValues.add); + first.add(3); + second.add(4); + await Future(() {}); + expect(emittedValues, contains(3)); + expect(emittedValues, contains(4)); + }); + }); + + group('mergeAll', () { + test('includes all values', () async { + var first = Stream.fromIterable([1, 2, 3]); + var second = Stream.fromIterable([4, 5, 6]); + var third = Stream.fromIterable([7, 8, 9]); + var allValues = await first.mergeAll([second, third]).toList(); + expect(allValues, containsAllInOrder([1, 2, 3])); + expect(allValues, containsAllInOrder([4, 5, 6])); + expect(allValues, containsAllInOrder([7, 8, 9])); + expect(allValues, hasLength(9)); + }); + + test('handles mix of broadcast and single-subscription', () async { + var firstCanceled = false; + var first = StreamController<int>.broadcast() + ..onCancel = () { + firstCanceled = true; + }; + var secondBroadcastCanceled = false; + var secondBroadcast = StreamController<int>.broadcast() + ..onCancel = () { + secondBroadcastCanceled = true; + }; + var secondSingleCanceled = false; + var secondSingle = StreamController<int>() + ..onCancel = () { + secondSingleCanceled = true; + }; + + var merged = + first.stream.mergeAll([secondBroadcast.stream, secondSingle.stream]); + + var firstListenerValues = <int>[]; + var secondListenerValues = <int>[]; + + var firstSubscription = merged.listen(firstListenerValues.add); + var secondSubscription = merged.listen(secondListenerValues.add); + + first.add(1); + secondBroadcast.add(2); + secondSingle.add(3); + + await Future(() {}); + await firstSubscription.cancel(); + + expect(firstCanceled, false); + expect(secondBroadcastCanceled, false); + expect(secondSingleCanceled, false); + + first.add(4); + secondBroadcast.add(5); + secondSingle.add(6); + + await Future(() {}); + await secondSubscription.cancel(); + + await Future(() {}); + expect(firstCanceled, true); + expect(secondBroadcastCanceled, true); + expect(secondSingleCanceled, false, + reason: 'Single subscription streams merged into broadcast streams ' + 'are not canceled'); + + expect(firstListenerValues, [1, 2, 3]); + expect(secondListenerValues, [1, 2, 3, 4, 5, 6]); + }); + }); +}
diff --git a/pkgs/stream_transform/test/sample_test.dart b/pkgs/stream_transform/test/sample_test.dart new file mode 100644 index 0000000..66ca09d --- /dev/null +++ b/pkgs/stream_transform/test/sample_test.dart
@@ -0,0 +1,291 @@ +// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + late StreamController<void> trigger; + late StreamController<int> values; + late List<int> emittedValues; + late bool valuesCanceled; + late bool triggerCanceled; + late bool triggerPaused; + late bool isDone; + late List<String> errors; + late Stream<int> transformed; + late StreamSubscription<int> subscription; + + void setUpForStreamTypes(String triggerType, String valuesType, + {required bool longPoll}) { + valuesCanceled = false; + triggerCanceled = false; + triggerPaused = false; + trigger = createController(triggerType) + ..onCancel = () { + triggerCanceled = true; + }; + if (triggerType == 'single subscription') { + trigger.onPause = () { + triggerPaused = true; + }; + } + values = createController(valuesType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + errors = []; + isDone = false; + transformed = values.stream.sample(trigger.stream, longPoll: longPoll); + subscription = + transformed.listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + } + + for (var triggerType in streamTypes) { + for (var valuesType in streamTypes) { + group('Trigger type: [$triggerType], Values type: [$valuesType]', () { + group('general behavior', () { + setUp(() { + setUpForStreamTypes(triggerType, valuesType, longPoll: true); + }); + + test('does not emit before `trigger`', () async { + values.add(1); + await Future(() {}); + expect(emittedValues, isEmpty); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [1]); + }); + + test('keeps most recent event between triggers', () async { + values + ..add(1) + ..add(2); + await Future(() {}); + trigger.add(null); + values + ..add(3) + ..add(4); + await Future(() {}); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [2, 4]); + }); + + test('cancels value subscription when output canceled', () async { + expect(valuesCanceled, false); + await subscription.cancel(); + expect(valuesCanceled, true); + }); + + test('closes when trigger ends', () async { + expect(isDone, false); + await trigger.close(); + await Future(() {}); + expect(isDone, true); + }); + + test('closes after outputting final values when source closes', + () async { + expect(isDone, false); + values.add(1); + await values.close(); + expect(isDone, false); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [1]); + expect(isDone, true); + }); + + test('closes when source closes and there is no pending', () async { + expect(isDone, false); + await values.close(); + await Future(() {}); + expect(isDone, true); + }); + + test('forwards errors from trigger', () async { + trigger.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('forwards errors from values', () async { + values.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + }); + + group('long polling', () { + setUp(() { + setUpForStreamTypes(triggerType, valuesType, longPoll: true); + }); + + test('emits immediately if trigger emits before a value', () async { + trigger.add(null); + await Future(() {}); + expect(emittedValues, isEmpty); + values.add(1); + await Future(() {}); + expect(emittedValues, [1]); + }); + + test('two triggers in a row - emit buffere then emit next value', + () async { + values + ..add(1) + ..add(2); + await Future(() {}); + trigger + ..add(null) + ..add(null); + await Future(() {}); + values.add(3); + await Future(() {}); + expect(emittedValues, [2, 3]); + }); + + test('pre-emptive trigger then trigger after values', () async { + trigger.add(null); + await Future(() {}); + values + ..add(1) + ..add(2); + await Future(() {}); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [1, 2]); + }); + + test('multiple pre-emptive triggers, only emits first value', + () async { + trigger + ..add(null) + ..add(null); + await Future(() {}); + values + ..add(1) + ..add(2); + await Future(() {}); + expect(emittedValues, [1]); + }); + + test('closes if there is no waiting long poll when source closes', + () async { + expect(isDone, false); + values.add(1); + trigger.add(null); + await values.close(); + await Future(() {}); + expect(isDone, true); + }); + + test('waits to emit if there waiting long poll when trigger closes', + () async { + trigger.add(null); + await trigger.close(); + expect(isDone, false); + values.add(1); + await Future(() {}); + expect(emittedValues, [1]); + expect(isDone, true); + }); + }); + + group('immediate polling', () { + setUp(() { + setUpForStreamTypes(triggerType, valuesType, longPoll: false); + }); + + test('ignores trigger before values', () async { + trigger.add(null); + await Future(() {}); + values + ..add(1) + ..add(2); + await Future(() {}); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [2]); + }); + + test('ignores trigger if no pending values', () async { + values + ..add(1) + ..add(2); + await Future(() {}); + trigger + ..add(null) + ..add(null); + await Future(() {}); + values + ..add(3) + ..add(4); + await Future(() {}); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [2, 4]); + }); + }); + }); + } + } + + test('always cancels trigger if values is singlesubscription', () async { + setUpForStreamTypes('broadcast', 'single subscription', longPoll: true); + expect(triggerCanceled, false); + await subscription.cancel(); + expect(triggerCanceled, true); + + setUpForStreamTypes('single subscription', 'single subscription', + longPoll: true); + expect(triggerCanceled, false); + await subscription.cancel(); + expect(triggerCanceled, true); + }); + + test('cancels trigger if trigger is broadcast', () async { + setUpForStreamTypes('broadcast', 'broadcast', longPoll: true); + expect(triggerCanceled, false); + await subscription.cancel(); + expect(triggerCanceled, true); + }); + + test('pauses single subscription trigger for broadcast values', () async { + setUpForStreamTypes('single subscription', 'broadcast', longPoll: true); + expect(triggerCanceled, false); + expect(triggerPaused, false); + await subscription.cancel(); + expect(triggerCanceled, false); + expect(triggerPaused, true); + }); + + for (var triggerType in streamTypes) { + test('cancel and relisten with [$triggerType] trigger', () async { + setUpForStreamTypes(triggerType, 'broadcast', longPoll: true); + values.add(1); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [1]); + await subscription.cancel(); + values.add(2); + trigger.add(null); + await Future(() {}); + subscription = transformed.listen(emittedValues.add); + values.add(3); + trigger.add(null); + await Future(() {}); + expect(emittedValues, [1, 3]); + }); + } +}
diff --git a/pkgs/stream_transform/test/scan_test.dart b/pkgs/stream_transform/test/scan_test.dart new file mode 100644 index 0000000..3c749e7 --- /dev/null +++ b/pkgs/stream_transform/test/scan_test.dart
@@ -0,0 +1,109 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +void main() { + group('Scan', () { + test('produces intermediate values', () async { + var source = Stream.fromIterable([1, 2, 3, 4]); + int sum(int x, int y) => x + y; + var result = await source.scan(0, sum).toList(); + + expect(result, [1, 3, 6, 10]); + }); + + test('can create a broadcast stream', () { + var source = StreamController<int>.broadcast(); + + var transformed = source.stream.scan(null, (_, __) {}); + + expect(transformed.isBroadcast, true); + }); + + test('forwards errors from source', () async { + var source = StreamController<int>(); + + int sum(int x, int y) => x + y; + + var errors = <Object>[]; + + source.stream.scan(0, sum).listen(null, onError: errors.add); + + source.addError(StateError('fail')); + await Future(() {}); + + expect(errors, [isStateError]); + }); + + group('with async combine', () { + test('returns a Stream of non-futures', () async { + var source = Stream.fromIterable([1, 2, 3, 4]); + Future<int> sum(int x, int y) async => x + y; + var result = await source.scan(0, sum).toList(); + + expect(result, [1, 3, 6, 10]); + }); + + test('can return a Stream of futures when specified', () async { + var source = Stream.fromIterable([1, 2]); + Future<int> sum(Future<int> x, int y) async => (await x) + y; + var result = + await source.scan<Future<int>>(Future.value(0), sum).toList(); + + expect(result, [ + const TypeMatcher<Future<void>>(), + const TypeMatcher<Future<void>>() + ]); + expect(await result.wait, [1, 3]); + }); + + test('does not call for subsequent values while waiting', () async { + var source = StreamController<int>(); + + var calledWith = <int>[]; + var block = Completer<void>(); + Future<int> combine(int x, int y) async { + calledWith.add(y); + await block.future; + return x + y; + } + + var results = <int>[]; + + unawaited(source.stream.scan(0, combine).forEach(results.add)); + + source + ..add(1) + ..add(2); + await Future(() {}); + expect(calledWith, [1]); + expect(results, isEmpty); + + block.complete(); + await Future(() {}); + expect(calledWith, [1, 2]); + expect(results, [1, 3]); + }); + + test('forwards async errors', () async { + var source = StreamController<int>(); + + Future<int> combine(int x, int y) async => throw StateError('fail'); + + var errors = <Object>[]; + + source.stream.scan(0, combine).listen(null, onError: errors.add); + + source.add(1); + await Future(() {}); + + expect(errors, [isStateError]); + }); + }); + }); +}
diff --git a/pkgs/stream_transform/test/start_with_test.dart b/pkgs/stream_transform/test/start_with_test.dart new file mode 100644 index 0000000..35f0330 --- /dev/null +++ b/pkgs/stream_transform/test/start_with_test.dart
@@ -0,0 +1,167 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + late StreamController<int> values; + late Stream<int> transformed; + late StreamSubscription<int> subscription; + + late List<int> emittedValues; + late bool isDone; + + void setupForStreamType( + String streamType, Stream<int> Function(Stream<int>) transform) { + emittedValues = []; + isDone = false; + values = createController(streamType); + transformed = transform(values.stream); + subscription = + transformed.listen(emittedValues.add, onDone: () => isDone = true); + } + + for (var streamType in streamTypes) { + group('startWith then [$streamType]', () { + setUp(() => setupForStreamType(streamType, (s) => s.startWith(1))); + + test('outputs all values', () async { + values + ..add(2) + ..add(3); + await Future(() {}); + expect(emittedValues, [1, 2, 3]); + }); + + test('outputs initial when followed by empty stream', () async { + await values.close(); + expect(emittedValues, [1]); + }); + + test('closes with values', () async { + expect(isDone, false); + await values.close(); + expect(isDone, true); + }); + + if (streamType == 'broadcast') { + test('can cancel and relisten', () async { + values.add(2); + await Future(() {}); + await subscription.cancel(); + subscription = transformed.listen(emittedValues.add); + values.add(3); + await Future(() {}); + await Future(() {}); + expect(emittedValues, [1, 2, 3]); + }); + } + }); + + group('startWithMany then [$streamType]', () { + setUp(() async { + setupForStreamType(streamType, (s) => s.startWithMany([1, 2])); + // Ensure all initial values go through + await Future(() {}); + }); + + test('outputs all values', () async { + values + ..add(3) + ..add(4); + await Future(() {}); + expect(emittedValues, [1, 2, 3, 4]); + }); + + test('outputs initial when followed by empty stream', () async { + await values.close(); + expect(emittedValues, [1, 2]); + }); + + test('closes with values', () async { + expect(isDone, false); + await values.close(); + expect(isDone, true); + }); + + if (streamType == 'broadcast') { + test('can cancel and relisten', () async { + values.add(3); + await Future(() {}); + await subscription.cancel(); + subscription = transformed.listen(emittedValues.add); + values.add(4); + await Future(() {}); + expect(emittedValues, [1, 2, 3, 4]); + }); + } + }); + + for (var startingStreamType in streamTypes) { + group('startWithStream [$startingStreamType] then [$streamType]', () { + late StreamController<int> starting; + setUp(() async { + starting = createController(startingStreamType); + setupForStreamType( + streamType, (s) => s.startWithStream(starting.stream)); + }); + + test('outputs all values', () async { + starting + ..add(1) + ..add(2); + await starting.close(); + values + ..add(3) + ..add(4); + await Future(() {}); + expect(emittedValues, [1, 2, 3, 4]); + }); + + test('closes with values', () async { + expect(isDone, false); + await starting.close(); + expect(isDone, false); + await values.close(); + expect(isDone, true); + }); + + if (streamType == 'broadcast') { + test('can cancel and relisten during starting', () async { + starting.add(1); + await Future(() {}); + await subscription.cancel(); + subscription = transformed.listen(emittedValues.add); + starting.add(2); + await starting.close(); + values + ..add(3) + ..add(4); + await Future(() {}); + expect(emittedValues, [1, 2, 3, 4]); + }); + + test('can cancel and relisten during values', () async { + starting + ..add(1) + ..add(2); + await starting.close(); + values.add(3); + await Future(() {}); + await subscription.cancel(); + subscription = transformed.listen(emittedValues.add); + values.add(4); + await Future(() {}); + expect(emittedValues, [1, 2, 3, 4]); + }); + } + }); + } + } +}
diff --git a/pkgs/stream_transform/test/switch_test.dart b/pkgs/stream_transform/test/switch_test.dart new file mode 100644 index 0000000..9e70c08 --- /dev/null +++ b/pkgs/stream_transform/test/switch_test.dart
@@ -0,0 +1,229 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + for (var outerType in streamTypes) { + for (var innerType in streamTypes) { + group('Outer type: [$outerType], Inner type: [$innerType]', () { + late StreamController<int> first; + late StreamController<int> second; + late StreamController<int> third; + late StreamController<Stream<int>> outer; + + late List<int> emittedValues; + late bool firstCanceled; + late bool outerCanceled; + late bool isDone; + late List<String> errors; + late StreamSubscription<int> subscription; + + setUp(() async { + firstCanceled = false; + outerCanceled = false; + outer = createController(outerType) + ..onCancel = () { + outerCanceled = true; + }; + first = createController(innerType) + ..onCancel = () { + firstCanceled = true; + }; + second = createController(innerType); + third = createController(innerType); + emittedValues = []; + errors = []; + isDone = false; + subscription = outer.stream + .switchLatest() + .listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + }); + + test('forwards events', () async { + outer.add(first.stream); + await Future(() {}); + first + ..add(1) + ..add(2); + await Future(() {}); + + outer.add(second.stream); + await Future(() {}); + second + ..add(3) + ..add(4); + await Future(() {}); + + expect(emittedValues, [1, 2, 3, 4]); + }); + + test('forwards errors from outer Stream', () async { + outer.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('forwards errors from inner Stream', () async { + outer.add(first.stream); + await Future(() {}); + first.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('closes when final stream is done', () async { + outer.add(first.stream); + await Future(() {}); + + outer.add(second.stream); + await Future(() {}); + + await outer.close(); + expect(isDone, false); + + await second.close(); + expect(isDone, true); + }); + + test( + 'closes when outer stream closes if latest inner stream already ' + 'closed', () async { + outer.add(first.stream); + await Future(() {}); + await first.close(); + expect(isDone, false); + + await outer.close(); + expect(isDone, true); + }); + + test('cancels listeners on previous streams', () async { + outer.add(first.stream); + await Future(() {}); + + outer.add(second.stream); + await Future(() {}); + expect(firstCanceled, true); + }); + + if (innerType != 'broadcast') { + test('waits for cancel before listening to subsequent stream', + () async { + var cancelWork = Completer<void>(); + first.onCancel = () => cancelWork.future; + outer.add(first.stream); + await Future(() {}); + + var cancelDone = false; + second.onListen = expectAsync0(() { + expect(cancelDone, true); + }); + outer.add(second.stream); + await Future(() {}); + cancelWork.complete(); + cancelDone = true; + }); + + test('all streams are listened to, even while cancelling', () async { + var cancelWork = Completer<void>(); + first.onCancel = () => cancelWork.future; + outer.add(first.stream); + await Future(() {}); + + var cancelDone = false; + second.onListen = expectAsync0(() { + expect(cancelDone, true); + }); + third.onListen = expectAsync0(() { + expect(cancelDone, true); + }); + outer + ..add(second.stream) + ..add(third.stream); + await Future(() {}); + cancelWork.complete(); + cancelDone = true; + }); + } + + if (outerType != 'broadcast' && innerType != 'broadcast') { + test('pausing while cancelling an inner stream is respected', + () async { + var cancelWork = Completer<void>(); + first.onCancel = () => cancelWork.future; + outer.add(first.stream); + await Future(() {}); + + var cancelDone = false; + second.onListen = expectAsync0(() { + expect(cancelDone, true); + }); + outer.add(second.stream); + await Future(() {}); + subscription.pause(); + cancelWork.complete(); + cancelDone = true; + await Future(() {}); + expect(second.isPaused, true); + subscription.resume(); + }); + } + + test('cancels listener on current and outer stream on cancel', + () async { + outer.add(first.stream); + await Future(() {}); + await subscription.cancel(); + + await Future(() {}); + expect(outerCanceled, true); + expect(firstCanceled, true); + }); + }); + } + } + + group('switchMap', () { + test('uses map function', () async { + var outer = StreamController<List<int>>(); + + var values = <int>[]; + outer.stream.switchMap(Stream.fromIterable).listen(values.add); + + outer.add([1, 2, 3]); + await Future(() {}); + outer.add([4, 5, 6]); + await Future(() {}); + expect(values, [1, 2, 3, 4, 5, 6]); + }); + + test('can create a broadcast stream', () async { + var outer = StreamController<int>.broadcast(); + + var transformed = + outer.stream.switchMap((_) => const Stream<int>.empty()); + + expect(transformed.isBroadcast, true); + }); + + test('forwards errors from the convert callback', () async { + var errors = <String>[]; + var source = Stream.fromIterable([1, 2, 3]); + source.switchMap<int>((i) { + // ignore: only_throw_errors + throw 'Error: $i'; + }).listen((_) {}, onError: errors.add); + await Future<void>(() {}); + expect(errors, ['Error: 1', 'Error: 2', 'Error: 3']); + }); + }); +}
diff --git a/pkgs/stream_transform/test/take_until_test.dart b/pkgs/stream_transform/test/take_until_test.dart new file mode 100644 index 0000000..982b3da --- /dev/null +++ b/pkgs/stream_transform/test/take_until_test.dart
@@ -0,0 +1,135 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + for (var streamType in streamTypes) { + group('takeUntil on Stream type [$streamType]', () { + late StreamController<int> values; + late List<int> emittedValues; + late bool valuesCanceled; + late bool isDone; + late List<String> errors; + late Stream<int> transformed; + late StreamSubscription<int> subscription; + late Completer<void> closeTrigger; + + setUp(() { + valuesCanceled = false; + values = createController(streamType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + errors = []; + isDone = false; + closeTrigger = Completer(); + transformed = values.stream.takeUntil(closeTrigger.future); + subscription = transformed + .listen(emittedValues.add, onError: errors.add, onDone: () { + isDone = true; + }); + }); + + test('forwards cancellation', () async { + await subscription.cancel(); + expect(valuesCanceled, true); + }); + + test('lets values through before trigger', () async { + values + ..add(1) + ..add(2); + await Future(() {}); + expect(emittedValues, [1, 2]); + }); + + test('forwards errors', () async { + values.addError('error'); + await Future(() {}); + expect(errors, ['error']); + }); + + test('sends done if original strem ends', () async { + await values.close(); + expect(isDone, true); + }); + + test('sends done when trigger fires', () async { + closeTrigger.complete(); + await Future(() {}); + expect(isDone, true); + }); + + test('forwards errors from the close trigger', () async { + closeTrigger.completeError('sad'); + await Future(() {}); + expect(errors, ['sad']); + expect(isDone, true); + }); + + test('ignores errors from the close trigger after stream closed', + () async { + await values.close(); + closeTrigger.completeError('sad'); + await Future(() {}); + expect(errors, <Object>[]); + }); + + test('cancels value subscription when trigger fires', () async { + closeTrigger.complete(); + await Future(() {}); + expect(valuesCanceled, true); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get values', () async { + var otherValues = <Object>[]; + transformed.listen(otherValues.add); + values + ..add(1) + ..add(2); + await Future(() {}); + expect(emittedValues, [1, 2]); + expect(otherValues, [1, 2]); + }); + + test('multiple listeners get done when trigger fires', () async { + var otherDone = false; + transformed.listen(null, onDone: () => otherDone = true); + closeTrigger.complete(); + await Future(() {}); + expect(otherDone, true); + expect(isDone, true); + }); + + test('multiple listeners get done when values end', () async { + var otherDone = false; + transformed.listen(null, onDone: () => otherDone = true); + await values.close(); + expect(otherDone, true); + expect(isDone, true); + }); + + test('can cancel and relisten before trigger fires', () async { + values.add(1); + await Future(() {}); + await subscription.cancel(); + values.add(2); + await Future(() {}); + subscription = transformed.listen(emittedValues.add); + values.add(3); + await Future(() {}); + expect(emittedValues, [1, 3]); + }); + } + }); + } +}
diff --git a/pkgs/stream_transform/test/tap_test.dart b/pkgs/stream_transform/test/tap_test.dart new file mode 100644 index 0000000..f2b4346 --- /dev/null +++ b/pkgs/stream_transform/test/tap_test.dart
@@ -0,0 +1,116 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +void main() { + test('calls function for values', () async { + var valuesSeen = <int>[]; + var stream = Stream.fromIterable([1, 2, 3]); + await stream.tap(valuesSeen.add).last; + expect(valuesSeen, [1, 2, 3]); + }); + + test('forwards values', () async { + var stream = Stream.fromIterable([1, 2, 3]); + var values = await stream.tap((_) {}).toList(); + expect(values, [1, 2, 3]); + }); + + test('calls function for errors', () async { + dynamic error; + var source = StreamController<int>(); + source.stream.tap((_) {}, onError: (e, st) { + error = e; + }).listen((_) {}, onError: (_) {}); + source.addError('error'); + await Future(() {}); + expect(error, 'error'); + }); + + test('forwards errors', () async { + dynamic error; + var source = StreamController<int>(); + source.stream.tap((_) {}, onError: (e, st) {}).listen((_) {}, + onError: (Object e) { + error = e; + }); + source.addError('error'); + await Future(() {}); + expect(error, 'error'); + }); + + test('calls function on done', () async { + var doneCalled = false; + var source = StreamController<int>(); + source.stream.tap((_) {}, onDone: () { + doneCalled = true; + }).listen((_) {}); + await source.close(); + expect(doneCalled, true); + }); + + test('forwards only once with multiple listeners on a broadcast stream', + () async { + var dataCallCount = 0; + var source = StreamController<int>.broadcast(); + source.stream.tap((_) { + dataCallCount++; + }) + ..listen((_) {}) + ..listen((_) {}); + source.add(1); + await Future(() {}); + expect(dataCallCount, 1); + }); + + test( + 'forwards errors only once with multiple listeners on a broadcast stream', + () async { + var errorCallCount = 0; + var source = StreamController<int>.broadcast(); + source.stream.tap((_) {}, onError: (_, __) { + errorCallCount++; + }) + ..listen((_) {}, onError: (_, __) {}) + ..listen((_) {}, onError: (_, __) {}); + source.addError('error'); + await Future(() {}); + expect(errorCallCount, 1); + }); + + test('calls onDone only once with multiple listeners on a broadcast stream', + () async { + var doneCallCount = 0; + var source = StreamController<int>.broadcast(); + source.stream.tap((_) {}, onDone: () { + doneCallCount++; + }) + ..listen((_) {}) + ..listen((_) {}); + await source.close(); + expect(doneCallCount, 1); + }); + + test('forwards values to multiple listeners', () async { + var source = StreamController<int>.broadcast(); + var emittedValues1 = <int>[]; + var emittedValues2 = <int>[]; + source.stream.tap((_) {}) + ..listen(emittedValues1.add) + ..listen(emittedValues2.add); + source.add(1); + await Future(() {}); + expect(emittedValues1, [1]); + expect(emittedValues2, [1]); + }); + + test('allows null callback', () async { + var stream = Stream.fromIterable([1, 2, 3]); + await stream.tap(null).last; + }); +}
diff --git a/pkgs/stream_transform/test/throttle_test.dart b/pkgs/stream_transform/test/throttle_test.dart new file mode 100644 index 0000000..07f607a --- /dev/null +++ b/pkgs/stream_transform/test/throttle_test.dart
@@ -0,0 +1,193 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:fake_async/fake_async.dart'; +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +import 'utils.dart'; + +void main() { + for (var streamType in streamTypes) { + group('Stream type [$streamType]', () { + late StreamController<int> values; + late List<int> emittedValues; + late bool valuesCanceled; + late bool isDone; + late Stream<int> transformed; + late StreamSubscription<int> subscription; + + group('throttle - trailing: false', () { + setUp(() async { + valuesCanceled = false; + values = createController(streamType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + isDone = false; + transformed = values.stream.throttle(const Duration(milliseconds: 5)); + }); + + void listen() { + subscription = transformed.listen(emittedValues.add, onDone: () { + isDone = true; + }); + } + + test('cancels values', () async { + listen(); + await subscription.cancel(); + expect(valuesCanceled, true); + }); + + test('swallows values that come faster than duration', () { + fakeAsync((async) { + listen(); + values + ..add(1) + ..add(2) + ..close(); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1]); + }); + }); + + test('outputs multiple values spaced further than duration', () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 6)); + values.add(2); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1, 2]); + async.elapse(const Duration(milliseconds: 6)); + }); + }); + + test('closes output immediately', () { + fakeAsync((async) { + listen(); + values.add(1); + async.elapse(const Duration(milliseconds: 6)); + values + ..add(2) + ..close(); + async.flushMicrotasks(); + expect(isDone, true); + }); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get values', () { + fakeAsync((async) { + listen(); + var otherValues = <int>[]; + transformed.listen(otherValues.add); + values.add(1); + async.flushMicrotasks(); + expect(emittedValues, [1]); + expect(otherValues, [1]); + }); + }); + } + }); + + group('throttle - trailing: true', () { + setUp(() async { + valuesCanceled = false; + values = createController(streamType) + ..onCancel = () { + valuesCanceled = true; + }; + emittedValues = []; + isDone = false; + transformed = values.stream + .throttle(const Duration(milliseconds: 5), trailing: true); + }); + void listen() { + subscription = transformed.listen(emittedValues.add, onDone: () { + isDone = true; + }); + } + + test('emits both first and last in a period', () { + fakeAsync((async) { + listen(); + values + ..add(1) + ..add(2) + ..close(); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1, 2]); + }); + }); + + test('swallows values that are not the latest in a period', () { + fakeAsync((async) { + listen(); + values + ..add(1) + ..add(2) + ..add(3) + ..close(); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1, 3]); + }); + }); + + test('waits to output the last value even if the stream closes', + () async { + fakeAsync((async) { + listen(); + values + ..add(1) + ..add(2) + ..close(); + async.flushMicrotasks(); + expect(isDone, false); + expect(emittedValues, [1], + reason: 'Should not be emitted until after duration'); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1, 2]); + expect(isDone, true); + async.elapse(const Duration(milliseconds: 6)); + }); + }); + + test('closes immediately if there is no pending value', () { + fakeAsync((async) { + listen(); + values + ..add(1) + ..close(); + async.flushMicrotasks(); + expect(isDone, true); + }); + }); + + if (streamType == 'broadcast') { + test('multiple listeners all get values', () { + fakeAsync((async) { + listen(); + var otherValues = <int>[]; + transformed.listen(otherValues.add); + values + ..add(1) + ..add(2); + async.flushMicrotasks(); + expect(emittedValues, [1]); + expect(otherValues, [1]); + async.elapse(const Duration(milliseconds: 6)); + expect(emittedValues, [1, 2]); + expect(otherValues, [1, 2]); + }); + }); + } + }); + }); + } +}
diff --git a/pkgs/stream_transform/test/utils.dart b/pkgs/stream_transform/test/utils.dart new file mode 100644 index 0000000..42d9613 --- /dev/null +++ b/pkgs/stream_transform/test/utils.dart
@@ -0,0 +1,19 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +StreamController<T> createController<T>(String streamType) { + switch (streamType) { + case 'single subscription': + return StreamController<T>(); + case 'broadcast': + return StreamController<T>.broadcast(); + default: + throw ArgumentError.value( + streamType, 'streamType', 'Must be one of $streamTypes'); + } +} + +const streamTypes = ['single subscription', 'broadcast'];
diff --git a/pkgs/stream_transform/test/where_not_null_test.dart b/pkgs/stream_transform/test/where_not_null_test.dart new file mode 100644 index 0000000..c9af794 --- /dev/null +++ b/pkgs/stream_transform/test/where_not_null_test.dart
@@ -0,0 +1,56 @@ +// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +void main() { + test('forwards only events that match the type', () async { + var values = Stream.fromIterable([null, 'a', null, 'b']); + var filtered = values.whereNotNull(); + expect(await filtered.toList(), ['a', 'b']); + }); + + test('can result in empty stream', () async { + var values = Stream<Object?>.fromIterable([null, null]); + var filtered = values.whereNotNull(); + expect(await filtered.isEmpty, true); + }); + + test('forwards values to multiple listeners', () async { + var values = StreamController<Object?>.broadcast(); + var filtered = values.stream.whereNotNull(); + var firstValues = <Object>[]; + var secondValues = <Object>[]; + filtered + ..listen(firstValues.add) + ..listen(secondValues.add); + values + ..add(null) + ..add('a') + ..add(null) + ..add('b'); + await Future(() {}); + expect(firstValues, ['a', 'b']); + expect(secondValues, ['a', 'b']); + }); + + test('closes streams with multiple listeners', () async { + var values = StreamController<Object?>.broadcast(); + var filtered = values.stream.whereNotNull(); + var firstDone = false; + var secondDone = false; + filtered + ..listen(null, onDone: () => firstDone = true) + ..listen(null, onDone: () => secondDone = true); + values + ..add(null) + ..add('a'); + await values.close(); + expect(firstDone, true); + expect(secondDone, true); + }); +}
diff --git a/pkgs/stream_transform/test/where_type_test.dart b/pkgs/stream_transform/test/where_type_test.dart new file mode 100644 index 0000000..4cbea37 --- /dev/null +++ b/pkgs/stream_transform/test/where_type_test.dart
@@ -0,0 +1,56 @@ +// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:stream_transform/stream_transform.dart'; +import 'package:test/test.dart'; + +void main() { + test('forwards only events that match the type', () async { + var values = Stream.fromIterable([1, 'a', 2, 'b']); + var filtered = values.whereType<String>(); + expect(await filtered.toList(), ['a', 'b']); + }); + + test('can result in empty stream', () async { + var values = Stream.fromIterable([1, 2, 3, 4]); + var filtered = values.whereType<String>(); + expect(await filtered.isEmpty, true); + }); + + test('forwards values to multiple listeners', () async { + var values = StreamController<Object>.broadcast(); + var filtered = values.stream.whereType<String>(); + var firstValues = <Object>[]; + var secondValues = <Object>[]; + filtered + ..listen(firstValues.add) + ..listen(secondValues.add); + values + ..add(1) + ..add('a') + ..add(2) + ..add('b'); + await Future(() {}); + expect(firstValues, ['a', 'b']); + expect(secondValues, ['a', 'b']); + }); + + test('closes streams with multiple listeners', () async { + var values = StreamController<Object>.broadcast(); + var filtered = values.stream.whereType<String>(); + var firstDone = false; + var secondDone = false; + filtered + ..listen(null, onDone: () => firstDone = true) + ..listen(null, onDone: () => secondDone = true); + values + ..add(1) + ..add('a'); + await values.close(); + expect(firstDone, true); + expect(secondDone, true); + }); +}