Add `fromHandlers` for easier broadcast streams (dart-lang/stream_transform#21)

Fixes dart-lang/stream_transform#18

StreamTransformer.fromHandlers constructor does not behave as expected
for broadcast streams - handlers are called once per listener rather
than once per event. This means that closures which share state for a
single source stream won't work correctly for broadcast streams with
multiple listeners. The transformers currently using this pattern are
most readable with it vs a custom StreamTransformer class for each one,
so add a `fromHandlers` utility which mimics that constructor but only
calls the handlers once per event and the broadcast multiplexing is done
on the other side.

- Add _StreamTransformers with an overridable handleData and handleDone.
  Eventually handleError will be needed, but this is all that is
  required to solve the bug with `debounce`.
- Add tests for _StreamTransformers across both single-subscrption and
  broadcast streams. All these tests pass with either `new
  StreamTransformers.fromhandlers` or the new `fromhandlers` except the
  'called once' tests.
- Add a pair of tests for debounce and debounceBuffer which exhibit the
  bug - data is only added to one of the listeners because the Timer is
  overridden to the last listener whose handleData is called.
- Update debounce to use `fromHandlers` and see that the tests pass.

Other StreamTransformers in this package have the same bug and will be
fixed in subsequent commits.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 6f632db..50c23d8 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 0.0.6
+
+- Bug Fix: `debounce` correctly adds data to all listeners on a broadcast
+  stream.
+
 ## 0.0.5
 
 - Bug Fix: Allow compiling switchLatest with Dart2Js.
diff --git a/pkgs/stream_transform/lib/src/debounce.dart b/pkgs/stream_transform/lib/src/debounce.dart
index 8d7870e..fffb221 100644
--- a/pkgs/stream_transform/lib/src/debounce.dart
+++ b/pkgs/stream_transform/lib/src/debounce.dart
@@ -3,6 +3,8 @@
 // BSD-style license that can be found in the LICENSE file.
 import 'dart:async';
 
+import 'from_handlers.dart';
+
 /// Creates a StreamTransformer which only emits when the source stream does not
 /// emit for [duration].
 ///
@@ -35,8 +37,7 @@
   Timer timer;
   R soFar;
   bool shouldClose = false;
-  return new StreamTransformer.fromHandlers(
-      handleData: (T value, EventSink<R> sink) {
+  return fromHandlers(handleData: (T value, EventSink<R> sink) {
     timer?.cancel();
     timer = new Timer(duration, () {
       sink.add(soFar);
diff --git a/pkgs/stream_transform/lib/src/from_handlers.dart b/pkgs/stream_transform/lib/src/from_handlers.dart
new file mode 100644
index 0000000..8157ea1
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/from_handlers.dart
@@ -0,0 +1,60 @@
+import 'dart:async';
+
+typedef void HandleData<S, T>(S value, EventSink<T> sink);
+typedef void HandleDone<T>(EventSink<T> sink);
+
+/// Like [new StreamTransformer.fromHandlers] but the handlers are called once
+/// per event rather than once per listener for broadcast streams.
+StreamTransformer<S, T> fromHandlers<S, T>(
+        {HandleData<S, T> handleData, HandleDone<T> handleDone}) =>
+    new _StreamTransformer(handleData: handleData, handleDone: handleDone);
+
+class _StreamTransformer<S, T> implements StreamTransformer<S, T> {
+  final HandleData<S, T> _handleData;
+  final HandleDone<T> _handleDone;
+
+  _StreamTransformer({HandleData<S, T> handleData, HandleDone<T> handleDone})
+      : _handleData = handleData ?? _defaultHandleData,
+        _handleDone = handleDone ?? _defaultHandleDone;
+
+  static _defaultHandleData<S, T>(S value, EventSink<T> sink) {
+    sink.add(value as T);
+  }
+
+  static _defaultHandleDone<T>(EventSink<T> sink) {
+    sink.close();
+  }
+
+  @override
+  Stream<T> bind(Stream<S> values) {
+    StreamController<T> controller;
+    if (values.isBroadcast) {
+      controller = new StreamController<T>.broadcast();
+    } else {
+      controller = new StreamController<T>();
+    }
+    StreamSubscription<S> subscription;
+    controller.onListen = () {
+      if (subscription != null) {
+        return;
+      }
+      subscription = values.listen((value) => _handleData(value, controller),
+          onError: controller.addError, onDone: () {
+        _handleDone(controller);
+      });
+    };
+    if (!values.isBroadcast) {
+      controller.onPause = () => subscription?.pause();
+      controller.onResume = () => subscription?.resume();
+    }
+    controller.onCancel = () {
+      if (controller.hasListener || subscription == null) {
+        return new Future.value();
+      }
+      var toCancel = subscription;
+      subscription = null;
+      return toCancel.cancel();
+    };
+    return controller.stream;
+  }
+}
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index e826730..f1c647e 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@
 description: A collection of utilities to transform and manipulate streams.
 author: Dart Team <misc@dartlang.org>
 homepage: https://www.github.com/dart-lang/stream_transform
-version: 0.0.5
+version: 0.0.6-dev
 
 environment:
   sdk: ">=1.22.0 <2.0.0"
diff --git a/pkgs/stream_transform/test/debounce_test.dart b/pkgs/stream_transform/test/debounce_test.dart
index a556b41..a28d2ca 100644
--- a/pkgs/stream_transform/test/debounce_test.dart
+++ b/pkgs/stream_transform/test/debounce_test.dart
@@ -15,6 +15,7 @@
       bool isDone;
       List errors;
       StreamSubscription subscription;
+      Stream transformed;
 
       void setUpStreams(StreamTransformer transformer) {
         valuesCanceled = false;
@@ -25,8 +26,8 @@
         emittedValues = [];
         errors = [];
         isDone = false;
-        subscription = values.stream
-            .transform(transformer)
+        transformed = values.stream.transform(transformer);
+        subscription = transformed
             .listen(emittedValues.add, onError: errors.add, onDone: () {
           isDone = true;
         });
@@ -76,6 +77,18 @@
           await new Future.delayed(const Duration(milliseconds: 10));
           expect(isDone, true);
         });
+
+        if (streamType == 'broadcast') {
+          test('multiple listeners all get values', () async {
+            var otherValues = [];
+            transformed.listen(otherValues.add);
+            values.add(1);
+            values.add(2);
+            await new Future.delayed(const Duration(milliseconds: 10));
+            expect(emittedValues, [2]);
+            expect(otherValues, [2]);
+          });
+        }
       });
 
       group('debounceBuffer', () {
@@ -104,6 +117,22 @@
             [2]
           ]);
         });
+
+        if (streamType == 'broadcast') {
+          test('multiple listeners all get values', () async {
+            var otherValues = [];
+            transformed.listen(otherValues.add);
+            values.add(1);
+            values.add(2);
+            await new Future.delayed(const Duration(milliseconds: 10));
+            expect(emittedValues, [
+              [1, 2]
+            ]);
+            expect(otherValues, [
+              [1, 2]
+            ]);
+          });
+        }
       });
     });
   }
diff --git a/pkgs/stream_transform/test/from_handlers_test.dart b/pkgs/stream_transform/test/from_handlers_test.dart
new file mode 100644
index 0000000..28436bd
--- /dev/null
+++ b/pkgs/stream_transform/test/from_handlers_test.dart
@@ -0,0 +1,165 @@
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:stream_transform/src/from_handlers.dart';
+
+void main() {
+  StreamController values;
+  List emittedValues;
+  bool valuesCanceled;
+  bool isDone;
+  List errors;
+  Stream transformed;
+  StreamSubscription subscription;
+
+  void setUpForController(
+      StreamController controller, StreamTransformer transformer) {
+    valuesCanceled = false;
+    values = controller
+      ..onCancel = () {
+        valuesCanceled = true;
+      };
+    emittedValues = [];
+    errors = [];
+    isDone = false;
+    transformed = values.stream.transform(transformer);
+    subscription =
+        transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
+      isDone = true;
+    });
+  }
+
+  group('default from_handlers', () {
+    group('Single subscription stream', () {
+      setUp(() {
+        setUpForController(new StreamController(), fromHandlers());
+      });
+
+      test('has correct stream type', () {
+        expect(transformed.isBroadcast, false);
+      });
+
+      test('forwards values', () async {
+        values.add(1);
+        values.add(2);
+        await new Future(() {});
+        expect(emittedValues, [1, 2]);
+      });
+
+      test('forwards errors', () async {
+        values.addError('error');
+        await new Future(() {});
+        expect(errors, ['error']);
+      });
+
+      test('forwards done', () async {
+        await values.close();
+        await new Future(() {});
+        expect(isDone, true);
+      });
+
+      test('forwards cancel', () async {
+        await subscription.cancel();
+        expect(valuesCanceled, true);
+      });
+    });
+
+    group('broadcast stream with muliple listeners', () {
+      List emittedValues2;
+      List errors2;
+      bool isDone2;
+      StreamSubscription subscription2;
+
+      setUp(() {
+        setUpForController(new StreamController.broadcast(), fromHandlers());
+        emittedValues2 = [];
+        errors2 = [];
+        isDone2 = false;
+        subscription2 = transformed
+            .listen(emittedValues2.add, onError: errors2.add, onDone: () {
+          isDone2 = true;
+        });
+      });
+
+      test('has correct stream type', () {
+        expect(transformed.isBroadcast, true);
+      });
+
+      test('forwards values', () async {
+        values.add(1);
+        values.add(2);
+        await new Future(() {});
+        expect(emittedValues, [1, 2]);
+        expect(emittedValues2, [1, 2]);
+      });
+
+      test('forwards errors', () async {
+        values.addError('error');
+        await new Future(() {});
+        expect(errors, ['error']);
+        expect(errors2, ['error']);
+      });
+
+      test('forwards done', () async {
+        await values.close();
+        await new Future(() {});
+        expect(isDone, true);
+        expect(isDone2, true);
+      });
+
+      test('forwards cancel', () async {
+        await subscription.cancel();
+        expect(valuesCanceled, false);
+        await subscription2.cancel();
+        expect(valuesCanceled, true);
+      });
+    });
+  });
+
+  group('custom handlers', () {
+    group('single subscription', () {
+      setUp(() async {
+        setUpForController(new StreamController(),
+            fromHandlers(handleData: (value, sink) {
+          sink.add(value + 1);
+        }));
+      });
+      test('uses transform from handleData', () async {
+        values.add(1);
+        values.add(2);
+        await new Future(() {});
+        expect(emittedValues, [2, 3]);
+      });
+    });
+
+    group('broadcast stream with multiple listeners', () {
+      int dataCallCount;
+      int doneCallCount;
+
+      setUp(() async {
+        dataCallCount = 0;
+        doneCallCount = 0;
+        setUpForController(
+            new StreamController.broadcast(),
+            fromHandlers(handleData: (value, sink) {
+              dataCallCount++;
+            }, handleDone: (sink) {
+              doneCallCount++;
+            }));
+        transformed.listen((_) {});
+      });
+
+      test('handles data once', () async {
+        values.add(1);
+        await new Future(() {});
+        expect(dataCallCount, 1);
+      });
+
+      test('handles done once', () async {
+        await values.close();
+        expect(doneCallCount, 1);
+      });
+    });
+  });
+}