Change onListen bailouts to asserts (dart-lang/stream_transform#68)
These transformers had a check that should never be true because the
behavior of the `StreamController` should prevent `onListen` from being
called multiple times while there is at least one active listener.
diff --git a/pkgs/stream_transform/lib/src/buffer.dart b/pkgs/stream_transform/lib/src/buffer.dart
index c05e79a..b4c937e 100644
--- a/pkgs/stream_transform/lib/src/buffer.dart
+++ b/pkgs/stream_transform/lib/src/buffer.dart
@@ -86,7 +86,7 @@
}
controller.onListen = () {
- if (valueSub != null) return;
+ assert(valueSub == null);
valueSub = values.listen(onValue,
onError: controller.addError, onDone: onValuesDone);
if (triggerSub != null) {
diff --git a/pkgs/stream_transform/lib/src/combine_latest_all.dart b/pkgs/stream_transform/lib/src/combine_latest_all.dart
index 8141a3f..679f3ef 100644
--- a/pkgs/stream_transform/lib/src/combine_latest_all.dart
+++ b/pkgs/stream_transform/lib/src/combine_latest_all.dart
@@ -63,7 +63,7 @@
List<StreamSubscription> subscriptions;
controller.onListen = () {
- if (subscriptions != null) return;
+ assert(subscriptions == null);
final latestData = List<T>(allStreams.length);
final hasEmitted = <int>{};
diff --git a/pkgs/stream_transform/lib/src/followed_by.dart b/pkgs/stream_transform/lib/src/followed_by.dart
index df506f3..1705542 100644
--- a/pkgs/stream_transform/lib/src/followed_by.dart
+++ b/pkgs/stream_transform/lib/src/followed_by.dart
@@ -60,7 +60,7 @@
currentDoneHandler = onFirstDone;
controller.onListen = () {
- if (subscription != null) return;
+ assert(subscription == null);
listen();
if (!first.isBroadcast) {
controller.onPause = () {
diff --git a/pkgs/stream_transform/lib/src/from_handlers.dart b/pkgs/stream_transform/lib/src/from_handlers.dart
index 6096c4c..99f71cb 100644
--- a/pkgs/stream_transform/lib/src/from_handlers.dart
+++ b/pkgs/stream_transform/lib/src/from_handlers.dart
@@ -54,7 +54,7 @@
StreamSubscription<S> subscription;
controller.onListen = () {
- if (subscription != null) return;
+ assert(subscription == null);
var valuesDone = false;
subscription = values.listen((value) => _handleData(value, controller),
onError: (error, StackTrace stackTrace) {
diff --git a/pkgs/stream_transform/lib/src/merge.dart b/pkgs/stream_transform/lib/src/merge.dart
index ead8bc6..82aa06a 100644
--- a/pkgs/stream_transform/lib/src/merge.dart
+++ b/pkgs/stream_transform/lib/src/merge.dart
@@ -42,7 +42,7 @@
List<StreamSubscription> subscriptions;
controller.onListen = () {
- if (subscriptions != null) return;
+ assert(subscriptions == null);
var activeStreamCount = 0;
subscriptions = allStreams.map((stream) {
activeStreamCount++;
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart
index 87ae4ff..54dc843 100644
--- a/pkgs/stream_transform/lib/src/switch.dart
+++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -38,7 +38,7 @@
StreamSubscription<Stream<T>> outerSubscription;
controller.onListen = () {
- if (outerSubscription != null) return;
+ assert(outerSubscription == null);
StreamSubscription<T> innerSubscription;
var outerStreamDone = false;
diff --git a/pkgs/stream_transform/lib/src/where_type.dart b/pkgs/stream_transform/lib/src/where_type.dart
index 96eb394..6e5b08f 100644
--- a/pkgs/stream_transform/lib/src/where_type.dart
+++ b/pkgs/stream_transform/lib/src/where_type.dart
@@ -30,7 +30,7 @@
StreamSubscription<Object> subscription;
controller.onListen = () {
- if (subscription != null) return;
+ assert(subscription == null);
subscription = source.listen(
(value) {
if (value is R) controller.add(value);