Handle `null` returns from cancelled subscriptions (dart-lang/stream_transform#97)
A call to `StreamSubscription.cancel` is allowed to return `null`,
though it never does using the `StreamController` from the SDK. If a
custom Stream implementation does synchronously cancel and return `null`
it could cause problems in a number of transformers which would attempt
to `Future.wait` on the results of the cancels.
- Update the places using `Future.wait` to filter out null results
first.
- Refactor some places to use collection literals and conditional
elements.
- Filter out nulls first, and if every canceled subscription was
synchronous skip the call to `Future.wait` entirely.
- Add a custom Stream implementation which always returns `null` when
it's subscription is canceled as a test utility.
- Add tests that would fail without the null filtering on each
implementation that was updated.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 686f919..5342392 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 1.1.1
+
+- Fix a bug in `asyncMapSample`, `buffer`, `combineLatest`,
+ `combineLatestAll`, `merge`, and `mergeAll` which would cause an exception
+ when cancelling a subscription after using the transformer if the original
+ stream(s) returned `null` from cancelling their subscriptions.
+
## 1.1.0
- Add `concurrentAsyncExpand` to interleave events emitted by multiple sub
diff --git a/pkgs/stream_transform/lib/src/aggregate_sample.dart b/pkgs/stream_transform/lib/src/aggregate_sample.dart
index 5ddeb10..88fc490 100644
--- a/pkgs/stream_transform/lib/src/aggregate_sample.dart
+++ b/pkgs/stream_transform/lib/src/aggregate_sample.dart
@@ -107,8 +107,10 @@
} else {
triggerSub.pause();
}
- if (toCancel.isEmpty) return null;
- return Future.wait(toCancel.map((s) => s.cancel()));
+ var cancels =
+ toCancel.map((s) => s.cancel()).where((f) => f != null).toList();
+ if (cancels.isEmpty) return null;
+ return Future.wait(cancels).then((_) => null);
};
};
return controller.stream;
diff --git a/pkgs/stream_transform/lib/src/combine_latest.dart b/pkgs/stream_transform/lib/src/combine_latest.dart
index 7fee05d..303b16a 100644
--- a/pkgs/stream_transform/lib/src/combine_latest.dart
+++ b/pkgs/stream_transform/lib/src/combine_latest.dart
@@ -175,11 +175,11 @@
};
}
controller.onCancel = () {
- var cancelSource = sourceSubscription.cancel();
- var cancelOther = otherSubscription.cancel();
+ var cancels = [sourceSubscription.cancel(), otherSubscription.cancel()]
+ .where((f) => f != null);
sourceSubscription = null;
otherSubscription = null;
- return Future.wait([cancelSource, cancelOther]);
+ return Future.wait(cancels).then((_) => null);
};
};
return controller.stream;
@@ -249,8 +249,12 @@
};
}
controller.onCancel = () {
- if (subscriptions.isEmpty) return null;
- return Future.wait(subscriptions.map((s) => s.cancel()));
+ var cancels = subscriptions
+ .map((s) => s.cancel())
+ .where((f) => f != null)
+ .toList();
+ if (cancels.isEmpty) return null;
+ return Future.wait(cancels).then((_) => null);
};
};
return controller.stream;
diff --git a/pkgs/stream_transform/lib/src/merge.dart b/pkgs/stream_transform/lib/src/merge.dart
index 99d19c9..f654d35 100644
--- a/pkgs/stream_transform/lib/src/merge.dart
+++ b/pkgs/stream_transform/lib/src/merge.dart
@@ -124,8 +124,12 @@
};
}
controller.onCancel = () {
- if (subscriptions.isEmpty) return null;
- return Future.wait(subscriptions.map((s) => s.cancel()));
+ var cancels = subscriptions
+ .map((s) => s.cancel())
+ .where((f) => f != null)
+ .toList();
+ if (cancels.isEmpty) return null;
+ return Future.wait(cancels).then((_) => null);
};
};
return controller.stream;
@@ -172,7 +176,12 @@
};
}
controller.onCancel = () {
- return Future.wait(subscriptions.map((s) => s.cancel()));
+ var cancels = subscriptions
+ .map((s) => s.cancel())
+ .where((f) => f != null)
+ .toList();
+ if (cancels.isEmpty) return null;
+ return Future.wait(cancels).then((_) => null);
};
};
return controller.stream;
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart
index b30556a..5125d57 100644
--- a/pkgs/stream_transform/lib/src/switch.dart
+++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -41,15 +41,11 @@
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
- StreamSubscription<Stream<T>> outerSubscription;
-
controller.onListen = () {
- assert(outerSubscription == null);
-
StreamSubscription<T> innerSubscription;
var outerStreamDone = false;
- outerSubscription = outer.listen(
+ final outerSubscription = outer.listen(
(innerStream) {
innerSubscription?.cancel();
innerSubscription = innerStream.listen(controller.add,
@@ -75,15 +71,12 @@
};
}
controller.onCancel = () {
- var toCancel = <StreamSubscription<void>>[];
- if (!outerStreamDone) toCancel.add(outerSubscription);
- if (innerSubscription != null) {
- toCancel.add(innerSubscription);
- }
- outerSubscription = null;
- innerSubscription = null;
- if (toCancel.isEmpty) return null;
- return Future.wait(toCancel.map((s) => s.cancel()));
+ var cancels = [
+ if (!outerStreamDone) outerSubscription.cancel(),
+ if (innerSubscription != null) innerSubscription.cancel(),
+ ].where((f) => f != null);
+ if (cancels.isEmpty) return null;
+ return Future.wait(cancels).then((_) => null);
};
};
return controller.stream;
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 1c8d88e..62a7fa6 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -1,11 +1,12 @@
name: stream_transform
description: A collection of utilities to transform and manipulate streams.
homepage: https://www.github.com/dart-lang/stream_transform
-version: 1.1.1-dev
+version: 1.1.1
environment:
sdk: ">=2.6.0 <3.0.0"
dev_dependencies:
+ async: ^2.0.0
pedantic: ^1.5.0
test: ^1.0.0
diff --git a/pkgs/stream_transform/test/async_map_sample_test.dart b/pkgs/stream_transform/test/async_map_sample_test.dart
index 119e0c8..9c37cd4 100644
--- a/pkgs/stream_transform/test/async_map_sample_test.dart
+++ b/pkgs/stream_transform/test/async_map_sample_test.dart
@@ -200,4 +200,13 @@
}
});
}
+ test('handles null response from cancel', () async {
+ var controller = StreamController<int>();
+
+ var subscription = NullOnCancelStream(controller.stream)
+ .asyncMapSample((_) async {})
+ .listen(null);
+
+ await subscription.cancel();
+ });
}
diff --git a/pkgs/stream_transform/test/buffer_test.dart b/pkgs/stream_transform/test/buffer_test.dart
index 5800586..80a793c 100644
--- a/pkgs/stream_transform/test/buffer_test.dart
+++ b/pkgs/stream_transform/test/buffer_test.dart
@@ -243,4 +243,13 @@
]);
});
}
+
+ test('handles null response from cancel', () async {
+ var controller = StreamController<int>();
+ var trigger = StreamController<void>();
+ var subscription = NullOnCancelStream(controller.stream)
+ .buffer(trigger.stream)
+ .listen(null);
+ await subscription.cancel();
+ });
}
diff --git a/pkgs/stream_transform/test/combine_latest_all_test.dart b/pkgs/stream_transform/test/combine_latest_all_test.dart
index 2943449..d57d618 100644
--- a/pkgs/stream_transform/test/combine_latest_all_test.dart
+++ b/pkgs/stream_transform/test/combine_latest_all_test.dart
@@ -8,6 +8,8 @@
import 'package:stream_transform/stream_transform.dart';
+import 'utils.dart';
+
Future<void> tick() => Future(() {});
void main() {
@@ -164,4 +166,14 @@
});
});
});
+
+ test('handles null response from cancel', () async {
+ var source = StreamController<int>();
+ var other = StreamController<int>();
+
+ var subscription = NullOnCancelStream(source.stream)
+ .combineLatestAll([NullOnCancelStream(other.stream)]).listen(null);
+
+ await subscription.cancel();
+ });
}
diff --git a/pkgs/stream_transform/test/combine_latest_test.dart b/pkgs/stream_transform/test/combine_latest_test.dart
index 2b35e2c..4ea9499 100644
--- a/pkgs/stream_transform/test/combine_latest_test.dart
+++ b/pkgs/stream_transform/test/combine_latest_test.dart
@@ -9,6 +9,8 @@
import 'package:stream_transform/stream_transform.dart';
+import 'utils.dart';
+
void main() {
group('combineLatest', () {
test('flows through combine callback', () async {
@@ -168,6 +170,17 @@
});
});
});
+
+ test('handles null response from cancel', () async {
+ var source = StreamController<int>();
+ var other = StreamController<int>();
+
+ var subscription = NullOnCancelStream(source.stream)
+ .combineLatest(NullOnCancelStream(other.stream), (a, b) => null)
+ .listen(null);
+
+ await subscription.cancel();
+ });
}
class _NumberedException implements Exception {
diff --git a/pkgs/stream_transform/test/concurrent_async_expand_test.dart b/pkgs/stream_transform/test/concurrent_async_expand_test.dart
index f843c6f..9ed02d0 100644
--- a/pkgs/stream_transform/test/concurrent_async_expand_test.dart
+++ b/pkgs/stream_transform/test/concurrent_async_expand_test.dart
@@ -184,4 +184,19 @@
});
}
}
+
+ test('hendles null response from cancel', () async {
+ var source = StreamController<int>();
+ var other = StreamController<int>();
+
+ var subscription = NullOnCancelStream(source.stream)
+ .concurrentAsyncExpand((_) => NullOnCancelStream(other.stream))
+ .listen(null);
+
+ source.add(1);
+
+ await Future<void>(() {});
+
+ await subscription.cancel();
+ });
}
diff --git a/pkgs/stream_transform/test/merge_test.dart b/pkgs/stream_transform/test/merge_test.dart
index 24cd76b..2eef5d1 100644
--- a/pkgs/stream_transform/test/merge_test.dart
+++ b/pkgs/stream_transform/test/merge_test.dart
@@ -8,6 +8,8 @@
import 'package:stream_transform/stream_transform.dart';
+import 'utils.dart';
+
void main() {
group('merge', () {
test('includes all values', () async {
@@ -138,4 +140,15 @@
expect(secondListenerValues, [1, 2, 3, 4, 5, 6]);
});
});
+
+ test('handles null response rom cancel', () async {
+ var source = StreamController<int>();
+ var other = StreamController<int>();
+
+ var subscription = NullOnCancelStream(source.stream)
+ .merge(NullOnCancelStream(other.stream))
+ .listen(null);
+
+ await subscription.cancel();
+ });
}
diff --git a/pkgs/stream_transform/test/switch_test.dart b/pkgs/stream_transform/test/switch_test.dart
index ba26503..40030f0 100644
--- a/pkgs/stream_transform/test/switch_test.dart
+++ b/pkgs/stream_transform/test/switch_test.dart
@@ -146,4 +146,17 @@
expect(transformed.isBroadcast, true);
});
});
+
+ test('handles null response from cancel', () async {
+ var outer = StreamController<Stream<int>>();
+ var inner = StreamController<int>();
+
+ var subscription =
+ NullOnCancelStream(outer.stream).switchLatest().listen(null);
+
+ outer.add(NullOnCancelStream(inner.stream));
+ await Future<void>(() {});
+
+ await subscription.cancel();
+ });
}
diff --git a/pkgs/stream_transform/test/utils.dart b/pkgs/stream_transform/test/utils.dart
index b6196d6..4aa5837 100644
--- a/pkgs/stream_transform/test/utils.dart
+++ b/pkgs/stream_transform/test/utils.dart
@@ -4,6 +4,8 @@
import 'dart:async';
+import 'package:async/async.dart';
+
/// Cycle the event loop to ensure timers are started, then wait for a delay
/// longer than [milliseconds] to allow for the timer to fire.
Future<void> waitForTimer(int milliseconds) =>
@@ -23,3 +25,26 @@
}
const streamTypes = ['single subscription', 'broadcast'];
+
+class NullOnCancelStream<T> extends StreamView<T> {
+ final Stream<T> _stream;
+
+ NullOnCancelStream(this._stream) : super(_stream);
+
+ @override
+ StreamSubscription<T> listen(void Function(T) onData,
+ {Function onError, void Function() onDone, bool cancelOnError}) =>
+ _NullOnCancelSubscription(_stream.listen(onData,
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError));
+}
+
+class _NullOnCancelSubscription<T> extends DelegatingStreamSubscription<T> {
+ final StreamSubscription<T> _subscription;
+ _NullOnCancelSubscription(this._subscription) : super(_subscription);
+
+ @override
+ Future<void> cancel() {
+ _subscription.cancel();
+ return null;
+ }
+}