// Copyright (c) 2020, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import "package:expect/async_helper.dart";
import "package:expect/expect.dart";

extension StreamRepeatLatestExtension<T extends Object> on Stream<T> {
  Stream<T> repeatLatest() {
    var done = false;
    T? latest = null;
    var currentListeners = <MultiStreamController<T>>{};
    this.listen(
      (event) {
        latest = event;
        for (var listener in [...currentListeners]) listener.addSync(event);
      },
      onError: (Object error, StackTrace stack) {
        for (var listener in [...currentListeners])
          listener.addErrorSync(error, stack);
      },
      onDone: () {
        done = true;
        latest = null;
        for (var listener in currentListeners) listener.closeSync();
        currentListeners.clear();
      },
    );
    return Stream.multi((controller) {
      if (done) {
        controller.close();
        return;
      }
      currentListeners.add(controller);
      var latestValue = latest;
      if (latestValue != null) controller.add(latestValue);
      controller.onCancel = () {
        currentListeners.remove(controller);
      };
    });
  }
}

void main() {
  asyncStart();
  testStreamsIndependent();
  asyncTest(testStreamNonOverlap);
  asyncTest(testRepeatLatest);
  asyncTest(testIncorrectUse);
  asyncEnd();
}

/// Test that the streams can provide different events.
void testStreamsIndependent() {
  var log = <String>[];
  var index = 0;
  var multi = Stream<List<int>>.multi((c) {
    var id = ++index;
    log.add("$id");
    for (var i = 0; i < id + 1; i++) {
      c.add([id, i]);
    }
    c.close();
  });
  void logList(List<int> l) {
    log.add("${l.first}-${l.last}");
  }

  asyncStart();
  Future.wait([multi.forEach(logList), multi.forEach(logList)]).whenComplete(
    () {
      Expect.equals(7, log.length);
      for (var element in ["1", "1-0", "1-1", "2", "2-0", "2-1", "2-2"]) {
        Expect.isTrue(log.contains(element));
      }
      asyncEnd();
    },
  );
}

/// Test that stream can be listened to again after having no listener.
Future<void> testStreamNonOverlap() async {
  var completer = Completer<Object?>();
  MultiStreamController<int>? controller;
  var stream = Stream<int>.multi((c) {
    controller = c;
    c.onCancel = () {
      controller = null;
      if (!completer.isCompleted) completer.complete(null);
    };
  });
  for (var i in [1, 2, 3]) {
    var log = <Object?>[];
    var subscription = stream.listen(
      (v) {
        log.add(v);
        if (!completer.isCompleted) completer.complete(v);
      },
      onError: (e, s) {
        log.add(e);
        if (!completer.isCompleted) completer.complete(e);
      },
      onDone: () {
        log.add(null);
        if (!completer.isCompleted) completer.complete(null);
      },
    );
    Expect.isNotNull(controller);
    controller!.add(1);
    await completer.future;
    Expect.listEquals([1], log);

    completer = Completer();
    controller!.add(2);
    await completer.future;
    Expect.listEquals([1, 2], log);

    completer = Completer();
    if (i == 2) {
      subscription.cancel();
    } else {
      controller!.close();
    }
    await completer.future;
    Expect.listEquals([1, 2, if (i != 2) null], log);
  }
}

/// Test that the [Stream.repeatLatest] example code works as described.
Future<void> testRepeatLatest() async {
  var c = StreamController<int>();
  var repStream = c.stream.repeatLatest();

  var f1 = repStream.first;
  c.add(1);
  var v1 = await f1;
  Expect.equals(1, v1);

  var f2 = repStream.take(2).toList();
  c.add(2);
  var l2 = await f2;
  Expect.listEquals([1, 2], l2);

  var f3 = repStream.take(2).toList();
  c.add(3);
  var l3 = await f3;
  Expect.listEquals([2, 3], l3);
}

// Test that errors are thrown when required,
// and use after cancel is ignored.
Future<void> testIncorrectUse() async {
  {
    var lock = Completer();
    var lock2 = Completer();
    var stream = Stream.multi((c) async {
      c.add(2);
      await lock.future;
      Expect.isTrue(!c.hasListener);
      c.add(2);
      c.addError("Error");
      c.close();
      // No adding after close.
      Expect.throws<StateError>(() => c.add(3));
      Expect.throws<StateError>(() => c.addSync(3));
      Expect.throws<StateError>(() => c.addError("E"));
      Expect.throws<StateError>(() => c.addErrorSync("E"));
      Expect.throws<StateError>(() => c.addStream(Stream.empty()));
      lock2.complete();
    });
    await for (var v in stream) {
      Expect.equals(2, v);
      break; // Cancels subscription.
    }
    lock.complete();
    await lock2.future;
  }

  {
    var lock = Completer();
    var lock2 = Completer();
    var stream = Stream.multi((c) async {
      c.add(2);
      await lock.future;
      Expect.isTrue(!c.hasListener);
      c.addSync(2);
      c.addErrorSync("Error");
      c.closeSync();
      // No adding after close.
      Expect.throws<StateError>(() => c.add(3));
      Expect.throws<StateError>(() => c.addSync(3));
      Expect.throws<StateError>(() => c.addError("E"));
      Expect.throws<StateError>(() => c.addErrorSync("E"));
      Expect.throws<StateError>(() => c.addStream(Stream.empty()));
      lock2.complete();
    });
    await for (var v in stream) {
      Expect.equals(2, v);
      break; // Cancels subscription.
    }
    lock.complete();
    await lock2.future;
  }

  {
    var stream = Stream.multi((c) async {
      var c2 = StreamController();
      c.addStream(c2.stream);
      // Now adding stream, cannot add events at the same time (for now!).
      Expect.throws<StateError>(() => c.add(1));
      Expect.throws<StateError>(() => c.addSync(1));
      Expect.throws<StateError>(() => c.addError("Error"));
      Expect.throws<StateError>(() => c.addErrorSync("Error"));
      Expect.throws<StateError>(() => c.close());
      Expect.throws<StateError>(() => c.closeSync());
      await c2.close();
      c.add(42);
      c.close();
    });
    await for (var v in stream) {
      Expect.equals(42, v);
    }
  }
}
