// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:async/async.dart';
import 'package:test/test.dart';

void main() {
  group('single-subscription', () {
    late StreamGroup<String> streamGroup;
    setUp(() {
      streamGroup = StreamGroup<String>();
    });

    test('buffers events from multiple sources', () async {
      var controller1 = StreamController<String>();
      streamGroup.add(controller1.stream);
      controller1.add('first');
      controller1.close();

      var controller2 = StreamController<String>();
      streamGroup.add(controller2.stream);
      controller2.add('second');
      controller2.close();

      await flushMicrotasks();

      expect(streamGroup.close(), completes);

      expect(streamGroup.stream.toList(),
          completion(unorderedEquals(['first', 'second'])));
    });

    test('buffers errors from multiple sources', () async {
      var controller1 = StreamController<String>();
      streamGroup.add(controller1.stream);
      controller1.addError('first');
      controller1.close();

      var controller2 = StreamController<String>();
      streamGroup.add(controller2.stream);
      controller2.addError('second');
      controller2.close();

      await flushMicrotasks();

      expect(streamGroup.close(), completes);

      var transformed = streamGroup.stream.transform(
          StreamTransformer<String, String>.fromHandlers(
              handleError: (error, _, sink) => sink.add('error: $error')));
      expect(transformed.toList(),
          completion(equals(['error: first', 'error: second'])));
    });

    test('buffers events and errors together', () async {
      var controller = StreamController<String>();
      streamGroup.add(controller.stream);

      controller.add('first');
      controller.addError('second');
      controller.add('third');
      controller.addError('fourth');
      controller.addError('fifth');
      controller.add('sixth');
      controller.close();

      await flushMicrotasks();

      expect(streamGroup.close(), completes);

      var transformed = streamGroup.stream.transform(
          StreamTransformer<String, String>.fromHandlers(
              handleData: (data, sink) => sink.add('data: $data'),
              handleError: (error, _, sink) => sink.add('error: $error')));
      expect(
          transformed.toList(),
          completion(equals([
            'data: first',
            'error: second',
            'data: third',
            'error: fourth',
            'error: fifth',
            'data: sixth'
          ])));
    });

    test("emits events once there's a listener", () {
      var controller = StreamController<String>();
      streamGroup.add(controller.stream);

      expect(
          streamGroup.stream.toList(), completion(equals(['first', 'second'])));

      controller.add('first');
      controller.add('second');
      controller.close();

      expect(streamGroup.close(), completes);
    });

    test("doesn't buffer events from a broadcast stream", () async {
      var controller = StreamController<String>.broadcast();
      streamGroup.add(controller.stream);

      controller.add('first');
      controller.add('second');
      controller.close();

      await flushMicrotasks();

      expect(streamGroup.close(), completes);
      expect(streamGroup.stream.toList(), completion(isEmpty));
    });

    test('when paused, buffers events from a broadcast stream', () async {
      var controller = StreamController<String>.broadcast();
      streamGroup.add(controller.stream);

      var events = [];
      var subscription = streamGroup.stream.listen(events.add);
      subscription.pause();

      controller.add('first');
      controller.add('second');
      controller.close();
      await flushMicrotasks();

      subscription.resume();
      expect(streamGroup.close(), completes);
      await flushMicrotasks();

      expect(events, equals(['first', 'second']));
    });

    test("emits events from a broadcast stream once there's a listener", () {
      var controller = StreamController<String>.broadcast();
      streamGroup.add(controller.stream);

      expect(
          streamGroup.stream.toList(), completion(equals(['first', 'second'])));

      controller.add('first');
      controller.add('second');
      controller.close();

      expect(streamGroup.close(), completes);
    });

    test('forwards cancel errors', () async {
      var subscription = streamGroup.stream.listen(null);

      var controller = StreamController<String>(onCancel: () => throw 'error');
      streamGroup.add(controller.stream);
      await flushMicrotasks();

      expect(subscription.cancel(), throwsA('error'));
    });

    test('forwards a cancel future', () async {
      var subscription = streamGroup.stream.listen(null);

      var completer = Completer();
      var controller =
          StreamController<String>(onCancel: () => completer.future);
      streamGroup.add(controller.stream);
      await flushMicrotasks();

      var fired = false;
      subscription.cancel().then((_) => fired = true);

      await flushMicrotasks();
      expect(fired, isFalse);

      completer.complete();
      await flushMicrotasks();
      expect(fired, isTrue);
    });

    test(
        'add() while active pauses the stream if the group is paused, then '
        'resumes once the group resumes', () async {
      var subscription = streamGroup.stream.listen(null);
      await flushMicrotasks();

      var paused = false;
      var controller = StreamController<String>(
          onPause: () => paused = true, onResume: () => paused = false);

      subscription.pause();
      await flushMicrotasks();

      streamGroup.add(controller.stream);
      await flushMicrotasks();
      expect(paused, isTrue);

      subscription.resume();
      await flushMicrotasks();
      expect(paused, isFalse);
    });

    group('add() while canceled', () {
      setUp(() async {
        streamGroup.stream.listen(null).cancel();
        await flushMicrotasks();
      });

      test('immediately listens to and cancels the stream', () async {
        var listened = false;
        var canceled = false;
        var controller = StreamController<String>(onListen: () {
          listened = true;
        }, onCancel: expectAsync0(() {
          expect(listened, isTrue);
          canceled = true;
        }));

        streamGroup.add(controller.stream);
        await flushMicrotasks();
        expect(listened, isTrue);
        expect(canceled, isTrue);
      });

      test('forwards cancel errors', () {
        var controller =
            StreamController<String>(onCancel: () => throw 'error');

        expect(streamGroup.add(controller.stream), throwsA('error'));
      });

      test('forwards a cancel future', () async {
        var completer = Completer();
        var controller =
            StreamController<String>(onCancel: () => completer.future);

        var fired = false;
        streamGroup.add(controller.stream)!.then((_) => fired = true);

        await flushMicrotasks();
        expect(fired, isFalse);

        completer.complete();
        await flushMicrotasks();
        expect(fired, isTrue);
      });
    });
  });

  group('broadcast', () {
    late StreamGroup<String> streamGroup;
    setUp(() {
      streamGroup = StreamGroup<String>.broadcast();
    });

    test('buffers events from multiple sources', () async {
      var controller1 = StreamController<String>();
      streamGroup.add(controller1.stream);
      controller1.add('first');
      controller1.close();

      var controller2 = StreamController<String>();
      streamGroup.add(controller2.stream);
      controller2.add('second');
      controller2.close();

      await flushMicrotasks();

      expect(streamGroup.close(), completes);

      expect(
          streamGroup.stream.toList(), completion(equals(['first', 'second'])));
    });

    test("emits events from multiple sources once there's a listener", () {
      var controller1 = StreamController<String>();
      streamGroup.add(controller1.stream);

      var controller2 = StreamController<String>();
      streamGroup.add(controller2.stream);

      expect(
          streamGroup.stream.toList(), completion(equals(['first', 'second'])));

      controller1.add('first');
      controller2.add('second');
      controller1.close();
      controller2.close();

      expect(streamGroup.close(), completes);
    });

    test("doesn't buffer events once a listener has been added and removed",
        () async {
      var controller = StreamController<String>();
      streamGroup.add(controller.stream);

      streamGroup.stream.listen(null).cancel();
      await flushMicrotasks();

      controller.add('first');
      controller.addError('second');
      controller.close();

      await flushMicrotasks();

      expect(streamGroup.close(), completes);
      expect(streamGroup.stream.toList(), completion(isEmpty));
    });

    test("doesn't buffer events from a broadcast stream", () async {
      var controller = StreamController<String>.broadcast();
      streamGroup.add(controller.stream);
      controller.add('first');
      controller.addError('second');
      controller.close();

      await flushMicrotasks();

      expect(streamGroup.close(), completes);
      expect(streamGroup.stream.toList(), completion(isEmpty));
    });

    test("emits events from a broadcast stream once there's a listener", () {
      var controller = StreamController<String>.broadcast();
      streamGroup.add(controller.stream);

      expect(
          streamGroup.stream.toList(), completion(equals(['first', 'second'])));

      controller.add('first');
      controller.add('second');
      controller.close();

      expect(streamGroup.close(), completes);
    });

    test('cancels and re-listens broadcast streams', () async {
      var subscription = streamGroup.stream.listen(null);

      var controller = StreamController<String>.broadcast();

      streamGroup.add(controller.stream);
      await flushMicrotasks();
      expect(controller.hasListener, isTrue);

      subscription.cancel();
      await flushMicrotasks();
      expect(controller.hasListener, isFalse);

      streamGroup.stream.listen(null);
      await flushMicrotasks();
      expect(controller.hasListener, isTrue);
    });

    test('never cancels single-subscription streams', () async {
      var subscription = streamGroup.stream.listen(null);

      var controller =
          StreamController<String>(onCancel: expectAsync0(() {}, count: 0));

      streamGroup.add(controller.stream);
      await flushMicrotasks();

      subscription.cancel();
      await flushMicrotasks();

      streamGroup.stream.listen(null);
      await flushMicrotasks();
    });

    test('drops events from a single-subscription stream while dormant',
        () async {
      var events = [];
      var subscription = streamGroup.stream.listen(events.add);

      var controller = StreamController<String>();
      streamGroup.add(controller.stream);
      await flushMicrotasks();

      controller.add('first');
      await flushMicrotasks();
      expect(events, equals(['first']));

      subscription.cancel();
      controller.add('second');
      await flushMicrotasks();
      expect(events, equals(['first']));

      streamGroup.stream.listen(events.add);
      controller.add('third');
      await flushMicrotasks();
      expect(events, equals(['first', 'third']));
    });

    test('a single-subscription stream can be removed while dormant', () async {
      var controller = StreamController<String>();
      streamGroup.add(controller.stream);
      await flushMicrotasks();

      streamGroup.stream.listen(null).cancel();
      await flushMicrotasks();

      streamGroup.remove(controller.stream);
      expect(controller.hasListener, isFalse);
      await flushMicrotasks();

      expect(streamGroup.stream.toList(), completion(isEmpty));
      controller.add('first');
      expect(streamGroup.close(), completes);
    });
  });

  group('regardless of type', () {
    group('single-subscription', () {
      regardlessOfType(() => StreamGroup<String>());
    });

    group('broadcast', () {
      regardlessOfType(() => StreamGroup<String>.broadcast());
    });
  });

  test('merge() emits events from all components streams', () async {
    var controller1 = StreamController<String>();
    var controller2 = StreamController<String>();

    var merged = StreamGroup.merge([controller1.stream, controller2.stream]);

    controller1.add('first');
    controller1.close();
    controller2.add('second');
    controller2.close();

    expect(await merged.toList(), ['first', 'second']);
  });

  test('mergeBroadcast() emits events from all components streams', () async {
    var controller1 = StreamController<String>();
    var controller2 = StreamController<String>();

    var merged =
        StreamGroup.mergeBroadcast([controller1.stream, controller2.stream]);

    controller1.add('first');
    controller1.close();
    controller2.add('second');
    controller2.close();

    expect(merged.isBroadcast, isTrue);

    expect(await merged.toList(), ['first', 'second']);
  });
}

void regardlessOfType(StreamGroup<String> Function() newStreamGroup) {
  late StreamGroup<String> streamGroup;
  setUp(() {
    streamGroup = newStreamGroup();
  });

  group('add()', () {
    group('while dormant', () {
      test("doesn't listen to the stream until the group is listened to",
          () async {
        var controller = StreamController<String>();

        expect(streamGroup.add(controller.stream), isNull);
        await flushMicrotasks();
        expect(controller.hasListener, isFalse);

        streamGroup.stream.listen(null);
        await flushMicrotasks();
        expect(controller.hasListener, isTrue);
      });

      test('is a no-op if the stream is already in the group', () {
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);
        streamGroup.add(controller.stream);
        streamGroup.add(controller.stream);

        // If the stream was actually listened to multiple times, this would
        // throw a StateError.
        streamGroup.stream.listen(null);
      });
    });

    group('while active', () {
      setUp(() async {
        streamGroup.stream.listen(null);
        await flushMicrotasks();
      });

      test('listens to the stream immediately', () async {
        var controller = StreamController<String>();

        expect(streamGroup.add(controller.stream), isNull);
        await flushMicrotasks();
        expect(controller.hasListener, isTrue);
      });

      test('is a no-op if the stream is already in the group', () async {
        var controller = StreamController<String>();

        // If the stream were actually listened to more than once, future
        // calls to [add] would throw [StateError]s.
        streamGroup.add(controller.stream);
        streamGroup.add(controller.stream);
        streamGroup.add(controller.stream);
      });
    });
  });

  group('remove()', () {
    group('while dormant', () {
      test("stops emitting events for a stream that's removed", () async {
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);

        expect(streamGroup.stream.toList(), completion(equals(['first'])));

        controller.add('first');
        await flushMicrotasks();
        controller.add('second');

        expect(streamGroup.remove(controller.stream), completion(null));
        expect(streamGroup.close(), completes);
      });

      test('is a no-op for an unknown stream', () {
        var controller = StreamController<String>();
        expect(streamGroup.remove(controller.stream), isNull);
      });

      test('and closed closes the group when the last stream is removed',
          () async {
        var controller1 = StreamController<String>();
        var controller2 = StreamController<String>();

        streamGroup.add(controller1.stream);
        streamGroup.add(controller2.stream);
        await flushMicrotasks();

        expect(streamGroup.isClosed, isFalse);
        streamGroup.close();
        expect(streamGroup.isClosed, isTrue);

        streamGroup.remove(controller1.stream);
        await flushMicrotasks();

        streamGroup.remove(controller2.stream);
        await flushMicrotasks();

        expect(streamGroup.stream.toList(), completion(isEmpty));
      });
    });

    group('while listening', () {
      test("doesn't emit events from a removed stream", () {
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);

        // The subscription to [controller.stream] is canceled synchronously, so
        // the first event is dropped even though it was added before the
        // removal. This is documented in [StreamGroup.remove].
        expect(streamGroup.stream.toList(), completion(isEmpty));

        controller.add('first');
        expect(streamGroup.remove(controller.stream), completion(null));
        controller.add('second');

        expect(streamGroup.close(), completes);
      });

      test("cancels the stream's subscription", () async {
        var controller = StreamController<String>();
        streamGroup.add(controller.stream);

        streamGroup.stream.listen(null);
        await flushMicrotasks();
        expect(controller.hasListener, isTrue);

        streamGroup.remove(controller.stream);
        await flushMicrotasks();
        expect(controller.hasListener, isFalse);
      });

      test('forwards cancel errors', () async {
        var controller =
            StreamController<String>(onCancel: () => throw 'error');
        streamGroup.add(controller.stream);

        streamGroup.stream.listen(null);
        await flushMicrotasks();

        expect(streamGroup.remove(controller.stream), throwsA('error'));
      });

      test('forwards cancel futures', () async {
        var completer = Completer();
        var controller =
            StreamController<String>(onCancel: () => completer.future);

        streamGroup.stream.listen(null);
        await flushMicrotasks();

        streamGroup.add(controller.stream);
        await flushMicrotasks();

        var fired = false;
        streamGroup.remove(controller.stream)!.then((_) => fired = true);

        await flushMicrotasks();
        expect(fired, isFalse);

        completer.complete();
        await flushMicrotasks();
        expect(fired, isTrue);
      });

      test('is a no-op for an unknown stream', () async {
        var controller = StreamController<String>();
        streamGroup.stream.listen(null);
        await flushMicrotasks();

        expect(streamGroup.remove(controller.stream), isNull);
      });

      test('and closed closes the group when the last stream is removed',
          () async {
        var done = false;
        streamGroup.stream.listen(null, onDone: () => done = true);
        await flushMicrotasks();

        var controller1 = StreamController<String>();
        var controller2 = StreamController<String>();

        streamGroup.add(controller1.stream);
        streamGroup.add(controller2.stream);
        await flushMicrotasks();

        streamGroup.close();

        streamGroup.remove(controller1.stream);
        await flushMicrotasks();
        expect(done, isFalse);

        streamGroup.remove(controller2.stream);
        await flushMicrotasks();
        expect(done, isTrue);
      });
    });
  });

  group('close()', () {
    group('while dormant', () {
      test('if there are no streams, closes the group', () {
        expect(streamGroup.close(), completes);
        expect(streamGroup.stream.toList(), completion(isEmpty));
      });

      test(
          'if there are streams, closes the group once those streams close '
          "and there's a listener", () async {
        var controller1 = StreamController<String>();
        var controller2 = StreamController<String>();

        streamGroup.add(controller1.stream);
        streamGroup.add(controller2.stream);
        await flushMicrotasks();

        streamGroup.close();

        controller1.close();
        controller2.close();
        expect(streamGroup.stream.toList(), completion(isEmpty));
      });
    });

    group('while active', () {
      test('if there are no streams, closes the group', () {
        expect(streamGroup.stream.toList(), completion(isEmpty));
        expect(streamGroup.close(), completes);
      });

      test('if there are streams, closes the group once those streams close',
          () async {
        var done = false;
        streamGroup.stream.listen(null, onDone: () => done = true);
        await flushMicrotasks();

        var controller1 = StreamController<String>();
        var controller2 = StreamController<String>();

        streamGroup.add(controller1.stream);
        streamGroup.add(controller2.stream);
        await flushMicrotasks();

        streamGroup.close();
        await flushMicrotasks();
        expect(done, isFalse);

        controller1.close();
        await flushMicrotasks();
        expect(done, isFalse);

        controller2.close();
        await flushMicrotasks();
        expect(done, isTrue);
      });
    });

    test('returns a Future that completes once all events are dispatched',
        () async {
      var events = [];
      streamGroup.stream.listen(events.add);

      var controller = StreamController<String>();
      streamGroup.add(controller.stream);
      await flushMicrotasks();

      // Add a bunch of events. Each of these will get dispatched in a
      // separate microtask, so we can test that [close] only completes once
      // all of them have dispatched.
      controller.add('one');
      controller.add('two');
      controller.add('three');
      controller.add('four');
      controller.add('five');
      controller.add('six');
      controller.close();

      await streamGroup.close();
      expect(events, equals(['one', 'two', 'three', 'four', 'five', 'six']));
    });
  });

  group('onIdle', () {
    test('emits an event when the last pending stream emits done', () async {
      streamGroup.stream.listen(null);

      var idle = false;
      streamGroup.onIdle.listen((_) => idle = true);

      var controller1 = StreamController<String>();
      var controller2 = StreamController<String>();
      var controller3 = StreamController<String>();

      streamGroup.add(controller1.stream);
      streamGroup.add(controller2.stream);
      streamGroup.add(controller3.stream);

      await flushMicrotasks();
      expect(idle, isFalse);
      expect(streamGroup.isIdle, isFalse);

      controller1.close();
      await flushMicrotasks();
      expect(idle, isFalse);
      expect(streamGroup.isIdle, isFalse);

      controller2.close();
      await flushMicrotasks();
      expect(idle, isFalse);
      expect(streamGroup.isIdle, isFalse);

      controller3.close();
      await flushMicrotasks();
      expect(idle, isTrue);
      expect(streamGroup.isIdle, isTrue);
    });

    test('emits an event each time it becomes idle', () async {
      streamGroup.stream.listen(null);

      var idle = false;
      streamGroup.onIdle.listen((_) => idle = true);

      var controller = StreamController<String>();
      streamGroup.add(controller.stream);

      controller.close();
      await flushMicrotasks();
      expect(idle, isTrue);
      expect(streamGroup.isIdle, isTrue);

      idle = false;
      controller = StreamController<String>();
      streamGroup.add(controller.stream);

      await flushMicrotasks();
      expect(idle, isFalse);
      expect(streamGroup.isIdle, isFalse);

      controller.close();
      await flushMicrotasks();
      expect(idle, isTrue);
      expect(streamGroup.isIdle, isTrue);
    });

    test('emits an event when the group closes', () async {
      // It's important that the order of events here stays consistent over
      // time, since code may rely on it in subtle ways. Note that this is *not*
      // an official guarantee, so the authors of `async` are free to change
      // this behavior if they need to.
      var idle = false;
      var onIdleDone = false;
      var streamClosed = false;

      streamGroup.onIdle.listen(expectAsync1((_) {
        expect(streamClosed, isFalse);
        idle = true;
      }), onDone: expectAsync0(() {
        expect(idle, isTrue);
        expect(streamClosed, isTrue);
        onIdleDone = true;
      }));

      streamGroup.stream.drain().then(expectAsync1((_) {
        expect(idle, isTrue);
        expect(onIdleDone, isFalse);
        streamClosed = true;
      }));

      var controller = StreamController<String>();
      streamGroup.add(controller.stream);
      streamGroup.close();

      await flushMicrotasks();
      expect(idle, isFalse);
      expect(streamGroup.isIdle, isFalse);

      controller.close();
      await flushMicrotasks();
      expect(idle, isTrue);
      expect(streamGroup.isIdle, isTrue);
      expect(streamClosed, isTrue);
    });
  });
}

/// Wait for all microtasks to complete.
Future flushMicrotasks() => Future.delayed(Duration.zero);
