blob: 018c8ab756470c1e6513aedf5e72b5f8d42cc6ae [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import "dart:async";
import "package:async/async.dart";
import "package:test/test.dart";
/// Returns a stream carrying the same values as [base], except that the
/// mapping callback throws [error] when it sees a value equal to
/// [errorValue], instead of passing that value along.
Stream streamError(Stream base, int errorValue, error) {
  return base.map((event) {
    // Only the matching value is replaced by a throw; all other values are
    // returned unchanged.
    if (event == errorValue) throw error;
    return event;
  });
}
/// Makes a [Stream] from [iterable] by feeding its elements, one per timer
/// tick, into a stream controller.
///
/// The periodic timer is created as soon as this function is called. On the
/// first tick after the elements run out, the controller is closed and the
/// timer is cancelled.
Stream mks(Iterable iterable) {
  final elements = iterable.iterator;
  final controller = new StreamController();
  // Bump the shared counter and derive the tick interval from it so the
  // interval varies between calls; the formula yields 3 to 9 ms.
  ctr += 1;
  final interval = new Duration(milliseconds: (ctr * 5) % 7 + 3);

  void onTick(Timer timer) {
    if (!elements.moveNext()) {
      // Out of elements: finish the stream and stop ticking.
      controller.close();
      timer.cancel();
      return;
    }
    controller.add(elements.current);
  }

  new Timer.periodic(interval, onTick);
  return controller.stream;
}
/// Counter used to give varying delays for streams.
///
/// Incremented by every call to [mks], which derives its timer interval from
/// the incremented value.
int ctr = 0;
main() {
  // The rows most tests below expect from zipping 1..3, 4..6 and 7..9.
  final digitRows = [
    [1, 4, 7],
    [2, 5, 8],
    [3, 6, 9]
  ];

  // Zips [streams] and, once the zipped stream is done, checks that the
  // events it delivered equal [expectedData].
  void testZip(Iterable<Stream> streams, Iterable expectedData) {
    final received = [];
    // Wrapped in expectAsync0 so the test waits for the done event.
    final checkOnDone = expectAsync0(() {
      expect(received, equals(expectedData));
    });
    new StreamZip(streams).listen(received.add, onDone: checkOnDone);
  }

  // Zips [streams], collects the result with toList(), and expects the
  // resulting future to fail with an error equal to [error].
  void expectZipError(Iterable<Stream> streams, error) {
    expect(new StreamZip(streams).toList(), throwsA(equals(error)));
  }

  // Three sources of equal length.
  test("Basic", () {
    final streams = [mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9])];
    testZip(streams, digitRows);
  });

  // In the "Uneven length" tests some sources carry extra trailing values;
  // the expected output still has only the three complete rows.
  test("Uneven length 1", () {
    final streams = [mks([1, 2, 3, 99, 100]), mks([4, 5, 6]), mks([7, 8, 9])];
    testZip(streams, digitRows);
  });

  test("Uneven length 2", () {
    final streams = [mks([1, 2, 3]), mks([4, 5, 6, 99, 100]), mks([7, 8, 9])];
    testZip(streams, digitRows);
  });

  test("Uneven length 3", () {
    final streams = [mks([1, 2, 3]), mks([4, 5, 6]), mks([7, 8, 9, 99, 100])];
    testZip(streams, digitRows);
  });

  test("Uneven length 4", () {
    final streams = [
      mks([1, 2, 3, 98]),
      mks([4, 5, 6]),
      mks([7, 8, 9, 99, 100])
    ];
    testZip(streams, digitRows);
  });

  // In the "Empty" tests one source has no values at all; no rows are
  // expected.
  test("Empty 1", () {
    final streams = [mks([]), mks([4, 5, 6]), mks([7, 8, 9])];
    testZip(streams, []);
  });

  test("Empty 2", () {
    final streams = [mks([1, 2, 3]), mks([]), mks([7, 8, 9])];
    testZip(streams, []);
  });

  test("Empty 3", () {
    final streams = [mks([1, 2, 3]), mks([4, 5, 6]), mks([])];
    testZip(streams, []);
  });

  // No sources at all: the zipped stream is expected to finish with no rows.
  test("Empty source", () {
    testZip([], []);
  });

  // A single source: every value is expected as a one-element row.
  test("Single Source", () {
    final streams = [mks([1, 2, 3])];
    testZip(streams, [
      [1],
      [2],
      [3]
    ]);
  });

  // Mixes a filtered stream, a periodic stream and a broadcast stream.
  test("Other-streams", () {
    Stream filtered = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4);
    Stream periodic =
        new Stream.periodic(const Duration(milliseconds: 5), (x) => x + 4)
            .take(3);
    final broadcast = new StreamController.broadcast();
    Stream fromBroadcast = broadcast.stream;
    testZip([filtered, periodic, fromBroadcast], digitRows);
    // The broadcast events are added only after testZip has subscribed.
    broadcast
      ..add(7)
      ..add(8)
      ..add(9)
      ..close();
  });

  // In "Error 1" to "Error 3" the error replaces the middle value of the
  // first, second and third source respectively.
  test("Error 1", () {
    expectZipError([
      streamError(mks([1, 2, 3]), 2, "BAD-1"),
      mks([4, 5, 6]),
      mks([7, 8, 9])
    ], "BAD-1");
  });

  test("Error 2", () {
    expectZipError([
      mks([1, 2, 3]),
      streamError(mks([4, 5, 6]), 5, "BAD-2"),
      mks([7, 8, 9])
    ], "BAD-2");
  });

  test("Error 3", () {
    expectZipError([
      mks([1, 2, 3]),
      mks([4, 5, 6]),
      streamError(mks([7, 8, 9]), 8, "BAD-3")
    ], "BAD-3");
  });

  // The error replaces the last value (6) of the second source.
  test("Error at end", () {
    expectZipError([
      mks([1, 2, 3]),
      streamError(mks([4, 5, 6]), 6, "BAD-4"),
      mks([7, 8, 9])
    ], "BAD-4");
  });

  test("Error before first end", () {
    // The two controllers below are never closed, so their streams never
    // send a done event; the fourth event of the first stream (the error)
    // is therefore guaranteed to arrive before any source ends.
    final failing = streamError(mks([1, 2, 3, 4]), 4, "BAD-5");
    final fours = new StreamController()..add(4)..add(5)..add(6);
    final sevens = new StreamController()..add(7)..add(8)..add(9);
    expectZipError([failing, fours.stream, sevens.stream], "BAD-5");
  });

  test("Error after first end", () {
    final lateErrors = new StreamController();
    lateErrors..add(7)..add(8)..add(9);
    // Transformer that, when its source sends a done event, schedules an
    // error to be added to [lateErrors] and then forwards the done event.
    // It is applied to both of the first two streams.
    StreamTransformer errorOnDone =
        new StreamTransformer.fromHandlers(handleDone: (EventSink sink) {
      Timer.run(() {
        lateErrors.addError("BAD-6");
      });
      sink.close();
    });
    // The expected rows contain no error.
    testZip([
      mks([1, 2, 3]).transform(errorOnDone),
      mks([4, 5, 6]).transform(errorOnDone),
      lateErrors.stream
    ], digitRows);
  });

  test("Pause/Resume", () {
    // Net pause count per controller: +1 on each pause callback, -1 on each
    // resume callback.
    var netPauses1 = 0;
    final first = new StreamController(onPause: () {
      netPauses1 += 1;
    }, onResume: () {
      netPauses1 -= 1;
    });
    var netPauses2 = 0;
    final second = new StreamController(onPause: () {
      netPauses2 += 1;
    }, onResume: () {
      netPauses2 -= 1;
    });

    // Called at the end of the chain below to complete the test; expects
    // the first controller to end with one net pause and the second with
    // none.
    final finish = expectAsync0(() {
      expect(netPauses1, equals(1));
      expect(netPauses2, equals(0));
    });

    Stream zipped = new StreamZip([first.stream, second.stream]);
    const delay = const Duration(milliseconds: 25);

    // StreamIterator uses pause and resume to control flow.
    final rows = new StreamIterator(zipped);
    rows.moveNext().then((more) {
      expect(more, isTrue);
      expect(rows.current, equals([1, 2]));
      return rows.moveNext();
    }).then((more) {
      expect(more, isTrue);
      expect(rows.current, equals([3, 4]));
      // Supply the second stream's next value only now.
      second.add(6);
      return rows.moveNext();
    }).then((more) {
      expect(more, isTrue);
      expect(rows.current, equals([5, 6]));
      // Supply the next value only after a delay.
      new Future.delayed(delay).then((_) {
        second.add(8);
      });
      return rows.moveNext();
    }).then((more) {
      expect(more, isTrue);
      expect(rows.current, equals([7, 8]));
      second.add(9);
      return rows.moveNext();
    }).then((more) {
      expect(more, isFalse);
      finish();
    });

    // The first controller gets all of its events and is closed up front;
    // the second gets only its first two here and the rest from the
    // callbacks above.
    first
      ..add(1)
      ..add(3)
      ..add(5)
      ..add(7)
      ..close();
    second..add(2)..add(4);
  });

  test("pause-resume2", () {
    final evens = new Stream.fromIterable([0, 2, 4, 6, 8]);
    final odds = new Stream.fromIterable([1, 3, 5, 7]);
    final zipped = new StreamZip([evens, odds]);
    const pause = const Duration(milliseconds: 25);
    // Index of the row being delivered.
    var index = 0;
    StreamSubscription subscription;
    // The listener must be called exactly four times.
    subscription = zipped.listen(expectAsync1((row) {
      expect(row, equals([index * 2, index * 2 + 1]));
      switch (index) {
        case 1:
          // Pause until the delayed future completes.
          subscription.pause(new Future.delayed(pause));
          break;
        case 2:
          // Pause now and resume explicitly after the delay.
          subscription.pause();
          new Future.delayed(pause).then((_) {
            subscription.resume();
          });
          break;
      }
      index++;
    }, count: 4));
  });
}