blob: 76d81d1c3146dcc0d983cb9ab3ac5d6177df8d86 [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import "dart:async";
import "package:async/async.dart" show StreamCompleter;
import "package:test/test.dart";
import "utils.dart";
/// Test suite for [StreamCompleter].
///
/// Covers the ordering of linking a source (via `setSourceStream`, `setEmpty`
/// or `setError`) relative to listening on `completer.stream`, and checks
/// that pause/resume/cancel requests and handler changes made on the
/// subscription are observed on the source stream.
main() {
  test("a stream is linked before listening", () async {
    var completer = new StreamCompleter();
    completer.setSourceStream(createStream());
    expect(completer.stream.toList(), completion([1, 2, 3, 4]));
  });

  test("listened to before a stream is linked", () async {
    var completer = new StreamCompleter();
    var done = completer.stream.toList();
    await flushMicrotasks();
    completer.setSourceStream(createStream());
    expect(done, completion([1, 2, 3, 4]));
  });

  test("cancel before linking a stream doesn't listen on stream", () async {
    var completer = new StreamCompleter();
    var subscription = completer.stream.listen(null);
    subscription.pause(); // Should be ignored.
    subscription.cancel();
    completer.setSourceStream(new UnusableStream()); // Doesn't throw.
  });

  test("listen and pause before linking stream", () async {
    var completer = new StreamCompleter();
    var events = [];
    var subscription = completer.stream.listen(events.add);
    var done = subscription.asFuture();
    subscription.pause();

    // Events are added to the source before it is linked to the completer.
    var sourceController = new StreamController();
    sourceController..add(1)..add(2)..add(3)..add(4);
    completer.setSourceStream(sourceController.stream);
    await flushMicrotasks();

    // The source is listened to, but the earlier pause is applied to it.
    expect(sourceController.hasListener, isTrue);
    expect(sourceController.isPaused, isTrue);
    expect(events, []);

    subscription.resume();
    await flushMicrotasks();
    expect(sourceController.hasListener, isTrue);
    expect(sourceController.isPaused, isFalse);
    expect(events, [1, 2, 3, 4]);

    sourceController.close();
    await done;
    expect(events, [1, 2, 3, 4]);
  });

  test("pause more than once", () async {
    var completer = new StreamCompleter();
    var events = [];
    var subscription = completer.stream.listen(events.add);
    var done = subscription.asFuture();
    subscription.pause();
    subscription.pause();
    subscription.pause();
    completer.setSourceStream(createStream());

    // One resume per pause; no event may arrive before the last resume.
    for (int i = 0; i < 3; i++) {
      await flushMicrotasks();
      expect(events, []);
      subscription.resume();
    }
    await done;
    expect(events, [1, 2, 3, 4]);
  });

  test("cancel new stream before source is done", () async {
    var completer = new StreamCompleter();
    var lastEvent = -1;
    var controller = new StreamController();
    var subscription;
    subscription = completer.stream.listen(
        (value) {
          expect(value, lessThan(3));
          lastEvent = value;
          // Cancel from inside the data handler once the second event arrives.
          if (value == 2) {
            subscription.cancel();
          }
        },
        onError: unreachable("error"),
        onDone: unreachable("done"),
        cancelOnError: true);
    completer.setSourceStream(controller.stream);
    expect(controller.hasListener, isTrue);

    await flushMicrotasks();
    expect(controller.hasListener, isTrue);

    controller.add(1);
    await flushMicrotasks();
    expect(lastEvent, 1);
    expect(controller.hasListener, isTrue);

    controller.add(2);
    await flushMicrotasks();
    expect(lastEvent, 2);
    // The cancel must have reached the source controller.
    expect(controller.hasListener, isFalse);
  });

  test("complete with setEmpty before listening", () async {
    var completer = new StreamCompleter();
    completer.setEmpty();
    var done = new Completer();
    completer.stream.listen(
        unreachable("data"),
        onError: unreachable("error"),
        onDone: done.complete);
    await done.future;
  });

  test("complete with setEmpty after listening", () async {
    var completer = new StreamCompleter();
    var done = new Completer();
    completer.stream.listen(
        unreachable("data"),
        onError: unreachable("error"),
        onDone: done.complete);
    completer.setEmpty();
    await done.future;
  });

  test("source stream isn't listened to until completer stream is", () async {
    var completer = new StreamCompleter();
    var controller;
    // Close the source shortly after it gets a listener so the test can end.
    controller = new StreamController(onListen: () {
      scheduleMicrotask(controller.close);
    });

    completer.setSourceStream(controller.stream);
    await flushMicrotasks();
    expect(controller.hasListener, isFalse);

    var subscription = completer.stream.listen(null);
    expect(controller.hasListener, isTrue);
    await subscription.asFuture();
  });

  test("cancelOnError true when listening before linking stream", () async {
    var completer = new StreamCompleter();
    var lastEvent = -1;
    var controller = new StreamController();
    completer.stream.listen(
        (value) {
          expect(value, lessThan(3));
          lastEvent = value;
        },
        onError: (value) {
          expect(value, "3");
          lastEvent = value;
        },
        onDone: unreachable("done"),
        cancelOnError: true);
    completer.setSourceStream(controller.stream);
    expect(controller.hasListener, isTrue);

    await flushMicrotasks();
    expect(controller.hasListener, isTrue);

    controller.add(1);
    await flushMicrotasks();
    expect(lastEvent, 1);
    expect(controller.hasListener, isTrue);

    controller.add(2);
    await flushMicrotasks();
    expect(lastEvent, 2);
    expect(controller.hasListener, isTrue);

    controller.addError("3");
    await flushMicrotasks();
    expect(lastEvent, "3");
    // The error cancels the subscription on the source.
    expect(controller.hasListener, isFalse);
  });

  test("cancelOnError true when listening after linking stream", () async {
    var completer = new StreamCompleter();
    var lastEvent = -1;
    var controller = new StreamController();
    completer.setSourceStream(controller.stream);
    controller.add(1);
    expect(controller.hasListener, isFalse);

    completer.stream.listen(
        (value) {
          expect(value, lessThan(3));
          lastEvent = value;
        },
        onError: (value) {
          expect(value, "3");
          lastEvent = value;
        },
        onDone: unreachable("done"),
        cancelOnError: true);
    expect(controller.hasListener, isTrue);

    await flushMicrotasks();
    expect(lastEvent, 1);
    expect(controller.hasListener, isTrue);

    controller.add(2);
    await flushMicrotasks();
    expect(lastEvent, 2);
    expect(controller.hasListener, isTrue);

    controller.addError("3");
    await flushMicrotasks();
    // Check that the error reached the handler, mirroring the
    // "listening before linking" variant of this test.
    expect(lastEvent, "3");
    expect(controller.hasListener, isFalse);
  });

  test("linking a stream after setSourceStream before listen", () async {
    var completer = new StreamCompleter();
    completer.setSourceStream(createStream());
    expect(() => completer.setSourceStream(createStream()), throwsStateError);
    expect(() => completer.setEmpty(), throwsStateError);
    await completer.stream.toList();
    // Still fails after source is done.
    expect(() => completer.setSourceStream(createStream()), throwsStateError);
    expect(() => completer.setEmpty(), throwsStateError);
  });

  test("linking a stream after setSourceStream after listen", () async {
    var completer = new StreamCompleter();
    var list = completer.stream.toList();
    completer.setSourceStream(createStream());
    expect(() => completer.setSourceStream(createStream()), throwsStateError);
    expect(() => completer.setEmpty(), throwsStateError);
    await list;
    // Still fails after source is done.
    expect(() => completer.setSourceStream(createStream()), throwsStateError);
    expect(() => completer.setEmpty(), throwsStateError);
  });

  test("linking a stream after setEmpty before listen", () async {
    var completer = new StreamCompleter();
    completer.setEmpty();
    expect(() => completer.setSourceStream(createStream()), throwsStateError);
    expect(() => completer.setEmpty(), throwsStateError);
    await completer.stream.toList();
    // Still fails after source is done.
    expect(() => completer.setSourceStream(createStream()), throwsStateError);
    expect(() => completer.setEmpty(), throwsStateError);
  });

  test("linking a stream after setEmpty() after listen", () async {
    var completer = new StreamCompleter();
    var list = completer.stream.toList();
    completer.setEmpty();
    expect(() => completer.setSourceStream(createStream()), throwsStateError);
    expect(() => completer.setEmpty(), throwsStateError);
    await list;
    // Still fails after source is done.
    expect(() => completer.setSourceStream(createStream()), throwsStateError);
    expect(() => completer.setEmpty(), throwsStateError);
  });

  test("listening more than once after setting stream", () async {
    var completer = new StreamCompleter();
    completer.setSourceStream(createStream());
    var list = completer.stream.toList();
    expect(() => completer.stream.toList(), throwsStateError);
    await list;
    // A second listen is still rejected once the first one has finished.
    expect(() => completer.stream.toList(), throwsStateError);
  });

  test("listening more than once before setting stream", () async {
    var completer = new StreamCompleter();
    completer.stream.toList();
    expect(() => completer.stream.toList(), throwsStateError);
  });

  test("setting onData etc. before and after setting stream", () async {
    var completer = new StreamCompleter();
    var controller = new StreamController();
    var subscription = completer.stream.listen(null);
    var lastEvent = 0;

    // Handlers installed before the source is linked.
    subscription.onData((value) => lastEvent = value);
    subscription.onError((value) => lastEvent = "$value");
    subscription.onDone(() => lastEvent = -1);
    completer.setSourceStream(controller.stream);
    await flushMicrotasks();

    controller.add(1);
    await flushMicrotasks();
    expect(lastEvent, 1);
    controller.addError(2);
    await flushMicrotasks();
    expect(lastEvent, "2");

    // Handlers replaced after the source is linked.
    subscription.onData((value) => lastEvent = -value);
    subscription.onError((value) => lastEvent = "${-value}");
    controller.add(1);
    await flushMicrotasks();
    expect(lastEvent, -1);
    controller.addError(2);
    await flushMicrotasks();
    expect(lastEvent, "-2");

    // The onDone handler set before linking is still in effect.
    controller.close();
    await flushMicrotasks();
    expect(lastEvent, -1);
  });

  test("pause w/ resume future accross setting stream", () async {
    var completer = new StreamCompleter();
    var resume = new Completer();
    var subscription = completer.stream.listen(unreachable("data"));
    subscription.pause(resume.future);
    await flushMicrotasks();
    completer.setSourceStream(createStream());
    await flushMicrotasks();
    resume.complete();

    // Swap in a real data handler before any event is delivered.
    var events = [];
    subscription.onData(events.add);
    await subscription.asFuture();
    expect(events, [1, 2, 3, 4]);
  });

  test("asFuture with error accross setting stream", () async {
    var completer = new StreamCompleter();
    var controller = new StreamController();
    var subscription = completer.stream.listen(unreachable("data"),
        cancelOnError: false);
    var done = subscription.asFuture();
    expect(controller.hasListener, isFalse);

    completer.setSourceStream(controller.stream);
    await flushMicrotasks();
    expect(controller.hasListener, isTrue);

    controller.addError(42);
    await done.then(unreachable("data"), onError: (error) {
      expect(error, 42);
    });
    expect(controller.hasListener, isFalse);
  });

  group("setError()", () {
    test("produces a stream that emits a single error", () {
      var completer = new StreamCompleter();
      completer.stream.listen(
          unreachable("data"),
          onError: expectAsync((error, stackTrace) {
            expect(error, equals("oh no"));
          }),
          onDone: expectAsync(() {}));
      completer.setError("oh no");
    });

    test("produces a stream that emits a single error on a later listen",
        () async {
      var completer = new StreamCompleter();
      completer.setError("oh no");
      await flushMicrotasks();
      completer.stream.listen(
          unreachable("data"),
          onError: expectAsync((error, stackTrace) {
            expect(error, equals("oh no"));
          }),
          onDone: expectAsync(() {}));
    });
  });
}
/// Creates a stream that emits the integers 1, 2, 3 and 4 in order.
///
/// [flushMicrotasks] is awaited between consecutive events (but not before
/// the first one or after the last one), so the events are not delivered
/// back-to-back.
Stream<int> createStream() async* {
  for (var value = 1; value <= 4; value++) {
    // Separate each event from the previous one with an asynchronous gap.
    if (value > 1) await flushMicrotasks();
    yield value;
  }
}