Add `fromHandlers` for easier broadcast streams (dart-lang/stream_transform#21)
Fixes dart-lang/stream_transform#18
StreamTransformer.fromHandlers constructor does not behave as expected
for broadcast streams - handlers are called once per listener rather
than once per event. This means that closures which share state for a
single source stream won't work correctly for broadcast streams with
multiple listeners. The transformers currently using this pattern are
most readable with it vs a custom StreamTransformer class for each one,
so add a `fromHandlers` utility which mimics that constructor but only
calls the handlers once per event and the broadcast multiplexing is done
on the other side.
- Add _StreamTransformers with an overridable handleData and handleDone.
Eventually handleError will be needed, but this is all that is
required to solve the bug with `debounce`.
- Add tests for _StreamTransformers across both single-subscrption and
broadcast streams. All these tests pass with either `new
StreamTransformers.fromhandlers` or the new `fromhandlers` except the
'called once' tests.
- Add a pair of tests for debounce and debounceBuffer which exhibit the
bug - data is only added to one of the listeners because the Timer is
overridden to the last listener whose handleData is called.
- Update debounce to use `fromHandlers` and see that the tests pass.
Other StreamTransformers in this package have the same bug and will be
fixed in subsequent commits.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 6f632db..50c23d8 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 0.0.6
+
+- Bug Fix: `debounce` correctly adds data to all listeners on a broadcast
+ stream.
+
## 0.0.5
- Bug Fix: Allow compiling switchLatest with Dart2Js.
diff --git a/pkgs/stream_transform/lib/src/debounce.dart b/pkgs/stream_transform/lib/src/debounce.dart
index 8d7870e..fffb221 100644
--- a/pkgs/stream_transform/lib/src/debounce.dart
+++ b/pkgs/stream_transform/lib/src/debounce.dart
@@ -3,6 +3,8 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
+import 'from_handlers.dart';
+
/// Creates a StreamTransformer which only emits when the source stream does not
/// emit for [duration].
///
@@ -35,8 +37,7 @@
Timer timer;
R soFar;
bool shouldClose = false;
- return new StreamTransformer.fromHandlers(
- handleData: (T value, EventSink<R> sink) {
+ return fromHandlers(handleData: (T value, EventSink<R> sink) {
timer?.cancel();
timer = new Timer(duration, () {
sink.add(soFar);
diff --git a/pkgs/stream_transform/lib/src/from_handlers.dart b/pkgs/stream_transform/lib/src/from_handlers.dart
new file mode 100644
index 0000000..8157ea1
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/from_handlers.dart
@@ -0,0 +1,60 @@
+import 'dart:async';
+
+typedef void HandleData<S, T>(S value, EventSink<T> sink);
+typedef void HandleDone<T>(EventSink<T> sink);
+
+/// Like [new StreamTransformer.fromHandlers] but the handlers are called once
+/// per event rather than once per listener for broadcast streams.
+StreamTransformer<S, T> fromHandlers<S, T>(
+ {HandleData<S, T> handleData, HandleDone<T> handleDone}) =>
+ new _StreamTransformer(handleData: handleData, handleDone: handleDone);
+
+class _StreamTransformer<S, T> implements StreamTransformer<S, T> {
+ final HandleData<S, T> _handleData;
+ final HandleDone<T> _handleDone;
+
+ _StreamTransformer({HandleData<S, T> handleData, HandleDone<T> handleDone})
+ : _handleData = handleData ?? _defaultHandleData,
+ _handleDone = handleDone ?? _defaultHandleDone;
+
+ static _defaultHandleData<S, T>(S value, EventSink<T> sink) {
+ sink.add(value as T);
+ }
+
+ static _defaultHandleDone<T>(EventSink<T> sink) {
+ sink.close();
+ }
+
+ @override
+ Stream<T> bind(Stream<S> values) {
+ StreamController<T> controller;
+ if (values.isBroadcast) {
+ controller = new StreamController<T>.broadcast();
+ } else {
+ controller = new StreamController<T>();
+ }
+ StreamSubscription<S> subscription;
+ controller.onListen = () {
+ if (subscription != null) {
+ return;
+ }
+ subscription = values.listen((value) => _handleData(value, controller),
+ onError: controller.addError, onDone: () {
+ _handleDone(controller);
+ });
+ };
+ if (!values.isBroadcast) {
+ controller.onPause = () => subscription?.pause();
+ controller.onResume = () => subscription?.resume();
+ }
+ controller.onCancel = () {
+ if (controller.hasListener || subscription == null) {
+ return new Future.value();
+ }
+ var toCancel = subscription;
+ subscription = null;
+ return toCancel.cancel();
+ };
+ return controller.stream;
+ }
+}
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index e826730..f1c647e 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@
description: A collection of utilities to transform and manipulate streams.
author: Dart Team <misc@dartlang.org>
homepage: https://www.github.com/dart-lang/stream_transform
-version: 0.0.5
+version: 0.0.6-dev
environment:
sdk: ">=1.22.0 <2.0.0"
diff --git a/pkgs/stream_transform/test/debounce_test.dart b/pkgs/stream_transform/test/debounce_test.dart
index a556b41..a28d2ca 100644
--- a/pkgs/stream_transform/test/debounce_test.dart
+++ b/pkgs/stream_transform/test/debounce_test.dart
@@ -15,6 +15,7 @@
bool isDone;
List errors;
StreamSubscription subscription;
+ Stream transformed;
void setUpStreams(StreamTransformer transformer) {
valuesCanceled = false;
@@ -25,8 +26,8 @@
emittedValues = [];
errors = [];
isDone = false;
- subscription = values.stream
- .transform(transformer)
+ transformed = values.stream.transform(transformer);
+ subscription = transformed
.listen(emittedValues.add, onError: errors.add, onDone: () {
isDone = true;
});
@@ -76,6 +77,18 @@
await new Future.delayed(const Duration(milliseconds: 10));
expect(isDone, true);
});
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () async {
+ var otherValues = [];
+ transformed.listen(otherValues.add);
+ values.add(1);
+ values.add(2);
+ await new Future.delayed(const Duration(milliseconds: 10));
+ expect(emittedValues, [2]);
+ expect(otherValues, [2]);
+ });
+ }
});
group('debounceBuffer', () {
@@ -104,6 +117,22 @@
[2]
]);
});
+
+ if (streamType == 'broadcast') {
+ test('multiple listeners all get values', () async {
+ var otherValues = [];
+ transformed.listen(otherValues.add);
+ values.add(1);
+ values.add(2);
+ await new Future.delayed(const Duration(milliseconds: 10));
+ expect(emittedValues, [
+ [1, 2]
+ ]);
+ expect(otherValues, [
+ [1, 2]
+ ]);
+ });
+ }
});
});
}
diff --git a/pkgs/stream_transform/test/from_handlers_test.dart b/pkgs/stream_transform/test/from_handlers_test.dart
new file mode 100644
index 0000000..28436bd
--- /dev/null
+++ b/pkgs/stream_transform/test/from_handlers_test.dart
@@ -0,0 +1,165 @@
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:stream_transform/src/from_handlers.dart';
+
+void main() {
+ StreamController values;
+ List emittedValues;
+ bool valuesCanceled;
+ bool isDone;
+ List errors;
+ Stream transformed;
+ StreamSubscription subscription;
+
+ void setUpForController(
+ StreamController controller, StreamTransformer transformer) {
+ valuesCanceled = false;
+ values = controller
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ transformed = values.stream.transform(transformer);
+ subscription =
+ transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ group('default from_handlers', () {
+ group('Single subscription stream', () {
+ setUp(() {
+ setUpForController(new StreamController(), fromHandlers());
+ });
+
+ test('has correct stream type', () {
+ expect(transformed.isBroadcast, false);
+ });
+
+ test('forwards values', () async {
+ values.add(1);
+ values.add(2);
+ await new Future(() {});
+ expect(emittedValues, [1, 2]);
+ });
+
+ test('forwards errors', () async {
+ values.addError('error');
+ await new Future(() {});
+ expect(errors, ['error']);
+ });
+
+ test('forwards done', () async {
+ await values.close();
+ await new Future(() {});
+ expect(isDone, true);
+ });
+
+ test('forwards cancel', () async {
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+ });
+
+ group('broadcast stream with muliple listeners', () {
+ List emittedValues2;
+ List errors2;
+ bool isDone2;
+ StreamSubscription subscription2;
+
+ setUp(() {
+ setUpForController(new StreamController.broadcast(), fromHandlers());
+ emittedValues2 = [];
+ errors2 = [];
+ isDone2 = false;
+ subscription2 = transformed
+ .listen(emittedValues2.add, onError: errors2.add, onDone: () {
+ isDone2 = true;
+ });
+ });
+
+ test('has correct stream type', () {
+ expect(transformed.isBroadcast, true);
+ });
+
+ test('forwards values', () async {
+ values.add(1);
+ values.add(2);
+ await new Future(() {});
+ expect(emittedValues, [1, 2]);
+ expect(emittedValues2, [1, 2]);
+ });
+
+ test('forwards errors', () async {
+ values.addError('error');
+ await new Future(() {});
+ expect(errors, ['error']);
+ expect(errors2, ['error']);
+ });
+
+ test('forwards done', () async {
+ await values.close();
+ await new Future(() {});
+ expect(isDone, true);
+ expect(isDone2, true);
+ });
+
+ test('forwards cancel', () async {
+ await subscription.cancel();
+ expect(valuesCanceled, false);
+ await subscription2.cancel();
+ expect(valuesCanceled, true);
+ });
+ });
+ });
+
+ group('custom handlers', () {
+ group('single subscription', () {
+ setUp(() async {
+ setUpForController(new StreamController(),
+ fromHandlers(handleData: (value, sink) {
+ sink.add(value + 1);
+ }));
+ });
+ test('uses transform from handleData', () async {
+ values.add(1);
+ values.add(2);
+ await new Future(() {});
+ expect(emittedValues, [2, 3]);
+ });
+ });
+
+ group('broadcast stream with multiple listeners', () {
+ int dataCallCount;
+ int doneCallCount;
+
+ setUp(() async {
+ dataCallCount = 0;
+ doneCallCount = 0;
+ setUpForController(
+ new StreamController.broadcast(),
+ fromHandlers(handleData: (value, sink) {
+ dataCallCount++;
+ }, handleDone: (sink) {
+ doneCallCount++;
+ }));
+ transformed.listen((_) {});
+ });
+
+ test('handles data once', () async {
+ values.add(1);
+ await new Future(() {});
+ expect(dataCallCount, 1);
+ });
+
+ test('handles done once', () async {
+ await values.close();
+ expect(doneCallCount, 1);
+ });
+ });
+ });
+}