Improve docs for switchMap (dart-lang/stream_transform#109)
- Expand the docs for `switchMap` and add caveats.
- Crosslink between `switchMap` and `concurrentAsyncMap`.
- Add test for erroring convert callback in `switchMap`
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 5a5ea2a..73fc368 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,5 +1,7 @@
## 1.2.1-dev
+* Improve tests of `switchMap` and improve documentation with links and clarification.
+
## 1.2.0
- Add support for emitting the "leading" event in `debounce`.
diff --git a/pkgs/stream_transform/lib/src/merge.dart b/pkgs/stream_transform/lib/src/merge.dart
index f654d35..afc2da4 100644
--- a/pkgs/stream_transform/lib/src/merge.dart
+++ b/pkgs/stream_transform/lib/src/merge.dart
@@ -4,6 +4,8 @@
import 'dart:async';
+import 'package:stream_transform/src/switch.dart';
+
/// Utilities to interleave events from multiple streams.
extension Merge<T> on Stream<T> {
/// Returns a stream which emits values and errors from the source stream and
@@ -56,7 +58,7 @@
Stream<T> mergeAll(Iterable<Stream<T>> others) => transform(_Merge(others));
/// Like [asyncExpand] but the [convert] callback may be called for an element
- /// before the Stream emitted by the previous element has closed.
+ /// before the [Stream] emitted by the previous element has closed.
///
/// Events on the result stream will be emitted in the order they are emitted
/// by the sub streams, which may not match the order of the original stream.
@@ -67,7 +69,7 @@
/// The result stream will not close until the source stream closes and all
/// sub streams have closed.
///
- /// If the source stream is a broadcast stream the result will be as well,
+ /// If the source stream is a broadcast stream, the result will be as well,
/// regardless of the types of streams created by [convert]. In this case,
/// some care should be taken:
/// - If [convert] returns a single subscription stream it may be listened to
@@ -76,6 +78,11 @@
/// stream, any sub streams from previously emitted events will be ignored,
/// regardless of whether they emit further events after a listener is added
/// back.
+ ///
+ /// See also:
+ ///
+ /// * [switchMap], which cancels subscriptions to the previous sub
+ /// stream instead of concurrently emitting events from all sub streams.
Stream<S> concurrentAsyncExpand<S>(Stream<S> Function(T) convert) =>
map(convert).transform(_MergeExpanded());
}
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart
index 5125d57..dcd2431 100644
--- a/pkgs/stream_transform/lib/src/switch.dart
+++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -4,6 +4,8 @@
import 'dart:async';
+import 'package:stream_transform/src/merge.dart';
+
/// A utility to take events from the most recent sub stream returned by a
/// callback.
extension Switch<T> on Stream<T> {
@@ -12,9 +14,30 @@
///
/// When the source emits a value it will be converted to a [Stream] using
/// [convert] and the output will switch to emitting events from that result.
+ /// Like [asyncExpand] but the [Stream] emitted by a previous element
+ /// will be ignored as soon as the source stream emits a new event.
///
- /// If the source stream is a broadcast stream, the result stream will be as
- /// well, regardless of the types of the streams produced by [convert].
+ /// This means that the source stream is not paused until a sub stream
+ /// returned from the [convert] callback is done. Instead, the subscription
+ /// to the sub stream is canceled as soon as the source stream emits a new event.
+ ///
+ /// Errors from [convert], the source stream, or any of the sub streams are
+ /// forwarded to the result stream.
+ ///
+ /// The result stream will not close until the source stream closes and
+ /// the current sub stream have closed.
+ ///
+ /// If the source stream is a broadcast stream, the result will be as well,
+ /// regardless of the types of streams created by [convert]. In this case,
+ /// some care should be taken:
+ ///
+ /// * If [convert] returns a single subscription stream it may be listened to
+ /// and never canceled.
+ ///
+ /// See also:
+ ///
+ /// * [concurrentAsyncExpand], which emits events from all sub streams
+ /// concurrently instead of cancelling subscriptions to previous subs streams.
Stream<S> switchMap<S>(Stream<S> Function(T) convert) {
return map(convert).switchLatest();
}
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 8279c44..c566c16 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -1,6 +1,6 @@
name: stream_transform
description: A collection of utilities to transform and manipulate streams.
-homepage: https://www.github.com/dart-lang/stream_transform
+homepage: https://github.com/dart-lang/stream_transform
version: 1.2.1-dev
environment:
diff --git a/pkgs/stream_transform/test/switch_test.dart b/pkgs/stream_transform/test/switch_test.dart
index 40030f0..16deb4b 100644
--- a/pkgs/stream_transform/test/switch_test.dart
+++ b/pkgs/stream_transform/test/switch_test.dart
@@ -145,18 +145,29 @@
expect(transformed.isBroadcast, true);
});
- });
- test('handles null response from cancel', () async {
- var outer = StreamController<Stream<int>>();
- var inner = StreamController<int>();
+ test('handles null response from cancel', () async {
+ var outer = StreamController<Stream<int>>();
+ var inner = StreamController<int>();
- var subscription =
- NullOnCancelStream(outer.stream).switchLatest().listen(null);
+ var subscription =
+ NullOnCancelStream(outer.stream).switchLatest().listen(null);
- outer.add(NullOnCancelStream(inner.stream));
- await Future<void>(() {});
+ outer.add(NullOnCancelStream(inner.stream));
+ await Future<void>(() {});
- await subscription.cancel();
+ await subscription.cancel();
+ });
+
+ test('forwards errors from the convert callback', () async {
+ var errors = <String>[];
+ var source = Stream.fromIterable([1, 2, 3]);
+ source.switchMap((i) {
+ // ignore: only_throw_errors
+ throw 'Error: $i';
+ }).listen((_) {}, onError: errors.add);
+ await Future<void>(() {});
+ expect(errors, ['Error: 1', 'Error: 2', 'Error: 3']);
+ });
});
}