Wait for cancel before next listen in switchLatest (dart-lang/stream_transform#137)
If a stream is performing resource cleanup on cancel it may be a problem
to listen to the next stream and start consuming resources before the
cleanup is done. Propagate errors during the cancel to the result stream.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 5a59f6f..e1ea482 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 2.0.1-dev
+
+- Wait for the future returned from `StreamSubscription.cancel()` before
+ listening to the subsequent stream in `switchLatest` and `switchMap`.
+
## 2.0.0
- Migrate to null safety.
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart
index 17fbb81..53a8394 100644
--- a/pkgs/stream_transform/lib/src/switch.dart
+++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -47,11 +47,12 @@
extension SwitchLatest<T> on Stream<Stream<T>> {
/// Emits values from the most recently emitted Stream.
///
- /// When the source emits a stream the output will switch to emitting events
+ /// When the source emits a stream, the output will switch to emitting events
/// from that stream.
///
- /// If the source stream is a broadcast stream, the result stream will be as
- /// well, regardless of the types of streams emitted.
+ /// Whether the source stream is a single-subscription stream or a
+ /// broadcast stream, the result stream will be the same kind of stream,
+ /// regardless of the types of streams emitted.
Stream<T> switchLatest() {
var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
@@ -61,20 +62,56 @@
StreamSubscription<T>? innerSubscription;
var outerStreamDone = false;
- final outerSubscription = listen(
- (innerStream) {
- innerSubscription?.cancel();
- innerSubscription = innerStream.listen(controller.add,
- onError: controller.addError, onDone: () {
- innerSubscription = null;
- if (outerStreamDone) controller.close();
+ void listenToInnerStream(Stream<T> innerStream) {
+ assert(innerSubscription == null);
+ var subscription = innerStream
+ .listen(controller.add, onError: controller.addError, onDone: () {
+ innerSubscription = null;
+ if (outerStreamDone) controller.close();
+ });
+ // If a pause happens during an innerSubscription.cancel,
+ // we still listen to the next stream when the cancel is done.
+ // Then we immediately pause it again here.
+ if (controller.isPaused) subscription.pause();
+ innerSubscription = subscription;
+ }
+
+ var addError = controller.addError;
+ final outerSubscription = listen(null, onError: addError, onDone: () {
+ outerStreamDone = true;
+ if (innerSubscription == null) controller.close();
+ });
+ outerSubscription.onData((innerStream) {
+ var currentSubscription = innerSubscription;
+ if (currentSubscription != null) {
+ innerSubscription = null;
+ try {
+ currentSubscription.cancel().catchError(addError).whenComplete(() {
+ if (!isBroadcast && !controller.hasListener) {
+ // Result single-subscription stream subscription was cancelled
+ // while waiting for previous innerStream cancel.
+ //
+ // Ensure that the last received stream is also listened to and
+ // cancelled, then do nothing further.
+ // TODO(lrn): When SDK 2.14 is available, use `.ignore()`.
+ innerStream
+ .listen(null)
+ .cancel()
+ .then(_ignore, onError: _ignore);
+ return;
+ }
+ outerSubscription.resume();
+ listenToInnerStream(innerStream);
});
- },
- onError: controller.addError,
- onDone: () {
- outerStreamDone = true;
- if (innerSubscription == null) controller.close();
- });
+ outerSubscription.pause();
+ return;
+ } catch (error, stack) {
+ // The cancel call threw synchronously.
+ controller.addError(error, stack);
+ }
+ }
+ listenToInnerStream(innerStream);
+ });
if (!isBroadcast) {
controller
..onPause = () {
@@ -87,16 +124,20 @@
};
}
controller.onCancel = () {
+ var _innerSubscription = innerSubscription;
var cancels = [
if (!outerStreamDone) outerSubscription.cancel(),
- if (innerSubscription != null) innerSubscription!.cancel(),
+ if (_innerSubscription != null) _innerSubscription.cancel(),
]
// Handle opt-out nulls
..removeWhere((Object? f) => f == null);
if (cancels.isEmpty) return null;
- return Future.wait(cancels).then((_) => null);
+ return Future.wait(cancels).then(_ignore);
};
};
return controller.stream;
}
}
+
+/// Helper function to ignore future callback
+void _ignore(_, [__]) {}
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 98086c5..6e90b66 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -1,10 +1,10 @@
name: stream_transform
description: A collection of utilities to transform and manipulate streams.
repository: https://github.com/dart-lang/stream_transform
-version: 2.0.0
+version: 2.0.1-dev
environment:
- sdk: ">=2.12.0-0 <3.0.0"
+ sdk: ">=2.12.0 <3.0.0"
dev_dependencies:
async: ^2.5.0
diff --git a/pkgs/stream_transform/test/switch_test.dart b/pkgs/stream_transform/test/switch_test.dart
index e35cb0a..00f196a 100644
--- a/pkgs/stream_transform/test/switch_test.dart
+++ b/pkgs/stream_transform/test/switch_test.dart
@@ -15,6 +15,7 @@
group('Outer type: [$outerType], Inner type: [$innerType]', () {
late StreamController<int> first;
late StreamController<int> second;
+ late StreamController<int> third;
late StreamController<Stream<int>> outer;
late List<int> emittedValues;
@@ -36,6 +37,7 @@
firstCanceled = true;
};
second = createController(innerType);
+ third = createController(innerType);
emittedValues = [];
errors = [];
isDone = false;
@@ -113,6 +115,69 @@
expect(firstCanceled, true);
});
+ if (innerType != 'broadcast') {
+ test('waits for cancel before listening to subsequent stream',
+ () async {
+ var cancelWork = Completer<void>();
+ first.onCancel = () => cancelWork.future;
+ outer.add(first.stream);
+ await Future(() {});
+
+ var cancelDone = false;
+ second.onListen = expectAsync0(() {
+ expect(cancelDone, true);
+ });
+ outer.add(second.stream);
+ await Future(() {});
+ cancelWork.complete();
+ cancelDone = true;
+ });
+
+ test('all streams are listened to, even while cancelling', () async {
+ var cancelWork = Completer<void>();
+ first.onCancel = () => cancelWork.future;
+ outer.add(first.stream);
+ await Future(() {});
+
+ var cancelDone = false;
+ second.onListen = expectAsync0(() {
+ expect(cancelDone, true);
+ });
+ third.onListen = expectAsync0(() {
+ expect(cancelDone, true);
+ });
+ outer
+ ..add(second.stream)
+ ..add(third.stream);
+ await Future(() {});
+ cancelWork.complete();
+ cancelDone = true;
+ });
+ }
+
+ if (outerType != 'broadcast' && innerType != 'broadcast') {
+ test('pausing while cancelling an inner stream is respected',
+ () async {
+ var cancelWork = Completer<void>();
+ first.onCancel = () => cancelWork.future;
+ outer.add(first.stream);
+ await Future(() {});
+
+ var cancelDone = false;
+ second.onListen = expectAsync0(() {
+ expect(cancelDone, true);
+ });
+ outer.add(second.stream);
+ await Future(() {});
+ subscription.pause();
+ cancelWork.complete();
+ cancelDone = true;
+ await Future(() {});
+ expect(second.isPaused, true);
+ subscription.resume();
+ });
+ }
+
test('cancels listener on current and outer stream on cancel',
() async {
outer.add(first.stream);