// blob: 05cde3a93e7a8ae572bbdf0b3a8795fc857f7662 [file] [log] [blame]
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import "package:async/src/typed/stream.dart";
import "package:test/test.dart";
import '../utils.dart';
/// Exercises [TypeSafeStream] when it wraps a `Stream<Object>` as a
/// `Stream<int>`.
///
/// The first group feeds the wrapper events that really are ints and checks
/// the result of each stream member. The second group feeds it strings and
/// checks which members are expected to report a cast error and which are
/// expected not to.
void main() {
  group("with valid types, forwards", () {
    var controller;
    var wrapper;
    var emptyWrapper;
    var singleWrapper;
    var errorWrapper;

    // Fresh fixtures for every test: several tests below rely on a second
    // listen to the same wrapper failing with a StateError.
    setUp(() {
      controller = new StreamController<Object>()
        ..add(1)
        ..add(2)
        ..add(3)
        ..add(4)
        ..add(5)
        ..close();

      // TODO(nweiz): Use public methods when test#414 is fixed and we can run
      // this on DDC.
      wrapper = new TypeSafeStream<int>(controller.stream);
      emptyWrapper = new TypeSafeStream<int>(new Stream<Object>.empty());
      singleWrapper =
          new TypeSafeStream<int>(new Stream<Object>.fromIterable([1]));
      errorWrapper = new TypeSafeStream<int>(
          new Stream<Object>.fromFuture(new Future.error("oh no")));
    });

    test("first", () {
      expect(wrapper.first, completion(equals(1)));
      expect(emptyWrapper.first, throwsStateError);
    });

    test("last", () {
      expect(wrapper.last, completion(equals(5)));
      expect(emptyWrapper.last, throwsStateError);
    });

    test("single", () {
      expect(wrapper.single, throwsStateError);
      expect(singleWrapper.single, completion(equals(1)));
    });

    test("isBroadcast", () {
      expect(wrapper.isBroadcast, isFalse);
      var broadcastWrapper = new TypeSafeStream<int>(
          new Stream<Object>.empty().asBroadcastStream());
      expect(broadcastWrapper.isBroadcast, isTrue);
    });

    test("isEmpty", () {
      expect(wrapper.isEmpty, completion(isFalse));
      expect(emptyWrapper.isEmpty, completion(isTrue));
    });

    test("length", () {
      expect(wrapper.length, completion(equals(5)));
      expect(emptyWrapper.length, completion(equals(0)));
    });

    group("asBroadcastStream()", () {
      test("with no parameters", () {
        var broadcast = wrapper.asBroadcastStream();
        // Two listeners on the same broadcast stream must both see every
        // event.
        expect(broadcast.toList(), completion(equals([1, 2, 3, 4, 5])));
        expect(broadcast.toList(), completion(equals([1, 2, 3, 4, 5])));
      });

      test("with onListen", () {
        var broadcast =
            wrapper.asBroadcastStream(onListen: expectAsync((subscription) {
          expect(subscription, new isInstanceOf<StreamSubscription<int>>());
          subscription.pause();
        }));

        broadcast.listen(null);
        // Pausing the subscription handed to the callback is expected to
        // pause the source controller.
        expect(controller.isPaused, isTrue);
      });

      test("with onCancel", () {
        var broadcast =
            wrapper.asBroadcastStream(onCancel: expectAsync((subscription) {
          expect(subscription, new isInstanceOf<StreamSubscription<int>>());
          subscription.pause();
        }));

        broadcast.listen(null).cancel();
        expect(controller.isPaused, isTrue);
      });
    });

    test("asyncExpand()", () {
      expect(
          wrapper.asyncExpand((i) => new Stream.fromIterable([i, i])).toList(),
          completion(equals([1, 1, 2, 2, 3, 3, 4, 4, 5, 5])));
    });

    test("asyncMap()", () {
      expect(wrapper.asyncMap((i) => new Future.value(i * 2)).toList(),
          completion(equals([2, 4, 6, 8, 10])));
    });

    group("distinct()", () {
      test("without equals", () {
        expect(
            wrapper.distinct().toList(), completion(equals([1, 2, 3, 4, 5])));

        expect(
            new TypeSafeStream<int>(
                    new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3]))
                .distinct()
                .toList(),
            completion(equals([1, 2, 3])));
      });

      test("with equals", () {
        // Values count as equal when they share the same integer half, so
        // 2/3 and 4/5 collapse into their first member.
        expect(wrapper.distinct((i1, i2) => (i1 ~/ 2 == i2 ~/ 2)).toList(),
            completion(equals([1, 2, 4])));
      });
    });

    group("drain()", () {
      test("without a value", () {
        expect(wrapper.drain(), completes);
        // The wrapped stream is not a broadcast stream, so draining it a
        // second time is expected to fail.
        expect(() => wrapper.drain(), throwsStateError);
      });

      test("with a value", () {
        expect(wrapper.drain(12), completion(equals(12)));
      });
    });

    test("expand()", () {
      expect(wrapper.expand((i) => [i, i]).toList(),
          completion(equals([1, 1, 2, 2, 3, 3, 4, 4, 5, 5])));
    });

    group("firstWhere()", () {
      test("finding a value", () {
        expect(wrapper.firstWhere((i) => i > 3), completion(equals(4)));
      });

      test("finding no value", () {
        expect(wrapper.firstWhere((i) => i > 5), throwsStateError);
      });

      test("with a default value", () {
        expect(wrapper.firstWhere((i) => i > 5, defaultValue: () => "value"),
            completion(equals("value")));
      });
    });

    group("lastWhere()", () {
      test("finding a value", () {
        expect(wrapper.lastWhere((i) => i < 3), completion(equals(2)));
      });

      test("finding no value", () {
        expect(wrapper.lastWhere((i) => i > 5), throwsStateError);
      });

      test("with a default value", () {
        expect(wrapper.lastWhere((i) => i > 5, defaultValue: () => "value"),
            completion(equals("value")));
      });
    });

    group("singleWhere()", () {
      test("finding a single value", () {
        expect(wrapper.singleWhere((i) => i == 3), completion(equals(3)));
      });

      test("finding no value", () {
        expect(wrapper.singleWhere((i) => i == 6), throwsStateError);
      });

      test("finding multiple values", () {
        expect(wrapper.singleWhere((i) => i.isOdd), throwsStateError);
      });
    });

    test("fold()", () {
      expect(wrapper.fold("foo", (previous, i) => previous + i.toString()),
          completion(equals("foo12345")));
    });

    test("forEach()", () async {
      // Awaited so the empty stream has finished before the test ends and the
      // `count: 0` check covers the whole stream.
      await emptyWrapper.forEach(expectAsync((_) {}, count: 0));

      var results = [];
      await wrapper.forEach(results.add);
      expect(results, equals([1, 2, 3, 4, 5]));
    });

    group("handleError()", () {
      test("without a test", () {
        expect(
            errorWrapper.handleError(expectAsync((error) {
              expect(error, equals("oh no"));
            })).toList(),
            completion(isEmpty));
      });

      test("with a matching test", () {
        expect(
            errorWrapper.handleError(expectAsync((error) {
              expect(error, equals("oh no"));
            }), test: expectAsync((error) {
              expect(error, equals("oh no"));
              return true;
            })).toList(),
            completion(isEmpty));
      });

      // The predicate rejects the error, so the handler must never run and the
      // error must reach the listener.
      test("with a non-matching test", () {
        expect(
            errorWrapper.handleError(expectAsync((_) {}, count: 0),
                test: expectAsync((error) {
              expect(error, equals("oh no"));
              return false;
            })).toList(),
            throwsA("oh no"));
      });
    });

    group("listen()", () {
      test("with a callback", () {
        var subscription;
        subscription = wrapper.listen(expectAsync((data) {
          expect(data, equals(1));

          // Swap the data handler mid-stream; the replacement sees the next
          // event and then cancels.
          subscription.onData(expectAsync((data) {
            expect(data, equals(2));
            subscription.cancel();
          }));
        }));
      });

      test("with a null callback", () {
        expect(wrapper.listen(null).asFuture(), completes);
      });
    });

    test("map()", () {
      expect(wrapper.map((i) => i * 2).toList(),
          completion(equals([2, 4, 6, 8, 10])));
    });

    test("pipe()", () {
      var consumer = new StreamController();
      expect(wrapper.pipe(consumer), completes);
      expect(consumer.stream.toList(), completion(equals([1, 2, 3, 4, 5])));
    });

    test("reduce()", () {
      expect(wrapper.reduce((value, i) => value + i), completion(equals(15)));
      expect(emptyWrapper.reduce((value, i) => value + i), throwsStateError);
    });

    test("skipWhile()", () {
      expect(wrapper.skipWhile((i) => i < 3).toList(),
          completion(equals([3, 4, 5])));
    });

    test("takeWhile()", () {
      expect(
          wrapper.takeWhile((i) => i < 3).toList(), completion(equals([1, 2])));
    });

    test("toSet()", () {
      expect(wrapper.toSet(), completion(unorderedEquals([1, 2, 3, 4, 5])));

      expect(
          new TypeSafeStream<int>(
                  new Stream<Object>.fromIterable([1, 1, 2, 2, 3, 3]))
              .toSet(),
          completion(unorderedEquals([1, 2, 3])));
    });

    test("transform()", () {
      var transformer = new StreamTransformer<int, String>.fromHandlers(
          handleData: (data, sink) {
        sink.add(data.toString());
      });

      expect(wrapper.transform(transformer).toList(),
          completion(equals(["1", "2", "3", "4", "5"])));
    });

    test("where()", () {
      expect(wrapper.where((i) => i.isOdd).toList(),
          completion(equals([1, 3, 5])));
    });

    group("any()", () {
      test("with matches", () {
        expect(wrapper.any((i) => i > 3), completion(isTrue));
      });

      test("without matches", () {
        expect(wrapper.any((i) => i > 5), completion(isFalse));
      });
    });

    group("every()", () {
      test("with all matches", () {
        expect(wrapper.every((i) => i < 6), completion(isTrue));
      });

      test("with some non-matches", () {
        expect(wrapper.every((i) => i > 3), completion(isFalse));
      });
    });

    group("skip()", () {
      test("with a valid index", () {
        expect(wrapper.skip(3).toList(), completion(equals([4, 5])));
      });

      test("with a longer index than length", () {
        expect(wrapper.skip(6).toList(), completion(isEmpty));
      });

      test("with a negative index", () {
        expect(() => wrapper.skip(-1), throwsArgumentError);
      });
    });

    group("take()", () {
      test("with a valid index", () {
        expect(wrapper.take(3).toList(), completion(equals([1, 2, 3])));
      });

      test("with a longer index than length", () {
        expect(wrapper.take(6).toList(), completion(equals([1, 2, 3, 4, 5])));
      });

      test("with a negative index", () {
        expect(wrapper.take(-1).toList(), completion(isEmpty));
      });
    });

    group("elementAt()", () {
      test("with a valid index", () {
        expect(wrapper.elementAt(3), completion(equals(4)));
      });

      test("with too high an index", () {
        expect(wrapper.elementAt(6), throwsRangeError);
      });

      test("with a negative index", () {
        expect(wrapper.elementAt(-1), throwsArgumentError);
      });
    });

    group("contains()", () {
      test("with an element", () {
        expect(wrapper.contains(2), completion(isTrue));
      });

      test("with a non-element", () {
        expect(wrapper.contains(6), completion(isFalse));
      });

      test("with a non-element of a different type", () {
        expect(wrapper.contains("foo"), completion(isFalse));
      });
    });

    group("join()", () {
      test("without a separator", () {
        expect(wrapper.join(), completion(equals("12345")));
      });

      test("with a separator", () {
        expect(wrapper.join(" "), completion(equals("1 2 3 4 5")));
      });
    });

    test("toString()", () {
      expect(wrapper.toString(), contains("Stream"));
    });
  });

  group("with invalid types", () {
    var wrapper;
    var singleWrapper;

    // Both wrappers claim to be streams of ints but are backed by strings.
    setUp(() {
      wrapper = new TypeSafeStream<int>(
          new Stream<Object>.fromIterable(["foo", "bar", "baz"]));
      singleWrapper =
          new TypeSafeStream<int>(new Stream<Object>.fromIterable(["foo"]));
    });

    // Callbacks in this group are wrapped in `expectAsync(..., count: 0)`, so
    // a test also fails if its callback is ever invoked.
    group("throws a CastError for", () {
      test("first", () {
        expect(wrapper.first, throwsCastError);
      });

      test("last", () {
        expect(wrapper.last, throwsCastError);
      });

      test("single", () {
        expect(singleWrapper.single, throwsCastError);
      });

      test("asBroadcastStream()", () {
        var broadcast = wrapper.asBroadcastStream();
        expect(broadcast.first, throwsCastError);
      });

      test("asyncExpand()", () {
        expect(wrapper.asyncExpand(expectAsync((_) {}, count: 0)).first,
            throwsCastError);
      });

      test("asyncMap()", () {
        expect(wrapper.asyncMap(expectAsync((_) {}, count: 0)).first,
            throwsCastError);
      });

      group("distinct()", () {
        test("without equals", () {
          expect(wrapper.distinct().first, throwsCastError);
        });

        test("with equals", () {
          expect(wrapper.distinct(expectAsync((_, __) {}, count: 0)).first,
              throwsCastError);
        });
      });

      test("expand()", () {
        expect(wrapper.expand(expectAsync((_) {}, count: 0)).first,
            throwsCastError);
      });

      test("firstWhere()", () {
        expect(wrapper.firstWhere(expectAsync((_) {}, count: 0)),
            throwsCastError);
      });

      test("lastWhere()", () {
        expect(
            wrapper.lastWhere(expectAsync((_) {}, count: 0)), throwsCastError);
      });

      test("singleWhere()", () {
        expect(wrapper.singleWhere(expectAsync((_) {}, count: 0)),
            throwsCastError);
      });

      test("fold()", () {
        expect(wrapper.fold("foo", expectAsync((_, __) {}, count: 0)),
            throwsCastError);
      });

      test("forEach()", () {
        expect(wrapper.forEach(expectAsync((_) {}, count: 0)), throwsCastError);
      });

      test("handleError()", () {
        expect(wrapper.handleError(expectAsync((_) {}, count: 0)).first,
            throwsCastError);
      });

      test("listen()", () {
        // There is no future to carry the error here, so the check goes
        // through `throwsZonedCastError` (not defined in this file;
        // presumably provided by ../utils.dart).
        expect(() => wrapper.take(1).listen(expectAsync((_) {}, count: 0)),
            throwsZonedCastError);
      });

      test("map()", () {
        expect(
            wrapper.map(expectAsync((_) {}, count: 0)).first, throwsCastError);
      });

      test("reduce()", () {
        expect(wrapper.reduce(expectAsync((_, __) {}, count: 0)),
            throwsCastError);
      });

      test("skipWhile()", () {
        expect(wrapper.skipWhile(expectAsync((_) {}, count: 0)).first,
            throwsCastError);
      });

      test("takeWhile()", () {
        expect(wrapper.takeWhile(expectAsync((_) {}, count: 0)).first,
            throwsCastError);
      });

      test("toList()", () async {
        var list = await wrapper.toList();
        expect(() => list.first, throwsCastError);
      }, skip: "Re-enable this when test can run DDC (test#414).");

      test("toSet()", () async {
        var asSet = await wrapper.toSet();
        expect(() => asSet.first, throwsCastError);
      }, skip: "Re-enable this when test can run DDC (test#414).");

      test("where()", () {
        expect(wrapper.where(expectAsync((_) {}, count: 0)).first,
            throwsCastError);
      });

      test("any()", () {
        expect(wrapper.any(expectAsync((_) {}, count: 0)), throwsCastError);
      });

      test("every()", () {
        expect(wrapper.every(expectAsync((_) {}, count: 0)), throwsCastError);
      });

      test("skip()", () {
        expect(wrapper.skip(1).first, throwsCastError);
      });

      test("take()", () {
        expect(wrapper.take(1).first, throwsCastError);
      });

      test("elementAt()", () {
        expect(wrapper.elementAt(1), throwsCastError);
      });
    });

    // Members whose expected outcome here is a normal result or a non-cast
    // error, even though every event has the wrong type.
    group("doesn't throw a CastError for", () {
      test("single", () {
        expect(wrapper.single, throwsStateError);
      });

      test("length", () {
        expect(wrapper.length, completion(equals(3)));
      });

      test("isBroadcast", () {
        expect(wrapper.isBroadcast, isFalse);
      });

      test("isEmpty", () {
        expect(wrapper.isEmpty, completion(isFalse));
      });

      group("drain()", () {
        test("without a value", () {
          expect(wrapper.drain(), completes);
          expect(() => wrapper.drain(), throwsStateError);
        });

        test("with a value", () {
          expect(wrapper.drain(12), completion(equals(12)));
        });
      });

      test("skip()", () {
        expect(() => wrapper.skip(-1), throwsArgumentError);
      });

      group("elementAt()", () {
        test("with too high an index", () {
          expect(wrapper.elementAt(6), throwsRangeError);
        });

        test("with a negative index", () {
          expect(wrapper.elementAt(-1), throwsArgumentError);
        });
      });

      group("contains()", () {
        test("with an element", () {
          expect(wrapper.contains("foo"), completion(isTrue));
        });

        test("with a non-element", () {
          expect(wrapper.contains("qux"), completion(isFalse));
        });

        test("with a non-element of a different type", () {
          expect(wrapper.contains(1), completion(isFalse));
        });
      });

      group("join()", () {
        test("without a separator", () {
          expect(wrapper.join(), completion(equals("foobarbaz")));
        });

        test("with a separator", () {
          expect(wrapper.join(" "), completion(equals("foo bar baz")));
        });
      });

      test("toString()", () {
        expect(wrapper.toString(), contains("Stream"));
      });
    });
  });
}