| // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| |
| import 'package:collection/collection.dart'; |
| |
| /// An asynchronous operation that can be cancelled. |
| /// |
| /// The value of this operation is exposed as [value]. When this operation is |
| /// cancelled, [value] won't complete either successfully or with an error. If |
| /// [value] has already completed, cancelling the operation does nothing. |
| class CancelableOperation<T> { |
| /// The completer that produced this operation. |
| /// |
| /// That completer is canceled when [cancel] is called. |
| CancelableCompleter<T> _completer; |
| |
| CancelableOperation._(this._completer); |
| |
| /// Creates a [CancelableOperation] with the same result as the [result] |
| /// future. |
| /// |
| /// When this operation is canceled, [onCancel] will be called and any value |
| /// or error later produced by [result] will be discarded. |
| /// If [onCancel] returns a [Future], it will be returned by [cancel]. |
| /// |
| /// The [onCancel] function will be called synchronously |
| /// when the new operation is canceled, and will be called at most once. |
| /// |
| /// Calling this constructor is equivalent to creating a |
| /// [CancelableCompleter] and completing it with [result]. |
| factory CancelableOperation.fromFuture(Future<T> result, |
| {FutureOr Function()? onCancel}) => |
| (CancelableCompleter<T>(onCancel: onCancel)..complete(result)).operation; |
| |
| /// Creates a [CancelableOperation] which completes to [value]. |
| /// |
| /// Canceling this operation does nothing. |
| /// |
| /// Calling this constructor is equivalent to creating a |
| /// [CancelableCompleter] and completing it with [value]. |
| factory CancelableOperation.fromValue(T value) => |
| (CancelableCompleter<T>()..complete(value)).operation; |
| |
| /// Creates a [CancelableOperation] wrapping [subscription]. |
| /// |
| /// This overrides [StreamSubscription.onDone] and |
| /// [StreamSubscription.onError] so that the returned operation will complete |
| /// when the subscription completes or emits an error. |
| /// When this operation is canceled or when it emits an error, the |
| /// subscription will be canceled (unlike |
| /// `CancelableOperation.fromFuture(subscription.asFuture())`). |
| static CancelableOperation<void> fromSubscription( |
| StreamSubscription<void> subscription) { |
| var completer = CancelableCompleter<void>(onCancel: subscription.cancel); |
| subscription.onDone(completer.complete); |
| subscription.onError((Object error, StackTrace stackTrace) { |
| subscription.cancel().whenComplete(() { |
| completer.completeError(error, stackTrace); |
| }); |
| }); |
| return completer.operation; |
| } |
| |
| /// Creates a [CancelableOperation] that completes with the value of the first |
| /// of [operations] to complete. |
| /// |
| /// Once any of [operations] completes, its result is forwarded to the |
| /// new [CancelableOperation] and the rest are cancelled. If the |
| /// new operation is cancelled, all the [operations] are cancelled as |
| /// well. |
| static CancelableOperation<T> race<T>( |
| Iterable<CancelableOperation<T>> operations) { |
| operations = operations.toList(); |
| if (operations.isEmpty) { |
| throw ArgumentError('May not be empty', 'operations'); |
| } |
| |
| var done = false; |
| // Note: if one or more of the completers have already completed, |
| // they're not actually cancelled by this. |
| Future<void> cancelAll() { |
| done = true; |
| return Future.wait([ |
| for (var operation in operations) |
| if (!operation.isCanceled) operation.cancel() |
| ]); |
| } |
| |
| var completer = CancelableCompleter<T>(onCancel: cancelAll); |
| for (var operation in operations) { |
| operation.then((value) { |
| if (!done) cancelAll().whenComplete(() => completer.complete(value)); |
| }, onError: (error, stackTrace) { |
| if (!done) { |
| cancelAll() |
| .whenComplete(() => completer.completeError(error, stackTrace)); |
| } |
| }, propagateCancel: false); |
| } |
| |
| return completer.operation; |
| } |
| |
| /// The result of this operation, if not cancelled. |
| /// |
| /// This future will not complete if the operation is cancelled. |
| /// Use [valueOrCancellation] for a future which completes |
| /// both if the operation is cancelled and if it isn't. |
| Future<T> get value => _completer._inner?.future ?? Completer<T>().future; |
| |
| /// Creates a [Stream] containing the result of this operation. |
| /// |
| /// This is like `value.asStream()`, but if a subscription to the stream is |
| /// canceled, this operation is as well. |
| Stream<T> asStream() { |
| var controller = |
| StreamController<T>(sync: true, onCancel: _completer._cancel); |
| |
| _completer._inner?.future.then((value) { |
| controller.add(value); |
| controller.close(); |
| }, onError: (Object error, StackTrace stackTrace) { |
| controller.addError(error, stackTrace); |
| controller.close(); |
| }); |
| return controller.stream; |
| } |
| |
| /// Creates a [Future] that completes when this operation completes *or* when |
| /// it's cancelled. |
| /// |
| /// If this operation completes, this completes to the same result as [value]. |
| /// If this operation is cancelled, the returned future waits for the future |
| /// returned by [cancel], then completes to [cancellationValue]. |
| Future<T?> valueOrCancellation([T? cancellationValue]) { |
| var completer = Completer<T?>.sync(); |
| value.then(completer.complete, onError: completer.completeError); |
| |
| _completer._cancelCompleter?.future.then((_) { |
| completer.complete(cancellationValue); |
| }, onError: completer.completeError); |
| |
| return completer.future; |
| } |
| |
| /// Creates a new cancelable operation to be completed when this operation |
| /// completes normally or as an error, or is cancelled. |
| /// |
| /// If this operation completes normally the value is passed to [onValue] |
| /// and the returned operation is completed with the result. |
| /// |
| /// If this operation completes as an error, and no [onError] callback is |
| /// provided, the returned operation is completed with the same error and |
| /// stack trace. |
| /// If this operation completes as an error, and an [onError] callback is |
| /// provided, the returned operation is completed with the result. |
| /// |
| /// If this operation is canceled, and no [onCancel] callback is provided, |
| /// the returned operation is canceled. |
| /// If this operation is canceled, and an [onCancel] callback is provided, |
| /// the returned operation is completed with the result. |
| /// |
| /// At most one of [onValue], [onError], or [onCancel] will be called. |
| /// If any of [onValue], [onError], or [onCancel] throw a synchronous error, |
| /// or return a `Future` that completes as an error, the error will be |
| /// forwarded through the returned operation. |
| /// |
| /// If the returned operation is canceled before this operation completes or |
| /// is canceled, the [onValue], [onError], and [onCancel] callbacks will not |
| /// be invoked. If [propagateCancel] is `true` (the default) then this |
| /// operation is canceled as well. Pass `false` if there are multiple |
| /// listeners on this operation and canceling the [onValue], [onError], and |
| /// [onCancel] callbacks should not cancel the other listeners. |
| CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue, |
| {FutureOr<R> Function(Object, StackTrace)? onError, |
| FutureOr<R> Function()? onCancel, |
| bool propagateCancel = true}) => |
| thenOperation<R>((value, completer) { |
| completer.complete(onValue(value)); |
| }, |
| onError: onError == null |
| ? null |
| : (error, stackTrace, completer) { |
| completer.complete(onError(error, stackTrace)); |
| }, |
| onCancel: onCancel == null |
| ? null |
| : (completer) { |
| completer.complete(onCancel()); |
| }, |
| propagateCancel: propagateCancel); |
| |
| /// Creates a new cancelable operation to be completed when this operation |
| /// completes normally or as an error, or is cancelled. |
| /// |
| /// If this operation completes normally the value is passed to [onValue] |
| /// with a [CancelableCompleter] controlling the returned operation. |
| /// |
| /// If this operation completes as an error, and no [onError] callback is |
| /// provided, the returned operation is completed with the same error and |
| /// stack trace. |
| /// If this operation completes as an error, and an [onError] callback is |
| /// provided, the error and stack trace are passed to [onError] with a |
| /// [CancelableCompleter] controlling the returned operation. |
| /// |
| /// If this operation is canceled, and no [onCancel] callback is provided, |
| /// the returned operation is canceled. |
| /// If this operation is canceled, and an [onCancel] callback is provided, |
| /// the [onCancel] callback is called with a [CancelableCompleter] controlling |
| /// the returned operation. |
| /// |
| /// At most one of [onValue], [onError], or [onCancel] will be called. |
| /// If any of [onValue], [onError], or [onCancel] throw a synchronous error, |
| /// or return a `Future` that completes as an error, the error will be |
| /// forwarded through the returned operation. |
| /// |
| /// If the returned operation is canceled before this operation completes or |
| /// is canceled, the [onValue], [onError], and [onCancel] callbacks will not |
| /// be invoked. If [propagateCancel] is `true` (the default) then this |
| /// operation is canceled as well. Pass `false` if there are multiple |
| /// listeners on this operation and canceling the [onValue], [onError], and |
| /// [onCancel] callbacks should not cancel the other listeners. |
| CancelableOperation<R> thenOperation<R>( |
| FutureOr<void> Function(T, CancelableCompleter<R>) onValue, |
| {FutureOr<void> Function(Object, StackTrace, CancelableCompleter<R>)? |
| onError, |
| FutureOr<void> Function(CancelableCompleter<R>)? onCancel, |
| bool propagateCancel = true}) { |
| final completer = CancelableCompleter<R>( |
| onCancel: propagateCancel ? _cancelIfNotCanceled : null); |
| |
| // if `_completer._inner` completes before `completer` is cancelled |
| // call `onValue` or `onError` with the result, and complete `completer` |
| // with the result of that call (unless cancelled in the meantime). |
| // |
| // If `_completer._cancelCompleter` completes (always with a value) |
| // before `completer` is cancelled, then call `onCancel` (if supplied) |
| // with that that value and complete `completer` with the result of that |
| // call (unless cancelled in the meantime). |
| // |
| // If any of the callbacks throw synchronously, the `completer` is |
| // completed with that error. |
| // |
| // If no `onCancel` is provided, and `_completer._cancelCompleter` |
| // completes before `completer` is cancelled, |
| // then cancel `cancelCompleter`. (Cancelling twice is safe.) |
| |
| _completer._inner?.future.then<void>((value) async { |
| if (completer.isCanceled) return; |
| try { |
| await onValue(value, completer); |
| } catch (error, stack) { |
| completer.completeError(error, stack); |
| } |
| }, |
| onError: onError == null |
| ? completer.completeError // Is ignored if already cancelled. |
| : (Object error, StackTrace stack) async { |
| if (completer.isCanceled) return; |
| try { |
| await onError(error, stack, completer); |
| } catch (error2, stack2) { |
| completer.completeErrorIfPending( |
| error2, identical(error, error2) ? stack : stack2); |
| } |
| }); |
| final cancelForwarder = _CancelForwarder<R>(completer, onCancel); |
| if (_completer.isCanceled) { |
| cancelForwarder._forward(); |
| } else { |
| (_completer._cancelForwarders ??= []).add(cancelForwarder); |
| } |
| return completer.operation; |
| } |
| |
| /// Cancels this operation. |
| /// |
| /// If this operation [isCompleted] or [isCanceled] this call is ignored. |
| /// Returns the result of the `onCancel` callback, if one exists. |
| Future cancel() => _completer._cancel(); |
| |
| Future<void>? _cancelIfNotCanceled() => isCanceled ? null : cancel(); |
| |
| /// Whether this operation has been canceled before it completed. |
| bool get isCanceled => _completer._isCanceled; |
| |
| /// Whether the result of this operation is ready. |
| /// |
| /// When ready, the [value] future is completed with the result value |
| /// or error, and this operation can no longer be cancelled. |
| /// An operation may be complete before the listeners on [value] are invoked. |
| bool get isCompleted => _completer._isCompleted; |
| } |
| |
| /// A completer for a [CancelableOperation]. |
| class CancelableCompleter<T> { |
| // The cancelable completer is in one of the following states: |
| // * Initial: |
| // _inner != null |
| // _cancelCompleter != null |
| // _mayComplete: true |
| // |
| // * Async-completed: `complete` called with a future while Initial. |
| // _inner != null |
| // _cancelCompleter != null |
| // _mayComplete: false |
| // |
| // * Completed: `complete` called with a value or `completeError` called |
| // while Initial, or the future passed in Async-completed completes |
| // while AsyncCompleted. |
| // _inner != null |
| // _cancelCompleter == null |
| // _mayComplete: false |
| // |
| // * Cancelled may-complete: `_cancel` called while Initial. |
| // Allows calling `complete`/`completeError` even if it does nothing. |
| // _inner == null |
| // _cancelCompleter != null |
| // _mayComplete: true |
| // |
| // * Cancelled can't-complete: `_cancel` called while Async-completed. |
| // _inner == null |
| // _cancelCompleter != null |
| // _mayComplete: false |
| |
| /// The completer for the wrapped future. |
| /// |
| /// At most one of `_inner.future` and `_cancelCompleter.future` will |
| /// ever complete. |
| /// Set to `null` when when the operation is canceled, because then |
| /// it's guaranteed that this completer will never complete. |
| Completer<T>? _inner = Completer<T>(); |
| |
| /// Completed when `cancel` is called. |
| /// |
| /// At most one of `_inner.future` and `_cancelCompleter.future` will |
| /// ever complete. |
| /// Set to `null` when [_inner] is completed, because then it's |
| /// guaranteed that this completer will never complete. |
| Completer<Object?>? _cancelCompleter = Completer<Object?>(); |
| |
| /// The callback to call if the operation is canceled. |
| final FutureOr<void> Function()? _onCancel; |
| |
| /// Additional cancellations to forward during cancel. |
| /// |
| /// When a cancelable operation is chained through `then` or `thenOperation` a |
| /// cancellation on the original operation will synchronously cancel the |
| /// chained operations. |
| List<_CancelForwarder>? _cancelForwarders; |
| |
| /// Whether [complete] or [completeError] may still be called. |
| /// |
| /// Set to false when calling either. |
| /// |
| /// When completing by calling [complete] with a future, |
| /// it's still possible to cancel until the result is actually |
| /// available. |
| /// You are also allowed to call [complete] or [completeError] |
| /// after the operation has been canceled, as long as you only call it once. |
| /// It just won't do anything after the operation is cancelled. |
| /// This value only guards the calls to [complete] and [completeError]. |
| bool _mayComplete = true; |
| |
| /// The operation controlled by this completer. |
| late final operation = CancelableOperation<T>._(this); |
| |
| /// Creates a new completer for a [CancelableOperation]. |
| /// |
| /// The cancelable [operation] can be completed using |
| /// [complete] or [completeError]. |
| /// |
| /// The [onCancel] function is called if the [operation] is canceled, |
| /// by calling [CancelableOperation.cancel] |
| /// before the operation has completed. |
| /// If [onCancel] returns a [Future], |
| /// that future is also returned by [CancelableOperation.cancel]. |
| /// |
| /// The [onCancel] function will be called at most once. |
| CancelableCompleter({FutureOr Function()? onCancel}) : _onCancel = onCancel; |
| |
| /// Whether the [_inner] completer has been completed. |
| /// |
| /// At this point it's no longer possible to cancel the operation. |
| bool get _isCompleted => _cancelCompleter == null; |
| |
| /// Whether the completer was canceled before the result was ready. |
| /// |
| /// At this point, it's no longer possible to complete the operation. |
| bool get _isCanceled => _inner == null; |
| |
| /// Whether the [complete] or [completeError] have been called. |
| /// |
| /// Once this completer has been completed with either a result or error, |
| /// neither method may be called again. |
| /// |
| /// If [complete] was called with a [Future] argument, this completer may be |
| /// completed before it's [operation] is completed. In that case the |
| /// [operation] may still be canceled before the result is available. |
| bool get isCompleted => !_mayComplete; |
| |
| /// Whether the completer was canceled before the result was ready. |
| bool get isCanceled => _isCanceled; |
| |
| /// Completes [operation] with [value]. |
| /// |
| /// If [value] is a [Future] the [operation] will complete |
| /// with the result of that `Future` once it is available. |
| /// In that case [isCompleted] will be `true` before the [operation] |
| /// is complete. |
| /// |
| /// If the type [T] is not nullable [value] may be not be omitted or `null`. |
| /// |
| /// This method may not be called after either [complete] or [completeError] |
| /// has been called once. |
| /// The [isCompleted] is true when either of these methods have been called. |
| void complete([FutureOr<T>? value]) { |
| if (!_mayComplete) throw StateError('Operation already completed'); |
| _mayComplete = false; |
| |
| if (value is! Future<T>) { |
| _completeNow()?.complete(value); |
| return; |
| } |
| |
| if (_inner == null) { |
| // Make sure errors from [value] aren't top-leveled. |
| value.ignore(); |
| return; |
| } |
| |
| value.then((result) { |
| _completeNow()?.complete(result); |
| }, onError: (Object error, StackTrace stackTrace) { |
| _completeNow()?.completeError(error, stackTrace); |
| }); |
| } |
| |
| /// Makes this [CancelableCompleter.operation] complete with the same result |
| /// as [result]. |
| /// |
| /// If [propagateCancel] is `true` (the default), and the [operation] of this |
| /// completer is canceled before [result] completes, then [result] is also |
| /// canceled. |
| void completeOperation(CancelableOperation<T> result, |
| {bool propagateCancel = true}) { |
| if (!_mayComplete) throw StateError('Already completed'); |
| _mayComplete = false; |
| if (isCanceled) { |
| if (propagateCancel) result.cancel(); |
| result.value.ignore(); |
| return; |
| } |
| result.then<void>((value) { |
| _inner?.complete( |
| value); // _inner is set to null if this.operation is cancelled. |
| }, onError: (error, stack) { |
| _inner?.completeError(error, stack); |
| }, onCancel: () { |
| operation.cancel(); |
| }); |
| if (propagateCancel) { |
| _cancelCompleter?.future.whenComplete(result.cancel); |
| } |
| } |
| |
| /// Completer to use for completing with a result. |
| /// |
| /// Returns `null` if it's not possible to complete any more. |
| /// Sets [_cancelCompleter] to `null` if returning non-`null`. |
| Completer<T>? _completeNow() { |
| var inner = _inner; |
| if (inner == null) return null; |
| _cancelCompleter = null; |
| return inner; |
| } |
| |
| /// Completes [operation] with [error] and [stackTrace]. |
| /// |
| /// This method may not be called after either [complete] or [completeError] |
| /// has been called once. |
| /// The [isCompleted] is true when either of these methods have been called. |
| void completeError(Object error, [StackTrace? stackTrace]) { |
| if (!_mayComplete) throw StateError('Operation already completed'); |
| _mayComplete = false; |
| _completeNow()?.completeError(error, stackTrace); |
| } |
| |
| /// Cancels the operation. |
| /// |
| /// If the operation has already completed, prior to being cancelled, |
| /// this method does nothing. |
| /// If the operation has already been cancelled, this method returns |
| /// the same result as the first call to `_cancel`. |
| /// |
| /// The result of the operation may only be available some time after |
| /// the completer has been completed (using [complete] or [completeError], |
| /// which sets [isCompleted] to true) if completed with a [Future]. |
| /// The completer can be cancelled until the result becomes available, |
| /// even if [isCompleted] is true. |
| Future<void> _cancel() { |
| var cancelCompleter = _cancelCompleter; |
| if (cancelCompleter == null) return Future.value(null); |
| |
| if (_inner != null) { |
| _inner = null; |
| cancelCompleter.complete(_invokeCancelCallbacks()); |
| } |
| return cancelCompleter.future; |
| } |
| |
| /// Invoke [_onCancel] and forward to other completers in [_cancelForwarders]. |
| /// |
| /// Returns the same value as [_onCancel]. Legacy uses may return a value |
| /// despite the signature having `void` return. |
| Future<Object?> _invokeCancelCallbacks() async { |
| final FutureOr<Object?> toReturn = _onCancel?.call(); |
| final isFuture = toReturn is Future; |
| final cancelFutures = <Future<Object?>>[ |
| if (isFuture) toReturn, |
| ...?_cancelForwarders?.map(_forward).whereNotNull() |
| ]; |
| final results = (isFuture && cancelFutures.length == 1) |
| ? [await toReturn] |
| : cancelFutures.isNotEmpty |
| ? await Future.wait(cancelFutures) |
| : const []; |
| return isFuture ? results.first : toReturn; |
| } |
| } |
| |
| class _CancelForwarder<T> { |
| final CancelableCompleter<T> completer; |
| final FutureOr<void> Function(CancelableCompleter<T>)? onCancel; |
| _CancelForwarder(this.completer, this.onCancel); |
| |
| Future<void>? _forward() { |
| if (completer.isCanceled) return null; |
| final onCancel = this.onCancel; |
| if (onCancel == null) return completer._cancel(); |
| try { |
| final result = onCancel(completer); |
| if (result is Future) { |
| return result.catchError(completer.completeErrorIfPending); |
| } |
| } catch (error, stack) { |
| completer.completeErrorIfPending(error, stack); |
| } |
| return null; |
| } |
| } |
| |
| // Helper function to avoid a closure for `List<_CancelForwarder>.map`. |
| Future<void>? _forward(_CancelForwarder<Object?> forwarder) => |
| forwarder._forward(); |
| |
| extension on CancelableCompleter { |
| void completeErrorIfPending(Object error, StackTrace stackTrace) { |
| if (isCompleted) return; |
| completeError(error, stackTrace); |
| } |
| } |