Merge null_safety branch into master (#125)

Migrates this package to null safety and prepares for a null safety pre-release, once we have landed the package internally and in the SDK to verify its accuracy. 
diff --git a/.travis.yml b/.travis.yml
index 6fb2f8d..0789302 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,18 +1,33 @@
 language: dart
 
 dart:
-  - dev
-  - 2.2.0
+ - dev
 
-dart_task:
-  - test: --platform vm
-  - test: --platform chrome
-  - dartanalyzer
-  - dartfmt
+jobs:
+  include:
+    - stage: analyze_and_format
+      name: "Analyzer"
+      dart: be/raw/latest
+      os: linux
+      script: dartanalyzer --enable-experiment=non-nullable --fatal-warnings --fatal-infos .
+    - stage: analyze_and_format
+      name: "Format"
+      dart: be/raw/latest
+      os: linux
+      script: dartfmt -n --set-exit-if-changed .
+    - stage: test
+      name: "Vm Tests"
+      dart: be/raw/latest
+      os: linux
+      script: pub run --enable-experiment=non-nullable test -p vm
+
+stages:
+  - analyze_and_format
+  - test
 
 # Only building master means that we don't run two builds for each pull request.
 branches:
-  only: [master]
+  only: [master, null_safety]
 
 cache:
   directories:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f247608..3ca7f6c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 2.5.0-nullsafety
+
+* Migrate this package to null safety.
+
 ## 2.4.2
 
 * `StreamQueue` starts listening immediately to broadcast strings.
diff --git a/analysis_options.yaml b/analysis_options.yaml
index 18c26e0..b58ef99 100644
--- a/analysis_options.yaml
+++ b/analysis_options.yaml
@@ -7,6 +7,8 @@
     todo: ignore
     # Lint provided by pkg:pedantic – should fix this!
     unawaited_futures: ignore
+  enable-experiment:
+    - non-nullable
 
 linter:
   rules:
diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart
index e0a6f49..d6f5f7f 100644
--- a/lib/src/async_cache.dart
+++ b/lib/src/async_cache.dart
@@ -29,13 +29,13 @@
   final Duration _duration;
 
   /// Cached results of a previous [fetchStream] call.
-  StreamSplitter<T> _cachedStreamSplitter;
+  StreamSplitter<T>? _cachedStreamSplitter;
 
   /// Cached results of a previous [fetch] call.
-  Future<T> _cachedValueFuture;
+  Future<T>? _cachedValueFuture;
 
   /// Fires when the cache should be considered stale.
-  Timer _stale;
+  Timer? _stale;
 
   /// Creates a cache that invalidates its contents after [duration] has passed.
   ///
@@ -65,7 +65,7 @@
       await _cachedValueFuture;
       _startStaleTimer();
     }
-    return _cachedValueFuture;
+    return _cachedValueFuture!;
   }
 
   /// Returns a cached stream from a previous call to [fetchStream], or runs
@@ -78,12 +78,12 @@
     if (_cachedValueFuture != null) {
       throw StateError('Previously used to cache via `fetch`');
     }
-    _cachedStreamSplitter ??= StreamSplitter(
+    var splitter = _cachedStreamSplitter ??= StreamSplitter(
         callback().transform(StreamTransformer.fromHandlers(handleDone: (sink) {
       _startStaleTimer();
       sink.close();
     })));
-    return _cachedStreamSplitter.split();
+    return splitter.split();
   }
 
   /// Removes any cached value.
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 9fd0534..267c6d1 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -35,7 +35,7 @@
   /// moment this [CancelableOperation] is created, regardless of whether
   /// [inner] has completed yet or not.
   factory CancelableOperation.fromFuture(Future<T> inner,
-      {FutureOr Function() onCancel}) {
+      {FutureOr Function()? onCancel}) {
     var completer = CancelableCompleter<T>(onCancel: onCancel);
     completer.complete(inner);
     return completer.operation;
@@ -55,7 +55,7 @@
     value.then((value) {
       controller.add(value);
       controller.close();
-    }, onError: (error, StackTrace stackTrace) {
+    }, onError: (Object error, StackTrace stackTrace) {
       controller.addError(error, stackTrace);
       controller.close();
     });
@@ -68,8 +68,8 @@
   /// If this operation completes, this completes to the same result as [value].
   /// If this operation is cancelled, the returned future waits for the future
   /// returned by [cancel], then completes to [cancellationValue].
-  Future<T> valueOrCancellation([T cancellationValue]) {
-    var completer = Completer<T>.sync();
+  Future<T?> valueOrCancellation([T? cancellationValue]) {
+    var completer = Completer<T?>.sync();
     value.then((result) => completer.complete(result),
         onError: completer.completeError);
 
@@ -93,23 +93,24 @@
   /// If [propagateCancel] is `true` and the returned operation is canceled then
   /// this operation is canceled. The default is `false`.
   CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
-      {FutureOr<R> Function(Object, StackTrace) onError,
-      FutureOr<R> Function() onCancel,
+      {FutureOr<R> Function(Object, StackTrace)? onError,
+      FutureOr<R> Function()? onCancel,
       bool propagateCancel = false}) {
     final completer =
         CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);
 
-    valueOrCancellation().then((T result) {
+    valueOrCancellation().then((T? result) {
       if (!completer.isCanceled) {
         if (isCompleted) {
-          completer.complete(Future.sync(() => onValue(result)));
+          assert(result is T);
+          completer.complete(Future.sync(() => onValue(result!)));
         } else if (onCancel != null) {
           completer.complete(Future.sync(onCancel));
         } else {
           completer._cancel();
         }
       }
-    }, onError: (error, StackTrace stackTrace) {
+    }, onError: (Object error, StackTrace stackTrace) {
       if (!completer.isCanceled) {
         if (onError != null) {
           completer.complete(Future.sync(() => onError(error, stackTrace)));
@@ -145,7 +146,7 @@
   final Completer<T> _inner;
 
   /// The callback to call if the future is canceled.
-  final FutureOrCallback _onCancel;
+  final FutureOrCallback? _onCancel;
 
   /// Creates a new completer for a [CancelableOperation].
   ///
@@ -155,15 +156,14 @@
   ///
   /// [onCancel] will be called synchronously when the operation is canceled.
   /// It's guaranteed to only be called once.
-  CancelableCompleter({FutureOr Function() onCancel})
+  CancelableCompleter({FutureOr Function()? onCancel})
       : _onCancel = onCancel,
         _inner = Completer<T>() {
-    _operation = CancelableOperation<T>._(this);
+    operation = CancelableOperation<T>._(this);
   }
 
   /// The operation controlled by this completer.
-  CancelableOperation<T> get operation => _operation;
-  CancelableOperation<T> _operation;
+  late final CancelableOperation<T> operation;
 
   /// Whether the completer has completed.
   bool get isCompleted => _isCompleted;
@@ -180,7 +180,7 @@
   ///
   /// If [value] is a [Future], this will complete to the result of that
   /// [Future] once it completes.
-  void complete([FutureOr<T> value]) {
+  void complete([FutureOr<T>? value]) {
     if (_isCompleted) throw StateError('Operation already completed');
     _isCompleted = true;
 
@@ -200,14 +200,14 @@
     future.then((result) {
       if (_isCanceled) return;
       _inner.complete(result);
-    }, onError: (error, StackTrace stackTrace) {
+    }, onError: (Object error, StackTrace stackTrace) {
       if (_isCanceled) return;
       _inner.completeError(error, stackTrace);
     });
   }
 
   /// Completes [operation] to [error].
-  void completeError(Object error, [StackTrace stackTrace]) {
+  void completeError(Object error, [StackTrace? stackTrace]) {
     if (_isCompleted) throw StateError('Operation already completed');
     _isCompleted = true;
 
@@ -221,7 +221,8 @@
 
     return _cancelMemo.runOnce(() {
       _isCanceled = true;
-      if (_onCancel != null) return _onCancel();
+      var onCancel = _onCancel;
+      if (onCancel != null) return onCancel();
     });
   }
 }
diff --git a/lib/src/delegate/event_sink.dart b/lib/src/delegate/event_sink.dart
index bc33b19..33d88e9 100644
--- a/lib/src/delegate/event_sink.dart
+++ b/lib/src/delegate/event_sink.dart
@@ -33,7 +33,7 @@
   }
 
   @override
-  void addError(error, [StackTrace stackTrace]) {
+  void addError(error, [StackTrace? stackTrace]) {
     _sink.addError(error, stackTrace);
   }
 
diff --git a/lib/src/delegate/future.dart b/lib/src/delegate/future.dart
index 984caf6..1155452 100644
--- a/lib/src/delegate/future.dart
+++ b/lib/src/delegate/future.dart
@@ -25,11 +25,11 @@
   Stream<T> asStream() => _future.asStream();
 
   @override
-  Future<T> catchError(Function onError, {bool Function(Object error) test}) =>
+  Future<T> catchError(Function onError, {bool Function(Object error)? test}) =>
       _future.catchError(onError, test: test);
 
   @override
-  Future<S> then<S>(FutureOr<S> Function(T) onValue, {Function onError}) =>
+  Future<S> then<S>(FutureOr<S> Function(T) onValue, {Function? onError}) =>
       _future.then(onValue, onError: onError);
 
   @override
@@ -37,6 +37,6 @@
       _future.whenComplete(action);
 
   @override
-  Future<T> timeout(Duration timeLimit, {FutureOr<T> Function() onTimeout}) =>
+  Future<T> timeout(Duration timeLimit, {FutureOr<T> Function()? onTimeout}) =>
       _future.timeout(timeLimit, onTimeout: onTimeout);
 }
diff --git a/lib/src/delegate/stream_sink.dart b/lib/src/delegate/stream_sink.dart
index 9005d1d..ad76e90 100644
--- a/lib/src/delegate/stream_sink.dart
+++ b/lib/src/delegate/stream_sink.dart
@@ -36,7 +36,7 @@
   }
 
   @override
-  void addError(error, [StackTrace stackTrace]) {
+  void addError(error, [StackTrace? stackTrace]) {
     _sink.addError(error, stackTrace);
   }
 
diff --git a/lib/src/delegate/stream_subscription.dart b/lib/src/delegate/stream_subscription.dart
index 392e27b..45b1134 100644
--- a/lib/src/delegate/stream_subscription.dart
+++ b/lib/src/delegate/stream_subscription.dart
@@ -31,22 +31,22 @@
           : TypeSafeStreamSubscription<T>(subscription);
 
   @override
-  void onData(void Function(T) handleData) {
+  void onData(void Function(T)? handleData) {
     _source.onData(handleData);
   }
 
   @override
-  void onError(Function handleError) {
+  void onError(Function? handleError) {
     _source.onError(handleError);
   }
 
   @override
-  void onDone(void Function() handleDone) {
+  void onDone(void Function()? handleDone) {
     _source.onDone(handleDone);
   }
 
   @override
-  void pause([Future resumeFuture]) {
+  void pause([Future? resumeFuture]) {
     _source.pause(resumeFuture);
   }
 
@@ -59,7 +59,7 @@
   Future cancel() => _source.cancel();
 
   @override
-  Future<E> asFuture<E>([E futureValue]) => _source.asFuture(futureValue);
+  Future<E> asFuture<E>([E? futureValue]) => _source.asFuture(futureValue);
 
   @override
   bool get isPaused => _source.isPaused;
diff --git a/lib/src/future_group.dart b/lib/src/future_group.dart
index 402ae46..3a6291f 100644
--- a/lib/src/future_group.dart
+++ b/lib/src/future_group.dart
@@ -44,13 +44,13 @@
   Stream get onIdle =>
       (_onIdleController ??= StreamController.broadcast(sync: true)).stream;
 
-  StreamController _onIdleController;
+  StreamController? _onIdleController;
 
   /// The values emitted by the futures that have been added to the group, in
   /// the order they were added.
   ///
   /// The slots for futures that haven't completed yet are `null`.
-  final _values = <T>[];
+  final _values = <T?>[];
 
   /// Wait for [task] to complete.
   @override
@@ -71,12 +71,13 @@
       _values[index] = value;
 
       if (_pending != 0) return null;
-      if (_onIdleController != null) _onIdleController.add(null);
+      var onIdleController = _onIdleController;
+      if (onIdleController != null) onIdleController.add(null);
 
       if (!_closed) return null;
-      if (_onIdleController != null) _onIdleController.close();
-      _completer.complete(_values);
-    }).catchError((error, StackTrace stackTrace) {
+      if (onIdleController != null) onIdleController.close();
+      _completer.complete(_values.whereType<T>().toList());
+    }).catchError((Object error, StackTrace stackTrace) {
       if (_completer.isCompleted) return null;
       _completer.completeError(error, stackTrace);
     });
@@ -89,6 +90,6 @@
     _closed = true;
     if (_pending != 0) return;
     if (_completer.isCompleted) return;
-    _completer.complete(_values);
+    _completer.complete(_values.whereType<T>().toList());
   }
 }
diff --git a/lib/src/lazy_stream.dart b/lib/src/lazy_stream.dart
index f5e65d1..e6779ee 100644
--- a/lib/src/lazy_stream.dart
+++ b/lib/src/lazy_stream.dart
@@ -15,7 +15,7 @@
 /// produce a `Stream`.
 class LazyStream<T> extends Stream<T> {
   /// The callback that's called to create the inner stream.
-  FutureOrCallback<Stream<T>> _callback;
+  FutureOrCallback<Stream<T>>? _callback;
 
   /// Creates a single-subscription `Stream` that calls [callback] when it gets
   /// a listener and forwards to the returned stream.
@@ -25,15 +25,15 @@
   }
 
   @override
-  StreamSubscription<T> listen(void Function(T) onData,
-      {Function onError, void Function() onDone, bool cancelOnError}) {
-    if (_callback == null) {
+  StreamSubscription<T> listen(void Function(T)? onData,
+      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
+    var callback = _callback;
+    if (callback == null) {
       throw StateError('Stream has already been listened to.');
     }
 
     // Null out the callback before we invoke it to ensure that even while
     // running it, this can't be called twice.
-    var callback = _callback;
     _callback = null;
     var result = callback();
 
@@ -41,7 +41,7 @@
     if (result is Future<Stream<T>>) {
       stream = StreamCompleter.fromFuture(result);
     } else {
-      stream = result as Stream<T>;
+      stream = result;
     }
 
     return stream.listen(onData,
diff --git a/lib/src/null_stream_sink.dart b/lib/src/null_stream_sink.dart
index b28df2a..34b100b 100644
--- a/lib/src/null_stream_sink.dart
+++ b/lib/src/null_stream_sink.dart
@@ -43,12 +43,12 @@
   ///
   /// If [done] is passed, it's used as the [Sink.done] future. Otherwise, a
   /// completed future is used.
-  NullStreamSink({Future done}) : done = done ?? Future.value();
+  NullStreamSink({Future? done}) : done = done ?? Future.value();
 
   /// Creates a null sink whose [done] future emits [error].
   ///
   /// Note that this error will not be considered uncaught.
-  NullStreamSink.error(error, [StackTrace stackTrace])
+  NullStreamSink.error(Object error, [StackTrace? stackTrace])
       : done = Future.error(error, stackTrace)
           // Don't top-level the error. This gives the user a change to call
           // [close] or [done], and matches the behavior of a remote endpoint
@@ -61,7 +61,7 @@
   }
 
   @override
-  void addError(error, [StackTrace stackTrace]) {
+  void addError(Object error, [StackTrace? stackTrace]) {
     _checkEventAllowed();
   }
 
@@ -70,7 +70,7 @@
     _checkEventAllowed();
 
     _addingStream = true;
-    var future = stream.listen(null).cancel() ?? Future.value();
+    var future = stream.listen(null).cancel();
     return future.whenComplete(() {
       _addingStream = false;
     });
diff --git a/lib/src/result/capture_sink.dart b/lib/src/result/capture_sink.dart
index a7c3a59..562f5f9 100644
--- a/lib/src/result/capture_sink.dart
+++ b/lib/src/result/capture_sink.dart
@@ -18,7 +18,7 @@
   }
 
   @override
-  void addError(Object error, [StackTrace stackTrace]) {
+  void addError(Object error, [StackTrace? stackTrace]) {
     _sink.add(Result.error(error, stackTrace));
   }
 
diff --git a/lib/src/result/error.dart b/lib/src/result/error.dart
index 76e0275..27ce155 100644
--- a/lib/src/result/error.dart
+++ b/lib/src/result/error.dart
@@ -8,7 +8,7 @@
 import 'value.dart';
 
 /// A result representing a thrown error.
-class ErrorResult implements Result<Null> {
+class ErrorResult implements Result<Never> {
   /// The error object that was thrown.
   final Object error;
 
@@ -20,11 +20,13 @@
   @override
   bool get isError => true;
   @override
-  ValueResult<Null> get asValue => null;
+  ValueResult<Never>? get asValue => null;
   @override
   ErrorResult get asError => this;
 
-  ErrorResult(this.error, this.stackTrace);
+  ErrorResult(this.error, [StackTrace? stackTrace])
+      // TODO: Use AsyncError.defaultStackTrace(error) once available
+      : stackTrace = stackTrace ?? StackTrace.fromString('');
 
   @override
   void complete(Completer completer) {
@@ -37,7 +39,7 @@
   }
 
   @override
-  Future<Null> get asFuture => Future<Null>.error(error, stackTrace);
+  Future<Never> get asFuture => Future<Never>.error(error, stackTrace);
 
   /// Calls an error handler with the error and stacktrace.
   ///
diff --git a/lib/src/result/future.dart b/lib/src/result/future.dart
index ff30546..20a5ebf 100644
--- a/lib/src/result/future.dart
+++ b/lib/src/result/future.dart
@@ -16,8 +16,8 @@
   /// The result of the wrapped [Future], if it's completed.
   ///
   /// If it hasn't completed yet, this will be `null`.
-  Result<T> get result => _result;
-  Result<T> _result;
+  Result<T>? get result => _result;
+  Result<T>? _result;
 
   ResultFuture(Future<T> future) : super(future) {
     Result.capture(future).then((result) {
diff --git a/lib/src/result/release_sink.dart b/lib/src/result/release_sink.dart
index 5d8267a..bf6dd50 100644
--- a/lib/src/result/release_sink.dart
+++ b/lib/src/result/release_sink.dart
@@ -18,7 +18,7 @@
   }
 
   @override
-  void addError(Object error, [StackTrace stackTrace]) {
+  void addError(Object error, [StackTrace? stackTrace]) {
     // Errors may be added by intermediate processing, even if it is never
     // added by CaptureSink.
     _sink.addError(error, stackTrace);
diff --git a/lib/src/result/result.dart b/lib/src/result/result.dart
index e604782..5f93b55 100644
--- a/lib/src/result/result.dart
+++ b/lib/src/result/result.dart
@@ -63,7 +63,7 @@
   factory Result(T Function() computation) {
     try {
       return ValueResult<T>(computation());
-    } catch (e, s) {
+    } on Object catch (e, s) {
       return ErrorResult(e, s);
     }
   }
@@ -76,7 +76,7 @@
   /// Creates a `Result` holding an error.
   ///
   /// Alias for [ErrorResult.ErrorResult].
-  factory Result.error(Object error, [StackTrace stackTrace]) =>
+  factory Result.error(Object error, [StackTrace? stackTrace]) =>
       ErrorResult(error, stackTrace);
 
   /// Captures the result of a future into a `Result` future.
@@ -85,7 +85,7 @@
   /// Errors have been converted to an [ErrorResult] value.
   static Future<Result<T>> capture<T>(Future<T> future) {
     return future.then((value) => ValueResult(value),
-        onError: (error, StackTrace stackTrace) =>
+        onError: (Object error, StackTrace stackTrace) =>
             ErrorResult(error, stackTrace));
   }
 
@@ -97,9 +97,9 @@
   /// wrapped as a [Result.value].
   /// The returned future will never have an error.
   static Future<List<Result<T>>> captureAll<T>(Iterable<FutureOr<T>> elements) {
-    var results = <Result<T>>[];
+    var results = <Result<T>?>[];
     var pending = 0;
-    Completer<List<Result<T>>> completer;
+    late Completer<List<Result<T>>> completer;
     for (var element in elements) {
       if (element is Future<T>) {
         var i = results.length;
@@ -108,15 +108,15 @@
         Result.capture<T>(element).then((result) {
           results[i] = result;
           if (--pending == 0) {
-            completer.complete(results);
+            completer.complete(List.from(results));
           }
         });
       } else {
-        results.add(Result<T>.value(element as T));
+        results.add(Result<T>.value(element));
       }
     }
     if (pending == 0) {
-      return Future<List<Result<T>>>.value(results);
+      return Future.value(List.from(results));
     }
     completer = Completer<List<Result<T>>>();
     return completer.future;
@@ -172,8 +172,8 @@
   /// Otherwise both levels of results are value results, and a single
   /// result with the value is returned.
   static Result<T> flatten<T>(Result<Result<T>> result) {
-    if (result.isValue) return result.asValue.value;
-    return result.asError;
+    if (result.isValue) return result.asValue!.value;
+    return result.asError!;
   }
 
   /// Converts a sequence of results to a result of a list.
@@ -184,9 +184,9 @@
     var values = <T>[];
     for (var result in results) {
       if (result.isValue) {
-        values.add(result.asValue.value);
+        values.add(result.asValue!.value);
       } else {
-        return result.asError;
+        return result.asError!;
       }
     }
     return Result<List<T>>.value(values);
@@ -205,12 +205,12 @@
   /// If this is a value result, returns itself.
   ///
   /// Otherwise returns `null`.
-  ValueResult<T> get asValue;
+  ValueResult<T>? get asValue;
 
   /// If this is an error result, returns itself.
   ///
   /// Otherwise returns `null`.
-  ErrorResult get asError;
+  ErrorResult? get asError;
 
   /// Completes a completer with this result.
   void complete(Completer<T> completer);
diff --git a/lib/src/result/value.dart b/lib/src/result/value.dart
index 5c1a60f..7cfc474 100644
--- a/lib/src/result/value.dart
+++ b/lib/src/result/value.dart
@@ -19,7 +19,7 @@
   @override
   ValueResult<T> get asValue => this;
   @override
-  ErrorResult get asError => null;
+  ErrorResult? get asError => null;
 
   ValueResult(this.value);
 
diff --git a/lib/src/single_subscription_transformer.dart b/lib/src/single_subscription_transformer.dart
index fe939fc..2e056b2 100644
--- a/lib/src/single_subscription_transformer.dart
+++ b/lib/src/single_subscription_transformer.dart
@@ -11,14 +11,14 @@
 /// listening to a stream as soon as it's bound.
 ///
 /// This also casts the source stream's events to type `T`. If the cast fails,
-/// the result stream will emit a [CastError]. This behavior is deprecated, and
+/// the result stream will emit a [TypeError]. This behavior is deprecated, and
 /// should not be relied upon.
 class SingleSubscriptionTransformer<S, T> extends StreamTransformerBase<S, T> {
   const SingleSubscriptionTransformer();
 
   @override
   Stream<T> bind(Stream<S> stream) {
-    StreamSubscription<S> subscription;
+    late StreamSubscription<S> subscription;
     var controller =
         StreamController<T>(sync: true, onCancel: () => subscription.cancel());
     subscription = stream.listen((value) {
@@ -26,7 +26,7 @@
       // type parameter and avoid this conversion.
       try {
         controller.add(value as T);
-      } on CastError catch (error, stackTrace) {
+      } on TypeError catch (error, stackTrace) {
         controller.addError(error, stackTrace);
       }
     }, onError: controller.addError, onDone: controller.close);
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart
index 5128d57..27034c2 100644
--- a/lib/src/stream_completer.dart
+++ b/lib/src/stream_completer.dart
@@ -97,7 +97,7 @@
   ///
   /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
   /// most once. Trying to call any of them again will fail.
-  void setError(error, [StackTrace stackTrace]) {
+  void setError(Object error, [StackTrace? stackTrace]) {
     setSourceStream(Stream.fromFuture(Future.error(error, stackTrace)));
   }
 }
@@ -108,30 +108,31 @@
   ///
   /// Created if the user listens on this stream before the source stream
   /// is set, or if using [_setEmpty] so there is no source stream.
-  StreamController<T> _controller;
+  StreamController<T>? _controller;
 
   /// Source stream for the events provided by this stream.
   ///
   /// Set when the completer sets the source stream using [_setSourceStream]
   /// or [_setEmpty].
-  Stream<T> _sourceStream;
+  Stream<T>? _sourceStream;
 
   @override
-  StreamSubscription<T> listen(void Function(T) onData,
-      {Function onError, void Function() onDone, bool cancelOnError}) {
+  StreamSubscription<T> listen(void Function(T)? onData,
+      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
     if (_controller == null) {
-      if (_sourceStream != null && !_sourceStream.isBroadcast) {
+      var sourceStream = _sourceStream;
+      if (sourceStream != null && !sourceStream.isBroadcast) {
         // If the source stream is itself single subscription,
         // just listen to it directly instead of creating a controller.
-        return _sourceStream.listen(onData,
+        return sourceStream.listen(onData,
             onError: onError, onDone: onDone, cancelOnError: cancelOnError);
       }
-      _createController();
+      _ensureController();
       if (_sourceStream != null) {
         _linkStreamToController();
       }
     }
-    return _controller.stream.listen(onData,
+    return _controller!.stream.listen(onData,
         onError: onError, onDone: onDone, cancelOnError: cancelOnError);
   }
 
@@ -157,11 +158,10 @@
 
   /// Links source stream to controller when both are available.
   void _linkStreamToController() {
-    assert(_controller != null);
-    assert(_sourceStream != null);
-    _controller
-        .addStream(_sourceStream, cancelOnError: false)
-        .whenComplete(_controller.close);
+    var controller = _controller!;
+    controller
+        .addStream(_sourceStream!, cancelOnError: false)
+        .whenComplete(controller.close);
   }
 
   /// Sets an empty source stream.
@@ -170,16 +170,13 @@
   /// immediately.
   void _setEmpty() {
     assert(_sourceStream == null);
-    if (_controller == null) {
-      _createController();
-    }
-    _sourceStream = _controller.stream; // Mark stream as set.
-    _controller.close();
+    var controller = _ensureController();
+    _sourceStream = controller.stream; // Mark stream as set.
+    controller.close();
   }
 
   // Creates the [_controller].
-  void _createController() {
-    assert(_controller == null);
-    _controller = StreamController<T>(sync: true);
+  StreamController<T> _ensureController() {
+    return _controller ??= StreamController<T>(sync: true);
   }
 }
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index e9a4afc..8c17ec2 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -29,7 +29,7 @@
 class StreamGroup<T> implements Sink<Stream<T>> {
   /// The stream through which all events from streams in the group are emitted.
   Stream<T> get stream => _controller.stream;
-  StreamController<T> _controller;
+  late StreamController<T> _controller;
 
   /// Whether the group is closed, meaning that no more streams may be added.
   var _closed = false;
@@ -47,7 +47,7 @@
   /// subscriptions will be canceled and set to null again. Single-subscriber
   /// stream subscriptions will be left intact, since they can't be
   /// re-subscribed.
-  final _subscriptions = <Stream<T>, StreamSubscription<T>>{};
+  final _subscriptions = <Stream<T>, StreamSubscription<T>?>{};
 
   /// Merges the events from [streams] into a single single-subscription stream.
   ///
@@ -100,7 +100,7 @@
   ///
   /// Throws a [StateError] if this group is closed.
   @override
-  Future add(Stream<T> stream) {
+  Future? add(Stream<T> stream) {
     if (_closed) {
       throw StateError("Can't add a Stream to a closed StreamGroup.");
     }
@@ -130,7 +130,7 @@
   ///
   /// If [stream]'s subscription is canceled, this returns
   /// [StreamSubscription.cancel]'s return value. Otherwise, it returns `null`.
-  Future remove(Stream<T> stream) {
+  Future? remove(Stream<T> stream) {
     var subscription = _subscriptions.remove(stream);
     var future = subscription == null ? null : subscription.cancel();
     if (_closed && _subscriptions.isEmpty) _controller.close();
@@ -155,7 +155,7 @@
   void _onPause() {
     _state = _StreamGroupState.paused;
     for (var subscription in _subscriptions.values) {
-      subscription.pause();
+      subscription!.pause();
     }
   }
 
@@ -163,19 +163,18 @@
   void _onResume() {
     _state = _StreamGroupState.listening;
     for (var subscription in _subscriptions.values) {
-      subscription.resume();
+      subscription!.resume();
     }
   }
 
   /// A callback called when [stream] is canceled.
   ///
   /// This is only called for single-subscription groups.
-  Future _onCancel() {
+  Future? _onCancel() {
     _state = _StreamGroupState.canceled;
 
     var futures = _subscriptions.values
-        .map((subscription) => subscription.cancel())
-        .where((future) => future != null)
+        .map((subscription) => subscription!.cancel())
         .toList();
 
     _subscriptions.clear();
@@ -194,7 +193,7 @@
       // will still be added to [_controller], but then they'll be dropped since
       // it has no listeners.
       if (!stream.isBroadcast) return;
-      subscription.cancel();
+      subscription!.cancel();
       _subscriptions[stream] = null;
     });
   }
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 278e249..5aa6054 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -87,7 +87,7 @@
   ///
   /// Set to subscription when listening, and set to `null` when the
   /// subscription is done (and [_isDone] is set to true).
-  StreamSubscription<T> _subscription;
+  StreamSubscription<T>? _subscription;
 
   /// Whether the event source is done.
   bool _isDone = false;
@@ -343,7 +343,7 @@
         transaction.reject();
       }
       return result;
-    }, onError: (error) {
+    }, onError: (Object error) {
       transaction.commit(queue);
       throw error;
     });
@@ -399,7 +399,7 @@
   /// After calling `cancel`, no further events can be requested.
   /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
   /// may be called again.
-  Future cancel({bool immediate = false}) {
+  Future? cancel({bool immediate = false}) {
     if (_isClosed) throw _failClosed();
     _isClosed = true;
 
@@ -454,11 +454,10 @@
     }
     _isDone = true;
 
-    if (_subscription == null) {
+    var subscription = _subscription;
+    if (subscription == null) {
       return _source;
     }
-
-    var subscription = _subscription;
     _subscription = null;
 
     var wasPaused = subscription.isPaused;
@@ -475,7 +474,7 @@
   ///
   /// The event source is restarted by the next call to [_ensureListening].
   void _pause() {
-    _subscription.pause();
+    _subscription!.pause();
   }
 
   /// Ensures that we are listening on events from the event source.
@@ -488,22 +487,22 @@
     if (_subscription == null) {
       _subscription = _source.listen((data) {
         _addResult(Result.value(data));
-      }, onError: (error, StackTrace stackTrace) {
+      }, onError: (Object error, StackTrace stackTrace) {
         _addResult(Result.error(error, stackTrace));
       }, onDone: () {
         _subscription = null;
         _close();
       });
     } else {
-      _subscription.resume();
+      _subscription!.resume();
     }
   }
 
   /// Cancels the underlying event source.
-  Future _cancel() {
+  Future? _cancel() {
     if (_isDone) return null;
     _subscription ??= _source.listen(null);
-    var future = _subscription.cancel();
+    var future = _subscription!.cancel();
     _close();
     return future;
   }
@@ -771,7 +770,8 @@
 
       var event = events.removeFirst();
       if (event.isError) {
-        _completer.completeError(event.asError.error, event.asError.stackTrace);
+        _completer.completeError(
+            event.asError!.error, event.asError!.stackTrace);
         return true;
       }
     }
@@ -814,10 +814,10 @@
 
       var event = events.removeFirst();
       if (event.isError) {
-        event.asError.complete(_completer);
+        event.asError!.complete(_completer);
         return true;
       }
-      _list.add(event.asValue.value);
+      _list.add(event.asValue!.value);
     }
     _completer.complete(_list);
     return true;
@@ -837,10 +837,10 @@
       }
       var event = events.elementAt(_list.length);
       if (event.isError) {
-        event.asError.complete(_completer);
+        event.asError!.complete(_completer);
         return true;
       }
-      _list.add(event.asValue.value);
+      _list.add(event.asValue!.value);
     }
     _completer.complete(_list);
     return true;
@@ -954,8 +954,7 @@
 /// [StreamQueue._updateRequests].
 class _TransactionRequest<T> implements _EventRequest<T> {
   /// The transaction created by this request.
-  StreamQueueTransaction<T> get transaction => _transaction;
-  StreamQueueTransaction<T> _transaction;
+  late final StreamQueueTransaction<T> transaction;
 
   /// The controller that passes events to [transaction].
   final _controller = StreamController<T>(sync: true);
@@ -964,7 +963,7 @@
   var _eventsSent = 0;
 
   _TransactionRequest(StreamQueue<T> parent) {
-    _transaction = StreamQueueTransaction._(parent, _controller.stream);
+    transaction = StreamQueueTransaction._(parent, _controller.stream);
   }
 
   @override
@@ -973,6 +972,6 @@
       events[_eventsSent++].addTo(_controller);
     }
     if (isDone && !_controller.isClosed) _controller.close();
-    return transaction._committed || _transaction._rejected;
+    return transaction._committed || transaction._rejected;
   }
 }
diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart
index ebfd717..10e549d 100644
--- a/lib/src/stream_sink_completer.dart
+++ b/lib/src/stream_sink_completer.dart
@@ -70,7 +70,7 @@
   ///
   /// Either of [setDestinationSink] or [setError] may be called at most once.
   /// Trying to call either of them again will fail.
-  void setError(error, [StackTrace stackTrace]) {
+  void setError(Object error, [StackTrace? stackTrace]) {
     setDestinationSink(NullStreamSink.error(error, stackTrace));
   }
 }
@@ -81,18 +81,18 @@
   ///
   /// Created if the user adds events to this sink before the destination sink
   /// is set.
-  StreamController<T> _controller;
+  StreamController<T>? _controller;
 
   /// Completer for [done].
   ///
   /// Created if the user requests the [done] future before the destination sink
   /// is set.
-  Completer _doneCompleter;
+  Completer? _doneCompleter;
 
   /// Destination sink for the events added to this sink.
   ///
   /// Set when [StreamSinkCompleter.setDestinationSink] is called.
-  StreamSink<T> _destinationSink;
+  StreamSink<T>? _destinationSink;
 
   /// Whether events should be sent directly to [_destinationSink], as opposed
   /// to going through [_controller].
@@ -100,56 +100,52 @@
 
   @override
   Future get done {
-    if (_doneCompleter != null) return _doneCompleter.future;
+    if (_doneCompleter != null) return _doneCompleter!.future;
     if (_destinationSink == null) {
       _doneCompleter = Completer.sync();
-      return _doneCompleter.future;
+      return _doneCompleter!.future;
     }
-    return _destinationSink.done;
+    return _destinationSink!.done;
   }
 
   @override
   void add(T event) {
     if (_canSendDirectly) {
-      _destinationSink.add(event);
+      _destinationSink!.add(event);
     } else {
-      _ensureController();
-      _controller.add(event);
+      _ensureController().add(event);
     }
   }
 
   @override
-  void addError(error, [StackTrace stackTrace]) {
+  void addError(error, [StackTrace? stackTrace]) {
     if (_canSendDirectly) {
-      _destinationSink.addError(error, stackTrace);
+      _destinationSink!.addError(error, stackTrace);
     } else {
-      _ensureController();
-      _controller.addError(error, stackTrace);
+      _ensureController().addError(error, stackTrace);
     }
   }
 
   @override
   Future addStream(Stream<T> stream) {
-    if (_canSendDirectly) return _destinationSink.addStream(stream);
+    if (_canSendDirectly) return _destinationSink!.addStream(stream);
 
-    _ensureController();
-    return _controller.addStream(stream, cancelOnError: false);
+    return _ensureController().addStream(stream, cancelOnError: false);
   }
 
   @override
   Future close() {
     if (_canSendDirectly) {
-      _destinationSink.close();
+      _destinationSink!.close();
     } else {
-      _ensureController();
-      _controller.close();
+      _ensureController().close();
     }
     return done;
   }
 
   /// Create [_controller] if it doesn't yet exist.
-  void _ensureController() {
-    _controller ??= StreamController(sync: true);
+  StreamController<T> _ensureController() {
+    return _controller ??= StreamController(sync: true);
   }
 
   /// Sets the destination sink to which events from this sink will be provided.
@@ -168,7 +164,7 @@
       // Catch any error that may come from [addStream] or [sink.close]. They'll
       // be reported through [done] anyway.
       sink
-          .addStream(_controller.stream)
+          .addStream(_controller!.stream)
           .whenComplete(sink.close)
           .catchError((_) {});
     }
@@ -176,7 +172,7 @@
     // If the user has already asked when the sink is done, connect the sink's
     // done callback to that completer.
     if (_doneCompleter != null) {
-      _doneCompleter.complete(sink.done);
+      _doneCompleter!.complete(sink.done);
     }
   }
 }
diff --git a/lib/src/stream_sink_transformer.dart b/lib/src/stream_sink_transformer.dart
index 1dc27ed..bdcb196 100644
--- a/lib/src/stream_sink_transformer.dart
+++ b/lib/src/stream_sink_transformer.dart
@@ -34,9 +34,9 @@
   /// they're passed are forwarded to the inner sink. If a handler is omitted,
   /// the event is passed through unaltered.
   factory StreamSinkTransformer.fromHandlers(
-      {void Function(S, EventSink<T>) handleData,
-      void Function(Object, StackTrace, EventSink<T>) handleError,
-      void Function(EventSink<T>) handleDone}) {
+      {void Function(S, EventSink<T>)? handleData,
+      void Function(Object, StackTrace, EventSink<T>)? handleError,
+      void Function(EventSink<T>)? handleDone}) {
     return HandlerTransformer<S, T>(handleData, handleError, handleDone);
   }
 
diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart
index 8d37ce3..b112e3e 100644
--- a/lib/src/stream_sink_transformer/handler_transformer.dart
+++ b/lib/src/stream_sink_transformer/handler_transformer.dart
@@ -11,6 +11,9 @@
 typedef HandleData<S, T> = void Function(S data, EventSink<T> sink);
 
 /// The type of the callback for handling error events.
+//
+// TODO: Update to take a non-nullable StackTrace once that change lands in
+// the sdk.
 typedef HandleError<T> = void Function(
     Object error, StackTrace stackTrace, EventSink<T> sink);
 
@@ -20,13 +23,13 @@
 /// A [StreamSinkTransformer] that delegates events to the given handlers.
 class HandlerTransformer<S, T> implements StreamSinkTransformer<S, T> {
   /// The handler for data events.
-  final HandleData<S, T> _handleData;
+  final HandleData<S, T>? _handleData;
 
   /// The handler for error events.
-  final HandleError<T> _handleError;
+  final HandleError<T>? _handleError;
 
   /// The handler for done events.
-  final HandleDone<T> _handleDone;
+  final HandleDone<T>? _handleDone;
 
   HandlerTransformer(this._handleData, this._handleError, this._handleDone);
 
@@ -55,19 +58,22 @@
 
   @override
   void add(S event) {
-    if (_transformer._handleData == null) {
+    var handleData = _transformer._handleData;
+    if (handleData == null) {
       _inner.add(event as T);
     } else {
-      _transformer._handleData(event, _safeCloseInner);
+      handleData(event, _safeCloseInner);
     }
   }
 
   @override
-  void addError(error, [StackTrace stackTrace]) {
-    if (_transformer._handleError == null) {
+  void addError(error, [StackTrace? stackTrace]) {
+    var handleError = _transformer._handleError;
+    if (handleError == null) {
       _inner.addError(error, stackTrace);
     } else {
-      _transformer._handleError(error, stackTrace, _safeCloseInner);
+      handleError(error, stackTrace ?? AsyncError.defaultStackTrace(error),
+          _safeCloseInner);
     }
   }
 
@@ -82,9 +88,10 @@
 
   @override
   Future close() {
-    if (_transformer._handleDone == null) return _inner.close();
+    var handleDone = _transformer._handleDone;
+    if (handleDone == null) return _inner.close();
 
-    _transformer._handleDone(_safeCloseInner);
+    handleDone(_safeCloseInner);
     return _inner.done;
   }
 }
diff --git a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
index 1df7e5a..2afcbff 100644
--- a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
+++ b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
@@ -50,7 +50,7 @@
   }
 
   @override
-  void addError(error, [StackTrace stackTrace]) {
+  void addError(error, [StackTrace? stackTrace]) {
     _controller.addError(error, stackTrace);
   }
 
diff --git a/lib/src/stream_splitter.dart b/lib/src/stream_splitter.dart
index e9f326e..f7377d6 100644
--- a/lib/src/stream_splitter.dart
+++ b/lib/src/stream_splitter.dart
@@ -28,7 +28,7 @@
   /// The subscription to [_stream].
   ///
   /// This will be `null` until a branch has a listener.
-  StreamSubscription<T> _subscription;
+  StreamSubscription<T>? _subscription;
 
   /// The buffer of events or errors that have already been emitted by
   /// [_stream].
@@ -57,7 +57,7 @@
   ///
   /// [count] defaults to 2. This is the same as creating [count] branches and
   /// then closing the [StreamSplitter].
-  static List<Stream<T>> splitFrom<T>(Stream<T> stream, [int count]) {
+  static List<Stream<T>> splitFrom<T>(Stream<T> stream, [int? count]) {
     count ??= 2;
     var splitter = StreamSplitter<T>(stream);
     var streams = List<Stream<T>>.generate(count, (_) => splitter.split());
@@ -125,8 +125,8 @@
     assert(_controllers.isEmpty);
     assert(_isClosed);
 
-    Future future;
-    if (_subscription != null) future = _subscription.cancel();
+    Future? future;
+    if (_subscription != null) future = _subscription!.cancel();
     if (future != null) _closeGroup.add(future);
     _closeGroup.close();
   }
@@ -142,7 +142,7 @@
       // Resume the subscription in case it was paused, either because all the
       // controllers were paused or because the last one was canceled. If it
       // wasn't paused, this will be a no-op.
-      _subscription.resume();
+      _subscription!.resume();
     } else {
       _subscription =
           _stream.listen(_onData, onError: _onError, onDone: _onDone);
@@ -152,14 +152,14 @@
   /// Pauses [_subscription] if every controller is paused.
   void _onPause() {
     if (!_controllers.every((controller) => controller.isPaused)) return;
-    _subscription.pause();
+    _subscription!.pause();
   }
 
   /// Resumes [_subscription].
   ///
   /// If [_subscription] wasn't paused, this is a no-op.
   void _onResume() {
-    _subscription.resume();
+    _subscription!.resume();
   }
 
   /// Removes [controller] from [_controllers] and cancels or pauses
@@ -175,7 +175,7 @@
     if (_isClosed) {
       _cancelSubscription();
     } else {
-      _subscription.pause();
+      _subscription!.pause();
     }
   }
 
diff --git a/lib/src/stream_subscription_transformer.dart b/lib/src/stream_subscription_transformer.dart
index 0e28972..d03ea70 100644
--- a/lib/src/stream_subscription_transformer.dart
+++ b/lib/src/stream_subscription_transformer.dart
@@ -28,9 +28,9 @@
 /// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause]
 /// must call `pause()`, and [handleResume] must call `resume()`.
 StreamTransformer<T, T> subscriptionTransformer<T>(
-    {Future Function(StreamSubscription<T>) handleCancel,
-    void Function(StreamSubscription<T>) handlePause,
-    void Function(StreamSubscription<T>) handleResume}) {
+    {Future Function(StreamSubscription<T>)? handleCancel,
+    void Function(StreamSubscription<T>)? handlePause,
+    void Function(StreamSubscription<T>)? handleResume}) {
   return StreamTransformer((stream, cancelOnError) {
     return _TransformedSubscription(
         stream.listen(null, cancelOnError: cancelOnError),
@@ -50,7 +50,7 @@
 /// methods.
 class _TransformedSubscription<T> implements StreamSubscription<T> {
   /// The wrapped subscription.
-  StreamSubscription<T> _inner;
+  StreamSubscription<T>? _inner;
 
   /// The callback to run when [cancel] is called.
   final _AsyncHandler<T> _handleCancel;
@@ -68,47 +68,47 @@
       this._inner, this._handleCancel, this._handlePause, this._handleResume);
 
   @override
-  void onData(void Function(T) handleData) {
+  void onData(void Function(T)? handleData) {
     _inner?.onData(handleData);
   }
 
   @override
-  void onError(Function handleError) {
+  void onError(Function? handleError) {
     _inner?.onError(handleError);
   }
 
   @override
-  void onDone(void Function() handleDone) {
+  void onDone(void Function()? handleDone) {
     _inner?.onDone(handleDone);
   }
 
   @override
   Future cancel() => _cancelMemoizer.runOnce(() {
-        var inner = _inner;
-        _inner.onData(null);
-        _inner.onDone(null);
+        var inner = _inner!;
+        inner.onData(null);
+        inner.onDone(null);
 
         // Setting onError to null will cause errors to be top-leveled.
-        _inner.onError((_, __) {});
+        inner.onError((_, __) {});
         _inner = null;
         return _handleCancel(inner);
       });
   final _cancelMemoizer = AsyncMemoizer();
 
   @override
-  void pause([Future resumeFuture]) {
+  void pause([Future? resumeFuture]) {
     if (_cancelMemoizer.hasRun) return;
     if (resumeFuture != null) resumeFuture.whenComplete(resume);
-    _handlePause(_inner);
+    _handlePause(_inner!);
   }
 
   @override
   void resume() {
     if (_cancelMemoizer.hasRun) return;
-    _handleResume(_inner);
+    _handleResume(_inner!);
   }
 
   @override
-  Future<E> asFuture<E>([E futureValue]) =>
+  Future<E> asFuture<E>([E? futureValue]) =>
       _inner?.asFuture(futureValue) ?? Completer<E>().future;
 }
diff --git a/lib/src/stream_zip.dart b/lib/src/stream_zip.dart
index e319746..506bf6d 100644
--- a/lib/src/stream_zip.dart
+++ b/lib/src/stream_zip.dart
@@ -18,12 +18,12 @@
   StreamZip(Iterable<Stream<T>> streams) : _streams = streams;
 
   @override
-  StreamSubscription<List<T>> listen(void Function(List<T>) onData,
-      {Function onError, void Function() onDone, bool cancelOnError}) {
+  StreamSubscription<List<T>> listen(void Function(List<T>)? onData,
+      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
     cancelOnError = identical(true, cancelOnError);
     var subscriptions = <StreamSubscription<T>>[];
-    StreamController<List<T>> controller;
-    List<T> current;
+    late StreamController<List<T>> controller;
+    late List<T?> current;
     var dataCount = 0;
 
     /// Called for each data from a subscription in [subscriptions].
@@ -31,8 +31,8 @@
       current[index] = data;
       dataCount++;
       if (dataCount == subscriptions.length) {
-        var data = current;
-        current = List(subscriptions.length);
+        var data = List<T>.from(current);
+        current = List<T?>.filled(subscriptions.length, null);
         dataCount = 0;
         for (var i = 0; i < subscriptions.length; i++) {
           if (i != index) subscriptions[i].resume();
@@ -85,7 +85,7 @@
       rethrow;
     }
 
-    current = List(subscriptions.length);
+    current = List<T?>.filled(subscriptions.length, null);
 
     controller = StreamController<List<T>>(onPause: () {
       for (var i = 0; i < subscriptions.length; i++) {
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index b428619..2c94e05 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -18,7 +18,7 @@
 /// If other code is accessing the subscription, results may be unpredictable.
 class SubscriptionStream<T> extends Stream<T> {
   /// The subscription providing the events for this stream.
-  StreamSubscription<T> _source;
+  StreamSubscription<T>? _source;
 
   /// Create a single-subscription `Stream` from [subscription].
   ///
@@ -31,21 +31,22 @@
   /// an error.
   SubscriptionStream(StreamSubscription<T> subscription)
       : _source = subscription {
-    _source.pause();
+    var source = _source!;
+    source.pause();
     // Clear callbacks to avoid keeping them alive unnecessarily.
-    _source.onData(null);
-    _source.onError(null);
-    _source.onDone(null);
+    source.onData(null);
+    source.onError(null);
+    source.onDone(null);
   }
 
   @override
-  StreamSubscription<T> listen(void Function(T) onData,
-      {Function onError, void Function() onDone, bool cancelOnError}) {
-    if (_source == null) {
+  StreamSubscription<T> listen(void Function(T)? onData,
+      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
+    var subscription = _source;
+    if (subscription == null) {
       throw StateError('Stream has already been listened to.');
     }
     cancelOnError = (true == cancelOnError);
-    var subscription = _source;
     _source = null;
 
     var result = cancelOnError
@@ -71,26 +72,17 @@
       : super(subscription);
 
   @override
-  void onError(Function handleError) {
+  void onError(Function? handleError) {
     // Cancel when receiving an error.
     super.onError((error, StackTrace stackTrace) {
-      var cancelFuture = super.cancel();
-      if (cancelFuture != null) {
-        // Wait for the cancel to complete before sending the error event.
-        cancelFuture.whenComplete(() {
-          if (handleError is ZoneBinaryCallback) {
-            handleError(error, stackTrace);
-          } else {
-            handleError(error);
-          }
-        });
-      } else {
+      // Wait for the cancel to complete before sending the error event.
+      super.cancel().whenComplete(() {
         if (handleError is ZoneBinaryCallback) {
           handleError(error, stackTrace);
-        } else {
+        } else if (handleError != null) {
           handleError(error);
         }
-      }
+      });
     });
   }
 }
diff --git a/lib/src/typed/stream_subscription.dart b/lib/src/typed/stream_subscription.dart
index d85b776..fe91656 100644
--- a/lib/src/typed/stream_subscription.dart
+++ b/lib/src/typed/stream_subscription.dart
@@ -13,22 +13,23 @@
   TypeSafeStreamSubscription(this._subscription);
 
   @override
-  void onData(void Function(T) handleData) {
+  void onData(void Function(T)? handleData) {
+    if (handleData == null) return _subscription.onData(null);
     _subscription.onData((data) => handleData(data as T));
   }
 
   @override
-  void onError(Function handleError) {
+  void onError(Function? handleError) {
     _subscription.onError(handleError);
   }
 
   @override
-  void onDone(void Function() handleDone) {
+  void onDone(void Function()? handleDone) {
     _subscription.onDone(handleDone);
   }
 
   @override
-  void pause([Future resumeFuture]) {
+  void pause([Future? resumeFuture]) {
     _subscription.pause(resumeFuture);
   }
 
@@ -41,5 +42,6 @@
   Future cancel() => _subscription.cancel();
 
   @override
-  Future<E> asFuture<E>([E futureValue]) => _subscription.asFuture(futureValue);
+  Future<E> asFuture<E>([E? futureValue]) =>
+      _subscription.asFuture(futureValue);
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index 0cb71cf..bfdf20e 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,11 +1,11 @@
 name: async
-version: 2.4.2
+version: 2.5.0-nullsafety
 
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
 
 environment:
-  sdk: '>=2.2.0 <3.0.0'
+  sdk: '>=2.9.0-20.0.dev <2.9.0'
 
 dependencies:
   collection: ^1.5.0
@@ -15,3 +15,81 @@
   stack_trace: ^1.0.0
   test: ^1.0.0
   pedantic: ^1.0.0
+
+dependency_overrides:
+  boolean_selector:
+    git:
+      url: git://github.com/dart-lang/boolean_selector.git
+      ref: null_safety
+  charcode:
+    git:
+      url: git://github.com/dart-lang/charcode.git
+      ref: null_safety
+  clock:
+    git:
+      url: git://github.com/dart-lang/clock.git
+      ref: null_safety
+  collection:
+    git:
+      url: git://github.com/dart-lang/collection.git
+      ref: null_safety
+  fake_async:
+    git:
+      url: git://github.com/dart-lang/fake_async.git
+      ref: null_safety
+  matcher:
+    git:
+      url: git://github.com/dart-lang/matcher.git
+      ref: null_safety
+  meta:
+    git:
+      url: git://github.com/dart-lang/sdk.git
+      ref: null_safety-pkgs
+      path: pkg/meta
+  path:
+    git:
+      url: git://github.com/dart-lang/path.git
+      ref: null_safety
+  pedantic:
+    git:
+      url: git://github.com/dart-lang/pedantic.git
+      ref: null_safety
+  pool:
+    git:
+      url: git://github.com/dart-lang/pool.git
+      ref: null_safety
+  source_span:
+    git:
+      url: git://github.com/dart-lang/source_span.git
+      ref: null_safety
+  stack_trace:
+    git:
+      url: git://github.com/dart-lang/stack_trace.git
+      ref: null_safety
+  stream_channel:
+    git:
+      url: git://github.com/dart-lang/stream_channel.git
+      ref: null_safety
+  string_scanner:
+    git:
+      url: git://github.com/dart-lang/string_scanner.git
+      ref: null_safety
+  term_glyph:
+    git:
+      url: git://github.com/dart-lang/term_glyph.git
+      ref: null_safety
+  test_api:
+    git:
+      url: git://github.com/dart-lang/test.git
+      ref: null_safety
+      path: pkgs/test_api
+  test_core:
+    git:
+      url: git://github.com/dart-lang/test.git
+      ref: null_safety
+      path: pkgs/test_core
+  test:
+    git:
+      url: git://github.com/dart-lang/test.git
+      ref: null_safety
+      path: pkgs/test
diff --git a/test/async_cache_test.dart b/test/async_cache_test.dart
index 4efbb69..81e43e1 100644
--- a/test/async_cache_test.dart
+++ b/test/async_cache_test.dart
@@ -9,7 +9,7 @@
 import 'package:test/test.dart';
 
 void main() {
-  AsyncCache<String> cache;
+  late AsyncCache<String> cache;
 
   setUp(() {
     // Create a cache that is fresh for an hour.
@@ -22,7 +22,8 @@
 
   test('should not fetch via callback when a cache exists', () async {
     await cache.fetch(() async => 'Expensive');
-    expect(await cache.fetch(expectAsync0(() => null, count: 0)), 'Expensive');
+    expect(await cache.fetch(expectAsync0(() async => 'fake', count: 0)),
+        'Expensive');
   });
 
   test('should not fetch via callback when a future is in-flight', () async {
@@ -31,7 +32,7 @@
 
     var completer = Completer<String>();
     expect(cache.fetch(() => completer.future), completion('Expensive'));
-    expect(cache.fetch(expectAsync0(() => null, count: 0)),
+    expect(cache.fetch(expectAsync0(() async => 'fake', count: 0)),
         completion('Expensive'));
     completer.complete('Expensive');
   });
@@ -79,7 +80,10 @@
       yield '2';
       yield '3';
     }).toList();
-    expect(await cache.fetchStream(expectAsync0(() => null, count: 0)).toList(),
+    expect(
+        await cache
+            .fetchStream(expectAsync0(() => Stream.empty(), count: 0))
+            .toList(),
         ['1', '2', '3']);
   });
 
@@ -148,7 +152,7 @@
   test('should pause a cached stream without affecting others', () async {
     Stream<String> call() => Stream.fromIterable(['1', '2', '3']);
 
-    StreamSubscription sub;
+    late StreamSubscription sub;
     sub = cache.fetchStream(call).listen(expectAsync1((event) {
       if (event == '1') sub.pause();
     }));
diff --git a/test/async_memoizer_test.dart b/test/async_memoizer_test.dart
index 982f7c9..490b389 100644
--- a/test/async_memoizer_test.dart
+++ b/test/async_memoizer_test.dart
@@ -6,7 +6,7 @@
 import 'package:test/test.dart';
 
 void main() {
-  AsyncMemoizer cache;
+  late AsyncMemoizer cache;
   setUp(() => cache = AsyncMemoizer());
 
   test('runs the function only the first time runOnce() is called', () async {
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index b38c0b9..c87e43a 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -11,7 +11,7 @@
 
 void main() {
   group('without being canceled', () {
-    CancelableCompleter completer;
+    late CancelableCompleter completer;
     setUp(() {
       completer = CancelableCompleter(onCancel: expectAsync0(() {}, count: 0));
     });
@@ -116,7 +116,7 @@
 
     test('fires onCancel', () {
       var canceled = false;
-      CancelableCompleter completer;
+      late CancelableCompleter completer;
       completer = CancelableCompleter(onCancel: expectAsync0(() {
         expect(completer.isCanceled, isTrue);
         canceled = true;
@@ -243,23 +243,23 @@
   });
 
   group('then', () {
-    FutureOr<String> Function(int) onValue;
-    FutureOr<String> Function(Object, StackTrace) onError;
-    FutureOr<String> Function() onCancel;
-    bool propagateCancel;
-    CancelableCompleter<int> originalCompleter;
+    FutureOr<String> Function(int)? onValue;
+    FutureOr<String> Function(Object, StackTrace)? onError;
+    FutureOr<String> Function()? onCancel;
+    late bool propagateCancel;
+    late CancelableCompleter<int> originalCompleter;
 
     setUp(() {
       // Initialize all functions to ones that expect to not be called.
-      onValue = expectAsync1((_) => null, count: 0, id: 'onValue');
-      onError = expectAsync2((e, s) => null, count: 0, id: 'onError');
-      onCancel = expectAsync0(() => null, count: 0, id: 'onCancel');
+      onValue = expectAsync1((_) => 'Fake', count: 0, id: 'onValue');
+      onError = expectAsync2((e, s) => 'Fake', count: 0, id: 'onError');
+      onCancel = expectAsync0(() => 'Fake', count: 0, id: 'onCancel');
       propagateCancel = false;
     });
 
     CancelableOperation<String> runThen() {
       originalCompleter = CancelableCompleter();
-      return originalCompleter.operation.then(onValue,
+      return originalCompleter.operation.then(onValue!,
           onError: onError,
           onCancel: onCancel,
           propagateCancel: propagateCancel);
diff --git a/test/future_group_test.dart b/test/future_group_test.dart
index f99d06d..22e90f8 100644
--- a/test/future_group_test.dart
+++ b/test/future_group_test.dart
@@ -10,7 +10,7 @@
 import 'utils.dart';
 
 void main() {
-  FutureGroup futureGroup;
+  late FutureGroup futureGroup;
   setUp(() {
     futureGroup = FutureGroup();
   });
diff --git a/test/lazy_stream_test.dart b/test/lazy_stream_test.dart
index 2affe5b..32c6b14 100644
--- a/test/lazy_stream_test.dart
+++ b/test/lazy_stream_test.dart
@@ -10,10 +10,6 @@
 import 'utils.dart';
 
 void main() {
-  test('disallows a null callback', () {
-    expect(() => LazyStream(null), throwsArgumentError);
-  });
-
   test('calls the callback when the stream is listened', () async {
     var callbackCalled = false;
     var stream = LazyStream(expectAsync0(() {
@@ -96,7 +92,7 @@
   });
 
   test("a lazy stream can't be listened to from within its callback", () {
-    LazyStream stream;
+    late LazyStream stream;
     stream = LazyStream(expectAsync0(() {
       expect(() => stream.listen(null), throwsStateError);
       return Stream.empty();
diff --git a/test/result/result_captureAll_test.dart b/test/result/result_captureAll_test.dart
index 8e79872..b992e1f 100644
--- a/test/result/result_captureAll_test.dart
+++ b/test/result/result_captureAll_test.dart
@@ -14,7 +14,7 @@
 
 /// Helper function creating an iterable of futures.
 Iterable<Future<int>> futures(int count,
-    {bool Function(int index) throwWhen}) sync* {
+    {bool Function(int index)? throwWhen}) sync* {
   for (var i = 0; i < count; i++) {
     if (throwWhen != null && throwWhen(i)) {
       yield Future<int>.error('$i', someStack);
diff --git a/test/result/result_flattenAll_test.dart b/test/result/result_flattenAll_test.dart
index c0a8603..b87fec4 100644
--- a/test/result/result_flattenAll_test.dart
+++ b/test/result/result_flattenAll_test.dart
@@ -11,7 +11,7 @@
 
 /// Helper function creating an iterable of results.
 Iterable<Result<int>> results(int count,
-    {bool Function(int index) throwWhen}) sync* {
+    {bool Function(int index)? throwWhen}) sync* {
   for (var i = 0; i < count; i++) {
     if (throwWhen != null && throwWhen(i)) {
       yield err(i);
@@ -27,7 +27,7 @@
       expect(result, expectation);
     } else {
       expect(result.isValue, true);
-      expect(result.asValue.value, expectation.asValue.value);
+      expect(result.asValue!.value, expectation.asValue!.value);
     }
   }
 
diff --git a/test/result/result_future_test.dart b/test/result/result_future_test.dart
index 4218db2..7171005 100644
--- a/test/result/result_future_test.dart
+++ b/test/result/result_future_test.dart
@@ -9,8 +9,8 @@
 import 'package:test/test.dart';
 
 void main() {
-  Completer completer;
-  ResultFuture future;
+  late Completer completer;
+  late ResultFuture future;
   setUp(() {
     completer = Completer();
     future = ResultFuture(completer.future);
@@ -25,7 +25,7 @@
 
     // The completer calls its listeners asynchronously. We have to wait
     // before we can access the result.
-    expect(future.then((_) => future.result.asValue.value),
+    expect(future.then((_) => future.result!.asValue!.value),
         completion(equals(12)));
   });
 
@@ -36,7 +36,7 @@
     // The completer calls its listeners asynchronously. We have to wait
     // before we can access the result.
     return future.catchError((_) {}).then((_) {
-      var error = future.result.asError;
+      var error = future.result!.asError!;
       expect(error.error, equals('error'));
       expect(error.stackTrace, equals(trace));
     });
diff --git a/test/result/result_test.dart b/test/result/result_test.dart
index adbc445..154785b 100644
--- a/test/result/result_test.dart
+++ b/test/result/result_test.dart
@@ -16,7 +16,7 @@
     var result = Result<int>.value(42);
     expect(result.isValue, isTrue);
     expect(result.isError, isFalse);
-    ValueResult value = result.asValue;
+    ValueResult value = result.asValue!;
     expect(value.value, equals(42));
   });
 
@@ -24,7 +24,7 @@
     Result<int> result = ValueResult<int>(42);
     expect(result.isValue, isTrue);
     expect(result.isError, isFalse);
-    var value = result.asValue;
+    var value = result.asValue!;
     expect(value.value, equals(42));
   });
 
@@ -32,7 +32,7 @@
     var result = Result<bool>.error('BAD', stack);
     expect(result.isValue, isFalse);
     expect(result.isError, isTrue);
-    var error = result.asError;
+    var error = result.asError!;
     expect(error.error, equals('BAD'));
     expect(error.stackTrace, same(stack));
   });
@@ -50,9 +50,10 @@
     var result = Result<bool>.error('BAD');
     expect(result.isValue, isFalse);
     expect(result.isError, isTrue);
-    var error = result.asError;
+    var error = result.asError!;
     expect(error.error, equals('BAD'));
-    expect(error.stackTrace, isNull);
+    // A default stack trace is created
+    expect(error.stackTrace, isNotNull);
   });
 
   test('complete with value', () {
@@ -71,7 +72,7 @@
     var c = Completer<bool>();
     c.future.then((bool v) {
       fail('Unexpected value $v');
-    }, onError: expectAsync2((e, s) {
+    }).then<void>((_) {}, onError: expectAsync2((e, s) {
       expect(e, equals('BAD'));
       expect(s, same(stack));
     }));
@@ -108,7 +109,7 @@
     Result<bool> result = ErrorResult('BAD', stack);
     result.asFuture.then((bool v) {
       fail('Unexpected value $v');
-    }, onError: expectAsync2((e, s) {
+    }).then<void>((_) {}, onError: expectAsync2((e, s) {
       expect(e, equals('BAD'));
       expect(s, same(stack));
     }));
@@ -119,7 +120,7 @@
     Result.capture(value).then(expectAsync1((Result result) {
       expect(result.isValue, isTrue);
       expect(result.isError, isFalse);
-      var value = result.asValue;
+      var value = result.asValue!;
       expect(value.value, equals(42));
     }), onError: (e, s) {
       fail('Unexpected error: $e');
@@ -131,7 +132,7 @@
     Result.capture(value).then(expectAsync1((Result result) {
       expect(result.isValue, isFalse);
       expect(result.isError, isTrue);
-      var error = result.asError;
+      var error = result.asError!;
       expect(error.error, equals('BAD'));
       expect(error.stackTrace, same(stack));
     }), onError: (e, s) {
@@ -153,7 +154,7 @@
     var future = Future<Result<bool>>.value(Result<bool>.error('BAD', stack));
     Result.release(future).then((v) {
       fail('Unexpected value: $v');
-    }, onError: expectAsync2((e, s) {
+    }).then<void>((_) {}, onError: expectAsync2((e, s) {
       expect(e, equals('BAD'));
       expect(s, same(stack));
     }));
@@ -164,7 +165,7 @@
     var future = Future<Result<bool>>.error('BAD', stack);
     Result.release(future).then((v) {
       fail('Unexpected value: $v');
-    }, onError: expectAsync2((e, s) {
+    }).then<void>((_) {}, onError: expectAsync2((e, s) {
       expect(e, equals('BAD'));
       expect(s, same(stack));
     }));
@@ -203,15 +204,15 @@
       expect(expectedList.isEmpty, isFalse);
       Result expected = expectedList.removeFirst();
       expect(expected.isValue, isTrue);
-      expect(v, equals(expected.asValue.value));
+      expect(v, equals(expected.asValue!.value));
     }
 
     void errorListener(error, StackTrace stackTrace) {
       expect(expectedList.isEmpty, isFalse);
       Result expected = expectedList.removeFirst();
       expect(expected.isError, isTrue);
-      expect(error, equals(expected.asError.error));
-      expect(stackTrace, same(expected.asError.stackTrace));
+      expect(error, equals(expected.asError!.error));
+      expect(stackTrace, same(expected.asError!.stackTrace));
     }
 
     stream.listen(expectAsync1(dataListener, count: 2),
@@ -311,10 +312,10 @@
   expect(actual.isValue, equals(expected.isValue));
   expect(actual.isError, equals(expected.isError));
   if (actual.isValue) {
-    expect(actual.asValue.value, equals(expected.asValue.value));
+    expect(actual.asValue!.value, equals(expected.asValue!.value));
   } else {
-    expect(actual.asError.error, equals(expected.asError.error));
-    expect(actual.asError.stackTrace, same(expected.asError.stackTrace));
+    expect(actual.asError!.error, equals(expected.asError!.error));
+    expect(actual.asError!.stackTrace, same(expected.asError!.stackTrace));
   }
 }
 
@@ -334,8 +335,8 @@
   }
 
   @override
-  void addError(error, [StackTrace stack]) {
-    onError(error, stack);
+  void addError(error, [StackTrace? stack]) {
+    onError(error, stack ?? StackTrace.fromString(''));
   }
 
   @override
diff --git a/test/stream_completer_test.dart b/test/stream_completer_test.dart
index 0cd210a..ecb5621 100644
--- a/test/stream_completer_test.dart
+++ b/test/stream_completer_test.dart
@@ -77,7 +77,7 @@
     var completer = StreamCompleter<int>();
     var lastEvent = -1;
     var controller = StreamController<int>();
-    StreamSubscription subscription;
+    late StreamSubscription subscription;
     subscription = completer.stream.listen((value) {
       expect(value, lessThan(3));
       lastEvent = value;
@@ -125,7 +125,7 @@
 
   test("source stream isn't listened to until completer stream is", () async {
     var completer = StreamCompleter();
-    StreamController controller;
+    late StreamController controller;
     controller = StreamController(onListen: () {
       scheduleMicrotask(controller.close);
     });
@@ -139,13 +139,13 @@
   });
 
   test('cancelOnError true when listening before linking stream', () async {
-    var completer = StreamCompleter();
+    var completer = StreamCompleter<Object>();
     Object lastEvent = -1;
-    var controller = StreamController();
+    var controller = StreamController<Object>();
     completer.stream.listen((value) {
       expect(value, lessThan(3));
       lastEvent = value;
-    }, onError: (value) {
+    }, onError: (Object value) {
       expect(value, '3');
       lastEvent = value;
     }, onDone: unreachable('done'), cancelOnError: true);
@@ -172,9 +172,9 @@
   });
 
   test('cancelOnError true when listening after linking stream', () async {
-    var completer = StreamCompleter();
+    var completer = StreamCompleter<Object>();
     Object lastEvent = -1;
-    var controller = StreamController();
+    var controller = StreamController<Object>();
     completer.setSourceStream(controller.stream);
     controller.add(1);
     expect(controller.hasListener, isFalse);
@@ -182,7 +182,7 @@
     completer.stream.listen((value) {
       expect(value, lessThan(3));
       lastEvent = value;
-    }, onError: (value) {
+    }, onError: (Object value) {
       expect(value, '3');
       lastEvent = value;
     }, onDone: unreachable('done'), cancelOnError: true);
@@ -265,8 +265,8 @@
   });
 
   test('setting onData etc. before and after setting stream', () async {
-    var completer = StreamCompleter();
-    var controller = StreamController();
+    var completer = StreamCompleter<int>();
+    var controller = StreamController<int>();
     var subscription = completer.stream.listen(null);
     Object lastEvent = 0;
     subscription.onData((value) => lastEvent = value);
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
index 690fbc1..eadae19 100644
--- a/test/stream_group_test.dart
+++ b/test/stream_group_test.dart
@@ -9,7 +9,7 @@
 
 void main() {
   group('single-subscription', () {
-    StreamGroup<String> streamGroup;
+    late StreamGroup<String> streamGroup;
     setUp(() {
       streamGroup = StreamGroup<String>();
     });
@@ -236,7 +236,7 @@
             StreamController<String>(onCancel: () => completer.future);
 
         var fired = false;
-        streamGroup.add(controller.stream).then((_) => fired = true);
+        streamGroup.add(controller.stream)!.then((_) => fired = true);
 
         await flushMicrotasks();
         expect(fired, isFalse);
@@ -249,7 +249,7 @@
   });
 
   group('broadcast', () {
-    StreamGroup<String> streamGroup;
+    late StreamGroup<String> streamGroup;
     setUp(() {
       streamGroup = StreamGroup<String>.broadcast();
     });
@@ -455,7 +455,7 @@
 }
 
 void regardlessOfType(StreamGroup<String> Function() newStreamGroup) {
-  StreamGroup<String> streamGroup;
+  late StreamGroup<String> streamGroup;
   setUp(() {
     streamGroup = newStreamGroup();
   });
@@ -608,7 +608,7 @@
         await flushMicrotasks();
 
         var fired = false;
-        streamGroup.remove(controller.stream).then((_) => fired = true);
+        streamGroup.remove(controller.stream)!.then((_) => fired = true);
 
         await flushMicrotasks();
         expect(fired, isFalse);
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index fa286a8..74c91e6 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -250,7 +250,7 @@
       var skip4 = events.skip(1);
       var index = 0;
       // Check that futures complete in order.
-      Func1Required<int> sequence(expectedValue, sequenceIndex) => (value) {
+      Func1Required<int?> sequence(expectedValue, sequenceIndex) => (value) {
             expect(value, expectedValue);
             expect(index, sequenceIndex);
             index++;
@@ -610,7 +610,7 @@
       var controller = StreamController<int>();
       var events = StreamQueue<int>(controller.stream);
 
-      bool hasNext;
+      bool? hasNext;
       events.hasNext.then((result) {
         hasNext = result;
       });
@@ -626,7 +626,7 @@
       var controller = StreamController<int>();
       var events = StreamQueue<int>(controller.stream);
 
-      bool hasNext;
+      bool? hasNext;
       events.hasNext.then((result) {
         hasNext = result;
       });
@@ -772,10 +772,10 @@
   });
 
   group('startTransaction operation produces a transaction that', () {
-    StreamQueue<int> events;
-    StreamQueueTransaction<int> transaction;
-    StreamQueue<int> queue1;
-    StreamQueue<int> queue2;
+    late StreamQueue<int> events;
+    late StreamQueueTransaction<int> transaction;
+    late StreamQueue<int> queue1;
+    late StreamQueue<int> queue2;
     setUp(() async {
       events = StreamQueue(createStream());
       expect(await events.next, 1);
@@ -1004,7 +1004,7 @@
   });
 
   group('withTransaction operation', () {
-    StreamQueue<int> events;
+    late StreamQueue<int> events;
     setUp(() async {
       events = StreamQueue(createStream());
       expect(await events.next, 1);
@@ -1058,7 +1058,7 @@
   });
 
   group('cancelable operation', () {
-    StreamQueue<int> events;
+    late StreamQueue<int> events;
     setUp(() async {
       events = StreamQueue(createStream());
       expect(await events.next, 1);
diff --git a/test/stream_sink_completer_test.dart b/test/stream_sink_completer_test.dart
index 69d1d8a..591f32f 100644
--- a/test/stream_sink_completer_test.dart
+++ b/test/stream_sink_completer_test.dart
@@ -10,7 +10,7 @@
 import 'utils.dart';
 
 void main() {
-  StreamSinkCompleter completer;
+  late StreamSinkCompleter completer;
   setUp(() {
     completer = StreamSinkCompleter();
   });
@@ -21,10 +21,10 @@
       completer.setDestinationSink(sink);
       completer.sink..add(1)..add(2)..add(3)..add(4);
 
-      expect(sink.results[0].asValue.value, equals(1));
-      expect(sink.results[1].asValue.value, equals(2));
-      expect(sink.results[2].asValue.value, equals(3));
-      expect(sink.results[3].asValue.value, equals(4));
+      expect(sink.results[0].asValue!.value, equals(1));
+      expect(sink.results[1].asValue!.value, equals(2));
+      expect(sink.results[2].asValue!.value, equals(3));
+      expect(sink.results[3].asValue!.value, equals(4));
     });
 
     test('error events are forwarded', () {
@@ -32,8 +32,8 @@
       completer.setDestinationSink(sink);
       completer.sink..addError('oh no')..addError("that's bad");
 
-      expect(sink.results[0].asError.error, equals('oh no'));
-      expect(sink.results[1].asError.error, equals("that's bad"));
+      expect(sink.results[0].asError!.error, equals('oh no'));
+      expect(sink.results[1].asError!.error, equals("that's bad"));
     });
 
     test('addStream is forwarded', () async {
@@ -49,10 +49,10 @@
       controller.addError("that's bad");
       await flushMicrotasks();
 
-      expect(sink.results[0].asValue.value, equals(1));
-      expect(sink.results[1].asError.error, equals('oh no'));
-      expect(sink.results[2].asValue.value, equals(2));
-      expect(sink.results[3].asError.error, equals("that's bad"));
+      expect(sink.results[0].asValue!.value, equals(1));
+      expect(sink.results[1].asError!.error, equals('oh no'));
+      expect(sink.results[2].asValue!.value, equals(2));
+      expect(sink.results[3].asError!.error, equals("that's bad"));
       expect(sink.isClosed, isFalse);
 
       controller.close();
@@ -121,10 +121,10 @@
       completer.setDestinationSink(sink);
       await flushMicrotasks();
 
-      expect(sink.results[0].asValue.value, equals(1));
-      expect(sink.results[1].asValue.value, equals(2));
-      expect(sink.results[2].asValue.value, equals(3));
-      expect(sink.results[3].asValue.value, equals(4));
+      expect(sink.results[0].asValue!.value, equals(1));
+      expect(sink.results[1].asValue!.value, equals(2));
+      expect(sink.results[2].asValue!.value, equals(3));
+      expect(sink.results[3].asValue!.value, equals(4));
     });
 
     test('error events are forwarded', () async {
@@ -135,8 +135,8 @@
       completer.setDestinationSink(sink);
       await flushMicrotasks();
 
-      expect(sink.results[0].asError.error, equals('oh no'));
-      expect(sink.results[1].asError.error, equals("that's bad"));
+      expect(sink.results[0].asError!.error, equals('oh no'));
+      expect(sink.results[1].asError!.error, equals("that's bad"));
     });
 
     test('addStream is forwarded', () async {
@@ -154,10 +154,10 @@
       completer.setDestinationSink(sink);
       await flushMicrotasks();
 
-      expect(sink.results[0].asValue.value, equals(1));
-      expect(sink.results[1].asError.error, equals('oh no'));
-      expect(sink.results[2].asValue.value, equals(2));
-      expect(sink.results[3].asError.error, equals("that's bad"));
+      expect(sink.results[0].asValue!.value, equals(1));
+      expect(sink.results[1].asError!.error, equals('oh no'));
+      expect(sink.results[2].asValue!.value, equals(2));
+      expect(sink.results[3].asError!.error, equals("that's bad"));
       expect(sink.isClosed, isFalse);
     });
 
@@ -258,9 +258,9 @@
       futureCompleter.complete(testSink);
       await testSink.done;
 
-      expect(testSink.results[0].asValue.value, equals(1));
-      expect(testSink.results[1].asValue.value, equals(2));
-      expect(testSink.results[2].asValue.value, equals(3));
+      expect(testSink.results[0].asValue!.value, equals(1));
+      expect(testSink.results[1].asValue!.value, equals(2));
+      expect(testSink.results[2].asValue!.value, equals(3));
     });
 
     test('with an error', () async {
diff --git a/test/stream_sink_transformer_test.dart b/test/stream_sink_transformer_test.dart
index 8493971..e5f6baa 100644
--- a/test/stream_sink_transformer_test.dart
+++ b/test/stream_sink_transformer_test.dart
@@ -10,7 +10,7 @@
 import 'utils.dart';
 
 void main() {
-  StreamController controller;
+  late StreamController controller;
   setUp(() {
     controller = StreamController();
   });
@@ -18,7 +18,7 @@
   group('fromStreamTransformer', () {
     test('transforms data events', () {
       var transformer = StreamSinkTransformer.fromStreamTransformer(
-          StreamTransformer.fromHandlers(handleData: (i, sink) {
+          StreamTransformer.fromHandlers(handleData: (int i, sink) {
         sink.add(i * 2);
       }));
       var sink = transformer.bind(controller.sink);
@@ -117,7 +117,7 @@
   group('fromHandlers', () {
     test('transforms data events', () {
       var transformer =
-          StreamSinkTransformer.fromHandlers(handleData: (i, sink) {
+          StreamSinkTransformer.fromHandlers(handleData: (int i, sink) {
         sink.add(i * 2);
       });
       var sink = transformer.bind(controller.sink);
diff --git a/test/stream_splitter_test.dart b/test/stream_splitter_test.dart
index 3748c5c..3493193 100644
--- a/test/stream_splitter_test.dart
+++ b/test/stream_splitter_test.dart
@@ -8,8 +8,8 @@
 import 'package:test/test.dart';
 
 void main() {
-  StreamController<int> controller;
-  StreamSplitter splitter;
+  late StreamController<int> controller;
+  late StreamSplitter splitter;
   setUp(() {
     controller = StreamController<int>();
     splitter = StreamSplitter<int>(controller.stream);
diff --git a/test/stream_zip_test.dart b/test/stream_zip_test.dart
index 19c3dec..84948bf 100644
--- a/test/stream_zip_test.dart
+++ b/test/stream_zip_test.dart
@@ -9,7 +9,7 @@
 
 /// Create an error with the same values as [base], except that it throwsA
 /// when seeing the value [errorValue].
-Stream streamError(Stream base, int errorValue, error) {
+Stream streamError(Stream base, int errorValue, Object error) {
   return base.map((x) => (x == errorValue) ? throw error : x);
 }
 
@@ -306,7 +306,7 @@
     var s2 = Stream.fromIterable([1, 3, 5, 7]);
     var sz = StreamZip([s1, s2]);
     var ctr = 0;
-    StreamSubscription sub;
+    late StreamSubscription sub;
     sub = sz.listen(expectAsync1((v) {
       expect(v, equals([ctr * 2, ctr * 2 + 1]));
       if (ctr == 1) {
diff --git a/test/stream_zip_zone_test.dart b/test/stream_zip_zone_test.dart
index 68cd723..50af6b6 100644
--- a/test/stream_zip_zone_test.dart
+++ b/test/stream_zip_zone_test.dart
@@ -33,7 +33,7 @@
     var outer = Zone.current;
     runZoned(() {
       var newZone1 = Zone.current;
-      StreamSubscription sub;
+      late StreamSubscription sub;
       sub = stream.listen(expectAsync1((v) {
         expect(v, 42);
         expect(Zone.current, newZone1);
diff --git a/test/subscription_stream_test.dart b/test/subscription_stream_test.dart
index 98a809f..8be0572 100644
--- a/test/subscription_stream_test.dart
+++ b/test/subscription_stream_test.dart
@@ -22,7 +22,7 @@
     var stream = createStream();
     var skips = 0;
     var completer = Completer();
-    StreamSubscription<int> subscription;
+    late StreamSubscription<int> subscription;
     subscription = stream.listen((value) {
       ++skips;
       expect(value, skips);
@@ -72,8 +72,9 @@
   group('cancelOnError source:', () {
     for (var sourceCancels in [false, true]) {
       group('${sourceCancels ? "yes" : "no"}:', () {
-        SubscriptionStream subscriptionStream;
-        Future onCancel; // Completes if source stream is canceled before done.
+        late SubscriptionStream subscriptionStream;
+        late Future
+            onCancel; // Completes if source stream is canceled before done.
         setUp(() {
           var cancelCompleter = Completer();
           var source = createErrorStream(cancelCompleter);
@@ -159,7 +160,7 @@
   yield 4;
 }
 
-Stream<int> createErrorStream([Completer onCancel]) async* {
+Stream<int> createErrorStream([Completer? onCancel]) async* {
   var canceled = true;
   try {
     yield 1;
diff --git a/test/subscription_transformer_test.dart b/test/subscription_transformer_test.dart
index f0e3a70..8278ea0 100644
--- a/test/subscription_transformer_test.dart
+++ b/test/subscription_transformer_test.dart
@@ -90,7 +90,7 @@
           subscriptionTransformer(handleCancel: expectAsync1((inner) {
         callbackInvoked = true;
         inner.cancel();
-        return null;
+        return Future.value();
       }))).listen(expectAsync1((_) {}, count: 0));
 
       await flushMicrotasks();
@@ -248,11 +248,12 @@
   });
 
   group('when the outer subscription is canceled but the inner is not', () {
-    StreamSubscription subscription;
+    late StreamSubscription subscription;
     setUp(() {
       var controller = StreamController();
       subscription = controller.stream
-          .transform(subscriptionTransformer(handleCancel: (_) => null))
+          .transform(
+              subscriptionTransformer(handleCancel: (_) => Future.value()))
           .listen(expectAsync1((_) {}, count: 0),
               onError: expectAsync2((_, __) {}, count: 0),
               onDone: expectAsync0(() {}, count: 0));
diff --git a/test/typed_wrapper/stream_subscription_test.dart b/test/typed_wrapper/stream_subscription_test.dart
index 2a11546..33d28a8 100644
--- a/test/typed_wrapper/stream_subscription_test.dart
+++ b/test/typed_wrapper/stream_subscription_test.dart
@@ -11,10 +11,11 @@
 
 void main() {
   group('with valid types, forwards', () {
-    StreamController controller;
-    StreamSubscription wrapper;
-    bool isCanceled;
+    late StreamController controller;
+    late StreamSubscription wrapper;
+    late bool isCanceled;
     setUp(() {
+      isCanceled = false;
       controller = StreamController<Object>(onCancel: () {
         isCanceled = true;
       });
@@ -67,10 +68,11 @@
   });
 
   group('with invalid types,', () {
-    StreamController controller;
-    StreamSubscription wrapper;
-    bool isCanceled;
+    late StreamController controller;
+    late StreamSubscription wrapper;
+    late bool isCanceled;
     setUp(() {
+      isCanceled = false;
       controller = StreamController<Object>(onCancel: () {
         isCanceled = true;
       });
diff --git a/test/utils.dart b/test/utils.dart
index a99fe9b..4cc743e 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -25,7 +25,7 @@
 Matcher throwsZoned(matcher) => predicate((void Function() callback) {
       var firstError = true;
       runZoned(callback,
-          onError: expectAsync2((error, StackTrace stackTrace) {
+          onError: expectAsync2((Object error, StackTrace stackTrace) {
             if (firstError) {
               expect(error, matcher);
               firstError = false;
@@ -37,11 +37,11 @@
     });
 
 /// A matcher that runs a callback in its own zone and asserts that that zone
-/// emits a [CastError].
-final throwsZonedCastError = throwsZoned(TypeMatcher<CastError>());
+/// emits a [TypeError].
+final throwsZonedCastError = throwsZoned(TypeMatcher<TypeError>());
 
-/// A matcher that matches a callback or future that throws a [CastError].
-final throwsCastError = throwsA(TypeMatcher<CastError>());
+/// A matcher that matches a callback or future that throws a [TypeError].
+final throwsCastError = throwsA(TypeMatcher<TypeError>());
 
 /// A badly behaved stream which throws if it's ever listened to.
 ///
@@ -67,7 +67,7 @@
   @override
   void add(T event) {}
   @override
-  void addError(error, [StackTrace stackTrace]) {}
+  void addError(error, [StackTrace? stackTrace]) {}
   @override
   Future addStream(Stream<T> stream) async {}
   @override
@@ -95,7 +95,7 @@
   ///
   /// If [onDone] is passed, it's called when the user calls [close]. Its result
   /// is piped to the [done] future.
-  TestSink({void Function() onDone}) : _onDone = onDone ?? (() {});
+  TestSink({void Function()? onDone}) : _onDone = onDone ?? (() {});
 
   @override
   void add(T event) {
@@ -103,7 +103,7 @@
   }
 
   @override
-  void addError(error, [StackTrace stackTrace]) {
+  void addError(error, [StackTrace? stackTrace]) {
     results.add(Result<T>.error(error, stackTrace));
   }