Migrate to null safety (dart-lang/stream_transform#119)

- Bump SDK constraint.
- Bump to null safe dependencies.
- Migrate code, examples, and tests.
- Add an opt-out test for testing the behavior of transforming streams
  with the `subscription.cancel()` call returns null despite a
  statically non-nullable return. This requires an unusual pattern in
  the code.
diff --git a/pkgs/stream_transform/.travis.yml b/pkgs/stream_transform/.travis.yml
index 24eb0d7..3b2eb08 100644
--- a/pkgs/stream_transform/.travis.yml
+++ b/pkgs/stream_transform/.travis.yml
@@ -3,7 +3,6 @@
   only: [master]
 dart:
   - dev
-  - 2.7.0
 cache:
   directories:
     - $HOME/.pub-cache
@@ -18,6 +17,3 @@
     - dart: dev
       dart_task:
         dartanalyzer: --fatal-warnings --fatal-infos .
-    - dart: 2.7.0
-      dart_task:
-        dartanalyzer: --fatal-warnings .
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 268bf35..68f8859 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,5 +1,6 @@
-## 1.2.1-dev
+## 2.0.0-nullsafety.0-dev
 
+- Migrate to null safety.
 - Improve tests of `switchMap` and improve documentation with links and
   clarification.
 
diff --git a/pkgs/stream_transform/example/main.dart b/pkgs/stream_transform/example/main.dart
index 4ed53d8..70b3e7f 100644
--- a/pkgs/stream_transform/example/main.dart
+++ b/pkgs/stream_transform/example/main.dart
@@ -9,7 +9,7 @@
 void main() {
   var firstInput = document.querySelector('#first_input') as InputElement;
   var secondInput = document.querySelector('#second_input') as InputElement;
-  var output = document.querySelector('#output');
+  var output = document.querySelector('#output')!;
 
   _inputValues(firstInput)
       .combineLatest(_inputValues(secondInput),
@@ -21,6 +21,6 @@
   });
 }
 
-Stream<String> _inputValues(InputElement element) => element.onKeyUp
+Stream<String?> _inputValues(InputElement element) => element.onKeyUp
     .debounce(const Duration(milliseconds: 100))
     .map((_) => element.value);
diff --git a/pkgs/stream_transform/lib/src/aggregate_sample.dart b/pkgs/stream_transform/lib/src/aggregate_sample.dart
index 88fc490..3a967d4 100644
--- a/pkgs/stream_transform/lib/src/aggregate_sample.dart
+++ b/pkgs/stream_transform/lib/src/aggregate_sample.dart
@@ -15,7 +15,7 @@
 /// the output.
 class AggregateSample<S, T> extends StreamTransformerBase<S, T> {
   final Stream<void> _trigger;
-  final T Function(S, T) _aggregate;
+  final T Function(S, T?) _aggregate;
 
   AggregateSample(this._trigger, this._aggregate);
 
@@ -25,15 +25,15 @@
         ? StreamController<T>.broadcast(sync: true)
         : StreamController<T>(sync: true);
 
-    T currentResults;
+    T? currentResults;
     var waitingForTrigger = true;
     var isTriggerDone = false;
     var isValueDone = false;
-    StreamSubscription<S> valueSub;
-    StreamSubscription<void> triggerSub;
+    StreamSubscription<S>? valueSub;
+    StreamSubscription<void>? triggerSub;
 
     void emit() {
-      controller.add(currentResults);
+      controller.add(currentResults!);
       currentResults = null;
       waitingForTrigger = true;
     }
@@ -44,7 +44,7 @@
       if (!waitingForTrigger) emit();
 
       if (isTriggerDone) {
-        valueSub.cancel();
+        valueSub!.cancel();
         controller.close();
       }
     }
@@ -63,7 +63,7 @@
       if (currentResults != null) emit();
 
       if (isValueDone) {
-        triggerSub.cancel();
+        triggerSub!.cancel();
         controller.close();
       }
     }
@@ -80,8 +80,9 @@
       assert(valueSub == null);
       valueSub = values.listen(onValue,
           onError: controller.addError, onDone: onValuesDone);
-      if (triggerSub != null) {
-        if (triggerSub.isPaused) triggerSub.resume();
+      final priorTriggerSub = triggerSub;
+      if (priorTriggerSub != null) {
+        if (priorTriggerSub.isPaused) priorTriggerSub.resume();
       } else {
         triggerSub = _trigger.listen(onTrigger,
             onError: controller.addError, onDone: onTriggerDone);
@@ -98,17 +99,16 @@
           };
       }
       controller.onCancel = () {
-        var toCancel = <StreamSubscription<void>>[];
-        if (!isValueDone) toCancel.add(valueSub);
+        var cancels = <Future<void>>[if (!isValueDone) valueSub!.cancel()];
         valueSub = null;
         if (_trigger.isBroadcast || !values.isBroadcast) {
-          if (!isTriggerDone) toCancel.add(triggerSub);
+          if (!isTriggerDone) cancels.add(triggerSub!.cancel());
           triggerSub = null;
         } else {
-          triggerSub.pause();
+          triggerSub!.pause();
         }
-        var cancels =
-            toCancel.map((s) => s.cancel()).where((f) => f != null).toList();
+        // Handle opt-out nulls
+        cancels.removeWhere((Object? f) => f == null);
         if (cancels.isEmpty) return null;
         return Future.wait(cancels).then((_) => null);
       };
diff --git a/pkgs/stream_transform/lib/src/async_map.dart b/pkgs/stream_transform/lib/src/async_map.dart
index c5b160b..7b406c7 100644
--- a/pkgs/stream_transform/lib/src/async_map.dart
+++ b/pkgs/stream_transform/lib/src/async_map.dart
@@ -117,13 +117,13 @@
 /// work.
 StreamTransformer<S, T> _asyncMapThen<S, T>(
     Future<T> Function(S) convert, void Function(void) then) {
-  Future<void> pendingEvent;
+  Future<void>? pendingEvent;
   return fromHandlers(handleData: (event, sink) {
     pendingEvent =
         convert(event).then(sink.add).catchError(sink.addError).then(then);
   }, handleDone: (sink) {
     if (pendingEvent != null) {
-      pendingEvent.then((_) => sink.close());
+      pendingEvent!.then((_) => sink.close());
     } else {
       sink.close();
     }
diff --git a/pkgs/stream_transform/lib/src/combine_latest.dart b/pkgs/stream_transform/lib/src/combine_latest.dart
index 303b16a..6e268e3 100644
--- a/pkgs/stream_transform/lib/src/combine_latest.dart
+++ b/pkgs/stream_transform/lib/src/combine_latest.dart
@@ -92,14 +92,14 @@
         ? _other.asBroadcastStream()
         : _other;
 
-    StreamSubscription<S> sourceSubscription;
-    StreamSubscription<T> otherSubscription;
+    StreamSubscription<S>? sourceSubscription;
+    StreamSubscription<T>? otherSubscription;
 
     var sourceDone = false;
     var otherDone = false;
 
-    S latestSource;
-    T latestOther;
+    late S latestSource;
+    late T latestOther;
 
     var sourceStarted = false;
     var otherStarted = false;
@@ -114,16 +114,16 @@
         return;
       }
       if (result is Future<R>) {
-        sourceSubscription.pause();
-        otherSubscription.pause();
+        sourceSubscription!.pause();
+        otherSubscription!.pause();
         result
             .then(controller.add, onError: controller.addError)
             .whenComplete(() {
-          sourceSubscription.resume();
-          otherSubscription.resume();
+          sourceSubscription!.resume();
+          otherSubscription!.resume();
         });
       } else {
-        controller.add(result as R);
+        controller.add(result);
       }
     }
 
@@ -142,7 +142,7 @@
               controller.close();
             } else if (!sourceStarted) {
               // Nothing can ever be emitted
-              otherSubscription.cancel();
+              otherSubscription!.cancel();
               controller.close();
             }
           });
@@ -159,24 +159,28 @@
               controller.close();
             } else if (!otherStarted) {
               // Nothing can ever be emitted
-              sourceSubscription.cancel();
+              sourceSubscription!.cancel();
               controller.close();
             }
           });
       if (!source.isBroadcast) {
         controller
           ..onPause = () {
-            sourceSubscription.pause();
-            otherSubscription.pause();
+            sourceSubscription!.pause();
+            otherSubscription!.pause();
           }
           ..onResume = () {
-            sourceSubscription.resume();
-            otherSubscription.resume();
+            sourceSubscription!.resume();
+            otherSubscription!.resume();
           };
       }
       controller.onCancel = () {
-        var cancels = [sourceSubscription.cancel(), otherSubscription.cancel()]
-            .where((f) => f != null);
+        var cancels = [
+          sourceSubscription!.cancel(),
+          otherSubscription!.cancel()
+        ]
+          // Handle opt-out nulls
+          ..removeWhere((Object? f) => f == null);
         sourceSubscription = null;
         otherSubscription = null;
         return Future.wait(cancels).then((_) => null);
@@ -208,7 +212,7 @@
     controller.onListen = () {
       final subscriptions = <StreamSubscription<T>>[];
 
-      final latestData = List<T>(allStreams.length);
+      final latestData = List<T?>.filled(allStreams.length, null);
       final hasEmitted = <int>{};
       void handleData(int index, T data) {
         latestData[index] = data;
@@ -249,10 +253,10 @@
           };
       }
       controller.onCancel = () {
-        var cancels = subscriptions
-            .map((s) => s.cancel())
-            .where((f) => f != null)
-            .toList();
+        if (subscriptions.isEmpty) return null;
+        var cancels = [for (var s in subscriptions) s.cancel()]
+          // Handle opt-out nulls
+          ..removeWhere((Object? f) => f == null);
         if (cancels.isEmpty) return null;
         return Future.wait(cancels).then((_) => null);
       };
diff --git a/pkgs/stream_transform/lib/src/concatenate.dart b/pkgs/stream_transform/lib/src/concatenate.dart
index 05c977f..402d8a0 100644
--- a/pkgs/stream_transform/lib/src/concatenate.dart
+++ b/pkgs/stream_transform/lib/src/concatenate.dart
@@ -68,12 +68,12 @@
         ? _next.asBroadcastStream()
         : _next;
 
-    StreamSubscription<T> subscription;
+    StreamSubscription<T>? subscription;
     var currentStream = first;
     var firstDone = false;
     var secondDone = false;
 
-    Function currentDoneHandler;
+    late void Function() currentDoneHandler;
 
     void listen() {
       subscription = currentStream.listen(controller.add,
@@ -100,18 +100,18 @@
       if (!first.isBroadcast) {
         controller
           ..onPause = () {
-            if (!firstDone || !next.isBroadcast) return subscription.pause();
-            subscription.cancel();
+            if (!firstDone || !next.isBroadcast) return subscription!.pause();
+            subscription!.cancel();
             subscription = null;
           }
           ..onResume = () {
-            if (!firstDone || !next.isBroadcast) return subscription.resume();
+            if (!firstDone || !next.isBroadcast) return subscription!.resume();
             listen();
           };
       }
       controller.onCancel = () {
         if (secondDone) return null;
-        var toCancel = subscription;
+        var toCancel = subscription!;
         subscription = null;
         return toCancel.cancel();
       };
diff --git a/pkgs/stream_transform/lib/src/from_handlers.dart b/pkgs/stream_transform/lib/src/from_handlers.dart
index 3e5689f..c7c9332 100644
--- a/pkgs/stream_transform/lib/src/from_handlers.dart
+++ b/pkgs/stream_transform/lib/src/from_handlers.dart
@@ -7,9 +7,9 @@
 /// Like [new StreamTransformer.fromHandlers] but the handlers are called once
 /// per event rather than once per listener for broadcast streams.
 StreamTransformer<S, T> fromHandlers<S, T>(
-        {void Function(S, EventSink<T>) handleData,
-        void Function(Object, StackTrace, EventSink<T>) handleError,
-        void Function(EventSink<T>) handleDone}) =>
+        {void Function(S, EventSink<T>)? handleData,
+        void Function(Object, StackTrace, EventSink<T>)? handleError,
+        void Function(EventSink<T>)? handleDone}) =>
     _StreamTransformer(
         handleData: handleData,
         handleError: handleError,
@@ -21,9 +21,9 @@
   final void Function(Object, StackTrace, EventSink<T>) _handleError;
 
   _StreamTransformer(
-      {void Function(S, EventSink<T>) handleData,
-      void Function(Object, StackTrace, EventSink<T>) handleError,
-      void Function(EventSink<T>) handleDone})
+      {void Function(S, EventSink<T>)? handleData,
+      void Function(Object, StackTrace, EventSink<T>)? handleError,
+      void Function(EventSink<T>)? handleDone})
       : _handleData = handleData ?? _defaultHandleData,
         _handleError = handleError ?? _defaultHandleError,
         _handleDone = handleDone ?? _defaultHandleDone;
@@ -47,12 +47,12 @@
         ? StreamController<T>.broadcast(sync: true)
         : StreamController<T>(sync: true);
 
-    StreamSubscription<S> subscription;
+    StreamSubscription<S>? subscription;
     controller.onListen = () {
       assert(subscription == null);
       var valuesDone = false;
       subscription = values.listen((value) => _handleData(value, controller),
-          onError: (error, StackTrace stackTrace) {
+          onError: (Object error, StackTrace stackTrace) {
         _handleError(error, stackTrace, controller);
       }, onDone: () {
         valuesDone = true;
@@ -60,13 +60,13 @@
       });
       if (!values.isBroadcast) {
         controller
-          ..onPause = subscription.pause
-          ..onResume = subscription.resume;
+          ..onPause = subscription!.pause
+          ..onResume = subscription!.resume;
       }
       controller.onCancel = () {
         var toCancel = subscription;
         subscription = null;
-        if (!valuesDone) return toCancel.cancel();
+        if (!valuesDone) return toCancel!.cancel();
         return null;
       };
     };
diff --git a/pkgs/stream_transform/lib/src/merge.dart b/pkgs/stream_transform/lib/src/merge.dart
index c68695c..61b5e21 100644
--- a/pkgs/stream_transform/lib/src/merge.dart
+++ b/pkgs/stream_transform/lib/src/merge.dart
@@ -131,10 +131,10 @@
           };
       }
       controller.onCancel = () {
-        var cancels = subscriptions
-            .map((s) => s.cancel())
-            .where((f) => f != null)
-            .toList();
+        if (subscriptions.isEmpty) return null;
+        var cancels = [for (var s in subscriptions) s.cancel()]
+          // Handle opt-out nulls
+          ..removeWhere((Object? f) => f == null);
         if (cancels.isEmpty) return null;
         return Future.wait(cancels).then((_) => null);
       };
@@ -183,11 +183,10 @@
           };
       }
       controller.onCancel = () {
-        var cancels = subscriptions
-            .map((s) => s.cancel())
-            .where((f) => f != null)
-            .toList();
-        if (cancels.isEmpty) return null;
+        if (subscriptions.isEmpty) return null;
+        var cancels = [for (var s in subscriptions) s.cancel()]
+          // Handle opt-out nulls
+          ..removeWhere((Object? f) => f == null);
         return Future.wait(cancels).then((_) => null);
       };
     };
diff --git a/pkgs/stream_transform/lib/src/rate_limit.dart b/pkgs/stream_transform/lib/src/rate_limit.dart
index e42f8f4..23dfbc5 100644
--- a/pkgs/stream_transform/lib/src/rate_limit.dart
+++ b/pkgs/stream_transform/lib/src/rate_limit.dart
@@ -85,7 +85,7 @@
   /// Events emitted by the source stream within [duration] following an emitted
   /// event will be discarded. Errors are always forwarded immediately.
   Stream<T> throttle(Duration duration) {
-    Timer timer;
+    Timer? timer;
 
     return transform(fromHandlers(handleData: (data, sink) {
       if (timer == null) {
@@ -125,7 +125,7 @@
   ///     source: a------b--c----d--|
   ///     output: -----a------c--------d|
   Stream<T> audit(Duration duration) {
-    Timer timer;
+    Timer? timer;
     var shouldClose = false;
     T recentData;
 
@@ -161,7 +161,7 @@
       transform(AggregateSample<T, List<T>>(trigger, _collect));
 }
 
-List<T> _collectToList<T>(T element, List<T> soFar) {
+List<T> _collectToList<T>(T element, List<T>? soFar) {
   soFar ??= <T>[];
   soFar.add(element);
   return soFar;
@@ -172,10 +172,10 @@
 /// Creates a StreamTransformer which aggregates values until the source stream
 /// does not emit for [duration], then emits the aggregated values.
 StreamTransformer<T, R> _debounceAggregate<T, R>(
-    Duration duration, R Function(T element, R soFar) collect,
-    {bool leading, bool trailing}) {
-  Timer timer;
-  R soFar;
+    Duration duration, R Function(T element, R? soFar) collect,
+    {required bool leading, required bool trailing}) {
+  Timer? timer;
+  R? soFar;
   var shouldClose = false;
   var emittedLatestAsLeading = false;
   return fromHandlers(handleData: (T value, EventSink<R> sink) {
@@ -183,12 +183,12 @@
     soFar = collect(value, soFar);
     if (timer == null && leading) {
       emittedLatestAsLeading = true;
-      sink.add(soFar);
+      sink.add(soFar as R);
     } else {
       emittedLatestAsLeading = false;
     }
     timer = Timer(duration, () {
-      if (trailing && !emittedLatestAsLeading) sink.add(soFar);
+      if (trailing && !emittedLatestAsLeading) sink.add(soFar as R);
       if (shouldClose) {
         sink.close();
       }
@@ -205,4 +205,4 @@
   });
 }
 
-List<T> _collect<T>(T event, List<T> soFar) => (soFar ?? <T>[])..add(event);
+List<T> _collect<T>(T event, List<T>? soFar) => (soFar ?? <T>[])..add(event);
diff --git a/pkgs/stream_transform/lib/src/scan.dart b/pkgs/stream_transform/lib/src/scan.dart
index 4e022f5..d14381c 100644
--- a/pkgs/stream_transform/lib/src/scan.dart
+++ b/pkgs/stream_transform/lib/src/scan.dart
@@ -21,7 +21,7 @@
       if (result is Future<S>) {
         return result.then((r) => accumulated = r);
       } else {
-        return accumulated = result as S;
+        return accumulated = result;
       }
     });
   }
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart
index dcd2431..df7f1b8 100644
--- a/pkgs/stream_transform/lib/src/switch.dart
+++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -65,7 +65,7 @@
         : StreamController<T>(sync: true);
 
     controller.onListen = () {
-      StreamSubscription<T> innerSubscription;
+      StreamSubscription<T>? innerSubscription;
       var outerStreamDone = false;
 
       final outerSubscription = outer.listen(
@@ -96,8 +96,10 @@
       controller.onCancel = () {
         var cancels = [
           if (!outerStreamDone) outerSubscription.cancel(),
-          if (innerSubscription != null) innerSubscription.cancel(),
-        ].where((f) => f != null);
+          if (innerSubscription != null) innerSubscription!.cancel(),
+        ]
+          // Handle opt-out nulls
+          ..removeWhere((Object? f) => f == null);
         if (cancels.isEmpty) return null;
         return Future.wait(cancels).then((_) => null);
       };
diff --git a/pkgs/stream_transform/lib/src/take_until.dart b/pkgs/stream_transform/lib/src/take_until.dart
index 43b35d1..5420500 100644
--- a/pkgs/stream_transform/lib/src/take_until.dart
+++ b/pkgs/stream_transform/lib/src/take_until.dart
@@ -27,7 +27,7 @@
         ? StreamController<T>.broadcast(sync: true)
         : StreamController<T>(sync: true);
 
-    StreamSubscription<T> subscription;
+    StreamSubscription<T>? subscription;
     var isDone = false;
     _trigger.then((_) {
       if (isDone) return;
@@ -46,12 +46,12 @@
       });
       if (!values.isBroadcast) {
         controller
-          ..onPause = subscription.pause
-          ..onResume = subscription.resume;
+          ..onPause = subscription!.pause
+          ..onResume = subscription!.resume;
       }
       controller.onCancel = () {
         if (isDone) return null;
-        var toCancel = subscription;
+        var toCancel = subscription!;
         subscription = null;
         return toCancel.cancel();
       };
diff --git a/pkgs/stream_transform/lib/src/tap.dart b/pkgs/stream_transform/lib/src/tap.dart
index 2696e02..b7e0321 100644
--- a/pkgs/stream_transform/lib/src/tap.dart
+++ b/pkgs/stream_transform/lib/src/tap.dart
@@ -22,9 +22,9 @@
   ///
   /// The callbacks may not be called until the tapped stream has a listener,
   /// and may not be called after the listener has canceled the subscription.
-  Stream<T> tap(void Function(T) onValue,
-          {void Function(Object, StackTrace) onError,
-          void Function() onDone}) =>
+  Stream<T> tap(void Function(T)? onValue,
+          {void Function(Object, StackTrace)? onError,
+          void Function()? onDone}) =>
       transform(fromHandlers(handleData: (value, sink) {
         try {
           onValue?.call(value);
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 5da90b1..ba4bd51 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -1,12 +1,12 @@
 name: stream_transform
 description: A collection of utilities to transform and manipulate streams.
 homepage: https://github.com/dart-lang/stream_transform
-version: 1.2.1-dev
+version: 2.0.0-nullsafety.0-dev
 
 environment:
-  sdk: ">=2.7.0 <3.0.0"
+  sdk: ">=2.12.0-0 <3.0.0"
 
 dev_dependencies:
-  async: ^2.0.0
-  pedantic: ^1.5.0
-  test: ^1.0.0
+  async: ^2.5.0-nullsafety
+  pedantic: ^1.10.0-nullsafety
+  test: ^1.16.0-nullsafety
diff --git a/pkgs/stream_transform/test/async_map_buffer_test.dart b/pkgs/stream_transform/test/async_map_buffer_test.dart
index 9b46e1f..1280b51 100644
--- a/pkgs/stream_transform/test/async_map_buffer_test.dart
+++ b/pkgs/stream_transform/test/async_map_buffer_test.dart
@@ -11,16 +11,16 @@
 import 'utils.dart';
 
 void main() {
-  StreamController<int> values;
-  List<String> emittedValues;
-  bool valuesCanceled;
-  bool isDone;
-  List<String> errors;
-  Stream<String> transformed;
-  StreamSubscription<String> subscription;
+  late StreamController<int> values;
+  late List<String> emittedValues;
+  late bool valuesCanceled;
+  late bool isDone;
+  late List<String> errors;
+  late Stream<String> transformed;
+  late StreamSubscription<String> subscription;
 
-  Completer<String> finishWork;
-  List<int> workArgument;
+  Completer<String>? finishWork;
+  List<int>? workArgument;
 
   /// Represents the async `convert` function and asserts that is is only called
   /// after the previous iteration has completed.
@@ -28,15 +28,15 @@
     expect(finishWork, isNull,
         reason: 'See $values befor previous work is complete');
     workArgument = values;
-    finishWork = Completer();
-    finishWork.future.then((_) {
-      workArgument = null;
-      finishWork = null;
-    }).catchError((_) {
-      workArgument = null;
-      finishWork = null;
-    });
-    return finishWork.future;
+    finishWork = Completer()
+      ..future.then((_) {
+        workArgument = null;
+        finishWork = null;
+      }).catchError((_) {
+        workArgument = null;
+        finishWork = null;
+      });
+    return finishWork!.future;
   }
 
   for (var streamType in streamTypes) {
@@ -64,7 +64,7 @@
         await Future(() {});
         expect(emittedValues, isEmpty);
         expect(workArgument, [1]);
-        finishWork.complete('result');
+        finishWork!.complete('result');
         await Future(() {});
         expect(emittedValues, ['result']);
       });
@@ -74,7 +74,7 @@
         await Future(() {});
         values..add(2)..add(3);
         await Future(() {});
-        finishWork.complete();
+        finishWork!.complete('');
         await Future(() {});
         expect(workArgument, [2, 3]);
       });
@@ -90,7 +90,7 @@
       test('forwards errors which occur during the work', () async {
         values.add(1);
         await Future(() {});
-        finishWork.completeError('error');
+        finishWork!.completeError('error');
         await Future(() {});
         expect(errors, ['error']);
       });
@@ -98,11 +98,11 @@
       test('can continue handling events after an error', () async {
         values.add(1);
         await Future(() {});
-        finishWork.completeError('error');
+        finishWork!.completeError('error');
         values.add(2);
         await Future(() {});
         expect(workArgument, [2]);
-        finishWork.completeError('another');
+        finishWork!.completeError('another');
         await Future(() {});
         expect(errors, ['error', 'another']);
       });
@@ -140,11 +140,11 @@
         values.add(2);
         await values.close();
         expect(isDone, false);
-        finishWork.complete(null);
+        finishWork!.complete('');
         await Future(() {});
         // Still a pending value
         expect(isDone, false);
-        finishWork.complete(null);
+        finishWork!.complete('');
         await Future(() {});
         expect(isDone, true);
       });
@@ -161,7 +161,7 @@
           transformed.listen(otherValues.add);
           values.add(1);
           await Future(() {});
-          finishWork.complete('result');
+          finishWork!.complete('result');
           await Future(() {});
           expect(emittedValues, ['result']);
           expect(otherValues, ['result']);
@@ -175,7 +175,7 @@
           await values.close();
           expect(isDone, false);
           expect(otherDone, false);
-          finishWork.complete();
+          finishWork!.complete('');
           await Future(() {});
           expect(isDone, true);
           expect(otherDone, true);
@@ -184,7 +184,7 @@
         test('can cancel and relisten', () async {
           values.add(1);
           await Future(() {});
-          finishWork.complete('first');
+          finishWork!.complete('first');
           await Future(() {});
           await subscription.cancel();
           values.add(2);
@@ -193,7 +193,7 @@
           values.add(3);
           await Future(() {});
           expect(workArgument, [3]);
-          finishWork.complete('second');
+          finishWork!.complete('second');
           await Future(() {});
           expect(emittedValues, ['first', 'second']);
         });
diff --git a/pkgs/stream_transform/test/async_map_sample_test.dart b/pkgs/stream_transform/test/async_map_sample_test.dart
index 9c37cd4..06457d8 100644
--- a/pkgs/stream_transform/test/async_map_sample_test.dart
+++ b/pkgs/stream_transform/test/async_map_sample_test.dart
@@ -11,16 +11,16 @@
 import 'utils.dart';
 
 void main() {
-  StreamController<int> values;
-  List<String> emittedValues;
-  bool valuesCanceled;
-  bool isDone;
-  List<String> errors;
-  Stream<String> transformed;
-  StreamSubscription<String> subscription;
+  late StreamController<int> values;
+  late List<String> emittedValues;
+  late bool valuesCanceled;
+  late bool isDone;
+  late List<String> errors;
+  late Stream<String> transformed;
+  late StreamSubscription<String> subscription;
 
-  Completer<String> finishWork;
-  int workArgument;
+  Completer<String>? finishWork;
+  int? workArgument;
 
   /// Represents the async `convert` function and asserts that is is only called
   /// after the previous iteration has completed.
@@ -28,15 +28,15 @@
     expect(finishWork, isNull,
         reason: 'See $values befor previous work is complete');
     workArgument = value;
-    finishWork = Completer();
-    finishWork.future.then((_) {
-      workArgument = null;
-      finishWork = null;
-    }).catchError((_) {
-      workArgument = null;
-      finishWork = null;
-    });
-    return finishWork.future;
+    finishWork = Completer()
+      ..future.then((_) {
+        workArgument = null;
+        finishWork = null;
+      }).catchError((_) {
+        workArgument = null;
+        finishWork = null;
+      });
+    return finishWork!.future;
   }
 
   for (var streamType in streamTypes) {
@@ -64,7 +64,7 @@
         await Future(() {});
         expect(emittedValues, isEmpty);
         expect(workArgument, 1);
-        finishWork.complete('result');
+        finishWork!.complete('result');
         await Future(() {});
         expect(emittedValues, ['result']);
       });
@@ -74,7 +74,7 @@
         await Future(() {});
         values..add(2)..add(3);
         await Future(() {});
-        finishWork.complete();
+        finishWork!.complete('');
         await Future(() {});
         expect(workArgument, 3);
       });
@@ -90,7 +90,7 @@
       test('forwards errors which occur during the work', () async {
         values.add(1);
         await Future(() {});
-        finishWork.completeError('error');
+        finishWork!.completeError('error');
         await Future(() {});
         expect(errors, ['error']);
       });
@@ -98,11 +98,11 @@
       test('can continue handling events after an error', () async {
         values.add(1);
         await Future(() {});
-        finishWork.completeError('error');
+        finishWork!.completeError('error');
         values.add(2);
         await Future(() {});
         expect(workArgument, 2);
-        finishWork.completeError('another');
+        finishWork!.completeError('another');
         await Future(() {});
         expect(errors, ['error', 'another']);
       });
@@ -140,11 +140,11 @@
         values.add(2);
         await values.close();
         expect(isDone, false);
-        finishWork.complete(null);
+        finishWork!.complete('');
         await Future(() {});
         // Still a pending value
         expect(isDone, false);
-        finishWork.complete(null);
+        finishWork!.complete('');
         await Future(() {});
         expect(isDone, true);
       });
@@ -161,7 +161,7 @@
           transformed.listen(otherValues.add);
           values.add(1);
           await Future(() {});
-          finishWork.complete('result');
+          finishWork!.complete('result');
           await Future(() {});
           expect(emittedValues, ['result']);
           expect(otherValues, ['result']);
@@ -175,7 +175,7 @@
           await values.close();
           expect(isDone, false);
           expect(otherDone, false);
-          finishWork.complete();
+          finishWork!.complete('');
           await Future(() {});
           expect(isDone, true);
           expect(otherDone, true);
@@ -184,7 +184,7 @@
         test('can cancel and relisten', () async {
           values.add(1);
           await Future(() {});
-          finishWork.complete('first');
+          finishWork!.complete('first');
           await Future(() {});
           await subscription.cancel();
           values.add(2);
@@ -193,20 +193,11 @@
           values.add(3);
           await Future(() {});
           expect(workArgument, 3);
-          finishWork.complete('second');
+          finishWork!.complete('second');
           await Future(() {});
           expect(emittedValues, ['first', 'second']);
         });
       }
     });
   }
-  test('handles null response from cancel', () async {
-    var controller = StreamController<int>();
-
-    var subscription = NullOnCancelStream(controller.stream)
-        .asyncMapSample((_) async {})
-        .listen(null);
-
-    await subscription.cancel();
-  });
 }
diff --git a/pkgs/stream_transform/test/audit_test.dart b/pkgs/stream_transform/test/audit_test.dart
index 58cb95b..20c7d9f 100644
--- a/pkgs/stream_transform/test/audit_test.dart
+++ b/pkgs/stream_transform/test/audit_test.dart
@@ -12,13 +12,13 @@
 void main() {
   for (var streamType in streamTypes) {
     group('Stream type [$streamType]', () {
-      StreamController<int> values;
-      List<int> emittedValues;
-      bool valuesCanceled;
-      bool isDone;
-      List<String> errors;
-      Stream<int> transformed;
-      StreamSubscription<int> subscription;
+      late StreamController<int> values;
+      late List<int> emittedValues;
+      late bool valuesCanceled;
+      late bool isDone;
+      late List<String> errors;
+      late Stream<int> transformed;
+      late StreamSubscription<int> subscription;
 
       group('audit', () {
         setUp(() async {
diff --git a/pkgs/stream_transform/test/buffer_test.dart b/pkgs/stream_transform/test/buffer_test.dart
index 80a793c..b528fa2 100644
--- a/pkgs/stream_transform/test/buffer_test.dart
+++ b/pkgs/stream_transform/test/buffer_test.dart
@@ -10,16 +10,16 @@
 import 'utils.dart';
 
 void main() {
-  StreamController<void> trigger;
-  StreamController<int> values;
-  List<List<int>> emittedValues;
-  bool valuesCanceled;
-  bool triggerCanceled;
-  bool triggerPaused;
-  bool isDone;
-  List<String> errors;
-  Stream<List<int>> transformed;
-  StreamSubscription<List<int>> subscription;
+  late StreamController<void> trigger;
+  late StreamController<int> values;
+  late List<List<int>> emittedValues;
+  late bool valuesCanceled;
+  late bool triggerCanceled;
+  late bool triggerPaused;
+  late bool isDone;
+  late List<String> errors;
+  late Stream<List<int>> transformed;
+  late StreamSubscription<List<int>> subscription;
 
   void setUpForStreamTypes(String triggerType, String valuesType) {
     valuesCanceled = false;
@@ -243,13 +243,4 @@
       ]);
     });
   }
-
-  test('handles null response from cancel', () async {
-    var controller = StreamController<int>();
-    var trigger = StreamController<void>();
-    var subscription = NullOnCancelStream(controller.stream)
-        .buffer(trigger.stream)
-        .listen(null);
-    await subscription.cancel();
-  });
 }
diff --git a/pkgs/stream_transform/test/combine_latest_all_test.dart b/pkgs/stream_transform/test/combine_latest_all_test.dart
index d57d618..2943449 100644
--- a/pkgs/stream_transform/test/combine_latest_all_test.dart
+++ b/pkgs/stream_transform/test/combine_latest_all_test.dart
@@ -8,8 +8,6 @@
 
 import 'package:stream_transform/stream_transform.dart';
 
-import 'utils.dart';
-
 Future<void> tick() => Future(() {});
 
 void main() {
@@ -166,14 +164,4 @@
       });
     });
   });
-
-  test('handles null response from cancel', () async {
-    var source = StreamController<int>();
-    var other = StreamController<int>();
-
-    var subscription = NullOnCancelStream(source.stream)
-        .combineLatestAll([NullOnCancelStream(other.stream)]).listen(null);
-
-    await subscription.cancel();
-  });
 }
diff --git a/pkgs/stream_transform/test/combine_latest_test.dart b/pkgs/stream_transform/test/combine_latest_test.dart
index 4ea9499..2b35e2c 100644
--- a/pkgs/stream_transform/test/combine_latest_test.dart
+++ b/pkgs/stream_transform/test/combine_latest_test.dart
@@ -9,8 +9,6 @@
 
 import 'package:stream_transform/stream_transform.dart';
 
-import 'utils.dart';
-
 void main() {
   group('combineLatest', () {
     test('flows through combine callback', () async {
@@ -170,17 +168,6 @@
       });
     });
   });
-
-  test('handles null response from cancel', () async {
-    var source = StreamController<int>();
-    var other = StreamController<int>();
-
-    var subscription = NullOnCancelStream(source.stream)
-        .combineLatest(NullOnCancelStream(other.stream), (a, b) => null)
-        .listen(null);
-
-    await subscription.cancel();
-  });
 }
 
 class _NumberedException implements Exception {
diff --git a/pkgs/stream_transform/test/concurrent_async_expand_test.dart b/pkgs/stream_transform/test/concurrent_async_expand_test.dart
index 9ed02d0..d0b4ad4 100644
--- a/pkgs/stream_transform/test/concurrent_async_expand_test.dart
+++ b/pkgs/stream_transform/test/concurrent_async_expand_test.dart
@@ -24,15 +24,15 @@
   for (var outerType in streamTypes) {
     for (var innerType in streamTypes) {
       group('concurrentAsyncExpand $outerType to $innerType', () {
-        StreamController<int> outerController;
-        bool outerCanceled;
-        List<StreamController<String>> innerControllers;
-        List<bool> innerCanceled;
-        List<String> emittedValues;
-        bool isDone;
-        List<String> errors;
-        Stream<String> transformed;
-        StreamSubscription<String> subscription;
+        late StreamController<int> outerController;
+        late bool outerCanceled;
+        late List<StreamController<String>> innerControllers;
+        late List<bool> innerCanceled;
+        late List<String> emittedValues;
+        late bool isDone;
+        late List<String> errors;
+        late Stream<String> transformed;
+        late StreamSubscription<String> subscription;
 
         setUp(() {
           outerController = createController(outerType)
@@ -184,19 +184,4 @@
       });
     }
   }
-
-  test('hendles null response from cancel', () async {
-    var source = StreamController<int>();
-    var other = StreamController<int>();
-
-    var subscription = NullOnCancelStream(source.stream)
-        .concurrentAsyncExpand((_) => NullOnCancelStream(other.stream))
-        .listen(null);
-
-    source.add(1);
-
-    await Future<void>(() {});
-
-    await subscription.cancel();
-  });
 }
diff --git a/pkgs/stream_transform/test/concurrent_async_map_test.dart b/pkgs/stream_transform/test/concurrent_async_map_test.dart
index 6da2101..a894e7f 100644
--- a/pkgs/stream_transform/test/concurrent_async_map_test.dart
+++ b/pkgs/stream_transform/test/concurrent_async_map_test.dart
@@ -11,16 +11,16 @@
 import 'utils.dart';
 
 void main() {
-  StreamController<int> controller;
-  List<String> emittedValues;
-  bool valuesCanceled;
-  bool isDone;
-  List<String> errors;
-  Stream<String> transformed;
-  StreamSubscription<String> subscription;
+  late StreamController<int> controller;
+  late List<String> emittedValues;
+  late bool valuesCanceled;
+  late bool isDone;
+  late List<String> errors;
+  late Stream<String> transformed;
+  late StreamSubscription<String> subscription;
 
-  List<Completer<String>> finishWork;
-  List<dynamic> values;
+  late List<Completer<String>> finishWork;
+  late List<dynamic> values;
 
   Future<String> convert(int value) {
     values.add(value);
@@ -127,7 +127,7 @@
           await controller.close();
           expect(isDone, false);
           expect(otherDone, false);
-          finishWork.first.complete();
+          finishWork.first.complete('');
           await Future(() {});
           expect(isDone, true);
           expect(otherDone, true);
diff --git a/pkgs/stream_transform/test/debounce_test.dart b/pkgs/stream_transform/test/debounce_test.dart
index de6b740..f24bfb9 100644
--- a/pkgs/stream_transform/test/debounce_test.dart
+++ b/pkgs/stream_transform/test/debounce_test.dart
@@ -13,13 +13,13 @@
   for (var streamType in streamTypes) {
     group('Stream type [$streamType]', () {
       group('debounce - trailing', () {
-        StreamController<int> values;
-        List<int> emittedValues;
-        bool valuesCanceled;
-        bool isDone;
-        List<String> errors;
-        StreamSubscription<int> subscription;
-        Stream<int> transformed;
+        late StreamController<int> values;
+        late List<int> emittedValues;
+        late bool valuesCanceled;
+        late bool isDone;
+        late List<String> errors;
+        late StreamSubscription<int> subscription;
+        late Stream<int> transformed;
 
         setUp(() async {
           valuesCanceled = false;
@@ -89,10 +89,10 @@
       });
 
       group('debounce - leading', () {
-        StreamController<int> values;
-        List<int> emittedValues;
-        Stream<int> transformed;
-        bool isDone;
+        late StreamController<int> values;
+        late List<int> emittedValues;
+        late Stream<int> transformed;
+        late bool isDone;
 
         setUp(() async {
           values = createController(streamType);
@@ -139,9 +139,9 @@
       });
 
       group('debounce - leading and trailing', () {
-        StreamController<int> values;
-        List<int> emittedValues;
-        Stream<int> transformed;
+        late StreamController<int> values;
+        late List<int> emittedValues;
+        late Stream<int> transformed;
 
         setUp(() async {
           values = createController(streamType);
@@ -179,10 +179,10 @@
       });
 
       group('debounceBuffer', () {
-        StreamController<int> values;
-        List<List<int>> emittedValues;
-        List<String> errors;
-        Stream<List<int>> transformed;
+        late StreamController<int> values;
+        late List<List<int>> emittedValues;
+        late List<String> errors;
+        late Stream<List<int>> transformed;
 
         setUp(() async {
           values = createController(streamType);
diff --git a/pkgs/stream_transform/test/followd_by_test.dart b/pkgs/stream_transform/test/followd_by_test.dart
index c8864b5..fbe904a 100644
--- a/pkgs/stream_transform/test/followd_by_test.dart
+++ b/pkgs/stream_transform/test/followd_by_test.dart
@@ -13,17 +13,17 @@
   for (var firstType in streamTypes) {
     for (var secondType in streamTypes) {
       group('followedBy [$firstType] with [$secondType]', () {
-        StreamController<int> first;
-        StreamController<int> second;
+        late StreamController<int> first;
+        late StreamController<int> second;
 
-        List<int> emittedValues;
-        bool firstCanceled;
-        bool secondCanceled;
-        bool secondListened;
-        bool isDone;
-        List<String> errors;
-        Stream<int> transformed;
-        StreamSubscription<int> subscription;
+        late List<int> emittedValues;
+        late bool firstCanceled;
+        late bool secondCanceled;
+        late bool secondListened;
+        late bool isDone;
+        late List<String> errors;
+        late Stream<int> transformed;
+        late StreamSubscription<int> subscription;
 
         setUp(() async {
           firstCanceled = false;
diff --git a/pkgs/stream_transform/test/from_handlers_test.dart b/pkgs/stream_transform/test/from_handlers_test.dart
index 50d59c5..206acc8 100644
--- a/pkgs/stream_transform/test/from_handlers_test.dart
+++ b/pkgs/stream_transform/test/from_handlers_test.dart
@@ -9,13 +9,13 @@
 import 'package:stream_transform/src/from_handlers.dart';
 
 void main() {
-  StreamController<int> values;
-  List<int> emittedValues;
-  bool valuesCanceled;
-  bool isDone;
-  List<String> errors;
-  Stream<int> transformed;
-  StreamSubscription<int> subscription;
+  late StreamController<int> values;
+  late List<int> emittedValues;
+  late bool valuesCanceled;
+  late bool isDone;
+  late List<String> errors;
+  late Stream<int> transformed;
+  late StreamSubscription<int> subscription;
 
   void setUpForController(StreamController<int> controller,
       StreamTransformer<int, int> transformer) {
@@ -68,10 +68,10 @@
     });
 
     group('broadcast stream with muliple listeners', () {
-      List<int> emittedValues2;
-      List<String> errors2;
-      bool isDone2;
-      StreamSubscription<int> subscription2;
+      late List<int> emittedValues2;
+      late List<String> errors2;
+      late bool isDone2;
+      late StreamSubscription<int> subscription2;
 
       setUp(() {
         setUpForController(StreamController.broadcast(), fromHandlers());
@@ -133,9 +133,9 @@
     });
 
     group('broadcast stream with multiple listeners', () {
-      int dataCallCount;
-      int doneCallCount;
-      int errorCallCount;
+      late int dataCallCount;
+      late int doneCallCount;
+      late int errorCallCount;
 
       setUp(() async {
         dataCallCount = 0;
diff --git a/pkgs/stream_transform/test/merge_test.dart b/pkgs/stream_transform/test/merge_test.dart
index 2eef5d1..24cd76b 100644
--- a/pkgs/stream_transform/test/merge_test.dart
+++ b/pkgs/stream_transform/test/merge_test.dart
@@ -8,8 +8,6 @@
 
 import 'package:stream_transform/stream_transform.dart';
 
-import 'utils.dart';
-
 void main() {
   group('merge', () {
     test('includes all values', () async {
@@ -140,15 +138,4 @@
       expect(secondListenerValues, [1, 2, 3, 4, 5, 6]);
     });
   });
-
-  test('handles null response rom cancel', () async {
-    var source = StreamController<int>();
-    var other = StreamController<int>();
-
-    var subscription = NullOnCancelStream(source.stream)
-        .merge(NullOnCancelStream(other.stream))
-        .listen(null);
-
-    await subscription.cancel();
-  });
 }
diff --git a/pkgs/stream_transform/test/opt_out_test.dart b/pkgs/stream_transform/test/opt_out_test.dart
new file mode 100644
index 0000000..8ab4fc4
--- /dev/null
+++ b/pkgs/stream_transform/test/opt_out_test.dart
@@ -0,0 +1,72 @@
+// Copyright (c) 2020, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// @dart=2.9
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+  group('null returned from StreamSubscription.cancel', () {
+    void testNullCancel(
+        String name, Stream<void> Function(Stream<void>) transform) {
+      test(name, () async {
+        var subscription = transform(_NullOnCancelStream()).listen(null);
+        await subscription.cancel();
+      });
+    }
+
+    testNullCancel('asyncMapSample', (s) => s.asyncMapSample((_) async {}));
+    testNullCancel('buffer', (s) => s.buffer(_nonEndingStream()));
+    testNullCancel(
+        'combineLatestAll', (s) => s.combineLatestAll([_NullOnCancelStream()]));
+    testNullCancel('combineLatest',
+        (s) => s.combineLatest(_NullOnCancelStream(), (a, b) {}));
+    testNullCancel('merge', (s) => s.merge(_NullOnCancelStream()));
+
+    test('switchLatest', () async {
+      var subscription =
+          _NullOnCancelStream(Stream<Stream<void>>.value(_NullOnCancelStream()))
+              .switchLatest()
+              .listen(null);
+      await Future(() {});
+      await subscription.cancel();
+    });
+
+    test('concurrentAsyncExpand', () async {
+      var subscription = _NullOnCancelStream(Stream.value(null))
+          .concurrentAsyncExpand((_) => _NullOnCancelStream())
+          .listen(null);
+      await Future(() {});
+      await subscription.cancel();
+    });
+  });
+}
+
+class _NullOnCancelStream<T> extends StreamView<T> {
+  _NullOnCancelStream([Stream<T> stream]) : super(stream ?? _nonEndingStream());
+
+  @override
+  StreamSubscription<T> listen(void Function(T) onData,
+          {Function onError, void Function() onDone, bool cancelOnError}) =>
+      _NullOnCancelSubscription(super.listen(onData,
+          onError: onError, onDone: onDone, cancelOnError: cancelOnError));
+}
+
+class _NullOnCancelSubscription<T> extends DelegatingStreamSubscription<T> {
+  final StreamSubscription<T> _subscription;
+  _NullOnCancelSubscription(this._subscription) : super(_subscription);
+
+  @override
+  Future<void> cancel() {
+    _subscription.cancel();
+    return null;
+  }
+}
+
+Stream<T> _nonEndingStream<T>() => StreamController<T>().stream;
diff --git a/pkgs/stream_transform/test/scan_test.dart b/pkgs/stream_transform/test/scan_test.dart
index 4e58680..e3effb3 100644
--- a/pkgs/stream_transform/test/scan_test.dart
+++ b/pkgs/stream_transform/test/scan_test.dart
@@ -22,7 +22,7 @@
     test('can create a broadcast stream', () {
       var source = StreamController.broadcast();
 
-      var transformed = source.stream.scan(null, null);
+      var transformed = source.stream.scan(null, (_, __) {});
 
       expect(transformed.isBroadcast, true);
     });
diff --git a/pkgs/stream_transform/test/start_with_test.dart b/pkgs/stream_transform/test/start_with_test.dart
index 3a5a025..29241be 100644
--- a/pkgs/stream_transform/test/start_with_test.dart
+++ b/pkgs/stream_transform/test/start_with_test.dart
@@ -11,12 +11,12 @@
 import 'utils.dart';
 
 void main() {
-  StreamController<int> values;
-  Stream<int> transformed;
-  StreamSubscription<int> subscription;
+  late StreamController<int> values;
+  late Stream<int> transformed;
+  late StreamSubscription<int> subscription;
 
-  List<int> emittedValues;
-  bool isDone;
+  late List<int> emittedValues;
+  late bool isDone;
 
   void setupForStreamType(
       String streamType, Stream<int> Function(Stream<int>) transform) {
@@ -102,7 +102,7 @@
 
     for (var startingStreamType in streamTypes) {
       group('startWithStream [$startingStreamType] then [$streamType]', () {
-        StreamController<int> starting;
+        late StreamController<int> starting;
         setUp(() async {
           starting = createController(startingStreamType);
           setupForStreamType(
diff --git a/pkgs/stream_transform/test/switch_test.dart b/pkgs/stream_transform/test/switch_test.dart
index 16deb4b..b69c8b0 100644
--- a/pkgs/stream_transform/test/switch_test.dart
+++ b/pkgs/stream_transform/test/switch_test.dart
@@ -14,16 +14,16 @@
   for (var outerType in streamTypes) {
     for (var innerType in streamTypes) {
       group('Outer type: [$outerType], Inner type: [$innerType]', () {
-        StreamController<int> first;
-        StreamController<int> second;
-        StreamController<Stream<int>> outer;
+        late StreamController<int> first;
+        late StreamController<int> second;
+        late StreamController<Stream<int>> outer;
 
-        List<int> emittedValues;
-        bool firstCanceled;
-        bool outerCanceled;
-        bool isDone;
-        List<String> errors;
-        StreamSubscription<int> subscription;
+        late List<int> emittedValues;
+        late bool firstCanceled;
+        late bool outerCanceled;
+        late bool isDone;
+        late List<String> errors;
+        late StreamSubscription<int> subscription;
 
         setUp(() async {
           firstCanceled = false;
@@ -141,24 +141,11 @@
     test('can create a broadcast stream', () async {
       var outer = StreamController.broadcast();
 
-      var transformed = outer.stream.switchMap(null);
+      var transformed = outer.stream.switchMap((_) => const Stream.empty());
 
       expect(transformed.isBroadcast, true);
     });
 
-    test('handles null response from cancel', () async {
-      var outer = StreamController<Stream<int>>();
-      var inner = StreamController<int>();
-
-      var subscription =
-          NullOnCancelStream(outer.stream).switchLatest().listen(null);
-
-      outer.add(NullOnCancelStream(inner.stream));
-      await Future<void>(() {});
-
-      await subscription.cancel();
-    });
-
     test('forwards errors from the convert callback', () async {
       var errors = <String>[];
       var source = Stream.fromIterable([1, 2, 3]);
diff --git a/pkgs/stream_transform/test/take_until_test.dart b/pkgs/stream_transform/test/take_until_test.dart
index c25d342..11e906f 100644
--- a/pkgs/stream_transform/test/take_until_test.dart
+++ b/pkgs/stream_transform/test/take_until_test.dart
@@ -13,14 +13,14 @@
 void main() {
   for (var streamType in streamTypes) {
     group('takeUntil on Stream type [$streamType]', () {
-      StreamController<int> values;
-      List<int> emittedValues;
-      bool valuesCanceled;
-      bool isDone;
-      List<String> errors;
-      Stream<int> transformed;
-      StreamSubscription<int> subscription;
-      Completer<void> closeTrigger;
+      late StreamController<int> values;
+      late List<int> emittedValues;
+      late bool valuesCanceled;
+      late bool isDone;
+      late List<String> errors;
+      late Stream<int> transformed;
+      late StreamSubscription<int> subscription;
+      late Completer<void> closeTrigger;
 
       setUp(() {
         valuesCanceled = false;
diff --git a/pkgs/stream_transform/test/throttle_test.dart b/pkgs/stream_transform/test/throttle_test.dart
index e51ef84..d84fdf4 100644
--- a/pkgs/stream_transform/test/throttle_test.dart
+++ b/pkgs/stream_transform/test/throttle_test.dart
@@ -12,12 +12,12 @@
 void main() {
   for (var streamType in streamTypes) {
     group('Stream type [$streamType]', () {
-      StreamController<int> values;
-      List<int> emittedValues;
-      bool valuesCanceled;
-      bool isDone;
-      Stream<int> transformed;
-      StreamSubscription<int> subscription;
+      late StreamController<int> values;
+      late List<int> emittedValues;
+      late bool valuesCanceled;
+      late bool isDone;
+      late Stream<int> transformed;
+      late StreamSubscription<int> subscription;
 
       group('throttle', () {
         setUp(() async {
diff --git a/pkgs/stream_transform/test/utils.dart b/pkgs/stream_transform/test/utils.dart
index 4aa5837..b6196d6 100644
--- a/pkgs/stream_transform/test/utils.dart
+++ b/pkgs/stream_transform/test/utils.dart
@@ -4,8 +4,6 @@
 
 import 'dart:async';
 
-import 'package:async/async.dart';
-
 /// Cycle the event loop to ensure timers are started, then wait for a delay
 /// longer than [milliseconds] to allow for the timer to fire.
 Future<void> waitForTimer(int milliseconds) =>
@@ -25,26 +23,3 @@
 }
 
 const streamTypes = ['single subscription', 'broadcast'];
-
-class NullOnCancelStream<T> extends StreamView<T> {
-  final Stream<T> _stream;
-
-  NullOnCancelStream(this._stream) : super(_stream);
-
-  @override
-  StreamSubscription<T> listen(void Function(T) onData,
-          {Function onError, void Function() onDone, bool cancelOnError}) =>
-      _NullOnCancelSubscription(_stream.listen(onData,
-          onError: onError, onDone: onDone, cancelOnError: cancelOnError));
-}
-
-class _NullOnCancelSubscription<T> extends DelegatingStreamSubscription<T> {
-  final StreamSubscription<T> _subscription;
-  _NullOnCancelSubscription(this._subscription) : super(_subscription);
-
-  @override
-  Future<void> cancel() {
-    _subscription.cancel();
-    return null;
-  }
-}