Change onListen bailouts to asserts (dart-lang/stream_transform#68)

These transformers had a check that should never be true because the
behavior of the `StreamController` should prevent `onListen` from being
called multiple times while there is at least one active listener.
diff --git a/pkgs/stream_transform/lib/src/buffer.dart b/pkgs/stream_transform/lib/src/buffer.dart
index c05e79a..b4c937e 100644
--- a/pkgs/stream_transform/lib/src/buffer.dart
+++ b/pkgs/stream_transform/lib/src/buffer.dart
@@ -86,7 +86,7 @@
     }
 
     controller.onListen = () {
-      if (valueSub != null) return;
+      assert(valueSub == null);
       valueSub = values.listen(onValue,
           onError: controller.addError, onDone: onValuesDone);
       if (triggerSub != null) {
diff --git a/pkgs/stream_transform/lib/src/combine_latest_all.dart b/pkgs/stream_transform/lib/src/combine_latest_all.dart
index 8141a3f..679f3ef 100644
--- a/pkgs/stream_transform/lib/src/combine_latest_all.dart
+++ b/pkgs/stream_transform/lib/src/combine_latest_all.dart
@@ -63,7 +63,7 @@
     List<StreamSubscription> subscriptions;
 
     controller.onListen = () {
-      if (subscriptions != null) return;
+      assert(subscriptions == null);
 
       final latestData = List<T>(allStreams.length);
       final hasEmitted = <int>{};
diff --git a/pkgs/stream_transform/lib/src/followed_by.dart b/pkgs/stream_transform/lib/src/followed_by.dart
index df506f3..1705542 100644
--- a/pkgs/stream_transform/lib/src/followed_by.dart
+++ b/pkgs/stream_transform/lib/src/followed_by.dart
@@ -60,7 +60,7 @@
     currentDoneHandler = onFirstDone;
 
     controller.onListen = () {
-      if (subscription != null) return;
+      assert(subscription == null);
       listen();
       if (!first.isBroadcast) {
         controller.onPause = () {
diff --git a/pkgs/stream_transform/lib/src/from_handlers.dart b/pkgs/stream_transform/lib/src/from_handlers.dart
index 6096c4c..99f71cb 100644
--- a/pkgs/stream_transform/lib/src/from_handlers.dart
+++ b/pkgs/stream_transform/lib/src/from_handlers.dart
@@ -54,7 +54,7 @@
 
     StreamSubscription<S> subscription;
     controller.onListen = () {
-      if (subscription != null) return;
+      assert(subscription == null);
       var valuesDone = false;
       subscription = values.listen((value) => _handleData(value, controller),
           onError: (error, StackTrace stackTrace) {
diff --git a/pkgs/stream_transform/lib/src/merge.dart b/pkgs/stream_transform/lib/src/merge.dart
index ead8bc6..82aa06a 100644
--- a/pkgs/stream_transform/lib/src/merge.dart
+++ b/pkgs/stream_transform/lib/src/merge.dart
@@ -42,7 +42,7 @@
     List<StreamSubscription> subscriptions;
 
     controller.onListen = () {
-      if (subscriptions != null) return;
+      assert(subscriptions == null);
       var activeStreamCount = 0;
       subscriptions = allStreams.map((stream) {
         activeStreamCount++;
diff --git a/pkgs/stream_transform/lib/src/switch.dart b/pkgs/stream_transform/lib/src/switch.dart
index 87ae4ff..54dc843 100644
--- a/pkgs/stream_transform/lib/src/switch.dart
+++ b/pkgs/stream_transform/lib/src/switch.dart
@@ -38,7 +38,7 @@
     StreamSubscription<Stream<T>> outerSubscription;
 
     controller.onListen = () {
-      if (outerSubscription != null) return;
+      assert(outerSubscription == null);
 
       StreamSubscription<T> innerSubscription;
       var outerStreamDone = false;
diff --git a/pkgs/stream_transform/lib/src/where_type.dart b/pkgs/stream_transform/lib/src/where_type.dart
index 96eb394..6e5b08f 100644
--- a/pkgs/stream_transform/lib/src/where_type.dart
+++ b/pkgs/stream_transform/lib/src/where_type.dart
@@ -30,7 +30,7 @@
 
     StreamSubscription<Object> subscription;
     controller.onListen = () {
-      if (subscription != null) return;
+      assert(subscription == null);
       subscription = source.listen(
           (value) {
             if (value is R) controller.add(value);