// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:async/async.dart';
import 'package:test/test.dart';

import 'utils.dart';

void main() {
  group("with no callbacks", () {
    test("forwards cancellation", () async {
      var isCanceled = false;
      var cancelCompleter = new Completer();
      var controller = new StreamController(onCancel: expectAsync0(() {
        isCanceled = true;
        return cancelCompleter.future;
      }));
      var subscription = controller.stream
          .transform(subscriptionTransformer())
          .listen(expectAsync1((_) {}, count: 0));

      var cancelFired = false;
      subscription.cancel().then(expectAsync1((_) {
        cancelFired = true;
      }));

      await flushMicrotasks();
      expect(isCanceled, isTrue);
      expect(cancelFired, isFalse);

      cancelCompleter.complete();
      await flushMicrotasks();
      expect(cancelFired, isTrue);

      // This shouldn't call the onCancel callback again.
      expect(subscription.cancel(), completes);
    });

    test("forwards pausing and resuming", () async {
      var controller = new StreamController();
      var subscription = controller.stream
          .transform(subscriptionTransformer())
          .listen(expectAsync1((_) {}, count: 0));

      subscription.pause();
      await flushMicrotasks();
      expect(controller.isPaused, isTrue);

      subscription.pause();
      await flushMicrotasks();
      expect(controller.isPaused, isTrue);

      subscription.resume();
      await flushMicrotasks();
      expect(controller.isPaused, isTrue);

      subscription.resume();
      await flushMicrotasks();
      expect(controller.isPaused, isFalse);
    });

    test("forwards pausing with a resume future", () async {
      var controller = new StreamController();
      var subscription = controller.stream
          .transform(subscriptionTransformer())
          .listen(expectAsync1((_) {}, count: 0));

      var completer = new Completer();
      subscription.pause(completer.future);
      await flushMicrotasks();
      expect(controller.isPaused, isTrue);

      completer.complete();
      await flushMicrotasks();
      expect(controller.isPaused, isFalse);
    });
  });

  group("with a cancel callback", () {
    test("invokes the callback when the subscription is canceled", () async {
      var isCanceled = false;
      var callbackInvoked = false;
      var controller = new StreamController(onCancel: expectAsync0(() {
        isCanceled = true;
      }));
      var subscription = controller.stream.transform(
          subscriptionTransformer(handleCancel: expectAsync1((inner) {
        callbackInvoked = true;
        inner.cancel();
      }))).listen(expectAsync1((_) {}, count: 0));

      await flushMicrotasks();
      expect(callbackInvoked, isFalse);
      expect(isCanceled, isFalse);

      subscription.cancel();
      await flushMicrotasks();
      expect(callbackInvoked, isTrue);
      expect(isCanceled, isTrue);
    });

    test("invokes the callback once and caches its result", () async {
      var completer = new Completer();
      var controller = new StreamController();
      var subscription = controller.stream
          .transform(subscriptionTransformer(
              handleCancel: expectAsync1((inner) => completer.future)))
          .listen(expectAsync1((_) {}, count: 0));

      var cancelFired1 = false;
      subscription.cancel().then(expectAsync1((_) {
        cancelFired1 = true;
      }));

      var cancelFired2 = false;
      subscription.cancel().then(expectAsync1((_) {
        cancelFired2 = true;
      }));

      await flushMicrotasks();
      expect(cancelFired1, isFalse);
      expect(cancelFired2, isFalse);

      completer.complete();
      await flushMicrotasks();
      expect(cancelFired1, isTrue);
      expect(cancelFired2, isTrue);
    });
  });

  group("with a pause callback", () {
    test("invokes the callback when pause is called", () async {
      var pauseCount = 0;
      var controller = new StreamController();
      var subscription = controller.stream
          .transform(subscriptionTransformer(
              handlePause: expectAsync1((inner) {
            pauseCount++;
            inner.pause();
          }, count: 3)))
          .listen(expectAsync1((_) {}, count: 0));

      await flushMicrotasks();
      expect(pauseCount, equals(0));

      subscription.pause();
      await flushMicrotasks();
      expect(pauseCount, equals(1));

      subscription.pause();
      await flushMicrotasks();
      expect(pauseCount, equals(2));

      subscription.resume();
      subscription.resume();
      await flushMicrotasks();
      expect(pauseCount, equals(2));

      subscription.pause();
      await flushMicrotasks();
      expect(pauseCount, equals(3));
    });

    test("doesn't invoke the callback when the subscription has been canceled",
        () async {
      var controller = new StreamController();
      var subscription = controller.stream
          .transform(subscriptionTransformer(
              handlePause: expectAsync1((_) {}, count: 0)))
          .listen(expectAsync1((_) {}, count: 0));

      subscription.cancel();
      subscription.pause();
      subscription.pause();
      subscription.pause();
    });
  });

  group("with a resume callback", () {
    test("invokes the callback when resume is called", () async {
      var resumeCount = 0;
      var controller = new StreamController();
      var subscription = controller.stream
          .transform(subscriptionTransformer(
              handleResume: expectAsync1((inner) {
            resumeCount++;
            inner.resume();
          }, count: 3)))
          .listen(expectAsync1((_) {}, count: 0));

      await flushMicrotasks();
      expect(resumeCount, equals(0));

      subscription.resume();
      await flushMicrotasks();
      expect(resumeCount, equals(1));

      subscription.pause();
      subscription.pause();
      await flushMicrotasks();
      expect(resumeCount, equals(1));

      subscription.resume();
      await flushMicrotasks();
      expect(resumeCount, equals(2));

      subscription.resume();
      await flushMicrotasks();
      expect(resumeCount, equals(3));
    });

    test("invokes the callback when a resume future completes", () async {
      var resumed = false;
      var controller = new StreamController();
      var subscription = controller.stream.transform(
          subscriptionTransformer(handleResume: expectAsync1((inner) {
        resumed = true;
        inner.resume();
      }))).listen(expectAsync1((_) {}, count: 0));

      var completer = new Completer();
      subscription.pause(completer.future);
      await flushMicrotasks();
      expect(resumed, isFalse);

      completer.complete();
      await flushMicrotasks();
      expect(resumed, isTrue);
    });

    test("doesn't invoke the callback when the subscription has been canceled",
        () async {
      var controller = new StreamController();
      var subscription = controller.stream
          .transform(subscriptionTransformer(
              handlePause: expectAsync1((_) {}, count: 0)))
          .listen(expectAsync1((_) {}, count: 0));

      subscription.cancel();
      subscription.resume();
      subscription.resume();
      subscription.resume();
    });
  });

  group("when the outer subscription is canceled but the inner is not", () {
    StreamSubscription subscription;
    setUp(() {
      var controller = new StreamController();
      subscription = controller.stream
          .transform(subscriptionTransformer(handleCancel: (_) {}))
          .listen(expectAsync1((_) {}, count: 0),
              onError: expectAsync2((_, __) {}, count: 0),
              onDone: expectAsync0(() {}, count: 0));
      subscription.cancel();
      controller.add(1);
      controller.addError("oh no!");
      controller.close();
    });

    test("doesn't call a new onData", () async {
      subscription.onData(expectAsync1((_) {}, count: 0));
      await flushMicrotasks();
    });

    test("doesn't call a new onError", () async {
      subscription.onError(expectAsync2((_, __) {}, count: 0));
      await flushMicrotasks();
    });

    test("doesn't call a new onDone", () async {
      subscription.onDone(expectAsync0(() {}, count: 0));
      await flushMicrotasks();
    });

    test("isPaused returns false", () {
      expect(subscription.isPaused, isFalse);
    });

    test("asFuture never completes", () async {
      subscription.asFuture().then(expectAsync1((_) {}, count: 0));
      await flushMicrotasks();
    });
  });
}
