blob: a0792509134aafc98216a1c40b20f4fe3ddca1a0 [file] [log] [blame]
// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// Test the basic StreamController and StreamController.broadcast.
library stream_controller_async_test;
import 'dart:async';
import 'package:expect/expect.dart';
import 'package:async_helper/async_minitest.dart';
import 'event_helper.dart';
import 'stream_state_helper.dart';
void cancelSub(StreamSubscription sub) {
sub.cancel();
}
testController() {
// Test fold
test("StreamController.fold", () {
StreamController c = new StreamController();
Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub);
stream.fold(0, (a, b) => a + b).then(expectAsync((int v) {
Expect.equals(42, v);
}));
c.add(10);
c.add(32);
c.close();
});
test("StreamController.fold throws", () {
StreamController c = new StreamController();
Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub);
stream.fold(0, (a, b) {
throw "Fnyf!";
}).catchError(expectAsync((error) {
Expect.equals("Fnyf!", error);
}));
c.add(42);
});
}
testSingleController() {
test("Single-subscription StreamController.fold", () {
StreamController c = new StreamController();
Stream stream = c.stream;
stream.fold(0, (a, b) => a + b).then(expectAsync((int v) {
Expect.equals(42, v);
}));
c.add(10);
c.add(32);
c.close();
});
test("Single-subscription StreamController.fold throws", () {
StreamController c = new StreamController();
Stream stream = c.stream;
stream.fold(0, (a, b) {
throw "Fnyf!";
}).catchError(expectAsync((e) {
Expect.equals("Fnyf!", e);
}));
c.add(42);
});
test(
"Single-subscription StreamController events are buffered when"
" there is no subscriber", () {
StreamController c = new StreamController();
EventSink sink = c.sink;
Stream stream = c.stream;
int counter = 0;
sink.add(1);
sink.add(2);
sink.close();
stream.listen((data) {
counter += data;
}, onDone: expectAsync(() {
Expect.equals(3, counter);
}));
});
}
testExtraMethods() {
Events sentEvents = new Events()
..add(7)
..add(9)
..add(13)
..add(87)
..close();
test("forEach", () {
StreamController c = new StreamController();
Events actualEvents = new Events();
Future f = c.stream.forEach(actualEvents.add);
f.then(expectAsync((_) {
actualEvents.close();
Expect.listEquals(sentEvents.events, actualEvents.events);
}));
sentEvents.replay(c);
});
test("forEachError", () {
Events sentEvents = new Events()
..add(7)
..error("bad")
..add(87)
..close();
StreamController c = new StreamController();
Events actualEvents = new Events();
Future f = c.stream.forEach(actualEvents.add);
f.catchError(expectAsync((error) {
Expect.equals("bad", error);
Expect.listEquals((new Events()..add(7)).events, actualEvents.events);
}));
sentEvents.replay(c);
});
test("forEachError2", () {
Events sentEvents = new Events()
..add(7)
..add(9)
..add(87)
..close();
StreamController c = new StreamController();
Events actualEvents = new Events();
Future f = c.stream.forEach((x) {
if (x == 9) throw "bad";
actualEvents.add(x);
});
f.catchError(expectAsync((error) {
Expect.equals("bad", error);
Expect.listEquals((new Events()..add(7)).events, actualEvents.events);
}));
sentEvents.replay(c);
});
test("firstWhere", () {
StreamController c = new StreamController();
Future f = c.stream.firstWhere((x) => (x % 3) == 0);
f.then(expectAsync((v) {
Expect.equals(9, v);
}));
sentEvents.replay(c);
});
test("firstWhere 2", () {
StreamController c = new StreamController();
Future f = c.stream.firstWhere((x) => (x % 4) == 0);
f.catchError(expectAsync((e) {}));
sentEvents.replay(c);
});
test("firstWhere 3", () {
StreamController c = new StreamController();
Future f = c.stream.firstWhere((x) => (x % 4) == 0, orElse: () => 999);
f.then(expectAsync((v) {
Expect.equals(999, v);
}));
sentEvents.replay(c);
});
test("lastWhere", () {
StreamController c = new StreamController();
Future f = c.stream.lastWhere((x) => (x % 3) == 0);
f.then(expectAsync((v) {
Expect.equals(87, v);
}));
sentEvents.replay(c);
});
test("lastWhere 2", () {
StreamController c = new StreamController();
Future f = c.stream.lastWhere((x) => (x % 4) == 0);
f.catchError(expectAsync((e) {}));
sentEvents.replay(c);
});
test("lastWhere 3", () {
StreamController c = new StreamController();
Future f = c.stream.lastWhere((x) => (x % 4) == 0, orElse: () => 999);
f.then(expectAsync((v) {
Expect.equals(999, v);
}));
sentEvents.replay(c);
});
test("singleWhere", () {
StreamController c = new StreamController();
Future f = c.stream.singleWhere((x) => (x % 9) == 0);
f.then(expectAsync((v) {
Expect.equals(9, v);
}));
sentEvents.replay(c);
});
test("singleWhere 2", () {
StreamController c = new StreamController();
Future f = c.stream.singleWhere((x) => (x % 3) == 0); // Matches 9 and 87..
f.catchError(expectAsync((error) {
Expect.isTrue(error is StateError);
}));
sentEvents.replay(c);
});
test("first", () {
StreamController c = new StreamController();
Future f = c.stream.first;
f.then(expectAsync((v) {
Expect.equals(7, v);
}));
sentEvents.replay(c);
});
test("first empty", () {
StreamController c = new StreamController();
Future f = c.stream.first;
f.catchError(expectAsync((error) {
Expect.isTrue(error is StateError);
}));
Events emptyEvents = new Events()..close();
emptyEvents.replay(c);
});
test("first error", () {
StreamController c = new StreamController();
Future f = c.stream.first;
f.catchError(expectAsync((error) {
Expect.equals("error", error);
}));
Events errorEvents = new Events()
..error("error")
..close();
errorEvents.replay(c);
});
test("first error 2", () {
StreamController c = new StreamController();
Future f = c.stream.first;
f.catchError(expectAsync((error) {
Expect.equals("error", error);
}));
Events errorEvents = new Events()
..error("error")
..error("error2")
..close();
errorEvents.replay(c);
});
test("last", () {
StreamController c = new StreamController();
Future f = c.stream.last;
f.then(expectAsync((v) {
Expect.equals(87, v);
}));
sentEvents.replay(c);
});
test("last empty", () {
StreamController c = new StreamController();
Future f = c.stream.last;
f.catchError(expectAsync((error) {
Expect.isTrue(error is StateError);
}));
Events emptyEvents = new Events()..close();
emptyEvents.replay(c);
});
test("last error", () {
StreamController c = new StreamController();
Future f = c.stream.last;
f.catchError(expectAsync((error) {
Expect.equals("error", error);
}));
Events errorEvents = new Events()
..error("error")
..close();
errorEvents.replay(c);
});
test("last error 2", () {
StreamController c = new StreamController();
Future f = c.stream.last;
f.catchError(expectAsync((error) {
Expect.equals("error", error);
}));
Events errorEvents = new Events()
..error("error")
..error("error2")
..close();
errorEvents.replay(c);
});
test("elementAt", () {
StreamController c = new StreamController();
Future f = c.stream.elementAt(2);
f.then(expectAsync((v) {
Expect.equals(13, v);
}));
sentEvents.replay(c);
});
test("elementAt 2", () {
StreamController c = new StreamController();
Future f = c.stream.elementAt(20);
f.catchError(expectAsync((error) {
Expect.isTrue(error is RangeError);
}));
sentEvents.replay(c);
});
test("drain", () {
StreamController c = new StreamController();
Future f = c.stream.drain();
f.then(expectAsync((v) {
Expect.equals(null, v);
}));
sentEvents.replay(c);
});
test("drain error", () {
StreamController c = new StreamController();
Future f = c.stream.drain();
f.catchError(expectAsync((error) {
Expect.equals("error", error);
}));
Events errorEvents = new Events()
..error("error")
..error("error2")
..close();
errorEvents.replay(c);
});
}
testPause() {
test("pause event-unpause", () {
StreamProtocolTest test = new StreamProtocolTest();
Completer completer = new Completer();
test
..expectListen()
..expectData(42, () {
test.pause(completer.future);
})
..expectPause(() {
completer.complete(null);
})
..expectData(43)
..expectData(44)
..expectCancel()
..expectDone(test.terminate);
test.listen();
test.add(42);
test.add(43);
test.add(44);
test.close();
});
test("pause twice event-unpause", () {
StreamProtocolTest test = new StreamProtocolTest();
Completer completer = new Completer();
Completer completer2 = new Completer();
test
..expectListen()
..expectData(42, () {
test.pause(completer.future);
test.pause(completer2.future);
})
..expectPause(() {
completer.future.then(completer2.complete);
completer.complete(null);
})
..expectData(43)
..expectData(44)
..expectCancel()
..expectDone(test.terminate);
test
..listen()
..add(42)
..add(43)
..add(44)
..close();
});
test("pause twice direct-unpause", () {
StreamProtocolTest test = new StreamProtocolTest();
test
..expectListen()
..expectData(42, () {
test.pause();
test.pause();
})
..expectPause(() {
test.resume();
test.resume();
})
..expectData(43)
..expectData(44)
..expectCancel()
..expectDone(test.terminate);
test
..listen()
..add(42)
..add(43)
..add(44)
..close();
});
test("pause twice direct-event-unpause", () {
StreamProtocolTest test = new StreamProtocolTest();
Completer completer = new Completer();
test
..expectListen()
..expectData(42, () {
test.pause();
test.pause(completer.future);
test.add(43);
test.add(44);
test.close();
})
..expectPause(() {
completer.future.then((v) => test.resume());
completer.complete(null);
})
..expectData(43)
..expectData(44)
..expectCancel()
..expectDone(test.terminate);
test
..listen()
..add(42);
});
}
class TestError {
const TestError();
}
testRethrow() {
TestError error = const TestError();
testStream(name, streamValueTransform) {
test("rethrow-$name-value", () {
StreamController c = new StreamController();
Stream s = streamValueTransform(c.stream, (v) {
throw error;
});
s.listen((_) {
Expect.fail("unexpected value");
}, onError: expectAsync((e) {
Expect.identical(error, e);
}));
c.add(null);
c.close();
});
}
testStreamError(name, streamErrorTransform) {
test("rethrow-$name-error", () {
StreamController c = new StreamController();
Stream s = streamErrorTransform(c.stream, (e) {
throw error;
});
s.listen((_) {
Expect.fail("unexpected value");
}, onError: expectAsync((e) {
Expect.identical(error, e);
}));
c.addError("SOME ERROR");
c.close();
});
}
testFuture(name, streamValueTransform) {
test("rethrow-$name-value", () {
StreamController c = new StreamController();
Future f = streamValueTransform(c.stream, (v) {
throw error;
});
f.then((v) {
Expect.fail("unreachable");
}, onError: expectAsync((e) {
Expect.identical(error, e);
}));
// Need two values to trigger compare for reduce.
c.add(0);
c.add(1);
c.close();
});
}
testStream("where", (s, act) => s.where(act));
testStream("map", (s, act) => s.map(act));
testStream("expand", (s, act) => s.expand(act));
testStream("where", (s, act) => s.where(act));
testStreamError("handleError", (s, act) => s.handleError(act));
testStreamError("handleTest", (s, act) => s.handleError((v) {}, test: act));
testFuture("forEach", (s, act) => s.forEach(act));
testFuture("every", (s, act) => s.every(act));
testFuture("any", (s, act) => s.any(act));
testFuture("reduce", (s, act) => s.reduce((a, b) => act(b)));
testFuture("fold", (s, act) => s.fold(0, (a, b) => act(b)));
testFuture("drain", (s, act) => s.drain().then(act));
}
void testBroadcastController() {
test("broadcast-controller-basic", () {
StreamProtocolTest test = new StreamProtocolTest.broadcast();
test
..expectListen()
..expectData(42)
..expectCancel()
..expectDone(test.terminate);
test
..listen()
..add(42)
..close();
});
test("broadcast-controller-listen-twice", () {
StreamProtocolTest test = new StreamProtocolTest.broadcast();
test
..expectListen()
..expectData(42, () {
test.listen();
test.add(37);
test.close();
})
// Order is not guaranteed between subscriptions if not sync.
..expectData(37)
..expectData(37)
..expectDone()
..expectCancel()
..expectDone(test.terminate);
test.listen();
test.add(42);
});
test("broadcast-controller-listen-twice-non-overlap", () {
StreamProtocolTest test = new StreamProtocolTest.broadcast();
test
..expectListen(() {
test.add(42);
})
..expectData(42, () {
test.cancel();
})
..expectCancel(() {
test.listen();
})
..expectListen(() {
test.add(37);
})
..expectData(37, () {
test.close();
})
..expectCancel()
..expectDone(test.terminate);
test.listen();
});
test("broadcast-controller-individual-pause", () {
StreamProtocolTest test = new StreamProtocolTest.broadcast();
var sub1;
test
..expectListen()
..expectData(42)
..expectData(42, () {
sub1.pause();
})
..expectData(43, () {
sub1.cancel();
test.listen();
test.add(44);
test.expectData(44);
test.expectData(44, test.terminate);
});
sub1 = test.listen();
test.listen();
test.add(42);
test.add(43);
});
test("broadcast-controller-add-in-callback", () {
StreamProtocolTest test = new StreamProtocolTest.broadcast();
test.expectListen();
var sub = test.listen();
test.add(42);
sub.expectData(42, () {
test.add(87);
sub.cancel();
});
test.expectCancel(() {
test.add(37);
test.terminate();
});
});
}
void testAsBroadcast() {
test("asBroadcast-not-canceled", () {
StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
var sub;
test
..expectListen()
..expectBroadcastListen((_) {
test.add(42);
})
..expectData(42, () {
sub.cancel();
})
..expectBroadcastCancel((_) {
sub = test.listen();
})
..expectBroadcastListen((_) {
test.terminate();
});
sub = test.listen();
});
test("asBroadcast-canceled", () {
StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
var sub;
test
..expectListen()
..expectBroadcastListen((_) {
test.add(42);
})
..expectData(42, () {
sub.cancel();
})
..expectBroadcastCancel((originalSub) {
originalSub.cancel();
})
..expectCancel(test.terminate);
sub = test.listen();
});
test("asBroadcast-pause-original", () {
StreamProtocolTest test = new StreamProtocolTest.asBroadcast();
var sub;
test
..expectListen()
..expectBroadcastListen((_) {
test.add(42);
test.add(43);
})
..expectData(42, () {
sub.cancel();
})
..expectBroadcastCancel((originalSub) {
originalSub.pause(); // Pause before sending 43 from original sub.
})
..expectPause(() {
sub = test.listen();
})
..expectBroadcastListen((originalSub) {
originalSub.resume();
})
..expectData(43)
..expectResume(() {
test.close();
})
..expectCancel()
..expectDone()
..expectBroadcastCancel((_) => test.terminate());
sub = test.listen();
});
}
void testSink({bool sync, bool broadcast, bool asBroadcast}) {
String type =
"${sync ? "S" : "A"}${broadcast ? "B" : "S"}${asBroadcast ? "aB" : ""}";
test("$type-controller-sink", () {
var done = expectAsync(() {});
var c = broadcast
? new StreamController.broadcast(sync: sync)
: new StreamController(sync: sync);
var expected = new Events()
..add(42)
..error("error")
..add(1)
..add(2)
..add(3)
..add(4)
..add(5)
..add(43)
..close();
var actual = new Events.capture(
asBroadcast ? c.stream.asBroadcastStream() : c.stream);
var sink = c.sink;
sink.add(42);
sink.addError("error");
sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5])).then((_) {
sink.add(43);
return sink.close();
}).then((_) {
Expect.listEquals(expected.events, actual.events);
done();
});
});
test("$type-controller-sink-canceled", () {
var done = expectAsync(() {});
var c = broadcast
? new StreamController.broadcast(sync: sync)
: new StreamController(sync: sync);
var expected = new Events()
..add(42)
..error("error")
..add(1)
..add(2)
..add(3);
var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
var actual = new Events();
var sub;
// Cancel subscription after receiving "3" event.
sub = stream.listen((v) {
if (v == 3) sub.cancel();
actual.add(v);
}, onError: actual.error);
var sink = c.sink;
sink.add(42);
sink.addError("error");
sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5])).then((_) {
Expect.listEquals(expected.events, actual.events);
// Close controller as well. It has no listener. If it is a broadcast
// stream, it will still be open, and we read the "done" future before
// closing. A normal stream is already done when its listener cancels.
Future doneFuture = sink.done;
sink.close();
return doneFuture;
}).then((_) {
// No change in events.
Expect.listEquals(expected.events, actual.events);
done();
});
});
test("$type-controller-sink-paused", () {
var done = expectAsync(() {});
var c = broadcast
? new StreamController.broadcast(sync: sync)
: new StreamController(sync: sync);
var expected = new Events()
..add(42)
..error("error")
..add(1)
..add(2)
..add(3)
..add(4)
..add(5)
..add(43)
..close();
var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
var actual = new Events();
var sub;
var pauseIsDone = false;
sub = stream.listen((v) {
if (v == 3) {
sub.pause(new Future.delayed(const Duration(milliseconds: 15), () {
pauseIsDone = true;
}));
}
actual.add(v);
}, onError: actual.error, onDone: actual.close);
var sink = c.sink;
sink.add(42);
sink.addError("error");
sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5])).then((_) {
sink.add(43);
return sink.close();
}).then((_) {
if (asBroadcast || broadcast) {
// The done-future of the sink completes when it passes
// the done event to the asBroadcastStream controller, which is
// before the final listener gets the event.
// Wait for the done event to be *delivered* before testing the
// events.
actual.onDone(() {
Expect.listEquals(expected.events, actual.events);
done();
});
} else {
Expect.listEquals(expected.events, actual.events);
done();
}
});
});
test("$type-controller-addstream-error-stop", () {
// Check that addStream defaults to ending after the first error.
var done = expectAsync(() {});
StreamController c = broadcast
? new StreamController.broadcast(sync: sync)
: new StreamController(sync: sync);
Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
var actual = new Events.capture(stream);
var source = new Events();
source
..add(1)
..add(2)
..error("BAD")
..add(3)
..error("FAIL")
..close();
var expected = new Events()
..add(1)
..add(2)
..error("BAD")
..close();
StreamController sourceController = new StreamController();
c.addStream(sourceController.stream, cancelOnError: true).then((_) {
c.close().then((_) {
Expect.listEquals(expected.events, actual.events);
done();
});
});
source.replay(sourceController);
});
test("$type-controller-addstream-error-forward", () {
// Check that addStream with cancelOnError:false passes all data and errors
// to the controller.
var done = expectAsync(() {});
StreamController c = broadcast
? new StreamController.broadcast(sync: sync)
: new StreamController(sync: sync);
Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
var actual = new Events.capture(stream);
var source = new Events();
source
..add(1)
..add(2)
..addError("BAD")
..add(3)
..addError("FAIL")
..close();
StreamController sourceController = new StreamController();
c.addStream(sourceController.stream).then((_) {
c.close().then((_) {
Expect.listEquals(source.events, actual.events);
done();
});
});
source.replay(sourceController);
});
test("$type-controller-addstream-twice", () {
// Using addStream twice on the same stream
var done = expectAsync(() {});
StreamController c = broadcast
? new StreamController.broadcast(sync: sync)
: new StreamController(sync: sync);
Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream;
var actual = new Events.capture(stream);
// Streams of five events, throws on 3.
Stream s1 = new Stream.fromIterable([1, 2, 3, 4, 5])
.map((x) => (x == 3 ? throw x : x));
Stream s2 = new Stream.fromIterable([1, 2, 3, 4, 5])
.map((x) => (x == 3 ? throw x : x));
Events expected = new Events();
expected
..add(1)
..add(2)
..error(3);
expected
..add(1)
..add(2)
..error(3)
..add(4)
..add(5);
expected..close();
c.addStream(s1, cancelOnError: true).then((_) {
c.addStream(s2, cancelOnError: false).then((_) {
c.close().then((_) {
Expect.listEquals(expected.events, actual.events);
done();
});
});
});
});
}
main() {
testController();
testSingleController();
testExtraMethods();
testPause();
testRethrow();
testBroadcastController();
testAsBroadcast();
testSink(sync: true, broadcast: false, asBroadcast: false);
testSink(sync: true, broadcast: false, asBroadcast: true);
testSink(sync: true, broadcast: true, asBroadcast: false);
testSink(sync: false, broadcast: false, asBroadcast: false);
testSink(sync: false, broadcast: false, asBroadcast: true);
testSink(sync: false, broadcast: true, asBroadcast: false);
}