blob: 82393fcdf71dce6b0c590c91cf72b513bb2e4d55 [file] [log] [blame]
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import "dart:async";
import "package:async/async.dart";
import "package:test/test.dart";
import "utils.dart";
/// Test suite for [StreamSinkCompleter].
///
/// Covers forwarding of events, `addStream()` and `close()` when the
/// destination sink is set before or after the completer's sink is used, the
/// interaction between `close()` and `done`, `fromFuture()`, `setError()`, and
/// the rule that the destination may only be set once.
main() {
  // Replaced with a fresh instance by [setUp] before every test.
  var completer;

  setUp(() {
    completer = new StreamSinkCompleter();
  });

  group("when a stream is linked before events are added", () {
    test("data events are forwarded", () {
      var destination = new TestSink();
      completer.setDestinationSink(destination);

      var input = completer.sink;
      input.add(1);
      input.add(2);
      input.add(3);
      input.add(4);

      // The destination must have seen the values in the order they were
      // added.
      var expected = [1, 2, 3, 4];
      for (var i = 0; i < expected.length; i++) {
        expect(destination.results[i].asValue.value, equals(expected[i]));
      }
    });

    test("error events are forwarded", () {
      var destination = new TestSink();
      completer.setDestinationSink(destination);

      var input = completer.sink;
      input.addError("oh no");
      input.addError("that's bad");

      expect(destination.results[0].asError.error, equals("oh no"));
      expect(destination.results[1].asError.error, equals("that's bad"));
    });

    test("addStream is forwarded", () async {
      var destination = new TestSink();
      completer.setDestinationSink(destination);

      var source = new StreamController();
      completer.sink.addStream(source.stream);

      source
        ..add(1)
        ..addError("oh no")
        ..add(2)
        ..addError("that's bad");
      await flushMicrotasks();

      expect(destination.results[0].asValue.value, equals(1));
      expect(destination.results[1].asError.error, equals("oh no"));
      expect(destination.results[2].asValue.value, equals(2));
      expect(destination.results[3].asError.error, equals("that's bad"));
      expect(destination.isClosed, isFalse);

      // Ending the added stream is expected to leave the destination open.
      source.close();
      await flushMicrotasks();
      expect(destination.isClosed, isFalse);
    });

    test("close() is forwarded", () {
      var destination = new TestSink();
      completer.setDestinationSink(destination);

      completer.sink.close();

      expect(destination.isClosed, isTrue);
    });

    test("the future from the inner close() is returned", () async {
      var outerCloseFinished = false;

      // The destination's onDone callback returns a future that this test
      // completes by hand.
      var innerClose = new Completer();
      var destination = new TestSink(onDone: () => innerClose.future);
      completer.setDestinationSink(destination);

      completer.sink.close().then(expectAsync((_) {
        outerCloseFinished = true;
      }));

      // The outer close() must stay pending while the inner future is.
      await flushMicrotasks();
      expect(outerCloseFinished, isFalse);

      innerClose.complete();
      await flushMicrotasks();
      expect(outerCloseFinished, isTrue);
    });

    test("errors are forwarded from the inner close()", () {
      var destination = new TestSink(onDone: () => throw "oh no");
      completer.setDestinationSink(destination);

      // Both `done` and the future returned by close() must carry the error.
      expect(completer.sink.done, throwsA("oh no"));
      expect(completer.sink.close(), throwsA("oh no"));
    });

    test("errors aren't top-leveled if only close() is listened to",
        () async {
      var destination = new TestSink(onDone: () => throw "oh no");
      completer.setDestinationSink(destination);

      expect(completer.sink.close(), throwsA("oh no"));

      // Let pending microtasks run so that an unhandled error, if there is
      // one, has the opportunity to surface at the top level.
      await flushMicrotasks();
    });

    test("errors aren't top-leveled if only done is listened to", () async {
      var destination = new TestSink(onDone: () => throw "oh no");
      completer.setDestinationSink(destination);

      completer.sink.close();
      expect(completer.sink.done, throwsA("oh no"));

      // Let pending microtasks run so that an unhandled error, if there is
      // one, has the opportunity to surface at the top level.
      await flushMicrotasks();
    });
  });

  group("when a stream is linked after events are added", () {
    test("data events are forwarded", () async {
      var input = completer.sink;
      input.add(1);
      input.add(2);
      input.add(3);
      input.add(4);
      await flushMicrotasks();

      // Only now is a destination provided; the earlier events must still
      // arrive, in order.
      var destination = new TestSink();
      completer.setDestinationSink(destination);
      await flushMicrotasks();

      var expected = [1, 2, 3, 4];
      for (var i = 0; i < expected.length; i++) {
        expect(destination.results[i].asValue.value, equals(expected[i]));
      }
    });

    test("error events are forwarded", () async {
      var input = completer.sink;
      input.addError("oh no");
      input.addError("that's bad");
      await flushMicrotasks();

      var destination = new TestSink();
      completer.setDestinationSink(destination);
      await flushMicrotasks();

      expect(destination.results[0].asError.error, equals("oh no"));
      expect(destination.results[1].asError.error, equals("that's bad"));
    });

    test("addStream is forwarded", () async {
      var source = new StreamController();
      completer.sink.addStream(source.stream);

      source
        ..add(1)
        ..addError("oh no")
        ..add(2)
        ..addError("that's bad")
        ..close();
      await flushMicrotasks();

      var destination = new TestSink();
      completer.setDestinationSink(destination);
      await flushMicrotasks();

      expect(destination.results[0].asValue.value, equals(1));
      expect(destination.results[1].asError.error, equals("oh no"));
      expect(destination.results[2].asValue.value, equals(2));
      expect(destination.results[3].asError.error, equals("that's bad"));
      // The source stream was closed, yet the destination is expected to
      // remain open.
      expect(destination.isClosed, isFalse);
    });

    test("close() is forwarded", () async {
      completer.sink.close();
      await flushMicrotasks();

      var destination = new TestSink();
      completer.setDestinationSink(destination);
      await flushMicrotasks();

      expect(destination.isClosed, isTrue);
    });

    test("the future from the inner close() is returned", () async {
      var outerCloseFinished = false;
      completer.sink.close().then(expectAsync((_) {
        outerCloseFinished = true;
      }));
      await flushMicrotasks();

      // The destination's onDone callback returns a future that this test
      // completes by hand.
      var innerClose = new Completer();
      var destination = new TestSink(onDone: () => innerClose.future);
      completer.setDestinationSink(destination);
      await flushMicrotasks();
      expect(outerCloseFinished, isFalse);

      innerClose.complete();
      await flushMicrotasks();
      expect(outerCloseFinished, isTrue);
    });

    test("errors are forwarded from the inner close()", () async {
      // Listen first; the failing destination is attached afterwards.
      expect(completer.sink.done, throwsA("oh no"));
      expect(completer.sink.close(), throwsA("oh no"));
      await flushMicrotasks();

      var destination = new TestSink(onDone: () => throw "oh no");
      completer.setDestinationSink(destination);
    });

    test("errors aren't top-leveled if only close() is listened to",
        () async {
      expect(completer.sink.close(), throwsA("oh no"));
      await flushMicrotasks();

      var destination = new TestSink(onDone: () => throw "oh no");
      completer.setDestinationSink(destination);

      // Let pending microtasks run so that an unhandled error, if there is
      // one, has the opportunity to surface at the top level.
      await flushMicrotasks();
    });

    test("errors aren't top-leveled if only done is listened to", () async {
      completer.sink.close();
      expect(completer.sink.done, throwsA("oh no"));
      await flushMicrotasks();

      var destination = new TestSink(onDone: () => throw "oh no");
      completer.setDestinationSink(destination);

      // Let pending microtasks run so that an unhandled error, if there is
      // one, has the opportunity to surface at the top level.
      await flushMicrotasks();
    });
  });

  test("the sink is closed, the destination is set, then done is read",
      () async {
    expect(completer.sink.close(), completes);
    await flushMicrotasks();

    completer.setDestinationSink(new TestSink());
    await flushMicrotasks();

    expect(completer.sink.done, completes);
  });

  test("done is read, the destination is set, then the sink is closed",
      () async {
    expect(completer.sink.done, completes);
    await flushMicrotasks();

    completer.setDestinationSink(new TestSink());
    await flushMicrotasks();

    expect(completer.sink.close(), completes);
  });

  group("fromFuture()", () {
    test("with a successful completion", () async {
      var pending = new Completer();
      var input = StreamSinkCompleter.fromFuture(pending.future);

      // Everything below happens before the future supplies a destination.
      input
        ..add(1)
        ..add(2)
        ..add(3)
        ..close();

      var destination = new TestSink();
      pending.complete(destination);
      await destination.done;

      var expected = [1, 2, 3];
      for (var i = 0; i < expected.length; i++) {
        expect(destination.results[i].asValue.value, equals(expected[i]));
      }
    });

    test("with an error", () async {
      var pending = new Completer();
      var input = StreamSinkCompleter.fromFuture(pending.future);

      expect(input.done, throwsA("oh no"));
      pending.completeError("oh no");
    });
  });

  group("setError()", () {
    test("produces a closed sink with the error", () {
      completer.setError("oh no");

      expect(completer.sink.done, throwsA("oh no"));
      expect(completer.sink.close(), throwsA("oh no"));
    });

    test("produces an error even if done was accessed earlier", () async {
      expect(completer.sink.done, throwsA("oh no"));
      expect(completer.sink.close(), throwsA("oh no"));
      await flushMicrotasks();

      completer.setError("oh no");
    });
  });

  test("doesn't allow the destination sink to be set multiple times", () {
    completer.setDestinationSink(new TestSink());

    // Attempted twice: a rejected attempt must not make a later one succeed.
    for (var attempt = 0; attempt < 2; attempt++) {
      expect(() => completer.setDestinationSink(new TestSink()),
          throwsStateError);
    }
  });
}