blob: 976860f953e3b1326c340274800fa1ff7492ef90 [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
/// Note: the VM concatenates all patch files into a single patch file. This
/// file is the first patch in "dart:async" which contains all the imports used
/// by patches of that library. We plan to change this when we have a shared
/// front end and simply use parts.
import "dart:_internal" show VMLibraryHooks, patch, unsafeCast;
/// These are the additional parts of this patch library:
// part "deferred_load_patch.dart";
// part "schedule_microtask_patch.dart";
// part "timer_patch.dart";
// Equivalent of calling FATAL from C++ code.
@pragma("vm:external-name", "DartAsync_fatal")
external _fatal(msg);
// We need to pass the value as first argument and leave the second and third
// arguments empty (used for error handling).
@pragma("vm:recognized", "other")
dynamic Function(dynamic) _asyncThenWrapperHelper(
dynamic Function(dynamic, dynamic) continuation) {
@pragma("vm:invisible")
dynamic thenWrapper(dynamic arg) => continuation(arg, /*stack_trace=*/ null);
// Any function that is used as an asynchronous callback must be registered
// in the current Zone. Normally, this is done by the future when a
// callback is registered (for example with `.then` or `.catchError`). In our
// case we want to reuse the same callback multiple times and therefore avoid
// the multiple registrations. For our internal futures (`_Future`) we can
// use the shortcut-version of `.then`, and skip the registration. However,
// that means that the continuation must be registered by us.
//
// Furthermore, we know that the root-zone doesn't actually do anything and
// we can therefore skip the registration call for it.
//
// Note, that the continuation accepts up to three arguments. If the current
// zone is the root zone, we don't wrap the continuation, and a bad
// `Future` implementation could potentially invoke the callback with the
// wrong number of arguments.
final currentZone = Zone._current;
if (identical(currentZone, _rootZone) ||
identical(currentZone._registerUnaryCallback,
_rootZone._registerUnaryCallback)) {
return thenWrapper;
}
return currentZone.registerUnaryCallback<dynamic, dynamic>(thenWrapper);
}
// We need to pass the exception and stack trace objects as second and third
// parameter to the continuation.
dynamic Function(Object, StackTrace) _asyncErrorWrapperHelper(
dynamic Function(dynamic, StackTrace) errorCallback) {
final currentZone = Zone._current;
if (identical(currentZone, _rootZone) ||
identical(currentZone._registerBinaryCallback,
_rootZone._registerBinaryCallback)) {
return errorCallback;
}
return currentZone
.registerBinaryCallback<dynamic, Object, StackTrace>(errorCallback);
}
/// Registers the [thenCallback] and [errorCallback] on the given [object].
///
/// If [object] is not a future, then it is wrapped into one.
///
/// Returns the result of registering with `.then`.
Future _awaitHelper(var object, dynamic Function(dynamic) thenCallback,
dynamic Function(Object, StackTrace) errorCallback) {
_Future future;
if (object is _Future) {
future = object;
} else if (object is! Future) {
future = new _Future().._setValue(object);
} else {
return object.then(thenCallback, onError: errorCallback);
}
// `object` is a `_Future`.
//
// Since the callbacks have been registered in the current zone (see
// [_asyncThenWrapperHelper] and [_asyncErrorWrapperHelper]), we can avoid
// another registration and directly invoke the no-zone-registration `.then`.
//
// We can only do this for our internal futures (the default implementation of
// all futures that are constructed by the `dart:async` library).
return future._thenAwait<dynamic>(thenCallback, errorCallback);
}
@pragma("vm:entry-point", "call")
void _asyncStarMoveNextHelper(var stream) {
if (stream is! _StreamImpl) {
return;
}
// stream is a _StreamImpl.
final generator = stream._generator;
if (generator == null) {
// No generator registered, this isn't an async* Stream.
return;
}
_moveNextDebuggerStepCheck(generator);
}
// _AsyncStarStreamController is used by the compiler to implement
// async* generator functions.
@pragma("vm:entry-point")
class _AsyncStarStreamController<T> {
@pragma("vm:entry-point")
StreamController<T> controller;
@pragma("vm:entry-point")
Function? asyncStarBody;
bool isAdding = false;
bool onListenReceived = false;
bool isScheduled = false;
bool isSuspendedAtYield = false;
_Future? cancellationFuture = null;
/// Argument passed to the generator when it is resumed after an addStream.
///
/// `true` if the generator should exit after `yield*` resumes.
/// `false` if the generator should continue after `yield*` resumes.
/// `null` otherwies.
bool? continuationArgument = null;
Stream<T> get stream {
final Stream<T> local = controller.stream;
if (local is _StreamImpl<T>) {
local._generator = asyncStarBody!;
}
return local;
}
void runBody() {
isScheduled = false;
isSuspendedAtYield = false;
final bool? argument = continuationArgument;
continuationArgument = null;
asyncStarBody!(argument, null);
}
void scheduleGenerator() {
if (isScheduled || controller.isPaused || isAdding) {
return;
}
isScheduled = true;
scheduleMicrotask(runBody);
}
// Adds element to stream, returns true if the caller should terminate
// execution of the generator.
//
// TODO(hausner): Per spec, the generator should be suspended before
// exiting when the stream is closed. We could add a getter like this:
// get isCancelled => controller.hasListener;
// The generator would translate a 'yield e' statement to
// controller.add(e);
// suspend;
// if (controller.isCancelled) return;
@pragma("vm:entry-point", "call")
bool add(T event) {
if (!onListenReceived) _fatal("yield before stream is listened to");
if (isSuspendedAtYield) _fatal("unexpected yield");
controller.add(event);
if (!controller.hasListener) {
return true;
}
scheduleGenerator();
isSuspendedAtYield = true;
return false;
}
// Adds the elements of stream into this controller's stream.
// The generator will be scheduled again when all of the
// elements of the added stream have been consumed.
@pragma("vm:entry-point", "call")
void addStream(Stream<T> stream) {
if (!onListenReceived) _fatal("yield before stream is listened to");
if (exitAfterYieldStarIfCancelled()) return;
isAdding = true;
final whenDoneAdding = controller.addStream(stream, cancelOnError: false);
whenDoneAdding.then((_) {
isAdding = false;
if (exitAfterYieldStarIfCancelled()) return;
resumeNormallyAfterYieldStar();
});
}
/// Schedules the generator to exit after `yield*` if stream was cancelled.
///
/// Returns `true` if generator is told to exit and `false` otherwise.
bool exitAfterYieldStarIfCancelled() {
// If consumer cancelled subscription we should tell async* generator to
// finish (i.e. run finally clauses and return).
if (!controller.hasListener) {
continuationArgument = true;
scheduleGenerator();
return true;
}
return false;
}
/// Schedules the generator to resume normally after `yield*`.
void resumeNormallyAfterYieldStar() {
continuationArgument = false;
scheduleGenerator();
if (!isScheduled) isSuspendedAtYield = true;
}
void addError(Object error, StackTrace stackTrace) {
// TODO(40614): Remove once non-nullability is sound.
ArgumentError.checkNotNull(error, "error");
final future = cancellationFuture;
if ((future != null) && future._mayComplete) {
// If the stream has been cancelled, complete the cancellation future
// with the error.
future._completeError(error, stackTrace);
return;
}
// If stream is cancelled, tell caller to exit the async generator.
if (!controller.hasListener) return;
controller.addError(error, stackTrace);
// No need to schedule the generator body here. This code is only
// called from the catch clause of the implicit try-catch-finally
// around the generator body. That is, we are on the error path out
// of the generator and do not need to run the generator again.
}
close() {
final future = cancellationFuture;
if ((future != null) && future._mayComplete) {
// If the stream has been cancelled, complete the cancellation future
// with the error.
future._completeWithValue(null);
}
controller.close();
}
_AsyncStarStreamController(this.asyncStarBody)
: controller = new StreamController(sync: true) {
controller.onListen = this.onListen;
controller.onResume = this.onResume;
controller.onCancel = this.onCancel;
}
onListen() {
assert(!onListenReceived);
onListenReceived = true;
scheduleGenerator();
}
onResume() {
if (isSuspendedAtYield) {
scheduleGenerator();
}
}
onCancel() {
if (controller.isClosed) {
return null;
}
if (cancellationFuture == null) {
cancellationFuture = new _Future();
// Only resume the generator if it is suspended at a yield.
// Cancellation does not affect an async generator that is
// suspended at an await.
if (isSuspendedAtYield) {
scheduleGenerator();
}
}
return cancellationFuture;
}
}
@patch
class _StreamImpl<T> {
/// The closure implementing the async-generator body that is creating events
/// for this stream.
Function? _generator;
}
@pragma("vm:entry-point", "call")
void _completeOnAsyncReturn(_Future _future, Object? value, bool is_sync) {
// The first awaited expression is invoked sync. so complete is async. to
// allow then and error handlers to be attached.
// async_jump_var=0 is prior to first await, =1 is first await.
if (!is_sync || value is Future) {
_future._asyncCompleteUnchecked(value);
} else {
_future._completeWithValue(value);
}
}
@pragma("vm:entry-point", "call")
void _completeWithNoFutureOnAsyncReturn(
_Future _future, Object? value, bool is_sync) {
// The first awaited expression is invoked sync. so complete is async. to
// allow then and error handlers to be attached.
// async_jump_var=0 is prior to first await, =1 is first await.
if (!is_sync) {
_future._asyncCompleteUncheckedNoFuture(value);
} else {
_future._completeWithValue(value);
}
}
@pragma("vm:entry-point", "call")
void _completeOnAsyncError(
_Future _future, Object e, StackTrace st, bool is_sync) {
if (!is_sync) {
_future._asyncCompleteError(e, st);
} else {
_future._completeError(e, st);
}
}
@pragma("vm:external-name", "AsyncStarMoveNext_debuggerStepCheck")
external void _moveNextDebuggerStepCheck(Function async_op);
@pragma("vm:entry-point")
class _SuspendState {
static const bool _trace = false;
@pragma("vm:entry-point", "call")
@pragma("vm:invisible")
static Object? _initAsync<T>() {
if (_trace) print('_initAsync<$T>');
return _Future<T>();
}
@pragma("vm:invisible")
@pragma("vm:recognized", "other")
void _createAsyncCallbacks() {
if (_trace) print('_createAsyncCallbacks');
@pragma("vm:invisible")
thenCallback(value) {
if (_trace) print('thenCallback (this=$this, value=$value)');
_resume(value, null, null);
}
@pragma("vm:invisible")
errorCallback(Object exception, StackTrace stackTrace) {
if (_trace) {
print('errorCallback (this=$this, '
'exception=$exception, stackTrace=$stackTrace)');
}
_resume(null, exception, stackTrace);
}
final currentZone = Zone._current;
if (identical(currentZone, _rootZone) ||
identical(currentZone._registerUnaryCallback,
_rootZone._registerUnaryCallback)) {
_thenCallback = thenCallback;
} else {
_thenCallback =
currentZone.registerUnaryCallback<dynamic, dynamic>(thenCallback);
}
if (identical(currentZone, _rootZone) ||
identical(currentZone._registerBinaryCallback,
_rootZone._registerBinaryCallback)) {
_errorCallback = errorCallback;
} else {
_errorCallback = currentZone
.registerBinaryCallback<dynamic, Object, StackTrace>(errorCallback);
}
}
@pragma("vm:invisible")
@pragma("vm:prefer-inline")
void _awaitCompletedFuture(_Future future) {
assert(future._isComplete);
final zone = Zone._current;
if (future._hasError) {
@pragma("vm:invisible")
void run() {
final AsyncError asyncError =
unsafeCast<AsyncError>(future._resultOrListeners);
if (!future._zone.inSameErrorZone(zone)) {
// Don't cross zone boundaries with errors.
future._zone
.handleUncaughtError(asyncError.error, asyncError.stackTrace);
} else {
zone.runBinary(
unsafeCast<dynamic Function(Object, StackTrace)>(_errorCallback),
asyncError.error,
asyncError.stackTrace);
}
}
future._zone.scheduleMicrotask(run);
} else {
@pragma("vm:invisible")
void run() {
zone.runUnary(unsafeCast<dynamic Function(dynamic)>(_thenCallback),
future._resultOrListeners);
}
future._zone.scheduleMicrotask(run);
}
}
@pragma("vm:invisible")
@pragma("vm:prefer-inline")
void _awaitNotFuture(Object? object) {
final zone = Zone._current;
@pragma("vm:invisible")
void run() {
zone.runUnary(
unsafeCast<dynamic Function(dynamic)>(_thenCallback), object);
}
zone.scheduleMicrotask(run);
}
@pragma("vm:invisible")
@pragma("vm:prefer-inline")
void _awaitUserDefinedFuture(Future future) {
// Create a generic callback closure and instantiate it
// using the type argument of Future.
// This is needed to avoid unsoundness which may happen if user-defined
// Future.then casts callback to Function(dynamic) passes a value of
// incorrect type.
@pragma("vm:invisible")
dynamic typedCallback<T>(T value) {
return unsafeCast<dynamic Function(dynamic)>(_thenCallback)(value);
}
future.then(
unsafeCast<dynamic Function(dynamic)>(
_instantiateClosureWithFutureTypeArgument(typedCallback, future)),
onError:
unsafeCast<dynamic Function(Object, StackTrace)>(_errorCallback));
}
@pragma("vm:entry-point", "call")
@pragma("vm:invisible")
Object? _await(Object? object) {
if (_trace) print('_await (object=$object)');
if (_thenCallback == null) {
_createAsyncCallbacks();
}
if (object is _Future) {
if (object._isComplete) {
_awaitCompletedFuture(object);
} else {
object._thenAwait<dynamic>(
unsafeCast<dynamic Function(dynamic)>(_thenCallback),
unsafeCast<dynamic Function(Object, StackTrace)>(_errorCallback));
}
} else if (object is! Future) {
_awaitNotFuture(object);
} else {
_awaitUserDefinedFuture(object);
}
return _functionData;
}
@pragma("vm:entry-point", "call")
@pragma("vm:invisible")
static Future _returnAsync(Object suspendState, Object? returnValue) {
if (_trace) {
print('_returnAsync (suspendState=$suspendState, '
'returnValue=$returnValue)');
}
_Future future;
if (suspendState is _SuspendState) {
future = unsafeCast<_Future>(suspendState._functionData);
} else {
future = unsafeCast<_Future>(suspendState);
}
if (returnValue is Future) {
future._asyncCompleteUnchecked(returnValue);
} else {
future._completeWithValue(returnValue);
}
return future;
}
@pragma("vm:entry-point", "call")
@pragma("vm:invisible")
static Future _returnAsyncNotFuture(
Object suspendState, Object? returnValue) {
if (_trace) {
print('_returnAsyncNotFuture (suspendState=$suspendState, '
'returnValue=$returnValue)');
}
_Future future;
if (suspendState is _SuspendState) {
future = unsafeCast<_Future>(suspendState._functionData);
} else {
future = unsafeCast<_Future>(suspendState);
}
future._completeWithValue(returnValue);
return future;
}
@pragma("vm:entry-point", "call")
@pragma("vm:invisible")
static Object? _initAsyncStar<T>() {
if (_trace) print('_initAsyncStar<$T>');
return _AsyncStarStreamController<T>(null);
}
@pragma("vm:invisible")
@pragma("vm:recognized", "other")
_createAsyncStarCallback(_AsyncStarStreamController controller) {
controller.asyncStarBody = (value, _) {
if (_trace) print('asyncStarBody callback (value=$value)');
_resume(value, null, null);
};
}
@pragma("vm:entry-point", "call")
@pragma("vm:invisible")
Object? _yieldAsyncStar(Object? object) {
final controller = unsafeCast<_AsyncStarStreamController>(_functionData);
if (controller.asyncStarBody == null) {
_createAsyncStarCallback(controller);
return controller.stream;
}
return null;
}
@pragma("vm:entry-point", "call")
@pragma("vm:invisible")
static void _returnAsyncStar(Object suspendState, Object? returnValue) {
if (_trace) {
print('_returnAsyncStar (suspendState=$suspendState, '
'returnValue=$returnValue)');
}
final controller = unsafeCast<_AsyncStarStreamController>(
unsafeCast<_SuspendState>(suspendState)._functionData);
controller.close();
}
@pragma("vm:entry-point", "call")
@pragma("vm:invisible")
static Object? _handleException(
Object suspendState, Object exception, StackTrace stackTrace) {
if (_trace) {
print('_handleException (suspendState=$suspendState, '
'exception=$exception, stackTrace=$stackTrace)');
}
Object? functionData;
bool isSync = true;
if (suspendState is _SuspendState) {
functionData = suspendState._functionData;
} else {
functionData = suspendState;
isSync = false;
}
if (functionData is _Future) {
// async function.
_completeOnAsyncError(functionData, exception, stackTrace, isSync);
} else if (functionData is _AsyncStarStreamController) {
// async* function.
functionData.addError(exception, stackTrace);
functionData.close();
} else {
throw 'Unexpected function data ${functionData.runtimeType} $functionData';
}
return functionData;
}
@pragma("vm:entry-point", "call")
@pragma("vm:invisible")
static Object? _initSyncStar<T>() {
if (_trace) print('_initSyncStar<$T>');
return _SyncStarIterable<T>();
}
@pragma("vm:entry-point", "call")
@pragma("vm:invisible")
Object? _yieldSyncStar(Object? object) {
if (_trace) print('_yieldSyncStar($object)');
final data = _functionData;
if (data is _SyncStarIterable) {
data._stateAtStart = this;
return data;
} else {
// Update state in the iterator in case SuspendState was reallocated.
unsafeCast<_SyncStarIterator>(data)._state = this;
}
return true;
}
@pragma("vm:entry-point", "call")
@pragma("vm:invisible")
static bool _returnSyncStar(Object suspendState, Object? returnValue) {
if (_trace) print('_returnSyncStar');
return false;
}
@pragma("vm:recognized", "other")
@pragma("vm:prefer-inline")
external set _functionData(Object value);
@pragma("vm:recognized", "other")
@pragma("vm:prefer-inline")
external Object get _functionData;
@pragma("vm:recognized", "other")
@pragma("vm:prefer-inline")
external set _thenCallback(void Function(dynamic)? value);
@pragma("vm:recognized", "other")
@pragma("vm:prefer-inline")
external void Function(dynamic)? get _thenCallback;
@pragma("vm:recognized", "other")
@pragma("vm:prefer-inline")
external set _errorCallback(dynamic Function(Object, StackTrace)? value);
@pragma("vm:recognized", "other")
@pragma("vm:prefer-inline")
external dynamic Function(Object, StackTrace)? get _errorCallback;
@pragma("vm:recognized", "other")
@pragma("vm:never-inline")
external Object? _resume(
Object? value, Object? exception, StackTrace? stackTrace);
@pragma("vm:recognized", "other")
@pragma("vm:prefer-inline")
external _SuspendState _clone();
@pragma("vm:external-name",
"SuspendState_instantiateClosureWithFutureTypeArgument")
external static Function _instantiateClosureWithFutureTypeArgument(
dynamic Function<T>(T) closure, Future future);
}
class _SyncStarIterable<T> extends Iterable<T> {
_SuspendState? _stateAtStart;
_SyncStarIterable();
Iterator<T> get iterator {
return _SyncStarIterator<T>(_stateAtStart!._clone());
}
}
class _SyncStarIterator<T> implements Iterator<T> {
_SuspendState? _state;
Iterator<T>? _yieldStarIterator;
// Stack of suspended sync* methods.
List<_SuspendState>? _stack;
// sync* method sets either _yieldStarIterable
// or _current before suspending.
@pragma("vm:entry-point")
T? _current;
@pragma("vm:entry-point")
Iterable<T>? _yieldStarIterable;
@override
T get current => _current as T;
_SyncStarIterator(_SuspendState state) : _state = state {
state._functionData = this;
}
@pragma('vm:prefer-inline')
bool _handleSyncStarMethodCompletion() {
_current = null;
_state = null;
final stack = _stack;
if (stack != null && stack.isNotEmpty) {
_state = stack.removeLast();
return true;
}
return false;
}
@override
bool moveNext() {
if (_state == null) {
return false;
}
Object? pendingException;
StackTrace? pendingStackTrace;
while (true) {
// First delegate to an active nested iterator (if any).
final iterator = _yieldStarIterator;
if (iterator != null) {
try {
if (iterator.moveNext()) {
_current = iterator.current;
return true;
}
} catch (exception, stackTrace) {
pendingException = exception;
pendingStackTrace = stackTrace;
}
_yieldStarIterator = null;
}
try {
// Resume current sync* method in order to move to the next value.
final bool hasMore =
_state!._resume(null, pendingException, pendingStackTrace) as bool;
pendingException = null;
pendingStackTrace = null;
if (!hasMore) {
if (_handleSyncStarMethodCompletion()) {
continue;
}
return false;
}
} catch (exception, stackTrace) {
pendingException = exception;
pendingStackTrace = stackTrace;
if (_handleSyncStarMethodCompletion()) {
continue;
}
rethrow;
}
// Case: yield* some_iterator.
final iterable = _yieldStarIterable;
if (iterable != null) {
if (iterable is _SyncStarIterable) {
// We got a recursive yield* of sync* function. Instead of creating
// a new iterator we replace our current _state (remembering the
// current _state for later resumption).
final stack = (_stack ??= []);
stack.add(_state!);
final nestedState =
unsafeCast<_SyncStarIterable>(iterable)._stateAtStart!._clone();
nestedState._functionData = this;
_state = nestedState;
} else {
_yieldStarIterator = iterable.iterator;
}
_yieldStarIterable = null;
_current = null;
// Fetch the next item.
continue;
}
return true;
}
}
}