Add typed wrapper functions to delegate classes.

These mirror the methods in the the collection package, and serve a
similar purpose of safely casting generic objects when the user is
confident that the actual object's values are more specific than the
static type.

R=floitsch@google.com, lrn@google.com

Review URL: https://codereview.chromium.org//1870543004 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 803b589..b18365f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,14 +1,23 @@
-## 1.9.1
+## 1.10.0
+
+* Add `DelegatingFuture.typed()`, `DelegatingStreamSubscription.typed()`,
+  `DelegatingStreamConsumer.typed()`, `DelegatingSink.typed()`,
+  `DelegatingEventSink.typed()`, and `DelegatingStreamSink.typed()` static
+  methods. These wrap untyped instances of these classes with the correct type
+  parameter, and assert the types of values as they're accessed.
+
+* Add a `DelegatingStream` class. This is behaviorally identical to `StreamView`
+  from `dart:async`, but it follows this package's naming conventions and
+  provides a `DelegatingStream.typed()` static method.
 
 * Fix all strong mode warnings and add generic method annotations.
 
-* `new StreamQueue()` now takes a `Stream<T>` rather than a `Stream<dynamic>`.
-  Passing a type that wasn't `is`-compatible with `Stream<T>` would already
-  throw an error under some circumstances, so this is not considered a breaking
-  change.
-
-* `new SubscriptionStream()` now takes a `Stream<T>` rather than a
-  `Stream<dynamic>`. Passing a type that wasn't `is`-compatible with `Stream<T>`
+* `new StreamQueue()`, `new SubscriptionStream()`, `new
+  DelegatingStreamSubscription()`, `new DelegatingStreamConsumer()`, `new
+  DelegatingSink()`, `new DelegatingEventSink()`, and `new
+  DelegatingStreamSink()` now take arguments with generic type arguments (for
+  example `Stream<T>`) rather than without (for example `Stream<dynamic>`).
+  Passing a type that wasn't `is`-compatible with the fully-specified generic
   would already throw an error under some circumstances, so this is not
   considered a breaking change.
 
diff --git a/lib/async.dart b/lib/async.dart
index 0d3e051..7055341 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -7,6 +7,7 @@
 export "src/delegate/event_sink.dart";
 export "src/delegate/future.dart";
 export "src/delegate/sink.dart";
+export "src/delegate/stream.dart";
 export "src/delegate/stream_consumer.dart";
 export "src/delegate/stream_sink.dart";
 export "src/delegate/stream_subscription.dart";
diff --git a/lib/src/delegate/event_sink.dart b/lib/src/delegate/event_sink.dart
index 5a525df..14501b2 100644
--- a/lib/src/delegate/event_sink.dart
+++ b/lib/src/delegate/event_sink.dart
@@ -12,7 +12,18 @@
   final EventSink _sink;
 
   /// Create a delegating sink forwarding calls to [sink].
-  DelegatingEventSink(EventSink sink) : _sink = sink;
+  DelegatingEventSink(EventSink<T> sink) : _sink = sink;
+
+  DelegatingEventSink._(this._sink);
+
+  /// Creates a wrapper that coerces the type of [sink].
+  ///
+  /// Unlike [new DelegatingEventSink], this only requires its argument to be an
+  /// instance of `EventSink`, not `EventSink<T>`. This means that calls to
+  /// [add] may throw a [CastError] if the argument type doesn't match the
+  /// reified type of [sink].
+  static EventSink/*<T>*/ typed/*<T>*/(EventSink sink) =>
+      sink is EventSink/*<T>*/ ? sink : new DelegatingEventSink._(sink);
 
   void add(T data) {
     _sink.add(data);
diff --git a/lib/src/delegate/future.dart b/lib/src/delegate/future.dart
index bfc075b..3f7ea72 100644
--- a/lib/src/delegate/future.dart
+++ b/lib/src/delegate/future.dart
@@ -4,6 +4,8 @@
 
 import 'dart:async';
 
+import '../typed/future.dart';
+
 /// A wrapper that forwards calls to a [Future].
 class DelegatingFuture<T> implements Future<T> {
   /// The wrapped [Future].
@@ -11,6 +13,15 @@
 
   DelegatingFuture(this._future);
 
+  /// Creates a wrapper which throws if [future]'s value isn't an instance of
+  /// `T`.
+  ///
+  /// This soundly converts a [Future] to a `Future<T>`, regardless of its
+  /// original generic type, by asserting that its value is an instance of `T`
+  /// whenever it's provided. If it's not, the future throws a [CastError].
+  static Future/*<T>*/ typed/*<T>*/(Future future) =>
+      future is Future/*<T>*/ ? future : new TypeSafeFuture/*<T>*/(future);
+
   Stream<T> asStream() => _future.asStream();
 
   Future catchError(Function onError, {bool test(Object error)}) =>
@@ -21,6 +32,6 @@
 
   Future<T> whenComplete(action()) => _future.whenComplete(action);
 
-  Future<T> timeout(Duration timeLimit, {void onTimeout()}) =>
+  Future<T> timeout(Duration timeLimit, {onTimeout()}) =>
     _future.timeout(timeLimit, onTimeout: onTimeout);
 }
diff --git a/lib/src/delegate/sink.dart b/lib/src/delegate/sink.dart
index cee2937..326c15b 100644
--- a/lib/src/delegate/sink.dart
+++ b/lib/src/delegate/sink.dart
@@ -10,8 +10,18 @@
   final Sink _sink;
 
   /// Create a delegating sink forwarding calls to [sink].
-  DelegatingSink(Sink sink)
-      : _sink = sink;
+  DelegatingSink(Sink<T> sink) : _sink = sink;
+
+  DelegatingSink._(this._sink);
+
+  /// Creates a wrapper that coerces the type of [sink].
+  ///
+  /// Unlike [new DelegatingSink], this only requires its argument to be an
+  /// instance of `Sink`, not `Sink<T>`. This means that calls to [add] may
+  /// throw a [CastError] if the argument type doesn't match the reified type of
+  /// [sink].
+  static Sink/*<T>*/ typed/*<T>*/(Sink sink) =>
+      sink is Sink/*<T>*/ ? sink : new DelegatingSink._(sink);
 
   void add(T data) {
     _sink.add(data);
diff --git a/lib/src/delegate/stream.dart b/lib/src/delegate/stream.dart
new file mode 100644
index 0000000..7562218
--- /dev/null
+++ b/lib/src/delegate/stream.dart
@@ -0,0 +1,28 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import '../typed/stream.dart';
+
+/// Simple delegating wrapper around a [Stream].
+///
+/// Subclasses can override individual methods, or use this to expose only the
+/// [Stream] methods of a subclass.
+///
+/// Note that this is identical to [StreamView] in `dart:async`. It's provided
+/// under this name for consistency with other `Delegating*` classes.
+class DelegatingStream<T> extends StreamView<T> {
+  DelegatingStream(Stream<T> stream) : super(stream);
+
+  /// Creates a wrapper which throws if [stream]'s events aren't instances of
+  /// `T`.
+  ///
+  /// This soundly converts a [Stream] to a `Stream<T>`, regardless of its
+  /// original generic type, by asserting that its events are instances of `T`
+  /// whenever they're provided. If they're not, the stream throws a
+  /// [CastError].
+  static Stream/*<T>*/ typed/*<T>*/(Stream stream) =>
+      stream is Stream/*<T>*/ ? stream : new TypeSafeStream/*<T>*/(stream);
+}
diff --git a/lib/src/delegate/stream_consumer.dart b/lib/src/delegate/stream_consumer.dart
index dcaf0c2..4f495d0 100644
--- a/lib/src/delegate/stream_consumer.dart
+++ b/lib/src/delegate/stream_consumer.dart
@@ -12,8 +12,20 @@
   final StreamConsumer _consumer;
 
   /// Create a delegating consumer forwarding calls to [consumer].
-  DelegatingStreamConsumer(StreamConsumer consumer)
-      : _consumer = consumer;
+  DelegatingStreamConsumer(StreamConsumer<T> consumer) : _consumer = consumer;
+
+  DelegatingStreamConsumer._(this._consumer);
+
+  /// Creates a wrapper that coerces the type of [consumer].
+  ///
+  /// Unlike [new StreamConsumer], this only requires its argument to be an
+  /// instance of `StreamConsumer`, not `StreamConsumer<T>`. This means that
+  /// calls to [addStream] may throw a [CastError] if the argument type doesn't
+  /// match the reified type of [consumer].
+  static StreamConsumer/*<T>*/ typed/*<T>*/(StreamConsumer consumer) =>
+      consumer is StreamConsumer/*<T>*/
+          ? consumer
+          : new DelegatingStreamConsumer._(consumer);
 
   Future addStream(Stream<T> stream) => _consumer.addStream(stream);
 
diff --git a/lib/src/delegate/stream_sink.dart b/lib/src/delegate/stream_sink.dart
index e06afc1..9b52b19 100644
--- a/lib/src/delegate/stream_sink.dart
+++ b/lib/src/delegate/stream_sink.dart
@@ -14,8 +14,18 @@
   Future get done => _sink.done;
 
   /// Create delegating sink forwarding calls to [sink].
-  DelegatingStreamSink(StreamSink sink)
-      : _sink = sink;
+  DelegatingStreamSink(StreamSink<T> sink) : _sink = sink;
+
+  DelegatingStreamSink._(this._sink);
+
+  /// Creates a wrapper that coerces the type of [sink].
+  ///
+  /// Unlike [new StreamSink], this only requires its argument to be an instance
+  /// of `StreamSink`, not `StreamSink<T>`. This means that calls to [add] may
+  /// throw a [CastError] if the argument type doesn't match the reified type of
+  /// [sink].
+  static StreamSink/*<T>*/ typed/*<T>*/(StreamSink sink) =>
+      sink is StreamSink/*<T>*/ ? sink : new DelegatingStreamSink._(sink);
 
   void add(T data) {
     _sink.add(data);
diff --git a/lib/src/delegate/stream_subscription.dart b/lib/src/delegate/stream_subscription.dart
index e7153b2..b86e91f 100644
--- a/lib/src/delegate/stream_subscription.dart
+++ b/lib/src/delegate/stream_subscription.dart
@@ -4,6 +4,8 @@
 
 import 'dart:async';
 
+import '../typed/stream_subscription.dart';
+
 /// Simple delegating wrapper around a [StreamSubscription].
 ///
 /// Subclasses can override individual methods.
@@ -11,9 +13,22 @@
   final StreamSubscription _source;
 
   /// Create delegating subscription forwarding calls to [sourceSubscription].
-  DelegatingStreamSubscription(StreamSubscription sourceSubscription)
+  DelegatingStreamSubscription(StreamSubscription<T> sourceSubscription)
       : _source = sourceSubscription;
 
+  /// Creates a wrapper which throws if [subscription]'s events aren't instances
+  /// of `T`.
+  ///
+  /// This soundly converts a [StreamSubscription] to a `StreamSubscription<T>`,
+  /// regardless of its original generic type, by asserting that its events are
+  /// instances of `T` whenever they're provided. If they're not, the
+  /// subscription throws a [CastError].
+  static StreamSubscription/*<T>*/ typed/*<T>*/(
+          StreamSubscription subscription) =>
+      subscription is StreamSubscription/*<T>*/
+          ? subscription
+          : new TypeSafeStreamSubscription/*<T>*/(subscription);
+
   void onData(void handleData(T data)) {
     _source.onData(handleData);
   }
diff --git a/lib/src/lazy_stream.dart b/lib/src/lazy_stream.dart
index 3ba4953..147fa8b 100644
--- a/lib/src/lazy_stream.dart
+++ b/lib/src/lazy_stream.dart
@@ -5,6 +5,7 @@
 import "dart:async";
 
 import "stream_completer.dart";
+import "delegate/stream.dart";
 
 /// A [Stream] wrapper that forwards to another [Stream] that's initialized
 /// lazily.
@@ -39,9 +40,14 @@
     _callback = null;
     var result = callback();
 
-    Stream stream = result is Future
-        ? StreamCompleter.fromFuture(result)
-        : result;
+    Stream<T> stream;
+    if (result is Future) {
+      stream = StreamCompleter.fromFuture(result.then((stream) {
+        return DelegatingStream.typed/*<T>*/(stream as Stream);
+      }));
+    } else {
+      stream = DelegatingStream.typed/*<T>*/(result as Stream);
+    }
 
     return stream.listen(onData,
         onError: onError, onDone: onDone, cancelOnError: cancelOnError);
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index c448620..50ca81b 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -68,7 +68,7 @@
 /// source subscription on the first error.
 class _CancelOnErrorSubscriptionWrapper<T>
     extends DelegatingStreamSubscription<T> {
-  _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription)
+  _CancelOnErrorSubscriptionWrapper(StreamSubscription<T> subscription)
       : super(subscription);
 
   void onError(Function handleError) {
diff --git a/lib/src/typed/future.dart b/lib/src/typed/future.dart
new file mode 100644
index 0000000..a269593
--- /dev/null
+++ b/lib/src/typed/future.dart
@@ -0,0 +1,25 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+class TypeSafeFuture<T> implements Future<T> {
+  final Future _future;
+
+  TypeSafeFuture(this._future);
+
+  Stream<T> asStream() => _future.then((value) => value as T).asStream();
+
+  Future catchError(Function onError, {bool test(Object error)}) =>
+      _future.catchError(onError, test: test);
+
+  Future/*<S>*/ then/*<S>*/(/*=S*/ onValue(T value), {Function onError}) =>
+      _future.then((value) => onValue(value as T), onError: onError);
+
+  Future<T> whenComplete(action()) =>
+      new TypeSafeFuture<T>(_future.whenComplete(action));
+
+  Future<T> timeout(Duration timeLimit, {onTimeout()}) =>
+      new TypeSafeFuture<T>(_future.timeout(timeLimit, onTimeout: onTimeout));
+}
diff --git a/lib/src/typed/stream.dart b/lib/src/typed/stream.dart
new file mode 100644
index 0000000..3db9f69
--- /dev/null
+++ b/lib/src/typed/stream.dart
@@ -0,0 +1,136 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:collection/collection.dart';
+
+import '../utils.dart';
+import 'stream_subscription.dart';
+
+class TypeSafeStream<T> implements Stream<T> {
+  final Stream _stream;
+
+  Future<T> get first async => (await _stream.first) as T;
+  Future<T> get last async => (await _stream.last) as T;
+  Future<T> get single async => (await _stream.single) as T;
+
+  bool get isBroadcast => _stream.isBroadcast;
+  Future<bool> get isEmpty => _stream.isEmpty;
+  Future<int> get length => _stream.length;
+
+  TypeSafeStream(this._stream);
+
+  Stream<T> asBroadcastStream(
+      {void onListen(StreamSubscription<T> subscription),
+      void onCancel(StreamSubscription<T> subscription)}) {
+    return new TypeSafeStream<T>(_stream.asBroadcastStream(
+        onListen: onListen == null
+            ? null
+            : (subscription) =>
+                  onListen(new TypeSafeStreamSubscription<T>(subscription)),
+        onCancel: onCancel == null
+            ? null
+            : (subscription) =>
+                  onCancel(new TypeSafeStreamSubscription<T>(subscription))));
+  }
+
+  // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
+  Stream asyncExpand(Stream convert(T event)) =>
+      _stream.asyncExpand(_validateType(convert));
+
+  // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
+  Stream asyncMap(convert(T event)) => _stream.asyncMap(_validateType(convert));
+
+  Stream<T> distinct([bool equals(T previous, T next)]) =>
+      new TypeSafeStream<T>(_stream.distinct(equals == null
+          ? null
+          : (previous, next) => equals(previous as T, next as T)));
+
+  // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
+  Future drain([futureValue]) => _stream.drain(futureValue);
+
+  Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) =>
+      _stream.expand(_validateType(convert));
+
+  Future firstWhere(bool test(T element), {Object defaultValue()}) =>
+      _stream.firstWhere(_validateType(test), defaultValue: defaultValue);
+
+  Future lastWhere(bool test(T element), {Object defaultValue()}) =>
+      _stream.lastWhere(_validateType(test), defaultValue: defaultValue);
+
+  Future<T> singleWhere(bool test(T element)) async =>
+      (await _stream.singleWhere(_validateType(test))) as T;
+
+  Future/*<S>*/ fold/*<S>*/(/*=S*/ initialValue,
+          /*=S*/ combine(/*=S*/ previous, T element)) =>
+      _stream.fold(initialValue,
+          (previous, element) => combine(previous, element as T));
+
+  Future forEach(void action(T element)) =>
+      _stream.forEach(_validateType(action));
+
+  Stream<T> handleError(Function onError, {bool test(error)}) =>
+      new TypeSafeStream<T>(_stream.handleError(onError, test: test));
+
+  StreamSubscription<T> listen(void onData(T value),
+          {Function onError, void onDone(), bool cancelOnError}) =>
+      new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData),
+          onError: onError, onDone: onDone, cancelOnError: cancelOnError));
+
+  Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) =>
+      _stream.map(_validateType(convert));
+
+  // Don't forward to `_stream.pipe` because we want the consumer to see the
+  // type-asserted stream.
+  Future pipe(StreamConsumer<T> consumer) =>
+      consumer.addStream(this).then((_) => consumer.close());
+
+  Future<T> reduce(T combine(T previous, T element)) async {
+    var result = await _stream.reduce(
+        (previous, element) => combine(previous as T, element as T));
+    return result as T;
+  }
+
+  Stream<T> skipWhile(bool test(T element)) =>
+      new TypeSafeStream<T>(_stream.skipWhile(_validateType(test)));
+
+  Stream<T> takeWhile(bool test(T element)) =>
+      new TypeSafeStream<T>(_stream.takeWhile(_validateType(test)));
+
+  Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) =>
+      _stream.timeout(timeLimit, onTimeout: onTimeout);
+
+  Future<List<T>> toList() async =>
+      DelegatingList.typed/*<T>*/(await _stream.toList());
+
+  Future<Set<T>> toSet() async =>
+      DelegatingSet.typed/*<T>*/(await _stream.toSet());
+
+  // Don't forward to `_stream.transform` because we want the transformer to see
+  // the type-asserted stream.
+  Stream/*<S>*/ transform/*<S>*/(
+          StreamTransformer<T, dynamic/*=S*/> transformer) =>
+      transformer.bind(this);
+
+  Stream<T> where(bool test(T element)) =>
+      new TypeSafeStream<T>(_stream.where(_validateType(test)));
+
+  Future<bool> every(bool test(T element)) =>
+      _stream.every(_validateType(test));
+
+  Future<bool> any(bool test(T element)) => _stream.any(_validateType(test));
+  Stream<T> skip(int count) => new TypeSafeStream<T>(_stream.skip(count));
+  Stream<T> take(int count) => new TypeSafeStream<T>(_stream.take(count));
+  Future<T> elementAt(int index) async => (await _stream.elementAt(index)) as T;
+  Future<bool> contains(Object needle) => _stream.contains(needle);
+  Future<String> join([String separator = ""]) => _stream.join(separator);
+  String toString() => _stream.toString();
+
+  /// Returns a version of [function] that asserts that its argument is an
+  /// instance of `T`.
+  UnaryFunction/*<dynamic, S>*/ _validateType/*<S>*/(
+          /*=S*/ function(T value)) =>
+      function == null ? null : (value) => function(value as T);
+}
diff --git a/lib/src/typed/stream_subscription.dart b/lib/src/typed/stream_subscription.dart
new file mode 100644
index 0000000..800a4ba
--- /dev/null
+++ b/lib/src/typed/stream_subscription.dart
@@ -0,0 +1,36 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+class TypeSafeStreamSubscription<T> implements StreamSubscription<T> {
+  final StreamSubscription _subscription;
+
+  bool get isPaused => _subscription.isPaused;
+
+  TypeSafeStreamSubscription(this._subscription);
+
+  void onData(void handleData(T data)) {
+    _subscription.onData((data) => handleData(data as T));
+  }
+
+  void onError(Function handleError) {
+    _subscription.onError(handleError);
+  }
+
+  void onDone(void handleDone()) {
+    _subscription.onDone(handleDone);
+  }
+
+  void pause([Future resumeFuture]) {
+    _subscription.pause(resumeFuture);
+  }
+
+  void resume() {
+    _subscription.resume();
+  }
+
+  Future cancel() => _subscription.cancel();
+  Future asFuture([futureValue]) => _subscription.asFuture(futureValue);
+}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
new file mode 100644
index 0000000..0066003
--- /dev/null
+++ b/lib/src/utils.dart
@@ -0,0 +1,7 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/// A generic typedef for a function that takes one type and returns another.
+typedef F UnaryFunction<E, F>(E argument);
+
diff --git a/pubspec.yaml b/pubspec.yaml
index 9e64244..0859e29 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,8 +1,10 @@
 name: async
-version: 1.9.1-dev
+version: 1.10.0
 author: Dart Team <misc@dartlang.org>
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
+dependencies:
+  collection: "^1.5.0"
 dev_dependencies:
   fake_async: "^0.1.2"
   stack_trace: "^1.0.0"
diff --git a/test/typed_wrapper/future_test.dart b/test/typed_wrapper/future_test.dart
new file mode 100644
index 0000000..6601ac1
--- /dev/null
+++ b/test/typed_wrapper/future_test.dart
@@ -0,0 +1,117 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import "package:async/async.dart";
+import "package:async/src/typed/future.dart";
+import "package:test/test.dart";
+
+import '../utils.dart';
+
+void main() {
+  group("with valid types, forwards", () {
+    var wrapper;
+    var errorWrapper;
+    setUp(() {
+      wrapper = new TypeSafeFuture<int>(new Future<Object>.value(12));
+
+      var error = new Future<Object>.error("oh no");
+      error.catchError((_) {}); // Don't let the error cause the test to fail.
+      errorWrapper = new TypeSafeFuture<int>(error);
+    });
+
+    test("asStream()", () {
+      expect(wrapper.asStream().toList(), completion(equals([12])));
+      expect(errorWrapper.asStream().first, throwsA("oh no"));
+    });
+
+    test("catchError()", () {
+      expect(
+          wrapper.catchError(expectAsync((_) {}, count: 0),
+              test: expectAsync((_) {}, count: 0)),
+          completion(equals(12)));
+
+      expect(errorWrapper.catchError(expectAsync((error) {
+        expect(error, equals("oh no"));
+        return "value";
+      }), test: expectAsync((error) {
+        expect(error, equals("oh no"));
+        return true;
+      })), completion(equals("value")));
+    });
+
+    test("then()", () {
+      expect(wrapper.then((value) => value.toString()),
+          completion(equals("12")));
+      expect(errorWrapper.then(expectAsync((_) {}, count: 0)),
+          throwsA("oh no"));
+    });
+
+    test("whenComplete()", () {
+      expect(wrapper.whenComplete(expectAsync(() {})), completion(equals(12)));
+      expect(errorWrapper.whenComplete(expectAsync(() {})), throwsA("oh no"));
+    });
+
+    test("timeout()", () {
+      expect(wrapper.timeout(new Duration(seconds: 1)), completion(equals(12)));
+      expect(errorWrapper.timeout(new Duration(seconds: 1)), throwsA("oh no"));
+
+      expect(
+          new TypeSafeFuture<int>(new Completer<Object>().future)
+              .timeout(Duration.ZERO),
+          throwsA(new isInstanceOf<TimeoutException>()));
+
+      expect(
+          new TypeSafeFuture<int>(new Completer<Object>().future)
+              .timeout(Duration.ZERO, onTimeout: expectAsync(() => 15)),
+          completion(equals(15)));
+    });
+  });
+
+  group("with invalid types", () {
+    var wrapper;
+    setUp(() {
+      wrapper = new TypeSafeFuture<int>(new Future<Object>.value("foo"));
+    });
+
+    group("throws a CastError for", () {
+      test("asStream()", () {
+        expect(wrapper.asStream().first, throwsCastError);
+      });
+
+      test("then()", () {
+        expect(
+            wrapper.then(expectAsync((_) {}, count: 0),
+                onError: expectAsync((_) {}, count: 0)),
+            throwsCastError);
+      });
+
+      test("whenComplete()", () {
+        expect(wrapper.whenComplete(expectAsync(() {})).then((_) {}),
+            throwsCastError);
+      });
+
+      test("timeout()", () {
+        expect(wrapper.timeout(new Duration(seconds: 3)).then((_) {}),
+            throwsCastError);
+
+      expect(
+          new TypeSafeFuture<int>(new Completer<Object>().future)
+              .timeout(Duration.ZERO, onTimeout: expectAsync(() => "foo"))
+              .then((_) {}),
+          throwsCastError);
+      });
+    });
+
+    group("doesn't throw a CastError for", () {
+      test("catchError()", () {
+        // catchError has a Future<dynamic> return type, so even if there's no
+        // error we don't re-wrap the returned future.
+        expect(wrapper.catchError(expectAsync((_) {}, count: 0)),
+            completion(equals("foo")));
+      });
+    });
+  });
+}
diff --git a/test/typed_wrapper/stream_subscription_test.dart b/test/typed_wrapper/stream_subscription_test.dart
new file mode 100644
index 0000000..30d3892
--- /dev/null
+++ b/test/typed_wrapper/stream_subscription_test.dart
@@ -0,0 +1,144 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import "package:async/async.dart";
+import "package:async/src/typed/stream_subscription.dart";
+import "package:test/test.dart";
+
+import '../utils.dart';
+
+void main() {
+  group("with valid types, forwards", () {
+    var controller;
+    var wrapper;
+    var isCanceled;
+    setUp(() {
+      controller = new StreamController<Object>(onCancel: () {
+        isCanceled = true;
+      });
+      wrapper = new TypeSafeStreamSubscription<int>(
+          controller.stream.listen(null));
+    });
+
+    test("onData()", () {
+      wrapper.onData(expectAsync((data) {
+        expect(data, equals(1));
+      }));
+      controller.add(1);
+    });
+
+    test("onError()", () {
+      wrapper.onError(expectAsync((error) {
+        expect(error, equals("oh no"));
+      }));
+      controller.addError("oh no");
+    });
+
+    test("onDone()", () {
+      wrapper.onDone(expectAsync(() {}));
+      controller.close();
+    });
+
+    test("pause(), resume(), and isPaused", () async {
+      expect(wrapper.isPaused, isFalse);
+
+      wrapper.pause();
+      await flushMicrotasks();
+      expect(controller.isPaused, isTrue);
+      expect(wrapper.isPaused, isTrue);
+
+      wrapper.resume();
+      await flushMicrotasks();
+      expect(controller.isPaused, isFalse);
+      expect(wrapper.isPaused, isFalse);
+    });
+
+    test("cancel()", () async {
+      wrapper.cancel();
+      await flushMicrotasks();
+      expect(isCanceled, isTrue);
+    });
+
+    test("asFuture()", () {
+      expect(wrapper.asFuture(12), completion(equals(12)));
+      controller.close();
+    });
+  });
+
+  group("with invalid types,", () {
+    var controller;
+    var wrapper;
+    var isCanceled;
+    setUp(() {
+      controller = new StreamController<Object>(onCancel: () {
+        isCanceled = true;
+      });
+      wrapper = new TypeSafeStreamSubscription<int>(
+          controller.stream.listen(null));
+    });
+
+    group("throws a CastError for", () {
+      test("onData()", () {
+        expect(() {
+          // TODO(nweiz): Use the wrapper declared in setUp when sdk#26226 is
+          // fixed.
+          controller = new StreamController<Object>();
+          wrapper = new TypeSafeStreamSubscription<int>(
+              controller.stream.listen(null));
+
+          wrapper.onData(expectAsync((_) {}, count: 0));
+          controller.add("foo");
+        }, throwsZonedCastError);
+      });
+    });
+
+    group("doesn't throw a CastError for", () {
+      test("onError()", () {
+        wrapper.onError(expectAsync((error) {
+          expect(error, equals("oh no"));
+        }));
+        controller.add("foo");
+        controller.addError("oh no");
+      });
+
+      test("onDone()", () {
+        wrapper.onDone(expectAsync(() {}));
+        controller.add("foo");
+        controller.close();
+      });
+
+      test("pause(), resume(), and isPaused", () async {
+        controller.add("foo");
+
+        expect(wrapper.isPaused, isFalse);
+
+        wrapper.pause();
+        await flushMicrotasks();
+        expect(controller.isPaused, isTrue);
+        expect(wrapper.isPaused, isTrue);
+
+        wrapper.resume();
+        await flushMicrotasks();
+        expect(controller.isPaused, isFalse);
+        expect(wrapper.isPaused, isFalse);
+      });
+
+      test("cancel()", () async {
+        controller.add("foo");
+
+        wrapper.cancel();
+        await flushMicrotasks();
+        expect(isCanceled, isTrue);
+      });
+
+      test("asFuture()", () {
+        expect(wrapper.asFuture(12), completion(equals(12)));
+        controller.add("foo");
+        controller.close();
+      });
+    });
+  });
+}
diff --git a/test/typed_wrapper/stream_test.dart b/test/typed_wrapper/stream_test.dart
new file mode 100644
index 0000000..4936120
--- /dev/null
+++ b/test/typed_wrapper/stream_test.dart
@@ -0,0 +1,598 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import "package:async/async.dart";
+import "package:async/src/typed/stream.dart";
+import "package:test/test.dart";
+
+import '../utils.dart';
+
+void main() {
+  group("with valid types, forwards", () {
+    var controller;
+    var wrapper;
+    var emptyWrapper;
+    var singleWrapper;
+    var errorWrapper;
+    setUp(() {
+      controller = new StreamController<Object>()
+          ..add(1)..add(2)..add(3)..add(4)..add(5)..close();
+
+      // TODO(nweiz): Use public methods when test#414 is fixed and we can run
+      // this on DDC.
+      wrapper = new TypeSafeStream<int>(controller.stream);
+      emptyWrapper = new TypeSafeStream<int>(new Stream<Object>.empty());
+      singleWrapper = new TypeSafeStream<int>(
+          new Stream<Object>.fromIterable([1]));
+      errorWrapper = new TypeSafeStream<int>(
+          new Stream<Object>.fromFuture(new Future.error("oh no")));
+    });
+
+    test("first", () {
+      expect(wrapper.first, completion(equals(1)));
+      expect(emptyWrapper.first, throwsStateError);
+    });
+
+    test("last", () {
+      expect(wrapper.last, completion(equals(5)));
+      expect(emptyWrapper.last, throwsStateError);
+    });
+
+    test("single", () {
+      expect(wrapper.single, throwsStateError);
+      expect(singleWrapper.single, completion(equals(1)));
+    });
+
+    test("isBroadcast", () {
+      expect(wrapper.isBroadcast, isFalse);
+      var broadcastWrapper = new TypeSafeStream<int>(
+          new Stream<Object>.empty().asBroadcastStream());
+      expect(broadcastWrapper.isBroadcast, isTrue);
+    });
+
+    test("isEmpty", () {
+      expect(wrapper.isEmpty, completion(isFalse));
+      expect(emptyWrapper.isEmpty, completion(isTrue));
+    });
+
+    test("length", () {
+      expect(wrapper.length, completion(equals(5)));
+      expect(emptyWrapper.length, completion(equals(0)));
+    });
+
+    group("asBroadcastStream()", () {
+      test("with no parameters", () {
+        var broadcast = wrapper.asBroadcastStream();
+        expect(broadcast.toList(), completion(equals([1, 2, 3, 4, 5])));
+        expect(broadcast.toList(), completion(equals([1, 2, 3, 4, 5])));
+      });
+
+      test("with onListen", () {
+        var broadcast = wrapper.asBroadcastStream(
+            onListen: expectAsync((subscription) {
+          expect(subscription, new isInstanceOf<StreamSubscription<int>>());
+          subscription.pause();
+        }));
+
+        broadcast.listen(null);
+        expect(controller.isPaused, isTrue);
+      });
+
+      test("with onCancel", () {
+        var broadcast = wrapper.asBroadcastStream(
+            onCancel: expectAsync((subscription) {
+          expect(subscription, new isInstanceOf<StreamSubscription<int>>());
+          subscription.pause();
+        }));
+
+        broadcast.listen(null).cancel();
+        expect(controller.isPaused, isTrue);
+      });
+    });
+
+    test("asyncExpand()", () {
+      expect(
+          wrapper.asyncExpand((i) => new Stream.fromIterable([i, i])).toList(),
+          completion(equals([1, 1, 2, 2, 3, 3, 4, 4, 5, 5])));
+    });
+
+    test("asyncMap()", () {
+      expect(wrapper.asyncMap((i) => new Future.value(i * 2)).toList(),
+          completion(equals([2, 4, 6, 8, 10])));
+    });
+
+    group("distinct()", () {
+      test("without equals", () {
+        expect(wrapper.distinct().toList(),
+            completion(equals([1, 2, 3, 4, 5])));
+
+        expect(
+            new TypeSafeStream<int>(
+                    new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3]))
+                .distinct().toList(),
+            completion(equals([1, 2, 3])));
+      });
+
+      test("with equals", () {
+        expect(wrapper.distinct((i1, i2) => (i1 ~/ 2 == i2 ~/ 2)).toList(),
+            completion(equals([1, 2, 4])));
+      });
+    });
+
+    group("drain()", () {
+      test("without a value", () {
+        expect(wrapper.drain(), completes);
+        expect(() => wrapper.drain(), throwsStateError);
+      });
+
+      test("with a value", () {
+        expect(wrapper.drain(12), completion(equals(12)));
+      });
+    });
+
+    test("expand()", () {
+      expect(
+          wrapper.expand((i) => [i, i]).toList(),
+          completion(equals([1, 1, 2, 2, 3, 3, 4, 4, 5, 5])));
+    });
+
+    group("firstWhere()", () {
+      test("finding a value", () {
+        expect(wrapper.firstWhere((i) => i > 3), completion(equals(4)));
+      });
+
+      test("finding no value", () {
+        expect(wrapper.firstWhere((i) => i > 5), throwsStateError);
+      });
+
+      test("with a default value", () {
+        expect(wrapper.firstWhere((i) => i > 5, defaultValue: () => "value"),
+            completion(equals("value")));
+      });
+    });
+
+    group("lastWhere()", () {
+      test("finding a value", () {
+        expect(wrapper.lastWhere((i) => i < 3), completion(equals(2)));
+      });
+
+      test("finding no value", () {
+        expect(wrapper.lastWhere((i) => i > 5), throwsStateError);
+      });
+
+      test("with a default value", () {
+        expect(wrapper.lastWhere((i) => i > 5, defaultValue: () => "value"),
+            completion(equals("value")));
+      });
+    });
+
+    group("singleWhere()", () {
+      test("finding a single value", () {
+        expect(wrapper.singleWhere((i) => i == 3), completion(equals(3)));
+      });
+
+      test("finding no value", () {
+        expect(wrapper.singleWhere((i) => i == 6), throwsStateError);
+      });
+
+      test("finding multiple values", () {
+        expect(wrapper.singleWhere((i) => i.isOdd), throwsStateError);
+      });
+    });
+
+    test("fold()", () {
+      expect(wrapper.fold("foo", (previous, i) => previous + i.toString()),
+          completion(equals("foo12345")));
+    });
+
+    test("forEach()", () async {
+      emptyWrapper.forEach(expectAsync((_) {}, count: 0));
+
+      var results = [];
+      await wrapper.forEach(results.add);
+      expect(results, equals([1, 2, 3, 4, 5]));
+    });
+
+    group("handleError()", () {
+      test("without a test", () {
+        expect(errorWrapper.handleError(expectAsync((error) {
+          expect(error, equals("oh no"));
+        })).toList(), completion(isEmpty));
+      });
+
+      test("with a matching test", () {
+        expect(errorWrapper.handleError(expectAsync((error) {
+          expect(error, equals("oh no"));
+        }), test: expectAsync((error) {
+          expect(error, equals("oh no"));
+          return true;
+        })).toList(), completion(isEmpty));
+      });
+
+      test("with a matching test", () {
+        expect(errorWrapper.handleError(expectAsync((_) {}, count: 0),
+            test: expectAsync((error) {
+          expect(error, equals("oh no"));
+          return false;
+        })).toList(), throwsA("oh no"));
+      });
+    });
+
+    group("listen()", () {
+      test("with a callback", () {
+        var subscription;
+        subscription = wrapper.listen(expectAsync((data) {
+          expect(data, equals(1));
+
+          subscription.onData(expectAsync((data) {
+            expect(data, equals(2));
+            subscription.cancel();
+          }));
+        }));
+      });
+
+      test("with a null callback", () {
+        expect(wrapper.listen(null).asFuture(), completes);
+      });
+    });
+
+    test("map()", () {
+      expect(wrapper.map((i) => i * 2).toList(),
+          completion(equals([2, 4, 6, 8, 10])));
+    });
+
+    test("pipe()", () {
+      var consumer = new StreamController<T>();
+      expect(wrapper.pipe(consumer), completes);
+      expect(consumer.stream.toList(), completion(equals([1, 2, 3, 4, 5])));
+    });
+
+    test("reduce()", () {
+      expect(wrapper.reduce((value, i) => value + i), completion(equals(15)));
+      expect(emptyWrapper.reduce((value, i) => value + i), throwsStateError);
+    });
+
+    test("skipWhile()", () {
+      expect(wrapper.skipWhile((i) => i < 3).toList(),
+          completion(equals([3, 4, 5])));
+    });
+
+    test("takeWhile()", () {
+      expect(wrapper.takeWhile((i) => i < 3).toList(),
+          completion(equals([1, 2])));
+    });
+
+    test("toSet()", () {
+      expect(wrapper.toSet(), completion(unorderedEquals([1, 2, 3, 4, 5])));
+      expect(
+          new TypeSafeStream<int>(
+                  new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3]))
+              .toSet(),
+          completion(unorderedEquals([1, 2, 3])));
+    });
+
+    test("transform()", () {
+     var transformer = new StreamTransformer<int, String>.fromHandlers(
+         handleData: (data, sink) {
+       sink.add(data.toString());
+     });
+
+      expect(wrapper.transform(transformer).toList(),
+          completion(equals(["1", "2", "3", "4", "5"])));
+    });
+
+    test("where()", () {
+      expect(wrapper.where((i) => i.isOdd).toList(),
+          completion(equals([1, 3, 5])));
+    });
+
+    group("any()", () {
+      test("with matches", () {
+        expect(wrapper.any((i) => i > 3), completion(isTrue));
+      });
+
+      test("without matches", () {
+        expect(wrapper.any((i) => i > 5), completion(isFalse));
+      });
+    });
+
+    group("every()", () {
+      test("with all matches", () {
+        expect(wrapper.every((i) => i < 6), completion(isTrue));
+      });
+
+      test("with some non-matches", () {
+        expect(wrapper.every((i) => i > 3), completion(isFalse));
+      });
+    });
+
+    group("skip()", () {
+      test("with a valid index", () {
+        expect(wrapper.skip(3).toList(), completion(equals([4, 5])));
+      });
+
+      test("with a longer index than length", () {
+        expect(wrapper.skip(6).toList(), completion(isEmpty));
+      });
+
+      test("with a negative index", () {
+        expect(() => wrapper.skip(-1), throwsArgumentError);
+      });
+    });
+
+    group("take()", () {
+      test("with a valid index", () {
+        expect(wrapper.take(3).toList(), completion(equals([1, 2, 3])));
+      });
+
+      test("with a longer index than length", () {
+        expect(wrapper.take(6).toList(), completion(equals([1, 2, 3, 4, 5])));
+      });
+
+      test("with a negative index", () {
+        expect(wrapper.take(-1).toList(), completion(isEmpty));
+      });
+    });
+
+    group("elementAt()", () {
+      test("with a valid index", () {
+        expect(wrapper.elementAt(3), completion(equals(4)));
+      });
+
+      test("with too high an index", () {
+        expect(wrapper.elementAt(6), throwsRangeError);
+      });
+
+      test("with a negative index", () {
+        expect(wrapper.elementAt(-1), throwsArgumentError);
+      });
+    });
+
+    group("contains()", () {
+      test("with an element", () {
+        expect(wrapper.contains(2), completion(isTrue));
+      });
+
+      test("with a non-element", () {
+        expect(wrapper.contains(6), completion(isFalse));
+      });
+
+      test("with a non-element of a different type", () {
+        expect(wrapper.contains("foo"), completion(isFalse));
+      });
+    });
+
+    group("join()", () {
+      test("without a separator", () {
+        expect(wrapper.join(), completion(equals("12345")));
+      });
+
+      test("with a separator", () {
+        expect(wrapper.join(" "), completion(equals("1 2 3 4 5")));
+      });
+    });
+
+    test("toString()", () {
+      expect(wrapper.toString(), contains("Stream"));
+    });
+  });
+
+  group("with invalid types", () {
+    var wrapper;
+    var singleWrapper;
+    setUp(() {
+      wrapper = new TypeSafeStream<int>(
+          new Stream<Object>.fromIterable(["foo", "bar", "baz"]));
+      singleWrapper = new TypeSafeStream<int>(
+          new Stream<Object>.fromIterable(["foo"]));
+    });
+
+    group("throws a CastError for", () {
+      test("first", () {
+        expect(wrapper.first, throwsCastError);
+      });
+
+      test("last", () {
+        expect(wrapper.last, throwsCastError);
+      });
+
+      test("single", () {
+        expect(singleWrapper.single, throwsCastError);
+      });
+
+      test("asBroadcastStream()", () {
+        var broadcast = wrapper.asBroadcastStream();
+        expect(broadcast.first, throwsCastError);
+      });
+
+      test("asyncExpand()", () {
+        expect(wrapper.asyncExpand(expectAsync((_) {}, count: 0)).first,
+            throwsCastError);
+      });
+
+      test("asyncMap()", () {
+        expect(wrapper.asyncMap(expectAsync((_) {}, count: 0)).first,
+            throwsCastError);
+      });
+
+      group("distinct()", () {
+        test("without equals", () {
+          expect(wrapper.distinct().first, throwsCastError);
+        });
+
+        test("with equals", () {
+          expect(wrapper.distinct(expectAsync((_, __) {}, count: 0)).first,
+              throwsCastError);
+        });
+      });
+
+      test("expand()", () {
+        expect(wrapper.expand(expectAsync((_) {}, count: 0)).first,
+            throwsCastError);
+      });
+
+      test("firstWhere()", () {
+        expect(wrapper.firstWhere(expectAsync((_) {}, count: 0)),
+            throwsCastError);
+      });
+
+      test("lastWhere()", () {
+        expect(wrapper.lastWhere(expectAsync((_) {}, count: 0)),
+            throwsCastError);
+      });
+
+      test("singleWhere()", () {
+        expect(wrapper.singleWhere(expectAsync((_) {}, count: 0)),
+            throwsCastError);
+      });
+
+      test("fold()", () {
+        expect(wrapper.fold("foo", expectAsync((_, __) {}, count: 0)),
+            throwsCastError);
+      });
+
+      test("forEach()", () async {
+        expect(wrapper.forEach(expectAsync((_) {}, count: 0)), throwsCastError);
+      });
+
+      test("handleError()", () {
+        expect(wrapper.handleError(expectAsync((_) {}, count: 0)).first,
+            throwsCastError);
+      });
+
+      test("listen()", () {
+        expect(() => wrapper.take(1).listen(expectAsync((_) {}, count: 0)),
+            throwsZonedCastError);
+      });
+
+      test("map()", () {
+        expect(wrapper.map(expectAsync((_) {}, count: 0)).first,
+            throwsCastError);
+      });
+
+      test("reduce()", () {
+        expect(wrapper.reduce(expectAsync((_, __) {}, count: 0)),
+            throwsCastError);
+      });
+
+      test("skipWhile()", () {
+        expect(wrapper.skipWhile(expectAsync((_) {}, count: 0)).first,
+            throwsCastError);
+      });
+
+      test("takeWhile()", () {
+        expect(wrapper.takeWhile(expectAsync((_) {}, count: 0)).first,
+            throwsCastError);
+      });
+
+      test("toList()", () async {
+        var list = await wrapper.toList();
+        expect(() => list.first, throwsCastError);
+      }, skip: "Re-enable this when test can run DDC (test#414).");
+
+      test("toSet()", () async {
+        var asSet = await wrapper.toSet();
+        expect(() => asSet.first, throwsCastError);
+      }, skip: "Re-enable this when test can run DDC (test#414).");
+
+      test("where()", () {
+        expect(wrapper.where(expectAsync((_) {}, count: 0)).first,
+            throwsCastError);
+      });
+
+      test("any()", () {
+        expect(wrapper.any(expectAsync((_) {}, count: 0)), throwsCastError);
+      });
+
+      test("every()", () {
+        expect(wrapper.every(expectAsync((_) {}, count: 0)), throwsCastError);
+      });
+
+      test("skip()", () {
+        expect(wrapper.skip(1).first, throwsCastError);
+      });
+
+      test("take()", () {
+        expect(wrapper.take(1).first, throwsCastError);
+      });
+
+      test("elementAt()", () {
+        expect(wrapper.elementAt(1), throwsCastError);
+      });
+    });
+
+    group("doesn't throw a CastError for", () {
+      test("single", () {
+        expect(wrapper.single, throwsStateError);
+      });
+
+      test("length", () {
+        expect(wrapper.length, completion(equals(3)));
+      });
+
+      test("isBroadcast", () {
+        expect(wrapper.isBroadcast, isFalse);
+      });
+
+      test("isEmpty", () {
+        expect(wrapper.isEmpty, completion(isFalse));
+      });
+
+      group("drain()", () {
+        test("without a value", () {
+          expect(wrapper.drain(), completes);
+          expect(() => wrapper.drain(), throwsStateError);
+        });
+
+        test("with a value", () {
+          expect(wrapper.drain(12), completion(equals(12)));
+        });
+      });
+
+      test("skip()", () {
+        expect(() => wrapper.skip(-1), throwsArgumentError);
+      });
+
+      group("elementAt()", () {
+        test("with too high an index", () {
+          expect(wrapper.elementAt(6), throwsRangeError);
+        });
+
+        test("with a negative index", () {
+          expect(wrapper.elementAt(-1), throwsArgumentError);
+        });
+      });
+
+      group("contains()", () {
+        test("with an element", () {
+          expect(wrapper.contains("foo"), completion(isTrue));
+        });
+
+        test("with a non-element", () {
+          expect(wrapper.contains("qux"), completion(isFalse));
+        });
+
+        test("with a non-element of a different type", () {
+          expect(wrapper.contains(1), completion(isFalse));
+        });
+      });
+
+      group("join()", () {
+        test("without a separator", () {
+          expect(wrapper.join(), completion(equals("foobarbaz")));
+        });
+
+        test("with a separator", () {
+          expect(wrapper.join(" "), completion(equals("foo bar baz")));
+        });
+      });
+
+      test("toString()", () {
+        expect(wrapper.toString(), contains("Stream"));
+      });
+    });
+  });
+}
diff --git a/test/utils.dart b/test/utils.dart
index c32ea2c..7b65bcb 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -16,6 +16,29 @@
 /// Returns a function that fails the test if it is ever called.
 unreachable(String name) => ([a, b]) => fail("Unreachable: $name");
 
+// TODO(nweiz): Use the version of this in test when test#418 is fixed.
+/// A matcher that runs a callback in its own zone and asserts that that zone
+/// emits an error that matches [matcher].
+Matcher throwsZoned(matcher) => predicate((callback) {
+  var firstError = true;
+  runZoned(callback, onError: expectAsync((error, stackTrace) {
+    if (firstError) {
+      expect(error, matcher);
+      firstError = false;
+    } else {
+      registerException(error, stackTrace);
+    }
+  }, max: -1));
+  return true;
+});
+
+/// A matcher that runs a callback in its own zone and asserts that that zone
+/// emits a [CastError].
+final throwsZonedCastError = throwsZoned(new isInstanceOf<CastError>());
+
+/// A matcher that matches a callback or future that throws a [CastError].
+final throwsCastError = throwsA(new isInstanceOf<CastError>());
+
 /// A badly behaved stream which throws if it's ever listened to.
 ///
 /// Can be used to test cases where a stream should not be used.