Handle `null` returns from cancelled subscriptions (dart-lang/stream_transform#97)

A call to `StreamSubscription.cancel` is allowed to return `null`,
though it never does using the `StreamController` from the SDK. If a
custom Stream implementation does synchronously cancel and return `null`
it could cause problems in a number of transformers which would attempt
to `Future.wait` on the results of the cancels.

- Update the places using `Future.wait` to filter out null results
  first.
- Refactor some places to use collection literals and conditional
  elements.
- Filter out nulls first, and if every canceled subscription was
  synchronous skip the call to `Future.wait` entirely.
- Add a custom Stream implementation which always returns `null` when
  it's subscription is canceled as a test utility.
- Add tests that would fail without the null filtering on each
  implementation that was updated.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 686f919..5342392 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 1.1.1
+
+-   Fix a bug in `asyncMapSample`, `buffer`, `combineLatest`,
+    `combineLatestAll`, `merge`, and `mergeAll` which would cause an exception
+    when cancelling a subscription after using the transformer if the original
+    stream(s) returned `null` from cancelling their subscriptions.
+
 ## 1.1.0
 
 -   Add `concurrentAsyncExpand` to interleave events emitted by multiple sub
diff --git a/pkgs/stream_transform/lib/src/aggregate_sample.dart b/pkgs/stream_transform/lib/src/aggregate_sample.dart
index 5ddeb10..88fc490 100644
--- a/pkgs/stream_transform/lib/src/aggregate_sample.dart
+++ b/pkgs/stream_transform/lib/src/aggregate_sample.dart
@@ -107,8 +107,10 @@
         } else {
           triggerSub.pause();
         }
-        if (toCancel.isEmpty) return null;
-        return Future.wait(toCancel.map((s) => s.cancel()));
+        var cancels =
+            toCancel.map((s) => s.cancel()).where((f) => f != null).toList();
+        if (cancels.isEmpty) return null;
+        return Future.wait(cancels).then((_) => null);
       };
     };
     return controller.stream;
diff --git a/pkgs/stream_transform/lib/src/combine_latest.dart b/pkgs/stream_transform/lib/src/combine_latest.dart
index 7fee05d..303b16a 100644
--- a/pkgs/stream_transform/lib/src/combine_latest.dart
+++ b/pkgs/stream_transform/lib/src/combine_latest.dart
@@ -175,11 +175,11 @@
           };
       }
       controller.onCancel = () {
-        var cancelSource = sourceSubscription.cancel();
-        var cancelOther = otherSubscription.cancel();
+        var cancels = [sourceSubscription.cancel(), otherSubscription.cancel()]
+            .where((f) => f != null);
         sourceSubscription = null;
         otherSubscription = null;
-        return Future.wait([cancelSource, cancelOther]);
+        return Future.wait(cancels).then((_) => null);
       };
     };
     return controller.stream;
@@ -249,8 +249,12 @@
           };
       }
       controller.onCancel = () {
-        if (subscriptions.isEmpty) return null;
-        return Future.wait(subscriptions.map((s) => s.cancel()));
+        var cancels = subscriptions
+            .map((s) => s.cancel())
+            .where((f) => f != null)
+            .toList();
+        if (cancels.isEmpty) return null;
+        return Future.wait(cancels).then((_) => null);
       };
     };
     return controller.stream;
diff --git a/pkgs/stream_transform/lib/src/merge.dart b/pkgs/stream_transform/lib/src/merge.dart
index 99d19c9..f654d35 100644
--- a/pkgs/stream_transform/lib/src/merge.dart
+++ b/pkgs/stream_transform/lib/src/merge.dart
@@ -124,8 +124,12 @@
           };
       }
       controller.onCancel = () {
-        if (subscriptions.isEmpty) return null;
-        return Future.wait(subscriptions.map((s) => s.cancel()));
+        var cancels = subscriptions
+            .map((s) => s.cancel())
+            .where((f) => f != null)
+            .toList();
+        if (cancels.isEmpty) return null;
+        return Future.wait(cancels).then((_) => null);
       };
     };
     return controller.stream;
@@ -172,7 +176,12 @@
           };
       }
       controller.onCancel = () {
-        return Future.wait(subscriptions.map((s) => s.cancel()));
+        var cancels = subscriptions
+            .map((s) => s.cancel())
+            .where((f) => f != null)
+            .toList();
+        if (cancels.isEmpty) return null;
+        return Future.wait(cancels).then((_) => null);
       };
     };
     return controller.stream;
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart
index b30556a..5125d57 100644
--- a/pkgs/stream_transform/lib/src/switch.dart
+++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -41,15 +41,11 @@
         ? StreamController<T>.broadcast(sync: true)
         : StreamController<T>(sync: true);
 
-    StreamSubscription<Stream<T>> outerSubscription;
-
     controller.onListen = () {
-      assert(outerSubscription == null);
-
       StreamSubscription<T> innerSubscription;
       var outerStreamDone = false;
 
-      outerSubscription = outer.listen(
+      final outerSubscription = outer.listen(
           (innerStream) {
             innerSubscription?.cancel();
             innerSubscription = innerStream.listen(controller.add,
@@ -75,15 +71,12 @@
           };
       }
       controller.onCancel = () {
-        var toCancel = <StreamSubscription<void>>[];
-        if (!outerStreamDone) toCancel.add(outerSubscription);
-        if (innerSubscription != null) {
-          toCancel.add(innerSubscription);
-        }
-        outerSubscription = null;
-        innerSubscription = null;
-        if (toCancel.isEmpty) return null;
-        return Future.wait(toCancel.map((s) => s.cancel()));
+        var cancels = [
+          if (!outerStreamDone) outerSubscription.cancel(),
+          if (innerSubscription != null) innerSubscription.cancel(),
+        ].where((f) => f != null);
+        if (cancels.isEmpty) return null;
+        return Future.wait(cancels).then((_) => null);
       };
     };
     return controller.stream;
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 1c8d88e..62a7fa6 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -1,11 +1,12 @@
 name: stream_transform
 description: A collection of utilities to transform and manipulate streams.
 homepage: https://www.github.com/dart-lang/stream_transform
-version: 1.1.1-dev
+version: 1.1.1
 
 environment:
   sdk: ">=2.6.0 <3.0.0"
 
 dev_dependencies:
+  async: ^2.0.0
   pedantic: ^1.5.0
   test: ^1.0.0
diff --git a/pkgs/stream_transform/test/async_map_sample_test.dart b/pkgs/stream_transform/test/async_map_sample_test.dart
index 119e0c8..9c37cd4 100644
--- a/pkgs/stream_transform/test/async_map_sample_test.dart
+++ b/pkgs/stream_transform/test/async_map_sample_test.dart
@@ -200,4 +200,13 @@
       }
     });
   }
+  test('handles null response from cancel', () async {
+    var controller = StreamController<int>();
+
+    var subscription = NullOnCancelStream(controller.stream)
+        .asyncMapSample((_) async {})
+        .listen(null);
+
+    await subscription.cancel();
+  });
 }
diff --git a/pkgs/stream_transform/test/buffer_test.dart b/pkgs/stream_transform/test/buffer_test.dart
index 5800586..80a793c 100644
--- a/pkgs/stream_transform/test/buffer_test.dart
+++ b/pkgs/stream_transform/test/buffer_test.dart
@@ -243,4 +243,13 @@
       ]);
     });
   }
+
+  test('handles null response from cancel', () async {
+    var controller = StreamController<int>();
+    var trigger = StreamController<void>();
+    var subscription = NullOnCancelStream(controller.stream)
+        .buffer(trigger.stream)
+        .listen(null);
+    await subscription.cancel();
+  });
 }
diff --git a/pkgs/stream_transform/test/combine_latest_all_test.dart b/pkgs/stream_transform/test/combine_latest_all_test.dart
index 2943449..d57d618 100644
--- a/pkgs/stream_transform/test/combine_latest_all_test.dart
+++ b/pkgs/stream_transform/test/combine_latest_all_test.dart
@@ -8,6 +8,8 @@
 
 import 'package:stream_transform/stream_transform.dart';
 
+import 'utils.dart';
+
 Future<void> tick() => Future(() {});
 
 void main() {
@@ -164,4 +166,14 @@
       });
     });
   });
+
+  test('handles null response from cancel', () async {
+    var source = StreamController<int>();
+    var other = StreamController<int>();
+
+    var subscription = NullOnCancelStream(source.stream)
+        .combineLatestAll([NullOnCancelStream(other.stream)]).listen(null);
+
+    await subscription.cancel();
+  });
 }
diff --git a/pkgs/stream_transform/test/combine_latest_test.dart b/pkgs/stream_transform/test/combine_latest_test.dart
index 2b35e2c..4ea9499 100644
--- a/pkgs/stream_transform/test/combine_latest_test.dart
+++ b/pkgs/stream_transform/test/combine_latest_test.dart
@@ -9,6 +9,8 @@
 
 import 'package:stream_transform/stream_transform.dart';
 
+import 'utils.dart';
+
 void main() {
   group('combineLatest', () {
     test('flows through combine callback', () async {
@@ -168,6 +170,17 @@
       });
     });
   });
+
+  test('handles null response from cancel', () async {
+    var source = StreamController<int>();
+    var other = StreamController<int>();
+
+    var subscription = NullOnCancelStream(source.stream)
+        .combineLatest(NullOnCancelStream(other.stream), (a, b) => null)
+        .listen(null);
+
+    await subscription.cancel();
+  });
 }
 
 class _NumberedException implements Exception {
diff --git a/pkgs/stream_transform/test/concurrent_async_expand_test.dart b/pkgs/stream_transform/test/concurrent_async_expand_test.dart
index f843c6f..9ed02d0 100644
--- a/pkgs/stream_transform/test/concurrent_async_expand_test.dart
+++ b/pkgs/stream_transform/test/concurrent_async_expand_test.dart
@@ -184,4 +184,19 @@
       });
     }
   }
+
+  test('hendles null response from cancel', () async {
+    var source = StreamController<int>();
+    var other = StreamController<int>();
+
+    var subscription = NullOnCancelStream(source.stream)
+        .concurrentAsyncExpand((_) => NullOnCancelStream(other.stream))
+        .listen(null);
+
+    source.add(1);
+
+    await Future<void>(() {});
+
+    await subscription.cancel();
+  });
 }
diff --git a/pkgs/stream_transform/test/merge_test.dart b/pkgs/stream_transform/test/merge_test.dart
index 24cd76b..2eef5d1 100644
--- a/pkgs/stream_transform/test/merge_test.dart
+++ b/pkgs/stream_transform/test/merge_test.dart
@@ -8,6 +8,8 @@
 
 import 'package:stream_transform/stream_transform.dart';
 
+import 'utils.dart';
+
 void main() {
   group('merge', () {
     test('includes all values', () async {
@@ -138,4 +140,15 @@
       expect(secondListenerValues, [1, 2, 3, 4, 5, 6]);
     });
   });
+
+  test('handles null response rom cancel', () async {
+    var source = StreamController<int>();
+    var other = StreamController<int>();
+
+    var subscription = NullOnCancelStream(source.stream)
+        .merge(NullOnCancelStream(other.stream))
+        .listen(null);
+
+    await subscription.cancel();
+  });
 }
diff --git a/pkgs/stream_transform/test/switch_test.dart b/pkgs/stream_transform/test/switch_test.dart
index ba26503..40030f0 100644
--- a/pkgs/stream_transform/test/switch_test.dart
+++ b/pkgs/stream_transform/test/switch_test.dart
@@ -146,4 +146,17 @@
       expect(transformed.isBroadcast, true);
     });
   });
+
+  test('handles null response from cancel', () async {
+    var outer = StreamController<Stream<int>>();
+    var inner = StreamController<int>();
+
+    var subscription =
+        NullOnCancelStream(outer.stream).switchLatest().listen(null);
+
+    outer.add(NullOnCancelStream(inner.stream));
+    await Future<void>(() {});
+
+    await subscription.cancel();
+  });
 }
diff --git a/pkgs/stream_transform/test/utils.dart b/pkgs/stream_transform/test/utils.dart
index b6196d6..4aa5837 100644
--- a/pkgs/stream_transform/test/utils.dart
+++ b/pkgs/stream_transform/test/utils.dart
@@ -4,6 +4,8 @@
 
 import 'dart:async';
 
+import 'package:async/async.dart';
+
 /// Cycle the event loop to ensure timers are started, then wait for a delay
 /// longer than [milliseconds] to allow for the timer to fire.
 Future<void> waitForTimer(int milliseconds) =>
@@ -23,3 +25,26 @@
 }
 
 const streamTypes = ['single subscription', 'broadcast'];
+
+class NullOnCancelStream<T> extends StreamView<T> {
+  final Stream<T> _stream;
+
+  NullOnCancelStream(this._stream) : super(_stream);
+
+  @override
+  StreamSubscription<T> listen(void Function(T) onData,
+          {Function onError, void Function() onDone, bool cancelOnError}) =>
+      _NullOnCancelSubscription(_stream.listen(onData,
+          onError: onError, onDone: onDone, cancelOnError: cancelOnError));
+}
+
+class _NullOnCancelSubscription<T> extends DelegatingStreamSubscription<T> {
+  final StreamSubscription<T> _subscription;
+  _NullOnCancelSubscription(this._subscription) : super(_subscription);
+
+  @override
+  Future<void> cancel() {
+    _subscription.cancel();
+    return null;
+  }
+}