Fix throttle broadcast bug (dart-lang/stream_transform#23)

- Switch to `fromHandlers` utility.
- Add a test which fails with the old implementation.
- Add an extra `await new Future(() {})` in the test for handling the
  Stream done event. The new StreamTransformer is not sync like this old
  one - this behavior is still correct.
- Remove extra `handleDone` which matches the default behavior.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index e595919..00e2600 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,7 +1,7 @@
 ## 0.0.6
 
 - Bug Fix: Some transformers did not correctly add data to all listeners on
-  broadcast streams. Fixed for `debounce`, and `audit`.
+  broadcast streams. Fixed for `throttle`, `debounce`, and `audit`.
 
 ## 0.0.5
 
diff --git a/pkgs/stream_transform/lib/src/throttle.dart b/pkgs/stream_transform/lib/src/throttle.dart
index 030c3fd..b8faa25 100644
--- a/pkgs/stream_transform/lib/src/throttle.dart
+++ b/pkgs/stream_transform/lib/src/throttle.dart
@@ -3,19 +3,19 @@
 // BSD-style license that can be found in the LICENSE file.
 import 'dart:async';
 
+import 'from_handlers.dart';
+
 /// Creates a StreamTransformer which only emits once per [duration], at the
 /// beginning of the period.
 StreamTransformer<T, T> throttle<T>(Duration duration) {
   Timer timer;
 
-  return new StreamTransformer.fromHandlers(handleData: (data, sink) {
+  return fromHandlers(handleData: (data, sink) {
     if (timer == null) {
       sink.add(data);
       timer = new Timer(duration, () {
         timer = null;
       });
     }
-  }, handleDone: (sink) {
-    sink.close();
   });
 }
diff --git a/pkgs/stream_transform/test/throttle_test.dart b/pkgs/stream_transform/test/throttle_test.dart
index 888caec..3647cb4 100644
--- a/pkgs/stream_transform/test/throttle_test.dart
+++ b/pkgs/stream_transform/test/throttle_test.dart
@@ -14,6 +14,7 @@
       bool valuesCanceled;
       bool isDone;
       List errors;
+      Stream transformed;
       StreamSubscription subscription;
 
       void setUpStreams(StreamTransformer transformer) {
@@ -25,8 +26,8 @@
         emittedValues = [];
         errors = [];
         isDone = false;
-        subscription = values.stream
-            .transform(transformer)
+        transformed = values.stream.transform(transformer);
+        subscription = transformed
             .listen(emittedValues.add, onError: errors.add, onDone: () {
           isDone = true;
         });
@@ -64,8 +65,20 @@
           values.add(2);
           await new Future(() {});
           await values.close();
+          await new Future(() {});
           expect(isDone, true);
         });
+
+        if (streamType == 'broadcast') {
+          test('multiple listeners all get values', () async {
+            var otherValues = [];
+            transformed.listen(otherValues.add);
+            values.add(1);
+            await new Future(() {});
+            expect(emittedValues, [1]);
+            expect(otherValues, [1]);
+          });
+        }
       });
     });
   }