blob: b8138d7b4dda863c3c20bcaa2fc90a6c9ba80675 [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
/// This library adapts ES6 generators to implement Dart's async/await.
/// It's designed to interact with Dart's Future/Stream and follow Dart
/// async/await semantics.
/// See https://github.com/dart-lang/dev_compiler/issues/245 for ideas on
/// reconciling Dart's Future and ES6 Promise.
/// Inspired by `co`: https://github.com/tj/co/blob/master/index.js, which is a
/// stepping stone for proposed ES7 async/await, and uses ES6 Promises.
part of dart._runtime;
// Unique JavaScript Symbol created with the description "_jsIterator".
// NOTE(review): presumably used as a private property key for a wrapped JS
// iterator; its use is not visible in this part of the library - confirm.
final _jsIterator = JS('', 'Symbol("_jsIterator")');
// Unique JavaScript Symbol created with the description "_current".
// NOTE(review): presumably used as a private property key for an iterator's
// current element; its use is not visible in this part of the library -
// confirm.
final _current = JS('', 'Symbol("_current")');
/// Creates the iterable returned by a `sync*` function.
///
/// Instantiates the generic `SyncIterable` class with element type [E] and
/// constructs it from the generator function [gen] and the rest arguments
/// [args] that were passed to the `sync*` function.
syncStar(gen, E, @rest args) => JS('', '''(() => {
  // Reify SyncIterable for the requested element type before constructing it.
  const IterableOfE = ${getGenericClass(SyncIterable)}($E);
  return new IterableOfE($gen, $args);
})()''');
/// Implements a Dart `async` function on top of an ES6 generator.
///
/// Returns a `Future<T>` whose computation calls the generator function [gen]
/// with [args] and then drives the resulting iterator: every value the
/// generator yields is treated as an awaited value, and the generator's return
/// value is what the computation finally returns.
@JSExportName('async')
async_(gen, T, @rest args) => JS('', '''(() => {
  let iterator;

  // Processes one generator step: either finishes with the generator's return
  // value or chains onto the yielded (awaited) value.
  function step(result) {
    if (result.done) return result.value;
    let awaited = result.value;
    // A non-Future value is wrapped so that it can be chained uniformly.
    if (!$instanceOf(awaited, ${getGenericClass(Future)})) {
      awaited = $Future.value(awaited);
    }
    // Once the awaited Future settles, feed its outcome back to the generator.
    return awaited.then($dynamic)(resume, {onError: fail});
  }

  // Resumes the generator with the value produced by the awaited Future.
  // A missing (undefined) value is passed to the generator as null.
  function resume(value) {
    if (value === void 0) value = null;
    return step(iterator.next(value));
  }

  // Rethrows a failed await inside the generator, at the `yield` point, so the
  // generator body gets a chance to use try-catch-finally. If the body does
  // not catch it, the exception emerges from here and is delivered to whoever
  // listens on this async function's Future<T>.
  function fail(error) {
    return step(iterator.throw(error));
  }

  return ${getGenericClass(Future)}($T).new(function() {
    iterator = $gen.apply(null, $args)[Symbol.iterator]();
    return resume();
  });
})()''');
// Implementation inspired by _AsyncStarStreamController in
// dart-lang/sdk's runtime/lib/core_patch.dart
//
// Given input like:
//
// foo() async* {
// yield 1;
// yield* bar();
// print(await baz());
// }
//
// This generates as:
//
// function foo() {
// return dart.asyncStar(function*(stream) {
// if (stream.add(1)) return;
// yield;
// if (stream.addStream(bar())) return;
// yield;
// print(yield baz());
// });
// }
//
// TODO(ochafik): Port back to Dart (which it used to be in the past).
// JavaScript class that drives the ES6 generator produced for an `async*`
// function and forwards its events to a Dart StreamController.
//
// The generator receives the controller instance as its first argument and
// calls `add` / `addStream` on it for `yield` / `yield*`; a value passed to a
// JS `yield` is treated as an awaited value (see `handleStep`).
final _AsyncStarStreamController = JS('', '''
  class _AsyncStarStreamController {
    constructor(generator, T, args) {
      // True while an addStream() call (a `yield*`) is still in progress.
      this.isAdding = false;
      // True while the body is blocked on an awaited Future.
      this.isWaiting = false;
      // True while a microtask that will run the body is pending.
      this.isScheduled = false;
      // True after add() delivered an event and until the body runs again.
      this.isSuspendedAtYield = false;
      // Completer created by onCancel(); completed by close() or addError().
      this.canceler = null;
      this.iterator = generator(this, ...args)[Symbol.iterator]();
      this.controller = ${getGenericClass(StreamController)}(T).new({
        onListen: () => this.scheduleGenerator(),
        onResume: () => this.onResume(),
        onCancel: () => this.onCancel()
      });
    }

    // Stream callback: resumes the body if it was suspended at a yield.
    onResume() {
      if (this.isSuspendedAtYield) {
        this.scheduleGenerator();
      }
    }

    // Stream callback: returns null if the controller is already closed,
    // otherwise a Future that completes once the generator has finished
    // (see close() and addError()).
    onCancel() {
      if (this.controller.isClosed) {
        return null;
      }
      if (this.canceler == null) {
        this.canceler = $Completer.new();
        this.scheduleGenerator();
      }
      return this.canceler.future;
    }

    // Closes the underlying controller, first completing a pending
    // cancellation future (if any).
    close() {
      if (this.canceler != null && !this.canceler.isCompleted) {
        // If the stream has been cancelled, complete the cancellation future
        // normally: the generator finished without a pending error.
        this.canceler.complete();
      }
      this.controller.close();
    }

    // Schedules a microtask that runs the generator body, unless one is
    // already pending or the body is currently blocked (paused stream,
    // addStream in progress, or waiting on a Future).
    scheduleGenerator() {
      // TODO(jmesserly): is this paused check in the right place? Assuming the
      // async* Stream yields, then is paused (by other code), the body will
      // already be scheduled. This will cause at least one more iteration to
      // run (adding another data item to the Stream) before actually pausing.
      // It could be fixed by moving the `isPaused` check inside `runBody`.
      if (this.isScheduled || this.controller.isPaused ||
          this.isAdding || this.isWaiting) {
        return;
      }
      this.isScheduled = true;
      $scheduleMicrotask(() => this.runBody());
    }

    // Resumes the generator, passing `opt_awaitValue` as the result of the
    // `yield` it is suspended at, and then processes the step it produced.
    runBody(opt_awaitValue) {
      this.isScheduled = false;
      this.isSuspendedAtYield = false;
      this.isWaiting = false;
      let iter;
      try {
        iter = this.iterator.next(opt_awaitValue);
      } catch (e) {
        this.addError(e, $stackTrace(e));
        this.close();
        return;
      }
      return this.handleStep(iter);
    }

    // Processes the iterator result produced by resuming the generator via
    // either `next` (runBody) or `throw` (throwError).
    handleStep(iter) {
      if (iter.done) {
        this.close();
        return;
      }
      // If we're suspended at a yield/yield*, we're done for now.
      if (this.isSuspendedAtYield || this.isAdding) return;
      // Handle `await`: if we get a value passed to `yield` it means we are
      // waiting on this Future. Make sure to prevent scheduling, and pass the
      // value back as the result of the `yield`.
      //
      // TODO(jmesserly): is the timing here correct? The assumption here is
      // that we should schedule `await` in `async*` the same as in `async`.
      this.isWaiting = true;
      let future = iter.value;
      if (!$instanceOf(future, ${getGenericClass(Future)})) {
        future = $Future.value(future);
      }
      return future.then($dynamic)((x) => this.runBody(x),
          { onError: (e, s) => this.throwError(e, s) });
    }

    // Adds element to stream, returns true if the caller should terminate
    // execution of the generator.
    add(event) {
      // If stream is cancelled, tell caller to exit the async generator.
      if (!this.controller.hasListener) return true;
      this.controller.add(event);
      this.scheduleGenerator();
      this.isSuspendedAtYield = true;
      return false;
    }

    // Adds the elements of stream into this controller's stream.
    // The generator will be scheduled again when all of the
    // elements of the added stream have been consumed.
    // Returns true if the caller should terminate
    // execution of the generator.
    addStream(stream) {
      // If stream is cancelled, tell caller to exit the async generator.
      if (!this.controller.hasListener) return true;
      this.isAdding = true;
      this.controller.addStream(stream, {cancelOnError: false}).then($dynamic)(
          () => {
            this.isAdding = false;
            this.scheduleGenerator();
          }, { onError: (e, s) => this.throwError(e, s) });
      // Explicit result, consistent with add(): the generator should go on.
      return false;
    }

    // Throws `error` into the generator at the point where it is suspended
    // (used when an awaited Future or an addStream Future fails), so that the
    // body can handle it with try-catch-finally.
    throwError(error, stackTrace) {
      // The operation the body was blocked on has finished (with an error), so
      // clear the blocking flags just like runBody does. Otherwise an add()
      // performed by a catch/finally block could never reschedule the body.
      this.isWaiting = false;
      this.isAdding = false;
      this.isSuspendedAtYield = false;
      let iter;
      try {
        iter = this.iterator.throw(error);
      } catch (e) {
        // The body did not handle the error: report it and finish the stream,
        // exactly as for an exception escaping from runBody.
        this.addError(e, stackTrace);
        this.close();
        return;
      }
      // The body caught the error and either finished or suspended again;
      // process that step instead of dropping it.
      return this.handleStep(iter);
    }

    // Reports an error that escaped the generator: to the cancellation future
    // if a cancel is pending, otherwise to the stream if it has a listener.
    addError(error, stackTrace) {
      if ((this.canceler != null) && !this.canceler.isCompleted) {
        // If the stream has been cancelled, complete the cancellation future
        // with the error.
        this.canceler.completeError(error, stackTrace);
        return;
      }
      if (!this.controller.hasListener) return;
      this.controller.addError(error, stackTrace);
    }
  }
''');
/// Returns a `Stream<T>` implemented by an `async*` function.
///
/// Wraps the generator function [gen] and the rest arguments [args] in a new
/// `_AsyncStarStreamController` for element type [T] and exposes the stream of
/// that controller's underlying `controller`.
asyncStar(gen, T, @rest args) => JS('', '''(() => {
  const asyncStarController =
      new $_AsyncStarStreamController($gen, $T, $args);
  return asyncStarController.controller.stream;
})()''');