Add typed wrapper functions to delegate classes.
These mirror the methods in the the collection package, and serve a
similar purpose of safely casting generic objects when the user is
confident that the actual object's values are more specific than the
static type.
R=floitsch@google.com, lrn@google.com
Review URL: https://codereview.chromium.org//1870543004 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 803b589..b18365f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,14 +1,23 @@
-## 1.9.1
+## 1.10.0
+
+* Add `DelegatingFuture.typed()`, `DelegatingStreamSubscription.typed()`,
+ `DelegatingStreamConsumer.typed()`, `DelegatingSink.typed()`,
+ `DelegatingEventSink.typed()`, and `DelegatingStreamSink.typed()` static
+ methods. These wrap untyped instances of these classes with the correct type
+ parameter, and assert the types of values as they're accessed.
+
+* Add a `DelegatingStream` class. This is behaviorally identical to `StreamView`
+ from `dart:async`, but it follows this package's naming conventions and
+ provides a `DelegatingStream.typed()` static method.
* Fix all strong mode warnings and add generic method annotations.
-* `new StreamQueue()` now takes a `Stream<T>` rather than a `Stream<dynamic>`.
- Passing a type that wasn't `is`-compatible with `Stream<T>` would already
- throw an error under some circumstances, so this is not considered a breaking
- change.
-
-* `new SubscriptionStream()` now takes a `Stream<T>` rather than a
- `Stream<dynamic>`. Passing a type that wasn't `is`-compatible with `Stream<T>`
+* `new StreamQueue()`, `new SubscriptionStream()`, `new
+ DelegatingStreamSubscription()`, `new DelegatingStreamConsumer()`, `new
+ DelegatingSink()`, `new DelegatingEventSink()`, and `new
+ DelegatingStreamSink()` now take arguments with generic type arguments (for
+ example `Stream<T>`) rather than without (for example `Stream<dynamic>`).
+ Passing a type that wasn't `is`-compatible with the fully-specified generic
would already throw an error under some circumstances, so this is not
considered a breaking change.
diff --git a/lib/async.dart b/lib/async.dart
index 0d3e051..7055341 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -7,6 +7,7 @@
export "src/delegate/event_sink.dart";
export "src/delegate/future.dart";
export "src/delegate/sink.dart";
+export "src/delegate/stream.dart";
export "src/delegate/stream_consumer.dart";
export "src/delegate/stream_sink.dart";
export "src/delegate/stream_subscription.dart";
diff --git a/lib/src/delegate/event_sink.dart b/lib/src/delegate/event_sink.dart
index 5a525df..14501b2 100644
--- a/lib/src/delegate/event_sink.dart
+++ b/lib/src/delegate/event_sink.dart
@@ -12,7 +12,18 @@
final EventSink _sink;
/// Create a delegating sink forwarding calls to [sink].
- DelegatingEventSink(EventSink sink) : _sink = sink;
+ DelegatingEventSink(EventSink<T> sink) : _sink = sink;
+
+ DelegatingEventSink._(this._sink);
+
+ /// Creates a wrapper that coerces the type of [sink].
+ ///
+ /// Unlike [new DelegatingEventSink], this only requires its argument to be an
+ /// instance of `EventSink`, not `EventSink<T>`. This means that calls to
+ /// [add] may throw a [CastError] if the argument type doesn't match the
+ /// reified type of [sink].
+ static EventSink/*<T>*/ typed/*<T>*/(EventSink sink) =>
+ sink is EventSink/*<T>*/ ? sink : new DelegatingEventSink._(sink);
void add(T data) {
_sink.add(data);
diff --git a/lib/src/delegate/future.dart b/lib/src/delegate/future.dart
index bfc075b..3f7ea72 100644
--- a/lib/src/delegate/future.dart
+++ b/lib/src/delegate/future.dart
@@ -4,6 +4,8 @@
import 'dart:async';
+import '../typed/future.dart';
+
/// A wrapper that forwards calls to a [Future].
class DelegatingFuture<T> implements Future<T> {
/// The wrapped [Future].
@@ -11,6 +13,15 @@
DelegatingFuture(this._future);
+ /// Creates a wrapper which throws if [future]'s value isn't an instance of
+ /// `T`.
+ ///
+ /// This soundly converts a [Future] to a `Future<T>`, regardless of its
+ /// original generic type, by asserting that its value is an instance of `T`
+ /// whenever it's provided. If it's not, the future throws a [CastError].
+ static Future/*<T>*/ typed/*<T>*/(Future future) =>
+ future is Future/*<T>*/ ? future : new TypeSafeFuture/*<T>*/(future);
+
Stream<T> asStream() => _future.asStream();
Future catchError(Function onError, {bool test(Object error)}) =>
@@ -21,6 +32,6 @@
Future<T> whenComplete(action()) => _future.whenComplete(action);
- Future<T> timeout(Duration timeLimit, {void onTimeout()}) =>
+ Future<T> timeout(Duration timeLimit, {onTimeout()}) =>
_future.timeout(timeLimit, onTimeout: onTimeout);
}
diff --git a/lib/src/delegate/sink.dart b/lib/src/delegate/sink.dart
index cee2937..326c15b 100644
--- a/lib/src/delegate/sink.dart
+++ b/lib/src/delegate/sink.dart
@@ -10,8 +10,18 @@
final Sink _sink;
/// Create a delegating sink forwarding calls to [sink].
- DelegatingSink(Sink sink)
- : _sink = sink;
+ DelegatingSink(Sink<T> sink) : _sink = sink;
+
+ DelegatingSink._(this._sink);
+
+ /// Creates a wrapper that coerces the type of [sink].
+ ///
+ /// Unlike [new DelegatingSink], this only requires its argument to be an
+ /// instance of `Sink`, not `Sink<T>`. This means that calls to [add] may
+ /// throw a [CastError] if the argument type doesn't match the reified type of
+ /// [sink].
+ static Sink/*<T>*/ typed/*<T>*/(Sink sink) =>
+ sink is Sink/*<T>*/ ? sink : new DelegatingSink._(sink);
void add(T data) {
_sink.add(data);
diff --git a/lib/src/delegate/stream.dart b/lib/src/delegate/stream.dart
new file mode 100644
index 0000000..7562218
--- /dev/null
+++ b/lib/src/delegate/stream.dart
@@ -0,0 +1,28 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import '../typed/stream.dart';
+
+/// Simple delegating wrapper around a [Stream].
+///
+/// Subclasses can override individual methods, or use this to expose only the
+/// [Stream] methods of a subclass.
+///
+/// Note that this is identical to [StreamView] in `dart:async`. It's provided
+/// under this name for consistency with other `Delegating*` classes.
+class DelegatingStream<T> extends StreamView<T> {
+ DelegatingStream(Stream<T> stream) : super(stream);
+
+ /// Creates a wrapper which throws if [stream]'s events aren't instances of
+ /// `T`.
+ ///
+ /// This soundly converts a [Stream] to a `Stream<T>`, regardless of its
+ /// original generic type, by asserting that its events are instances of `T`
+ /// whenever they're provided. If they're not, the stream throws a
+ /// [CastError].
+ static Stream/*<T>*/ typed/*<T>*/(Stream stream) =>
+ stream is Stream/*<T>*/ ? stream : new TypeSafeStream/*<T>*/(stream);
+}
diff --git a/lib/src/delegate/stream_consumer.dart b/lib/src/delegate/stream_consumer.dart
index dcaf0c2..4f495d0 100644
--- a/lib/src/delegate/stream_consumer.dart
+++ b/lib/src/delegate/stream_consumer.dart
@@ -12,8 +12,20 @@
final StreamConsumer _consumer;
/// Create a delegating consumer forwarding calls to [consumer].
- DelegatingStreamConsumer(StreamConsumer consumer)
- : _consumer = consumer;
+ DelegatingStreamConsumer(StreamConsumer<T> consumer) : _consumer = consumer;
+
+ DelegatingStreamConsumer._(this._consumer);
+
+ /// Creates a wrapper that coerces the type of [consumer].
+ ///
+ /// Unlike [new StreamConsumer], this only requires its argument to be an
+ /// instance of `StreamConsumer`, not `StreamConsumer<T>`. This means that
+ /// calls to [addStream] may throw a [CastError] if the argument type doesn't
+ /// match the reified type of [consumer].
+ static StreamConsumer/*<T>*/ typed/*<T>*/(StreamConsumer consumer) =>
+ consumer is StreamConsumer/*<T>*/
+ ? consumer
+ : new DelegatingStreamConsumer._(consumer);
Future addStream(Stream<T> stream) => _consumer.addStream(stream);
diff --git a/lib/src/delegate/stream_sink.dart b/lib/src/delegate/stream_sink.dart
index e06afc1..9b52b19 100644
--- a/lib/src/delegate/stream_sink.dart
+++ b/lib/src/delegate/stream_sink.dart
@@ -14,8 +14,18 @@
Future get done => _sink.done;
/// Create delegating sink forwarding calls to [sink].
- DelegatingStreamSink(StreamSink sink)
- : _sink = sink;
+ DelegatingStreamSink(StreamSink<T> sink) : _sink = sink;
+
+ DelegatingStreamSink._(this._sink);
+
+ /// Creates a wrapper that coerces the type of [sink].
+ ///
+ /// Unlike [new StreamSink], this only requires its argument to be an instance
+ /// of `StreamSink`, not `StreamSink<T>`. This means that calls to [add] may
+ /// throw a [CastError] if the argument type doesn't match the reified type of
+ /// [sink].
+ static StreamSink/*<T>*/ typed/*<T>*/(StreamSink sink) =>
+ sink is StreamSink/*<T>*/ ? sink : new DelegatingStreamSink._(sink);
void add(T data) {
_sink.add(data);
diff --git a/lib/src/delegate/stream_subscription.dart b/lib/src/delegate/stream_subscription.dart
index e7153b2..b86e91f 100644
--- a/lib/src/delegate/stream_subscription.dart
+++ b/lib/src/delegate/stream_subscription.dart
@@ -4,6 +4,8 @@
import 'dart:async';
+import '../typed/stream_subscription.dart';
+
/// Simple delegating wrapper around a [StreamSubscription].
///
/// Subclasses can override individual methods.
@@ -11,9 +13,22 @@
final StreamSubscription _source;
/// Create delegating subscription forwarding calls to [sourceSubscription].
- DelegatingStreamSubscription(StreamSubscription sourceSubscription)
+ DelegatingStreamSubscription(StreamSubscription<T> sourceSubscription)
: _source = sourceSubscription;
+ /// Creates a wrapper which throws if [subscription]'s events aren't instances
+ /// of `T`.
+ ///
+ /// This soundly converts a [StreamSubscription] to a `StreamSubscription<T>`,
+ /// regardless of its original generic type, by asserting that its events are
+ /// instances of `T` whenever they're provided. If they're not, the
+ /// subscription throws a [CastError].
+ static StreamSubscription/*<T>*/ typed/*<T>*/(
+ StreamSubscription subscription) =>
+ subscription is StreamSubscription/*<T>*/
+ ? subscription
+ : new TypeSafeStreamSubscription/*<T>*/(subscription);
+
void onData(void handleData(T data)) {
_source.onData(handleData);
}
diff --git a/lib/src/lazy_stream.dart b/lib/src/lazy_stream.dart
index 3ba4953..147fa8b 100644
--- a/lib/src/lazy_stream.dart
+++ b/lib/src/lazy_stream.dart
@@ -5,6 +5,7 @@
import "dart:async";
import "stream_completer.dart";
+import "delegate/stream.dart";
/// A [Stream] wrapper that forwards to another [Stream] that's initialized
/// lazily.
@@ -39,9 +40,14 @@
_callback = null;
var result = callback();
- Stream stream = result is Future
- ? StreamCompleter.fromFuture(result)
- : result;
+ Stream<T> stream;
+ if (result is Future) {
+ stream = StreamCompleter.fromFuture(result.then((stream) {
+ return DelegatingStream.typed/*<T>*/(stream as Stream);
+ }));
+ } else {
+ stream = DelegatingStream.typed/*<T>*/(result as Stream);
+ }
return stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
diff --git a/lib/src/subscription_stream.dart b/lib/src/subscription_stream.dart
index c448620..50ca81b 100644
--- a/lib/src/subscription_stream.dart
+++ b/lib/src/subscription_stream.dart
@@ -68,7 +68,7 @@
/// source subscription on the first error.
class _CancelOnErrorSubscriptionWrapper<T>
extends DelegatingStreamSubscription<T> {
- _CancelOnErrorSubscriptionWrapper(StreamSubscription subscription)
+ _CancelOnErrorSubscriptionWrapper(StreamSubscription<T> subscription)
: super(subscription);
void onError(Function handleError) {
diff --git a/lib/src/typed/future.dart b/lib/src/typed/future.dart
new file mode 100644
index 0000000..a269593
--- /dev/null
+++ b/lib/src/typed/future.dart
@@ -0,0 +1,25 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+class TypeSafeFuture<T> implements Future<T> {
+ final Future _future;
+
+ TypeSafeFuture(this._future);
+
+ Stream<T> asStream() => _future.then((value) => value as T).asStream();
+
+ Future catchError(Function onError, {bool test(Object error)}) =>
+ _future.catchError(onError, test: test);
+
+ Future/*<S>*/ then/*<S>*/(/*=S*/ onValue(T value), {Function onError}) =>
+ _future.then((value) => onValue(value as T), onError: onError);
+
+ Future<T> whenComplete(action()) =>
+ new TypeSafeFuture<T>(_future.whenComplete(action));
+
+ Future<T> timeout(Duration timeLimit, {onTimeout()}) =>
+ new TypeSafeFuture<T>(_future.timeout(timeLimit, onTimeout: onTimeout));
+}
diff --git a/lib/src/typed/stream.dart b/lib/src/typed/stream.dart
new file mode 100644
index 0000000..3db9f69
--- /dev/null
+++ b/lib/src/typed/stream.dart
@@ -0,0 +1,136 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:collection/collection.dart';
+
+import '../utils.dart';
+import 'stream_subscription.dart';
+
+class TypeSafeStream<T> implements Stream<T> {
+ final Stream _stream;
+
+ Future<T> get first async => (await _stream.first) as T;
+ Future<T> get last async => (await _stream.last) as T;
+ Future<T> get single async => (await _stream.single) as T;
+
+ bool get isBroadcast => _stream.isBroadcast;
+ Future<bool> get isEmpty => _stream.isEmpty;
+ Future<int> get length => _stream.length;
+
+ TypeSafeStream(this._stream);
+
+ Stream<T> asBroadcastStream(
+ {void onListen(StreamSubscription<T> subscription),
+ void onCancel(StreamSubscription<T> subscription)}) {
+ return new TypeSafeStream<T>(_stream.asBroadcastStream(
+ onListen: onListen == null
+ ? null
+ : (subscription) =>
+ onListen(new TypeSafeStreamSubscription<T>(subscription)),
+ onCancel: onCancel == null
+ ? null
+ : (subscription) =>
+ onCancel(new TypeSafeStreamSubscription<T>(subscription))));
+ }
+
+ // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
+ Stream asyncExpand(Stream convert(T event)) =>
+ _stream.asyncExpand(_validateType(convert));
+
+ // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
+ Stream asyncMap(convert(T event)) => _stream.asyncMap(_validateType(convert));
+
+ Stream<T> distinct([bool equals(T previous, T next)]) =>
+ new TypeSafeStream<T>(_stream.distinct(equals == null
+ ? null
+ : (previous, next) => equals(previous as T, next as T)));
+
+ // TODO(nweiz): Give this a generic parameter when sdk#26125 is fixed.
+ Future drain([futureValue]) => _stream.drain(futureValue);
+
+ Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) =>
+ _stream.expand(_validateType(convert));
+
+ Future firstWhere(bool test(T element), {Object defaultValue()}) =>
+ _stream.firstWhere(_validateType(test), defaultValue: defaultValue);
+
+ Future lastWhere(bool test(T element), {Object defaultValue()}) =>
+ _stream.lastWhere(_validateType(test), defaultValue: defaultValue);
+
+ Future<T> singleWhere(bool test(T element)) async =>
+ (await _stream.singleWhere(_validateType(test))) as T;
+
+ Future/*<S>*/ fold/*<S>*/(/*=S*/ initialValue,
+ /*=S*/ combine(/*=S*/ previous, T element)) =>
+ _stream.fold(initialValue,
+ (previous, element) => combine(previous, element as T));
+
+ Future forEach(void action(T element)) =>
+ _stream.forEach(_validateType(action));
+
+ Stream<T> handleError(Function onError, {bool test(error)}) =>
+ new TypeSafeStream<T>(_stream.handleError(onError, test: test));
+
+ StreamSubscription<T> listen(void onData(T value),
+ {Function onError, void onDone(), bool cancelOnError}) =>
+ new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData),
+ onError: onError, onDone: onDone, cancelOnError: cancelOnError));
+
+ Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) =>
+ _stream.map(_validateType(convert));
+
+ // Don't forward to `_stream.pipe` because we want the consumer to see the
+ // type-asserted stream.
+ Future pipe(StreamConsumer<T> consumer) =>
+ consumer.addStream(this).then((_) => consumer.close());
+
+ Future<T> reduce(T combine(T previous, T element)) async {
+ var result = await _stream.reduce(
+ (previous, element) => combine(previous as T, element as T));
+ return result as T;
+ }
+
+ Stream<T> skipWhile(bool test(T element)) =>
+ new TypeSafeStream<T>(_stream.skipWhile(_validateType(test)));
+
+ Stream<T> takeWhile(bool test(T element)) =>
+ new TypeSafeStream<T>(_stream.takeWhile(_validateType(test)));
+
+ Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) =>
+ _stream.timeout(timeLimit, onTimeout: onTimeout);
+
+ Future<List<T>> toList() async =>
+ DelegatingList.typed/*<T>*/(await _stream.toList());
+
+ Future<Set<T>> toSet() async =>
+ DelegatingSet.typed/*<T>*/(await _stream.toSet());
+
+ // Don't forward to `_stream.transform` because we want the transformer to see
+ // the type-asserted stream.
+ Stream/*<S>*/ transform/*<S>*/(
+ StreamTransformer<T, dynamic/*=S*/> transformer) =>
+ transformer.bind(this);
+
+ Stream<T> where(bool test(T element)) =>
+ new TypeSafeStream<T>(_stream.where(_validateType(test)));
+
+ Future<bool> every(bool test(T element)) =>
+ _stream.every(_validateType(test));
+
+ Future<bool> any(bool test(T element)) => _stream.any(_validateType(test));
+ Stream<T> skip(int count) => new TypeSafeStream<T>(_stream.skip(count));
+ Stream<T> take(int count) => new TypeSafeStream<T>(_stream.take(count));
+ Future<T> elementAt(int index) async => (await _stream.elementAt(index)) as T;
+ Future<bool> contains(Object needle) => _stream.contains(needle);
+ Future<String> join([String separator = ""]) => _stream.join(separator);
+ String toString() => _stream.toString();
+
+ /// Returns a version of [function] that asserts that its argument is an
+ /// instance of `T`.
+ UnaryFunction/*<dynamic, S>*/ _validateType/*<S>*/(
+ /*=S*/ function(T value)) =>
+ function == null ? null : (value) => function(value as T);
+}
diff --git a/lib/src/typed/stream_subscription.dart b/lib/src/typed/stream_subscription.dart
new file mode 100644
index 0000000..800a4ba
--- /dev/null
+++ b/lib/src/typed/stream_subscription.dart
@@ -0,0 +1,36 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+class TypeSafeStreamSubscription<T> implements StreamSubscription<T> {
+ final StreamSubscription _subscription;
+
+ bool get isPaused => _subscription.isPaused;
+
+ TypeSafeStreamSubscription(this._subscription);
+
+ void onData(void handleData(T data)) {
+ _subscription.onData((data) => handleData(data as T));
+ }
+
+ void onError(Function handleError) {
+ _subscription.onError(handleError);
+ }
+
+ void onDone(void handleDone()) {
+ _subscription.onDone(handleDone);
+ }
+
+ void pause([Future resumeFuture]) {
+ _subscription.pause(resumeFuture);
+ }
+
+ void resume() {
+ _subscription.resume();
+ }
+
+ Future cancel() => _subscription.cancel();
+ Future asFuture([futureValue]) => _subscription.asFuture(futureValue);
+}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
new file mode 100644
index 0000000..0066003
--- /dev/null
+++ b/lib/src/utils.dart
@@ -0,0 +1,7 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/// A generic typedef for a function that takes one type and returns another.
+typedef F UnaryFunction<E, F>(E argument);
+
diff --git a/pubspec.yaml b/pubspec.yaml
index 9e64244..0859e29 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,8 +1,10 @@
name: async
-version: 1.9.1-dev
+version: 1.10.0
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
+dependencies:
+ collection: "^1.5.0"
dev_dependencies:
fake_async: "^0.1.2"
stack_trace: "^1.0.0"
diff --git a/test/typed_wrapper/future_test.dart b/test/typed_wrapper/future_test.dart
new file mode 100644
index 0000000..6601ac1
--- /dev/null
+++ b/test/typed_wrapper/future_test.dart
@@ -0,0 +1,117 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import "package:async/async.dart";
+import "package:async/src/typed/future.dart";
+import "package:test/test.dart";
+
+import '../utils.dart';
+
+void main() {
+ group("with valid types, forwards", () {
+ var wrapper;
+ var errorWrapper;
+ setUp(() {
+ wrapper = new TypeSafeFuture<int>(new Future<Object>.value(12));
+
+ var error = new Future<Object>.error("oh no");
+ error.catchError((_) {}); // Don't let the error cause the test to fail.
+ errorWrapper = new TypeSafeFuture<int>(error);
+ });
+
+ test("asStream()", () {
+ expect(wrapper.asStream().toList(), completion(equals([12])));
+ expect(errorWrapper.asStream().first, throwsA("oh no"));
+ });
+
+ test("catchError()", () {
+ expect(
+ wrapper.catchError(expectAsync((_) {}, count: 0),
+ test: expectAsync((_) {}, count: 0)),
+ completion(equals(12)));
+
+ expect(errorWrapper.catchError(expectAsync((error) {
+ expect(error, equals("oh no"));
+ return "value";
+ }), test: expectAsync((error) {
+ expect(error, equals("oh no"));
+ return true;
+ })), completion(equals("value")));
+ });
+
+ test("then()", () {
+ expect(wrapper.then((value) => value.toString()),
+ completion(equals("12")));
+ expect(errorWrapper.then(expectAsync((_) {}, count: 0)),
+ throwsA("oh no"));
+ });
+
+ test("whenComplete()", () {
+ expect(wrapper.whenComplete(expectAsync(() {})), completion(equals(12)));
+ expect(errorWrapper.whenComplete(expectAsync(() {})), throwsA("oh no"));
+ });
+
+ test("timeout()", () {
+ expect(wrapper.timeout(new Duration(seconds: 1)), completion(equals(12)));
+ expect(errorWrapper.timeout(new Duration(seconds: 1)), throwsA("oh no"));
+
+ expect(
+ new TypeSafeFuture<int>(new Completer<Object>().future)
+ .timeout(Duration.ZERO),
+ throwsA(new isInstanceOf<TimeoutException>()));
+
+ expect(
+ new TypeSafeFuture<int>(new Completer<Object>().future)
+ .timeout(Duration.ZERO, onTimeout: expectAsync(() => 15)),
+ completion(equals(15)));
+ });
+ });
+
+ group("with invalid types", () {
+ var wrapper;
+ setUp(() {
+ wrapper = new TypeSafeFuture<int>(new Future<Object>.value("foo"));
+ });
+
+ group("throws a CastError for", () {
+ test("asStream()", () {
+ expect(wrapper.asStream().first, throwsCastError);
+ });
+
+ test("then()", () {
+ expect(
+ wrapper.then(expectAsync((_) {}, count: 0),
+ onError: expectAsync((_) {}, count: 0)),
+ throwsCastError);
+ });
+
+ test("whenComplete()", () {
+ expect(wrapper.whenComplete(expectAsync(() {})).then((_) {}),
+ throwsCastError);
+ });
+
+ test("timeout()", () {
+ expect(wrapper.timeout(new Duration(seconds: 3)).then((_) {}),
+ throwsCastError);
+
+ expect(
+ new TypeSafeFuture<int>(new Completer<Object>().future)
+ .timeout(Duration.ZERO, onTimeout: expectAsync(() => "foo"))
+ .then((_) {}),
+ throwsCastError);
+ });
+ });
+
+ group("doesn't throw a CastError for", () {
+ test("catchError()", () {
+ // catchError has a Future<dynamic> return type, so even if there's no
+ // error we don't re-wrap the returned future.
+ expect(wrapper.catchError(expectAsync((_) {}, count: 0)),
+ completion(equals("foo")));
+ });
+ });
+ });
+}
diff --git a/test/typed_wrapper/stream_subscription_test.dart b/test/typed_wrapper/stream_subscription_test.dart
new file mode 100644
index 0000000..30d3892
--- /dev/null
+++ b/test/typed_wrapper/stream_subscription_test.dart
@@ -0,0 +1,144 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import "package:async/async.dart";
+import "package:async/src/typed/stream_subscription.dart";
+import "package:test/test.dart";
+
+import '../utils.dart';
+
+void main() {
+ group("with valid types, forwards", () {
+ var controller;
+ var wrapper;
+ var isCanceled;
+ setUp(() {
+ controller = new StreamController<Object>(onCancel: () {
+ isCanceled = true;
+ });
+ wrapper = new TypeSafeStreamSubscription<int>(
+ controller.stream.listen(null));
+ });
+
+ test("onData()", () {
+ wrapper.onData(expectAsync((data) {
+ expect(data, equals(1));
+ }));
+ controller.add(1);
+ });
+
+ test("onError()", () {
+ wrapper.onError(expectAsync((error) {
+ expect(error, equals("oh no"));
+ }));
+ controller.addError("oh no");
+ });
+
+ test("onDone()", () {
+ wrapper.onDone(expectAsync(() {}));
+ controller.close();
+ });
+
+ test("pause(), resume(), and isPaused", () async {
+ expect(wrapper.isPaused, isFalse);
+
+ wrapper.pause();
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+ expect(wrapper.isPaused, isTrue);
+
+ wrapper.resume();
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+ expect(wrapper.isPaused, isFalse);
+ });
+
+ test("cancel()", () async {
+ wrapper.cancel();
+ await flushMicrotasks();
+ expect(isCanceled, isTrue);
+ });
+
+ test("asFuture()", () {
+ expect(wrapper.asFuture(12), completion(equals(12)));
+ controller.close();
+ });
+ });
+
+ group("with invalid types,", () {
+ var controller;
+ var wrapper;
+ var isCanceled;
+ setUp(() {
+ controller = new StreamController<Object>(onCancel: () {
+ isCanceled = true;
+ });
+ wrapper = new TypeSafeStreamSubscription<int>(
+ controller.stream.listen(null));
+ });
+
+ group("throws a CastError for", () {
+ test("onData()", () {
+ expect(() {
+ // TODO(nweiz): Use the wrapper declared in setUp when sdk#26226 is
+ // fixed.
+ controller = new StreamController<Object>();
+ wrapper = new TypeSafeStreamSubscription<int>(
+ controller.stream.listen(null));
+
+ wrapper.onData(expectAsync((_) {}, count: 0));
+ controller.add("foo");
+ }, throwsZonedCastError);
+ });
+ });
+
+ group("doesn't throw a CastError for", () {
+ test("onError()", () {
+ wrapper.onError(expectAsync((error) {
+ expect(error, equals("oh no"));
+ }));
+ controller.add("foo");
+ controller.addError("oh no");
+ });
+
+ test("onDone()", () {
+ wrapper.onDone(expectAsync(() {}));
+ controller.add("foo");
+ controller.close();
+ });
+
+ test("pause(), resume(), and isPaused", () async {
+ controller.add("foo");
+
+ expect(wrapper.isPaused, isFalse);
+
+ wrapper.pause();
+ await flushMicrotasks();
+ expect(controller.isPaused, isTrue);
+ expect(wrapper.isPaused, isTrue);
+
+ wrapper.resume();
+ await flushMicrotasks();
+ expect(controller.isPaused, isFalse);
+ expect(wrapper.isPaused, isFalse);
+ });
+
+ test("cancel()", () async {
+ controller.add("foo");
+
+ wrapper.cancel();
+ await flushMicrotasks();
+ expect(isCanceled, isTrue);
+ });
+
+ test("asFuture()", () {
+ expect(wrapper.asFuture(12), completion(equals(12)));
+ controller.add("foo");
+ controller.close();
+ });
+ });
+ });
+}
diff --git a/test/typed_wrapper/stream_test.dart b/test/typed_wrapper/stream_test.dart
new file mode 100644
index 0000000..4936120
--- /dev/null
+++ b/test/typed_wrapper/stream_test.dart
@@ -0,0 +1,598 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import "package:async/async.dart";
+import "package:async/src/typed/stream.dart";
+import "package:test/test.dart";
+
+import '../utils.dart';
+
+void main() {
+ group("with valid types, forwards", () {
+ var controller;
+ var wrapper;
+ var emptyWrapper;
+ var singleWrapper;
+ var errorWrapper;
+ setUp(() {
+ controller = new StreamController<Object>()
+ ..add(1)..add(2)..add(3)..add(4)..add(5)..close();
+
+ // TODO(nweiz): Use public methods when test#414 is fixed and we can run
+ // this on DDC.
+ wrapper = new TypeSafeStream<int>(controller.stream);
+ emptyWrapper = new TypeSafeStream<int>(new Stream<Object>.empty());
+ singleWrapper = new TypeSafeStream<int>(
+ new Stream<Object>.fromIterable([1]));
+ errorWrapper = new TypeSafeStream<int>(
+ new Stream<Object>.fromFuture(new Future.error("oh no")));
+ });
+
+ test("first", () {
+ expect(wrapper.first, completion(equals(1)));
+ expect(emptyWrapper.first, throwsStateError);
+ });
+
+ test("last", () {
+ expect(wrapper.last, completion(equals(5)));
+ expect(emptyWrapper.last, throwsStateError);
+ });
+
+ test("single", () {
+ expect(wrapper.single, throwsStateError);
+ expect(singleWrapper.single, completion(equals(1)));
+ });
+
+ test("isBroadcast", () {
+ expect(wrapper.isBroadcast, isFalse);
+ var broadcastWrapper = new TypeSafeStream<int>(
+ new Stream<Object>.empty().asBroadcastStream());
+ expect(broadcastWrapper.isBroadcast, isTrue);
+ });
+
+ test("isEmpty", () {
+ expect(wrapper.isEmpty, completion(isFalse));
+ expect(emptyWrapper.isEmpty, completion(isTrue));
+ });
+
+ test("length", () {
+ expect(wrapper.length, completion(equals(5)));
+ expect(emptyWrapper.length, completion(equals(0)));
+ });
+
+ group("asBroadcastStream()", () {
+ test("with no parameters", () {
+ var broadcast = wrapper.asBroadcastStream();
+ expect(broadcast.toList(), completion(equals([1, 2, 3, 4, 5])));
+ expect(broadcast.toList(), completion(equals([1, 2, 3, 4, 5])));
+ });
+
+ test("with onListen", () {
+ var broadcast = wrapper.asBroadcastStream(
+ onListen: expectAsync((subscription) {
+ expect(subscription, new isInstanceOf<StreamSubscription<int>>());
+ subscription.pause();
+ }));
+
+ broadcast.listen(null);
+ expect(controller.isPaused, isTrue);
+ });
+
+ test("with onCancel", () {
+ var broadcast = wrapper.asBroadcastStream(
+ onCancel: expectAsync((subscription) {
+ expect(subscription, new isInstanceOf<StreamSubscription<int>>());
+ subscription.pause();
+ }));
+
+ broadcast.listen(null).cancel();
+ expect(controller.isPaused, isTrue);
+ });
+ });
+
+ test("asyncExpand()", () {
+ expect(
+ wrapper.asyncExpand((i) => new Stream.fromIterable([i, i])).toList(),
+ completion(equals([1, 1, 2, 2, 3, 3, 4, 4, 5, 5])));
+ });
+
+ test("asyncMap()", () {
+ expect(wrapper.asyncMap((i) => new Future.value(i * 2)).toList(),
+ completion(equals([2, 4, 6, 8, 10])));
+ });
+
+ group("distinct()", () {
+ test("without equals", () {
+ expect(wrapper.distinct().toList(),
+ completion(equals([1, 2, 3, 4, 5])));
+
+ expect(
+ new TypeSafeStream<int>(
+ new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3]))
+ .distinct().toList(),
+ completion(equals([1, 2, 3])));
+ });
+
+ test("with equals", () {
+ expect(wrapper.distinct((i1, i2) => (i1 ~/ 2 == i2 ~/ 2)).toList(),
+ completion(equals([1, 2, 4])));
+ });
+ });
+
+ group("drain()", () {
+ test("without a value", () {
+ expect(wrapper.drain(), completes);
+ expect(() => wrapper.drain(), throwsStateError);
+ });
+
+ test("with a value", () {
+ expect(wrapper.drain(12), completion(equals(12)));
+ });
+ });
+
+ test("expand()", () {
+ expect(
+ wrapper.expand((i) => [i, i]).toList(),
+ completion(equals([1, 1, 2, 2, 3, 3, 4, 4, 5, 5])));
+ });
+
+ group("firstWhere()", () {
+ test("finding a value", () {
+ expect(wrapper.firstWhere((i) => i > 3), completion(equals(4)));
+ });
+
+ test("finding no value", () {
+ expect(wrapper.firstWhere((i) => i > 5), throwsStateError);
+ });
+
+ test("with a default value", () {
+ expect(wrapper.firstWhere((i) => i > 5, defaultValue: () => "value"),
+ completion(equals("value")));
+ });
+ });
+
+ group("lastWhere()", () {
+ test("finding a value", () {
+ expect(wrapper.lastWhere((i) => i < 3), completion(equals(2)));
+ });
+
+ test("finding no value", () {
+ expect(wrapper.lastWhere((i) => i > 5), throwsStateError);
+ });
+
+ test("with a default value", () {
+ expect(wrapper.lastWhere((i) => i > 5, defaultValue: () => "value"),
+ completion(equals("value")));
+ });
+ });
+
+ group("singleWhere()", () {
+ test("finding a single value", () {
+ expect(wrapper.singleWhere((i) => i == 3), completion(equals(3)));
+ });
+
+ test("finding no value", () {
+ expect(wrapper.singleWhere((i) => i == 6), throwsStateError);
+ });
+
+ test("finding multiple values", () {
+ expect(wrapper.singleWhere((i) => i.isOdd), throwsStateError);
+ });
+ });
+
+ test("fold()", () {
+ expect(wrapper.fold("foo", (previous, i) => previous + i.toString()),
+ completion(equals("foo12345")));
+ });
+
+ test("forEach()", () async {
+ emptyWrapper.forEach(expectAsync((_) {}, count: 0));
+
+ var results = [];
+ await wrapper.forEach(results.add);
+ expect(results, equals([1, 2, 3, 4, 5]));
+ });
+
+ group("handleError()", () {
+ test("without a test", () {
+ expect(errorWrapper.handleError(expectAsync((error) {
+ expect(error, equals("oh no"));
+ })).toList(), completion(isEmpty));
+ });
+
+ test("with a matching test", () {
+ expect(errorWrapper.handleError(expectAsync((error) {
+ expect(error, equals("oh no"));
+ }), test: expectAsync((error) {
+ expect(error, equals("oh no"));
+ return true;
+ })).toList(), completion(isEmpty));
+ });
+
+ test("with a matching test", () {
+ expect(errorWrapper.handleError(expectAsync((_) {}, count: 0),
+ test: expectAsync((error) {
+ expect(error, equals("oh no"));
+ return false;
+ })).toList(), throwsA("oh no"));
+ });
+ });
+
+ group("listen()", () {
+ test("with a callback", () {
+ var subscription;
+ subscription = wrapper.listen(expectAsync((data) {
+ expect(data, equals(1));
+
+ subscription.onData(expectAsync((data) {
+ expect(data, equals(2));
+ subscription.cancel();
+ }));
+ }));
+ });
+
+ test("with a null callback", () {
+ expect(wrapper.listen(null).asFuture(), completes);
+ });
+ });
+
+ test("map()", () {
+ expect(wrapper.map((i) => i * 2).toList(),
+ completion(equals([2, 4, 6, 8, 10])));
+ });
+
+ test("pipe()", () {
+ var consumer = new StreamController<T>();
+ expect(wrapper.pipe(consumer), completes);
+ expect(consumer.stream.toList(), completion(equals([1, 2, 3, 4, 5])));
+ });
+
+ test("reduce()", () {
+ expect(wrapper.reduce((value, i) => value + i), completion(equals(15)));
+ expect(emptyWrapper.reduce((value, i) => value + i), throwsStateError);
+ });
+
+ test("skipWhile()", () {
+ expect(wrapper.skipWhile((i) => i < 3).toList(),
+ completion(equals([3, 4, 5])));
+ });
+
+ test("takeWhile()", () {
+ expect(wrapper.takeWhile((i) => i < 3).toList(),
+ completion(equals([1, 2])));
+ });
+
+ test("toSet()", () {
+ expect(wrapper.toSet(), completion(unorderedEquals([1, 2, 3, 4, 5])));
+ expect(
+ new TypeSafeStream<int>(
+ new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3]))
+ .toSet(),
+ completion(unorderedEquals([1, 2, 3])));
+ });
+
+ test("transform()", () {
+ var transformer = new StreamTransformer<int, String>.fromHandlers(
+ handleData: (data, sink) {
+ sink.add(data.toString());
+ });
+
+ expect(wrapper.transform(transformer).toList(),
+ completion(equals(["1", "2", "3", "4", "5"])));
+ });
+
+ test("where()", () {
+ expect(wrapper.where((i) => i.isOdd).toList(),
+ completion(equals([1, 3, 5])));
+ });
+
+ group("any()", () {
+ test("with matches", () {
+ expect(wrapper.any((i) => i > 3), completion(isTrue));
+ });
+
+ test("without matches", () {
+ expect(wrapper.any((i) => i > 5), completion(isFalse));
+ });
+ });
+
+ group("every()", () {
+ test("with all matches", () {
+ expect(wrapper.every((i) => i < 6), completion(isTrue));
+ });
+
+ test("with some non-matches", () {
+ expect(wrapper.every((i) => i > 3), completion(isFalse));
+ });
+ });
+
+ group("skip()", () {
+ test("with a valid index", () {
+ expect(wrapper.skip(3).toList(), completion(equals([4, 5])));
+ });
+
+ test("with a longer index than length", () {
+ expect(wrapper.skip(6).toList(), completion(isEmpty));
+ });
+
+ test("with a negative index", () {
+ expect(() => wrapper.skip(-1), throwsArgumentError);
+ });
+ });
+
+ group("take()", () {
+ test("with a valid index", () {
+ expect(wrapper.take(3).toList(), completion(equals([1, 2, 3])));
+ });
+
+ test("with a longer index than length", () {
+ expect(wrapper.take(6).toList(), completion(equals([1, 2, 3, 4, 5])));
+ });
+
+ test("with a negative index", () {
+ expect(wrapper.take(-1).toList(), completion(isEmpty));
+ });
+ });
+
+ group("elementAt()", () {
+ test("with a valid index", () {
+ expect(wrapper.elementAt(3), completion(equals(4)));
+ });
+
+ test("with too high an index", () {
+ expect(wrapper.elementAt(6), throwsRangeError);
+ });
+
+ test("with a negative index", () {
+ expect(wrapper.elementAt(-1), throwsArgumentError);
+ });
+ });
+
+ group("contains()", () {
+ test("with an element", () {
+ expect(wrapper.contains(2), completion(isTrue));
+ });
+
+ test("with a non-element", () {
+ expect(wrapper.contains(6), completion(isFalse));
+ });
+
+ test("with a non-element of a different type", () {
+ expect(wrapper.contains("foo"), completion(isFalse));
+ });
+ });
+
+ group("join()", () {
+ test("without a separator", () {
+ expect(wrapper.join(), completion(equals("12345")));
+ });
+
+ test("with a separator", () {
+ expect(wrapper.join(" "), completion(equals("1 2 3 4 5")));
+ });
+ });
+
+ test("toString()", () {
+ expect(wrapper.toString(), contains("Stream"));
+ });
+ });
+
+ group("with invalid types", () {
+ var wrapper;
+ var singleWrapper;
+ setUp(() {
+ wrapper = new TypeSafeStream<int>(
+ new Stream<Object>.fromIterable(["foo", "bar", "baz"]));
+ singleWrapper = new TypeSafeStream<int>(
+ new Stream<Object>.fromIterable(["foo"]));
+ });
+
+ group("throws a CastError for", () {
+ test("first", () {
+ expect(wrapper.first, throwsCastError);
+ });
+
+ test("last", () {
+ expect(wrapper.last, throwsCastError);
+ });
+
+ test("single", () {
+ expect(singleWrapper.single, throwsCastError);
+ });
+
+ test("asBroadcastStream()", () {
+ var broadcast = wrapper.asBroadcastStream();
+ expect(broadcast.first, throwsCastError);
+ });
+
+ test("asyncExpand()", () {
+ expect(wrapper.asyncExpand(expectAsync((_) {}, count: 0)).first,
+ throwsCastError);
+ });
+
+ test("asyncMap()", () {
+ expect(wrapper.asyncMap(expectAsync((_) {}, count: 0)).first,
+ throwsCastError);
+ });
+
+ group("distinct()", () {
+ test("without equals", () {
+ expect(wrapper.distinct().first, throwsCastError);
+ });
+
+ test("with equals", () {
+ expect(wrapper.distinct(expectAsync((_, __) {}, count: 0)).first,
+ throwsCastError);
+ });
+ });
+
+ test("expand()", () {
+ expect(wrapper.expand(expectAsync((_) {}, count: 0)).first,
+ throwsCastError);
+ });
+
+ test("firstWhere()", () {
+ expect(wrapper.firstWhere(expectAsync((_) {}, count: 0)),
+ throwsCastError);
+ });
+
+ test("lastWhere()", () {
+ expect(wrapper.lastWhere(expectAsync((_) {}, count: 0)),
+ throwsCastError);
+ });
+
+ test("singleWhere()", () {
+ expect(wrapper.singleWhere(expectAsync((_) {}, count: 0)),
+ throwsCastError);
+ });
+
+ test("fold()", () {
+ expect(wrapper.fold("foo", expectAsync((_, __) {}, count: 0)),
+ throwsCastError);
+ });
+
+ test("forEach()", () async {
+ expect(wrapper.forEach(expectAsync((_) {}, count: 0)), throwsCastError);
+ });
+
+ test("handleError()", () {
+ expect(wrapper.handleError(expectAsync((_) {}, count: 0)).first,
+ throwsCastError);
+ });
+
+ test("listen()", () {
+ expect(() => wrapper.take(1).listen(expectAsync((_) {}, count: 0)),
+ throwsZonedCastError);
+ });
+
+ test("map()", () {
+ expect(wrapper.map(expectAsync((_) {}, count: 0)).first,
+ throwsCastError);
+ });
+
+ test("reduce()", () {
+ expect(wrapper.reduce(expectAsync((_, __) {}, count: 0)),
+ throwsCastError);
+ });
+
+ test("skipWhile()", () {
+ expect(wrapper.skipWhile(expectAsync((_) {}, count: 0)).first,
+ throwsCastError);
+ });
+
+ test("takeWhile()", () {
+ expect(wrapper.takeWhile(expectAsync((_) {}, count: 0)).first,
+ throwsCastError);
+ });
+
+ test("toList()", () async {
+ var list = await wrapper.toList();
+ expect(() => list.first, throwsCastError);
+ }, skip: "Re-enable this when test can run DDC (test#414).");
+
+ test("toSet()", () async {
+ var asSet = await wrapper.toSet();
+ expect(() => asSet.first, throwsCastError);
+ }, skip: "Re-enable this when test can run DDC (test#414).");
+
+ test("where()", () {
+ expect(wrapper.where(expectAsync((_) {}, count: 0)).first,
+ throwsCastError);
+ });
+
+ test("any()", () {
+ expect(wrapper.any(expectAsync((_) {}, count: 0)), throwsCastError);
+ });
+
+ test("every()", () {
+ expect(wrapper.every(expectAsync((_) {}, count: 0)), throwsCastError);
+ });
+
+ test("skip()", () {
+ expect(wrapper.skip(1).first, throwsCastError);
+ });
+
+ test("take()", () {
+ expect(wrapper.take(1).first, throwsCastError);
+ });
+
+ test("elementAt()", () {
+ expect(wrapper.elementAt(1), throwsCastError);
+ });
+ });
+
+ group("doesn't throw a CastError for", () {
+ test("single", () {
+ expect(wrapper.single, throwsStateError);
+ });
+
+ test("length", () {
+ expect(wrapper.length, completion(equals(3)));
+ });
+
+ test("isBroadcast", () {
+ expect(wrapper.isBroadcast, isFalse);
+ });
+
+ test("isEmpty", () {
+ expect(wrapper.isEmpty, completion(isFalse));
+ });
+
+ group("drain()", () {
+ test("without a value", () {
+ expect(wrapper.drain(), completes);
+ expect(() => wrapper.drain(), throwsStateError);
+ });
+
+ test("with a value", () {
+ expect(wrapper.drain(12), completion(equals(12)));
+ });
+ });
+
+ test("skip()", () {
+ expect(() => wrapper.skip(-1), throwsArgumentError);
+ });
+
+ group("elementAt()", () {
+ test("with too high an index", () {
+ expect(wrapper.elementAt(6), throwsRangeError);
+ });
+
+ test("with a negative index", () {
+ expect(wrapper.elementAt(-1), throwsArgumentError);
+ });
+ });
+
+ group("contains()", () {
+ test("with an element", () {
+ expect(wrapper.contains("foo"), completion(isTrue));
+ });
+
+ test("with a non-element", () {
+ expect(wrapper.contains("qux"), completion(isFalse));
+ });
+
+ test("with a non-element of a different type", () {
+ expect(wrapper.contains(1), completion(isFalse));
+ });
+ });
+
+ group("join()", () {
+ test("without a separator", () {
+ expect(wrapper.join(), completion(equals("foobarbaz")));
+ });
+
+ test("with a separator", () {
+ expect(wrapper.join(" "), completion(equals("foo bar baz")));
+ });
+ });
+
+ test("toString()", () {
+ expect(wrapper.toString(), contains("Stream"));
+ });
+ });
+ });
+}
diff --git a/test/utils.dart b/test/utils.dart
index c32ea2c..7b65bcb 100644
--- a/test/utils.dart
+++ b/test/utils.dart
@@ -16,6 +16,29 @@
/// Returns a function that fails the test if it is ever called.
unreachable(String name) => ([a, b]) => fail("Unreachable: $name");
+// TODO(nweiz): Use the version of this in test when test#418 is fixed.
+/// A matcher that runs a callback in its own zone and asserts that that zone
+/// emits an error that matches [matcher].
+Matcher throwsZoned(matcher) => predicate((callback) {
+ var firstError = true;
+ runZoned(callback, onError: expectAsync((error, stackTrace) {
+ if (firstError) {
+ expect(error, matcher);
+ firstError = false;
+ } else {
+ registerException(error, stackTrace);
+ }
+ }, max: -1));
+ return true;
+});
+
+/// A matcher that runs a callback in its own zone and asserts that that zone
+/// emits a [CastError].
+final throwsZonedCastError = throwsZoned(new isInstanceOf<CastError>());
+
+/// A matcher that matches a callback or future that throws a [CastError].
+final throwsCastError = throwsA(new isInstanceOf<CastError>());
+
/// A badly behaved stream which throws if it's ever listened to.
///
/// Can be used to test cases where a stream should not be used.