Fix throttle broadcast bug (dart-lang/stream_transform#23)
- Switch to `fromHandlers` utility.
- Add a test which fails with the old implementation.
- Add an extra `await new Future(() {})` in the test for handling the
Stream done event. The new StreamTransformer is not sync like this old
one - this behavior is still correct.
- Remove extra `handleDone` which matches the default behavior.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index e595919..00e2600 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,7 +1,7 @@
## 0.0.6
- Bug Fix: Some transformers did not correctly add data to all listeners on
- broadcast streams. Fixed for `debounce`, and `audit`.
+ broadcast streams. Fixed for `throttle`, `debounce`, and `audit`.
## 0.0.5
diff --git a/pkgs/stream_transform/lib/src/throttle.dart b/pkgs/stream_transform/lib/src/throttle.dart
index 030c3fd..b8faa25 100644
--- a/pkgs/stream_transform/lib/src/throttle.dart
+++ b/pkgs/stream_transform/lib/src/throttle.dart
@@ -3,19 +3,19 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
+import 'from_handlers.dart';
+
/// Creates a StreamTransformer which only emits once per [duration], at the
/// beginning of the period.
StreamTransformer<T, T> throttle<T>(Duration duration) {
Timer timer;
- return new StreamTransformer.fromHandlers(handleData: (data, sink) {
+ return fromHandlers(handleData: (data, sink) {
if (timer == null) {
sink.add(data);
timer = new Timer(duration, () {
timer = null;
});
}
- }, handleDone: (sink) {
- sink.close();
});
}
diff --git a/pkgs/stream_transform/test/throttle_test.dart b/pkgs/stream_transform/test/throttle_test.dart
index 888caec..3647cb4 100644
--- a/pkgs/stream_transform/test/throttle_test.dart
+++ b/pkgs/stream_transform/test/throttle_test.dart
@@ -14,6 +14,7 @@
bool valuesCanceled;
bool isDone;
List errors;
+ Stream transformed;
StreamSubscription subscription;
void setUpStreams(StreamTransformer transformer) {
@@ -25,8 +26,8 @@
emittedValues = [];
errors = [];
isDone = false;
- subscription = values.stream
- .transform(transformer)
+ transformed = values.stream.transform(transformer);
+ subscription = transformed
.listen(emittedValues.add, onError: errors.add, onDone: () {
isDone = true;
});
@@ -64,8 +65,20 @@
values.add(2);
await new Future(() {});
await values.close();
+ await new Future(() {});
expect(isDone, true);
});
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () async {
+ var otherValues = [];
+ transformed.listen(otherValues.add);
+ values.add(1);
+ await new Future(() {});
+ expect(emittedValues, [1]);
+ expect(otherValues, [1]);
+ });
+ }
});
});
}