// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';
import 'package:test/test.dart';

import 'utils.dart';

void main() {
  late StreamController<void> trigger;
  late StreamController<int> values;
  late List<List<int>> emittedValues;
  late bool valuesCanceled;
  late bool triggerCanceled;
  late bool triggerPaused;
  late bool isDone;
  late List<String> errors;
  late Stream<List<int>> transformed;
  late StreamSubscription<List<int>> subscription;

  void setUpForStreamTypes(String triggerType, String valuesType,
      {required bool longPoll}) {
    valuesCanceled = false;
    triggerCanceled = false;
    triggerPaused = false;
    trigger = createController(triggerType)
      ..onCancel = () {
        triggerCanceled = true;
      };
    if (triggerType == 'single subscription') {
      trigger.onPause = () {
        triggerPaused = true;
      };
    }
    values = createController(valuesType)
      ..onCancel = () {
        valuesCanceled = true;
      };
    emittedValues = [];
    errors = [];
    isDone = false;
    transformed = values.stream.buffer(trigger.stream, longPoll: longPoll);
    subscription =
        transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
      isDone = true;
    });
  }

  for (var triggerType in streamTypes) {
    for (var valuesType in streamTypes) {
      group('Trigger type: [$triggerType], Values type: [$valuesType]', () {
        group('general behavior', () {
          setUp(() {
            setUpForStreamTypes(triggerType, valuesType, longPoll: true);
          });

          test('does not emit before `trigger`', () async {
            values.add(1);
            await Future(() {});
            expect(emittedValues, isEmpty);
            trigger.add(null);
            await Future(() {});
            expect(emittedValues, [
              [1]
            ]);
          });

          test('groups values between trigger', () async {
            values
              ..add(1)
              ..add(2);
            await Future(() {});
            trigger.add(null);
            values
              ..add(3)
              ..add(4);
            await Future(() {});
            trigger.add(null);
            await Future(() {});
            expect(emittedValues, [
              [1, 2],
              [3, 4]
            ]);
          });

          test('cancels value subscription when output canceled', () async {
            expect(valuesCanceled, false);
            await subscription.cancel();
            expect(valuesCanceled, true);
          });

          test('closes when trigger ends', () async {
            expect(isDone, false);
            await trigger.close();
            await Future(() {});
            expect(isDone, true);
          });

          test('closes after outputting final values when source closes',
              () async {
            expect(isDone, false);
            values.add(1);
            await values.close();
            expect(isDone, false);
            trigger.add(null);
            await Future(() {});
            expect(emittedValues, [
              [1]
            ]);
            expect(isDone, true);
          });

          test('closes when source closes and there are no buffered', () async {
            expect(isDone, false);
            await values.close();
            await Future(() {});
            expect(isDone, true);
          });

          test('forwards errors from trigger', () async {
            trigger.addError('error');
            await Future(() {});
            expect(errors, ['error']);
          });

          test('forwards errors from values', () async {
            values.addError('error');
            await Future(() {});
            expect(errors, ['error']);
          });
        });

        group('long polling', () {
          setUp(() {
            setUpForStreamTypes(triggerType, valuesType, longPoll: true);
          });

          test('emits immediately if trigger emits before a value', () async {
            trigger.add(null);
            await Future(() {});
            expect(emittedValues, isEmpty);
            values.add(1);
            await Future(() {});
            expect(emittedValues, [
              [1]
            ]);
          });

          test('two triggers in a row - emit buffere then emit next value',
              () async {
            values
              ..add(1)
              ..add(2);
            await Future(() {});
            trigger
              ..add(null)
              ..add(null);
            await Future(() {});
            values.add(3);
            await Future(() {});
            expect(emittedValues, [
              [1, 2],
              [3]
            ]);
          });

          test('pre-emptive trigger then trigger after values', () async {
            trigger.add(null);
            await Future(() {});
            values
              ..add(1)
              ..add(2);
            await Future(() {});
            trigger.add(null);
            await Future(() {});
            expect(emittedValues, [
              [1],
              [2]
            ]);
          });

          test('multiple pre-emptive triggers, only emits first value',
              () async {
            trigger
              ..add(null)
              ..add(null);
            await Future(() {});
            values
              ..add(1)
              ..add(2);
            await Future(() {});
            expect(emittedValues, [
              [1]
            ]);
          });

          test('closes if there is no waiting long poll when source closes',
              () async {
            expect(isDone, false);
            values.add(1);
            trigger.add(null);
            await values.close();
            await Future(() {});
            expect(isDone, true);
          });

          test('waits to emit if there waiting long poll when trigger closes',
              () async {
            trigger.add(null);
            await trigger.close();
            expect(isDone, false);
            values.add(1);
            await Future(() {});
            expect(emittedValues, [
              [1]
            ]);
            expect(isDone, true);
          });
        });

        group('immediate polling', () {
          setUp(() {
            setUpForStreamTypes(triggerType, valuesType, longPoll: false);
          });

          test('emits empty list before values', () async {
            trigger.add(null);
            await Future(() {});
            expect(emittedValues, [<int>[]]);
          });

          test('emits empty list after emitting values', () async {
            values
              ..add(1)
              ..add(2);
            await Future(() {});
            trigger
              ..add(null)
              ..add(null);
            await Future(() {});
            expect(emittedValues, [
              [1, 2],
              <int>[]
            ]);
          });
        });
      });
    }
  }

  test('always cancels trigger if values is singlesubscription', () async {
    setUpForStreamTypes('broadcast', 'single subscription', longPoll: true);
    expect(triggerCanceled, false);
    await subscription.cancel();
    expect(triggerCanceled, true);

    setUpForStreamTypes('single subscription', 'single subscription',
        longPoll: true);
    expect(triggerCanceled, false);
    await subscription.cancel();
    expect(triggerCanceled, true);
  });

  test('cancels trigger if trigger is broadcast', () async {
    setUpForStreamTypes('broadcast', 'broadcast', longPoll: true);
    expect(triggerCanceled, false);
    await subscription.cancel();
    expect(triggerCanceled, true);
  });

  test('pauses single subscription trigger for broadcast values', () async {
    setUpForStreamTypes('single subscription', 'broadcast', longPoll: true);
    expect(triggerCanceled, false);
    expect(triggerPaused, false);
    await subscription.cancel();
    expect(triggerCanceled, false);
    expect(triggerPaused, true);
  });

  for (var triggerType in streamTypes) {
    test('cancel and relisten with [$triggerType] trigger', () async {
      setUpForStreamTypes(triggerType, 'broadcast', longPoll: true);
      values.add(1);
      trigger.add(null);
      await Future(() {});
      expect(emittedValues, [
        [1]
      ]);
      await subscription.cancel();
      values.add(2);
      trigger.add(null);
      await Future(() {});
      subscription = transformed.listen(emittedValues.add);
      values.add(3);
      trigger.add(null);
      await Future(() {});
      expect(emittedValues, [
        [1],
        [3]
      ]);
    });
  }
}
