blob: 5b8b3e77be873d12bf1f75a0309803980b30a87a [file] [log] [blame]
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE filevents.
import 'dart:async';
import 'package:async/async.dart';
import 'package:test/test.dart';
void main() {
late StreamController controller;
setUp(() {
controller = StreamController();
});
test('passes through data events', () {
controller.sink.rejectErrors()
..add(1)
..add(2)
..add(3);
expect(controller.stream, emitsInOrder([1, 2, 3]));
});
test('passes through close events', () {
controller.sink.rejectErrors()
..add(1)
..close();
expect(controller.stream, emitsInOrder([1, emitsDone]));
});
test('passes through data events from addStream()', () {
controller.sink.rejectErrors().addStream(Stream.fromIterable([1, 2, 3]));
expect(controller.stream, emitsInOrder([1, 2, 3]));
});
test('allows multiple addStream() calls', () async {
var transformed = controller.sink.rejectErrors();
await transformed.addStream(Stream.fromIterable([1, 2, 3]));
await transformed.addStream(Stream.fromIterable([4, 5, 6]));
expect(controller.stream, emitsInOrder([1, 2, 3, 4, 5, 6]));
});
group('on addError()', () {
test('forwards the error to done', () {
var transformed = controller.sink.rejectErrors();
transformed.addError('oh no');
expect(transformed.done, throwsA('oh no'));
});
test('closes the underlying sink', () {
var transformed = controller.sink.rejectErrors();
transformed.addError('oh no');
transformed.done.catchError((_) {});
expect(controller.stream, emitsDone);
});
test('ignores further events', () async {
var transformed = controller.sink.rejectErrors();
transformed.addError('oh no');
transformed.done.catchError((_) {});
expect(controller.stream, emitsDone);
// Try adding events synchronously and asynchronously and verify that they
// don't throw and also aren't passed to the underlying sink.
transformed
..add(1)
..addError('another');
await pumpEventQueue();
transformed
..add(2)
..addError('yet another');
});
test('cancels the current subscription', () async {
var inputCanceled = false;
var inputController =
StreamController(onCancel: () => inputCanceled = true);
var transformed = controller.sink.rejectErrors()
..addStream(inputController.stream);
inputController.addError('oh no');
transformed.done.catchError((_) {});
await pumpEventQueue();
expect(inputCanceled, isTrue);
});
});
group('when the inner sink\'s done future completes', () {
test('done completes', () async {
var completer = Completer();
var transformed = NullStreamSink(done: completer.future).rejectErrors();
var doneCompleted = false;
transformed.done.then((_) => doneCompleted = true);
await pumpEventQueue();
expect(doneCompleted, isFalse);
completer.complete();
await pumpEventQueue();
expect(doneCompleted, isTrue);
});
test('an outstanding addStream() completes', () async {
var completer = Completer();
var transformed = NullStreamSink(done: completer.future).rejectErrors();
var addStreamCompleted = false;
transformed
.addStream(StreamController().stream)
.then((_) => addStreamCompleted = true);
await pumpEventQueue();
expect(addStreamCompleted, isFalse);
completer.complete();
await pumpEventQueue();
expect(addStreamCompleted, isTrue);
});
test('an outstanding addStream()\'s subscription is cancelled', () async {
var completer = Completer();
var transformed = NullStreamSink(done: completer.future).rejectErrors();
var addStreamCancelled = false;
transformed.addStream(
StreamController(onCancel: () => addStreamCancelled = true).stream);
await pumpEventQueue();
expect(addStreamCancelled, isFalse);
completer.complete();
await pumpEventQueue();
expect(addStreamCancelled, isTrue);
});
test('forwards an outstanding addStream()\'s cancellation error', () async {
var completer = Completer();
var transformed = NullStreamSink(done: completer.future).rejectErrors();
expect(
transformed.addStream(
StreamController(onCancel: () => throw 'oh no').stream),
throwsA('oh no'));
completer.complete();
});
group('forwards its error', () {
test('through done', () async {
expect(NullStreamSink(done: Future.error('oh no')).rejectErrors().done,
throwsA('oh no'));
});
test('through close', () async {
expect(
NullStreamSink(done: Future.error('oh no')).rejectErrors().close(),
throwsA('oh no'));
});
});
});
group('after closing', () {
test('throws on add()', () {
var sink = controller.sink.rejectErrors()..close();
expect(() => sink.add(1), throwsStateError);
});
test('throws on addError()', () {
var sink = controller.sink.rejectErrors()..close();
expect(() => sink.addError('oh no'), throwsStateError);
});
test('throws on addStream()', () {
var sink = controller.sink.rejectErrors()..close();
expect(() => sink.addStream(Stream.empty()), throwsStateError);
});
test('allows close()', () {
var sink = controller.sink.rejectErrors()..close();
sink.close(); // Shouldn't throw
});
});
group('during an active addStream()', () {
test('throws on add()', () {
var sink = controller.sink.rejectErrors()
..addStream(StreamController().stream);
expect(() => sink.add(1), throwsStateError);
});
test('throws on addError()', () {
var sink = controller.sink.rejectErrors()
..addStream(StreamController().stream);
expect(() => sink.addError('oh no'), throwsStateError);
});
test('throws on addStream()', () {
var sink = controller.sink.rejectErrors()
..addStream(StreamController().stream);
expect(() => sink.addStream(Stream.empty()), throwsStateError);
});
test('throws on close()', () {
var sink = controller.sink.rejectErrors()
..addStream(StreamController().stream);
expect(() => sink.close(), throwsStateError);
});
});
}