Revert "Various clean-ups and null-safety improvements. (#140)" (#152)

This reverts commit 05b9553c81b0168e7665d47730977ca289334a69.

Since this has not been published or rolled through to the SDK or
Flutter we will hold it back to reduce risk.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ab5a57d..d2d99c3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,3 @@
-## 2.5.0-nullsafety.4-dev
-
 ## 2.5.0-nullsafety.3
 
 * Update SDK constraints to `>=2.12.0-0 <3.0.0` based on beta release
diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart
index c4cd04e..99b3881 100644
--- a/lib/src/async_cache.dart
+++ b/lib/src/async_cache.dart
@@ -42,7 +42,7 @@
   /// The [duration] starts counting after the Future returned by [fetch]
   /// completes, or after the Stream returned by [fetchStream] emits a done
   /// event.
-  AsyncCache(Duration duration) : _duration = duration;
+  AsyncCache(this._duration);
 
   /// Creates a cache that invalidates after an in-flight request is complete.
   ///
diff --git a/lib/src/result/result.dart b/lib/src/result/result.dart
index 165b472..5f93b55 100644
--- a/lib/src/result/result.dart
+++ b/lib/src/result/result.dart
@@ -99,7 +99,7 @@
   static Future<List<Result<T>>> captureAll<T>(Iterable<FutureOr<T>> elements) {
     var results = <Result<T>?>[];
     var pending = 0;
-    var completer = Completer<List<Result<T>>>();
+    late Completer<List<Result<T>>> completer;
     for (var element in elements) {
       if (element is Future<T>) {
         var i = results.length;
@@ -116,8 +116,9 @@
       }
     }
     if (pending == 0) {
-      completer.complete(List.from(results));
+      return Future.value(List.from(results));
     }
+    completer = Completer<List<Result<T>>>();
     return completer.future;
   }
 
diff --git a/lib/src/single_subscription_transformer.dart b/lib/src/single_subscription_transformer.dart
index 7c93b44..2e056b2 100644
--- a/lib/src/single_subscription_transformer.dart
+++ b/lib/src/single_subscription_transformer.dart
@@ -18,8 +18,10 @@
 
   @override
   Stream<T> bind(Stream<S> stream) {
-    var controller = StreamController<T>(sync: true);
-    var subscription = stream.listen((value) {
+    late StreamSubscription<S> subscription;
+    var controller =
+        StreamController<T>(sync: true, onCancel: () => subscription.cancel());
+    subscription = stream.listen((value) {
       // TODO(nweiz): When we release a new major version, get rid of the second
       // type parameter and avoid this conversion.
       try {
@@ -28,7 +30,6 @@
         controller.addError(error, stackTrace);
       }
     }, onError: controller.addError, onDone: controller.close);
-    controller.onCancel = subscription.cancel;
     return controller.stream;
   }
 }
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index 14a04e5..8c17ec2 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -29,7 +29,7 @@
 class StreamGroup<T> implements Sink<Stream<T>> {
   /// The stream through which all events from streams in the group are emitted.
   Stream<T> get stream => _controller.stream;
-  final StreamController<T> _controller;
+  late StreamController<T> _controller;
 
   /// Whether the group is closed, meaning that no more streams may be added.
   var _closed = false;
@@ -72,20 +72,19 @@
   }
 
   /// Creates a new stream group where [stream] is single-subscriber.
-  StreamGroup() : _controller = StreamController<T>(sync: true) {
-    _controller
-      ..onListen = _onListen
-      ..onPause = _onPause
-      ..onResume = _onResume
-      ..onCancel = _onCancel;
+  StreamGroup() {
+    _controller = StreamController<T>(
+        onListen: _onListen,
+        onPause: _onPause,
+        onResume: _onResume,
+        onCancel: _onCancel,
+        sync: true);
   }
 
   /// Creates a new stream group where [stream] is a broadcast stream.
-  StreamGroup.broadcast()
-      : _controller = StreamController<T>.broadcast(sync: true) {
-    _controller
-      ..onListen = _onListen
-      ..onCancel = _onCancelBroadcast;
+  StreamGroup.broadcast() {
+    _controller = StreamController<T>.broadcast(
+        onListen: _onListen, onCancel: _onCancelBroadcast, sync: true);
   }
 
   /// Adds [stream] as a member of this group.
diff --git a/lib/src/stream_zip.dart b/lib/src/stream_zip.dart
index e417cdc..506bf6d 100644
--- a/lib/src/stream_zip.dart
+++ b/lib/src/stream_zip.dart
@@ -22,7 +22,7 @@
       {Function? onError, void Function()? onDone, bool? cancelOnError}) {
     cancelOnError = identical(true, cancelOnError);
     var subscriptions = <StreamSubscription<T>>[];
-    var controller = StreamController<List<T>>();
+    late StreamController<List<T>> controller;
     late List<T?> current;
     var dataCount = 0;
 
@@ -32,7 +32,7 @@
       dataCount++;
       if (dataCount == subscriptions.length) {
         var data = List<T>.from(current);
-        current.fillRange(0, current.length, null);
+        current = List<T?>.filled(subscriptions.length, null);
         dataCount = 0;
         for (var i = 0; i < subscriptions.length; i++) {
           if (i != index) subscriptions[i].resume();
@@ -87,26 +87,23 @@
 
     current = List<T?>.filled(subscriptions.length, null);
 
-    controller
-      ..onPause = () {
-        for (var i = 0; i < subscriptions.length; i++) {
-          // This may pause some subscriptions more than once.
-          // These will not be resumed by onResume below, but must wait for the
-          // next round.
-          subscriptions[i].pause();
-        }
+    controller = StreamController<List<T>>(onPause: () {
+      for (var i = 0; i < subscriptions.length; i++) {
+        // This may pause some subscriptions more than once.
+        // These will not be resumed by onResume below, but must wait for the
+        // next round.
+        subscriptions[i].pause();
       }
-      ..onResume = () {
-        for (var i = 0; i < subscriptions.length; i++) {
-          subscriptions[i].resume();
-        }
+    }, onResume: () {
+      for (var i = 0; i < subscriptions.length; i++) {
+        subscriptions[i].resume();
       }
-      ..onCancel = () {
-        for (var i = 0; i < subscriptions.length; i++) {
-          // Canceling more than once is safe.
-          subscriptions[i].cancel();
-        }
-      };
+    }, onCancel: () {
+      for (var i = 0; i < subscriptions.length; i++) {
+        // Canceling more than once is safe.
+        subscriptions[i].cancel();
+      }
+    });
 
     if (subscriptions.isEmpty) {
       controller.close();
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index 501cf91..2c94e05 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -31,12 +31,12 @@
   /// an error.
   SubscriptionStream(StreamSubscription<T> subscription)
       : _source = subscription {
+    var source = _source!;
+    source.pause();
     // Clear callbacks to avoid keeping them alive unnecessarily.
-    subscription
-      ..pause()
-      ..onData(null)
-      ..onError(null)
-      ..onDone(null);
+    source.onData(null);
+    source.onError(null);
+    source.onDone(null);
   }
 
   @override
diff --git a/pubspec.yaml b/pubspec.yaml
index 671174f..3aa8080 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 2.5.0-nullsafety.4-dev
+version: 2.5.0-nullsafety.3
 
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async