Add throttle, audit, and scan (dart-lang/stream_transform#11)
- Add `scan`: fold which returns intermediate values
- Add `throttle`: block events for a duration after emitting a value
- Add `audit`: emits the last event received after a duration
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index c66484d..9afdbe4 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 0.0.4
+- Add `scan`: fold which returns intermediate values
+- Add `throttle`: block events for a duration after emitting a value
+- Add `audit`: emits the last event received after a duration
+
## 0.0.3
- Add `tap`: React to values as they pass without being a subscriber on a stream
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index 356869e..0c64087 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -1,6 +1,11 @@
Contains utility methods to create `StreamTransfomer` instances to manipulate
Streams.
+# audit
+
+Audit waits for a period of time after receiving a value and then only emits
+the most recent value.
+
# buffer
Collects values from a source stream until a `trigger` stream fires and the
@@ -19,6 +24,11 @@
Interleaves events from multiple streams into a single stream.
+# scan
+
+Scan is like fold, but instead of producing a single value it yields each
+intermediate accumulation.
+
# switchMap, switchLatest
Flatten a Stream of Streams into a Stream which forwards values from the most
@@ -28,3 +38,7 @@
Taps into a single-subscriber stream to react to values as they pass, without
being a real subscriber.
+
+# throttle
+
+Blocks events for a duration after an event is successfully emitted.
\ No newline at end of file
diff --git a/pkgs/stream_transform/lib/src/audit.dart b/pkgs/stream_transform/lib/src/audit.dart
new file mode 100644
index 0000000..f4cadff
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/audit.dart
@@ -0,0 +1,33 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+/// Creates a StreamTransformer which only emits once per [duration], at the
+/// end of the period.
+///
+/// Like `throttle`, except it always emits the most recently received event in
+/// a period. Always introduces a delay of at most [duration].
+StreamTransformer<T, T> audit<T>(Duration duration) {
+ Timer timer;
+ bool shouldClose = false;
+ T recentData;
+
+ return new StreamTransformer.fromHandlers(
+ handleData: (T data, EventSink<T> sink) {
+ recentData = data;
+ timer ??= new Timer(duration, () {
+ sink.add(recentData);
+ timer = null;
+ if (shouldClose) {
+ sink.close();
+ }
+ });
+ }, handleDone: (EventSink<T> sink) {
+ if (timer != null) {
+ shouldClose = true;
+ } else {
+ sink.close();
+ }
+ });
+}
diff --git a/pkgs/stream_transform/lib/src/scan.dart b/pkgs/stream_transform/lib/src/scan.dart
new file mode 100644
index 0000000..518d332
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/scan.dart
@@ -0,0 +1,15 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+/// Scan is like fold, but instead of producing a single value it yields
+/// each intermediate accumulation.
+StreamTransformer<S, T> scan<S, T>(
+ T initialValue, T combine(T previousValue, S element)) =>
+ new StreamTransformer<S, T>((stream, cancelOnError) {
+ T accumulated = initialValue;
+ return stream
+ .map((value) => accumulated = combine(accumulated, value))
+ .listen(null, cancelOnError: cancelOnError);
+ });
diff --git a/pkgs/stream_transform/lib/src/throttle.dart b/pkgs/stream_transform/lib/src/throttle.dart
new file mode 100644
index 0000000..030c3fd
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/throttle.dart
@@ -0,0 +1,21 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+/// Creates a StreamTransformer which only emits once per [duration], at the
+/// beginning of the period.
+StreamTransformer<T, T> throttle<T>(Duration duration) {
+ Timer timer;
+
+ return new StreamTransformer.fromHandlers(handleData: (data, sink) {
+ if (timer == null) {
+ sink.add(data);
+ timer = new Timer(duration, () {
+ timer = null;
+ });
+ }
+ }, handleDone: (sink) {
+ sink.close();
+ });
+}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart
index 96555ab..ba49b0b 100644
--- a/pkgs/stream_transform/lib/stream_transform.dart
+++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -8,3 +8,6 @@
export 'src/merge.dart';
export 'src/switch.dart';
export 'src/tap.dart';
+export 'src/scan.dart';
+export 'src/throttle.dart';
+export 'src/audit.dart';
diff --git a/pkgs/stream_transform/test/audit_test.dart b/pkgs/stream_transform/test/audit_test.dart
new file mode 100644
index 0000000..b4e3151
--- /dev/null
+++ b/pkgs/stream_transform/test/audit_test.dart
@@ -0,0 +1,82 @@
+import 'dart:async';
+import 'package:test/test.dart';
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+ var streamTypes = {
+ 'single subscription': () => new StreamController(),
+ 'broadcast': () => new StreamController.broadcast()
+ };
+ for (var streamType in streamTypes.keys) {
+ group('Stream type [$streamType]', () {
+ StreamController values;
+ List emittedValues;
+ bool valuesCanceled;
+ bool isDone;
+ List errors;
+ StreamSubscription subscription;
+
+ void setUpStreams(StreamTransformer transformer) {
+ valuesCanceled = false;
+ values = streamTypes[streamType]()
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ subscription = values.stream
+ .transform(transformer)
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ group('audit', () {
+ setUp(() async {
+ setUpStreams(audit(const Duration(milliseconds: 5)));
+ });
+
+ test('cancels values', () async {
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('swallows values that come faster than duration', () async {
+ values.add(1);
+ values.add(2);
+ await values.close();
+ await new Future.delayed(const Duration(milliseconds: 10));
+ expect(emittedValues, [2]);
+ });
+
+ test('outputs multiple values spaced further than duration', () async {
+ values.add(1);
+ await new Future.delayed(const Duration(milliseconds: 10));
+ values.add(2);
+ await new Future.delayed(const Duration(milliseconds: 10));
+ expect(emittedValues, [1, 2]);
+ });
+
+ test('waits for pending value to close', () async {
+ values.add(1);
+ await new Future.delayed(const Duration(milliseconds: 10));
+ await values.close();
+ await new Future(() {});
+ expect(isDone, true);
+ });
+
+ test('closes output if there are no pending values', () async {
+ values.add(1);
+ await new Future.delayed(const Duration(milliseconds: 10));
+ values.add(2);
+ await new Future(() {});
+ await values.close();
+ expect(isDone, false);
+ await new Future.delayed(const Duration(milliseconds: 10));
+ expect(isDone, true);
+ });
+ });
+ });
+ }
+}
diff --git a/pkgs/stream_transform/test/scan_test.dart b/pkgs/stream_transform/test/scan_test.dart
new file mode 100644
index 0000000..51456f2
--- /dev/null
+++ b/pkgs/stream_transform/test/scan_test.dart
@@ -0,0 +1,17 @@
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+ group('Scan', () {
+ test('produces intermediate values', () async {
+ var source = new Stream.fromIterable([1, 2, 3, 4]);
+ var sum = (int x, int y) => x + y;
+ var result = await source.transform(scan(0, sum)).toList();
+
+ expect(result, [1, 3, 6, 10]);
+ });
+ });
+}
diff --git a/pkgs/stream_transform/test/throttle_test.dart b/pkgs/stream_transform/test/throttle_test.dart
new file mode 100644
index 0000000..888caec
--- /dev/null
+++ b/pkgs/stream_transform/test/throttle_test.dart
@@ -0,0 +1,72 @@
+import 'dart:async';
+import 'package:test/test.dart';
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+ var streamTypes = {
+ 'single subscription': () => new StreamController(),
+ 'broadcast': () => new StreamController.broadcast()
+ };
+ for (var streamType in streamTypes.keys) {
+ group('Stream type [$streamType]', () {
+ StreamController values;
+ List emittedValues;
+ bool valuesCanceled;
+ bool isDone;
+ List errors;
+ StreamSubscription subscription;
+
+ void setUpStreams(StreamTransformer transformer) {
+ valuesCanceled = false;
+ values = streamTypes[streamType]()
+ ..onCancel = () {
+ valuesCanceled = true;
+ };
+ emittedValues = [];
+ errors = [];
+ isDone = false;
+ subscription = values.stream
+ .transform(transformer)
+ .listen(emittedValues.add, onError: errors.add, onDone: () {
+ isDone = true;
+ });
+ }
+
+ group('throttle', () {
+ setUp(() async {
+ setUpStreams(throttle(const Duration(milliseconds: 5)));
+ });
+
+ test('cancels values', () async {
+ await subscription.cancel();
+ expect(valuesCanceled, true);
+ });
+
+ test('swallows values that come faster than duration', () async {
+ values.add(1);
+ values.add(2);
+ await values.close();
+ await new Future.delayed(const Duration(milliseconds: 10));
+ expect(emittedValues, [1]);
+ });
+
+ test('outputs multiple values spaced further than duration', () async {
+ values.add(1);
+ await new Future.delayed(const Duration(milliseconds: 10));
+ values.add(2);
+ await new Future.delayed(const Duration(milliseconds: 10));
+ expect(emittedValues, [1, 2]);
+ });
+
+ test('closes output immediately', () async {
+ values.add(1);
+ await new Future.delayed(const Duration(milliseconds: 10));
+ values.add(2);
+ await new Future(() {});
+ await values.close();
+ expect(isDone, true);
+ });
+ });
+ });
+ }
+}