blob: c454dca9ce70fd5f83f66601ef36e6553ab401c0 [file] [log] [blame]
// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// Test merging streams.
library merge_stream_test;
import "dart:async";
import '../../../pkg/unittest/lib/unittest.dart';
import 'event_helper.dart';
testSupercedeStream() {
{ // Simple case of superceding lower priority streams.
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
Events expected = new Events()..add(1)..add(2)..add(3)..add(4)..close();
Events actual = new Events.capture(merge);
s1.add(1);
s2.add(2);
s1.add(1); // Ignored.
s2.add(3);
s3.add(4);
s2.add(3); // Ignored.
s3.close();
Expect.listEquals(expected.events, actual.events);
}
{ // Superceding more than one stream at a time.
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
Events expected = new Events()..add(1)..add(2)..close();
Events actual = new Events.capture(merge);
s1.add(1);
s3.add(2);
s1.add(1); // Ignored.
s2.add(1); // Ignored.
s3.close();
Expect.listEquals(expected.events, actual.events);
}
{ // Closing a stream before superceding it.
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
Events expected = new Events()..add(1)..add(2)..add(3)..close();
Events actual = new Events.capture(merge);
s1.add(1);
s1.close();
s3.close();
s2.add(2);
s2.add(3);
s2.close();
Expect.listEquals(expected.events, actual.events);
}
{ // Errors from all non-superceded streams are forwarded.
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
Events expected =
new Events()..add(1)..error("1")..error("2")..error("3")
..add(3)..error("6")..add(4)..close();
Events actual = new Events.capture(merge);
s1.add(1);
s1.signalError(new AsyncError("1"));
s2.signalError(new AsyncError("2"));
s3.signalError(new AsyncError("3"));
s3.add(3);
s1.signalError(new AsyncError("4"));
s2.signalError(new AsyncError("5"));
s3.signalError(new AsyncError("6"));
s1.close();
s2.close();
s3.add(4);
s3.close();
Expect.listEquals(expected.events, actual.events);
}
test("Pausing on a superceding stream", () {
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
Stream merge = new Stream.superceding([s1.stream, s2.stream, s3.stream]);
Events expected = new Events()..add(1)..add(2)..add(3);
Events actual = new Events.capture(merge);
s1.add(1);
s2.add(2);
s2.add(3);
Expect.listEquals(expected.events, actual.events);
actual.pause(); // Pauses the stream that feeds the actual Events.
Events expected2 = expected.copy();
expected..add(5)..add(6)..close();
expected2..add(6)..close();
s1.add(4);
s2.add(5); // May or may not arrive before '6' when resuming.
s3.add(6);
s3.close();
actual.onDone(expectAsync0(() {
if (expected.events.length == actual.events.length) {
Expect.listEquals(expected.events, actual.events);
} else {
Expect.listEquals(expected2.events, actual.events);
}
}));
actual.resume();
});
}
void testCyclicStream() {
test("Simple case of superceding lower priority streams", () {
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]);
Events expected =
new Events()..add(1)..add(2)..add(3)..add(4)..add(5)..add(6)..close();
Events actual = new Events.capture(merge);
Expect.isFalse(s1.isPaused);
Expect.isTrue(s2.isPaused);
Expect.isTrue(s3.isPaused);
s3.add(3);
s1.add(1);
s1.add(4);
s1.add(6);
s1.close();
s2.add(2);
s2.add(5);
s2.close();
s3.close();
actual.onDone(expectAsync0(() {
Expect.listEquals(expected.events, actual.events);
}));
});
test("Cyclic merge with errors", () {
StreamController s1 = new StreamController.multiSubscription();
StreamController s2 = new StreamController.multiSubscription();
StreamController s3 = new StreamController.multiSubscription();
Stream merge = new Stream.cyclic([s1.stream, s2.stream, s3.stream]);
Events expected =
new Events()..add(1)..error("1")..add(2)..add(3)..error("2")
..add(4)..add(5)..error("3")..add(6)..close();
Events actual = new Events.capture(merge);
Expect.isFalse(s1.isPaused);
Expect.isTrue(s2.isPaused);
Expect.isTrue(s3.isPaused);
s3.add(3);
s3.signalError(new AsyncError("3")); // Error just before a "done".
s1.add(1);
s1.signalError(new AsyncError("2")); // Error between events.
s1.add(4);
s1.add(6);
s1.close();
s2.signalError(new AsyncError("1")); // Error as first event.
s2.add(2);
s2.add(5);
s2.close();
s3.close();
actual.onDone(expectAsync0(() {
Expect.listEquals(expected.events, actual.events);
}));
});
}
main() {
testSupercedeStream();
testCyclicStream();
}