Various clean-ups and null-safety improvements. (#140)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d2d99c3..ab5a57d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,5 @@
+## 2.5.0-nullsafety.4-dev
+
 ## 2.5.0-nullsafety.3
 
 * Update SDK constraints to `>=2.12.0-0 <3.0.0` based on beta release
diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart
index 99b3881..c4cd04e 100644
--- a/lib/src/async_cache.dart
+++ b/lib/src/async_cache.dart
@@ -42,7 +42,7 @@
   /// The [duration] starts counting after the Future returned by [fetch]
   /// completes, or after the Stream returned by [fetchStream] emits a done
   /// event.
-  AsyncCache(this._duration);
+  AsyncCache(Duration duration) : _duration = duration;
 
   /// Creates a cache that invalidates after an in-flight request is complete.
   ///
diff --git a/lib/src/result/result.dart b/lib/src/result/result.dart
index 5f93b55..165b472 100644
--- a/lib/src/result/result.dart
+++ b/lib/src/result/result.dart
@@ -99,7 +99,7 @@
   static Future<List<Result<T>>> captureAll<T>(Iterable<FutureOr<T>> elements) {
     var results = <Result<T>?>[];
     var pending = 0;
-    late Completer<List<Result<T>>> completer;
+    var completer = Completer<List<Result<T>>>();
     for (var element in elements) {
       if (element is Future<T>) {
         var i = results.length;
@@ -116,9 +116,8 @@
       }
     }
     if (pending == 0) {
-      return Future.value(List.from(results));
+      completer.complete(List.from(results));
     }
-    completer = Completer<List<Result<T>>>();
     return completer.future;
   }
 
diff --git a/lib/src/single_subscription_transformer.dart b/lib/src/single_subscription_transformer.dart
index 2e056b2..7c93b44 100644
--- a/lib/src/single_subscription_transformer.dart
+++ b/lib/src/single_subscription_transformer.dart
@@ -18,10 +18,8 @@
 
   @override
   Stream<T> bind(Stream<S> stream) {
-    late StreamSubscription<S> subscription;
-    var controller =
-        StreamController<T>(sync: true, onCancel: () => subscription.cancel());
-    subscription = stream.listen((value) {
+    var controller = StreamController<T>(sync: true);
+    var subscription = stream.listen((value) {
       // TODO(nweiz): When we release a new major version, get rid of the second
       // type parameter and avoid this conversion.
       try {
@@ -30,6 +28,7 @@
         controller.addError(error, stackTrace);
       }
     }, onError: controller.addError, onDone: controller.close);
+    controller.onCancel = subscription.cancel;
     return controller.stream;
   }
 }
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index 8c17ec2..14a04e5 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -29,7 +29,7 @@
 class StreamGroup<T> implements Sink<Stream<T>> {
   /// The stream through which all events from streams in the group are emitted.
   Stream<T> get stream => _controller.stream;
-  late StreamController<T> _controller;
+  final StreamController<T> _controller;
 
   /// Whether the group is closed, meaning that no more streams may be added.
   var _closed = false;
@@ -72,19 +72,20 @@
   }
 
   /// Creates a new stream group where [stream] is single-subscriber.
-  StreamGroup() {
-    _controller = StreamController<T>(
-        onListen: _onListen,
-        onPause: _onPause,
-        onResume: _onResume,
-        onCancel: _onCancel,
-        sync: true);
+  StreamGroup() : _controller = StreamController<T>(sync: true) {
+    _controller
+      ..onListen = _onListen
+      ..onPause = _onPause
+      ..onResume = _onResume
+      ..onCancel = _onCancel;
   }
 
   /// Creates a new stream group where [stream] is a broadcast stream.
-  StreamGroup.broadcast() {
-    _controller = StreamController<T>.broadcast(
-        onListen: _onListen, onCancel: _onCancelBroadcast, sync: true);
+  StreamGroup.broadcast()
+      : _controller = StreamController<T>.broadcast(sync: true) {
+    _controller
+      ..onListen = _onListen
+      ..onCancel = _onCancelBroadcast;
   }
 
   /// Adds [stream] as a member of this group.
diff --git a/lib/src/stream_zip.dart b/lib/src/stream_zip.dart
index 506bf6d..e417cdc 100644
--- a/lib/src/stream_zip.dart
+++ b/lib/src/stream_zip.dart
@@ -22,7 +22,7 @@
       {Function? onError, void Function()? onDone, bool? cancelOnError}) {
     cancelOnError = identical(true, cancelOnError);
     var subscriptions = <StreamSubscription<T>>[];
-    late StreamController<List<T>> controller;
+    var controller = StreamController<List<T>>();
     late List<T?> current;
     var dataCount = 0;
 
@@ -32,7 +32,7 @@
       dataCount++;
       if (dataCount == subscriptions.length) {
         var data = List<T>.from(current);
-        current = List<T?>.filled(subscriptions.length, null);
+        current.fillRange(0, current.length, null);
         dataCount = 0;
         for (var i = 0; i < subscriptions.length; i++) {
           if (i != index) subscriptions[i].resume();
@@ -87,23 +87,26 @@
 
     current = List<T?>.filled(subscriptions.length, null);
 
-    controller = StreamController<List<T>>(onPause: () {
-      for (var i = 0; i < subscriptions.length; i++) {
-        // This may pause some subscriptions more than once.
-        // These will not be resumed by onResume below, but must wait for the
-        // next round.
-        subscriptions[i].pause();
+    controller
+      ..onPause = () {
+        for (var i = 0; i < subscriptions.length; i++) {
+          // This may pause some subscriptions more than once.
+          // These will not be resumed by onResume below, but must wait for the
+          // next round.
+          subscriptions[i].pause();
+        }
       }
-    }, onResume: () {
-      for (var i = 0; i < subscriptions.length; i++) {
-        subscriptions[i].resume();
+      ..onResume = () {
+        for (var i = 0; i < subscriptions.length; i++) {
+          subscriptions[i].resume();
+        }
       }
-    }, onCancel: () {
-      for (var i = 0; i < subscriptions.length; i++) {
-        // Canceling more than once is safe.
-        subscriptions[i].cancel();
-      }
-    });
+      ..onCancel = () {
+        for (var i = 0; i < subscriptions.length; i++) {
+          // Canceling more than once is safe.
+          subscriptions[i].cancel();
+        }
+      };
 
     if (subscriptions.isEmpty) {
       controller.close();
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index 2c94e05..501cf91 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -31,12 +31,12 @@
   /// an error.
   SubscriptionStream(StreamSubscription<T> subscription)
       : _source = subscription {
-    var source = _source!;
-    source.pause();
     // Clear callbacks to avoid keeping them alive unnecessarily.
-    source.onData(null);
-    source.onError(null);
-    source.onDone(null);
+    subscription
+      ..pause()
+      ..onData(null)
+      ..onError(null)
+      ..onDone(null);
   }
 
   @override
diff --git a/pubspec.yaml b/pubspec.yaml
index 3aa8080..671174f 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 2.5.0-nullsafety.3
+version: 2.5.0-nullsafety.4-dev
 
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async