// blob: 591f32fc7b6d202c6b48240409d981306c02c440
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:async/async.dart';
import 'package:test/test.dart';
import 'utils.dart';
void main() {
// Shared fixture: assigned anew by [setUp], so it is `late` rather than
// nullable.
late StreamSinkCompleter completer;
setUp(() => completer = StreamSinkCompleter());
group('when a stream is linked before events are added', () {
  // Builds a destination whose onDone callback throws 'oh no'.
  TestSink throwingSink() => TestSink(onDone: () => throw 'oh no');

  // In this group the destination is attached first and events are added to
  // the completer's sink afterwards.
  test('data events are forwarded', () {
    final destination = TestSink();
    completer.setDestinationSink(destination);

    final input = completer.sink;
    for (var value = 1; value <= 4; value++) {
      input.add(value);
    }

    // The values 1..4 are expected at result indices 0..3, in order.
    for (var index = 0; index < 4; index++) {
      expect(destination.results[index].asValue!.value, equals(index + 1));
    }
  });

  test('error events are forwarded', () {
    final destination = TestSink();
    completer.setDestinationSink(destination);

    final input = completer.sink;
    input.addError('oh no');
    input.addError("that's bad");

    final forwarded = destination.results;
    expect(forwarded[0].asError!.error, equals('oh no'));
    expect(forwarded[1].asError!.error, equals("that's bad"));
  });

  test('addStream is forwarded', () async {
    final destination = TestSink();
    completer.setDestinationSink(destination);

    final source = StreamController();
    completer.sink.addStream(source.stream);

    // Interleave values and errors so the relative order is checked too.
    source
      ..add(1)
      ..addError('oh no')
      ..add(2)
      ..addError("that's bad");
    await flushMicrotasks();

    final forwarded = destination.results;
    expect(forwarded[0].asValue!.value, equals(1));
    expect(forwarded[1].asError!.error, equals('oh no'));
    expect(forwarded[2].asValue!.value, equals(2));
    expect(forwarded[3].asError!.error, equals("that's bad"));
    expect(destination.isClosed, isFalse);

    // Closing the added stream is expected to leave the destination open.
    source.close();
    await flushMicrotasks();
    expect(destination.isClosed, isFalse);
  });

  test('close() is forwarded', () {
    final destination = TestSink();
    completer.setDestinationSink(destination);

    completer.sink.close();
    expect(destination.isClosed, isTrue);
  });

  test('the future from the inner close() is returned', () async {
    // onDone hands back a future that only completes when [innerClose] does.
    final innerClose = Completer();
    final destination = TestSink(onDone: () => innerClose.future);
    completer.setDestinationSink(destination);

    var outerCloseFinished = false;
    completer.sink.close().then(expectAsync1((_) {
      outerCloseFinished = true;
    }));

    await flushMicrotasks();
    expect(outerCloseFinished, isFalse);

    innerClose.complete();
    await flushMicrotasks();
    expect(outerCloseFinished, isTrue);
  });

  test('errors are forwarded from the inner close()', () {
    completer.setDestinationSink(throwingSink());

    expect(completer.sink.done, throwsA('oh no'));
    expect(completer.sink.close(), throwsA('oh no'));
  });

  test("errors aren't top-leveled if only close() is listened to", () async {
    completer.setDestinationSink(throwingSink());
    expect(completer.sink.close(), throwsA('oh no'));

    // Give the event loop a chance to top-level errors if it's going to.
    await flushMicrotasks();
  });

  test("errors aren't top-leveled if only done is listened to", () async {
    completer.setDestinationSink(throwingSink());

    // The future returned by close() is deliberately left unobserved here.
    completer.sink.close();
    expect(completer.sink.done, throwsA('oh no'));

    // Give the event loop a chance to top-level errors if it's going to.
    await flushMicrotasks();
  });
});
group('when a stream is linked after events are added', () {
  // Builds a destination whose onDone callback throws 'oh no'.
  TestSink throwingSink() => TestSink(onDone: () => throw 'oh no');

  // In this group events are added to the completer's sink first and the
  // destination is only attached afterwards.
  test('data events are forwarded', () async {
    final input = completer.sink;
    for (var value = 1; value <= 4; value++) {
      input.add(value);
    }
    await flushMicrotasks();

    final destination = TestSink();
    completer.setDestinationSink(destination);
    await flushMicrotasks();

    // The values 1..4 are expected at result indices 0..3, in order.
    for (var index = 0; index < 4; index++) {
      expect(destination.results[index].asValue!.value, equals(index + 1));
    }
  });

  test('error events are forwarded', () async {
    final input = completer.sink;
    input.addError('oh no');
    input.addError("that's bad");
    await flushMicrotasks();

    final destination = TestSink();
    completer.setDestinationSink(destination);
    await flushMicrotasks();

    final forwarded = destination.results;
    expect(forwarded[0].asError!.error, equals('oh no'));
    expect(forwarded[1].asError!.error, equals("that's bad"));
  });

  test('addStream is forwarded', () async {
    final source = StreamController();
    completer.sink.addStream(source.stream);

    // The source stream is fed and closed before any destination exists.
    source
      ..add(1)
      ..addError('oh no')
      ..add(2)
      ..addError("that's bad")
      ..close();
    await flushMicrotasks();

    final destination = TestSink();
    completer.setDestinationSink(destination);
    await flushMicrotasks();

    final forwarded = destination.results;
    expect(forwarded[0].asValue!.value, equals(1));
    expect(forwarded[1].asError!.error, equals('oh no'));
    expect(forwarded[2].asValue!.value, equals(2));
    expect(forwarded[3].asError!.error, equals("that's bad"));
    expect(destination.isClosed, isFalse);
  });

  test('close() is forwarded', () async {
    completer.sink.close();
    await flushMicrotasks();

    final destination = TestSink();
    completer.setDestinationSink(destination);
    await flushMicrotasks();

    expect(destination.isClosed, isTrue);
  });

  test('the future from the inner close() is returned', () async {
    var outerCloseFinished = false;
    completer.sink.close().then(expectAsync1((_) {
      outerCloseFinished = true;
    }));
    await flushMicrotasks();

    // onDone hands back a future that only completes when [innerClose] does.
    final innerClose = Completer();
    final destination = TestSink(onDone: () => innerClose.future);
    completer.setDestinationSink(destination);
    await flushMicrotasks();
    expect(outerCloseFinished, isFalse);

    innerClose.complete();
    await flushMicrotasks();
    expect(outerCloseFinished, isTrue);
  });

  test('errors are forwarded from the inner close()', () async {
    // Both expectations are registered before any destination is set.
    expect(completer.sink.done, throwsA('oh no'));
    expect(completer.sink.close(), throwsA('oh no'));
    await flushMicrotasks();

    completer.setDestinationSink(throwingSink());
  });

  test("errors aren't top-leveled if only close() is listened to", () async {
    expect(completer.sink.close(), throwsA('oh no'));
    await flushMicrotasks();

    completer.setDestinationSink(throwingSink());

    // Give the event loop a chance to top-level errors if it's going to.
    await flushMicrotasks();
  });

  test("errors aren't top-leveled if only done is listened to", () async {
    // The future returned by close() is deliberately left unobserved here.
    completer.sink.close();
    expect(completer.sink.done, throwsA('oh no'));
    await flushMicrotasks();

    completer.setDestinationSink(throwingSink());

    // Give the event loop a chance to top-level errors if it's going to.
    await flushMicrotasks();
  });
});
test('the sink is closed, the destination is set, then done is read',
    () async {
  // Step 1: close the sink while no destination has been set.
  final closed = completer.sink.close();
  expect(closed, completes);
  await flushMicrotasks();

  // Step 2: attach a destination.
  final destination = TestSink();
  completer.setDestinationSink(destination);
  await flushMicrotasks();

  // Step 3: only now is `done` read for the first time in this test.
  expect(completer.sink.done, completes);
});
test('done is read, the destination is set, then the sink is closed',
    () async {
  // Step 1: read `done` while no destination has been set.
  final done = completer.sink.done;
  expect(done, completes);
  await flushMicrotasks();

  // Step 2: attach a destination.
  final destination = TestSink();
  completer.setDestinationSink(destination);
  await flushMicrotasks();

  // Step 3: close the sink last.
  expect(completer.sink.close(), completes);
});
group('fromFuture()', () {
  test('with a successful completion', () async {
    final pendingSink = Completer<StreamSink>();
    final buffered = StreamSinkCompleter.fromFuture(pendingSink.future);

    // Events are added and the sink closed before the future completes.
    buffered
      ..add(1)
      ..add(2)
      ..add(3)
      ..close();

    final destination = TestSink();
    pendingSink.complete(destination);
    await destination.done;

    // The values 1..3 are expected at result indices 0..2, in order.
    for (var index = 0; index < 3; index++) {
      expect(destination.results[index].asValue!.value, equals(index + 1));
    }
  });

  test('with an error', () async {
    final pendingSink = Completer<StreamSink>();
    final buffered = StreamSinkCompleter.fromFuture(pendingSink.future);

    // The expectation on `done` is registered before the future fails.
    expect(buffered.done, throwsA('oh no'));
    pendingSink.completeError('oh no');
  });
});
group('setError()', () {
  // Asserts that both `done` and `close()` on the completer's sink fail with
  // 'oh no', reading `done` first.
  void expectSinkFails() {
    expect(completer.sink.done, throwsA('oh no'));
    expect(completer.sink.close(), throwsA('oh no'));
  }

  test('produces a closed sink with the error', () {
    completer.setError('oh no');
    expectSinkFails();
  });

  test('produces an error even if done was accessed earlier', () async {
    // Here the expectations are registered before the error is set.
    expectSinkFails();
    await flushMicrotasks();

    completer.setError('oh no');
  });
});
test("doesn't allow the destination sink to be set multiple times", () {
  completer.setDestinationSink(TestSink());

  // Each of the two further attempts is expected to throw a StateError.
  for (var attempt = 0; attempt < 2; attempt++) {
    expect(() => completer.setDestinationSink(TestSink()), throwsStateError);
  }
});
}