| // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| // Patch file for the dart:async library. |
| |
| import 'dart:_async_status_codes' as async_status_codes; |
| import 'dart:_js_helper' show notNull, ReifyFunctionTypes; |
| import 'dart:_internal' show patch, unsafeCast; |
| import 'dart:_isolate_helper' show TimerImpl; |
| import 'dart:_foreign_helper' |
| show JS, JS_RAW_EXCEPTION, JSExportName, RAW_DART_FUNCTION_REF; |
| import 'dart:_runtime' as dart; |
| |
| @patch |
| void _trySetStackTrace(Object error, StackTrace stackTrace) { |
| if (error is Error) { |
| dart.trySetStackTrace(error, stackTrace); |
| } |
| } |
| |
| /// This function adapts ES6 generators to implement Dart's async/await. |
| /// |
| /// It's designed to interact with Dart's Future and follow Dart async/await |
| /// semantics. |
| /// |
| /// See https://github.com/dart-lang/sdk/issues/27315 for ideas on reconciling |
| /// Dart's Future and ES6 Promise. At that point we should use native JS |
| /// async/await. |
| /// |
| /// Inspired by `co`: https://github.com/tj/co/blob/master/index.js, which is a |
| /// stepping stone for ES async/await. |
| @JSExportName('async') |
| _async<T>(Function() initGenerator) { |
| var iter; |
| late Object? Function(Object?) onValue; |
| late Object Function(Object, StackTrace?) onError; |
| |
| onAwait(Object? value) { |
| _Future<Object?> f; |
| if (value is _Future) { |
| f = value; |
| } else if (value is Future) { |
| f = _Future(); |
| f._chainForeignFuture(value); |
| } else { |
| f = _Future.value(value); |
| } |
| f = JS('', '#', f._thenAwait(onValue, onError)); |
| return f; |
| } |
| |
| onValue = (value) { |
| var iteratorResult = JS('', '#.next(#)', iter, value); |
| value = JS('', '#.value', iteratorResult); |
| return JS<bool>('!', '#.done', iteratorResult) ? value : onAwait(value); |
| }; |
| |
| // If the awaited Future throws, we want to convert this to an exception |
| // thrown from the `yield` point, as if it was thrown there. |
| // |
| // If the exception is not caught inside `gen`, it will emerge here, which |
| // will send it to anyone listening on this async function's Future<T>. |
| // |
| // In essence, we are giving the code inside the generator a chance to |
| // use try-catch-finally. |
| onError = (value, stackTrace) { |
| var iteratorResult = JS( |
| '', |
| '#.throw(#)', |
| iter, |
| dart.createErrorWithStack(value, stackTrace), |
| ); |
| value = JS('', '#.value', iteratorResult); |
| return JS<bool>('!', '#.done', iteratorResult) ? value : onAwait(value); |
| }; |
| |
| var zone = Zone.current; |
| if (!identical(zone, _rootZone)) { |
| onValue = zone.registerUnaryCallback(onValue); |
| onError = zone.registerBinaryCallback(onError); |
| } |
| |
| var asyncFuture = _Future<T>(); |
| |
| // This will be set to true once we've yielded to the event loop. |
| // |
| // Before we've done that, we need to complete the future asynchronously to |
| // match dart2js/VM. See https://github.com/dart-lang/sdk/issues/33330 |
| // |
| // Once we've yielded to the event loop we can complete synchronously. |
| // Other implementations call this `isSync` to indicate that. |
| bool isRunningAsEvent = false; |
| runBody() { |
| try { |
| iter = JS('', '#[Symbol.iterator]()', initGenerator()); |
| var iteratorValue = JS('', '#.next(null)', iter); |
| var value = JS('', '#.value', iteratorValue); |
| if (JS<bool>('!', '#.done', iteratorValue)) { |
| // TODO(jmesserly): this is a workaround for ignored cast failures. |
| // Remove it once we've fixed those. We should be able to call: |
| // |
| // if (isRunningAsEvent) { |
| // asyncFuture._complete(value); |
| // } else { |
| // asyncFuture._asyncComplete(value); |
| // } |
| // |
| // But if the user code returns `Future<dynamic>` instead of |
| // `Future<T>`, that function won't recognize it as a future and will |
| // instead treat it as a completed value. |
| if (value is Future) { |
| if (value is _Future) { |
| _Future._chainCoreFuture(value, asyncFuture, isRunningAsEvent); |
| } else { |
| asyncFuture._chainForeignFuture(value); |
| } |
| } else if (isRunningAsEvent) { |
| asyncFuture._completeWithValue(JS('', '#', value)); |
| } else { |
| asyncFuture._asyncComplete(JS('', '#', value)); |
| } |
| } else { |
| _Future._chainCoreFuture(onAwait(value), asyncFuture, true); |
| } |
| } catch (e, s) { |
| if (isRunningAsEvent) { |
| _completeWithErrorCallback(asyncFuture, e, s); |
| } else { |
| asyncFuture._asyncCompleteErrorObject(_interceptCaughtError(e, s)); |
| } |
| } |
| } |
| |
| if (dart.startAsyncSynchronously) { |
| runBody(); |
| isRunningAsEvent = true; |
| } else { |
| isRunningAsEvent = true; |
| scheduleMicrotask(runBody); |
| } |
| return asyncFuture; |
| } |
| |
| /// Checks that the value being awaited is a Future of the expected type and |
| /// if not the value is wrapped in a new Future. |
| /// |
| /// Calls to the method are generated from the compiler when it detects a type |
| /// check is required on the expression in an await. |
| /// |
| /// Closes a soundness hole where null could leak from an awaited Future. |
| @JSExportName('awaitWithTypeCheck') |
| _awaitWithTypeCheck<T>(Object? value) => |
| value is T ? value : _Future.value(value); |
| |
| @patch |
| class _AsyncRun { |
| @patch |
| static void _scheduleImmediate(void Function() callback) { |
| _scheduleImmediateClosure(callback); |
| } |
| |
| // Lazily initialized. |
| static final _scheduleImmediateClosure = _initializeScheduleImmediate(); |
| |
| static void Function(void Function()) _initializeScheduleImmediate() { |
| // d8 support, see preambles/d8.js for the definition of `scheduleImmediate`. |
| // |
| // TODO(jmesserly): do we need this? It's only for our d8 stack trace test. |
| if (JS('', '#.scheduleImmediate', dart.global_) != null) { |
| return _scheduleImmediateJSOverride; |
| } |
| return _scheduleImmediateWithPromise; |
| } |
| |
| @ReifyFunctionTypes(false) |
| static void _scheduleImmediateJSOverride(void Function() callback) { |
| JS('void', '#.scheduleImmediate(#)', dart.global_, () { |
| callback(); |
| }); |
| } |
| |
| @ReifyFunctionTypes(false) |
| static void _scheduleImmediateWithPromise(void Function() callback) { |
| JS('', '#.Promise.resolve(null).then(#)', dart.global_, () { |
| callback(); |
| }); |
| } |
| } |
| |
| @patch |
| class Timer { |
| @patch |
| static Timer _createTimer(Duration duration, void Function() callback) { |
| int milliseconds = duration.inMilliseconds; |
| if (milliseconds < 0) milliseconds = 0; |
| return TimerImpl(milliseconds, callback); |
| } |
| |
| @patch |
| static Timer _createPeriodicTimer( |
| Duration duration, |
| void callback(Timer timer), |
| ) { |
| int milliseconds = duration.inMilliseconds; |
| if (milliseconds < 0) milliseconds = 0; |
| return TimerImpl.periodic(milliseconds, callback); |
| } |
| } |
| |
| /// Used by the compiler to implement `async*` functions. |
| /// |
| /// This is inspired by _AsyncStarStreamController in dart-lang/sdk's |
| /// runtime/lib/core_patch.dart |
| /// |
| /// Given input like: |
| /// |
| /// foo() async* { |
| /// yield 1; |
| /// yield* bar(); |
| /// print(await baz()); |
| /// } |
| /// |
| /// This compiles to: |
| /// |
| /// function foo() { |
| /// return new (AsyncStarImplOfT()).new(function*(stream) { |
| /// if (stream.add(1)) return; |
| /// yield; |
| /// if (stream.addStream(bar()) return; |
| /// yield; |
| /// print(yield baz()); |
| /// }); |
| /// } |
| /// |
| class _AsyncStarImpl<T> { |
| late StreamController<T> controller; |
| Object Function(_AsyncStarImpl<T>) initGenerator; |
| @notNull |
| bool isSuspendedAtYieldStar = false; |
| @notNull |
| bool onListenReceived = false; |
| @notNull |
| bool isScheduled = false; |
| @notNull |
| bool isSuspendedAtYield = false; |
| |
| /// Whether we're suspended at an `await`. |
| @notNull |
| bool isSuspendedAtAwait = false; |
| |
| Completer? cancellationCompleter; |
| late Object jsIterator; |
| |
| Null Function(Object, StackTrace)? _handleErrorCallback; |
| void Function([Object?])? _runBodyCallback; |
| |
| _AsyncStarImpl(this.initGenerator) { |
| controller = StreamController( |
| onListen: JS('!', 'this.onListen.bind(this)'), |
| onResume: JS('!', 'this.onResume.bind(this)'), |
| onCancel: JS('!', 'this.onCancel.bind(this)'), |
| ); |
| jsIterator = JS('!', '#[Symbol.iterator]()', initGenerator(this)); |
| } |
| |
| /// The stream produced by this `async*` function. |
| Stream<T> get stream => controller.stream; |
| |
| /// Returns the callback used for error handling. |
| /// |
| /// This callback throws the error back into the user code, at the appropriate |
| /// location (e.g. `await` `yield` or `yield*`). This gives user code a chance |
| /// to handle it try-catch. If they do not handle, the error gets routed to |
| /// the [stream] as an error via [addError]. |
| /// |
| /// As a performance optimization, this callback is only bound once to the |
| /// current [Zone]. This works because a single subscription stream should |
| /// always be running in its original zone. An `async*` method will always |
| /// save/restore the zone that was active when `listen()` was first called, |
| /// similar to a stream. This follows from section 16.14 of the Dart 4th |
| /// edition spec: |
| /// |
| /// > If `f` is marked `async*` (9), then a fresh instance `s` implementing |
| /// > the built-in class `Stream` is associated with the invocation and |
| /// > immediately returned. When `s` is listened to, execution of the body of |
| /// > `f` will begin. |
| /// |
| Null Function(Object, StackTrace) get handleError { |
| if (_handleErrorCallback == null) { |
| _handleErrorCallback = (error, StackTrace stackTrace) { |
| try { |
| JS( |
| '', |
| '#.throw(#)', |
| jsIterator, |
| dart.createErrorWithStack(error, stackTrace), |
| ); |
| } catch (e, newStack) { |
| // The generator didn't catch the error, or it threw a new one. |
| // Make sure to propagate the new error. |
| addError(e, newStack); |
| } |
| }; |
| var zone = Zone.current; |
| if (!identical(zone, Zone.root)) { |
| _handleErrorCallback = zone.bindBinaryCallback(_handleErrorCallback!); |
| } |
| } |
| return _handleErrorCallback!; |
| } |
| |
| void scheduleGenerator() { |
| // TODO(jmesserly): is this isPaused check in the right place? Assuming the |
| // async* Stream yields, then is paused (by other code), the body will |
| // already be scheduled. This will cause at least one more iteration to |
| // run (adding another data item to the Stream) before actually pausing. |
| // It could be fixed by moving the `isPaused` check inside `runBody`. |
| if (isScheduled || controller.isPaused || isSuspendedAtYieldStar) { |
| return; |
| } |
| isScheduled = true; |
| // Capture the current zone. See comment on [handleError] for more |
| // information about this optimization. |
| var zone = Zone.current; |
| if (_runBodyCallback == null) { |
| _runBodyCallback = JS('!', '#.bind(this)', runBody); |
| if (!identical(zone, Zone.root)) { |
| var registered = zone.registerUnaryCallback(_runBodyCallback!); |
| _runBodyCallback = ([arg]) => zone.runUnaryGuarded(registered, arg); |
| } |
| } |
| zone.scheduleMicrotask(_runBodyCallback!); |
| } |
| |
| void runBody(awaitValue) { |
| isScheduled = false; |
| isSuspendedAtYield = false; |
| isSuspendedAtAwait = false; |
| |
| Object iterResult; |
| try { |
| iterResult = JS('', '#.next(#)', jsIterator, awaitValue); |
| } catch (e, s) { |
| addError(e, s); |
| return; |
| } |
| |
| if (JS('!', '#.done', iterResult)) { |
| close(); |
| return; |
| } |
| |
| // If we're suspended at a yield/yield*, we're done for now. |
| if (isSuspendedAtYield || isSuspendedAtYieldStar) return; |
| |
| // Handle `await`: if we get a value passed to `yield` it means we are |
| // waiting on this Future. Make sure to prevent scheduling, and pass the |
| // value back as the result of the `yield`. |
| // |
| // TODO(jmesserly): is the timing here correct? The assumption here is |
| // that we should schedule `await` in `async*` the same as in `async`. |
| isSuspendedAtAwait = true; |
| FutureOr<Object?> value = JS('', '#.value', iterResult); |
| |
| // TODO(jmesserly): this logic was copied from `async` function impl. |
| _Future<Object?> f; |
| if (value is _Future) { |
| f = value; |
| } else if (value is Future) { |
| f = _Future(); |
| f._chainForeignFuture(value); |
| } else { |
| f = _Future.value(value); |
| } |
| f._thenAwait(_runBodyCallback!, handleError); |
| } |
| |
| /// Adds element to [stream] and returns true if the caller should terminate |
| /// execution of the generator. |
| /// |
| /// This is called from generated code like this: |
| /// |
| /// if (controller.add(1)) return; |
| /// yield; |
| // |
| // TODO(hausner): Per spec, the generator should be suspended before exiting |
| // when the stream is closed. We could add a getter like this: |
| // |
| // get isCancelled => controller.hasListener; |
| // |
| // The generator would translate a 'yield e' statement to |
| // |
| // controller.add(1); |
| // suspend; // this is `yield` in JS. |
| // if (controller.isCancelled) return; |
| bool add(T event) { |
| if (!onListenReceived) _fatal("yield before stream is listened to"); |
| if (isSuspendedAtYield) _fatal("unexpected yield"); |
| // If stream is cancelled, tell caller to exit the async generator. |
| if (!controller.hasListener) { |
| return true; |
| } |
| controller.add(event); |
| scheduleGenerator(); |
| isSuspendedAtYield = true; |
| return false; |
| } |
| |
| /// Adds the elements of [stream] into this [controller]'s stream, and returns |
| /// true if the caller should terminate execution of the generator. |
| /// |
| /// The generator will be scheduled again when all of the elements of the |
| /// added stream have been consumed. |
| bool addStream(Stream<T> stream) { |
| if (!onListenReceived) _fatal("yield* before stream is listened to"); |
| // If stream is cancelled, tell caller to exit the async generator. |
| if (!controller.hasListener) return true; |
| isSuspendedAtYieldStar = true; |
| var whenDoneAdding = controller.addStream(stream, cancelOnError: false); |
| whenDoneAdding.then((_) { |
| isSuspendedAtYieldStar = false; |
| scheduleGenerator(); |
| if (!isScheduled) isSuspendedAtYield = true; |
| }, onError: handleError); |
| return false; |
| } |
| |
| void addError(Object error, StackTrace stackTrace) { |
| ArgumentError.checkNotNull(error, "error"); |
| var completer = cancellationCompleter; |
| if (completer != null && !completer.isCompleted) { |
| // If the stream has been cancelled, complete the cancellation future |
| // with the error. |
| completer.completeError(error, stackTrace); |
| } else if (controller.hasListener) { |
| controller.addError(error, stackTrace); |
| } |
| // No need to schedule the generator body here. This code is only |
| // called from the catch clause of the implicit try-catch-finally |
| // around the generator body. That is, we are on the error path out |
| // of the generator and do not need to run the generator again. |
| close(); |
| } |
| |
| void close() { |
| var completer = cancellationCompleter; |
| if (completer != null && !completer.isCompleted) { |
| // If the stream has been cancelled, complete the cancellation future |
| // with the error. |
| completer.complete(); |
| } |
| controller.close(); |
| } |
| |
| onListen() { |
| assert(!onListenReceived); |
| onListenReceived = true; |
| scheduleGenerator(); |
| } |
| |
| onResume() { |
| if (isSuspendedAtYield) { |
| scheduleGenerator(); |
| } |
| } |
| |
| onCancel() { |
| if (controller.isClosed) { |
| return null; |
| } |
| if (cancellationCompleter == null) { |
| cancellationCompleter = Completer(); |
| // Only resume the generator if it is suspended at a yield. |
| // Cancellation does not affect an async generator that is |
| // suspended at an await. |
| if (isSuspendedAtYield) { |
| scheduleGenerator(); |
| } |
| } |
| return cancellationCompleter!.future; |
| } |
| |
| _fatal(String message) => throw StateError(message); |
| } |
| |
| class _AsyncAwaitCompleter<T> implements Completer<T> { |
| final _future = _Future<T>(); |
| bool isSync; |
| int hotRestartIteration = dart.hotRestartIteration; |
| |
| _AsyncAwaitCompleter() : isSync = false; |
| |
| void complete([FutureOr<T>? value]) { |
| // All paths require that if value is null, null as T succeeds. |
| value = (value == null) ? value as T : value; |
| if (!isSync) { |
| _future._asyncComplete(value); |
| } else if (value is Future<T>) { |
| assert(!_future._isComplete); |
| _future._chainFuture(value); |
| } else { |
| _future._completeWithValue(value); |
| } |
| } |
| |
| void completeError(Object e, [StackTrace? st]) { |
| st ??= AsyncError.defaultStackTrace(e); |
| if (isSync) { |
| _future._completeError(e, st); |
| } else { |
| _future._asyncCompleteError(e, st); |
| } |
| } |
| |
| Future<T> get future => _future; |
| bool get isCompleted => !_future._mayComplete; |
| } |
| |
| /// Creates a Completer for an `async` function. |
| /// |
| /// Used as part of the runtime support for the async/await transformation. All |
| /// calls to this method are generated by the compiler. |
| Completer<T> _makeAsyncAwaitCompleter<T>() { |
| return _AsyncAwaitCompleter<T>(); |
| } |
| |
| /// Initiates the computation of an `async` function and starts the body |
| /// synchronously. |
| /// |
| /// Used as part of the runtime support for the async/await transformation. All |
| /// calls to this method are generated by the compiler. |
| /// |
| /// This function sets up the first call into the transformed [bodyFunction]. |
| /// Independently, it takes the [completer] and returns the future of the |
| /// completer for convenience of the transformed code. |
| Future _asyncStartSync( |
| _WrappedAsyncBody bodyFunction, |
| _AsyncAwaitCompleter completer, |
| ) { |
| bodyFunction(async_status_codes.SUCCESS, null); |
| completer.isSync = true; |
| return completer.future; |
| } |
| |
| /// Performs the `await` operation of an `async` function. |
| /// |
| /// Used as part of the runtime support for the async/await transformation. All |
| /// calls to this method are generated by the compiler. |
| /// |
| /// Arranges for [bodyFunction] to be called when the future or value [object] |
| /// is completed with a code [async_status_codes.SUCCESS] or |
| /// [async_status_codes.ERROR] depending on the success of the future. |
| void _asyncAwait( |
| Object? object, |
| _WrappedAsyncBody bodyFunction, |
| _AsyncAwaitCompleter completer, |
| ) { |
| _awaitOnObject(object, bodyFunction); |
| } |
| |
| /// Completes the future of an `async` function. |
| /// |
| /// Used as part of the runtime support for the async/await transformation. All |
| /// calls to this method are generated by the compiler. |
| /// |
| /// This function is used when the `async` function returns (explicitly or |
| /// implicitly). |
| void _asyncReturn(Object? object, _AsyncAwaitCompleter completer) { |
| completer.complete(object); |
| } |
| |
| /// Completes the future of an `async` function with an error. |
| /// |
| /// Used as part of the runtime support for the async/await transformation. All |
| /// calls to this method are generated by the compiler. |
| /// |
| /// This function is used when the `async` function re-throws an exception. |
| void _asyncRethrow(Object? object, _AsyncAwaitCompleter completer) { |
| // The error is a js-error. |
| completer.completeError(dart.getThrown(object)!, dart.stackTrace(object)); |
| } |
| |
| /// Awaits on the given [object]. |
| /// |
| /// If the [object] is a Future, registers on it, otherwise wraps it into a |
| /// future first. |
| /// |
| /// The [bodyFunction] argument is the continuation that should be invoked |
| /// when the future completes. |
| void _awaitOnObject(Object? object, _WrappedAsyncBody bodyFunction) { |
| FutureOr<dynamic> Function(dynamic) thenCallback = |
| (result) => bodyFunction(async_status_codes.SUCCESS, result); |
| |
| Function errorCallback = (dynamic error, StackTrace stackTrace) { |
| final wrappedException = dart.createErrorWithStack(error, stackTrace); |
| bodyFunction(async_status_codes.ERROR, wrappedException); |
| }; |
| |
| if (object is _Future) { |
| // We can skip the zone registration, since the bodyFunction is already |
| // registered (see [_wrapJsFunctionForAsync]). |
| object._thenAwait(thenCallback, errorCallback); |
| } else if (object is Future) { |
| object.then(thenCallback, onError: errorCallback); |
| } else { |
| _Future future = _Future().._setValue(object); |
| // We can skip the zone registration, since the bodyFunction is already |
| // registered (see [_wrapJsFunctionForAsync]). |
| future._thenAwait(thenCallback, errorCallback); |
| } |
| } |
| |
| typedef _WrappedAsyncBody = void Function(int errorCode, Object? result); |
| |
| /// Wraps a JS function generated by the compiler with boiler plate to handle |
| /// errors and re-entry logic. |
| /// |
| /// Used as part of the runtime support for the async/await transformation. All |
| /// calls to this method are generated by the compiler. |
| _WrappedAsyncBody _wrapJsFunctionForAsync(dynamic /* js function */ function) { |
| var protected = JS( |
| '', |
| """ |
| (function (fn, ERROR) { |
| // Invokes [function] with [errorCode] and [result]. |
| // |
| // If (and as long as) the invocation throws, calls [function] again, |
| // with an error-code. |
| return function(errorCode, result) { |
| while (true) { |
| try { |
| fn(errorCode, result); |
| break; |
| } catch (error) { |
| result = error; |
| errorCode = ERROR; |
| } |
| } |
| } |
| })(#, #)""", |
| function, |
| async_status_codes.ERROR, |
| ); |
| |
| return Zone.current.registerBinaryCallback((int errorCode, dynamic result) { |
| JS('', '#(#, #)', protected, errorCode, result); |
| }); |
| } |
| |
| /// Implements the runtime support for async* functions. |
| /// |
| /// Called by the transformed function for each original return, await, yield, |
| /// yield* and before starting the function. |
| /// |
| /// When the async* function wants to return it calls this function with |
| /// [bodyFunctionOrErrorCode] == [async_status_codes.SUCCESS], the |
| /// asyncStarHelper takes this as signal to close the stream. |
| /// |
| /// When the async* function wants to signal that an uncaught error was thrown, |
| /// it calls this function with |
| /// [bodyFunctionOrErrorCode] == [async_status_codes.ERROR], the streamHelper |
| /// takes this as signal to addError [object] to the [controller] and close it. |
| /// |
| /// If the async* function wants to do a yield or yield*, it calls this function |
| /// with [object] being an [_IterationMarker]. |
| /// |
| /// In the case of a yield or yield*, if the stream subscription has been |
| /// canceled, schedules [bodyFunctionOrErrorCode] to be called with |
| /// [async_status_codes.STREAM_WAS_CANCELED]. |
| /// |
| /// If [object] is a single-yield [_IterationMarker], adds the value of the |
| /// [_IterationMarker] to the stream. If the stream subscription has been |
| /// paused, return early. Otherwise schedule the helper function to be |
| /// executed again. |
| /// |
| /// If [object] is a yield-star [_IterationMarker], starts listening to the |
| /// yielded stream, and adds all events and errors to our own controller (taking |
| /// care if the subscription has been paused or canceled) - when the sub-stream |
| /// is done, schedules [bodyFunctionOrErrorCode] again. |
| /// |
| /// If the async* function wants to do an await it calls this function with |
| /// [object] not an [_IterationMarker]. |
| /// |
| /// If [object] is not a [Future], it is wrapped in a `Future.value`. |
| /// The [bodyFunctionOrErrorCode] is called on completion of the future |
| /// (see [_awaitOnObject]). |
| /// |
| /// All calls to this method are generated by the compiler. |
| void _asyncStarHelper( |
| Object? object, |
| dynamic /* int | _WrappedAsyncBody */ bodyFunctionOrErrorCode, |
| _AsyncStarStreamController controller, |
| ) { |
| if (identical(bodyFunctionOrErrorCode, async_status_codes.SUCCESS)) { |
| // This happens on return from the async* function. |
| if (controller.isCanceled) { |
| controller.cancelationFuture!._completeWithValue(null); |
| } else { |
| controller.close(); |
| } |
| return; |
| } else if (identical(bodyFunctionOrErrorCode, async_status_codes.ERROR)) { |
| // The error is a js-error. |
| if (controller.isCanceled) { |
| controller.cancelationFuture!._completeError( |
| dart.getThrown(object)!, |
| dart.stackTrace(object), |
| ); |
| } else { |
| controller.addError(dart.getThrown(object)!, dart.stackTrace(object)); |
| controller.close(); |
| } |
| return; |
| } |
| |
| _WrappedAsyncBody bodyFunction = bodyFunctionOrErrorCode; |
| if (object is _IterationMarker) { |
| if (controller.isCanceled) { |
| bodyFunction(async_status_codes.STREAM_WAS_CANCELED, null); |
| return; |
| } |
| if (object.state == _IterationMarker.YIELD_SINGLE) { |
| controller.add(object.value); |
| |
| scheduleMicrotask(() { |
| if (controller.isPaused) { |
| // We only suspend the thread inside the microtask in order to allow |
| // listeners on the output stream to pause in response to the just |
| // output value, and have the stream immediately stop producing. |
| controller.isSuspended = true; |
| return; |
| } |
| bodyFunction( |
| controller.isCanceled |
| ? async_status_codes.STREAM_WAS_CANCELED |
| : async_status_codes.SUCCESS, |
| null, |
| ); |
| }); |
| return; |
| } else if (object.state == _IterationMarker.YIELD_STAR) { |
| Stream stream = object.value; |
| // Errors of [stream] are passed though to the main stream. (see |
| // [AsyncStreamController.addStream]). |
| controller.addStream(stream).then((_) { |
| // No need to check for pause because to get here the stream either |
| // completed normally or was cancelled. The stream cannot be paused |
| // after either of these states. |
| int errorCode = |
| controller.isCanceled |
| ? async_status_codes.STREAM_WAS_CANCELED |
| : async_status_codes.SUCCESS; |
| bodyFunction(errorCode, null); |
| }); |
| return; |
| } |
| } |
| |
| _awaitOnObject(object, bodyFunction); |
| } |
| |
| /// Gets the stream out of an async controller. |
| /// |
| /// Used as part of the runtime support for the async/await transformation. All |
| /// calls to this method are generated by the compiler. |
| Stream _streamOfController(_AsyncStarStreamController controller) { |
| return controller.stream; |
| } |
| |
| /// A wrapper around a [StreamController] that keeps track of the state of |
| /// the execution of an async* function. |
| /// |
| /// It can be in 1 of 3 states: |
| /// |
| /// - running/scheduled |
| /// - suspended |
| /// - canceled |
| /// |
| /// If yielding while the subscription is paused it will become suspended. And |
| /// only resume after the subscription is resumed or canceled. |
| class _AsyncStarStreamController<T> { |
| late StreamController<T> controller; |
| Stream get stream => controller.stream; |
| |
| /// True when the async* function has yielded while being paused. |
| /// |
| /// When true execution will only resume after a `onResume` or `onCancel` |
| /// event. |
| bool isSuspended = false; |
| |
| bool get isPaused => controller.isPaused; |
| |
| _Future? cancelationFuture = null; |
| |
| /// True after the StreamSubscription has been cancelled. |
| /// |
| /// When this is true, errors thrown from the async* body should go to the |
| /// [cancelationFuture] instead of adding them to [controller], and |
| /// returning from the async function should complete [cancelationFuture]. |
| bool get isCanceled => cancelationFuture != null; |
| |
| add(event) => controller.add(event); |
| |
| Future addStream(Stream<T> stream) { |
| return controller.addStream(stream, cancelOnError: false); |
| } |
| |
| addError(error, stackTrace) => controller.addError(error, stackTrace); |
| |
| close() => controller.close(); |
| |
| _AsyncStarStreamController(_WrappedAsyncBody body) { |
| _resumeBody() { |
| scheduleMicrotask(() { |
| body(async_status_codes.SUCCESS, null); |
| }); |
| } |
| |
| controller = StreamController<T>( |
| onListen: () { |
| _resumeBody(); |
| }, |
| onResume: () { |
| // Only schedule again if the async* function actually is suspended. |
| // Resume directly instead of scheduling, so that the sequence |
| // `pause-resume-pause` will result in one extra event produced. |
| if (isSuspended) { |
| isSuspended = false; |
| _resumeBody(); |
| } |
| }, |
| onCancel: () { |
| // If the async* is finished we ignore cancel events. |
| if (!controller.isClosed) { |
| cancelationFuture = _Future(); |
| if (isSuspended) { |
| // Resume the suspended async* function to run finalizers. |
| isSuspended = false; |
| scheduleMicrotask(() { |
| body(async_status_codes.STREAM_WAS_CANCELED, null); |
| }); |
| } |
| return cancelationFuture; |
| } |
| }, |
| ); |
| } |
| } |
| |
| /// Creates a stream controller for an `async*` function. |
| /// |
| /// Used as part of the runtime support for the async/await transformation. |
| _makeAsyncStarStreamController<T>(_WrappedAsyncBody body) { |
| return _AsyncStarStreamController<T>(body); |
| } |
| |
| class _IterationMarker { |
| static const YIELD_SINGLE = 0; |
| static const YIELD_STAR = 1; |
| static const ITERATION_ENDED = 2; |
| static const UNCAUGHT_ERROR = 3; |
| |
| final value; |
| final int state; |
| |
| const _IterationMarker._(this.state, this.value); |
| |
| static yieldStar(Object /* Iterable or Stream */ values) { |
| return _IterationMarker._(YIELD_STAR, values); |
| } |
| |
| static endOfIteration() { |
| return const _IterationMarker._(ITERATION_ENDED, null); |
| } |
| |
| static yieldSingle(Object value) { |
| return _IterationMarker._(YIELD_SINGLE, value); |
| } |
| |
| static uncaughtError(Object error) { |
| return _IterationMarker._(UNCAUGHT_ERROR, error); |
| } |
| |
| toString() => "IterationMarker($state, $value)"; |
| } |
| |
| /// _SyncStarIterator handles stepping a sync* generator body state machine. |
| /// |
| /// It also handles the stepping over 'nested' iterators to flatten yield* |
| /// statements. For non-sync* iterators, [_nestedIterator] contains the |
| /// iterator. We delegate to [_nestedIterator] when it is not `null`. |
| /// |
| /// For nested sync* iterators, this [Iterator] acts on behalf of the innermost |
| /// nested sync* iterator. The current state machine is suspended on a stack |
| /// until the inner state machine ends. |
| class _SyncStarIterator<T> implements Iterator<T> { |
| /// The state machine for the innermost _SyncStarIterator. |
| Object? _body; |
| |
| /// The current value, unless iterating a non-sync* nested iterator. |
| T? _current = null; |
| |
| /// Value passed back from state machine for uncaught exceptions. |
| Object? _datum; |
| |
| /// This is the nested iterator when iterating a yield* of a non-sync iterator. |
| Iterator<T>? _nestedIterator = null; |
| |
| /// Stack of suspended state machines when iterating a yield* of a sync* |
| /// iterator. |
| List? _suspendedBodies = null; |
| |
| _SyncStarIterator(this._body); |
| |
| T get current { |
| return _current as T; |
| } |
| |
| _resumeBody(int errorCode, Object? errorValue) { |
| final body = _body; |
| while (true) { |
| try { |
| return JS('', '#(#, #, #)', body, this, errorCode, errorValue); |
| } catch (error) { |
| errorValue = JS_RAW_EXCEPTION(); |
| errorCode = async_status_codes.ERROR; |
| } |
| } |
| } |
| |
| bool moveNext() { |
| Object? errorValue; |
| int errorCode = async_status_codes.SUCCESS; |
| while (true) { |
| final nestedIterator = _nestedIterator; |
| if (nestedIterator != null) { |
| try { |
| if (nestedIterator.moveNext()) { |
| _current = nestedIterator.current; |
| return true; |
| } else { |
| _nestedIterator = null; |
| } |
| } catch (error) { |
| errorValue = error; |
| errorCode = async_status_codes.ERROR; |
| _nestedIterator = null; |
| } |
| } |
| |
| var value = _resumeBody(errorCode, errorValue); |
| |
| if (async_status_codes.SYNC_STAR_YIELD == value) { |
| // The state-machine has assgned the value to _current. |
| return true; |
| } |
| if (async_status_codes.SYNC_STAR_DONE == value) { |
| _current = null; |
| var suspendedBodies = _suspendedBodies; |
| if (suspendedBodies == null || suspendedBodies.isEmpty) { |
| // Overwrite the body with a stub for an empty iterable. If [moveNext] |
| // is called 'too many' times, it continues to return `false`. |
| _body = RAW_DART_FUNCTION_REF(_terminatedBody); |
| return false; |
| } |
| // Resume the innermost suspended iterator. |
| _body = suspendedBodies.removeLast(); |
| errorCode = async_status_codes.SUCCESS; |
| errorValue = null; |
| continue; |
| } |
| if (async_status_codes.SYNC_STAR_YIELD_STAR == value) { |
| // The call to _yieldStar has modified the state. |
| errorCode = async_status_codes.SUCCESS; |
| errorValue = null; |
| continue; |
| } |
| if (async_status_codes.SYNC_STAR_UNCAUGHT_EXCEPTION == value) { |
| errorValue = _datum; |
| _datum = null; |
| var suspendedBodies = _suspendedBodies; |
| if (suspendedBodies == null || suspendedBodies.isEmpty) { |
| _current = null; |
| // Overwrite the body with a stub for an empty iterable. If [moveNext] |
| // is called after the exception propagates out of the `yield*` stack, |
| // it will return `false`. |
| _body = RAW_DART_FUNCTION_REF(_terminatedBody); |
| |
| // This is a wrapped exception, so we use JavaScript throw to throw |
| // it. |
| JS('', 'throw #', errorValue); |
| // The above is not seen as terminating, so we need this return: |
| return false; // unreachable |
| } |
| // Resume the innermost suspended iterator. |
| _body = suspendedBodies.removeLast(); |
| errorCode = async_status_codes.ERROR; |
| continue; |
| } |
| throw StateError('sync*'); |
| } |
| } |
| |
| static _terminatedBody(_1, _2, _3) => async_status_codes.SYNC_STAR_DONE; |
| |
| // Called from generated code. |
| int _yieldStar(Iterable<T> iterable) { |
| if (iterable is _SyncStarIterable) { |
| // Promotion fails, so we need this zero-cost 'cast'. |
| _SyncStarIterable<T> syncStarIterable = JS('', '#', iterable); |
| _SyncStarIterator inner = syncStarIterable.iterator; |
| // Suspend the current state machine and start acting on behalf of |
| // the nested state machine. |
| // |
| // TODO(sra): Recognize "tail yield*" statements and avoid |
| // suspending the current body when all it will do is step without |
| // effect to ITERATION_ENDED. |
| (_suspendedBodies ??= []).add(_body); |
| _body = inner._body; |
| return async_status_codes.SYNC_STAR_YIELD_STAR; |
| } else { |
| _nestedIterator = iterable.iterator; |
| return async_status_codes.SYNC_STAR_YIELD_STAR; |
| } |
| } |
| } |
| |
| /// Creates an Iterable for a `sync*` function. |
| /// |
| /// Used as part of the runtime support for the async/await transformation. |
| _SyncStarIterable<T> _makeSyncStarIterable<T>(body) { |
| return _SyncStarIterable<T>(body); |
| } |
| |
| /// An Iterable corresponding to a sync* method. |
| /// |
| /// Each invocation of a sync* method will return a new instance of this class. |
| class _SyncStarIterable<T> extends Iterable<T> { |
| /// This is a function that will return a helper function that does the |
| /// iteration of the sync*. |
| /// |
| /// Each invocation should give a body with fresh state. |
| final dynamic /* js function */ _outerHelper; |
| |
| _SyncStarIterable(this._outerHelper); |
| |
| _SyncStarIterator<T> get iterator => |
| _SyncStarIterator<T>(JS('', '#()', _outerHelper)); |
| } |
| |
| /// Wraps an `await`ed expression in [Future.value] if needed. |
| /// |
| /// If an expression `e` has a static type of `S`, then `await e` must first |
| /// check if the runtime type of `e` is `Future<flatten(S)>`. If it is, `e` can |
| /// be `await`ed directly. Otherwise, we must `await Future.value(e)`. Here, [T] |
| /// is expected to be `flatten(S)`. |
| /// |
| /// It suffices to use `_Future.value` rather than `Future.value` - see the |
| /// comments on https://github.com/dart-lang/sdk/issues/50601. |
| Future<T> _wrapAwaitedExpression<T>(Object? e) => |
| e is Future<T> ? e : _Future<T>.value(unsafeCast<T>(e)); |