Implement tap (dart-lang/stream_transform#9)

This is like the Rx function `do`, but that is a reserved word in Dart.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index c820833..e33fbb1 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 0.0.3
+
+- Add `tap`: React to values as they pass without being a subscriber on a stream
+
 ## 0.0.2
 
 - Add `concat`: Appends streams in series
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index b5ffaa1..ea2685a 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -18,3 +18,8 @@
 # merge, mergeAll
 
 Interleaves events from multiple streams into a single stream.
+
+# tap
+
+Taps into a single-subscriber stream to react to values as they pass, without
+being a real subscriber.
diff --git a/pkgs/stream_transform/lib/src/tap.dart b/pkgs/stream_transform/lib/src/tap.dart
new file mode 100644
index 0000000..2ec8cf8
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/tap.dart
@@ -0,0 +1,31 @@
+// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
+/// Taps into a Stream to allow additional handling on a single-subscriber
+/// stream without first wrapping as a broadcast stream.
+///
+/// The callback will be called with every value from the stream before it is
+/// forwarded to listeners on the stream. Errors from the callbacks are ignored.
+///
+/// The tapped stream may not emit any values until the result stream has a
+/// listener, and may be canceled only by the listener.
+StreamTransformer<T, T> tap<T>(void fn(T value),
+        {void onError(error, stackTrace), void onDone()}) =>
+    new StreamTransformer.fromHandlers(handleData: (value, sink) {
+      try {
+        fn(value);
+      } catch (_) {/*Ignore*/}
+      sink.add(value);
+    }, handleError: (error, stackTrace, sink) {
+      try {
+        onError?.call(error, stackTrace);
+      } catch (_) {/*Ignore*/}
+      sink.addError(error, stackTrace);
+    }, handleDone: (sink) {
+      try {
+        onDone?.call();
+      } catch (_) {/*Ignore*/}
+      sink.close();
+    });
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart
index de66dbe..62d6df7 100644
--- a/pkgs/stream_transform/lib/stream_transform.dart
+++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -6,3 +6,4 @@
 export 'src/concat.dart';
 export 'src/debounce.dart';
 export 'src/merge.dart';
+export 'src/tap.dart';
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index 15c2bbe..75415ef 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@
 description: A collection of utilities to transform and manipulate streams.
 author: Dart Team <misc@dartlang.org>
 homepage: https://www.github.com/dart-lang/stream_transform
-version: 0.0.2
+version: 0.0.3-dev
 
 environment:
   sdk: ">=1.22.0 <2.0.0"
diff --git a/pkgs/stream_transform/test/tap_test.dart b/pkgs/stream_transform/test/tap_test.dart
new file mode 100644
index 0000000..8aecee6
--- /dev/null
+++ b/pkgs/stream_transform/test/tap_test.dart
@@ -0,0 +1,56 @@
+import 'package:test/test.dart';
+import 'dart:async';
+
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+  test('calls function for values', () async {
+    var valuesSeen = [];
+    var stream = new Stream.fromIterable([1, 2, 3]);
+    await stream.transform(tap(valuesSeen.add)).last;
+    expect(valuesSeen, [1, 2, 3]);
+  });
+
+  test('forwards values', () async {
+    var stream = new Stream.fromIterable([1, 2, 3]);
+    var values = await stream.transform(tap((_) {})).toList();
+    expect(values, [1, 2, 3]);
+  });
+
+  test('calls function for errors', () async {
+    var error;
+    var source = new StreamController();
+    source.stream
+        .transform(tap((_) {}, onError: (e, st) {
+          error = e;
+        }))
+        .listen((_) {}, onError: (_) {});
+    source.addError('error');
+    await new Future(() {});
+    expect(error, 'error');
+  });
+
+  test('forwards errors', () async {
+    var error;
+    var source = new StreamController();
+    source.stream.transform(tap((_) {}, onError: (e, st) {})).listen((_) {},
+        onError: (e) {
+      error = e;
+    });
+    source.addError('error');
+    await new Future(() {});
+    expect(error, 'error');
+  });
+
+  test('calls function on done', () async {
+    var doneCalled = false;
+    var source = new StreamController();
+    source.stream
+        .transform((tap((_) {}, onDone: () {
+          doneCalled = true;
+        })))
+        .listen((_) {});
+    await source.close();
+    expect(doneCalled, true);
+  });
+}