// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import "dart:async";

import "package:async/async.dart" show SubscriptionStream;
import "package:test/test.dart";

import "utils.dart";

main() {
  test("subscription stream of an entire subscription", () async {
    var stream = createStream();
    var subscription = stream.listen(null);
    var subscriptionStream = new SubscriptionStream<int>(subscription);
    await flushMicrotasks();
    expect(subscriptionStream.toList(), completion([1, 2, 3, 4]));
  });

  test("subscription stream after two events", () async {
    var stream = createStream();
    var skips = 0;
    var completer = new Completer();
    StreamSubscription<int> subscription;
    subscription = stream.listen((value) {
      ++skips;
      expect(value, skips);
      if (skips == 2) {
        completer.complete(new SubscriptionStream<int>(subscription));
      }
    });
    var subscriptionStream = await completer.future;
    await flushMicrotasks();
    expect(subscriptionStream.toList(), completion([3, 4]));
  });

  test("listening twice fails", () async {
    var stream = createStream();
    var sourceSubscription = stream.listen(null);
    var subscriptionStream = new SubscriptionStream<int>(sourceSubscription);
    var subscription = subscriptionStream.listen(null);
    expect(() => subscriptionStream.listen(null), throwsA(anything));
    await subscription.cancel();
  });

  test("pause and cancel passed through to original stream", () async {
    var controller = new StreamController(onCancel: () async => 42);
    var sourceSubscription = controller.stream.listen(null);
    var subscriptionStream = new SubscriptionStream(sourceSubscription);
    expect(controller.isPaused, isTrue);
    dynamic lastEvent;
    var subscription = subscriptionStream.listen((value) {
      lastEvent = value;
    });
    controller.add(1);

    await flushMicrotasks();
    expect(lastEvent, 1);
    expect(controller.isPaused, isFalse);

    subscription.pause();
    expect(controller.isPaused, isTrue);

    subscription.resume();
    expect(controller.isPaused, isFalse);

    expect(await subscription.cancel(), 42);
    expect(controller.hasListener, isFalse);
  });

  group("cancelOnError source:", () {
    for (var sourceCancels in [false, true]) {
      group("${sourceCancels ? "yes" : "no"}:", () {
        SubscriptionStream subscriptionStream;
        Future onCancel; // Completes if source stream is canceled before done.
        setUp(() {
          var cancelCompleter = new Completer();
          var source = createErrorStream(cancelCompleter);
          onCancel = cancelCompleter.future;
          var sourceSubscription =
              source.listen(null, cancelOnError: sourceCancels);
          subscriptionStream = new SubscriptionStream<int>(sourceSubscription);
        });

        test("- subscriptionStream: no", () async {
          var done = new Completer();
          var events = [];
          subscriptionStream.listen(events.add,
              onError: events.add, onDone: done.complete, cancelOnError: false);
          var expected = [1, 2, "To err is divine!"];
          if (sourceCancels) {
            await onCancel;
            // And [done] won't complete at all.
            bool isDone = false;
            done.future.then((_) {
              isDone = true;
            });
            await new Future.delayed(const Duration(milliseconds: 5));
            expect(isDone, false);
          } else {
            expected.add(4);
            await done.future;
          }
          expect(events, expected);
        });

        test("- subscriptionStream: yes", () async {
          var completer = new Completer();
          var events = [];
          subscriptionStream.listen(events.add,
              onError: (value) {
                events.add(value);
                completer.complete();
              },
              onDone: () => throw "should not happen",
              cancelOnError: true);
          await completer.future;
          await flushMicrotasks();
          expect(events, [1, 2, "To err is divine!"]);
        });
      });
    }

    for (var cancelOnError in [false, true]) {
      group(cancelOnError ? "yes" : "no", () {
        test("- no error, value goes to asFuture", () async {
          var stream = createStream();
          var sourceSubscription =
              stream.listen(null, cancelOnError: cancelOnError);
          var subscriptionStream = new SubscriptionStream(sourceSubscription);
          var subscription =
              subscriptionStream.listen(null, cancelOnError: cancelOnError);
          expect(subscription.asFuture(42), completion(42));
        });

        test("- error goes to asFuture", () async {
          var stream = createErrorStream();
          var sourceSubscription =
              stream.listen(null, cancelOnError: cancelOnError);
          var subscriptionStream = new SubscriptionStream(sourceSubscription);

          var subscription =
              subscriptionStream.listen(null, cancelOnError: cancelOnError);
          expect(subscription.asFuture(), throwsA(anything));
        });
      });
    }
  });
}

Stream<int> createStream() async* {
  yield 1;
  await flushMicrotasks();
  yield 2;
  await flushMicrotasks();
  yield 3;
  await flushMicrotasks();
  yield 4;
}

Stream<int> createErrorStream([Completer onCancel]) async* {
  bool canceled = true;
  try {
    yield 1;
    await flushMicrotasks();
    yield 2;
    await flushMicrotasks();
    yield* new Future<int>.error("To err is divine!").asStream();
    await flushMicrotasks();
    yield 4;
    await flushMicrotasks();
    canceled = false;
  } finally {
    // Completes before the "done", but should be after all events.
    if (canceled && onCancel != null) {
      await flushMicrotasks();
      onCancel.complete();
    }
  }
}

Stream<int> createLongStream() async* {
  for (int i = 0; i < 200; i++) yield i;
}
