Use sync forwarders in buffer (dart-lang/stream_transform#31)

Towards dart-lang/stream_transform#24

Fixes a bug where the output would close too soon if there was a pending
trigger.

Fixes some bugs in the test.

Fixes a bug with canceling when values are broadcast but trigger is
single subscription.

Refactor for better readability

- Fix tests to pass the valuesType instead of the triggerType for both
  streams.
- Refactor for more clear parallel structure between handlers for values
  and triggers.
- Inline _collectToList
- Add booleans to track when streams are done rather than overload what
  it means for their subscriptions to be null.
- Only return a Future from onCancel when there were streams open that
  needed to be canceled. This means that some of the extra awaits in
  tests need to be left in to properly forward errors closing stream.
- Pull the trigger canceling tests out of the loops since the behavior
  depends on both stream types.
- Cancel rather than pause trigger subscription if we won't need it
  again.
- Add tests that broadcast stream (from values) can be canceled and
  relistened
- Set up onPause, onResume, and onCancel in onListen
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index da8fd25..25d39a5 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -6,6 +6,10 @@
   listener.
 - Bug Fix: Allow canceling and re-listening to broadcast streams after a
   `merge` transform.
+- Bug Fix: Broadcast streams which are buffered using a single-subscription
+  trigger can be canceled and re-listened.
+- Bug Fix: Buffer outputs one more value if there is a pending trigger before
+  the trigger closes.
 - Bug Fix: Single-subscription streams concatted after broadcast streams are
   handled correctly.
 - Use sync `StreamControllers` for forwarding where possible.
diff --git a/pkgs/stream_transform/lib/src/buffer.dart b/pkgs/stream_transform/lib/src/buffer.dart
index 4f2976d..f5ef7bf 100644
--- a/pkgs/stream_transform/lib/src/buffer.dart
+++ b/pkgs/stream_transform/lib/src/buffer.dart
@@ -14,12 +14,6 @@
 /// the output.
 StreamTransformer<T, List<T>> buffer<T>(Stream trigger) => new _Buffer(trigger);
 
-List<T> _collectToList<T>(T element, List<T> soFar) {
-  soFar ??= <T>[];
-  soFar.add(element);
-  return soFar;
-}
-
 /// A StreamTransformer which aggregates values and emits when it sees a value
 /// on [_trigger].
 ///
@@ -36,36 +30,17 @@
 
   @override
   Stream<List<T>> bind(Stream<T> values) {
-    StreamController<List<T>> controller;
-    if (values.isBroadcast) {
-      controller = new StreamController<List<T>>.broadcast();
-    } else {
-      controller = new StreamController<List<T>>();
-    }
+    var controller = values.isBroadcast
+        ? new StreamController<List<T>>.broadcast(sync: true)
+        : new StreamController<List<T>>(sync: true);
 
     List<T> currentResults;
     bool waitingForTrigger = true;
-    StreamSubscription valuesSub;
+    bool isTriggerDone = false;
+    bool isValueDone = false;
+    StreamSubscription valueSub;
     StreamSubscription triggerSub;
 
-    cancelValues() {
-      var sub = valuesSub;
-      valuesSub = null;
-      return sub?.cancel() ?? new Future.value();
-    }
-
-    cancelTrigger() {
-      var sub = triggerSub;
-      triggerSub = null;
-      return sub?.cancel() ?? new Future.value();
-    }
-
-    closeController() {
-      var ctl = controller;
-      controller = null;
-      return ctl?.close() ?? new Future.value();
-    }
-
     emit() {
       controller.add(currentResults);
       currentResults = null;
@@ -73,72 +48,77 @@
     }
 
     onValue(T value) {
-      currentResults = _collectToList(value, currentResults);
-      if (!waitingForTrigger) {
-        emit();
+      (currentResults ??= <T>[]).add(value);
+
+      if (!waitingForTrigger) emit();
+
+      if (isTriggerDone) {
+        valueSub.cancel();
+        controller.close();
       }
     }
 
-    valuesDone() {
-      valuesSub = null;
+    onValuesDone() {
+      isValueDone = true;
       if (currentResults == null) {
-        closeController();
-        cancelTrigger();
+        triggerSub?.cancel();
+        controller.close();
       }
     }
 
     onTrigger(_) {
-      if (currentResults == null) {
-        waitingForTrigger = false;
-        return;
-      }
-      emit();
-      if (valuesSub == null) {
-        closeController();
-        cancelTrigger();
+      waitingForTrigger = false;
+
+      if (currentResults != null) emit();
+
+      if (isValueDone) {
+        triggerSub.cancel();
+        controller.close();
       }
     }
 
-    triggerDone() {
-      cancelValues();
-      closeController();
+    onTriggerDone() {
+      isTriggerDone = true;
+      if (waitingForTrigger) {
+        valueSub?.cancel();
+        controller.close();
+      }
     }
 
     controller.onListen = () {
-      if (valuesSub != null) return;
-      valuesSub = values.listen(onValue,
-          onError: controller.addError, onDone: valuesDone);
+      if (valueSub != null) return;
+      valueSub = values.listen(onValue,
+          onError: controller.addError, onDone: onValuesDone);
       if (triggerSub != null) {
         if (triggerSub.isPaused) triggerSub.resume();
       } else {
         triggerSub = _trigger.listen(onTrigger,
-            onError: controller.addError, onDone: triggerDone);
+            onError: controller.addError, onDone: onTriggerDone);
       }
-    };
-
-    // Forward methods from listener
-    if (!values.isBroadcast) {
-      controller.onPause = () {
-        valuesSub?.pause();
-        triggerSub?.pause();
-      };
-      controller.onResume = () {
-        valuesSub?.resume();
-        triggerSub?.resume();
-      };
-      controller.onCancel =
-          () => Future.wait([cancelValues(), cancelTrigger()]);
-    } else {
+      if (!values.isBroadcast) {
+        controller.onPause = () {
+          valueSub?.pause();
+          triggerSub?.pause();
+        };
+        controller.onResume = () {
+          valueSub?.resume();
+          triggerSub?.resume();
+        };
+      }
       controller.onCancel = () {
-        if (controller?.hasListener ?? false) return;
-        if (_trigger.isBroadcast) {
-          cancelTrigger();
+        var toCancel = <StreamSubscription>[];
+        if (!isValueDone) toCancel.add(valueSub);
+        valueSub = null;
+        if (_trigger.isBroadcast || !values.isBroadcast) {
+          if (!isTriggerDone) toCancel.add(triggerSub);
+          triggerSub = null;
         } else {
           triggerSub.pause();
         }
-        cancelValues();
+        if (toCancel.isEmpty) return null;
+        return Future.wait(toCancel.map((s) => s.cancel()));
       };
-    }
+    };
     return controller.stream;
   }
 }
diff --git a/pkgs/stream_transform/test/buffer_test.dart b/pkgs/stream_transform/test/buffer_test.dart
index c371ac5..4d3a2b6 100644
--- a/pkgs/stream_transform/test/buffer_test.dart
+++ b/pkgs/stream_transform/test/buffer_test.dart
@@ -12,37 +12,49 @@
     'single subscription': () => new StreamController(),
     'broadcast': () => new StreamController.broadcast()
   };
+  StreamController trigger;
+  StreamController values;
+  List emittedValues;
+  bool valuesCanceled;
+  bool triggerCanceled;
+  bool triggerPaused;
+  bool isDone;
+  List errors;
+  Stream transformed;
+  StreamSubscription subscription;
+
+  void setUpForStreamTypes(String triggerType, String valuesType) {
+    valuesCanceled = false;
+    triggerCanceled = false;
+    triggerPaused = false;
+    trigger = streamTypes[triggerType]()
+      ..onCancel = () {
+        triggerCanceled = true;
+      };
+    if (triggerType == 'single subscription') {
+      trigger.onPause = () {
+        triggerPaused = true;
+      };
+    }
+    values = streamTypes[valuesType]()
+      ..onCancel = () {
+        valuesCanceled = true;
+      };
+    emittedValues = [];
+    errors = [];
+    isDone = false;
+    transformed = values.stream.transform(buffer(trigger.stream));
+    subscription =
+        transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
+      isDone = true;
+    });
+  }
+
   for (var triggerType in streamTypes.keys) {
     for (var valuesType in streamTypes.keys) {
       group('Trigger type: [$triggerType], Values type: [$valuesType]', () {
-        StreamController trigger;
-        StreamController values;
-        List emittedValues;
-        bool valuesCanceled;
-        bool triggerCanceled;
-        bool isDone;
-        List errors;
-        StreamSubscription subscription;
-
-        setUp(() async {
-          valuesCanceled = false;
-          triggerCanceled = false;
-          trigger = streamTypes[triggerType]()
-            ..onCancel = () {
-              triggerCanceled = true;
-            };
-          values = streamTypes[triggerType]()
-            ..onCancel = () {
-              valuesCanceled = true;
-            };
-          emittedValues = [];
-          errors = [];
-          isDone = false;
-          subscription = values.stream
-              .transform(buffer(trigger.stream))
-              .listen(emittedValues.add, onError: errors.add, onDone: () {
-            isDone = true;
-          });
+        setUp(() {
+          setUpForStreamTypes(triggerType, valuesType);
         });
 
         test('does not emit before `trigger`', () async {
@@ -130,12 +142,6 @@
           expect(valuesCanceled, true);
         });
 
-        test('cancels trigger subscription when output canceled', () async {
-          expect(triggerCanceled, false);
-          await subscription.cancel();
-          expect(triggerCanceled, true);
-        });
-
         test('closes when trigger ends', () async {
           expect(isDone, false);
           await trigger.close();
@@ -157,8 +163,7 @@
           expect(isDone, true);
         });
 
-        test(
-            'closes immediately if there are no pending values when source closes',
+        test('closes if there are no pending values when source closes',
             () async {
           expect(isDone, false);
           values.add(1);
@@ -168,6 +173,19 @@
           expect(isDone, true);
         });
 
+        test('waits to emit if there is a pending trigger when trigger closes',
+            () async {
+          trigger.add(null);
+          await trigger.close();
+          expect(isDone, false);
+          values.add(1);
+          await new Future(() {});
+          expect(emittedValues, [
+            [1]
+          ]);
+          expect(isDone, true);
+        });
+
         test('forwards errors from trigger', () async {
           trigger.addError('error');
           await new Future(() {});
@@ -182,4 +200,56 @@
       });
     }
   }
+
+  test('always cancels trigger if values is singlesubscription', () async {
+    setUpForStreamTypes('broadcast', 'single subscription');
+    expect(triggerCanceled, false);
+    await subscription.cancel();
+    expect(triggerCanceled, true);
+
+    setUpForStreamTypes('single subscription', 'single subscription');
+    expect(triggerCanceled, false);
+    await subscription.cancel();
+    expect(triggerCanceled, true);
+  });
+
+  test('cancels trigger if trigger is broadcast', () async {
+    setUpForStreamTypes('broadcast', 'broadcast');
+    expect(triggerCanceled, false);
+    await subscription.cancel();
+    expect(triggerCanceled, true);
+  });
+
+  test('pauses single subscription trigger for broadcast values', () async {
+    setUpForStreamTypes('single subscription', 'broadcast');
+    expect(triggerCanceled, false);
+    expect(triggerPaused, false);
+    await subscription.cancel();
+    expect(triggerCanceled, false);
+    expect(triggerPaused, true);
+  });
+
+  for (var triggerType in streamTypes.keys) {
+    test('cancel and relisten with [$triggerType] trigger', () async {
+      setUpForStreamTypes(triggerType, 'broadcast');
+      values.add(1);
+      trigger.add(null);
+      await new Future(() {});
+      expect(emittedValues, [
+        [1]
+      ]);
+      await subscription.cancel();
+      values.add(2);
+      trigger.add(null);
+      await new Future(() {});
+      subscription = transformed.listen(emittedValues.add);
+      values.add(3);
+      trigger.add(null);
+      await new Future(() {});
+      expect(emittedValues, [
+        [1],
+        [3]
+      ]);
+    });
+  }
 }