Revert "Various clean-ups and null-safety improvements. (#140)" (#152)
This reverts commit 05b9553c81b0168e7665d47730977ca289334a69.
Since this has not been published or rolled through to the SDK or
Flutter we will hold it back to reduce risk.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ab5a57d..d2d99c3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,3 @@
-## 2.5.0-nullsafety.4-dev
-
## 2.5.0-nullsafety.3
* Update SDK constraints to `>=2.12.0-0 <3.0.0` based on beta release
diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart
index c4cd04e..99b3881 100644
--- a/lib/src/async_cache.dart
+++ b/lib/src/async_cache.dart
@@ -42,7 +42,7 @@
/// The [duration] starts counting after the Future returned by [fetch]
/// completes, or after the Stream returned by [fetchStream] emits a done
/// event.
- AsyncCache(Duration duration) : _duration = duration;
+ AsyncCache(this._duration);
/// Creates a cache that invalidates after an in-flight request is complete.
///
diff --git a/lib/src/result/result.dart b/lib/src/result/result.dart
index 165b472..5f93b55 100644
--- a/lib/src/result/result.dart
+++ b/lib/src/result/result.dart
@@ -99,7 +99,7 @@
static Future<List<Result<T>>> captureAll<T>(Iterable<FutureOr<T>> elements) {
var results = <Result<T>?>[];
var pending = 0;
- var completer = Completer<List<Result<T>>>();
+ late Completer<List<Result<T>>> completer;
for (var element in elements) {
if (element is Future<T>) {
var i = results.length;
@@ -116,8 +116,9 @@
}
}
if (pending == 0) {
- completer.complete(List.from(results));
+ return Future.value(List.from(results));
}
+ completer = Completer<List<Result<T>>>();
return completer.future;
}
diff --git a/lib/src/single_subscription_transformer.dart b/lib/src/single_subscription_transformer.dart
index 7c93b44..2e056b2 100644
--- a/lib/src/single_subscription_transformer.dart
+++ b/lib/src/single_subscription_transformer.dart
@@ -18,8 +18,10 @@
@override
Stream<T> bind(Stream<S> stream) {
- var controller = StreamController<T>(sync: true);
- var subscription = stream.listen((value) {
+ late StreamSubscription<S> subscription;
+ var controller =
+ StreamController<T>(sync: true, onCancel: () => subscription.cancel());
+ subscription = stream.listen((value) {
// TODO(nweiz): When we release a new major version, get rid of the second
// type parameter and avoid this conversion.
try {
@@ -28,7 +30,6 @@
controller.addError(error, stackTrace);
}
}, onError: controller.addError, onDone: controller.close);
- controller.onCancel = subscription.cancel;
return controller.stream;
}
}
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index 14a04e5..8c17ec2 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -29,7 +29,7 @@
class StreamGroup<T> implements Sink<Stream<T>> {
/// The stream through which all events from streams in the group are emitted.
Stream<T> get stream => _controller.stream;
- final StreamController<T> _controller;
+ late StreamController<T> _controller;
/// Whether the group is closed, meaning that no more streams may be added.
var _closed = false;
@@ -72,20 +72,19 @@
}
/// Creates a new stream group where [stream] is single-subscriber.
- StreamGroup() : _controller = StreamController<T>(sync: true) {
- _controller
- ..onListen = _onListen
- ..onPause = _onPause
- ..onResume = _onResume
- ..onCancel = _onCancel;
+ StreamGroup() {
+ _controller = StreamController<T>(
+ onListen: _onListen,
+ onPause: _onPause,
+ onResume: _onResume,
+ onCancel: _onCancel,
+ sync: true);
}
/// Creates a new stream group where [stream] is a broadcast stream.
- StreamGroup.broadcast()
- : _controller = StreamController<T>.broadcast(sync: true) {
- _controller
- ..onListen = _onListen
- ..onCancel = _onCancelBroadcast;
+ StreamGroup.broadcast() {
+ _controller = StreamController<T>.broadcast(
+ onListen: _onListen, onCancel: _onCancelBroadcast, sync: true);
}
/// Adds [stream] as a member of this group.
diff --git a/lib/src/stream_zip.dart b/lib/src/stream_zip.dart
index e417cdc..506bf6d 100644
--- a/lib/src/stream_zip.dart
+++ b/lib/src/stream_zip.dart
@@ -22,7 +22,7 @@
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
cancelOnError = identical(true, cancelOnError);
var subscriptions = <StreamSubscription<T>>[];
- var controller = StreamController<List<T>>();
+ late StreamController<List<T>> controller;
late List<T?> current;
var dataCount = 0;
@@ -32,7 +32,7 @@
dataCount++;
if (dataCount == subscriptions.length) {
var data = List<T>.from(current);
- current.fillRange(0, current.length, null);
+ current = List<T?>.filled(subscriptions.length, null);
dataCount = 0;
for (var i = 0; i < subscriptions.length; i++) {
if (i != index) subscriptions[i].resume();
@@ -87,26 +87,23 @@
current = List<T?>.filled(subscriptions.length, null);
- controller
- ..onPause = () {
- for (var i = 0; i < subscriptions.length; i++) {
- // This may pause some subscriptions more than once.
- // These will not be resumed by onResume below, but must wait for the
- // next round.
- subscriptions[i].pause();
- }
+ controller = StreamController<List<T>>(onPause: () {
+ for (var i = 0; i < subscriptions.length; i++) {
+ // This may pause some subscriptions more than once.
+ // These will not be resumed by onResume below, but must wait for the
+ // next round.
+ subscriptions[i].pause();
}
- ..onResume = () {
- for (var i = 0; i < subscriptions.length; i++) {
- subscriptions[i].resume();
- }
+ }, onResume: () {
+ for (var i = 0; i < subscriptions.length; i++) {
+ subscriptions[i].resume();
}
- ..onCancel = () {
- for (var i = 0; i < subscriptions.length; i++) {
- // Canceling more than once is safe.
- subscriptions[i].cancel();
- }
- };
+ }, onCancel: () {
+ for (var i = 0; i < subscriptions.length; i++) {
+ // Canceling more than once is safe.
+ subscriptions[i].cancel();
+ }
+ });
if (subscriptions.isEmpty) {
controller.close();
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index 501cf91..2c94e05 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -31,12 +31,12 @@
/// an error.
SubscriptionStream(StreamSubscription<T> subscription)
: _source = subscription {
+ var source = _source!;
+ source.pause();
// Clear callbacks to avoid keeping them alive unnecessarily.
- subscription
- ..pause()
- ..onData(null)
- ..onError(null)
- ..onDone(null);
+ source.onData(null);
+ source.onError(null);
+ source.onDone(null);
}
@override
diff --git a/pubspec.yaml b/pubspec.yaml
index 671174f..3aa8080 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 2.5.0-nullsafety.4-dev
+version: 2.5.0-nullsafety.3
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async