Various clean-ups and null-safety improvements. (#140)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d2d99c3..ab5a57d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,5 @@
+## 2.5.0-nullsafety.4-dev
+
## 2.5.0-nullsafety.3
* Update SDK constraints to `>=2.12.0-0 <3.0.0` based on beta release
diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart
index 99b3881..c4cd04e 100644
--- a/lib/src/async_cache.dart
+++ b/lib/src/async_cache.dart
@@ -42,7 +42,7 @@
/// The [duration] starts counting after the Future returned by [fetch]
/// completes, or after the Stream returned by [fetchStream] emits a done
/// event.
- AsyncCache(this._duration);
+ AsyncCache(Duration duration) : _duration = duration;
/// Creates a cache that invalidates after an in-flight request is complete.
///
diff --git a/lib/src/result/result.dart b/lib/src/result/result.dart
index 5f93b55..165b472 100644
--- a/lib/src/result/result.dart
+++ b/lib/src/result/result.dart
@@ -99,7 +99,7 @@
static Future<List<Result<T>>> captureAll<T>(Iterable<FutureOr<T>> elements) {
var results = <Result<T>?>[];
var pending = 0;
- late Completer<List<Result<T>>> completer;
+ var completer = Completer<List<Result<T>>>();
for (var element in elements) {
if (element is Future<T>) {
var i = results.length;
@@ -116,9 +116,8 @@
}
}
if (pending == 0) {
- return Future.value(List.from(results));
+ completer.complete(List.from(results));
}
- completer = Completer<List<Result<T>>>();
return completer.future;
}
diff --git a/lib/src/single_subscription_transformer.dart b/lib/src/single_subscription_transformer.dart
index 2e056b2..7c93b44 100644
--- a/lib/src/single_subscription_transformer.dart
+++ b/lib/src/single_subscription_transformer.dart
@@ -18,10 +18,8 @@
@override
Stream<T> bind(Stream<S> stream) {
- late StreamSubscription<S> subscription;
- var controller =
- StreamController<T>(sync: true, onCancel: () => subscription.cancel());
- subscription = stream.listen((value) {
+ var controller = StreamController<T>(sync: true);
+ var subscription = stream.listen((value) {
// TODO(nweiz): When we release a new major version, get rid of the second
// type parameter and avoid this conversion.
try {
@@ -30,6 +28,7 @@
controller.addError(error, stackTrace);
}
}, onError: controller.addError, onDone: controller.close);
+ controller.onCancel = subscription.cancel;
return controller.stream;
}
}
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index 8c17ec2..14a04e5 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -29,7 +29,7 @@
class StreamGroup<T> implements Sink<Stream<T>> {
/// The stream through which all events from streams in the group are emitted.
Stream<T> get stream => _controller.stream;
- late StreamController<T> _controller;
+ final StreamController<T> _controller;
/// Whether the group is closed, meaning that no more streams may be added.
var _closed = false;
@@ -72,19 +72,20 @@
}
/// Creates a new stream group where [stream] is single-subscriber.
- StreamGroup() {
- _controller = StreamController<T>(
- onListen: _onListen,
- onPause: _onPause,
- onResume: _onResume,
- onCancel: _onCancel,
- sync: true);
+ StreamGroup() : _controller = StreamController<T>(sync: true) {
+ _controller
+ ..onListen = _onListen
+ ..onPause = _onPause
+ ..onResume = _onResume
+ ..onCancel = _onCancel;
}
/// Creates a new stream group where [stream] is a broadcast stream.
- StreamGroup.broadcast() {
- _controller = StreamController<T>.broadcast(
- onListen: _onListen, onCancel: _onCancelBroadcast, sync: true);
+ StreamGroup.broadcast()
+ : _controller = StreamController<T>.broadcast(sync: true) {
+ _controller
+ ..onListen = _onListen
+ ..onCancel = _onCancelBroadcast;
}
/// Adds [stream] as a member of this group.
diff --git a/lib/src/stream_zip.dart b/lib/src/stream_zip.dart
index 506bf6d..e417cdc 100644
--- a/lib/src/stream_zip.dart
+++ b/lib/src/stream_zip.dart
@@ -22,7 +22,7 @@
{Function? onError, void Function()? onDone, bool? cancelOnError}) {
cancelOnError = identical(true, cancelOnError);
var subscriptions = <StreamSubscription<T>>[];
- late StreamController<List<T>> controller;
+ var controller = StreamController<List<T>>();
late List<T?> current;
var dataCount = 0;
@@ -32,7 +32,7 @@
dataCount++;
if (dataCount == subscriptions.length) {
var data = List<T>.from(current);
- current = List<T?>.filled(subscriptions.length, null);
+ current.fillRange(0, current.length, null);
dataCount = 0;
for (var i = 0; i < subscriptions.length; i++) {
if (i != index) subscriptions[i].resume();
@@ -87,23 +87,26 @@
current = List<T?>.filled(subscriptions.length, null);
- controller = StreamController<List<T>>(onPause: () {
- for (var i = 0; i < subscriptions.length; i++) {
- // This may pause some subscriptions more than once.
- // These will not be resumed by onResume below, but must wait for the
- // next round.
- subscriptions[i].pause();
+ controller
+ ..onPause = () {
+ for (var i = 0; i < subscriptions.length; i++) {
+ // This may pause some subscriptions more than once.
+ // These will not be resumed by onResume below, but must wait for the
+ // next round.
+ subscriptions[i].pause();
+ }
}
- }, onResume: () {
- for (var i = 0; i < subscriptions.length; i++) {
- subscriptions[i].resume();
+ ..onResume = () {
+ for (var i = 0; i < subscriptions.length; i++) {
+ subscriptions[i].resume();
+ }
}
- }, onCancel: () {
- for (var i = 0; i < subscriptions.length; i++) {
- // Canceling more than once is safe.
- subscriptions[i].cancel();
- }
- });
+ ..onCancel = () {
+ for (var i = 0; i < subscriptions.length; i++) {
+ // Canceling more than once is safe.
+ subscriptions[i].cancel();
+ }
+ };
if (subscriptions.isEmpty) {
controller.close();
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index 2c94e05..501cf91 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -31,12 +31,12 @@
/// an error.
SubscriptionStream(StreamSubscription<T> subscription)
: _source = subscription {
- var source = _source!;
- source.pause();
// Clear callbacks to avoid keeping them alive unnecessarily.
- source.onData(null);
- source.onError(null);
- source.onDone(null);
+ subscription
+ ..pause()
+ ..onData(null)
+ ..onError(null)
+ ..onDone(null);
}
@override
diff --git a/pubspec.yaml b/pubspec.yaml
index 3aa8080..671174f 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 2.5.0-nullsafety.3
+version: 2.5.0-nullsafety.4-dev
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async