Forward DelegatingStream.typed to Stream.cast (#54)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c8f4d63..0807821 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 2.0.6
+
+* Add further support for Dart 2.0 library changes to `Stream`.
+
## 2.0.5
* Fix Dart 2.0 [runtime cast errors][sdk#27223] in `StreamQueue`.
diff --git a/lib/src/delegate/stream.dart b/lib/src/delegate/stream.dart
index d08b4ab..116d11f 100644
--- a/lib/src/delegate/stream.dart
+++ b/lib/src/delegate/stream.dart
@@ -4,8 +4,6 @@
import 'dart:async';
-import '../typed/stream.dart';
-
/// Simple delegating wrapper around a [Stream].
///
/// Subclasses can override individual methods, or use this to expose only the
@@ -23,6 +21,5 @@
/// original generic type, by asserting that its events are instances of `T`
/// whenever they're provided. If they're not, the stream throws a
/// [CastError].
- static Stream<T> typed<T>(Stream stream) =>
- stream is Stream<T> ? stream : new TypeSafeStream<T>(stream);
+ static Stream<T> typed<T>(Stream stream) => stream.cast();
}
diff --git a/lib/src/typed/stream.dart b/lib/src/typed/stream.dart
deleted file mode 100644
index b3b0513..0000000
--- a/lib/src/typed/stream.dart
+++ /dev/null
@@ -1,135 +0,0 @@
-// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'dart:async';
-
-import 'package:collection/collection.dart';
-
-import '../utils.dart';
-import 'stream_subscription.dart';
-import '../delegate/event_sink.dart';
-
-class TypeSafeStream<T> extends Stream<T> {
- final Stream _stream;
-
- Future<T> get first async => (await _stream.first) as T;
- Future<T> get last async => (await _stream.last) as T;
- Future<T> get single async => (await _stream.single) as T;
-
- bool get isBroadcast => _stream.isBroadcast;
- Future<bool> get isEmpty => _stream.isEmpty;
- Future<int> get length => _stream.length;
-
- TypeSafeStream(this._stream);
-
- Stream<T> asBroadcastStream(
- {void onListen(StreamSubscription<T> subscription),
- void onCancel(StreamSubscription<T> subscription)}) {
- return new TypeSafeStream<T>(_stream.asBroadcastStream(
- onListen: onListen == null
- ? null
- : (subscription) =>
- onListen(new TypeSafeStreamSubscription<T>(subscription)),
- onCancel: onCancel == null
- ? null
- : (subscription) =>
- onCancel(new TypeSafeStreamSubscription<T>(subscription))));
- }
-
- Stream<E> asyncExpand<E>(Stream<E> convert(T event)) =>
- _stream.asyncExpand(_validateType(convert));
-
- Stream<E> asyncMap<E>(convert(T event)) =>
- _stream.asyncMap(_validateType(convert));
-
- Stream<T> distinct([bool equals(T previous, T next)]) =>
- new TypeSafeStream<T>(_stream.distinct(equals == null
- ? null
- : (previous, next) => equals(previous as T, next as T)));
-
- Future<E> drain<E>([E futureValue]) => _stream.drain(futureValue);
-
- Stream<S> expand<S>(Iterable<S> convert(T value)) =>
- _stream.expand(_validateType(convert));
-
- Future<T> firstWhere(bool test(T element),
- {Object defaultValue(), T orElse()}) =>
- _stream.firstWhere(_validateType(test),
- defaultValue: defaultValue, orElse: orElse);
-
- Future<T> lastWhere(bool test(T element),
- {Object defaultValue(), T orElse()}) =>
- _stream.lastWhere(_validateType(test),
- defaultValue: defaultValue, orElse: orElse);
-
- Future<T> singleWhere(bool test(T element), {T orElse()}) async =>
- await _stream.singleWhere(_validateType(test), orElse: orElse);
-
- Future<S> fold<S>(S initialValue, S combine(S previous, T element)) =>
- _stream.fold(
- initialValue, (previous, element) => combine(previous, element as T));
-
- Future forEach(void action(T element)) =>
- _stream.forEach(_validateType(action));
-
- Stream<T> handleError(Function onError, {bool test(error)}) =>
- new TypeSafeStream<T>(_stream.handleError(onError, test: test));
-
- StreamSubscription<T> listen(void onData(T value),
- {Function onError, void onDone(), bool cancelOnError}) =>
- new TypeSafeStreamSubscription<T>(_stream.listen(_validateType(onData),
- onError: onError, onDone: onDone, cancelOnError: cancelOnError));
-
- Stream<S> map<S>(S convert(T event)) => _stream.map(_validateType(convert));
-
- // Don't forward to `_stream.pipe` because we want the consumer to see the
- // type-asserted stream.
- Future pipe(StreamConsumer<T> consumer) =>
- consumer.addStream(this).then((_) => consumer.close());
-
- Future<T> reduce(T combine(T previous, T element)) async {
- var result = await _stream
- .reduce((previous, element) => combine(previous as T, element as T));
- return result as T;
- }
-
- Stream<T> skipWhile(bool test(T element)) =>
- new TypeSafeStream<T>(_stream.skipWhile(_validateType(test)));
-
- Stream<T> takeWhile(bool test(T element)) =>
- new TypeSafeStream<T>(_stream.takeWhile(_validateType(test)));
-
- Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) =>
- new TypeSafeStream<T>(_stream.timeout(timeLimit,
- onTimeout: (sink) => onTimeout(DelegatingEventSink.typed(sink))));
-
- Future<List<T>> toList() async =>
- DelegatingList.typed<T>(await _stream.toList());
-
- Future<Set<T>> toSet() async => DelegatingSet.typed<T>(await _stream.toSet());
-
- // Don't forward to `_stream.transform` because we want the transformer to see
- // the type-asserted stream.
- Stream<S> transform<S>(StreamTransformer<T, S> transformer) =>
- transformer.bind(this);
-
- Stream<T> where(bool test(T element)) =>
- new TypeSafeStream<T>(_stream.where(_validateType(test)));
-
- Future<bool> every(bool test(T element)) =>
- _stream.every(_validateType(test));
-
- Future<bool> any(bool test(T element)) => _stream.any(_validateType(test));
- Stream<T> skip(int count) => new TypeSafeStream<T>(_stream.skip(count));
- Stream<T> take(int count) => new TypeSafeStream<T>(_stream.take(count));
- Future<T> elementAt(int index) async => (await _stream.elementAt(index)) as T;
- Future<bool> contains(Object needle) => _stream.contains(needle);
- Future<String> join([String separator = ""]) => _stream.join(separator);
- String toString() => _stream.toString();
-
- /// Returns a version of [function] that asserts that its argument is an
- /// instance of `T`.
- UnaryFunction<dynamic, S> _validateType<S>(S function(T value)) =>
- function == null ? null : (value) => function(value as T);
-}
diff --git a/pubspec.yaml b/pubspec.yaml
index e163e4c..e8dafaa 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 2.0.5
+version: 2.0.6
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:async' library.
homepage: https://www.github.com/dart-lang/async
diff --git a/test/typed_wrapper/stream_test.dart b/test/typed_wrapper/stream_test.dart
deleted file mode 100644
index 61d3ff4..0000000
--- a/test/typed_wrapper/stream_test.dart
+++ /dev/null
@@ -1,610 +0,0 @@
-// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-@Skip("Re-enable this when test can run DDC (test#414).")
-
-import 'dart:async';
-
-import "package:async/src/typed/stream.dart";
-import "package:test/test.dart";
-
-import '../utils.dart';
-
-void main() {
- group("with valid types, forwards", () {
- var controller;
- var wrapper;
- var emptyWrapper;
- var singleWrapper;
- var errorWrapper;
- setUp(() {
- controller = new StreamController<Object>()
- ..add(1)
- ..add(2)
- ..add(3)
- ..add(4)
- ..add(5)
- ..close();
-
- // TODO(nweiz): Use public methods when test#414 is fixed and we can run
- // this on DDC.
- wrapper = new TypeSafeStream<int>(controller.stream);
- emptyWrapper = new TypeSafeStream<int>(new Stream<Object>.empty());
- singleWrapper =
- new TypeSafeStream<int>(new Stream<Object>.fromIterable([1]));
- errorWrapper = new TypeSafeStream<int>(
- new Stream<Object>.fromFuture(new Future.error("oh no")));
- });
-
- test("first", () {
- expect(wrapper.first, completion(equals(1)));
- expect(emptyWrapper.first, throwsStateError);
- });
-
- test("last", () {
- expect(wrapper.last, completion(equals(5)));
- expect(emptyWrapper.last, throwsStateError);
- });
-
- test("single", () {
- expect(wrapper.single, throwsStateError);
- expect(singleWrapper.single, completion(equals(1)));
- });
-
- test("isBroadcast", () {
- expect(wrapper.isBroadcast, isFalse);
- var broadcastWrapper = new TypeSafeStream<int>(
- new Stream<Object>.empty().asBroadcastStream());
- expect(broadcastWrapper.isBroadcast, isTrue);
- });
-
- test("isEmpty", () {
- expect(wrapper.isEmpty, completion(isFalse));
- expect(emptyWrapper.isEmpty, completion(isTrue));
- });
-
- test("length", () {
- expect(wrapper.length, completion(equals(5)));
- expect(emptyWrapper.length, completion(equals(0)));
- });
-
- group("asBroadcastStream()", () {
- test("with no parameters", () {
- var broadcast = wrapper.asBroadcastStream();
- expect(broadcast.toList(), completion(equals([1, 2, 3, 4, 5])));
- expect(broadcast.toList(), completion(equals([1, 2, 3, 4, 5])));
- });
-
- test("with onListen", () {
- var broadcast =
- wrapper.asBroadcastStream(onListen: expectAsync1((subscription) {
- expect(subscription, new isInstanceOf<StreamSubscription<int>>());
- subscription.pause();
- }));
-
- broadcast.listen(null);
- expect(controller.isPaused, isTrue);
- });
-
- test("with onCancel", () {
- var broadcast =
- wrapper.asBroadcastStream(onCancel: expectAsync1((subscription) {
- expect(subscription, new isInstanceOf<StreamSubscription<int>>());
- subscription.pause();
- }));
-
- broadcast.listen(null).cancel();
- expect(controller.isPaused, isTrue);
- });
- });
-
- test("asyncExpand()", () {
- expect(
- wrapper.asyncExpand((i) => new Stream.fromIterable([i, i])).toList(),
- completion(equals([1, 1, 2, 2, 3, 3, 4, 4, 5, 5])));
- });
-
- test("asyncMap()", () {
- expect(wrapper.asyncMap((i) => new Future.value(i * 2)).toList(),
- completion(equals([2, 4, 6, 8, 10])));
- });
-
- group("distinct()", () {
- test("without equals", () {
- expect(
- wrapper.distinct().toList(), completion(equals([1, 2, 3, 4, 5])));
-
- expect(
- new TypeSafeStream<int>(
- new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3]))
- .distinct()
- .toList(),
- completion(equals([1, 2, 3])));
- });
-
- test("with equals", () {
- expect(wrapper.distinct((i1, i2) => (i1 ~/ 2 == i2 ~/ 2)).toList(),
- completion(equals([1, 2, 4])));
- });
- });
-
- group("drain()", () {
- test("without a value", () {
- expect(wrapper.drain(), completes);
- expect(() => wrapper.drain(), throwsStateError);
- });
-
- test("with a value", () {
- expect(wrapper.drain(12), completion(equals(12)));
- });
- });
-
- test("expand()", () {
- expect(wrapper.expand((i) => [i, i]).toList(),
- completion(equals([1, 1, 2, 2, 3, 3, 4, 4, 5, 5])));
- });
-
- group("firstWhere()", () {
- test("finding a value", () {
- expect(wrapper.firstWhere((i) => i > 3), completion(equals(4)));
- });
-
- test("finding no value", () {
- expect(wrapper.firstWhere((i) => i > 5), throwsStateError);
- });
-
- test("with a default value", () {
- expect(wrapper.firstWhere((i) => i > 5, defaultValue: () => "value"),
- completion(equals("value")));
- });
- });
-
- group("lastWhere()", () {
- test("finding a value", () {
- expect(wrapper.lastWhere((i) => i < 3), completion(equals(2)));
- });
-
- test("finding no value", () {
- expect(wrapper.lastWhere((i) => i > 5), throwsStateError);
- });
-
- test("with a default value", () {
- expect(wrapper.lastWhere((i) => i > 5, defaultValue: () => "value"),
- completion(equals("value")));
- });
- });
-
- group("singleWhere()", () {
- test("finding a single value", () {
- expect(wrapper.singleWhere((i) => i == 3), completion(equals(3)));
- });
-
- test("finding no value", () {
- expect(wrapper.singleWhere((i) => i == 6), throwsStateError);
- });
-
- test("finding multiple values", () {
- expect(wrapper.singleWhere((i) => i.isOdd), throwsStateError);
- });
- });
-
- test("fold()", () {
- expect(wrapper.fold("foo", (previous, i) => previous + i.toString()),
- completion(equals("foo12345")));
- });
-
- test("forEach()", () async {
- emptyWrapper.forEach(expectAsync1((_) {}, count: 0));
-
- var results = [];
- await wrapper.forEach(results.add);
- expect(results, equals([1, 2, 3, 4, 5]));
- });
-
- group("handleError()", () {
- test("without a test", () {
- expect(
- errorWrapper.handleError(expectAsync1((error) {
- expect(error, equals("oh no"));
- })).toList(),
- completion(isEmpty));
- });
-
- test("with a matching test", () {
- expect(
- errorWrapper.handleError(expectAsync1((error) {
- expect(error, equals("oh no"));
- }), test: expectAsync1((error) {
- expect(error, equals("oh no"));
- return true;
- })).toList(),
- completion(isEmpty));
- });
-
- test("with a matching test", () {
- expect(
- errorWrapper.handleError(expectAsync1((_) {}, count: 0),
- test: expectAsync1((error) {
- expect(error, equals("oh no"));
- return false;
- })).toList(),
- throwsA("oh no"));
- });
- });
-
- group("listen()", () {
- test("with a callback", () {
- var subscription;
- subscription = wrapper.listen(expectAsync1((data) {
- expect(data, equals(1));
-
- subscription.onData(expectAsync1((data) {
- expect(data, equals(2));
- subscription.cancel();
- }));
- }));
- });
-
- test("with a null callback", () {
- expect(wrapper.listen(null).asFuture(), completes);
- });
- });
-
- test("map()", () {
- expect(wrapper.map((i) => i * 2).toList(),
- completion(equals([2, 4, 6, 8, 10])));
- });
-
- test("pipe()", () {
- var consumer = new StreamController();
- expect(wrapper.pipe(consumer), completes);
- expect(consumer.stream.toList(), completion(equals([1, 2, 3, 4, 5])));
- });
-
- test("reduce()", () {
- expect(wrapper.reduce((value, i) => value + i), completion(equals(15)));
- expect(emptyWrapper.reduce((value, i) => value + i), throwsStateError);
- });
-
- test("skipWhile()", () {
- expect(wrapper.skipWhile((i) => i < 3).toList(),
- completion(equals([3, 4, 5])));
- });
-
- test("takeWhile()", () {
- expect(
- wrapper.takeWhile((i) => i < 3).toList(), completion(equals([1, 2])));
- });
-
- test("toSet()", () {
- expect(wrapper.toSet(), completion(unorderedEquals([1, 2, 3, 4, 5])));
- expect(
- new TypeSafeStream<int>(
- new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3])).toSet(),
- completion(unorderedEquals([1, 2, 3])));
- });
-
- test("transform()", () {
- var transformer = new StreamTransformer<int, String>.fromHandlers(
- handleData: (data, sink) {
- sink.add(data.toString());
- });
-
- expect(wrapper.transform(transformer).toList(),
- completion(equals(["1", "2", "3", "4", "5"])));
- });
-
- test("where()", () {
- expect(wrapper.where((i) => i.isOdd).toList(),
- completion(equals([1, 3, 5])));
- });
-
- group("any()", () {
- test("with matches", () {
- expect(wrapper.any((i) => i > 3), completion(isTrue));
- });
-
- test("without matches", () {
- expect(wrapper.any((i) => i > 5), completion(isFalse));
- });
- });
-
- group("every()", () {
- test("with all matches", () {
- expect(wrapper.every((i) => i < 6), completion(isTrue));
- });
-
- test("with some non-matches", () {
- expect(wrapper.every((i) => i > 3), completion(isFalse));
- });
- });
-
- group("skip()", () {
- test("with a valid index", () {
- expect(wrapper.skip(3).toList(), completion(equals([4, 5])));
- });
-
- test("with a longer index than length", () {
- expect(wrapper.skip(6).toList(), completion(isEmpty));
- });
-
- test("with a negative index", () {
- expect(() => wrapper.skip(-1), throwsArgumentError);
- });
- });
-
- group("take()", () {
- test("with a valid index", () {
- expect(wrapper.take(3).toList(), completion(equals([1, 2, 3])));
- });
-
- test("with a longer index than length", () {
- expect(wrapper.take(6).toList(), completion(equals([1, 2, 3, 4, 5])));
- });
-
- test("with a negative index", () {
- expect(wrapper.take(-1).toList(), completion(isEmpty));
- });
- });
-
- group("elementAt()", () {
- test("with a valid index", () {
- expect(wrapper.elementAt(3), completion(equals(4)));
- });
-
- test("with too high an index", () {
- expect(wrapper.elementAt(6), throwsRangeError);
- });
-
- test("with a negative index", () {
- expect(wrapper.elementAt(-1), throwsArgumentError);
- });
- });
-
- group("contains()", () {
- test("with an element", () {
- expect(wrapper.contains(2), completion(isTrue));
- });
-
- test("with a non-element", () {
- expect(wrapper.contains(6), completion(isFalse));
- });
-
- test("with a non-element of a different type", () {
- expect(wrapper.contains("foo"), completion(isFalse));
- });
- });
-
- group("join()", () {
- test("without a separator", () {
- expect(wrapper.join(), completion(equals("12345")));
- });
-
- test("with a separator", () {
- expect(wrapper.join(" "), completion(equals("1 2 3 4 5")));
- });
- });
-
- test("toString()", () {
- expect(wrapper.toString(), contains("Stream"));
- });
- });
-
- group("with invalid types", () {
- var wrapper;
- var singleWrapper;
- setUp(() {
- wrapper = new TypeSafeStream<int>(
- new Stream<Object>.fromIterable(["foo", "bar", "baz"]));
- singleWrapper =
- new TypeSafeStream<int>(new Stream<Object>.fromIterable(["foo"]));
- });
-
- group("throws a CastError for", () {
- test("first", () {
- expect(wrapper.first, throwsCastError);
- });
-
- test("last", () {
- expect(wrapper.last, throwsCastError);
- });
-
- test("single", () {
- expect(singleWrapper.single, throwsCastError);
- });
-
- test("asBroadcastStream()", () {
- var broadcast = wrapper.asBroadcastStream();
- expect(broadcast.first, throwsCastError);
- });
-
- test("asyncExpand()", () {
- expect(wrapper.asyncExpand(expectAsync1((_) {}, count: 0)).first,
- throwsCastError);
- });
-
- test("asyncMap()", () {
- expect(wrapper.asyncMap(expectAsync1((_) {}, count: 0)).first,
- throwsCastError);
- });
-
- group("distinct()", () {
- test("without equals", () {
- expect(wrapper.distinct().first, throwsCastError);
- });
-
- test("with equals", () {
- expect(wrapper.distinct(expectAsync2((_, __) {}, count: 0)).first,
- throwsCastError);
- });
- });
-
- test("expand()", () {
- expect(wrapper.expand(expectAsync1((_) {}, count: 0)).first,
- throwsCastError);
- });
-
- test("firstWhere()", () {
- expect(wrapper.firstWhere(expectAsync1((_) {}, count: 0)),
- throwsCastError);
- });
-
- test("lastWhere()", () {
- expect(
- wrapper.lastWhere(expectAsync1((_) {}, count: 0)), throwsCastError);
- });
-
- test("singleWhere()", () {
- expect(wrapper.singleWhere(expectAsync1((_) {}, count: 0)),
- throwsCastError);
- });
-
- test("fold()", () {
- expect(wrapper.fold("foo", expectAsync2((_, __) {}, count: 0)),
- throwsCastError);
- });
-
- test("forEach()", () async {
- expect(
- wrapper.forEach(expectAsync1((_) {}, count: 0)), throwsCastError);
- });
-
- test("handleError()", () {
- expect(wrapper.handleError(expectAsync1((_) {}, count: 0)).first,
- throwsCastError);
- });
-
- test("listen()", () {
- expect(() => wrapper.take(1).listen(expectAsync1((_) {}, count: 0)),
- throwsZonedCastError);
- });
-
- test("map()", () {
- expect(
- wrapper.map(expectAsync1((_) {}, count: 0)).first, throwsCastError);
- });
-
- test("reduce()", () {
- expect(wrapper.reduce(expectAsync2((_, __) {}, count: 0)),
- throwsCastError);
- });
-
- test("skipWhile()", () {
- expect(wrapper.skipWhile(expectAsync1((_) {}, count: 0)).first,
- throwsCastError);
- });
-
- test("takeWhile()", () {
- expect(wrapper.takeWhile(expectAsync1((_) {}, count: 0)).first,
- throwsCastError);
- });
-
- test("toList()", () async {
- var list = await wrapper.toList();
- expect(() => list.first, throwsCastError);
- });
-
- test("toSet()", () async {
- var asSet = await wrapper.toSet();
- expect(() => asSet.first, throwsCastError);
- });
-
- test("where()", () {
- expect(wrapper.where(expectAsync1((_) {}, count: 0)).first,
- throwsCastError);
- });
-
- test("any()", () {
- expect(wrapper.any(expectAsync1((_) {}, count: 0)), throwsCastError);
- });
-
- test("every()", () {
- expect(wrapper.every(expectAsync1((_) {}, count: 0)), throwsCastError);
- });
-
- test("skip()", () {
- expect(wrapper.skip(1).first, throwsCastError);
- });
-
- test("take()", () {
- expect(wrapper.take(1).first, throwsCastError);
- });
-
- test("elementAt()", () {
- expect(wrapper.elementAt(1), throwsCastError);
- });
- });
-
- group("doesn't throw a CastError for", () {
- test("single", () {
- expect(wrapper.single, throwsStateError);
- });
-
- test("length", () {
- expect(wrapper.length, completion(equals(3)));
- });
-
- test("isBroadcast", () {
- expect(wrapper.isBroadcast, isFalse);
- });
-
- test("isEmpty", () {
- expect(wrapper.isEmpty, completion(isFalse));
- });
-
- group("drain()", () {
- test("without a value", () {
- expect(wrapper.drain(), completes);
- expect(() => wrapper.drain(), throwsStateError);
- });
-
- test("with a value", () {
- expect(wrapper.drain(12), completion(equals(12)));
- });
- });
-
- test("skip()", () {
- expect(() => wrapper.skip(-1), throwsArgumentError);
- });
-
- group("elementAt()", () {
- test("with too high an index", () {
- expect(wrapper.elementAt(6), throwsRangeError);
- });
-
- test("with a negative index", () {
- expect(wrapper.elementAt(-1), throwsArgumentError);
- });
- });
-
- group("contains()", () {
- test("with an element", () {
- expect(wrapper.contains("foo"), completion(isTrue));
- });
-
- test("with a non-element", () {
- expect(wrapper.contains("qux"), completion(isFalse));
- });
-
- test("with a non-element of a different type", () {
- expect(wrapper.contains(1), completion(isFalse));
- });
- });
-
- group("join()", () {
- test("without a separator", () {
- expect(wrapper.join(), completion(equals("foobarbaz")));
- });
-
- test("with a separator", () {
- expect(wrapper.join(" "), completion(equals("foo bar baz")));
- });
- });
-
- test("toString()", () {
- expect(wrapper.toString(), contains("Stream"));
- });
- });
- });
-}