// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'package:pedantic/pedantic.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';

void main() {
  StreamChannelController controller;
  MultiChannel channel1;
  MultiChannel channel2;
  setUp(() {
    controller = StreamChannelController();
    channel1 = MultiChannel<int>(controller.local);
    channel2 = MultiChannel<int>(controller.foreign);
  });

  group('the default virtual channel', () {
    test('begins connected', () {
      var first = true;
      channel2.stream.listen(expectAsync1((message) {
        if (first) {
          expect(message, equals(1));
          first = false;
        } else {
          expect(message, equals(2));
        }
      }, count: 2));

      channel1.sink.add(1);
      channel1.sink.add(2);
    });

    test('closes the remote virtual channel when it closes', () {
      expect(channel2.stream.toList(), completion(isEmpty));
      expect(channel2.sink.done, completes);

      channel1.sink.close();
    });

    test('closes the local virtual channel when it closes', () {
      expect(channel1.stream.toList(), completion(isEmpty));
      expect(channel1.sink.done, completes);

      channel1.sink.close();
    });

    test(
        "doesn't closes the local virtual channel when the stream "
        'subscription is canceled', () {
      channel1.sink.done.then(expectAsync1((_) {}, count: 0));

      channel1.stream.listen((_) {}).cancel();

      // Ensure that there's enough time for the channel to close if it's going
      // to.
      return pumpEventQueue();
    });

    test(
        'closes the underlying channel when it closes without any other '
        'virtual channels', () {
      expect(controller.local.sink.done, completes);
      expect(controller.foreign.sink.done, completes);

      channel1.sink.close();
    });

    test(
        "doesn't close the underlying channel when it closes with other "
        'virtual channels', () {
      controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
      controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));

      // Establish another virtual connection which should keep the underlying
      // connection open.
      channel2.virtualChannel(channel1.virtualChannel().id);
      channel1.sink.close();

      // Ensure that there's enough time for the underlying channel to complete
      // if it's going to.
      return pumpEventQueue();
    });
  });

  group('a locally-created virtual channel', () {
    VirtualChannel virtual1;
    VirtualChannel virtual2;
    setUp(() {
      virtual1 = channel1.virtualChannel();
      virtual2 = channel2.virtualChannel(virtual1.id);
    });

    test('sends messages only to the other virtual channel', () {
      var first = true;
      virtual2.stream.listen(expectAsync1((message) {
        if (first) {
          expect(message, equals(1));
          first = false;
        } else {
          expect(message, equals(2));
        }
      }, count: 2));

      // No other virtual channels should receive the message.
      for (var i = 0; i < 10; i++) {
        var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
        virtual.stream.listen(expectAsync1((_) {}, count: 0));
      }
      channel2.stream.listen(expectAsync1((_) {}, count: 0));

      virtual1.sink.add(1);
      virtual1.sink.add(2);
    });

    test('closes the remote virtual channel when it closes', () {
      expect(virtual2.stream.toList(), completion(isEmpty));
      expect(virtual2.sink.done, completes);

      virtual1.sink.close();
    });

    test('closes the local virtual channel when it closes', () {
      expect(virtual1.stream.toList(), completion(isEmpty));
      expect(virtual1.sink.done, completes);

      virtual1.sink.close();
    });

    test(
        "doesn't closes the local virtual channel when the stream "
        'subscription is canceled', () {
      virtual1.sink.done.then(expectAsync1((_) {}, count: 0));
      virtual1.stream.listen((_) {}).cancel();

      // Ensure that there's enough time for the channel to close if it's going
      // to.
      return pumpEventQueue();
    });

    test(
        'closes the underlying channel when it closes without any other '
        'virtual channels', () async {
      // First close the default channel so we can test the new channel as the
      // last living virtual channel.
      unawaited(channel1.sink.close());

      await channel2.stream.toList();
      expect(controller.local.sink.done, completes);
      expect(controller.foreign.sink.done, completes);

      unawaited(virtual1.sink.close());
    });

    test(
        "doesn't close the underlying channel when it closes with other "
        'virtual channels', () {
      controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
      controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));

      virtual1.sink.close();

      // Ensure that there's enough time for the underlying channel to complete
      // if it's going to.
      return pumpEventQueue();
    });

    test("doesn't conflict with a remote virtual channel", () {
      var virtual3 = channel2.virtualChannel();
      var virtual4 = channel1.virtualChannel(virtual3.id);

      // This is an implementation detail, but we assert it here to make sure
      // we're properly testing two channels with the same id.
      expect(virtual1.id, equals(virtual3.id));

      virtual2.stream
          .listen(expectAsync1((message) => expect(message, equals(1))));
      virtual4.stream
          .listen(expectAsync1((message) => expect(message, equals(2))));

      virtual1.sink.add(1);
      virtual3.sink.add(2);
    });
  });

  group('a remotely-created virtual channel', () {
    VirtualChannel virtual1;
    VirtualChannel virtual2;
    setUp(() {
      virtual1 = channel1.virtualChannel();
      virtual2 = channel2.virtualChannel(virtual1.id);
    });

    test('sends messages only to the other virtual channel', () {
      var first = true;
      virtual1.stream.listen(expectAsync1((message) {
        if (first) {
          expect(message, equals(1));
          first = false;
        } else {
          expect(message, equals(2));
        }
      }, count: 2));

      // No other virtual channels should receive the message.
      for (var i = 0; i < 10; i++) {
        var virtual = channel2.virtualChannel(channel1.virtualChannel().id);
        virtual.stream.listen(expectAsync1((_) {}, count: 0));
      }
      channel1.stream.listen(expectAsync1((_) {}, count: 0));

      virtual2.sink.add(1);
      virtual2.sink.add(2);
    });

    test('closes the remote virtual channel when it closes', () {
      expect(virtual1.stream.toList(), completion(isEmpty));
      expect(virtual1.sink.done, completes);

      virtual2.sink.close();
    });

    test('closes the local virtual channel when it closes', () {
      expect(virtual2.stream.toList(), completion(isEmpty));
      expect(virtual2.sink.done, completes);

      virtual2.sink.close();
    });

    test(
        "doesn't closes the local virtual channel when the stream "
        'subscription is canceled', () {
      virtual2.sink.done.then(expectAsync1((_) {}, count: 0));
      virtual2.stream.listen((_) {}).cancel();

      // Ensure that there's enough time for the channel to close if it's going
      // to.
      return pumpEventQueue();
    });

    test(
        'closes the underlying channel when it closes without any other '
        'virtual channels', () async {
      // First close the default channel so we can test the new channel as the
      // last living virtual channel.
      unawaited(channel2.sink.close());

      await channel1.stream.toList();
      expect(controller.local.sink.done, completes);
      expect(controller.foreign.sink.done, completes);

      unawaited(virtual2.sink.close());
    });

    test(
        "doesn't close the underlying channel when it closes with other "
        'virtual channels', () {
      controller.local.sink.done.then(expectAsync1((_) {}, count: 0));
      controller.foreign.sink.done.then(expectAsync1((_) {}, count: 0));

      virtual2.sink.close();

      // Ensure that there's enough time for the underlying channel to complete
      // if it's going to.
      return pumpEventQueue();
    });

    test("doesn't allow another virtual channel with the same id", () {
      expect(() => channel2.virtualChannel(virtual1.id), throwsArgumentError);
    });

    test('dispatches events received before the virtual channel is created',
        () async {
      virtual1 = channel1.virtualChannel();

      virtual1.sink.add(1);
      await pumpEventQueue();

      virtual1.sink.add(2);
      await pumpEventQueue();

      expect(channel2.virtualChannel(virtual1.id).stream, emitsInOrder([1, 2]));
    });

    test(
        'dispatches close events received before the virtual channel is '
        'created', () async {
      virtual1 = channel1.virtualChannel();

      unawaited(virtual1.sink.close());
      await pumpEventQueue();

      expect(channel2.virtualChannel(virtual1.id).stream.toList(),
          completion(isEmpty));
    });
  });

  group('when the underlying stream', () {
    VirtualChannel virtual1;
    VirtualChannel virtual2;
    setUp(() {
      virtual1 = channel1.virtualChannel();
      virtual2 = channel2.virtualChannel(virtual1.id);
    });

    test('closes, all virtual channels close', () {
      expect(channel1.stream.toList(), completion(isEmpty));
      expect(channel1.sink.done, completes);
      expect(channel2.stream.toList(), completion(isEmpty));
      expect(channel2.sink.done, completes);
      expect(virtual1.stream.toList(), completion(isEmpty));
      expect(virtual1.sink.done, completes);
      expect(virtual2.stream.toList(), completion(isEmpty));
      expect(virtual2.sink.done, completes);

      controller.local.sink.close();
    });

    test('closes, more virtual channels are created closed', () async {
      unawaited(channel2.sink.close());
      unawaited(virtual2.sink.close());

      // Wait for the existing channels to emit done events.
      await channel1.stream.toList();
      await virtual1.stream.toList();

      var virtual = channel1.virtualChannel();
      expect(virtual.stream.toList(), completion(isEmpty));
      expect(virtual.sink.done, completes);

      virtual = channel1.virtualChannel();
      expect(virtual.stream.toList(), completion(isEmpty));
      expect(virtual.sink.done, completes);
    });

    test('emits an error, the error is sent only to the default channel', () {
      channel1.stream.listen(expectAsync1((_) {}, count: 0),
          onError: expectAsync1((error) => expect(error, equals('oh no'))));
      virtual1.stream.listen(expectAsync1((_) {}, count: 0),
          onError: expectAsync1((_) {}, count: 0));

      controller.foreign.sink.addError('oh no');
    });
  });

  group('stream channel rules', () {
    group('for the main stream:', () {
      test(
          'closing the sink causes the stream to close before it emits any more '
          'events', () {
        channel1.sink.add(1);
        channel1.sink.add(2);
        channel1.sink.add(3);

        channel2.stream.listen(expectAsync1((message) {
          expect(message, equals(1));
          channel2.sink.close();
        }, count: 1));
      });

      test('after the stream closes, the sink ignores events', () async {
        unawaited(channel1.sink.close());

        // Wait for the done event to be delivered.
        await channel2.stream.toList();
        channel2.sink.add(1);
        channel2.sink.add(2);
        channel2.sink.add(3);
        unawaited(channel2.sink.close());

        // None of our channel.sink additions should make it to the other endpoint.
        channel1.stream.listen(expectAsync1((_) {}, count: 0));
        await pumpEventQueue();
      });

      test("canceling the stream's subscription has no effect on the sink",
          () async {
        unawaited(channel1.stream.listen(null).cancel());
        await pumpEventQueue();

        channel1.sink.add(1);
        channel1.sink.add(2);
        channel1.sink.add(3);
        unawaited(channel1.sink.close());
        expect(channel2.stream.toList(), completion(equals([1, 2, 3])));
      });

      test("canceling the stream's subscription doesn't stop a done event",
          () async {
        unawaited(channel1.stream.listen(null).cancel());
        await pumpEventQueue();

        unawaited(channel2.sink.close());
        await pumpEventQueue();

        channel1.sink.add(1);
        channel1.sink.add(2);
        channel1.sink.add(3);
        unawaited(channel1.sink.close());

        // The sink should be ignoring events because the channel closed.
        channel2.stream.listen(expectAsync1((_) {}, count: 0));
        await pumpEventQueue();
      });
    });

    group('for a virtual channel:', () {
      VirtualChannel virtual1;
      VirtualChannel virtual2;
      setUp(() {
        virtual1 = channel1.virtualChannel();
        virtual2 = channel2.virtualChannel(virtual1.id);
      });

      test(
          'closing the sink causes the stream to close before it emits any more '
          'events', () {
        virtual1.sink.add(1);
        virtual1.sink.add(2);
        virtual1.sink.add(3);

        virtual2.stream.listen(expectAsync1((message) {
          expect(message, equals(1));
          virtual2.sink.close();
        }, count: 1));
      });

      test('after the stream closes, the sink ignores events', () async {
        unawaited(virtual1.sink.close());

        // Wait for the done event to be delivered.
        await virtual2.stream.toList();
        virtual2.sink.add(1);
        virtual2.sink.add(2);
        virtual2.sink.add(3);
        unawaited(virtual2.sink.close());

        // None of our virtual.sink additions should make it to the other endpoint.
        virtual1.stream.listen(expectAsync1((_) {}, count: 0));
        await pumpEventQueue();
      });

      test("canceling the stream's subscription has no effect on the sink",
          () async {
        unawaited(virtual1.stream.listen(null).cancel());
        await pumpEventQueue();

        virtual1.sink.add(1);
        virtual1.sink.add(2);
        virtual1.sink.add(3);
        unawaited(virtual1.sink.close());
        expect(virtual2.stream.toList(), completion(equals([1, 2, 3])));
      });

      test("canceling the stream's subscription doesn't stop a done event",
          () async {
        unawaited(virtual1.stream.listen(null).cancel());
        await pumpEventQueue();

        unawaited(virtual2.sink.close());
        await pumpEventQueue();

        virtual1.sink.add(1);
        virtual1.sink.add(2);
        virtual1.sink.add(3);
        unawaited(virtual1.sink.close());

        // The sink should be ignoring events because the stream closed.
        virtual2.stream.listen(expectAsync1((_) {}, count: 0));
        await pumpEventQueue();
      });
    });
  });
}
