blob: 37941656708e769ccd275bb8128c0fe22f8a9d68 [file] [log] [blame]
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:stream_transform/stream_transform.dart';
import 'package:test/test.dart';
import 'utils.dart';
void main() {
test('forwards errors from the convert callback', () async {
var errors = <String>[];
var source = Stream.fromIterable([1, 2, 3]);
source.concurrentAsyncExpand((i) {
// ignore: only_throw_errors
throw 'Error: $i';
}).listen((_) {}, onError: errors.add);
await Future<void>(() {});
expect(errors, ['Error: 1', 'Error: 2', 'Error: 3']);
});
for (var outerType in streamTypes) {
for (var innerType in streamTypes) {
group('concurrentAsyncExpand $outerType to $innerType', () {
late StreamController<int> outerController;
late bool outerCanceled;
late List<StreamController<String>> innerControllers;
late List<bool> innerCanceled;
late List<String> emittedValues;
late bool isDone;
late List<String> errors;
late Stream<String> transformed;
late StreamSubscription<String> subscription;
setUp(() {
outerController = createController(outerType)
..onCancel = () {
outerCanceled = true;
};
outerCanceled = false;
innerControllers = [];
innerCanceled = [];
emittedValues = [];
errors = [];
isDone = false;
transformed = outerController.stream.concurrentAsyncExpand((i) {
var index = innerControllers.length;
innerCanceled.add(false);
innerControllers.add(createController<String>(innerType)
..onCancel = () {
innerCanceled[index] = true;
});
return innerControllers.last.stream;
});
subscription = transformed
.listen(emittedValues.add, onError: errors.add, onDone: () {
isDone = true;
});
});
test('interleaves events from sub streams', () async {
outerController
..add(1)
..add(2);
await Future<void>(() {});
expect(emittedValues, isEmpty);
expect(innerControllers, hasLength(2));
innerControllers[0].add('First');
innerControllers[1].add('Second');
innerControllers[0].add('First again');
await Future<void>(() {});
expect(emittedValues, ['First', 'Second', 'First again']);
});
test('forwards errors from outer stream', () async {
outerController.addError('Error');
await Future<void>(() {});
expect(errors, ['Error']);
});
test('forwards errors from inner streams', () async {
outerController
..add(1)
..add(2);
await Future<void>(() {});
innerControllers[0].addError('Error 1');
innerControllers[1].addError('Error 2');
await Future<void>(() {});
expect(errors, ['Error 1', 'Error 2']);
});
test('can continue handling events after an error in outer stream',
() async {
outerController
..addError('Error')
..add(1);
await Future<void>(() {});
innerControllers[0].add('First');
await Future<void>(() {});
expect(emittedValues, ['First']);
expect(errors, ['Error']);
});
test('cancels outer subscription if output canceled', () async {
await subscription.cancel();
expect(outerCanceled, true);
});
if (outerType != 'broadcast' || innerType != 'single subscription') {
// A single subscription inner stream in a broadcast outer stream is
// not canceled.
test('cancels inner subscriptions if output canceled', () async {
outerController
..add(1)
..add(2);
await Future<void>(() {});
await subscription.cancel();
expect(innerCanceled, [true, true]);
});
}
test('stays open if any inner stream is still open', () async {
outerController.add(1);
await outerController.close();
await Future<void>(() {});
expect(isDone, false);
});
test('stays open if outer stream is still open', () async {
outerController.add(1);
await Future<void>(() {});
await innerControllers[0].close();
await Future<void>(() {});
expect(isDone, false);
});
test('closes after all inner streams and outer stream close', () async {
outerController.add(1);
await Future<void>(() {});
await innerControllers[0].close();
await outerController.close();
await Future<void>(() {});
expect(isDone, true);
});
if (outerType == 'broadcast') {
test('multiple listerns all get values', () async {
var otherValues = <String>[];
transformed.listen(otherValues.add);
outerController.add(1);
await Future<void>(() {});
innerControllers[0].add('First');
await Future<void>(() {});
expect(emittedValues, ['First']);
expect(otherValues, ['First']);
});
test('multiple listeners get closed', () async {
var otherDone = false;
transformed.listen(null, onDone: () => otherDone = true);
outerController.add(1);
await Future<void>(() {});
await innerControllers[0].close();
await outerController.close();
await Future<void>(() {});
expect(isDone, true);
expect(otherDone, true);
});
test('can cancel and relisten', () async {
outerController
..add(1)
..add(2);
await Future(() {});
innerControllers[0].add('First');
innerControllers[1].add('Second');
await Future(() {});
await subscription.cancel();
innerControllers[0].add('Ignored');
await Future(() {});
subscription = transformed.listen(emittedValues.add);
innerControllers[0].add('Also ignored');
outerController.add(3);
await Future(() {});
innerControllers[2].add('More');
await Future(() {});
expect(emittedValues, ['First', 'Second', 'More']);
});
}
});
}
}
}