Wait for cancel before next listen in switchLatest (dart-lang/stream_transform#137)

If a stream is performing resource cleanup on cancel it may be a problem
to listen to the next stream and start consuming resources before the
cleanup is done. Propagate errors during the cancel to the result stream.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 5a59f6f..e1ea482 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 2.0.1-dev
+
+- Wait for the future returned from `StreamSubscription.cancel()` before
+  listening to the subsequent stream in `switchLatest` and `switchMap`.
+
 ## 2.0.0
 
 - Migrate to null safety.
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart
index 17fbb81..53a8394 100644
--- a/pkgs/stream_transform/lib/src/switch.dart
+++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -47,11 +47,12 @@
 extension SwitchLatest<T> on Stream<Stream<T>> {
   /// Emits values from the most recently emitted Stream.
   ///
-  /// When the source emits a stream the output will switch to emitting events
+  /// When the source emits a stream, the output will switch to emitting events
   /// from that stream.
   ///
-  /// If the source stream is a broadcast stream, the result stream will be as
-  /// well, regardless of the types of streams emitted.
+  /// Whether the source stream is a single-subscription stream or a
+  /// broadcast stream, the result stream will be the same kind of stream,
+  /// regardless of the types of streams emitted.
   Stream<T> switchLatest() {
     var controller = isBroadcast
         ? StreamController<T>.broadcast(sync: true)
@@ -61,20 +62,56 @@
       StreamSubscription<T>? innerSubscription;
       var outerStreamDone = false;
 
-      final outerSubscription = listen(
-          (innerStream) {
-            innerSubscription?.cancel();
-            innerSubscription = innerStream.listen(controller.add,
-                onError: controller.addError, onDone: () {
-              innerSubscription = null;
-              if (outerStreamDone) controller.close();
+      void listenToInnerStream(Stream<T> innerStream) {
+        assert(innerSubscription == null);
+        var subscription = innerStream
+            .listen(controller.add, onError: controller.addError, onDone: () {
+          innerSubscription = null;
+          if (outerStreamDone) controller.close();
+        });
+        // If a pause happens during an innerSubscription.cancel,
+        // we still listen to the next stream when the cancel is done.
+        // Then we immediately pause it again here.
+        if (controller.isPaused) subscription.pause();
+        innerSubscription = subscription;
+      }
+
+      var addError = controller.addError;
+      final outerSubscription = listen(null, onError: addError, onDone: () {
+        outerStreamDone = true;
+        if (innerSubscription == null) controller.close();
+      });
+      outerSubscription.onData((innerStream) {
+        var currentSubscription = innerSubscription;
+        if (currentSubscription != null) {
+          innerSubscription = null;
+          try {
+            currentSubscription.cancel().catchError(addError).whenComplete(() {
+              if (!isBroadcast && !controller.hasListener) {
+                // Result single-subscription stream subscription was cancelled
+                // while waiting for previous innerStream cancel.
+                //
+                // Ensure that the last received stream is also listened to and
+                // cancelled, then do nothing further.
+                // TODO(lrn): When SDK 2.14 is available, use `.ignore()`.
+                innerStream
+                    .listen(null)
+                    .cancel()
+                    .then(_ignore, onError: _ignore);
+                return;
+              }
+              outerSubscription.resume();
+              listenToInnerStream(innerStream);
             });
-          },
-          onError: controller.addError,
-          onDone: () {
-            outerStreamDone = true;
-            if (innerSubscription == null) controller.close();
-          });
+            outerSubscription.pause();
+            return;
+          } catch (error, stack) {
+            // The cancel call threw synchronously.
+            controller.addError(error, stack);
+          }
+        }
+        listenToInnerStream(innerStream);
+      });
       if (!isBroadcast) {
         controller
           ..onPause = () {
@@ -87,16 +124,20 @@
           };
       }
       controller.onCancel = () {
+        var _innerSubscription = innerSubscription;
         var cancels = [
           if (!outerStreamDone) outerSubscription.cancel(),
-          if (innerSubscription != null) innerSubscription!.cancel(),
+          if (_innerSubscription != null) _innerSubscription.cancel(),
         ]
           // Handle opt-out nulls
           ..removeWhere((Object? f) => f == null);
         if (cancels.isEmpty) return null;
-        return Future.wait(cancels).then((_) => null);
+        return Future.wait(cancels).then(_ignore);
       };
     };
     return controller.stream;
   }
 }
+
+/// Helper function to ignore future callback
+void _ignore(_, [__]) {}
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 98086c5..6e90b66 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -1,10 +1,10 @@
 name: stream_transform
 description: A collection of utilities to transform and manipulate streams.
 repository: https://github.com/dart-lang/stream_transform
-version: 2.0.0
+version: 2.0.1-dev
 
 environment:
-  sdk: ">=2.12.0-0 <3.0.0"
+  sdk: ">=2.12.0 <3.0.0"
 
 dev_dependencies:
   async: ^2.5.0
diff --git a/pkgs/stream_transform/test/switch_test.dart b/pkgs/stream_transform/test/switch_test.dart
index e35cb0a..00f196a 100644
--- a/pkgs/stream_transform/test/switch_test.dart
+++ b/pkgs/stream_transform/test/switch_test.dart
@@ -15,6 +15,7 @@
       group('Outer type: [$outerType], Inner type: [$innerType]', () {
         late StreamController<int> first;
         late StreamController<int> second;
+        late StreamController<int> third;
         late StreamController<Stream<int>> outer;
 
         late List<int> emittedValues;
@@ -36,6 +37,7 @@
               firstCanceled = true;
             };
           second = createController(innerType);
+          third = createController(innerType);
           emittedValues = [];
           errors = [];
           isDone = false;
@@ -113,6 +115,69 @@
           expect(firstCanceled, true);
         });
 
+        if (innerType != 'broadcast') {
+          test('waits for cancel before listening to subsequent stream',
+              () async {
+            var cancelWork = Completer<void>();
+            first.onCancel = () => cancelWork.future;
+            outer.add(first.stream);
+            await Future(() {});
+
+            var cancelDone = false;
+            second.onListen = expectAsync0(() {
+              expect(cancelDone, true);
+            });
+            outer.add(second.stream);
+            await Future(() {});
+            cancelWork.complete();
+            cancelDone = true;
+          });
+
+          test('all streams are listened to, even while cancelling', () async {
+            var cancelWork = Completer<void>();
+            first.onCancel = () => cancelWork.future;
+            outer.add(first.stream);
+            await Future(() {});
+
+            var cancelDone = false;
+            second.onListen = expectAsync0(() {
+              expect(cancelDone, true);
+            });
+            third.onListen = expectAsync0(() {
+              expect(cancelDone, true);
+            });
+            outer
+              ..add(second.stream)
+              ..add(third.stream);
+            await Future(() {});
+            cancelWork.complete();
+            cancelDone = true;
+          });
+        }
+
+        if (outerType != 'broadcast' && innerType != 'broadcast') {
+          test('pausing while cancelling an inner stream is respected',
+              () async {
+            var cancelWork = Completer<void>();
+            first.onCancel = () => cancelWork.future;
+            outer.add(first.stream);
+            await Future(() {});
+
+            var cancelDone = false;
+            second.onListen = expectAsync0(() {
+              expect(cancelDone, true);
+            });
+            outer.add(second.stream);
+            await Future(() {});
+            subscription.pause();
+            cancelWork.complete();
+            cancelDone = true;
+            await Future(() {});
+            expect(second.isPaused, true);
+            subscription.resume();
+          });
+        }
+
         test('cancels listener on current and outer stream on cancel',
             () async {
           outer.add(first.stream);