Add throttle, audit, and scan (dart-lang/stream_transform#11)

- Add `scan`: fold which returns intermediate values
- Add `throttle`: block events for a duration after emitting a value
- Add `audit`: emits the last event received after a duration
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index c66484d..9afdbe4 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 0.0.4
+- Add `scan`: fold which returns intermediate values
+- Add `throttle`: block events for a duration after emitting a value
+- Add `audit`: emits the last event received after a duration
+
 ## 0.0.3
 
 - Add `tap`: React to values as they pass without being a subscriber on a stream
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index 356869e..0c64087 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -1,6 +1,11 @@
 Contains utility methods to create `StreamTransfomer` instances to manipulate
 Streams.
 
+# audit
+
+Audit waits for a period of time after receiving a value and then only emits
+the most recent value.
+
 # buffer
 
 Collects values from a source stream until a `trigger` stream fires and the
@@ -19,6 +24,11 @@
 
 Interleaves events from multiple streams into a single stream.
 
+# scan
+
+Scan is like fold, but instead of producing a single value it yields each 
+intermediate accumulation.
+
 # switchMap, switchLatest
 
 Flatten a Stream of Streams into a Stream which forwards values from the most
@@ -28,3 +38,7 @@
 
 Taps into a single-subscriber stream to react to values as they pass, without
 being a real subscriber.
+
+# throttle
+
+Blocks events for a duration after an event is successfully emitted.
\ No newline at end of file
diff --git a/pkgs/stream_transform/lib/src/audit.dart b/pkgs/stream_transform/lib/src/audit.dart
new file mode 100644
index 0000000..f4cadff
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/audit.dart
@@ -0,0 +1,33 @@
+// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+/// Creates a StreamTransformer which only emits once per [duration], at the
+/// end of the period.
+///
+/// Like `throttle`, except it always emits the most recently received event in
+/// a period.  Always introduces a delay of at most [duration].
+StreamTransformer<T, T> audit<T>(Duration duration) {
+  Timer timer;
+  bool shouldClose = false;
+  T recentData;
+
+  return new StreamTransformer.fromHandlers(
+      handleData: (T data, EventSink<T> sink) {
+    recentData = data;
+    timer ??= new Timer(duration, () {
+      sink.add(recentData);
+      timer = null;
+      if (shouldClose) {
+        sink.close();
+      }
+    });
+  }, handleDone: (EventSink<T> sink) {
+    if (timer != null) {
+      shouldClose = true;
+    } else {
+      sink.close();
+    }
+  });
+}
diff --git a/pkgs/stream_transform/lib/src/scan.dart b/pkgs/stream_transform/lib/src/scan.dart
new file mode 100644
index 0000000..518d332
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/scan.dart
@@ -0,0 +1,15 @@
+// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+/// Scan is like fold, but instead of producing a single value it yields
+/// each intermediate accumulation.
+StreamTransformer<S, T> scan<S, T>(
+        T initialValue, T combine(T previousValue, S element)) =>
+    new StreamTransformer<S, T>((stream, cancelOnError) {
+      T accumulated = initialValue;
+      return stream
+          .map((value) => accumulated = combine(accumulated, value))
+          .listen(null, cancelOnError: cancelOnError);
+    });
diff --git a/pkgs/stream_transform/lib/src/throttle.dart b/pkgs/stream_transform/lib/src/throttle.dart
new file mode 100644
index 0000000..030c3fd
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/throttle.dart
@@ -0,0 +1,21 @@
+// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+/// Creates a StreamTransformer which only emits once per [duration], at the
+/// beginning of the period.
+StreamTransformer<T, T> throttle<T>(Duration duration) {
+  Timer timer;
+
+  return new StreamTransformer.fromHandlers(handleData: (data, sink) {
+    if (timer == null) {
+      sink.add(data);
+      timer = new Timer(duration, () {
+        timer = null;
+      });
+    }
+  }, handleDone: (sink) {
+    sink.close();
+  });
+}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart
index 96555ab..ba49b0b 100644
--- a/pkgs/stream_transform/lib/stream_transform.dart
+++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -8,3 +8,6 @@
 export 'src/merge.dart';
 export 'src/switch.dart';
 export 'src/tap.dart';
+export 'src/scan.dart';
+export 'src/throttle.dart';
+export 'src/audit.dart';
diff --git a/pkgs/stream_transform/test/audit_test.dart b/pkgs/stream_transform/test/audit_test.dart
new file mode 100644
index 0000000..b4e3151
--- /dev/null
+++ b/pkgs/stream_transform/test/audit_test.dart
@@ -0,0 +1,82 @@
+import 'dart:async';
+import 'package:test/test.dart';
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+  var streamTypes = {
+    'single subscription': () => new StreamController(),
+    'broadcast': () => new StreamController.broadcast()
+  };
+  for (var streamType in streamTypes.keys) {
+    group('Stream type [$streamType]', () {
+      StreamController values;
+      List emittedValues;
+      bool valuesCanceled;
+      bool isDone;
+      List errors;
+      StreamSubscription subscription;
+
+      void setUpStreams(StreamTransformer transformer) {
+        valuesCanceled = false;
+        values = streamTypes[streamType]()
+          ..onCancel = () {
+            valuesCanceled = true;
+          };
+        emittedValues = [];
+        errors = [];
+        isDone = false;
+        subscription = values.stream
+            .transform(transformer)
+            .listen(emittedValues.add, onError: errors.add, onDone: () {
+          isDone = true;
+        });
+      }
+
+      group('audit', () {
+        setUp(() async {
+          setUpStreams(audit(const Duration(milliseconds: 5)));
+        });
+
+        test('cancels values', () async {
+          await subscription.cancel();
+          expect(valuesCanceled, true);
+        });
+
+        test('swallows values that come faster than duration', () async {
+          values.add(1);
+          values.add(2);
+          await values.close();
+          await new Future.delayed(const Duration(milliseconds: 10));
+          expect(emittedValues, [2]);
+        });
+
+        test('outputs multiple values spaced further than duration', () async {
+          values.add(1);
+          await new Future.delayed(const Duration(milliseconds: 10));
+          values.add(2);
+          await new Future.delayed(const Duration(milliseconds: 10));
+          expect(emittedValues, [1, 2]);
+        });
+
+        test('waits for pending value to close', () async {
+          values.add(1);
+          await new Future.delayed(const Duration(milliseconds: 10));
+          await values.close();
+          await new Future(() {});
+          expect(isDone, true);
+        });
+
+        test('closes output if there are no pending values', () async {
+          values.add(1);
+          await new Future.delayed(const Duration(milliseconds: 10));
+          values.add(2);
+          await new Future(() {});
+          await values.close();
+          expect(isDone, false);
+          await new Future.delayed(const Duration(milliseconds: 10));
+          expect(isDone, true);
+        });
+      });
+    });
+  }
+}
diff --git a/pkgs/stream_transform/test/scan_test.dart b/pkgs/stream_transform/test/scan_test.dart
new file mode 100644
index 0000000..51456f2
--- /dev/null
+++ b/pkgs/stream_transform/test/scan_test.dart
@@ -0,0 +1,17 @@
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+  group('Scan', () {
+    test('produces intermediate values', () async {
+      var source = new Stream.fromIterable([1, 2, 3, 4]);
+      var sum = (int x, int y) => x + y;
+      var result = await source.transform(scan(0, sum)).toList();
+
+      expect(result, [1, 3, 6, 10]);
+    });
+  });
+}
diff --git a/pkgs/stream_transform/test/throttle_test.dart b/pkgs/stream_transform/test/throttle_test.dart
new file mode 100644
index 0000000..888caec
--- /dev/null
+++ b/pkgs/stream_transform/test/throttle_test.dart
@@ -0,0 +1,72 @@
+import 'dart:async';
+import 'package:test/test.dart';
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+  var streamTypes = {
+    'single subscription': () => new StreamController(),
+    'broadcast': () => new StreamController.broadcast()
+  };
+  for (var streamType in streamTypes.keys) {
+    group('Stream type [$streamType]', () {
+      StreamController values;
+      List emittedValues;
+      bool valuesCanceled;
+      bool isDone;
+      List errors;
+      StreamSubscription subscription;
+
+      void setUpStreams(StreamTransformer transformer) {
+        valuesCanceled = false;
+        values = streamTypes[streamType]()
+          ..onCancel = () {
+            valuesCanceled = true;
+          };
+        emittedValues = [];
+        errors = [];
+        isDone = false;
+        subscription = values.stream
+            .transform(transformer)
+            .listen(emittedValues.add, onError: errors.add, onDone: () {
+          isDone = true;
+        });
+      }
+
+      group('throttle', () {
+        setUp(() async {
+          setUpStreams(throttle(const Duration(milliseconds: 5)));
+        });
+
+        test('cancels values', () async {
+          await subscription.cancel();
+          expect(valuesCanceled, true);
+        });
+
+        test('swallows values that come faster than duration', () async {
+          values.add(1);
+          values.add(2);
+          await values.close();
+          await new Future.delayed(const Duration(milliseconds: 10));
+          expect(emittedValues, [1]);
+        });
+
+        test('outputs multiple values spaced further than duration', () async {
+          values.add(1);
+          await new Future.delayed(const Duration(milliseconds: 10));
+          values.add(2);
+          await new Future.delayed(const Duration(milliseconds: 10));
+          expect(emittedValues, [1, 2]);
+        });
+
+        test('closes output immediately', () async {
+          values.add(1);
+          await new Future.delayed(const Duration(milliseconds: 10));
+          values.add(2);
+          await new Future(() {});
+          await values.close();
+          expect(isDone, true);
+        });
+      });
+    });
+  }
+}