Add whereType transformer (dart-lang/stream_transform#60)

This needs to be implemented as it's own class instead of using
one of the helpers like `fromHandlers` because it has unique generic
type requirements. We want to avoid requring the call site to re-specify
the generic type of the input stream. The return type needs to be a
`StreamTransformer<Null, R>` to satisfy static checking.
However the argument to `bind` at runtime will never be a `Stream<Null>`
so we need to define it explicitly to widen the allowed argument.
diff --git a/pkgs/stream_transform/CHANGELOG.md b/pkgs/stream_transform/CHANGELOG.md
index 515f0ed..e26b1f1 100644
--- a/pkgs/stream_transform/CHANGELOG.md
+++ b/pkgs/stream_transform/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 0.0.15
+
+- Add `whereType`.
+
 ## 0.0.14+1
 
 - Allow using non-dev Dart 2 SDK.
diff --git a/pkgs/stream_transform/README.md b/pkgs/stream_transform/README.md
index 5e54457..89ce15d 100644
--- a/pkgs/stream_transform/README.md
+++ b/pkgs/stream_transform/README.md
@@ -63,3 +63,7 @@
 # throttle
 
 Blocks events for a duration after an event is successfully emitted.
+
+# whereType
+
+Like `Iterable.whereType` for a stream.
diff --git a/pkgs/stream_transform/lib/src/where_type.dart b/pkgs/stream_transform/lib/src/where_type.dart
new file mode 100644
index 0000000..96eb394
--- /dev/null
+++ b/pkgs/stream_transform/lib/src/where_type.dart
@@ -0,0 +1,54 @@
+// Copyright (c) 2018, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// Emits only the events which have type [R].
+///
+/// If the source stream is a broadcast stream the result will be as well.
+///
+/// Errors from the source stream are forwarded directly to the result stream.
+///
+/// The static type of the returned transformer takes `Null` so that it can
+/// satisfy the subtype requirements for the `stream.transform()` argument on
+/// any source stream. The argument to `bind` has been broadened to take
+/// `Stream<Object>` since it will never be passed a `Stream<Null>` at runtime.
+/// This is safe to use on any source stream.
+///
+/// [R] should be a subtype of the stream's generic type, otherwise nothing of
+/// type [R] could possibly be emitted, however there is no static or runtime
+/// checking that this is the case.
+StreamTransformer<Null, R> whereType<R>() => _WhereType<R>();
+
+class _WhereType<R> extends StreamTransformerBase<Null, R> {
+  @override
+  Stream<R> bind(Stream<Object> source) {
+    var controller = source.isBroadcast
+        ? StreamController<R>.broadcast(sync: true)
+        : StreamController<R>(sync: true);
+
+    StreamSubscription<Object> subscription;
+    controller.onListen = () {
+      if (subscription != null) return;
+      subscription = source.listen(
+          (value) {
+            if (value is R) controller.add(value);
+          },
+          onError: controller.addError,
+          onDone: () {
+            subscription = null;
+            controller.close();
+          });
+      if (!source.isBroadcast) {
+        controller.onPause = subscription.pause;
+        controller.onResume = subscription.resume;
+      }
+      controller.onCancel = () {
+        subscription?.cancel();
+        subscription = null;
+      };
+    };
+    return controller.stream;
+  }
+}
diff --git a/pkgs/stream_transform/lib/stream_transform.dart b/pkgs/stream_transform/lib/stream_transform.dart
index 13b80a4..bddb744 100644
--- a/pkgs/stream_transform/lib/stream_transform.dart
+++ b/pkgs/stream_transform/lib/stream_transform.dart
@@ -19,3 +19,4 @@
 export 'src/take_until.dart';
 export 'src/tap.dart';
 export 'src/throttle.dart';
+export 'src/where_type.dart';
diff --git a/pkgs/stream_transform/pubspec.yaml b/pkgs/stream_transform/pubspec.yaml
index b32a050..a29a467 100644
--- a/pkgs/stream_transform/pubspec.yaml
+++ b/pkgs/stream_transform/pubspec.yaml
@@ -2,7 +2,7 @@
 description: A collection of utilities to transform and manipulate streams.
 author: Dart Team <misc@dartlang.org>
 homepage: https://www.github.com/dart-lang/stream_transform
-version: 0.0.15-dev
+version: 0.0.15
 
 environment:
   sdk: ">=2.1.0 <3.0.0"
diff --git a/pkgs/stream_transform/test/where_type_test.dart b/pkgs/stream_transform/test/where_type_test.dart
new file mode 100644
index 0000000..2c6d7ed
--- /dev/null
+++ b/pkgs/stream_transform/test/where_type_test.dart
@@ -0,0 +1,50 @@
+// Copyright (c) 2018, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:stream_transform/stream_transform.dart';
+
+void main() {
+  test('forwards only events that match the type', () async {
+    var values = Stream.fromIterable([1, 'a', 2, 'b']);
+    var filtered = values.transform(whereType<String>());
+    expect(await filtered.toList(), ['a', 'b']);
+  });
+
+  test('can result in empty stream', () async {
+    var values = Stream.fromIterable([1, 2, 3, 4]);
+    var filtered = values.transform(whereType<String>());
+    expect(await filtered.isEmpty, true);
+  });
+
+  test('forwards values to multiple listeners', () async {
+    var values = StreamController.broadcast();
+    var filtered = values.stream.transform(whereType<String>());
+    var firstValues = [];
+    var secondValues = [];
+    filtered..listen(firstValues.add)..listen(secondValues.add);
+    values..add(1)..add('a')..add(2)..add('b');
+    await Future(() {});
+    expect(firstValues, ['a', 'b']);
+    expect(secondValues, ['a', 'b']);
+  });
+
+  test('closes streams with multiple listeners', () async {
+    var values = StreamController.broadcast();
+    var filtered = values.stream.transform(whereType<String>());
+    var firstDone = false;
+    var secondDone = false;
+    filtered
+      ..listen(null, onDone: () => firstDone = true)
+      ..listen(null, onDone: () => secondDone = true);
+    values.add(1);
+    values.add('a');
+    await values.close();
+    expect(firstDone, true);
+    expect(secondDone, true);
+  });
+}