Migrate to null safety (dart-lang/stream_transform#119)
- Bump SDK constraint.
- Bump to null safe dependencies.
- Migrate code, examples, and tests.
- Add an opt-out test for testing the behavior of transforming streams
with the `subscription.cancel()` call returns null despite a
statically non-nullable return. This requires an unusual pattern in
the code.
diff --git a/pkgs/stream_transform/.travis.yml b/pkgs/stream_transform/.travis.yml
index 24eb0d7..3b2eb08 100644
--- a/pkgs/stream_transform/.travis.yml
+++ b/pkgs/stream_transform/.travis.yml
@@ -3,7 +3,6 @@
only: [master]
dart:
- dev
- - 2.7.0
cache:
directories:
- $HOME/.pub-cache
@@ -18,6 +17,3 @@
- dart: dev
dart_task:
dartanalyzer: --fatal-warnings --fatal-infos .
- - dart: 2.7.0
- dart_task:
- dartanalyzer: --fatal-warnings .
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 268bf35..68f8859 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,5 +1,6 @@
-## 1.2.1-dev
+## 2.0.0-nullsafety.0-dev
+- Migrate to null safety.
- Improve tests of `switchMap` and improve documentation with links and
clarification.
diff --git a/pkgs/stream_transform/example/main.dart b/pkgs/stream_transform/example/main.dart
index 4ed53d8..70b3e7f 100644
--- a/pkgs/stream_transform/example/main.dart
+++ b/pkgs/stream_transform/example/main.dart
@@ -9,7 +9,7 @@
void main() {
var firstInput = document.querySelector('#first_input') as InputElement;
var secondInput = document.querySelector('#second_input') as InputElement;
- var output = document.querySelector('#output');
+ var output = document.querySelector('#output')!;
_inputValues(firstInput)
.combineLatest(_inputValues(secondInput),
@@ -21,6 +21,6 @@
});
}
-Stream<String> _inputValues(InputElement element) => element.onKeyUp
+Stream<String?> _inputValues(InputElement element) => element.onKeyUp
.debounce(const Duration(milliseconds: 100))
.map((_) => element.value);
diff --git a/pkgs/stream_transform/lib/src/aggregate_sample.dart b/pkgs/stream_transform/lib/src/aggregate_sample.dart
index 88fc490..3a967d4 100644
--- a/pkgs/stream_transform/lib/src/aggregate_sample.dart
+++ b/pkgs/stream_transform/lib/src/aggregate_sample.dart
@@ -15,7 +15,7 @@
/// the output.
class AggregateSample<S, T> extends StreamTransformerBase<S, T> {
final Stream<void> _trigger;
- final T Function(S, T) _aggregate;
+ final T Function(S, T?) _aggregate;
AggregateSample(this._trigger, this._aggregate);
@@ -25,15 +25,15 @@
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
- T currentResults;
+ T? currentResults;
var waitingForTrigger = true;
var isTriggerDone = false;
var isValueDone = false;
- StreamSubscription<S> valueSub;
- StreamSubscription<void> triggerSub;
+ StreamSubscription<S>? valueSub;
+ StreamSubscription<void>? triggerSub;
void emit() {
- controller.add(currentResults);
+ controller.add(currentResults!);
currentResults = null;
waitingForTrigger = true;
}
@@ -44,7 +44,7 @@
if (!waitingForTrigger) emit();
if (isTriggerDone) {
- valueSub.cancel();
+ valueSub!.cancel();
controller.close();
}
}
@@ -63,7 +63,7 @@
if (currentResults != null) emit();
if (isValueDone) {
- triggerSub.cancel();
+ triggerSub!.cancel();
controller.close();
}
}
@@ -80,8 +80,9 @@
assert(valueSub == null);
valueSub = values.listen(onValue,
onError: controller.addError, onDone: onValuesDone);
- if (triggerSub != null) {
- if (triggerSub.isPaused) triggerSub.resume();
+ final priorTriggerSub = triggerSub;
+ if (priorTriggerSub != null) {
+ if (priorTriggerSub.isPaused) priorTriggerSub.resume();
} else {
triggerSub = _trigger.listen(onTrigger,
onError: controller.addError, onDone: onTriggerDone);
@@ -98,17 +99,16 @@
};
}
controller.onCancel = () {
- var toCancel = <StreamSubscription<void>>[];
- if (!isValueDone) toCancel.add(valueSub);
+ var cancels = <Future<void>>[if (!isValueDone) valueSub!.cancel()];
valueSub = null;
if (_trigger.isBroadcast || !values.isBroadcast) {
- if (!isTriggerDone) toCancel.add(triggerSub);
+ if (!isTriggerDone) cancels.add(triggerSub!.cancel());
triggerSub = null;
} else {
- triggerSub.pause();
+ triggerSub!.pause();
}
- var cancels =
- toCancel.map((s) => s.cancel()).where((f) => f != null).toList();
+ // Handle opt-out nulls
+ cancels.removeWhere((Object? f) => f == null);
if (cancels.isEmpty) return null;
return Future.wait(cancels).then((_) => null);
};
diff --git a/pkgs/stream_transform/lib/src/async_map.dart b/pkgs/stream_transform/lib/src/async_map.dart
index c5b160b..7b406c7 100644
--- a/pkgs/stream_transform/lib/src/async_map.dart
+++ b/pkgs/stream_transform/lib/src/async_map.dart
@@ -117,13 +117,13 @@
/// work.
StreamTransformer<S, T> _asyncMapThen<S, T>(
Future<T> Function(S) convert, void Function(void) then) {
- Future<void> pendingEvent;
+ Future<void>? pendingEvent;
return fromHandlers(handleData: (event, sink) {
pendingEvent =
convert(event).then(sink.add).catchError(sink.addError).then(then);
}, handleDone: (sink) {
if (pendingEvent != null) {
- pendingEvent.then((_) => sink.close());
+ pendingEvent!.then((_) => sink.close());
} else {
sink.close();
}
diff --git a/pkgs/stream_transform/lib/src/combine_latest.dart b/pkgs/stream_transform/lib/src/combine_latest.dart
index 303b16a..6e268e3 100644
--- a/pkgs/stream_transform/lib/src/combine_latest.dart
+++ b/pkgs/stream_transform/lib/src/combine_latest.dart
@@ -92,14 +92,14 @@
? _other.asBroadcastStream()
: _other;
- StreamSubscription<S> sourceSubscription;
- StreamSubscription<T> otherSubscription;
+ StreamSubscription<S>? sourceSubscription;
+ StreamSubscription<T>? otherSubscription;
var sourceDone = false;
var otherDone = false;
- S latestSource;
- T latestOther;
+ late S latestSource;
+ late T latestOther;
var sourceStarted = false;
var otherStarted = false;
@@ -114,16 +114,16 @@
return;
}
if (result is Future<R>) {
- sourceSubscription.pause();
- otherSubscription.pause();
+ sourceSubscription!.pause();
+ otherSubscription!.pause();
result
.then(controller.add, onError: controller.addError)
.whenComplete(() {
- sourceSubscription.resume();
- otherSubscription.resume();
+ sourceSubscription!.resume();
+ otherSubscription!.resume();
});
} else {
- controller.add(result as R);
+ controller.add(result);
}
}
@@ -142,7 +142,7 @@
controller.close();
} else if (!sourceStarted) {
// Nothing can ever be emitted
- otherSubscription.cancel();
+ otherSubscription!.cancel();
controller.close();
}
});
@@ -159,24 +159,28 @@
controller.close();
} else if (!otherStarted) {
// Nothing can ever be emitted
- sourceSubscription.cancel();
+ sourceSubscription!.cancel();
controller.close();
}
});
if (!source.isBroadcast) {
controller
..onPause = () {
- sourceSubscription.pause();
- otherSubscription.pause();
+ sourceSubscription!.pause();
+ otherSubscription!.pause();
}
..onResume = () {
- sourceSubscription.resume();
- otherSubscription.resume();
+ sourceSubscription!.resume();
+ otherSubscription!.resume();
};
}
controller.onCancel = () {
- var cancels = [sourceSubscription.cancel(), otherSubscription.cancel()]
- .where((f) => f != null);
+ var cancels = [
+ sourceSubscription!.cancel(),
+ otherSubscription!.cancel()
+ ]
+ // Handle opt-out nulls
+ ..removeWhere((Object? f) => f == null);
sourceSubscription = null;
otherSubscription = null;
return Future.wait(cancels).then((_) => null);
@@ -208,7 +212,7 @@
controller.onListen = () {
final subscriptions = <StreamSubscription<T>>[];
- final latestData = List<T>(allStreams.length);
+ final latestData = List<T?>.filled(allStreams.length, null);
final hasEmitted = <int>{};
void handleData(int index, T data) {
latestData[index] = data;
@@ -249,10 +253,10 @@
};
}
controller.onCancel = () {
- var cancels = subscriptions
- .map((s) => s.cancel())
- .where((f) => f != null)
- .toList();
+ if (subscriptions.isEmpty) return null;
+ var cancels = [for (var s in subscriptions) s.cancel()]
+ // Handle opt-out nulls
+ ..removeWhere((Object? f) => f == null);
if (cancels.isEmpty) return null;
return Future.wait(cancels).then((_) => null);
};
diff --git a/pkgs/stream_transform/lib/src/concatenate.dart b/pkgs/stream_transform/lib/src/concatenate.dart
index 05c977f..402d8a0 100644
--- a/pkgs/stream_transform/lib/src/concatenate.dart
+++ b/pkgs/stream_transform/lib/src/concatenate.dart
@@ -68,12 +68,12 @@
? _next.asBroadcastStream()
: _next;
- StreamSubscription<T> subscription;
+ StreamSubscription<T>? subscription;
var currentStream = first;
var firstDone = false;
var secondDone = false;
- Function currentDoneHandler;
+ late void Function() currentDoneHandler;
void listen() {
subscription = currentStream.listen(controller.add,
@@ -100,18 +100,18 @@
if (!first.isBroadcast) {
controller
..onPause = () {
- if (!firstDone || !next.isBroadcast) return subscription.pause();
- subscription.cancel();
+ if (!firstDone || !next.isBroadcast) return subscription!.pause();
+ subscription!.cancel();
subscription = null;
}
..onResume = () {
- if (!firstDone || !next.isBroadcast) return subscription.resume();
+ if (!firstDone || !next.isBroadcast) return subscription!.resume();
listen();
};
}
controller.onCancel = () {
if (secondDone) return null;
- var toCancel = subscription;
+ var toCancel = subscription!;
subscription = null;
return toCancel.cancel();
};
diff --git a/pkgs/stream_transform/lib/src/from_handlers.dart b/pkgs/stream_transform/lib/src/from_handlers.dart
index 3e5689f..c7c9332 100644
--- a/pkgs/stream_transform/lib/src/from_handlers.dart
+++ b/pkgs/stream_transform/lib/src/from_handlers.dart
@@ -7,9 +7,9 @@
/// Like [new StreamTransformer.fromHandlers] but the handlers are called once
/// per event rather than once per listener for broadcast streams.
StreamTransformer<S, T> fromHandlers<S, T>(
- {void Function(S, EventSink<T>) handleData,
- void Function(Object, StackTrace, EventSink<T>) handleError,
- void Function(EventSink<T>) handleDone}) =>
+ {void Function(S, EventSink<T>)? handleData,
+ void Function(Object, StackTrace, EventSink<T>)? handleError,
+ void Function(EventSink<T>)? handleDone}) =>
_StreamTransformer(
handleData: handleData,
handleError: handleError,
@@ -21,9 +21,9 @@
final void Function(Object, StackTrace, EventSink<T>) _handleError;
_StreamTransformer(
- {void Function(S, EventSink<T>) handleData,
- void Function(Object, StackTrace, EventSink<T>) handleError,
- void Function(EventSink<T>) handleDone})
+ {void Function(S, EventSink<T>)? handleData,
+ void Function(Object, StackTrace, EventSink<T>)? handleError,
+ void Function(EventSink<T>)? handleDone})
: _handleData = handleData ?? _defaultHandleData,
_handleError = handleError ?? _defaultHandleError,
_handleDone = handleDone ?? _defaultHandleDone;
@@ -47,12 +47,12 @@
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
- StreamSubscription<S> subscription;
+ StreamSubscription<S>? subscription;
controller.onListen = () {
assert(subscription == null);
var valuesDone = false;
subscription = values.listen((value) => _handleData(value, controller),
- onError: (error, StackTrace stackTrace) {
+ onError: (Object error, StackTrace stackTrace) {
_handleError(error, stackTrace, controller);
}, onDone: () {
valuesDone = true;
@@ -60,13 +60,13 @@
});
if (!values.isBroadcast) {
controller
- ..onPause = subscription.pause
- ..onResume = subscription.resume;
+ ..onPause = subscription!.pause
+ ..onResume = subscription!.resume;
}
controller.onCancel = () {
var toCancel = subscription;
subscription = null;
- if (!valuesDone) return toCancel.cancel();
+ if (!valuesDone) return toCancel!.cancel();
return null;
};
};
diff --git a/pkgs/stream_transform/lib/src/merge.dart b/pkgs/stream_transform/lib/src/merge.dart
index c68695c..61b5e21 100644
--- a/pkgs/stream_transform/lib/src/merge.dart
+++ b/pkgs/stream_transform/lib/src/merge.dart
@@ -131,10 +131,10 @@
};
}
controller.onCancel = () {
- var cancels = subscriptions
- .map((s) => s.cancel())
- .where((f) => f != null)
- .toList();
+ if (subscriptions.isEmpty) return null;
+ var cancels = [for (var s in subscriptions) s.cancel()]
+ // Handle opt-out nulls
+ ..removeWhere((Object? f) => f == null);
if (cancels.isEmpty) return null;
return Future.wait(cancels).then((_) => null);
};
@@ -183,11 +183,10 @@
};
}
controller.onCancel = () {
- var cancels = subscriptions
- .map((s) => s.cancel())
- .where((f) => f != null)
- .toList();
- if (cancels.isEmpty) return null;
+ if (subscriptions.isEmpty) return null;
+ var cancels = [for (var s in subscriptions) s.cancel()]
+ // Handle opt-out nulls
+ ..removeWhere((Object? f) => f == null);
return Future.wait(cancels).then((_) => null);
};
};
diff --git a/pkgs/stream_transform/lib/src/rate_limit.dart b/pkgs/stream_transform/lib/src/rate_limit.dart
index e42f8f4..23dfbc5 100644
--- a/pkgs/stream_transform/lib/src/rate_limit.dart
+++ b/pkgs/stream_transform/lib/src/rate_limit.dart
@@ -85,7 +85,7 @@
/// Events emitted by the source stream within [duration] following an emitted
/// event will be discarded. Errors are always forwarded immediately.
Stream<T> throttle(Duration duration) {
- Timer timer;
+ Timer? timer;
return transform(fromHandlers(handleData: (data, sink) {
if (timer == null) {
@@ -125,7 +125,7 @@
/// source: a------b--c----d--|
/// output: -----a------c--------d|
Stream<T> audit(Duration duration) {
- Timer timer;
+ Timer? timer;
var shouldClose = false;
T recentData;
@@ -161,7 +161,7 @@
transform(AggregateSample<T, List<T>>(trigger, _collect));
}
-List<T> _collectToList<T>(T element, List<T> soFar) {
+List<T> _collectToList<T>(T element, List<T>? soFar) {
soFar ??= <T>[];
soFar.add(element);
return soFar;
@@ -172,10 +172,10 @@
/// Creates a StreamTransformer which aggregates values until the source stream
/// does not emit for [duration], then emits the aggregated values.
StreamTransformer<T, R> _debounceAggregate<T, R>(
- Duration duration, R Function(T element, R soFar) collect,
- {bool leading, bool trailing}) {
- Timer timer;
- R soFar;
+ Duration duration, R Function(T element, R? soFar) collect,
+ {required bool leading, required bool trailing}) {
+ Timer? timer;
+ R? soFar;
var shouldClose = false;
var emittedLatestAsLeading = false;
return fromHandlers(handleData: (T value, EventSink<R> sink) {
@@ -183,12 +183,12 @@
soFar = collect(value, soFar);
if (timer == null && leading) {
emittedLatestAsLeading = true;
- sink.add(soFar);
+ sink.add(soFar as R);
} else {
emittedLatestAsLeading = false;
}
timer = Timer(duration, () {
- if (trailing && !emittedLatestAsLeading) sink.add(soFar);
+ if (trailing && !emittedLatestAsLeading) sink.add(soFar as R);
if (shouldClose) {
sink.close();
}
@@ -205,4 +205,4 @@
});
}
-List<T> _collect<T>(T event, List<T> soFar) => (soFar ?? <T>[])..add(event);
+List<T> _collect<T>(T event, List<T>? soFar) => (soFar ?? <T>[])..add(event);
diff --git a/pkgs/stream_transform/lib/src/scan.dart b/pkgs/stream_transform/lib/src/scan.dart
index 4e022f5..d14381c 100644
--- a/pkgs/stream_transform/lib/src/scan.dart
+++ b/pkgs/stream_transform/lib/src/scan.dart
@@ -21,7 +21,7 @@
if (result is Future<S>) {
return result.then((r) => accumulated = r);
} else {
- return accumulated = result as S;
+ return accumulated = result;
}
});
}
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart
index dcd2431..df7f1b8 100644
--- a/pkgs/stream_transform/lib/src/switch.dart
+++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -65,7 +65,7 @@
: StreamController<T>(sync: true);
controller.onListen = () {
- StreamSubscription<T> innerSubscription;
+ StreamSubscription<T>? innerSubscription;
var outerStreamDone = false;
final outerSubscription = outer.listen(
@@ -96,8 +96,10 @@
controller.onCancel = () {
var cancels = [
if (!outerStreamDone) outerSubscription.cancel(),
- if (innerSubscription != null) innerSubscription.cancel(),
- ].where((f) => f != null);
+ if (innerSubscription != null) innerSubscription!.cancel(),
+ ]
+ // Handle opt-out nulls
+ ..removeWhere((Object? f) => f == null);
if (cancels.isEmpty) return null;
return Future.wait(cancels).then((_) => null);
};
diff --git a/pkgs/stream_transform/lib/src/take_until.dart b/pkgs/stream_transform/lib/src/take_until.dart
index 43b35d1..5420500 100644
--- a/pkgs/stream_transform/lib/src/take_until.dart
+++ b/pkgs/stream_transform/lib/src/take_until.dart
@@ -27,7 +27,7 @@
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
- StreamSubscription<T> subscription;
+ StreamSubscription<T>? subscription;
var isDone = false;
_trigger.then((_) {
if (isDone) return;
@@ -46,12 +46,12 @@
});
if (!values.isBroadcast) {
controller
- ..onPause = subscription.pause
- ..onResume = subscription.resume;
+ ..onPause = subscription!.pause
+ ..onResume = subscription!.resume;
}
controller.onCancel = () {
if (isDone) return null;
- var toCancel = subscription;
+ var toCancel = subscription!;
subscription = null;
return toCancel.cancel();
};
diff --git a/pkgs/stream_transform/lib/src/tap.dart b/pkgs/stream_transform/lib/src/tap.dart
index 2696e02..b7e0321 100644
--- a/pkgs/stream_transform/lib/src/tap.dart
+++ b/pkgs/stream_transform/lib/src/tap.dart
@@ -22,9 +22,9 @@
///
/// The callbacks may not be called until the tapped stream has a listener,
/// and may not be called after the listener has canceled the subscription.
- Stream<T> tap(void Function(T) onValue,
- {void Function(Object, StackTrace) onError,
- void Function() onDone}) =>
+ Stream<T> tap(void Function(T)? onValue,
+ {void Function(Object, StackTrace)? onError,
+ void Function()? onDone}) =>
transform(fromHandlers(handleData: (value, sink) {
try {
onValue?.call(value);
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 5da90b1..ba4bd51 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -1,12 +1,12 @@
name: stream_transform
description: A collection of utilities to transform and manipulate streams.
homepage: https://github.com/dart-lang/stream_transform
-version: 1.2.1-dev
+version: 2.0.0-nullsafety.0-dev
environment:
- sdk: ">=2.7.0 <3.0.0"
+ sdk: ">=2.12.0-0 <3.0.0"
dev_dependencies:
- async: ^2.0.0
- pedantic: ^1.5.0
- test: ^1.0.0
+ async: ^2.5.0-nullsafety
+ pedantic: ^1.10.0-nullsafety
+ test: ^1.16.0-nullsafety
diff --git a/pkgs/stream_transform/test/async_map_buffer_test.dart b/pkgs/stream_transform/test/async_map_buffer_test.dart
index 9b46e1f..1280b51 100644
--- a/pkgs/stream_transform/test/async_map_buffer_test.dart
+++ b/pkgs/stream_transform/test/async_map_buffer_test.dart
@@ -11,16 +11,16 @@
import 'utils.dart';
void main() {
- StreamController<int> values;
- List<String> emittedValues;
- bool valuesCanceled;
- bool isDone;
- List<String> errors;
- Stream<String> transformed;
- StreamSubscription<String> subscription;
+ late StreamController<int> values;
+ late List<String> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<String> transformed;
+ late StreamSubscription<String> subscription;
- Completer<String> finishWork;
- List<int> workArgument;
+ Completer<String>? finishWork;
+ List<int>? workArgument;
/// Represents the async `convert` function and asserts that is is only called
/// after the previous iteration has completed.
@@ -28,15 +28,15 @@
expect(finishWork, isNull,
reason: 'See $values befor previous work is complete');
workArgument = values;
- finishWork = Completer();
- finishWork.future.then((_) {
- workArgument = null;
- finishWork = null;
- }).catchError((_) {
- workArgument = null;
- finishWork = null;
- });
- return finishWork.future;
+ finishWork = Completer()
+ ..future.then((_) {
+ workArgument = null;
+ finishWork = null;
+ }).catchError((_) {
+ workArgument = null;
+ finishWork = null;
+ });
+ return finishWork!.future;
}
for (var streamType in streamTypes) {
@@ -64,7 +64,7 @@
await Future(() {});
expect(emittedValues, isEmpty);
expect(workArgument, [1]);
- finishWork.complete('result');
+ finishWork!.complete('result');
await Future(() {});
expect(emittedValues, ['result']);
});
@@ -74,7 +74,7 @@
await Future(() {});
values..add(2)..add(3);
await Future(() {});
- finishWork.complete();
+ finishWork!.complete('');
await Future(() {});
expect(workArgument, [2, 3]);
});
@@ -90,7 +90,7 @@
test('forwards errors which occur during the work', () async {
values.add(1);
await Future(() {});
- finishWork.completeError('error');
+ finishWork!.completeError('error');
await Future(() {});
expect(errors, ['error']);
});
@@ -98,11 +98,11 @@
test('can continue handling events after an error', () async {
values.add(1);
await Future(() {});
- finishWork.completeError('error');
+ finishWork!.completeError('error');
values.add(2);
await Future(() {});
expect(workArgument, [2]);
- finishWork.completeError('another');
+ finishWork!.completeError('another');
await Future(() {});
expect(errors, ['error', 'another']);
});
@@ -140,11 +140,11 @@
values.add(2);
await values.close();
expect(isDone, false);
- finishWork.complete(null);
+ finishWork!.complete('');
await Future(() {});
// Still a pending value
expect(isDone, false);
- finishWork.complete(null);
+ finishWork!.complete('');
await Future(() {});
expect(isDone, true);
});
@@ -161,7 +161,7 @@
transformed.listen(otherValues.add);
values.add(1);
await Future(() {});
- finishWork.complete('result');
+ finishWork!.complete('result');
await Future(() {});
expect(emittedValues, ['result']);
expect(otherValues, ['result']);
@@ -175,7 +175,7 @@
await values.close();
expect(isDone, false);
expect(otherDone, false);
- finishWork.complete();
+ finishWork!.complete('');
await Future(() {});
expect(isDone, true);
expect(otherDone, true);
@@ -184,7 +184,7 @@
test('can cancel and relisten', () async {
values.add(1);
await Future(() {});
- finishWork.complete('first');
+ finishWork!.complete('first');
await Future(() {});
await subscription.cancel();
values.add(2);
@@ -193,7 +193,7 @@
values.add(3);
await Future(() {});
expect(workArgument, [3]);
- finishWork.complete('second');
+ finishWork!.complete('second');
await Future(() {});
expect(emittedValues, ['first', 'second']);
});
diff --git a/pkgs/stream_transform/test/async_map_sample_test.dart b/pkgs/stream_transform/test/async_map_sample_test.dart
index 9c37cd4..06457d8 100644
--- a/pkgs/stream_transform/test/async_map_sample_test.dart
+++ b/pkgs/stream_transform/test/async_map_sample_test.dart
@@ -11,16 +11,16 @@
import 'utils.dart';
void main() {
- StreamController<int> values;
- List<String> emittedValues;
- bool valuesCanceled;
- bool isDone;
- List<String> errors;
- Stream<String> transformed;
- StreamSubscription<String> subscription;
+ late StreamController<int> values;
+ late List<String> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<String> transformed;
+ late StreamSubscription<String> subscription;
- Completer<String> finishWork;
- int workArgument;
+ Completer<String>? finishWork;
+ int? workArgument;
/// Represents the async `convert` function and asserts that is is only called
/// after the previous iteration has completed.
@@ -28,15 +28,15 @@
expect(finishWork, isNull,
reason: 'See $values befor previous work is complete');
workArgument = value;
- finishWork = Completer();
- finishWork.future.then((_) {
- workArgument = null;
- finishWork = null;
- }).catchError((_) {
- workArgument = null;
- finishWork = null;
- });
- return finishWork.future;
+ finishWork = Completer()
+ ..future.then((_) {
+ workArgument = null;
+ finishWork = null;
+ }).catchError((_) {
+ workArgument = null;
+ finishWork = null;
+ });
+ return finishWork!.future;
}
for (var streamType in streamTypes) {
@@ -64,7 +64,7 @@
await Future(() {});
expect(emittedValues, isEmpty);
expect(workArgument, 1);
- finishWork.complete('result');
+ finishWork!.complete('result');
await Future(() {});
expect(emittedValues, ['result']);
});
@@ -74,7 +74,7 @@
await Future(() {});
values..add(2)..add(3);
await Future(() {});
- finishWork.complete();
+ finishWork!.complete('');
await Future(() {});
expect(workArgument, 3);
});
@@ -90,7 +90,7 @@
test('forwards errors which occur during the work', () async {
values.add(1);
await Future(() {});
- finishWork.completeError('error');
+ finishWork!.completeError('error');
await Future(() {});
expect(errors, ['error']);
});
@@ -98,11 +98,11 @@
test('can continue handling events after an error', () async {
values.add(1);
await Future(() {});
- finishWork.completeError('error');
+ finishWork!.completeError('error');
values.add(2);
await Future(() {});
expect(workArgument, 2);
- finishWork.completeError('another');
+ finishWork!.completeError('another');
await Future(() {});
expect(errors, ['error', 'another']);
});
@@ -140,11 +140,11 @@
values.add(2);
await values.close();
expect(isDone, false);
- finishWork.complete(null);
+ finishWork!.complete('');
await Future(() {});
// Still a pending value
expect(isDone, false);
- finishWork.complete(null);
+ finishWork!.complete('');
await Future(() {});
expect(isDone, true);
});
@@ -161,7 +161,7 @@
transformed.listen(otherValues.add);
values.add(1);
await Future(() {});
- finishWork.complete('result');
+ finishWork!.complete('result');
await Future(() {});
expect(emittedValues, ['result']);
expect(otherValues, ['result']);
@@ -175,7 +175,7 @@
await values.close();
expect(isDone, false);
expect(otherDone, false);
- finishWork.complete();
+ finishWork!.complete('');
await Future(() {});
expect(isDone, true);
expect(otherDone, true);
@@ -184,7 +184,7 @@
test('can cancel and relisten', () async {
values.add(1);
await Future(() {});
- finishWork.complete('first');
+ finishWork!.complete('first');
await Future(() {});
await subscription.cancel();
values.add(2);
@@ -193,20 +193,11 @@
values.add(3);
await Future(() {});
expect(workArgument, 3);
- finishWork.complete('second');
+ finishWork!.complete('second');
await Future(() {});
expect(emittedValues, ['first', 'second']);
});
}
});
}
- test('handles null response from cancel', () async {
- var controller = StreamController<int>();
-
- var subscription = NullOnCancelStream(controller.stream)
- .asyncMapSample((_) async {})
- .listen(null);
-
- await subscription.cancel();
- });
}
diff --git a/pkgs/stream_transform/test/audit_test.dart b/pkgs/stream_transform/test/audit_test.dart
index 58cb95b..20c7d9f 100644
--- a/pkgs/stream_transform/test/audit_test.dart
+++ b/pkgs/stream_transform/test/audit_test.dart
@@ -12,13 +12,13 @@
void main() {
for (var streamType in streamTypes) {
group('Stream type [$streamType]', () {
- StreamController<int> values;
- List<int> emittedValues;
- bool valuesCanceled;
- bool isDone;
- List<String> errors;
- Stream<int> transformed;
- StreamSubscription<int> subscription;
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
group('audit', () {
setUp(() async {
diff --git a/pkgs/stream_transform/test/buffer_test.dart b/pkgs/stream_transform/test/buffer_test.dart
index 80a793c..b528fa2 100644
--- a/pkgs/stream_transform/test/buffer_test.dart
+++ b/pkgs/stream_transform/test/buffer_test.dart
@@ -10,16 +10,16 @@
import 'utils.dart';
void main() {
- StreamController<void> trigger;
- StreamController<int> values;
- List<List<int>> emittedValues;
- bool valuesCanceled;
- bool triggerCanceled;
- bool triggerPaused;
- bool isDone;
- List<String> errors;
- Stream<List<int>> transformed;
- StreamSubscription<List<int>> subscription;
+ late StreamController<void> trigger;
+ late StreamController<int> values;
+ late List<List<int>> emittedValues;
+ late bool valuesCanceled;
+ late bool triggerCanceled;
+ late bool triggerPaused;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<List<int>> transformed;
+ late StreamSubscription<List<int>> subscription;
void setUpForStreamTypes(String triggerType, String valuesType) {
valuesCanceled = false;
@@ -243,13 +243,4 @@
]);
});
}
-
- test('handles null response from cancel', () async {
- var controller = StreamController<int>();
- var trigger = StreamController<void>();
- var subscription = NullOnCancelStream(controller.stream)
- .buffer(trigger.stream)
- .listen(null);
- await subscription.cancel();
- });
}
diff --git a/pkgs/stream_transform/test/combine_latest_all_test.dart b/pkgs/stream_transform/test/combine_latest_all_test.dart
index d57d618..2943449 100644
--- a/pkgs/stream_transform/test/combine_latest_all_test.dart
+++ b/pkgs/stream_transform/test/combine_latest_all_test.dart
@@ -8,8 +8,6 @@
import 'package:stream_transform/stream_transform.dart';
-import 'utils.dart';
-
Future<void> tick() => Future(() {});
void main() {
@@ -166,14 +164,4 @@
});
});
});
-
- test('handles null response from cancel', () async {
- var source = StreamController<int>();
- var other = StreamController<int>();
-
- var subscription = NullOnCancelStream(source.stream)
- .combineLatestAll([NullOnCancelStream(other.stream)]).listen(null);
-
- await subscription.cancel();
- });
}
diff --git a/pkgs/stream_transform/test/combine_latest_test.dart b/pkgs/stream_transform/test/combine_latest_test.dart
index 4ea9499..2b35e2c 100644
--- a/pkgs/stream_transform/test/combine_latest_test.dart
+++ b/pkgs/stream_transform/test/combine_latest_test.dart
@@ -9,8 +9,6 @@
import 'package:stream_transform/stream_transform.dart';
-import 'utils.dart';
-
void main() {
group('combineLatest', () {
test('flows through combine callback', () async {
@@ -170,17 +168,6 @@
});
});
});
-
- test('handles null response from cancel', () async {
- var source = StreamController<int>();
- var other = StreamController<int>();
-
- var subscription = NullOnCancelStream(source.stream)
- .combineLatest(NullOnCancelStream(other.stream), (a, b) => null)
- .listen(null);
-
- await subscription.cancel();
- });
}
class _NumberedException implements Exception {
diff --git a/pkgs/stream_transform/test/concurrent_async_expand_test.dart b/pkgs/stream_transform/test/concurrent_async_expand_test.dart
index 9ed02d0..d0b4ad4 100644
--- a/pkgs/stream_transform/test/concurrent_async_expand_test.dart
+++ b/pkgs/stream_transform/test/concurrent_async_expand_test.dart
@@ -24,15 +24,15 @@
for (var outerType in streamTypes) {
for (var innerType in streamTypes) {
group('concurrentAsyncExpand $outerType to $innerType', () {
- StreamController<int> outerController;
- bool outerCanceled;
- List<StreamController<String>> innerControllers;
- List<bool> innerCanceled;
- List<String> emittedValues;
- bool isDone;
- List<String> errors;
- Stream<String> transformed;
- StreamSubscription<String> subscription;
+ late StreamController<int> outerController;
+ late bool outerCanceled;
+ late List<StreamController<String>> innerControllers;
+ late List<bool> innerCanceled;
+ late List<String> emittedValues;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<String> transformed;
+ late StreamSubscription<String> subscription;
setUp(() {
outerController = createController(outerType)
@@ -184,19 +184,4 @@
});
}
}
-
- test('hendles null response from cancel', () async {
- var source = StreamController<int>();
- var other = StreamController<int>();
-
- var subscription = NullOnCancelStream(source.stream)
- .concurrentAsyncExpand((_) => NullOnCancelStream(other.stream))
- .listen(null);
-
- source.add(1);
-
- await Future<void>(() {});
-
- await subscription.cancel();
- });
}
diff --git a/pkgs/stream_transform/test/concurrent_async_map_test.dart b/pkgs/stream_transform/test/concurrent_async_map_test.dart
index 6da2101..a894e7f 100644
--- a/pkgs/stream_transform/test/concurrent_async_map_test.dart
+++ b/pkgs/stream_transform/test/concurrent_async_map_test.dart
@@ -11,16 +11,16 @@
import 'utils.dart';
void main() {
- StreamController<int> controller;
- List<String> emittedValues;
- bool valuesCanceled;
- bool isDone;
- List<String> errors;
- Stream<String> transformed;
- StreamSubscription<String> subscription;
+ late StreamController<int> controller;
+ late List<String> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<String> transformed;
+ late StreamSubscription<String> subscription;
- List<Completer<String>> finishWork;
- List<dynamic> values;
+ late List<Completer<String>> finishWork;
+ late List<dynamic> values;
Future<String> convert(int value) {
values.add(value);
@@ -127,7 +127,7 @@
await controller.close();
expect(isDone, false);
expect(otherDone, false);
- finishWork.first.complete();
+ finishWork.first.complete('');
await Future(() {});
expect(isDone, true);
expect(otherDone, true);
diff --git a/pkgs/stream_transform/test/debounce_test.dart b/pkgs/stream_transform/test/debounce_test.dart
index de6b740..f24bfb9 100644
--- a/pkgs/stream_transform/test/debounce_test.dart
+++ b/pkgs/stream_transform/test/debounce_test.dart
@@ -13,13 +13,13 @@
for (var streamType in streamTypes) {
group('Stream type [$streamType]', () {
group('debounce - trailing', () {
- StreamController<int> values;
- List<int> emittedValues;
- bool valuesCanceled;
- bool isDone;
- List<String> errors;
- StreamSubscription<int> subscription;
- Stream<int> transformed;
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late StreamSubscription<int> subscription;
+ late Stream<int> transformed;
setUp(() async {
valuesCanceled = false;
@@ -89,10 +89,10 @@
});
group('debounce - leading', () {
- StreamController<int> values;
- List<int> emittedValues;
- Stream<int> transformed;
- bool isDone;
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late Stream<int> transformed;
+ late bool isDone;
setUp(() async {
values = createController(streamType);
@@ -139,9 +139,9 @@
});
group('debounce - leading and trailing', () {
- StreamController<int> values;
- List<int> emittedValues;
- Stream<int> transformed;
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late Stream<int> transformed;
setUp(() async {
values = createController(streamType);
@@ -179,10 +179,10 @@
});
group('debounceBuffer', () {
- StreamController<int> values;
- List<List<int>> emittedValues;
- List<String> errors;
- Stream<List<int>> transformed;
+ late StreamController<int> values;
+ late List<List<int>> emittedValues;
+ late List<String> errors;
+ late Stream<List<int>> transformed;
setUp(() async {
values = createController(streamType);
diff --git a/pkgs/stream_transform/test/followd_by_test.dart b/pkgs/stream_transform/test/followd_by_test.dart
index c8864b5..fbe904a 100644
--- a/pkgs/stream_transform/test/followd_by_test.dart
+++ b/pkgs/stream_transform/test/followd_by_test.dart
@@ -13,17 +13,17 @@
for (var firstType in streamTypes) {
for (var secondType in streamTypes) {
group('followedBy [$firstType] with [$secondType]', () {
- StreamController<int> first;
- StreamController<int> second;
+ late StreamController<int> first;
+ late StreamController<int> second;
- List<int> emittedValues;
- bool firstCanceled;
- bool secondCanceled;
- bool secondListened;
- bool isDone;
- List<String> errors;
- Stream<int> transformed;
- StreamSubscription<int> subscription;
+ late List<int> emittedValues;
+ late bool firstCanceled;
+ late bool secondCanceled;
+ late bool secondListened;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
setUp(() async {
firstCanceled = false;
diff --git a/pkgs/stream_transform/test/from_handlers_test.dart b/pkgs/stream_transform/test/from_handlers_test.dart
index 50d59c5..206acc8 100644
--- a/pkgs/stream_transform/test/from_handlers_test.dart
+++ b/pkgs/stream_transform/test/from_handlers_test.dart
@@ -9,13 +9,13 @@
import 'package:stream_transform/src/from_handlers.dart';
void main() {
- StreamController<int> values;
- List<int> emittedValues;
- bool valuesCanceled;
- bool isDone;
- List<String> errors;
- Stream<int> transformed;
- StreamSubscription<int> subscription;
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
void setUpForController(StreamController<int> controller,
StreamTransformer<int, int> transformer) {
@@ -68,10 +68,10 @@
});
group('broadcast stream with muliple listeners', () {
- List<int> emittedValues2;
- List<String> errors2;
- bool isDone2;
- StreamSubscription<int> subscription2;
+ late List<int> emittedValues2;
+ late List<String> errors2;
+ late bool isDone2;
+ late StreamSubscription<int> subscription2;
setUp(() {
setUpForController(StreamController.broadcast(), fromHandlers());
@@ -133,9 +133,9 @@
});
group('broadcast stream with multiple listeners', () {
- int dataCallCount;
- int doneCallCount;
- int errorCallCount;
+ late int dataCallCount;
+ late int doneCallCount;
+ late int errorCallCount;
setUp(() async {
dataCallCount = 0;
diff --git a/pkgs/stream_transform/test/merge_test.dart b/pkgs/stream_transform/test/merge_test.dart
index 2eef5d1..24cd76b 100644
--- a/pkgs/stream_transform/test/merge_test.dart
+++ b/pkgs/stream_transform/test/merge_test.dart
@@ -8,8 +8,6 @@
import 'package:stream_transform/stream_transform.dart';
-import 'utils.dart';
-
void main() {
group('merge', () {
test('includes all values', () async {
@@ -140,15 +138,4 @@
expect(secondListenerValues, [1, 2, 3, 4, 5, 6]);
});
});
-
- test('handles null response rom cancel', () async {
- var source = StreamController<int>();
- var other = StreamController<int>();
-
- var subscription = NullOnCancelStream(source.stream)
- .merge(NullOnCancelStream(other.stream))
- .listen(null);
-
- await subscription.cancel();
- });
}
diff --git a/pkgs/stream_transform/test/opt_out_test.dart b/pkgs/stream_transform/test/opt_out_test.dart
new file mode 100644
index 0000000..8ab4fc4
--- /dev/null
+++ b/pkgs/stream_transform/test/opt_out_test.dart
@@ -0,0 +1,72 @@
+// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// @dart=2.9
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+ group('null returned from StreamSubscription.cancel', () {
+ void testNullCancel(
+ String name, Stream<void> Function(Stream<void>) transform) {
+ test(name, () async {
+ var subscription = transform(_NullOnCancelStream()).listen(null);
+ await subscription.cancel();
+ });
+ }
+
+ testNullCancel('asyncMapSample', (s) => s.asyncMapSample((_) async {}));
+ testNullCancel('buffer', (s) => s.buffer(_nonEndingStream()));
+ testNullCancel(
+ 'combineLatestAll', (s) => s.combineLatestAll([_NullOnCancelStream()]));
+ testNullCancel('combineLatest',
+ (s) => s.combineLatest(_NullOnCancelStream(), (a, b) {}));
+ testNullCancel('merge', (s) => s.merge(_NullOnCancelStream()));
+
+ test('switchLatest', () async {
+ var subscription =
+ _NullOnCancelStream(Stream<Stream<void>>.value(_NullOnCancelStream()))
+ .switchLatest()
+ .listen(null);
+ await Future(() {});
+ await subscription.cancel();
+ });
+
+ test('concurrentAsyncExpand', () async {
+ var subscription = _NullOnCancelStream(Stream.value(null))
+ .concurrentAsyncExpand((_) => _NullOnCancelStream())
+ .listen(null);
+ await Future(() {});
+ await subscription.cancel();
+ });
+ });
+}
+
+class _NullOnCancelStream<T> extends StreamView<T> {
+ _NullOnCancelStream([Stream<T> stream]) : super(stream ?? _nonEndingStream());
+
+ @override
+ StreamSubscription<T> listen(void Function(T) onData,
+ {Function onError, void Function() onDone, bool cancelOnError}) =>
+ _NullOnCancelSubscription(super.listen(onData,
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError));
+}
+
+class _NullOnCancelSubscription<T> extends DelegatingStreamSubscription<T> {
+ final StreamSubscription<T> _subscription;
+ _NullOnCancelSubscription(this._subscription) : super(_subscription);
+
+ @override
+ Future<void> cancel() {
+ _subscription.cancel();
+ return null;
+ }
+}
+
+Stream<T> _nonEndingStream<T>() => StreamController<T>().stream;
diff --git a/pkgs/stream_transform/test/scan_test.dart b/pkgs/stream_transform/test/scan_test.dart
index 4e58680..e3effb3 100644
--- a/pkgs/stream_transform/test/scan_test.dart
+++ b/pkgs/stream_transform/test/scan_test.dart
@@ -22,7 +22,7 @@
test('can create a broadcast stream', () {
var source = StreamController.broadcast();
- var transformed = source.stream.scan(null, null);
+ var transformed = source.stream.scan(null, (_, __) {});
expect(transformed.isBroadcast, true);
});
diff --git a/pkgs/stream_transform/test/start_with_test.dart b/pkgs/stream_transform/test/start_with_test.dart
index 3a5a025..29241be 100644
--- a/pkgs/stream_transform/test/start_with_test.dart
+++ b/pkgs/stream_transform/test/start_with_test.dart
@@ -11,12 +11,12 @@
import 'utils.dart';
void main() {
- StreamController<int> values;
- Stream<int> transformed;
- StreamSubscription<int> subscription;
+ late StreamController<int> values;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
- List<int> emittedValues;
- bool isDone;
+ late List<int> emittedValues;
+ late bool isDone;
void setupForStreamType(
String streamType, Stream<int> Function(Stream<int>) transform) {
@@ -102,7 +102,7 @@
for (var startingStreamType in streamTypes) {
group('startWithStream [$startingStreamType] then [$streamType]', () {
- StreamController<int> starting;
+ late StreamController<int> starting;
setUp(() async {
starting = createController(startingStreamType);
setupForStreamType(
diff --git a/pkgs/stream_transform/test/switch_test.dart b/pkgs/stream_transform/test/switch_test.dart
index 16deb4b..b69c8b0 100644
--- a/pkgs/stream_transform/test/switch_test.dart
+++ b/pkgs/stream_transform/test/switch_test.dart
@@ -14,16 +14,16 @@
for (var outerType in streamTypes) {
for (var innerType in streamTypes) {
group('Outer type: [$outerType], Inner type: [$innerType]', () {
- StreamController<int> first;
- StreamController<int> second;
- StreamController<Stream<int>> outer;
+ late StreamController<int> first;
+ late StreamController<int> second;
+ late StreamController<Stream<int>> outer;
- List<int> emittedValues;
- bool firstCanceled;
- bool outerCanceled;
- bool isDone;
- List<String> errors;
- StreamSubscription<int> subscription;
+ late List<int> emittedValues;
+ late bool firstCanceled;
+ late bool outerCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late StreamSubscription<int> subscription;
setUp(() async {
firstCanceled = false;
@@ -141,24 +141,11 @@
test('can create a broadcast stream', () async {
var outer = StreamController.broadcast();
- var transformed = outer.stream.switchMap(null);
+ var transformed = outer.stream.switchMap((_) => const Stream.empty());
expect(transformed.isBroadcast, true);
});
- test('handles null response from cancel', () async {
- var outer = StreamController<Stream<int>>();
- var inner = StreamController<int>();
-
- var subscription =
- NullOnCancelStream(outer.stream).switchLatest().listen(null);
-
- outer.add(NullOnCancelStream(inner.stream));
- await Future<void>(() {});
-
- await subscription.cancel();
- });
-
test('forwards errors from the convert callback', () async {
var errors = <String>[];
var source = Stream.fromIterable([1, 2, 3]);
diff --git a/pkgs/stream_transform/test/take_until_test.dart b/pkgs/stream_transform/test/take_until_test.dart
index c25d342..11e906f 100644
--- a/pkgs/stream_transform/test/take_until_test.dart
+++ b/pkgs/stream_transform/test/take_until_test.dart
@@ -13,14 +13,14 @@
void main() {
for (var streamType in streamTypes) {
group('takeUntil on Stream type [$streamType]', () {
- StreamController<int> values;
- List<int> emittedValues;
- bool valuesCanceled;
- bool isDone;
- List<String> errors;
- Stream<int> transformed;
- StreamSubscription<int> subscription;
- Completer<void> closeTrigger;
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late List<String> errors;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
+ late Completer<void> closeTrigger;
setUp(() {
valuesCanceled = false;
diff --git a/pkgs/stream_transform/test/throttle_test.dart b/pkgs/stream_transform/test/throttle_test.dart
index e51ef84..d84fdf4 100644
--- a/pkgs/stream_transform/test/throttle_test.dart
+++ b/pkgs/stream_transform/test/throttle_test.dart
@@ -12,12 +12,12 @@
void main() {
for (var streamType in streamTypes) {
group('Stream type [$streamType]', () {
- StreamController<int> values;
- List<int> emittedValues;
- bool valuesCanceled;
- bool isDone;
- Stream<int> transformed;
- StreamSubscription<int> subscription;
+ late StreamController<int> values;
+ late List<int> emittedValues;
+ late bool valuesCanceled;
+ late bool isDone;
+ late Stream<int> transformed;
+ late StreamSubscription<int> subscription;
group('throttle', () {
setUp(() async {
diff --git a/pkgs/stream_transform/test/utils.dart b/pkgs/stream_transform/test/utils.dart
index 4aa5837..b6196d6 100644
--- a/pkgs/stream_transform/test/utils.dart
+++ b/pkgs/stream_transform/test/utils.dart
@@ -4,8 +4,6 @@
import 'dart:async';
-import 'package:async/async.dart';
-
/// Cycle the event loop to ensure timers are started, then wait for a delay
/// longer than [milliseconds] to allow for the timer to fire.
Future<void> waitForTimer(int milliseconds) =>
@@ -25,26 +23,3 @@
}
const streamTypes = ['single subscription', 'broadcast'];
-
-class NullOnCancelStream<T> extends StreamView<T> {
- final Stream<T> _stream;
-
- NullOnCancelStream(this._stream) : super(_stream);
-
- @override
- StreamSubscription<T> listen(void Function(T) onData,
- {Function onError, void Function() onDone, bool cancelOnError}) =>
- _NullOnCancelSubscription(_stream.listen(onData,
- onError: onError, onDone: onDone, cancelOnError: cancelOnError));
-}
-
-class _NullOnCancelSubscription<T> extends DelegatingStreamSubscription<T> {
- final StreamSubscription<T> _subscription;
- _NullOnCancelSubscription(this._subscription) : super(_subscription);
-
- @override
- Future<void> cancel() {
- _subscription.cancel();
- return null;
- }
-}