// Copyright (c) 2019, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:stream_transform/stream_transform.dart';
import 'package:test/test.dart';

import 'utils.dart';

void main() {
  test('forwards errors from the convert callback', () async {
    var errors = <String>[];
    var source = Stream.fromIterable([1, 2, 3]);
    source.concurrentAsyncExpand<void>((i) {
      // ignore: only_throw_errors
      throw 'Error: $i';
    }).listen((_) {}, onError: errors.add);
    await Future<void>(() {});
    expect(errors, ['Error: 1', 'Error: 2', 'Error: 3']);
  });

  for (var outerType in streamTypes) {
    for (var innerType in streamTypes) {
      group('concurrentAsyncExpand $outerType to $innerType', () {
        late StreamController<int> outerController;
        late bool outerCanceled;
        late List<StreamController<String>> innerControllers;
        late List<bool> innerCanceled;
        late List<String> emittedValues;
        late bool isDone;
        late List<String> errors;
        late Stream<String> transformed;
        late StreamSubscription<String> subscription;

        setUp(() {
          outerController = createController(outerType)
            ..onCancel = () {
              outerCanceled = true;
            };
          outerCanceled = false;
          innerControllers = [];
          innerCanceled = [];
          emittedValues = [];
          errors = [];
          isDone = false;
          transformed = outerController.stream.concurrentAsyncExpand((i) {
            var index = innerControllers.length;
            innerCanceled.add(false);
            innerControllers.add(createController<String>(innerType)
              ..onCancel = () {
                innerCanceled[index] = true;
              });
            return innerControllers.last.stream;
          });
          subscription = transformed
              .listen(emittedValues.add, onError: errors.add, onDone: () {
            isDone = true;
          });
        });

        test('interleaves events from sub streams', () async {
          outerController
            ..add(1)
            ..add(2);
          await Future<void>(() {});
          expect(emittedValues, isEmpty);
          expect(innerControllers, hasLength(2));
          innerControllers[0].add('First');
          innerControllers[1].add('Second');
          innerControllers[0].add('First again');
          await Future<void>(() {});
          expect(emittedValues, ['First', 'Second', 'First again']);
        });

        test('forwards errors from outer stream', () async {
          outerController.addError('Error');
          await Future<void>(() {});
          expect(errors, ['Error']);
        });

        test('forwards errors from inner streams', () async {
          outerController
            ..add(1)
            ..add(2);
          await Future<void>(() {});
          innerControllers[0].addError('Error 1');
          innerControllers[1].addError('Error 2');
          await Future<void>(() {});
          expect(errors, ['Error 1', 'Error 2']);
        });

        test('can continue handling events after an error in outer stream',
            () async {
          outerController
            ..addError('Error')
            ..add(1);
          await Future<void>(() {});
          innerControllers[0].add('First');
          await Future<void>(() {});
          expect(emittedValues, ['First']);
          expect(errors, ['Error']);
        });

        test('cancels outer subscription if output canceled', () async {
          await subscription.cancel();
          expect(outerCanceled, true);
        });

        if (outerType != 'broadcast' || innerType != 'single subscription') {
          // A single subscription inner stream in a broadcast outer stream is
          // not canceled.
          test('cancels inner subscriptions if output canceled', () async {
            outerController
              ..add(1)
              ..add(2);
            await Future<void>(() {});
            await subscription.cancel();
            expect(innerCanceled, [true, true]);
          });
        }

        test('stays open if any inner stream is still open', () async {
          outerController.add(1);
          await outerController.close();
          await Future<void>(() {});
          expect(isDone, false);
        });

        test('stays open if outer stream is still open', () async {
          outerController.add(1);
          await Future<void>(() {});
          await innerControllers[0].close();
          await Future<void>(() {});
          expect(isDone, false);
        });

        test('closes after all inner streams and outer stream close', () async {
          outerController.add(1);
          await Future<void>(() {});
          await innerControllers[0].close();
          await outerController.close();
          await Future<void>(() {});
          expect(isDone, true);
        });

        if (outerType == 'broadcast') {
          test('multiple listerns all get values', () async {
            var otherValues = <String>[];
            transformed.listen(otherValues.add);
            outerController.add(1);
            await Future<void>(() {});
            innerControllers[0].add('First');
            await Future<void>(() {});
            expect(emittedValues, ['First']);
            expect(otherValues, ['First']);
          });

          test('multiple listeners get closed', () async {
            var otherDone = false;
            transformed.listen(null, onDone: () => otherDone = true);
            outerController.add(1);
            await Future<void>(() {});
            await innerControllers[0].close();
            await outerController.close();
            await Future<void>(() {});
            expect(isDone, true);
            expect(otherDone, true);
          });

          test('can cancel and relisten', () async {
            outerController
              ..add(1)
              ..add(2);
            await Future(() {});
            innerControllers[0].add('First');
            innerControllers[1].add('Second');
            await Future(() {});
            await subscription.cancel();
            innerControllers[0].add('Ignored');
            await Future(() {});
            subscription = transformed.listen(emittedValues.add);
            innerControllers[0].add('Also ignored');
            outerController.add(3);
            await Future(() {});
            innerControllers[2].add('More');
            await Future(() {});
            expect(emittedValues, ['First', 'Second', 'More']);
          });
        }
      });
    }
  }
}
