blob: a2bad1a9d048011efe2222ae1902959754cfa218 [file] [log] [blame]
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:async/async.dart';
import 'package:test/test.dart';
import 'utils.dart';
void main() {
late StreamCloser<int> closer;
setUp(() {
closer = StreamCloser();
});
group('when the closer is never closed', () {
test('forwards data and done events', () {
expect(
createStream().transform(closer).toList(), completion([1, 2, 3, 4]));
});
test('forwards error events', () {
expect(Stream<int>.error('oh no').transform(closer).toList(),
throwsA('oh no'));
});
test('transforms a broadcast stream into a broadcast stream', () {
expect(const Stream<int>.empty().transform(closer).isBroadcast, isTrue);
});
test("doesn't eagerly listen", () {
var controller = StreamController<int>();
var transformed = controller.stream.transform(closer);
expect(controller.hasListener, isFalse);
transformed.listen(null);
expect(controller.hasListener, isTrue);
});
test('forwards pause and resume', () {
var controller = StreamController<int>();
var transformed = controller.stream.transform(closer);
var subscription = transformed.listen(null);
expect(controller.isPaused, isFalse);
subscription.pause();
expect(controller.isPaused, isTrue);
subscription.resume();
expect(controller.isPaused, isFalse);
});
test('forwards cancel', () {
var isCancelled = false;
var controller =
StreamController<int>(onCancel: () => isCancelled = true);
var transformed = controller.stream.transform(closer);
expect(isCancelled, isFalse);
var subscription = transformed.listen(null);
expect(isCancelled, isFalse);
subscription.cancel();
expect(isCancelled, isTrue);
});
test('forwards errors from cancel', () {
var controller = StreamController<int>(onCancel: () => throw 'oh no');
expect(controller.stream.transform(closer).listen(null).cancel(),
throwsA('oh no'));
});
});
group('when a stream is added before the closer is closed', () {
test('the stream emits a close event once the closer is closed', () async {
var queue = StreamQueue(createStream().transform(closer));
await expectLater(queue, emits(1));
await expectLater(queue, emits(2));
expect(closer.close(), completes);
expect(queue, emitsDone);
});
test('the inner subscription is canceled once the closer is closed', () {
var isCancelled = false;
var controller =
StreamController<int>(onCancel: () => isCancelled = true);
expect(controller.stream.transform(closer), emitsDone);
expect(closer.close(), completes);
expect(isCancelled, isTrue);
});
test('closer.close() forwards errors from StreamSubscription.cancel()', () {
var controller = StreamController<int>(onCancel: () => throw 'oh no');
expect(controller.stream.transform(closer), emitsDone);
expect(closer.close(), throwsA('oh no'));
});
test('closer.close() works even if a stream has already completed',
() async {
expect(await createStream().transform(closer).toList(),
equals([1, 2, 3, 4]));
expect(closer.close(), completes);
});
test('closer.close() works even if a stream has already been canceled',
() async {
createStream().transform(closer).listen(null).cancel();
expect(closer.close(), completes);
});
group('but listened afterwards', () {
test('the output stream immediately emits done', () {
var stream = createStream().transform(closer);
expect(closer.close(), completes);
expect(stream, emitsDone);
});
test(
'the underlying subscription is never listened if the stream is '
'never listened', () async {
var controller =
StreamController<int>(onListen: expectAsync0(() {}, count: 0));
controller.stream.transform(closer);
expect(closer.close(), completes);
await pumpEventQueue();
});
test(
'the underlying subscription is listened and then canceled once the '
'stream is listened', () {
var controller = StreamController<int>(
onListen: expectAsync0(() {}), onCancel: expectAsync0(() {}));
var stream = controller.stream.transform(closer);
expect(closer.close(), completes);
stream.listen(null);
});
test('Subscription.cancel() errors are silently ignored', () async {
var controller =
StreamController<int>(onCancel: expectAsync0(() => throw 'oh no'));
var stream = controller.stream.transform(closer);
expect(closer.close(), completes);
stream.listen(null);
await pumpEventQueue();
});
});
});
group('when a stream is added after the closer is closed', () {
test('the output stream immediately emits done', () {
expect(closer.close(), completes);
expect(createStream().transform(closer), emitsDone);
});
test(
'the underlying subscription is never listened if the stream is never '
'listened', () async {
expect(closer.close(), completes);
var controller =
StreamController<int>(onListen: expectAsync0(() {}, count: 0));
controller.stream.transform(closer);
await pumpEventQueue();
});
test(
'the underlying subscription is listened and then canceled once the '
'stream is listened', () {
expect(closer.close(), completes);
var controller = StreamController<int>(
onListen: expectAsync0(() {}), onCancel: expectAsync0(() {}));
controller.stream.transform(closer).listen(null);
});
test('Subscription.cancel() errors are silently ignored', () async {
expect(closer.close(), completes);
var controller =
StreamController<int>(onCancel: expectAsync0(() => throw 'oh no'));
controller.stream.transform(closer).listen(null);
await pumpEventQueue();
});
});
}
Stream<int> createStream() async* {
yield 1;
await flushMicrotasks();
yield 2;
await flushMicrotasks();
yield 3;
await flushMicrotasks();
yield 4;
}