// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:collection/collection.dart';

/// An asynchronous operation that can be cancelled.
///
/// The value of this operation is exposed as [value]. When this operation is
/// cancelled, [value] won't complete either successfully or with an error. If
/// [value] has already completed, cancelling the operation does nothing.
class CancelableOperation<T> {
  /// The completer that produced this operation.
  ///
  /// That completer is canceled when [cancel] is called.
  final CancelableCompleter<T> _completer;

  CancelableOperation._(this._completer);

  /// Creates a [CancelableOperation] with the same result as the [result]
  /// future.
  ///
  /// When this operation is canceled, [onCancel] will be called and any value
  /// or error later produced by [result] will be discarded.
  /// If [onCancel] returns a [Future], it will be returned by [cancel].
  ///
  /// The [onCancel] function will be called synchronously
  /// when the new operation is canceled, and will be called at most once.
  ///
  /// Calling this constructor is equivalent to creating a
  /// [CancelableCompleter] and completing it with [result].
  factory CancelableOperation.fromFuture(Future<T> result,
          {FutureOr Function()? onCancel}) =>
      (CancelableCompleter<T>(onCancel: onCancel)..complete(result)).operation;

  /// Creates a [CancelableOperation] which completes to [value].
  ///
  /// Canceling this operation does nothing.
  ///
  /// Calling this constructor is equivalent to creating a
  /// [CancelableCompleter] and completing it with [value].
  factory CancelableOperation.fromValue(T value) =>
      (CancelableCompleter<T>()..complete(value)).operation;

  /// Creates a [CancelableOperation] wrapping [subscription].
  ///
  /// This overrides [StreamSubscription.onDone] and
  /// [StreamSubscription.onError] so that the returned operation will complete
  /// when the subscription completes or emits an error.
  /// When this operation is canceled or when it emits an error, the
  /// subscription will be canceled (unlike
  /// `CancelableOperation.fromFuture(subscription.asFuture())`).
  static CancelableOperation<void> fromSubscription(
      StreamSubscription<void> subscription) {
    var completer = CancelableCompleter<void>(onCancel: subscription.cancel);
    subscription.onDone(completer.complete);
    subscription.onError((Object error, StackTrace stackTrace) {
      // Cancel the subscription first, and only report the error once the
      // cancellation has finished.
      subscription.cancel().whenComplete(() {
        completer.completeError(error, stackTrace);
      });
    });
    return completer.operation;
  }

  /// Creates a [CancelableOperation] that completes with the value of the first
  /// of [operations] to complete.
  ///
  /// Once any of [operations] completes, its result is forwarded to the
  /// new [CancelableOperation] and the rest are cancelled. If the
  /// new operation is cancelled, all the [operations] are cancelled as
  /// well.
  ///
  /// Throws an [ArgumentError] if [operations] is empty.
  static CancelableOperation<T> race<T>(
      Iterable<CancelableOperation<T>> operations) {
    // Copy to a list so the iterable is only evaluated once.
    operations = operations.toList();
    if (operations.isEmpty) {
      throw ArgumentError('May not be empty', 'operations');
    }

    var done = false;
    // Note: if one or more of the completers have already completed,
    // they're not actually cancelled by this.
    Future<void> cancelAll() {
      done = true;
      return Future.wait([
        for (var operation in operations)
          if (!operation.isCanceled) operation.cancel()
      ]);
    }

    var completer = CancelableCompleter<T>(onCancel: cancelAll);
    for (var operation in operations) {
      // `propagateCancel: false` because the listeners added here are never
      // cancelled individually; cancellation goes through `cancelAll`.
      operation.then((value) {
        if (!done) cancelAll().whenComplete(() => completer.complete(value));
      }, onError: (error, stackTrace) {
        if (!done) {
          cancelAll()
              .whenComplete(() => completer.completeError(error, stackTrace));
        }
      }, propagateCancel: false);
    }

    return completer.operation;
  }

  /// The result of this operation, if not cancelled.
  ///
  /// This future will not complete if the operation is cancelled.
  /// Use [valueOrCancellation] for a future which completes
  /// both if the operation is cancelled and if it isn't.
  Future<T> get value => _completer._inner?.future ?? Completer<T>().future;

  /// Creates a [Stream] containing the result of this operation.
  ///
  /// This is like `value.asStream()`, but if a subscription to the stream is
  /// canceled, this operation is as well.
  Stream<T> asStream() {
    var controller =
        StreamController<T>(sync: true, onCancel: _completer._cancel);

    _completer._inner?.future.then((value) {
      controller.add(value);
      controller.close();
    }, onError: (Object error, StackTrace stackTrace) {
      controller.addError(error, stackTrace);
      controller.close();
    });
    return controller.stream;
  }

  /// Creates a [Future] that completes when this operation completes *or* when
  /// it's cancelled.
  ///
  /// If this operation completes, this completes to the same result as [value].
  /// If this operation is cancelled, the returned future waits for the future
  /// returned by [cancel], then completes to [cancellationValue].
  Future<T?> valueOrCancellation([T? cancellationValue]) {
    var completer = Completer<T?>.sync();
    value.then(completer.complete, onError: completer.completeError);

    _completer._cancelCompleter?.future.then((_) {
      completer.complete(cancellationValue);
    }, onError: completer.completeError);

    return completer.future;
  }

  /// Creates a new cancelable operation to be completed when this operation
  /// completes normally or as an error, or is cancelled.
  ///
  /// If this operation completes normally the value is passed to [onValue]
  /// and the returned operation is completed with the result.
  ///
  /// If this operation completes as an error, and no [onError] callback is
  /// provided, the returned operation is completed with the same error and
  /// stack trace.
  /// If this operation completes as an error, and an [onError] callback is
  /// provided, the returned operation is completed with the result.
  ///
  /// If this operation is canceled, and no [onCancel] callback is provided,
  /// the returned operation is canceled.
  /// If this operation is canceled, and an [onCancel] callback is provided,
  /// the returned operation is completed with the result.
  ///
  /// At most one of [onValue], [onError], or [onCancel] will be called.
  /// If any of [onValue], [onError], or [onCancel] throw a synchronous error,
  /// or return a `Future` that completes as an error, the error will be
  /// forwarded through the returned operation.
  ///
  /// If the returned operation is canceled before this operation completes or
  /// is canceled, the [onValue], [onError], and [onCancel] callbacks will not
  /// be invoked. If [propagateCancel] is `true` (the default) then this
  /// operation is canceled as well. Pass `false` if there are multiple
  /// listeners on this operation and canceling the [onValue], [onError], and
  /// [onCancel] callbacks should not cancel the other listeners.
  CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
          {FutureOr<R> Function(Object, StackTrace)? onError,
          FutureOr<R> Function()? onCancel,
          bool propagateCancel = true}) =>
      thenOperation<R>((value, completer) {
        completer.complete(onValue(value));
      },
          onError: onError == null
              ? null
              : (error, stackTrace, completer) {
                  completer.complete(onError(error, stackTrace));
                },
          onCancel: onCancel == null
              ? null
              : (completer) {
                  completer.complete(onCancel());
                },
          propagateCancel: propagateCancel);

  /// Creates a new cancelable operation to be completed when this operation
  /// completes normally or as an error, or is cancelled.
  ///
  /// If this operation completes normally the value is passed to [onValue]
  /// with a [CancelableCompleter] controlling the returned operation.
  ///
  /// If this operation completes as an error, and no [onError] callback is
  /// provided, the returned operation is completed with the same error and
  /// stack trace.
  /// If this operation completes as an error, and an [onError] callback is
  /// provided, the error and stack trace are passed to [onError] with a
  /// [CancelableCompleter] controlling the returned operation.
  ///
  /// If this operation is canceled, and no [onCancel] callback is provided,
  /// the returned operation is canceled.
  /// If this operation is canceled, and an [onCancel] callback is provided,
  /// the [onCancel] callback is called with a [CancelableCompleter] controlling
  /// the returned operation.
  ///
  /// At most one of [onValue], [onError], or [onCancel] will be called.
  /// If any of [onValue], [onError], or [onCancel] throw a synchronous error,
  /// or return a `Future` that completes as an error, the error will be
  /// forwarded through the returned operation, unless the callback has
  /// already completed the [CancelableCompleter] it was given, in which case
  /// the late error is dropped.
  ///
  /// If the returned operation is canceled before this operation completes or
  /// is canceled, the [onValue], [onError], and [onCancel] callbacks will not
  /// be invoked. If [propagateCancel] is `true` (the default) then this
  /// operation is canceled as well. Pass `false` if there are multiple
  /// listeners on this operation and canceling the [onValue], [onError], and
  /// [onCancel] callbacks should not cancel the other listeners.
  CancelableOperation<R> thenOperation<R>(
      FutureOr<void> Function(T, CancelableCompleter<R>) onValue,
      {FutureOr<void> Function(Object, StackTrace, CancelableCompleter<R>)?
          onError,
      FutureOr<void> Function(CancelableCompleter<R>)? onCancel,
      bool propagateCancel = true}) {
    final completer = CancelableCompleter<R>(
        onCancel: propagateCancel ? _cancelIfNotCanceled : null);

    // If `_completer._inner` completes before `completer` is cancelled,
    // call `onValue` or `onError` with the result and with `completer`,
    // so the callback can complete `completer` (unless it is cancelled in
    // the meantime).
    //
    // If this operation is cancelled before `completer` is cancelled, then
    // call `onCancel` (if supplied) with `completer`.
    //
    // If any of the callbacks throw, `completer` is completed with that
    // error, provided it has not been completed already.
    //
    // If no `onCancel` is provided, and this operation is cancelled before
    // `completer` is cancelled, then cancel `completer`.
    // (Cancelling twice is safe.)

    _completer._inner?.future.then<void>((value) async {
      if (completer.isCanceled) return;
      try {
        await onValue(value, completer);
      } catch (error, stack) {
        // `onValue` may already have completed `completer` before failing.
        // Calling `completeError` then would throw a `StateError` out of
        // this callback as an unhandled async error, so only report the
        // error while the completer is still pending.
        completer.completeErrorIfPending(error, stack);
      }
    },
        onError: onError == null
            ? completer.completeError // Is ignored if already cancelled.
            : (Object error, StackTrace stack) async {
                if (completer.isCanceled) return;
                try {
                  await onError(error, stack, completer);
                } catch (error2, stack2) {
                  // Keep the original stack trace if `onError` merely
                  // rethrew the error it was given.
                  completer.completeErrorIfPending(
                      error2, identical(error, error2) ? stack : stack2);
                }
              });
    final cancelForwarder = _CancelForwarder<R>(completer, onCancel);
    if (_completer.isCanceled) {
      // Already cancelled: forward the cancellation right away.
      cancelForwarder._forward();
    } else {
      (_completer._cancelForwarders ??= []).add(cancelForwarder);
    }
    return completer.operation;
  }

  /// Cancels this operation.
  ///
  /// If this operation [isCompleted] or [isCanceled] this call is ignored.
  /// Returns the result of the `onCancel` callback, if one exists.
  Future cancel() => _completer._cancel();

  /// Cancels this operation unless it is already canceled.
  ///
  /// Returns `null` when already canceled, otherwise the future from [cancel].
  Future<void>? _cancelIfNotCanceled() => isCanceled ? null : cancel();

  /// Whether this operation has been canceled before it completed.
  bool get isCanceled => _completer._isCanceled;

  /// Whether the result of this operation is ready.
  ///
  /// When ready, the [value] future is completed with the result value
  /// or error, and this operation can no longer be cancelled.
  /// An operation may be complete before the listeners on [value] are invoked.
  bool get isCompleted => _completer._isCompleted;
}

/// A completer for a [CancelableOperation].
class CancelableCompleter<T> {
  // The cancelable completer is in one of the following states:
  // * Initial:
  //      _inner != null
  //      _cancelCompleter != null
  //      _mayComplete: true
  //
  // * Async-completed: `complete` called with a future, or
  //     `completeOperation` called, while Initial.
  //      _inner != null
  //      _cancelCompleter != null
  //      _mayComplete: false
  //
  // * Completed: `complete` called with a value or `completeError` called
  //     while Initial, or the future/operation passed in Async-completed
  //     completes while Async-completed.
  //      _inner != null
  //      _cancelCompleter == null
  //      _mayComplete: false
  //
  // * Cancelled may-complete: `_cancel` called while Initial.
  //      Allows calling `complete`/`completeError` even if it does nothing.
  //      _inner == null
  //      _cancelCompleter != null
  //      _mayComplete: true
  //
  // * Cancelled can't-complete: `_cancel` called while Async-completed.
  //      _inner == null
  //      _cancelCompleter != null
  //      _mayComplete: false

  /// The completer for the wrapped future.
  ///
  /// At most one of `_inner.future` and `_cancelCompleter.future` will
  /// ever complete.
  /// Set to `null` when the operation is canceled, because then
  /// it's guaranteed that this completer will never complete.
  Completer<T>? _inner = Completer<T>();

  /// Completed when `cancel` is called.
  ///
  /// At most one of `_inner.future` and `_cancelCompleter.future` will
  /// ever complete.
  /// Set to `null` when [_inner] is completed, because then it's
  /// guaranteed that this completer will never complete.
  Completer<Object?>? _cancelCompleter = Completer<Object?>();

  /// The callback to call if the operation is canceled.
  final FutureOr<void> Function()? _onCancel;

  /// Additional cancellations to forward during cancel.
  ///
  /// When a cancelable operation is chained through `then` or `thenOperation` a
  /// cancellation on the original operation will synchronously cancel the
  /// chained operations.
  List<_CancelForwarder>? _cancelForwarders;

  /// Whether [complete] or [completeError] may still be called.
  ///
  /// Set to false when calling either.
  ///
  /// When completing by calling [complete] with a future,
  /// it's still possible to cancel until the result is actually
  /// available.
  /// You are also allowed to call [complete] or [completeError]
  /// after the operation has been canceled, as long as you only call it once.
  /// It just won't do anything after the operation is cancelled.
  /// This value only guards the calls to [complete] and [completeError].
  bool _mayComplete = true;

  /// The operation controlled by this completer.
  late final operation = CancelableOperation<T>._(this);

  /// Creates a new completer for a [CancelableOperation].
  ///
  /// The cancelable [operation] can be completed using
  /// [complete] or [completeError].
  ///
  /// The [onCancel] function is called if the [operation] is canceled,
  /// by calling [CancelableOperation.cancel]
  /// before the operation has completed.
  /// If [onCancel] returns a [Future],
  /// that future is also returned by [CancelableOperation.cancel].
  ///
  /// The [onCancel] function will be called at most once.
  CancelableCompleter({FutureOr Function()? onCancel}) : _onCancel = onCancel;

  /// Whether the [_inner] completer has been completed.
  ///
  /// At this point it's no longer possible to cancel the operation.
  bool get _isCompleted => _cancelCompleter == null;

  /// Whether the completer was canceled before the result was ready.
  ///
  /// At this point, it's no longer possible to complete the operation.
  bool get _isCanceled => _inner == null;

  /// Whether the [complete] or [completeError] have been called.
  ///
  /// Once this completer has been completed with either a result or error,
  /// neither method may be called again.
  ///
  /// If [complete] was called with a [Future] argument, this completer may be
  /// completed before its [operation] is completed. In that case the
  /// [operation] may still be canceled before the result is available.
  bool get isCompleted => !_mayComplete;

  /// Whether the completer was canceled before the result was ready.
  bool get isCanceled => _isCanceled;

  /// Completes [operation] with [value].
  ///
  /// If [value] is a [Future] the [operation] will complete
  /// with the result of that `Future` once it is available.
  /// In that case [isCompleted] will be `true` before the [operation]
  /// is complete.
  ///
  /// If the type [T] is not nullable [value] may not be omitted or `null`.
  ///
  /// This method may not be called after either [complete] or [completeError]
  /// has been called once.
  /// The [isCompleted] is true when either of these methods have been called.
  ///
  /// Throws a [StateError] if this completer has already been completed.
  void complete([FutureOr<T>? value]) {
    if (!_mayComplete) throw StateError('Operation already completed');
    _mayComplete = false;

    if (value is! Future<T>) {
      _completeNow()?.complete(value);
      return;
    }

    if (_inner == null) {
      // Make sure errors from [value] aren't top-leveled.
      value.ignore();
      return;
    }

    // The operation can still be cancelled while waiting for [value];
    // `_completeNow` returns `null` in that case and the result is dropped.
    value.then((result) {
      _completeNow()?.complete(result);
    }, onError: (Object error, StackTrace stackTrace) {
      _completeNow()?.completeError(error, stackTrace);
    });
  }

  /// Makes this [CancelableCompleter.operation] complete with the same result
  /// as [result].
  ///
  /// If [propagateCancel] is `true` (the default), and the [operation] of this
  /// completer is canceled before [result] completes, then [result] is also
  /// canceled.
  ///
  /// Throws a [StateError] if this completer has already been completed.
  void completeOperation(CancelableOperation<T> result,
      {bool propagateCancel = true}) {
    if (!_mayComplete) throw StateError('Already completed');
    _mayComplete = false;
    if (isCanceled) {
      if (propagateCancel) result.cancel();
      // Make sure errors from [result] aren't top-leveled.
      result.value.ignore();
      return;
    }
    // Complete through `_completeNow` so that, like `complete` and
    // `completeError`, delivering the result clears `_cancelCompleter`.
    // Otherwise the operation would never report `isCompleted`, and a later
    // `cancel()` would still run the cancel callbacks on a completed
    // operation. `_completeNow` returns `null` if this operation was
    // cancelled in the meantime.
    result.then<void>((value) {
      _completeNow()?.complete(value);
    }, onError: (error, stack) {
      _completeNow()?.completeError(error, stack);
    }, onCancel: () {
      operation.cancel();
    });
    if (propagateCancel) {
      _cancelCompleter?.future.whenComplete(result.cancel);
    }
  }

  /// Completer to use for completing with a result.
  ///
  /// Returns `null` if it's not possible to complete any more.
  /// Sets [_cancelCompleter] to `null` if returning non-`null`.
  Completer<T>? _completeNow() {
    var inner = _inner;
    if (inner == null) return null;
    _cancelCompleter = null;
    return inner;
  }

  /// Completes [operation] with [error] and [stackTrace].
  ///
  /// This method may not be called after either [complete] or [completeError]
  /// has been called once.
  /// The [isCompleted] is true when either of these methods have been called.
  ///
  /// Throws a [StateError] if this completer has already been completed.
  void completeError(Object error, [StackTrace? stackTrace]) {
    if (!_mayComplete) throw StateError('Operation already completed');
    _mayComplete = false;
    _completeNow()?.completeError(error, stackTrace);
  }

  /// Cancels the operation.
  ///
  /// If the operation has already completed, prior to being cancelled,
  /// this method does nothing.
  /// If the operation has already been cancelled, this method returns
  /// the same result as the first call to `_cancel`.
  ///
  /// The result of the operation may only be available some time after
  /// the completer has been completed (using [complete] or [completeError],
  /// which sets [isCompleted] to true) if completed with a [Future].
  /// The completer can be cancelled until the result becomes available,
  /// even if [isCompleted] is true.
  Future<void> _cancel() {
    var cancelCompleter = _cancelCompleter;
    if (cancelCompleter == null) return Future.value(null);

    if (_inner != null) {
      // First cancellation: drop the result completer, then run callbacks.
      _inner = null;
      cancelCompleter.complete(_invokeCancelCallbacks());
    }
    return cancelCompleter.future;
  }

  /// Invoke [_onCancel] and forward to other completers in [_cancelForwarders].
  ///
  /// Returns the same value as [_onCancel]. Legacy uses may return a value
  /// despite the signature having `void` return.
  Future<Object?> _invokeCancelCallbacks() async {
    final FutureOr<Object?> toReturn = _onCancel?.call();
    final isFuture = toReturn is Future;
    final cancelFutures = <Future<Object?>>[
      if (isFuture) toReturn,
      ...?_cancelForwarders?.map(_forward).whereNotNull()
    ];
    // When `_onCancel` returned a future it is always first in
    // `cancelFutures`, so `results.first` below is its result.
    final results = (isFuture && cancelFutures.length == 1)
        ? [await toReturn]
        : cancelFutures.isNotEmpty
            ? await Future.wait(cancelFutures)
            : const [];
    return isFuture ? results.first : toReturn;
  }
}

/// Forwards a cancellation of a source operation to a chained [completer].
///
/// If an [onCancel] callback is present it is invoked with the [completer];
/// otherwise the [completer] itself is canceled.
class _CancelForwarder<T> {
  /// The completer controlling the chained operation.
  final CancelableCompleter<T> completer;

  /// Optional callback to run instead of canceling [completer].
  final FutureOr<void> Function(CancelableCompleter<T>)? onCancel;

  _CancelForwarder(this.completer, this.onCancel);

  /// Forwards the cancellation to [completer].
  ///
  /// Returns a future only when there is asynchronous work to wait for;
  /// returns `null` when [completer] is already canceled or when [onCancel]
  /// finished (or threw) synchronously.
  Future<void>? _forward() {
    if (completer.isCanceled) return null;

    final callback = onCancel;
    if (callback == null) {
      return completer._cancel();
    }

    try {
      final outcome = callback(completer);
      if (outcome is! Future) return null;
      // Report asynchronous failures through the completer if still pending.
      return outcome.catchError(completer.completeErrorIfPending);
    } catch (error, stack) {
      // Synchronous failure of the callback.
      completer.completeErrorIfPending(error, stack);
      return null;
    }
  }
}

// Top-level helper so `List<_CancelForwarder>.map` can take a tear-off
// instead of a closure.
Future<void>? _forward(_CancelForwarder<Object?> forwarder) {
  return forwarder._forward();
}

extension on CancelableCompleter {
  /// Completes with [error] and [stackTrace] unless `complete` or
  /// `completeError` has already been called on this completer.
  void completeErrorIfPending(Object error, StackTrace stackTrace) {
    if (!isCompleted) {
      completeError(error, stackTrace);
    }
  }
}
