Merge null_safety branch into master (#125)
Migrates this package to null safety and prepares for a null safety pre-release, once we have landed the package internally and in the SDK to verify its accuracy.
diff --git a/.travis.yml b/.travis.yml
index 6fb2f8d..0789302 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,18 +1,33 @@
language: dart
dart:
- - dev
- - 2.2.0
+ - dev
-dart_task:
- - test: --platform vm
- - test: --platform chrome
- - dartanalyzer
- - dartfmt
+jobs:
+ include:
+ - stage: analyze_and_format
+ name: "Analyzer"
+ dart: be/raw/latest
+ os: linux
+ script: dartanalyzer --enable-experiment=non-nullable --fatal-warnings --fatal-infos .
+ - stage: analyze_and_format
+ name: "Format"
+ dart: be/raw/latest
+ os: linux
+ script: dartfmt -n --set-exit-if-changed .
+ - stage: test
+ name: "Vm Tests"
+ dart: be/raw/latest
+ os: linux
+ script: pub run --enable-experiment=non-nullable test -p vm
+
+stages:
+ - analyze_and_format
+ - test
# Only building master means that we don't run two builds for each pull request.
branches:
- only: [master]
+ only: [master, null_safety]
cache:
directories:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f247608..3ca7f6c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 2.5.0-nullsafety
+
+* Migrate this package to null safety.
+
## 2.4.2
* `StreamQueue` starts listening immediately to broadcast strings.
diff --git a/analysis_options.yaml b/analysis_options.yaml
index 18c26e0..b58ef99 100644
--- a/analysis_options.yaml
+++ b/analysis_options.yaml
@@ -7,6 +7,8 @@
todo: ignore
# Lint provided by pkg:pedantic – should fix this!
unawaited_futures: ignore
+ enable-experiment:
+ - non-nullable
linter:
rules:
diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart
index e0a6f49..d6f5f7f 100644
--- a/lib/src/async_cache.dart
+++ b/lib/src/async_cache.dart
@@ -29,13 +29,13 @@
final Duration _duration;
/// Cached results of a previous [fetchStream] call.
- StreamSplitter<T> _cachedStreamSplitter;
+ StreamSplitter<T>? _cachedStreamSplitter;
/// Cached results of a previous [fetch] call.
- Future<T> _cachedValueFuture;
+ Future<T>? _cachedValueFuture;
/// Fires when the cache should be considered stale.
- Timer _stale;
+ Timer? _stale;
/// Creates a cache that invalidates its contents after [duration] has passed.
///
@@ -65,7 +65,7 @@
await _cachedValueFuture;
_startStaleTimer();
}
- return _cachedValueFuture;
+ return _cachedValueFuture!;
}
/// Returns a cached stream from a previous call to [fetchStream], or runs
@@ -78,12 +78,12 @@
if (_cachedValueFuture != null) {
throw StateError('Previously used to cache via `fetch`');
}
- _cachedStreamSplitter ??= StreamSplitter(
+ var splitter = _cachedStreamSplitter ??= StreamSplitter(
callback().transform(StreamTransformer.fromHandlers(handleDone: (sink) {
_startStaleTimer();
sink.close();
})));
- return _cachedStreamSplitter.split();
+ return splitter.split();
}
/// Removes any cached value.
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 9fd0534..267c6d1 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -35,7 +35,7 @@
/// moment this [CancelableOperation] is created, regardless of whether
/// [inner] has completed yet or not.
factory CancelableOperation.fromFuture(Future<T> inner,
- {FutureOr Function() onCancel}) {
+ {FutureOr Function()? onCancel}) {
var completer = CancelableCompleter<T>(onCancel: onCancel);
completer.complete(inner);
return completer.operation;
@@ -55,7 +55,7 @@
value.then((value) {
controller.add(value);
controller.close();
- }, onError: (error, StackTrace stackTrace) {
+ }, onError: (Object error, StackTrace stackTrace) {
controller.addError(error, stackTrace);
controller.close();
});
@@ -68,8 +68,8 @@
/// If this operation completes, this completes to the same result as [value].
/// If this operation is cancelled, the returned future waits for the future
/// returned by [cancel], then completes to [cancellationValue].
- Future<T> valueOrCancellation([T cancellationValue]) {
- var completer = Completer<T>.sync();
+ Future<T?> valueOrCancellation([T? cancellationValue]) {
+ var completer = Completer<T?>.sync();
value.then((result) => completer.complete(result),
onError: completer.completeError);
@@ -93,23 +93,24 @@
/// If [propagateCancel] is `true` and the returned operation is canceled then
/// this operation is canceled. The default is `false`.
CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
- {FutureOr<R> Function(Object, StackTrace) onError,
- FutureOr<R> Function() onCancel,
+ {FutureOr<R> Function(Object, StackTrace)? onError,
+ FutureOr<R> Function()? onCancel,
bool propagateCancel = false}) {
final completer =
CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);
- valueOrCancellation().then((T result) {
+ valueOrCancellation().then((T? result) {
if (!completer.isCanceled) {
if (isCompleted) {
- completer.complete(Future.sync(() => onValue(result)));
+ assert(result is T);
+ completer.complete(Future.sync(() => onValue(result!)));
} else if (onCancel != null) {
completer.complete(Future.sync(onCancel));
} else {
completer._cancel();
}
}
- }, onError: (error, StackTrace stackTrace) {
+ }, onError: (Object error, StackTrace stackTrace) {
if (!completer.isCanceled) {
if (onError != null) {
completer.complete(Future.sync(() => onError(error, stackTrace)));
@@ -145,7 +146,7 @@
final Completer<T> _inner;
/// The callback to call if the future is canceled.
- final FutureOrCallback _onCancel;
+ final FutureOrCallback? _onCancel;
/// Creates a new completer for a [CancelableOperation].
///
@@ -155,15 +156,14 @@
///
/// [onCancel] will be called synchronously when the operation is canceled.
/// It's guaranteed to only be called once.
- CancelableCompleter({FutureOr Function() onCancel})
+ CancelableCompleter({FutureOr Function()? onCancel})
: _onCancel = onCancel,
_inner = Completer<T>() {
- _operation = CancelableOperation<T>._(this);
+ operation = CancelableOperation<T>._(this);
}
/// The operation controlled by this completer.
- CancelableOperation<T> get operation => _operation;
- CancelableOperation<T> _operation;
+ late final CancelableOperation<T> operation;
/// Whether the completer has completed.
bool get isCompleted => _isCompleted;
@@ -180,7 +180,7 @@
///
/// If [value] is a [Future], this will complete to the result of that
/// [Future] once it completes.
- void complete([FutureOr<T> value]) {
+ void complete([FutureOr<T>? value]) {
if (_isCompleted) throw StateError('Operation already completed');
_isCompleted = true;
@@ -200,14 +200,14 @@
future.then((result) {
if (_isCanceled) return;
_inner.complete(result);
- }, onError: (error, StackTrace stackTrace) {
+ }, onError: (Object error, StackTrace stackTrace) {
if (_isCanceled) return;
_inner.completeError(error, stackTrace);
});
}
/// Completes [operation] to [error].
- void completeError(Object error, [StackTrace stackTrace]) {
+ void completeError(Object error, [StackTrace? stackTrace]) {
if (_isCompleted) throw StateError('Operation already completed');
_isCompleted = true;
@@ -221,7 +221,8 @@
return _cancelMemo.runOnce(() {
_isCanceled = true;
- if (_onCancel != null) return _onCancel();
+ var onCancel = _onCancel;
+ if (onCancel != null) return onCancel();
});
}
}
diff --git a/lib/src/delegate/event_sink.dart b/lib/src/delegate/event_sink.dart
index bc33b19..33d88e9 100644
--- a/lib/src/delegate/event_sink.dart
+++ b/lib/src/delegate/event_sink.dart
@@ -33,7 +33,7 @@
}
@override
- void addError(error, [StackTrace stackTrace]) {
+ void addError(error, [StackTrace? stackTrace]) {
_sink.addError(error, stackTrace);
}
diff --git a/lib/src/delegate/future.dart b/lib/src/delegate/future.dart
index 984caf6..1155452 100644
--- a/lib/src/delegate/future.dart
+++ b/lib/src/delegate/future.dart
@@ -25,11 +25,11 @@
Stream<T> asStream() => _future.asStream();
@override
- Future<T> catchError(Function onError, {bool Function(Object error) test}) =>
+ Future<T> catchError(Function onError, {bool Function(Object error)? test}) =>
_future.catchError(onError, test: test);
@override
- Future<S> then<S>(FutureOr<S> Function(T) onValue, {Function onError}) =>
+ Future<S> then<S>(FutureOr<S> Function(T) onValue, {Function? onError}) =>
_future.then(onValue, onError: onError);
@override
@@ -37,6 +37,6 @@
_future.whenComplete(action);
@override
- Future<T> timeout(Duration timeLimit, {FutureOr<T> Function() onTimeout}) =>
+ Future<T> timeout(Duration timeLimit, {FutureOr<T> Function()? onTimeout}) =>
_future.timeout(timeLimit, onTimeout: onTimeout);
}
diff --git a/lib/src/delegate/stream_sink.dart b/lib/src/delegate/stream_sink.dart
index 9005d1d..ad76e90 100644
--- a/lib/src/delegate/stream_sink.dart
+++ b/lib/src/delegate/stream_sink.dart
@@ -36,7 +36,7 @@
}
@override
- void addError(error, [StackTrace stackTrace]) {
+ void addError(error, [StackTrace? stackTrace]) {
_sink.addError(error, stackTrace);
}
diff --git a/lib/src/delegate/stream_subscription.dart b/lib/src/delegate/stream_subscription.dart
index 392e27b..45b1134 100644
--- a/lib/src/delegate/stream_subscription.dart
+++ b/lib/src/delegate/stream_subscription.dart
@@ -31,22 +31,22 @@
: TypeSafeStreamSubscription<T>(subscription);
@override
- void onData(void Function(T) handleData) {
+ void onData(void Function(T)? handleData) {
_source.onData(handleData);
}
@override
- void onError(Function handleError) {
+ void onError(Function? handleError) {
_source.onError(handleError);
}
@override
- void onDone(void Function() handleDone) {
+ void onDone(void Function()? handleDone) {
_source.onDone(handleDone);
}
@override
- void pause([Future resumeFuture]) {
+ void pause([Future? resumeFuture]) {
_source.pause(resumeFuture);
}
@@ -59,7 +59,7 @@
Future cancel() => _source.cancel();
@override
- Future<E> asFuture<E>([E futureValue]) => _source.asFuture(futureValue);
+ Future<E> asFuture<E>([E? futureValue]) => _source.asFuture(futureValue);
@override
bool get isPaused => _source.isPaused;
diff --git a/lib/src/future_group.dart b/lib/src/future_group.dart
index 402ae46..3a6291f 100644
--- a/lib/src/future_group.dart
+++ b/lib/src/future_group.dart
@@ -44,13 +44,13 @@
Stream get onIdle =>
(_onIdleController ??= StreamController.broadcast(sync: true)).stream;
- StreamController _onIdleController;
+ StreamController? _onIdleController;
/// The values emitted by the futures that have been added to the group, in
/// the order they were added.
///
/// The slots for futures that haven't completed yet are `null`.
- final _values = <T>[];
+ final _values = <T?>[];
/// Wait for [task] to complete.
@override
@@ -71,12 +71,13 @@
_values[index] = value;
if (_pending != 0) return null;
- if (_onIdleController != null) _onIdleController.add(null);
+ var onIdleController = _onIdleController;
+ if (onIdleController != null) onIdleController.add(null);
if (!_closed) return null;
- if (_onIdleController != null) _onIdleController.close();
- _completer.complete(_values);
- }).catchError((error, StackTrace stackTrace) {
+ if (onIdleController != null) onIdleController.close();
+ _completer.complete(_values.whereType<T>().toList());
+ }).catchError((Object error, StackTrace stackTrace) {
if (_completer.isCompleted) return null;
_completer.completeError(error, stackTrace);
});
@@ -89,6 +90,6 @@
_closed = true;
if (_pending != 0) return;
if (_completer.isCompleted) return;
- _completer.complete(_values);
+ _completer.complete(_values.whereType<T>().toList());
}
}
diff --git a/lib/src/lazy_stream.dart b/lib/src/lazy_stream.dart
index f5e65d1..e6779ee 100644
--- a/lib/src/lazy_stream.dart
+++ b/lib/src/lazy_stream.dart
@@ -15,7 +15,7 @@
/// produce a `Stream`.
class LazyStream<T> extends Stream<T> {
/// The callback that's called to create the inner stream.
- FutureOrCallback<Stream<T>> _callback;
+ FutureOrCallback<Stream<T>>? _callback;
/// Creates a single-subscription `Stream` that calls [callback] when it gets
/// a listener and forwards to the returned stream.
@@ -25,15 +25,15 @@
}
@override
- StreamSubscription<T> listen(void Function(T) onData,
- {Function onError, void Function() onDone, bool cancelOnError}) {
- if (_callback == null) {
+ StreamSubscription<T> listen(void Function(T)? onData,
+ {Function? onError, void Function()? onDone, bool? cancelOnError}) {
+ var callback = _callback;
+ if (callback == null) {
throw StateError('Stream has already been listened to.');
}
// Null out the callback before we invoke it to ensure that even while
// running it, this can't be called twice.
- var callback = _callback;
_callback = null;
var result = callback();
@@ -41,7 +41,7 @@
if (result is Future<Stream<T>>) {
stream = StreamCompleter.fromFuture(result);
} else {
- stream = result as Stream<T>;
+ stream = result;
}
return stream.listen(onData,
diff --git a/lib/src/null_stream_sink.dart b/lib/src/null_stream_sink.dart
index b28df2a..34b100b 100644
--- a/lib/src/null_stream_sink.dart
+++ b/lib/src/null_stream_sink.dart
@@ -43,12 +43,12 @@
///
/// If [done] is passed, it's used as the [Sink.done] future. Otherwise, a
/// completed future is used.
- NullStreamSink({Future done}) : done = done ?? Future.value();
+ NullStreamSink({Future? done}) : done = done ?? Future.value();
/// Creates a null sink whose [done] future emits [error].
///
/// Note that this error will not be considered uncaught.
- NullStreamSink.error(error, [StackTrace stackTrace])
+ NullStreamSink.error(Object error, [StackTrace? stackTrace])
: done = Future.error(error, stackTrace)
// Don't top-level the error. This gives the user a change to call
// [close] or [done], and matches the behavior of a remote endpoint
@@ -61,7 +61,7 @@
}
@override
- void addError(error, [StackTrace stackTrace]) {
+ void addError(Object error, [StackTrace? stackTrace]) {
_checkEventAllowed();
}
@@ -70,7 +70,7 @@
_checkEventAllowed();
_addingStream = true;
- var future = stream.listen(null).cancel() ?? Future.value();
+ var future = stream.listen(null).cancel();
return future.whenComplete(() {
_addingStream = false;
});
diff --git a/lib/src/result/capture_sink.dart b/lib/src/result/capture_sink.dart
index a7c3a59..562f5f9 100644
--- a/lib/src/result/capture_sink.dart
+++ b/lib/src/result/capture_sink.dart
@@ -18,7 +18,7 @@
}
@override
- void addError(Object error, [StackTrace stackTrace]) {
+ void addError(Object error, [StackTrace? stackTrace]) {
_sink.add(Result.error(error, stackTrace));
}
diff --git a/lib/src/result/error.dart b/lib/src/result/error.dart
index 76e0275..27ce155 100644
--- a/lib/src/result/error.dart
+++ b/lib/src/result/error.dart
@@ -8,7 +8,7 @@
import 'value.dart';
/// A result representing a thrown error.
-class ErrorResult implements Result<Null> {
+class ErrorResult implements Result<Never> {
/// The error object that was thrown.
final Object error;
@@ -20,11 +20,13 @@
@override
bool get isError => true;
@override
- ValueResult<Null> get asValue => null;
+ ValueResult<Never>? get asValue => null;
@override
ErrorResult get asError => this;
- ErrorResult(this.error, this.stackTrace);
+ ErrorResult(this.error, [StackTrace? stackTrace])
+ // TODO: Use AsyncError.defaultStackTrace(error) once available
+ : stackTrace = stackTrace ?? StackTrace.fromString('');
@override
void complete(Completer completer) {
@@ -37,7 +39,7 @@
}
@override
- Future<Null> get asFuture => Future<Null>.error(error, stackTrace);
+ Future<Never> get asFuture => Future<Never>.error(error, stackTrace);
/// Calls an error handler with the error and stacktrace.
///
diff --git a/lib/src/result/future.dart b/lib/src/result/future.dart
index ff30546..20a5ebf 100644
--- a/lib/src/result/future.dart
+++ b/lib/src/result/future.dart
@@ -16,8 +16,8 @@
/// The result of the wrapped [Future], if it's completed.
///
/// If it hasn't completed yet, this will be `null`.
- Result<T> get result => _result;
- Result<T> _result;
+ Result<T>? get result => _result;
+ Result<T>? _result;
ResultFuture(Future<T> future) : super(future) {
Result.capture(future).then((result) {
diff --git a/lib/src/result/release_sink.dart b/lib/src/result/release_sink.dart
index 5d8267a..bf6dd50 100644
--- a/lib/src/result/release_sink.dart
+++ b/lib/src/result/release_sink.dart
@@ -18,7 +18,7 @@
}
@override
- void addError(Object error, [StackTrace stackTrace]) {
+ void addError(Object error, [StackTrace? stackTrace]) {
// Errors may be added by intermediate processing, even if it is never
// added by CaptureSink.
_sink.addError(error, stackTrace);
diff --git a/lib/src/result/result.dart b/lib/src/result/result.dart
index e604782..5f93b55 100644
--- a/lib/src/result/result.dart
+++ b/lib/src/result/result.dart
@@ -63,7 +63,7 @@
factory Result(T Function() computation) {
try {
return ValueResult<T>(computation());
- } catch (e, s) {
+ } on Object catch (e, s) {
return ErrorResult(e, s);
}
}
@@ -76,7 +76,7 @@
/// Creates a `Result` holding an error.
///
/// Alias for [ErrorResult.ErrorResult].
- factory Result.error(Object error, [StackTrace stackTrace]) =>
+ factory Result.error(Object error, [StackTrace? stackTrace]) =>
ErrorResult(error, stackTrace);
/// Captures the result of a future into a `Result` future.
@@ -85,7 +85,7 @@
/// Errors have been converted to an [ErrorResult] value.
static Future<Result<T>> capture<T>(Future<T> future) {
return future.then((value) => ValueResult(value),
- onError: (error, StackTrace stackTrace) =>
+ onError: (Object error, StackTrace stackTrace) =>
ErrorResult(error, stackTrace));
}
@@ -97,9 +97,9 @@
/// wrapped as a [Result.value].
/// The returned future will never have an error.
static Future<List<Result<T>>> captureAll<T>(Iterable<FutureOr<T>> elements) {
- var results = <Result<T>>[];
+ var results = <Result<T>?>[];
var pending = 0;
- Completer<List<Result<T>>> completer;
+ late Completer<List<Result<T>>> completer;
for (var element in elements) {
if (element is Future<T>) {
var i = results.length;
@@ -108,15 +108,15 @@
Result.capture<T>(element).then((result) {
results[i] = result;
if (--pending == 0) {
- completer.complete(results);
+ completer.complete(List.from(results));
}
});
} else {
- results.add(Result<T>.value(element as T));
+ results.add(Result<T>.value(element));
}
}
if (pending == 0) {
- return Future<List<Result<T>>>.value(results);
+ return Future.value(List.from(results));
}
completer = Completer<List<Result<T>>>();
return completer.future;
@@ -172,8 +172,8 @@
/// Otherwise both levels of results are value results, and a single
/// result with the value is returned.
static Result<T> flatten<T>(Result<Result<T>> result) {
- if (result.isValue) return result.asValue.value;
- return result.asError;
+ if (result.isValue) return result.asValue!.value;
+ return result.asError!;
}
/// Converts a sequence of results to a result of a list.
@@ -184,9 +184,9 @@
var values = <T>[];
for (var result in results) {
if (result.isValue) {
- values.add(result.asValue.value);
+ values.add(result.asValue!.value);
} else {
- return result.asError;
+ return result.asError!;
}
}
return Result<List<T>>.value(values);
@@ -205,12 +205,12 @@
/// If this is a value result, returns itself.
///
/// Otherwise returns `null`.
- ValueResult<T> get asValue;
+ ValueResult<T>? get asValue;
/// If this is an error result, returns itself.
///
/// Otherwise returns `null`.
- ErrorResult get asError;
+ ErrorResult? get asError;
/// Completes a completer with this result.
void complete(Completer<T> completer);
diff --git a/lib/src/result/value.dart b/lib/src/result/value.dart
index 5c1a60f..7cfc474 100644
--- a/lib/src/result/value.dart
+++ b/lib/src/result/value.dart
@@ -19,7 +19,7 @@
@override
ValueResult<T> get asValue => this;
@override
- ErrorResult get asError => null;
+ ErrorResult? get asError => null;
ValueResult(this.value);
diff --git a/lib/src/single_subscription_transformer.dart b/lib/src/single_subscription_transformer.dart
index fe939fc..2e056b2 100644
--- a/lib/src/single_subscription_transformer.dart
+++ b/lib/src/single_subscription_transformer.dart
@@ -11,14 +11,14 @@
/// listening to a stream as soon as it's bound.
///
/// This also casts the source stream's events to type `T`. If the cast fails,
-/// the result stream will emit a [CastError]. This behavior is deprecated, and
+/// the result stream will emit a [TypeError]. This behavior is deprecated, and
/// should not be relied upon.
class SingleSubscriptionTransformer<S, T> extends StreamTransformerBase<S, T> {
const SingleSubscriptionTransformer();
@override
Stream<T> bind(Stream<S> stream) {
- StreamSubscription<S> subscription;
+ late StreamSubscription<S> subscription;
var controller =
StreamController<T>(sync: true, onCancel: () => subscription.cancel());
subscription = stream.listen((value) {
@@ -26,7 +26,7 @@
// type parameter and avoid this conversion.
try {
controller.add(value as T);
- } on CastError catch (error, stackTrace) {
+ } on TypeError catch (error, stackTrace) {
controller.addError(error, stackTrace);
}
}, onError: controller.addError, onDone: controller.close);
diff --git a/lib/src/stream_completer.dart b/lib/src/stream_completer.dart
index 5128d57..27034c2 100644
--- a/lib/src/stream_completer.dart
+++ b/lib/src/stream_completer.dart
@@ -97,7 +97,7 @@
///
/// Any one of [setSourceStream], [setEmpty], and [setError] may be called at
/// most once. Trying to call any of them again will fail.
- void setError(error, [StackTrace stackTrace]) {
+ void setError(Object error, [StackTrace? stackTrace]) {
setSourceStream(Stream.fromFuture(Future.error(error, stackTrace)));
}
}
@@ -108,30 +108,31 @@
///
/// Created if the user listens on this stream before the source stream
/// is set, or if using [_setEmpty] so there is no source stream.
- StreamController<T> _controller;
+ StreamController<T>? _controller;
/// Source stream for the events provided by this stream.
///
/// Set when the completer sets the source stream using [_setSourceStream]
/// or [_setEmpty].
- Stream<T> _sourceStream;
+ Stream<T>? _sourceStream;
@override
- StreamSubscription<T> listen(void Function(T) onData,
- {Function onError, void Function() onDone, bool cancelOnError}) {
+ StreamSubscription<T> listen(void Function(T)? onData,
+ {Function? onError, void Function()? onDone, bool? cancelOnError}) {
if (_controller == null) {
- if (_sourceStream != null && !_sourceStream.isBroadcast) {
+ var sourceStream = _sourceStream;
+ if (sourceStream != null && !sourceStream.isBroadcast) {
// If the source stream is itself single subscription,
// just listen to it directly instead of creating a controller.
- return _sourceStream.listen(onData,
+ return sourceStream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
- _createController();
+ _ensureController();
if (_sourceStream != null) {
_linkStreamToController();
}
}
- return _controller.stream.listen(onData,
+ return _controller!.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@@ -157,11 +158,10 @@
/// Links source stream to controller when both are available.
void _linkStreamToController() {
- assert(_controller != null);
- assert(_sourceStream != null);
- _controller
- .addStream(_sourceStream, cancelOnError: false)
- .whenComplete(_controller.close);
+ var controller = _controller!;
+ controller
+ .addStream(_sourceStream!, cancelOnError: false)
+ .whenComplete(controller.close);
}
/// Sets an empty source stream.
@@ -170,16 +170,13 @@
/// immediately.
void _setEmpty() {
assert(_sourceStream == null);
- if (_controller == null) {
- _createController();
- }
- _sourceStream = _controller.stream; // Mark stream as set.
- _controller.close();
+ var controller = _ensureController();
+ _sourceStream = controller.stream; // Mark stream as set.
+ controller.close();
}
// Creates the [_controller].
- void _createController() {
- assert(_controller == null);
- _controller = StreamController<T>(sync: true);
+ StreamController<T> _ensureController() {
+ return _controller ??= StreamController<T>(sync: true);
}
}
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index e9a4afc..8c17ec2 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -29,7 +29,7 @@
class StreamGroup<T> implements Sink<Stream<T>> {
/// The stream through which all events from streams in the group are emitted.
Stream<T> get stream => _controller.stream;
- StreamController<T> _controller;
+ late StreamController<T> _controller;
/// Whether the group is closed, meaning that no more streams may be added.
var _closed = false;
@@ -47,7 +47,7 @@
/// subscriptions will be canceled and set to null again. Single-subscriber
/// stream subscriptions will be left intact, since they can't be
/// re-subscribed.
- final _subscriptions = <Stream<T>, StreamSubscription<T>>{};
+ final _subscriptions = <Stream<T>, StreamSubscription<T>?>{};
/// Merges the events from [streams] into a single single-subscription stream.
///
@@ -100,7 +100,7 @@
///
/// Throws a [StateError] if this group is closed.
@override
- Future add(Stream<T> stream) {
+ Future? add(Stream<T> stream) {
if (_closed) {
throw StateError("Can't add a Stream to a closed StreamGroup.");
}
@@ -130,7 +130,7 @@
///
/// If [stream]'s subscription is canceled, this returns
/// [StreamSubscription.cancel]'s return value. Otherwise, it returns `null`.
- Future remove(Stream<T> stream) {
+ Future? remove(Stream<T> stream) {
var subscription = _subscriptions.remove(stream);
var future = subscription == null ? null : subscription.cancel();
if (_closed && _subscriptions.isEmpty) _controller.close();
@@ -155,7 +155,7 @@
void _onPause() {
_state = _StreamGroupState.paused;
for (var subscription in _subscriptions.values) {
- subscription.pause();
+ subscription!.pause();
}
}
@@ -163,19 +163,18 @@
void _onResume() {
_state = _StreamGroupState.listening;
for (var subscription in _subscriptions.values) {
- subscription.resume();
+ subscription!.resume();
}
}
/// A callback called when [stream] is canceled.
///
/// This is only called for single-subscription groups.
- Future _onCancel() {
+ Future? _onCancel() {
_state = _StreamGroupState.canceled;
var futures = _subscriptions.values
- .map((subscription) => subscription.cancel())
- .where((future) => future != null)
+ .map((subscription) => subscription!.cancel())
.toList();
_subscriptions.clear();
@@ -194,7 +193,7 @@
// will still be added to [_controller], but then they'll be dropped since
// it has no listeners.
if (!stream.isBroadcast) return;
- subscription.cancel();
+ subscription!.cancel();
_subscriptions[stream] = null;
});
}
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 278e249..5aa6054 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -87,7 +87,7 @@
///
/// Set to subscription when listening, and set to `null` when the
/// subscription is done (and [_isDone] is set to true).
- StreamSubscription<T> _subscription;
+ StreamSubscription<T>? _subscription;
/// Whether the event source is done.
bool _isDone = false;
@@ -343,7 +343,7 @@
transaction.reject();
}
return result;
- }, onError: (error) {
+ }, onError: (Object error) {
transaction.commit(queue);
throw error;
});
@@ -399,7 +399,7 @@
/// After calling `cancel`, no further events can be requested.
/// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
/// may be called again.
- Future cancel({bool immediate = false}) {
+ Future? cancel({bool immediate = false}) {
if (_isClosed) throw _failClosed();
_isClosed = true;
@@ -454,11 +454,10 @@
}
_isDone = true;
- if (_subscription == null) {
+ var subscription = _subscription;
+ if (subscription == null) {
return _source;
}
-
- var subscription = _subscription;
_subscription = null;
var wasPaused = subscription.isPaused;
@@ -475,7 +474,7 @@
///
/// The event source is restarted by the next call to [_ensureListening].
void _pause() {
- _subscription.pause();
+ _subscription!.pause();
}
/// Ensures that we are listening on events from the event source.
@@ -488,22 +487,22 @@
if (_subscription == null) {
_subscription = _source.listen((data) {
_addResult(Result.value(data));
- }, onError: (error, StackTrace stackTrace) {
+ }, onError: (Object error, StackTrace stackTrace) {
_addResult(Result.error(error, stackTrace));
}, onDone: () {
_subscription = null;
_close();
});
} else {
- _subscription.resume();
+ _subscription!.resume();
}
}
/// Cancels the underlying event source.
- Future _cancel() {
+ Future? _cancel() {
if (_isDone) return null;
_subscription ??= _source.listen(null);
- var future = _subscription.cancel();
+ var future = _subscription!.cancel();
_close();
return future;
}
@@ -771,7 +770,8 @@
var event = events.removeFirst();
if (event.isError) {
- _completer.completeError(event.asError.error, event.asError.stackTrace);
+ _completer.completeError(
+ event.asError!.error, event.asError!.stackTrace);
return true;
}
}
@@ -814,10 +814,10 @@
var event = events.removeFirst();
if (event.isError) {
- event.asError.complete(_completer);
+ event.asError!.complete(_completer);
return true;
}
- _list.add(event.asValue.value);
+ _list.add(event.asValue!.value);
}
_completer.complete(_list);
return true;
@@ -837,10 +837,10 @@
}
var event = events.elementAt(_list.length);
if (event.isError) {
- event.asError.complete(_completer);
+ event.asError!.complete(_completer);
return true;
}
- _list.add(event.asValue.value);
+ _list.add(event.asValue!.value);
}
_completer.complete(_list);
return true;
@@ -954,8 +954,7 @@
/// [StreamQueue._updateRequests].
class _TransactionRequest<T> implements _EventRequest<T> {
/// The transaction created by this request.
- StreamQueueTransaction<T> get transaction => _transaction;
- StreamQueueTransaction<T> _transaction;
+ late final StreamQueueTransaction<T> transaction;
/// The controller that passes events to [transaction].
final _controller = StreamController<T>(sync: true);
@@ -964,7 +963,7 @@
var _eventsSent = 0;
_TransactionRequest(StreamQueue<T> parent) {
- _transaction = StreamQueueTransaction._(parent, _controller.stream);
+ transaction = StreamQueueTransaction._(parent, _controller.stream);
}
@override
@@ -973,6 +972,6 @@
events[_eventsSent++].addTo(_controller);
}
if (isDone && !_controller.isClosed) _controller.close();
- return transaction._committed || _transaction._rejected;
+ return transaction._committed || transaction._rejected;
}
}
diff --git a/lib/src/stream_sink_completer.dart b/lib/src/stream_sink_completer.dart
index ebfd717..10e549d 100644
--- a/lib/src/stream_sink_completer.dart
+++ b/lib/src/stream_sink_completer.dart
@@ -70,7 +70,7 @@
///
/// Either of [setDestinationSink] or [setError] may be called at most once.
/// Trying to call either of them again will fail.
- void setError(error, [StackTrace stackTrace]) {
+ void setError(Object error, [StackTrace? stackTrace]) {
setDestinationSink(NullStreamSink.error(error, stackTrace));
}
}
@@ -81,18 +81,18 @@
///
/// Created if the user adds events to this sink before the destination sink
/// is set.
- StreamController<T> _controller;
+ StreamController<T>? _controller;
/// Completer for [done].
///
/// Created if the user requests the [done] future before the destination sink
/// is set.
- Completer _doneCompleter;
+ Completer? _doneCompleter;
/// Destination sink for the events added to this sink.
///
/// Set when [StreamSinkCompleter.setDestinationSink] is called.
- StreamSink<T> _destinationSink;
+ StreamSink<T>? _destinationSink;
/// Whether events should be sent directly to [_destinationSink], as opposed
/// to going through [_controller].
@@ -100,56 +100,52 @@
@override
Future get done {
- if (_doneCompleter != null) return _doneCompleter.future;
+ if (_doneCompleter != null) return _doneCompleter!.future;
if (_destinationSink == null) {
_doneCompleter = Completer.sync();
- return _doneCompleter.future;
+ return _doneCompleter!.future;
}
- return _destinationSink.done;
+ return _destinationSink!.done;
}
@override
void add(T event) {
if (_canSendDirectly) {
- _destinationSink.add(event);
+ _destinationSink!.add(event);
} else {
- _ensureController();
- _controller.add(event);
+ _ensureController().add(event);
}
}
@override
- void addError(error, [StackTrace stackTrace]) {
+ void addError(error, [StackTrace? stackTrace]) {
if (_canSendDirectly) {
- _destinationSink.addError(error, stackTrace);
+ _destinationSink!.addError(error, stackTrace);
} else {
- _ensureController();
- _controller.addError(error, stackTrace);
+ _ensureController().addError(error, stackTrace);
}
}
@override
Future addStream(Stream<T> stream) {
- if (_canSendDirectly) return _destinationSink.addStream(stream);
+ if (_canSendDirectly) return _destinationSink!.addStream(stream);
- _ensureController();
- return _controller.addStream(stream, cancelOnError: false);
+ return _ensureController().addStream(stream, cancelOnError: false);
}
@override
Future close() {
if (_canSendDirectly) {
- _destinationSink.close();
+ _destinationSink!.close();
} else {
- _ensureController();
- _controller.close();
+ _ensureController().close();
}
return done;
}
/// Create [_controller] if it doesn't yet exist.
- void _ensureController() {
- _controller ??= StreamController(sync: true);
+ StreamController<T> _ensureController() {
+ return _controller ??= StreamController(sync: true);
}
/// Sets the destination sink to which events from this sink will be provided.
@@ -168,7 +164,7 @@
// Catch any error that may come from [addStream] or [sink.close]. They'll
// be reported through [done] anyway.
sink
- .addStream(_controller.stream)
+ .addStream(_controller!.stream)
.whenComplete(sink.close)
.catchError((_) {});
}
@@ -176,7 +172,7 @@
// If the user has already asked when the sink is done, connect the sink's
// done callback to that completer.
if (_doneCompleter != null) {
- _doneCompleter.complete(sink.done);
+ _doneCompleter!.complete(sink.done);
}
}
}
diff --git a/lib/src/stream_sink_transformer.dart b/lib/src/stream_sink_transformer.dart
index 1dc27ed..bdcb196 100644
--- a/lib/src/stream_sink_transformer.dart
+++ b/lib/src/stream_sink_transformer.dart
@@ -34,9 +34,9 @@
/// they're passed are forwarded to the inner sink. If a handler is omitted,
/// the event is passed through unaltered.
factory StreamSinkTransformer.fromHandlers(
- {void Function(S, EventSink<T>) handleData,
- void Function(Object, StackTrace, EventSink<T>) handleError,
- void Function(EventSink<T>) handleDone}) {
+ {void Function(S, EventSink<T>)? handleData,
+ void Function(Object, StackTrace, EventSink<T>)? handleError,
+ void Function(EventSink<T>)? handleDone}) {
return HandlerTransformer<S, T>(handleData, handleError, handleDone);
}
diff --git a/lib/src/stream_sink_transformer/handler_transformer.dart b/lib/src/stream_sink_transformer/handler_transformer.dart
index 8d37ce3..b112e3e 100644
--- a/lib/src/stream_sink_transformer/handler_transformer.dart
+++ b/lib/src/stream_sink_transformer/handler_transformer.dart
@@ -11,6 +11,9 @@
typedef HandleData<S, T> = void Function(S data, EventSink<T> sink);
/// The type of the callback for handling error events.
+//
+// TODO: Update to take a non-nullable StackTrace once that change lands in
+// the sdk.
typedef HandleError<T> = void Function(
Object error, StackTrace stackTrace, EventSink<T> sink);
@@ -20,13 +23,13 @@
/// A [StreamSinkTransformer] that delegates events to the given handlers.
class HandlerTransformer<S, T> implements StreamSinkTransformer<S, T> {
/// The handler for data events.
- final HandleData<S, T> _handleData;
+ final HandleData<S, T>? _handleData;
/// The handler for error events.
- final HandleError<T> _handleError;
+ final HandleError<T>? _handleError;
/// The handler for done events.
- final HandleDone<T> _handleDone;
+ final HandleDone<T>? _handleDone;
HandlerTransformer(this._handleData, this._handleError, this._handleDone);
@@ -55,19 +58,22 @@
@override
void add(S event) {
- if (_transformer._handleData == null) {
+ var handleData = _transformer._handleData;
+ if (handleData == null) {
_inner.add(event as T);
} else {
- _transformer._handleData(event, _safeCloseInner);
+ handleData(event, _safeCloseInner);
}
}
@override
- void addError(error, [StackTrace stackTrace]) {
- if (_transformer._handleError == null) {
+ void addError(error, [StackTrace? stackTrace]) {
+ var handleError = _transformer._handleError;
+ if (handleError == null) {
_inner.addError(error, stackTrace);
} else {
- _transformer._handleError(error, stackTrace, _safeCloseInner);
+ handleError(error, stackTrace ?? AsyncError.defaultStackTrace(error),
+ _safeCloseInner);
}
}
@@ -82,9 +88,10 @@
@override
Future close() {
- if (_transformer._handleDone == null) return _inner.close();
+ var handleDone = _transformer._handleDone;
+ if (handleDone == null) return _inner.close();
- _transformer._handleDone(_safeCloseInner);
+ handleDone(_safeCloseInner);
return _inner.done;
}
}
diff --git a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
index 1df7e5a..2afcbff 100644
--- a/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
+++ b/lib/src/stream_sink_transformer/stream_transformer_wrapper.dart
@@ -50,7 +50,7 @@
}
@override
- void addError(error, [StackTrace stackTrace]) {
+ void addError(error, [StackTrace? stackTrace]) {
_controller.addError(error, stackTrace);
}
diff --git a/lib/src/stream_splitter.dart b/lib/src/stream_splitter.dart
index e9f326e..f7377d6 100644
--- a/lib/src/stream_splitter.dart
+++ b/lib/src/stream_splitter.dart
@@ -28,7 +28,7 @@
/// The subscription to [_stream].
///
/// This will be `null` until a branch has a listener.
- StreamSubscription<T> _subscription;
+ StreamSubscription<T>? _subscription;
/// The buffer of events or errors that have already been emitted by
/// [_stream].
@@ -57,7 +57,7 @@
///
/// [count] defaults to 2. This is the same as creating [count] branches and
/// then closing the [StreamSplitter].
- static List<Stream<T>> splitFrom<T>(Stream<T> stream, [int count]) {
+ static List<Stream<T>> splitFrom<T>(Stream<T> stream, [int? count]) {
count ??= 2;
var splitter = StreamSplitter<T>(stream);
var streams = List<Stream<T>>.generate(count, (_) => splitter.split());
@@ -125,8 +125,8 @@
assert(_controllers.isEmpty);
assert(_isClosed);
- Future future;
- if (_subscription != null) future = _subscription.cancel();
+ Future? future;
+ if (_subscription != null) future = _subscription!.cancel();
if (future != null) _closeGroup.add(future);
_closeGroup.close();
}
@@ -142,7 +142,7 @@
// Resume the subscription in case it was paused, either because all the
// controllers were paused or because the last one was canceled. If it
// wasn't paused, this will be a no-op.
- _subscription.resume();
+ _subscription!.resume();
} else {
_subscription =
_stream.listen(_onData, onError: _onError, onDone: _onDone);
@@ -152,14 +152,14 @@
/// Pauses [_subscription] if every controller is paused.
void _onPause() {
if (!_controllers.every((controller) => controller.isPaused)) return;
- _subscription.pause();
+ _subscription!.pause();
}
/// Resumes [_subscription].
///
/// If [_subscription] wasn't paused, this is a no-op.
void _onResume() {
- _subscription.resume();
+ _subscription!.resume();
}
/// Removes [controller] from [_controllers] and cancels or pauses
@@ -175,7 +175,7 @@
if (_isClosed) {
_cancelSubscription();
} else {
- _subscription.pause();
+ _subscription!.pause();
}
}
diff --git a/lib/src/stream_subscription_transformer.dart b/lib/src/stream_subscription_transformer.dart
index 0e28972..d03ea70 100644
--- a/lib/src/stream_subscription_transformer.dart
+++ b/lib/src/stream_subscription_transformer.dart
@@ -28,9 +28,9 @@
/// [StreamSubscription]: [handleCancel] must call `cancel()`, [handlePause]
/// must call `pause()`, and [handleResume] must call `resume()`.
StreamTransformer<T, T> subscriptionTransformer<T>(
- {Future Function(StreamSubscription<T>) handleCancel,
- void Function(StreamSubscription<T>) handlePause,
- void Function(StreamSubscription<T>) handleResume}) {
+ {Future Function(StreamSubscription<T>)? handleCancel,
+ void Function(StreamSubscription<T>)? handlePause,
+ void Function(StreamSubscription<T>)? handleResume}) {
return StreamTransformer((stream, cancelOnError) {
return _TransformedSubscription(
stream.listen(null, cancelOnError: cancelOnError),
@@ -50,7 +50,7 @@
/// methods.
class _TransformedSubscription<T> implements StreamSubscription<T> {
/// The wrapped subscription.
- StreamSubscription<T> _inner;
+ StreamSubscription<T>? _inner;
/// The callback to run when [cancel] is called.
final _AsyncHandler<T> _handleCancel;
@@ -68,47 +68,47 @@
this._inner, this._handleCancel, this._handlePause, this._handleResume);
@override
- void onData(void Function(T) handleData) {
+ void onData(void Function(T)? handleData) {
_inner?.onData(handleData);
}
@override
- void onError(Function handleError) {
+ void onError(Function? handleError) {
_inner?.onError(handleError);
}
@override
- void onDone(void Function() handleDone) {
+ void onDone(void Function()? handleDone) {
_inner?.onDone(handleDone);
}
@override
Future cancel() => _cancelMemoizer.runOnce(() {
- var inner = _inner;
- _inner.onData(null);
- _inner.onDone(null);
+ var inner = _inner!;
+ inner.onData(null);
+ inner.onDone(null);
// Setting onError to null will cause errors to be top-leveled.
- _inner.onError((_, __) {});
+ inner.onError((_, __) {});
_inner = null;
return _handleCancel(inner);
});
final _cancelMemoizer = AsyncMemoizer();
@override
- void pause([Future resumeFuture]) {
+ void pause([Future? resumeFuture]) {
if (_cancelMemoizer.hasRun) return;
if (resumeFuture != null) resumeFuture.whenComplete(resume);
- _handlePause(_inner);
+ _handlePause(_inner!);
}
@override
void resume() {
if (_cancelMemoizer.hasRun) return;
- _handleResume(_inner);
+ _handleResume(_inner!);
}
@override
- Future<E> asFuture<E>([E futureValue]) =>
+ Future<E> asFuture<E>([E? futureValue]) =>
_inner?.asFuture(futureValue) ?? Completer<E>().future;
}
diff --git a/lib/src/stream_zip.dart b/lib/src/stream_zip.dart
index e319746..506bf6d 100644
--- a/lib/src/stream_zip.dart
+++ b/lib/src/stream_zip.dart
@@ -18,12 +18,12 @@
StreamZip(Iterable<Stream<T>> streams) : _streams = streams;
@override
- StreamSubscription<List<T>> listen(void Function(List<T>) onData,
- {Function onError, void Function() onDone, bool cancelOnError}) {
+ StreamSubscription<List<T>> listen(void Function(List<T>)? onData,
+ {Function? onError, void Function()? onDone, bool? cancelOnError}) {
cancelOnError = identical(true, cancelOnError);
var subscriptions = <StreamSubscription<T>>[];
- StreamController<List<T>> controller;
- List<T> current;
+ late StreamController<List<T>> controller;
+ late List<T?> current;
var dataCount = 0;
/// Called for each data from a subscription in [subscriptions].
@@ -31,8 +31,8 @@
current[index] = data;
dataCount++;
if (dataCount == subscriptions.length) {
- var data = current;
- current = List(subscriptions.length);
+ var data = List<T>.from(current);
+ current = List<T?>.filled(subscriptions.length, null);
dataCount = 0;
for (var i = 0; i < subscriptions.length; i++) {
if (i != index) subscriptions[i].resume();
@@ -85,7 +85,7 @@
rethrow;
}
- current = List(subscriptions.length);
+ current = List<T?>.filled(subscriptions.length, null);
controller = StreamController<List<T>>(onPause: () {
for (var i = 0; i < subscriptions.length; i++) {
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index b428619..2c94e05 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -18,7 +18,7 @@
/// If other code is accessing the subscription, results may be unpredictable.
class SubscriptionStream<T> extends Stream<T> {
/// The subscription providing the events for this stream.
- StreamSubscription<T> _source;
+ StreamSubscription<T>? _source;
/// Create a single-subscription `Stream` from [subscription].
///
@@ -31,21 +31,22 @@
/// an error.
SubscriptionStream(StreamSubscription<T> subscription)
: _source = subscription {
- _source.pause();
+ var source = _source!;
+ source.pause();
// Clear callbacks to avoid keeping them alive unnecessarily.
- _source.onData(null);
- _source.onError(null);
- _source.onDone(null);
+ source.onData(null);
+ source.onError(null);
+ source.onDone(null);
}
@override
- StreamSubscription<T> listen(void Function(T) onData,
- {Function onError, void Function() onDone, bool cancelOnError}) {
- if (_source == null) {
+ StreamSubscription<T> listen(void Function(T)? onData,
+ {Function? onError, void Function()? onDone, bool? cancelOnError}) {
+ var subscription = _source;
+ if (subscription == null) {
throw StateError('Stream has already been listened to.');
}
cancelOnError = (true == cancelOnError);
- var subscription = _source;
_source = null;
var result = cancelOnError
@@ -71,26 +72,17 @@
: super(subscription);
@override
- void onError(Function handleError) {
+ void onError(Function? handleError) {
// Cancel when receiving an error.
super.onError((error, StackTrace stackTrace) {
- var cancelFuture = super.cancel();
- if (cancelFuture != null) {
- // Wait for the cancel to complete before sending the error event.
- cancelFuture.whenComplete(() {
- if (handleError is ZoneBinaryCallback) {
- handleError(error, stackTrace);
- } else {
- handleError(error);
- }
- });
- } else {
+ // Wait for the cancel to complete before sending the error event.
+ super.cancel().whenComplete(() {
if (handleError is ZoneBinaryCallback) {
handleError(error, stackTrace);
- } else {
+ } else if (handleError != null) {
handleError(error);
}
- }
+ });
});
}
}
diff --git a/lib/src/typed/stream_subscription.dart b/lib/src/typed/stream_subscription.dart
index d85b776..fe91656 100644
--- a/lib/src/typed/stream_subscription.dart
+++ b/lib/src/typed/stream_subscription.dart
@@ -13,22 +13,23 @@
TypeSafeStreamSubscription(this._subscription);
@override
- void onData(void Function(T) handleData) {
+ void onData(void Function(T)? handleData) {
+ if (handleData == null) return _subscription.onData(null);
_subscription.onData((data) => handleData(data as T));
}
@override
- void onError(Function handleError) {
+ void onError(Function? handleError) {
_subscription.onError(handleError);
}
@override
- void onDone(void Function() handleDone) {
+ void onDone(void Function()? handleDone) {
_subscription.onDone(handleDone);
}
@override
- void pause([Future resumeFuture]) {
+ void pause([Future? resumeFuture]) {
_subscription.pause(resumeFuture);
}
@@ -41,5 +42,6 @@
Future cancel() => _subscription.cancel();
@override
- Future<E> asFuture<E>([E futureValue]) => _subscription.asFuture(futureValue);
+ Future<E> asFuture<E>([E? futureValue]) =>
+ _subscription.asFuture(futureValue);
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 0cb71cf..bfdf20e 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,11 +1,11 @@
name: async
-version: 2.4.2
+version: 2.5.0-nullsafety
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
environment:
- sdk: '>=2.2.0 <3.0.0'
+ sdk: '>=2.9.0-20.0.dev <2.9.0'
dependencies:
collection: ^1.5.0
@@ -15,3 +15,81 @@
stack_trace: ^1.0.0
test: ^1.0.0
pedantic: ^1.0.0
+
+dependency_overrides:
+ boolean_selector:
+ git:
+ url: git://github.com/dart-lang/boolean_selector.git
+ ref: null_safety
+ charcode:
+ git:
+ url: git://github.com/dart-lang/charcode.git
+ ref: null_safety
+ clock:
+ git:
+ url: git://github.com/dart-lang/clock.git
+ ref: null_safety
+ collection:
+ git:
+ url: git://github.com/dart-lang/collection.git
+ ref: null_safety
+ fake_async:
+ git:
+ url: git://github.com/dart-lang/fake_async.git
+ ref: null_safety
+ matcher:
+ git:
+ url: git://github.com/dart-lang/matcher.git
+ ref: null_safety
+ meta:
+ git:
+ url: git://github.com/dart-lang/sdk.git
+ ref: null_safety-pkgs
+ path: pkg/meta
+ path:
+ git:
+ url: git://github.com/dart-lang/path.git
+ ref: null_safety
+ pedantic:
+ git:
+ url: git://github.com/dart-lang/pedantic.git
+ ref: null_safety
+ pool:
+ git:
+ url: git://github.com/dart-lang/pool.git
+ ref: null_safety
+ source_span:
+ git:
+ url: git://github.com/dart-lang/source_span.git
+ ref: null_safety
+ stack_trace:
+ git:
+ url: git://github.com/dart-lang/stack_trace.git
+ ref: null_safety
+ stream_channel:
+ git:
+ url: git://github.com/dart-lang/stream_channel.git
+ ref: null_safety
+ string_scanner:
+ git:
+ url: git://github.com/dart-lang/string_scanner.git
+ ref: null_safety
+ term_glyph:
+ git:
+ url: git://github.com/dart-lang/term_glyph.git
+ ref: null_safety
+ test_api:
+ git:
+ url: git://github.com/dart-lang/test.git
+ ref: null_safety
+ path: pkgs/test_api
+ test_core:
+ git:
+ url: git://github.com/dart-lang/test.git
+ ref: null_safety
+ path: pkgs/test_core
+ test:
+ git:
+ url: git://github.com/dart-lang/test.git
+ ref: null_safety
+ path: pkgs/test
diff --git a/test/async_cache_test.dart b/test/async_cache_test.dart
index 4efbb69..81e43e1 100644
--- a/test/async_cache_test.dart
+++ b/test/async_cache_test.dart
@@ -9,7 +9,7 @@
import 'package:test/test.dart';
void main() {
- AsyncCache<String> cache;
+ late AsyncCache<String> cache;
setUp(() {
// Create a cache that is fresh for an hour.
@@ -22,7 +22,8 @@
test('should not fetch via callback when a cache exists', () async {
await cache.fetch(() async => 'Expensive');
- expect(await cache.fetch(expectAsync0(() => null, count: 0)), 'Expensive');
+ expect(await cache.fetch(expectAsync0(() async => 'fake', count: 0)),
+ 'Expensive');
});
test('should not fetch via callback when a future is in-flight', () async {
@@ -31,7 +32,7 @@
var completer = Completer<String>();
expect(cache.fetch(() => completer.future), completion('Expensive'));
- expect(cache.fetch(expectAsync0(() => null, count: 0)),
+ expect(cache.fetch(expectAsync0(() async => 'fake', count: 0)),
completion('Expensive'));
completer.complete('Expensive');
});
@@ -79,7 +80,10 @@
yield '2';
yield '3';
}).toList();
- expect(await cache.fetchStream(expectAsync0(() => null, count: 0)).toList(),
+ expect(
+ await cache
+ .fetchStream(expectAsync0(() => Stream.empty(), count: 0))
+ .toList(),
['1', '2', '3']);
});
@@ -148,7 +152,7 @@
test('should pause a cached stream without affecting others', () async {
Stream<String> call() => Stream.fromIterable(['1', '2', '3']);
- StreamSubscription sub;
+ late StreamSubscription sub;
sub = cache.fetchStream(call).listen(expectAsync1((event) {
if (event == '1') sub.pause();
}));
diff --git a/test/async_memoizer_test.dart b/test/async_memoizer_test.dart
index 982f7c9..490b389 100644
--- a/test/async_memoizer_test.dart
+++ b/test/async_memoizer_test.dart
@@ -6,7 +6,7 @@
import 'package:test/test.dart';
void main() {
- AsyncMemoizer cache;
+ late AsyncMemoizer cache;
setUp(() => cache = AsyncMemoizer());
test('runs the function only the first time runOnce() is called', () async {
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index b38c0b9..c87e43a 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -11,7 +11,7 @@
void main() {
group('without being canceled', () {
- CancelableCompleter completer;
+ late CancelableCompleter completer;
setUp(() {
completer = CancelableCompleter(onCancel: expectAsync0(() {}, count: 0));
});
@@ -116,7 +116,7 @@
test('fires onCancel', () {
var canceled = false;
- CancelableCompleter completer;
+ late CancelableCompleter completer;
completer = CancelableCompleter(onCancel: expectAsync0(() {
expect(completer.isCanceled, isTrue);
canceled = true;
@@ -243,23 +243,23 @@
});
group('then', () {
- FutureOr<String> Function(int) onValue;
- FutureOr<String> Function(Object, StackTrace) onError;
- FutureOr<String> Function() onCancel;
- bool propagateCancel;
- CancelableCompleter<int> originalCompleter;
+ FutureOr<String> Function(int)? onValue;
+ FutureOr<String> Function(Object, StackTrace)? onError;
+ FutureOr<String> Function()? onCancel;
+ late bool propagateCancel;
+ late CancelableCompleter<int> originalCompleter;
setUp(() {
// Initialize all functions to ones that expect to not be called.
- onValue = expectAsync1((_) => null, count: 0, id: 'onValue');
- onError = expectAsync2((e, s) => null, count: 0, id: 'onError');
- onCancel = expectAsync0(() => null, count: 0, id: 'onCancel');
+ onValue = expectAsync1((_) => 'Fake', count: 0, id: 'onValue');
+ onError = expectAsync2((e, s) => 'Fake', count: 0, id: 'onError');
+ onCancel = expectAsync0(() => 'Fake', count: 0, id: 'onCancel');
propagateCancel = false;
});
CancelableOperation<String> runThen() {
originalCompleter = CancelableCompleter();
- return originalCompleter.operation.then(onValue,
+ return originalCompleter.operation.then(onValue!,
onError: onError,
onCancel: onCancel,
propagateCancel: propagateCancel);
diff --git a/test/future_group_test.dart b/test/future_group_test.dart
index f99d06d..22e90f8 100644
--- a/test/future_group_test.dart
+++ b/test/future_group_test.dart
@@ -10,7 +10,7 @@
import 'utils.dart';
void main() {
- FutureGroup futureGroup;
+ late FutureGroup futureGroup;
setUp(() {
futureGroup = FutureGroup();
});
diff --git a/test/lazy_stream_test.dart b/test/lazy_stream_test.dart
index 2affe5b..32c6b14 100644
--- a/test/lazy_stream_test.dart
+++ b/test/lazy_stream_test.dart
@@ -10,10 +10,6 @@
import 'utils.dart';
void main() {
- test('disallows a null callback', () {
- expect(() => LazyStream(null), throwsArgumentError);
- });
-
test('calls the callback when the stream is listened', () async {
var callbackCalled = false;
var stream = LazyStream(expectAsync0(() {
@@ -96,7 +92,7 @@
});
test("a lazy stream can't be listened to from within its callback", () {
- LazyStream stream;
+ late LazyStream stream;
stream = LazyStream(expectAsync0(() {
expect(() => stream.listen(null), throwsStateError);
return Stream.empty();
diff --git a/test/result/result_captureAll_test.dart b/test/result/result_captureAll_test.dart
index 8e79872..b992e1f 100644
--- a/test/result/result_captureAll_test.dart
+++ b/test/result/result_captureAll_test.dart
@@ -14,7 +14,7 @@
/// Helper function creating an iterable of futures.
Iterable<Future<int>> futures(int count,
- {bool Function(int index) throwWhen}) sync* {
+ {bool Function(int index)? throwWhen}) sync* {
for (var i = 0; i < count; i++) {
if (throwWhen != null && throwWhen(i)) {
yield Future<int>.error('$i', someStack);
diff --git a/test/result/result_flattenAll_test.dart b/test/result/result_flattenAll_test.dart
index c0a8603..b87fec4 100644
--- a/test/result/result_flattenAll_test.dart
+++ b/test/result/result_flattenAll_test.dart
@@ -11,7 +11,7 @@
/// Helper function creating an iterable of results.
Iterable<Result<int>> results(int count,
- {bool Function(int index) throwWhen}) sync* {
+ {bool Function(int index)? throwWhen}) sync* {
for (var i = 0; i < count; i++) {
if (throwWhen != null && throwWhen(i)) {
yield err(i);
@@ -27,7 +27,7 @@
expect(result, expectation);
} else {
expect(result.isValue, true);
- expect(result.asValue.value, expectation.asValue.value);
+ expect(result.asValue!.value, expectation.asValue!.value);
}
}
diff --git a/test/result/result_future_test.dart b/test/result/result_future_test.dart
index 4218db2..7171005 100644
--- a/test/result/result_future_test.dart
+++ b/test/result/result_future_test.dart
@@ -9,8 +9,8 @@
import 'package:test/test.dart';
void main() {
- Completer completer;
- ResultFuture future;
+ late Completer completer;
+ late ResultFuture future;
setUp(() {
completer = Completer();
future = ResultFuture(completer.future);
@@ -25,7 +25,7 @@
// The completer calls its listeners asynchronously. We have to wait
// before we can access the result.
- expect(future.then((_) => future.result.asValue.value),
+ expect(future.then((_) => future.result!.asValue!.value),
completion(equals(12)));
});
@@ -36,7 +36,7 @@
// The completer calls its listeners asynchronously. We have to wait
// before we can access the result.
return future.catchError((_) {}).then((_) {
- var error = future.result.asError;
+ var error = future.result!.asError!;
expect(error.error, equals('error'));
expect(error.stackTrace, equals(trace));
});
diff --git a/test/result/result_test.dart b/test/result/result_test.dart
index adbc445..154785b 100644
--- a/test/result/result_test.dart
+++ b/test/result/result_test.dart
@@ -16,7 +16,7 @@
var result = Result<int>.value(42);
expect(result.isValue, isTrue);
expect(result.isError, isFalse);
- ValueResult value = result.asValue;
+ ValueResult value = result.asValue!;
expect(value.value, equals(42));
});
@@ -24,7 +24,7 @@
Result<int> result = ValueResult<int>(42);
expect(result.isValue, isTrue);
expect(result.isError, isFalse);
- var value = result.asValue;
+ var value = result.asValue!;
expect(value.value, equals(42));
});
@@ -32,7 +32,7 @@
var result = Result<bool>.error('BAD', stack);
expect(result.isValue, isFalse);
expect(result.isError, isTrue);
- var error = result.asError;
+ var error = result.asError!;
expect(error.error, equals('BAD'));
expect(error.stackTrace, same(stack));
});
@@ -50,9 +50,10 @@
var result = Result<bool>.error('BAD');
expect(result.isValue, isFalse);
expect(result.isError, isTrue);
- var error = result.asError;
+ var error = result.asError!;
expect(error.error, equals('BAD'));
- expect(error.stackTrace, isNull);
+ // A default stack trace is created
+ expect(error.stackTrace, isNotNull);
});
test('complete with value', () {
@@ -71,7 +72,7 @@
var c = Completer<bool>();
c.future.then((bool v) {
fail('Unexpected value $v');
- }, onError: expectAsync2((e, s) {
+ }).then<void>((_) {}, onError: expectAsync2((e, s) {
expect(e, equals('BAD'));
expect(s, same(stack));
}));
@@ -108,7 +109,7 @@
Result<bool> result = ErrorResult('BAD', stack);
result.asFuture.then((bool v) {
fail('Unexpected value $v');
- }, onError: expectAsync2((e, s) {
+ }).then<void>((_) {}, onError: expectAsync2((e, s) {
expect(e, equals('BAD'));
expect(s, same(stack));
}));
@@ -119,7 +120,7 @@
Result.capture(value).then(expectAsync1((Result result) {
expect(result.isValue, isTrue);
expect(result.isError, isFalse);
- var value = result.asValue;
+ var value = result.asValue!;
expect(value.value, equals(42));
}), onError: (e, s) {
fail('Unexpected error: $e');
@@ -131,7 +132,7 @@
Result.capture(value).then(expectAsync1((Result result) {
expect(result.isValue, isFalse);
expect(result.isError, isTrue);
- var error = result.asError;
+ var error = result.asError!;
expect(error.error, equals('BAD'));
expect(error.stackTrace, same(stack));
}), onError: (e, s) {
@@ -153,7 +154,7 @@
var future = Future<Result<bool>>.value(Result<bool>.error('BAD', stack));
Result.release(future).then((v) {
fail('Unexpected value: $v');
- }, onError: expectAsync2((e, s) {
+ }).then<void>((_) {}, onError: expectAsync2((e, s) {
expect(e, equals('BAD'));
expect(s, same(stack));
}));
@@ -164,7 +165,7 @@
var future = Future<Result<bool>>.error('BAD', stack);
Result.release(future).then((v) {
fail('Unexpected value: $v');
- }, onError: expectAsync2((e, s) {
+ }).then<void>((_) {}, onError: expectAsync2((e, s) {
expect(e, equals('BAD'));
expect(s, same(stack));
}));
@@ -203,15 +204,15 @@
expect(expectedList.isEmpty, isFalse);
Result expected = expectedList.removeFirst();
expect(expected.isValue, isTrue);
- expect(v, equals(expected.asValue.value));
+ expect(v, equals(expected.asValue!.value));
}
void errorListener(error, StackTrace stackTrace) {
expect(expectedList.isEmpty, isFalse);
Result expected = expectedList.removeFirst();
expect(expected.isError, isTrue);
- expect(error, equals(expected.asError.error));
- expect(stackTrace, same(expected.asError.stackTrace));
+ expect(error, equals(expected.asError!.error));
+ expect(stackTrace, same(expected.asError!.stackTrace));
}
stream.listen(expectAsync1(dataListener, count: 2),
@@ -311,10 +312,10 @@
expect(actual.isValue, equals(expected.isValue));
expect(actual.isError, equals(expected.isError));
if (actual.isValue) {
- expect(actual.asValue.value, equals(expected.asValue.value));
+ expect(actual.asValue!.value, equals(expected.asValue!.value));
} else {
- expect(actual.asError.error, equals(expected.asError.error));
- expect(actual.asError.stackTrace, same(expected.asError.stackTrace));
+ expect(actual.asError!.error, equals(expected.asError!.error));
+ expect(actual.asError!.stackTrace, same(expected.asError!.stackTrace));
}
}
@@ -334,8 +335,8 @@
}
@override
- void addError(error, [StackTrace stack]) {
- onError(error, stack);
+ void addError(error, [StackTrace? stack]) {
+ onError(error, stack ?? StackTrace.fromString(''));
}
@override
diff --git a/test/stream_completer_test.dart b/test/stream_completer_test.dart
index 0cd210a..ecb5621 100644
--- a/test/stream_completer_test.dart
+++ b/test/stream_completer_test.dart
@@ -77,7 +77,7 @@
var completer = StreamCompleter<int>();
var lastEvent = -1;
var controller = StreamController<int>();
- StreamSubscription subscription;
+ late StreamSubscription subscription;
subscription = completer.stream.listen((value) {
expect(value, lessThan(3));
lastEvent = value;
@@ -125,7 +125,7 @@
test("source stream isn't listened to until completer stream is", () async {
var completer = StreamCompleter();
- StreamController controller;
+ late StreamController controller;
controller = StreamController(onListen: () {
scheduleMicrotask(controller.close);
});
@@ -139,13 +139,13 @@
});
test('cancelOnError true when listening before linking stream', () async {
- var completer = StreamCompleter();
+ var completer = StreamCompleter<Object>();
Object lastEvent = -1;
- var controller = StreamController();
+ var controller = StreamController<Object>();
completer.stream.listen((value) {
expect(value, lessThan(3));
lastEvent = value;
- }, onError: (value) {
+ }, onError: (Object value) {
expect(value, '3');
lastEvent = value;
}, onDone: unreachable('done'), cancelOnError: true);
@@ -172,9 +172,9 @@
});
test('cancelOnError true when listening after linking stream', () async {
- var completer = StreamCompleter();
+ var completer = StreamCompleter<Object>();
Object lastEvent = -1;
- var controller = StreamController();
+ var controller = StreamController<Object>();
completer.setSourceStream(controller.stream);
controller.add(1);
expect(controller.hasListener, isFalse);
@@ -182,7 +182,7 @@
completer.stream.listen((value) {
expect(value, lessThan(3));
lastEvent = value;
- }, onError: (value) {
+ }, onError: (Object value) {
expect(value, '3');
lastEvent = value;
}, onDone: unreachable('done'), cancelOnError: true);
@@ -265,8 +265,8 @@
});
test('setting onData etc. before and after setting stream', () async {
- var completer = StreamCompleter();
- var controller = StreamController();
+ var completer = StreamCompleter<int>();
+ var controller = StreamController<int>();
var subscription = completer.stream.listen(null);
Object lastEvent = 0;
subscription.onData((value) => lastEvent = value);
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
index 690fbc1..eadae19 100644
--- a/test/stream_group_test.dart
+++ b/test/stream_group_test.dart
@@ -9,7 +9,7 @@
void main() {
group('single-subscription', () {
- StreamGroup<String> streamGroup;
+ late StreamGroup<String> streamGroup;
setUp(() {
streamGroup = StreamGroup<String>();
});
@@ -236,7 +236,7 @@
StreamController<String>(onCancel: () => completer.future);
var fired = false;
- streamGroup.add(controller.stream).then((_) => fired = true);
+ streamGroup.add(controller.stream)!.then((_) => fired = true);
await flushMicrotasks();
expect(fired, isFalse);
@@ -249,7 +249,7 @@
});
group('broadcast', () {
- StreamGroup<String> streamGroup;
+ late StreamGroup<String> streamGroup;
setUp(() {
streamGroup = StreamGroup<String>.broadcast();
});
@@ -455,7 +455,7 @@
}
void regardlessOfType(StreamGroup<String> Function() newStreamGroup) {
- StreamGroup<String> streamGroup;
+ late StreamGroup<String> streamGroup;
setUp(() {
streamGroup = newStreamGroup();
});
@@ -608,7 +608,7 @@
await flushMicrotasks();
var fired = false;
- streamGroup.remove(controller.stream).then((_) => fired = true);
+ streamGroup.remove(controller.stream)!.then((_) => fired = true);
await flushMicrotasks();
expect(fired, isFalse);
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index fa286a8..74c91e6 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -250,7 +250,7 @@
var skip4 = events.skip(1);
var index = 0;
// Check that futures complete in order.
- Func1Required<int> sequence(expectedValue, sequenceIndex) => (value) {
+ Func1Required<int?> sequence(expectedValue, sequenceIndex) => (value) {
expect(value, expectedValue);
expect(index, sequenceIndex);
index++;
@@ -610,7 +610,7 @@
var controller = StreamController<int>();
var events = StreamQueue<int>(controller.stream);
- bool hasNext;
+ bool? hasNext;
events.hasNext.then((result) {
hasNext = result;
});
@@ -626,7 +626,7 @@
var controller = StreamController<int>();
var events = StreamQueue<int>(controller.stream);
- bool hasNext;
+ bool? hasNext;
events.hasNext.then((result) {
hasNext = result;
});
@@ -772,10 +772,10 @@
});
group('startTransaction operation produces a transaction that', () {
- StreamQueue<int> events;
- StreamQueueTransaction<int> transaction;
- StreamQueue<int> queue1;
- StreamQueue<int> queue2;
+ late StreamQueue<int> events;
+ late StreamQueueTransaction<int> transaction;
+ late StreamQueue<int> queue1;
+ late StreamQueue<int> queue2;
setUp(() async {
events = StreamQueue(createStream());
expect(await events.next, 1);
@@ -1004,7 +1004,7 @@
});
group('withTransaction operation', () {
- StreamQueue<int> events;
+ late StreamQueue<int> events;
setUp(() async {
events = StreamQueue(createStream());
expect(await events.next, 1);
@@ -1058,7 +1058,7 @@
});
group('cancelable operation', () {
- StreamQueue<int> events;
+ late StreamQueue<int> events;
setUp(() async {
events = StreamQueue(createStream());
expect(await events.next, 1);
diff --git a/test/stream_sink_completer_test.dart b/test/stream_sink_completer_test.dart
index 69d1d8a..591f32f 100644
--- a/test/stream_sink_completer_test.dart
+++ b/test/stream_sink_completer_test.dart
@@ -10,7 +10,7 @@
import 'utils.dart';
void main() {
- StreamSinkCompleter completer;
+ late StreamSinkCompleter completer;
setUp(() {
completer = StreamSinkCompleter();
});
@@ -21,10 +21,10 @@
completer.setDestinationSink(sink);
completer.sink..add(1)..add(2)..add(3)..add(4);
- expect(sink.results[0].asValue.value, equals(1));
- expect(sink.results[1].asValue.value, equals(2));
- expect(sink.results[2].asValue.value, equals(3));
- expect(sink.results[3].asValue.value, equals(4));
+ expect(sink.results[0].asValue!.value, equals(1));
+ expect(sink.results[1].asValue!.value, equals(2));
+ expect(sink.results[2].asValue!.value, equals(3));
+ expect(sink.results[3].asValue!.value, equals(4));
});
test('error events are forwarded', () {
@@ -32,8 +32,8 @@
completer.setDestinationSink(sink);
completer.sink..addError('oh no')..addError("that's bad");
- expect(sink.results[0].asError.error, equals('oh no'));
- expect(sink.results[1].asError.error, equals("that's bad"));
+ expect(sink.results[0].asError!.error, equals('oh no'));
+ expect(sink.results[1].asError!.error, equals("that's bad"));
});
test('addStream is forwarded', () async {
@@ -49,10 +49,10 @@
controller.addError("that's bad");
await flushMicrotasks();
- expect(sink.results[0].asValue.value, equals(1));
- expect(sink.results[1].asError.error, equals('oh no'));
- expect(sink.results[2].asValue.value, equals(2));
- expect(sink.results[3].asError.error, equals("that's bad"));
+ expect(sink.results[0].asValue!.value, equals(1));
+ expect(sink.results[1].asError!.error, equals('oh no'));
+ expect(sink.results[2].asValue!.value, equals(2));
+ expect(sink.results[3].asError!.error, equals("that's bad"));
expect(sink.isClosed, isFalse);
controller.close();
@@ -121,10 +121,10 @@
completer.setDestinationSink(sink);
await flushMicrotasks();
- expect(sink.results[0].asValue.value, equals(1));
- expect(sink.results[1].asValue.value, equals(2));
- expect(sink.results[2].asValue.value, equals(3));
- expect(sink.results[3].asValue.value, equals(4));
+ expect(sink.results[0].asValue!.value, equals(1));
+ expect(sink.results[1].asValue!.value, equals(2));
+ expect(sink.results[2].asValue!.value, equals(3));
+ expect(sink.results[3].asValue!.value, equals(4));
});
test('error events are forwarded', () async {
@@ -135,8 +135,8 @@
completer.setDestinationSink(sink);
await flushMicrotasks();
- expect(sink.results[0].asError.error, equals('oh no'));
- expect(sink.results[1].asError.error, equals("that's bad"));
+ expect(sink.results[0].asError!.error, equals('oh no'));
+ expect(sink.results[1].asError!.error, equals("that's bad"));
});
test('addStream is forwarded', () async {
@@ -154,10 +154,10 @@
completer.setDestinationSink(sink);
await flushMicrotasks();
- expect(sink.results[0].asValue.value, equals(1));
- expect(sink.results[1].asError.error, equals('oh no'));
- expect(sink.results[2].asValue.value, equals(2));
- expect(sink.results[3].asError.error, equals("that's bad"));
+ expect(sink.results[0].asValue!.value, equals(1));
+ expect(sink.results[1].asError!.error, equals('oh no'));
+ expect(sink.results[2].asValue!.value, equals(2));
+ expect(sink.results[3].asError!.error, equals("that's bad"));
expect(sink.isClosed, isFalse);
});
@@ -258,9 +258,9 @@
futureCompleter.complete(testSink);
await testSink.done;
- expect(testSink.results[0].asValue.value, equals(1));
- expect(testSink.results[1].asValue.value, equals(2));
- expect(testSink.results[2].asValue.value, equals(3));
+ expect(testSink.results[0].asValue!.value, equals(1));
+ expect(testSink.results[1].asValue!.value, equals(2));
+ expect(testSink.results[2].asValue!.value, equals(3));
});
test('with an error', () async {
diff --git a/test/stream_sink_transformer_test.dart b/test/stream_sink_transformer_test.dart
index 8493971..e5f6baa 100644
--- a/test/stream_sink_transformer_test.dart
+++ b/test/stream_sink_transformer_test.dart
@@ -10,7 +10,7 @@
import 'utils.dart';
void main() {
- StreamController controller;
+ late StreamController controller;
setUp(() {
controller = StreamController();
});
@@ -18,7 +18,7 @@
group('fromStreamTransformer', () {
test('transforms data events', () {
var transformer = StreamSinkTransformer.fromStreamTransformer(
- StreamTransformer.fromHandlers(handleData: (i, sink) {
+ StreamTransformer.fromHandlers(handleData: (int i, sink) {
sink.add(i * 2);
}));
var sink = transformer.bind(controller.sink);
@@ -117,7 +117,7 @@
group('fromHandlers', () {
test('transforms data events', () {
var transformer =
- StreamSinkTransformer.fromHandlers(handleData: (i, sink) {
+ StreamSinkTransformer.fromHandlers(handleData: (int i, sink) {
sink.add(i * 2);
});
var sink = transformer.bind(controller.sink);
diff --git a/test/stream_splitter_test.dart b/test/stream_splitter_test.dart
index 3748c5c..3493193 100644
--- a/test/stream_splitter_test.dart
+++ b/test/stream_splitter_test.dart
@@ -8,8 +8,8 @@
import 'package:test/test.dart';
void main() {
- StreamController<int> controller;
- StreamSplitter splitter;
+ late StreamController<int> controller;
+ late StreamSplitter splitter;
setUp(() {
controller = StreamController<int>();
splitter = StreamSplitter<int>(controller.stream);
diff --git a/test/stream_zip_test.dart b/test/stream_zip_test.dart
index 19c3dec..84948bf 100644
--- a/test/stream_zip_test.dart
+++ b/test/stream_zip_test.dart
@@ -9,7 +9,7 @@
/// Create an error with the same values as [base], except that it throwsA
/// when seeing the value [errorValue].
-Stream streamError(Stream base, int errorValue, error) {
+Stream streamError(Stream base, int errorValue, Object error) {
return base.map((x) => (x == errorValue) ? throw error : x);
}
@@ -306,7 +306,7 @@
var s2 = Stream.fromIterable([1, 3, 5, 7]);
var sz = StreamZip([s1, s2]);
var ctr = 0;
- StreamSubscription sub;
+ late StreamSubscription sub;
sub = sz.listen(expectAsync1((v) {
expect(v, equals([ctr * 2, ctr * 2 + 1]));
if (ctr == 1) {
diff --git a/test/stream_zip_zone_test.dart b/test/stream_zip_zone_test.dart
index 68cd723..50af6b6 100644
--- a/test/stream_zip_zone_test.dart
+++ b/test/stream_zip_zone_test.dart
@@ -33,7 +33,7 @@
var outer = Zone.current;
runZoned(() {
var newZone1 = Zone.current;
- StreamSubscription sub;
+ late StreamSubscription sub;
sub = stream.listen(expectAsync1((v) {
expect(v, 42);
expect(Zone.current, newZone1);
diff --git a/test/subscription_stream_test.dart b/test/subscription_stream_test.dart
index 98a809f..8be0572 100644
--- a/test/subscription_stream_test.dart
+++ b/test/subscription_stream_test.dart
@@ -22,7 +22,7 @@
var stream = createStream();
var skips = 0;
var completer = Completer();
- StreamSubscription<int> subscription;
+ late StreamSubscription<int> subscription;
subscription = stream.listen((value) {
++skips;
expect(value, skips);
@@ -72,8 +72,9 @@
group('cancelOnError source:', () {
for (var sourceCancels in [false, true]) {
group('${sourceCancels ? "yes" : "no"}:', () {
- SubscriptionStream subscriptionStream;
- Future onCancel; // Completes if source stream is canceled before done.
+ late SubscriptionStream subscriptionStream;
+ late Future
+ onCancel; // Completes if source stream is canceled before done.
setUp(() {
var cancelCompleter = Completer();
var source = createErrorStream(cancelCompleter);
@@ -159,7 +160,7 @@
yield 4;
}
-Stream<int> createErrorStream([Completer onCancel]) async* {
+Stream<int> createErrorStream([Completer? onCancel]) async* {
var canceled = true;
try {
yield 1;
diff --git a/test/subscription_transformer_test.dart b/test/subscription_transformer_test.dart
index f0e3a70..8278ea0 100644
--- a/test/subscription_transformer_test.dart
+++ b/test/subscription_transformer_test.dart
@@ -90,7 +90,7 @@
subscriptionTransformer(handleCancel: expectAsync1((inner) {
callbackInvoked = true;
inner.cancel();
- return null;
+ return Future.value();
}))).listen(expectAsync1((_) {}, count: 0));
await flushMicrotasks();
@@ -248,11 +248,12 @@
});
group('when the outer subscription is canceled but the inner is not', () {
- StreamSubscription subscription;
+ late StreamSubscription subscription;
setUp(() {
var controller = StreamController();
subscription = controller.stream
- .transform(subscriptionTransformer(handleCancel: (_) => null))
+ .transform(
+ subscriptionTransformer(handleCancel: (_) => Future.value()))
.listen(expectAsync1((_) {}, count: 0),
onError: expectAsync2((_, __) {}, count: 0),
onDone: expectAsync0(() {}, count: 0));
diff --git a/test/typed_wrapper/stream_subscription_test.dart b/test/typed_wrapper/stream_subscription_test.dart
index 2a11546..33d28a8 100644
--- a/test/typed_wrapper/stream_subscription_test.dart
+++ b/test/typed_wrapper/stream_subscription_test.dart
@@ -11,10 +11,11 @@
void main() {
group('with valid types, forwards', () {
- StreamController controller;
- StreamSubscription wrapper;
- bool isCanceled;
+ late StreamController controller;
+ late StreamSubscription wrapper;
+ late bool isCanceled;
setUp(() {
+ isCanceled = false;
controller = StreamController<Object>(onCancel: () {
isCanceled = true;
});
@@ -67,10 +68,11 @@
});
group('with invalid types,', () {
- StreamController controller;
- StreamSubscription wrapper;
- bool isCanceled;
+ late StreamController controller;
+ late StreamSubscription wrapper;
+ late bool isCanceled;
setUp(() {
+ isCanceled = false;
controller = StreamController<Object>(onCancel: () {
isCanceled = true;
});
diff --git a/test/utils.dart b/test/utils.dart
index a99fe9b..4cc743e 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -25,7 +25,7 @@
Matcher throwsZoned(matcher) => predicate((void Function() callback) {
var firstError = true;
runZoned(callback,
- onError: expectAsync2((error, StackTrace stackTrace) {
+ onError: expectAsync2((Object error, StackTrace stackTrace) {
if (firstError) {
expect(error, matcher);
firstError = false;
@@ -37,11 +37,11 @@
});
/// A matcher that runs a callback in its own zone and asserts that that zone
-/// emits a [CastError].
-final throwsZonedCastError = throwsZoned(TypeMatcher<CastError>());
+/// emits a [TypeError].
+final throwsZonedCastError = throwsZoned(TypeMatcher<TypeError>());
-/// A matcher that matches a callback or future that throws a [CastError].
-final throwsCastError = throwsA(TypeMatcher<CastError>());
+/// A matcher that matches a callback or future that throws a [TypeError].
+final throwsCastError = throwsA(TypeMatcher<TypeError>());
/// A badly behaved stream which throws if it's ever listened to.
///
@@ -67,7 +67,7 @@
@override
void add(T event) {}
@override
- void addError(error, [StackTrace stackTrace]) {}
+ void addError(error, [StackTrace? stackTrace]) {}
@override
Future addStream(Stream<T> stream) async {}
@override
@@ -95,7 +95,7 @@
///
/// If [onDone] is passed, it's called when the user calls [close]. Its result
/// is piped to the [done] future.
- TestSink({void Function() onDone}) : _onDone = onDone ?? (() {});
+ TestSink({void Function()? onDone}) : _onDone = onDone ?? (() {});
@override
void add(T event) {
@@ -103,7 +103,7 @@
}
@override
- void addError(error, [StackTrace stackTrace]) {
+ void addError(error, [StackTrace? stackTrace]) {
results.add(Result<T>.error(error, stackTrace));
}