// blob: a00d8bd65f9894f75c2e04325a6757991236ec3a [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import "dart:async";
import "package:async/async.dart";
import "package:test/test.dart";
/// Returns a stream that forwards the events of [base] unchanged, except that
/// an event equal to [errorValue] is replaced by the error [error].
Stream streamError(Stream base, int errorValue, error) {
  return base.map((event) {
    // Throwing inside the mapper turns this event into a stream error.
    if (event == errorValue) throw error;
    return event;
  });
}
/// Returns a [Stream] that emits the elements of [iterable], one per tick of
/// a periodic timer, and is closed on the first tick after the last element.
///
/// Every call increments the top-level counter [ctr] and derives the tick
/// interval from it, so consecutive streams use different intervals.
Stream mks(Iterable iterable) {
  Iterator source = iterable.iterator;
  StreamController sink = new StreamController();
  // Tick interval in the range 3..9 ms, varied through the shared counter.
  ctr++;
  Duration interval = new Duration(milliseconds: (ctr * 5) % 7 + 3);
  new Timer.periodic(interval, (Timer ticker) {
    if (!source.moveNext()) {
      // No elements left: finish the stream and stop ticking.
      sink.close();
      ticker.cancel();
      return;
    }
    sink.add(source.current);
  });
  return sink.stream;
}
/// Counter used to give varying delays for streams.
///
/// Incremented by every call to [mks], which computes its timer interval
/// from the incremented value.
int ctr = 0;
/// Registers the [StreamZip] tests.
main() {
  // Rows expected from zipping 1..3, 4..6 and 7..9; shared by most tests.
  final zippedRows = [[1, 4, 7], [2, 5, 8], [3, 6, 9]];

  /// Zips [streams] and, once the zipped stream is done, checks that the
  /// events it produced equal [expectedData].
  testZip(Iterable<Stream> streams, Iterable expectedData) {
    List collected = [];
    checkCollected() {
      expect(collected, equals(expectedData));
    }

    new StreamZip(streams)
        .listen(collected.add, onDone: expectAsync(checkCollected));
  }

  /// Converts each iterable in [columns] to a timer-driven stream with [mks]
  /// (in order) and checks that zipping them yields [expectedData].
  testZipOf(List<Iterable> columns, Iterable expectedData) {
    testZip(columns.map(mks).toList(), expectedData);
  }

  /// Expects that collecting the zip of [streams] into a list fails with an
  /// error equal to [error].
  expectZipError(Iterable<Stream> streams, error) {
    expect(new StreamZip(streams).toList(), throwsA(equals(error)));
  }

  test("Basic", () {
    testZipOf([[1, 2, 3], [4, 5, 6], [7, 8, 9]], zippedRows);
  });

  // In the "Uneven length" tests the surplus events of the longer sources
  // (98, 99, 100) are expected to be absent from the zipped output.
  test("Uneven length 1", () {
    testZipOf([[1, 2, 3, 99, 100], [4, 5, 6], [7, 8, 9]], zippedRows);
  });

  test("Uneven length 2", () {
    testZipOf([[1, 2, 3], [4, 5, 6, 99, 100], [7, 8, 9]], zippedRows);
  });

  test("Uneven length 3", () {
    testZipOf([[1, 2, 3], [4, 5, 6], [7, 8, 9, 99, 100]], zippedRows);
  });

  test("Uneven length 4", () {
    testZipOf([[1, 2, 3, 98], [4, 5, 6], [7, 8, 9, 99, 100]], zippedRows);
  });

  // A single empty source is expected to make the whole zip empty.
  test("Empty 1", () {
    testZipOf([[], [4, 5, 6], [7, 8, 9]], []);
  });

  test("Empty 2", () {
    testZipOf([[1, 2, 3], [], [7, 8, 9]], []);
  });

  test("Empty 3", () {
    testZipOf([[1, 2, 3], [4, 5, 6], []], []);
  });

  test("Empty source", () {
    testZip([], []);
  });

  test("Single Source", () {
    testZipOf([[1, 2, 3]], [[1], [2], [3]]);
  });

  test("Other-streams", () {
    // Sources of different kinds: a filtered stream, a periodic stream and a
    // broadcast controller's stream.
    Stream filtered = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4);
    Stream periodic = new Stream.periodic(
        const Duration(milliseconds: 5), (x) => x + 4).take(3);
    StreamController broadcast = new StreamController.broadcast();
    testZip([filtered, periodic, broadcast.stream], zippedRows);
    // Added after testZip has started listening.
    broadcast.add(7);
    broadcast.add(8);
    broadcast.add(9);
    broadcast.close();
  });

  test("Error 1", () {
    expectZipError([
      streamError(mks([1, 2, 3]), 2, "BAD-1"),
      mks([4, 5, 6]),
      mks([7, 8, 9])
    ], "BAD-1");
  });

  test("Error 2", () {
    expectZipError([
      mks([1, 2, 3]),
      streamError(mks([4, 5, 6]), 5, "BAD-2"),
      mks([7, 8, 9])
    ], "BAD-2");
  });

  test("Error 3", () {
    expectZipError([
      mks([1, 2, 3]),
      mks([4, 5, 6]),
      streamError(mks([7, 8, 9]), 8, "BAD-3")
    ], "BAD-3");
  });

  test("Error at end", () {
    expectZipError([
      mks([1, 2, 3]),
      streamError(mks([4, 5, 6]), 6, "BAD-4"),
      mks([7, 8, 9])
    ], "BAD-4");
  });

  test("Error before first end", () {
    // StreamControllers' streams with no "close" called will never be done,
    // so the fourth event of the first stream is guaranteed to come first.
    Stream failing = streamError(mks([1, 2, 3, 4]), 4, "BAD-5");
    StreamController second = new StreamController()..add(4)..add(5)..add(6);
    StreamController third = new StreamController()..add(7)..add(8)..add(9);
    expectZipError([failing, second.stream, third.stream], "BAD-5");
  });

  test("Error after first end", () {
    StreamController last = new StreamController();
    last.add(7);
    last.add(8);
    last.add(9);
    // Transformer that puts an error into [last] once one of the first two
    // streams has sent a done event.
    StreamTransformer errorOnDone = new StreamTransformer.fromHandlers(
        handleDone: (EventSink sink) {
      Timer.run(() {
        last.addError("BAD-6");
      });
      sink.close();
    });
    Stream first = mks([1, 2, 3]).transform(errorOnDone);
    Stream second = mks([4, 5, 6]).transform(errorOnDone);
    testZip([first, second, last.stream], zippedRows);
  });

  test("Pause/Resume", () {
    // Outstanding pauses (pause callbacks minus resume callbacks) of c1.
    int pending1 = 0;
    StreamController c1 = new StreamController(onPause: () {
      pending1++;
    }, onResume: () {
      pending1--;
    });
    // Outstanding pauses of c2.
    int pending2 = 0;
    StreamController c2 = new StreamController(onPause: () {
      pending2++;
    }, onResume: () {
      pending2--;
    });
    // Call to complete the test; checks the final pause balance of both
    // controllers.
    var finish = expectAsync(() {
      expect(pending1, equals(1));
      expect(pending2, equals(0));
    });
    Stream zip = new StreamZip([c1.stream, c2.stream]);
    // StreamIterator uses pause and resume to control flow.
    StreamIterator pairs = new StreamIterator(zip);
    pairs.moveNext().then((more) {
      expect(more, isTrue);
      expect(pairs.current, equals([1, 2]));
      return pairs.moveNext();
    }).then((more) {
      expect(more, isTrue);
      expect(pairs.current, equals([3, 4]));
      c2.add(6);
      return pairs.moveNext();
    }).then((more) {
      expect(more, isTrue);
      expect(pairs.current, equals([5, 6]));
      // Supply the next c2 event only after a delay.
      new Future.delayed(const Duration(milliseconds: 25)).then((_) {
        c2.add(8);
      });
      return pairs.moveNext();
    }).then((more) {
      expect(more, isTrue);
      expect(pairs.current, equals([7, 8]));
      c2.add(9);
      return pairs.moveNext();
    }).then((more) {
      expect(more, isFalse);
      finish();
    });
    [1, 3, 5, 7].forEach(c1.add);
    c1.close();
    c2.add(2);
    c2.add(4);
  });

  test("pause-resume2", () {
    const delay = const Duration(milliseconds: 25);
    var evens = new Stream.fromIterable([0, 2, 4, 6, 8]);
    var odds = new Stream.fromIterable([1, 3, 5, 7]);
    var zipped = new StreamZip([evens, odds]);
    // Number of pairs received so far.
    int index = 0;
    var subscription;
    subscription = zipped.listen(expectAsync((pair) {
      var even = index * 2;
      expect(pair, equals([even, even + 1]));
      switch (index) {
        case 1:
          // Pause until the delayed future completes.
          subscription.pause(new Future.delayed(delay));
          break;
        case 2:
          // Pause explicitly and resume by hand after the delay.
          subscription.pause();
          new Future.delayed(delay).then((_) {
            subscription.resume();
          });
          break;
      }
      index++;
    }, count: 4));
  });
}