blob: 8522a42cd46b082372bf6b7cd3b349fdb52c2be3 [file] [log] [blame]
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
/**
* Support for asynchronous programming,
* with classes such as Future and Stream.
*
* Understanding [Future]s and [Stream]s is a prerequisite for
* writing just about any Dart program.
*
* To use this library in your code:
*
* import 'dart:async';
*
* ## Future
*
* A Future object represents a computation whose return value
* might not yet be available.
* The Future returns the value of the computation
* when it completes at some time in the future.
* Futures are often used for potentially lengthy computations
* such as I/O and interaction with users.
*
* Many methods in the Dart libraries return Futures when
* performing tasks. For example, when binding an HttpServer
* to a host and port, the `bind()` method returns a Future.
*
* HttpServer.bind('127.0.0.1', 4444)
* .then((server) => print('${server.isBroadcast}'))
* .catchError(print);
*
* [Future.then] registers a callback function that runs
* when the Future's operation, in this case the `bind()` method,
* completes successfully.
* The value returned by the operation
* is passed into the callback function.
* In this example, the `bind()` method returns the HttpServer
* object. The callback function prints one of its properties.
* [Future.catchError] registers a callback function that
* runs if an error occurs within the Future.
*
* ## Stream
*
* A Stream provides an asynchronous sequence of data.
* Examples of data sequences include individual events, like mouse clicks,
* or sequential chunks of larger data, like multiple byte lists with the
* contents of a file.
* The following example opens a file for reading.
* [Stream.listen] registers a callback function that runs
* each time more data is available.
*
* Stream<List<int>> stream = new File('quotes.txt').openRead();
* stream.transform(UTF8.decoder).listen(print);
*
* The stream emits a sequence of lists of bytes.
* The program must interpret the bytes or handle the raw byte data.
* Here, the code uses a UTF8 decoder (provided in the `dart:convert` library)
* to convert the sequence of bytes into a sequence
* of Dart strings.
*
* Another common use of streams is for user-generated events
* in a web app: The following code listens for mouse clicks on a button.
*
* querySelector('#myButton').onClick.listen((_) => print('Click.'));
*
* ## Other resources
*
* * The [dart:async section of the library tour][asynchronous-programming]:
* A brief overview of asynchronous programming.
*
* * [Use Future-Based APIs][futures-tutorial]: A closer look at Futures and
* how to use them to write asynchronous Dart code.
*
* * [Futures and Error Handling][futures-error-handling]: Everything you
* wanted to know about handling errors and exceptions when working with
* Futures (but were afraid to ask).
*
* * [The Event Loop and Dart](https://www.dartlang.org/articles/event-loop/):
* Learn how Dart handles the event queue and microtask queue, so you can
* write better asynchronous code with fewer surprises.
*
* * [test package: Asynchronous Tests][test-readme]: How to test asynchronous
* code.
*
* [asynchronous-programming]: https://www.dartlang.org/docs/dart-up-and-running/ch03.html#dartasync---asynchronous-programming
* [futures-tutorial]: https://www.dartlang.org/docs/tutorials/futures/
* [futures-error-handling]: https://www.dartlang.org/articles/futures-and-error-handling/
* [test-readme]: https://pub.dartlang.org/packages/test
*/
library dart.async;
import "dart:collection";
import "dart:_internal" show printToZone, printToConsole, IterableElementError;
import "dart:_internal" hide Symbol;
part 'async_error.dart';
part 'broadcast_stream_controller.dart';
part 'deferred_load.dart';
part 'future.dart';
part 'future_impl.dart';
part 'schedule_microtask.dart';
part 'stream.dart';
part 'stream_controller.dart';
part 'stream_impl.dart';
part 'stream_pipe.dart';
part 'stream_transformers.dart';
part 'timer.dart';
part 'zone.dart';
// Native entry point bound to "DartAsync_fatal".
//
// Within this file it is called with a message string when an async*
// stream controller detects a violated invariant (for example a yield
// before the stream has been listened to).
// NOTE(review): presumably reports [msg] and aborts execution; confirm
// against the native implementation.
_fatal(msg) native "DartAsync_fatal";
Function _asyncThenWrapperHelper(continuation) {
  // Every function used as an asynchronous callback has to be registered in
  // the current Zone. A future normally takes care of that when a callback
  // is handed to it (e.g. through `.then` or `.catchError`). Here the same
  // callback is reused many times, so we register it once up front and rely
  // on the registration-free shortcut of `.then` that our internal futures
  // (`_Future`) provide.
  //
  // The root zone does nothing on registration, so the call is skipped
  // entirely when we are running in it.
  //
  // Be aware that the continuation accepts up to three arguments. In the
  // root zone it is returned unwrapped, so a misbehaving `Future`
  // implementation could invoke it with the wrong number of arguments.
  final zone = Zone.current;
  if (zone == Zone.ROOT) {
    return continuation;
  }
  return zone.registerUnaryCallback((value) => continuation(value, null, null));
}
Function _asyncErrorWrapperHelper(continuation) {
  // Error-path counterpart of `_asyncThenWrapperHelper`; the reasoning about
  // zone registration given there applies here as well.
  //
  // The adapter forwards an (error, stackTrace) pair to the three-argument
  // continuation, passing `null` in the value slot.
  errorCallback(e, s) => continuation(null, e, s);
  final zone = Zone.current;
  return zone == Zone.ROOT
      ? errorCallback
      : zone.registerBinaryCallback(errorCallback);
}
/// Hooks [thenCallback] and [errorCallback] up to [object].
///
/// A non-future [object] is first wrapped in an already-completed `_Future`.
/// A future that is not a `_Future` gets the callbacks through its ordinary
/// `.then`. For a `_Future`, [awaiter] is stored on it and the callbacks are
/// attached without an additional zone registration.
///
/// Returns the future produced by the registration.
Future _awaitHelper(
    var object, Function thenCallback, Function errorCallback, var awaiter) {
  var internalFuture;
  if (object is _Future) {
    internalFuture = object;
  } else if (object is Future) {
    // A foreign Future implementation: only its public API can be used.
    return object.then(thenCallback, onError: errorCallback);
  } else {
    // A plain value: wrap it in an internal future holding that value.
    internalFuture = new _Future().._setValue(object);
  }
  // The callbacks were already registered in the current zone (see
  // [_asyncThenWrapperHelper] and [_asyncErrorWrapperHelper]), so the
  // registration-free variant of `.then` can be called directly. That
  // variant exists only on the internal futures that `dart:async` itself
  // constructs by default.
  internalFuture._awaiter = awaiter;
  return internalFuture._thenNoZoneRegistration(thenCallback, errorCallback);
}
/// Stores [awaiter] on [object] when [object] is a `_StreamImpl`.
///
/// Any other kind of object is left untouched.
void _asyncStarListenHelper(var object, var awaiter) {
  if (object is _StreamImpl) {
    object._awaiter = awaiter;
  }
}
/// Runs the debugger step check for the generator attached to [stream].
///
/// Does nothing unless [stream] is a `_StreamImpl` with a registered
/// generator; a `_StreamImpl` without one is not an async* stream.
void _asyncStarMoveNextHelper(var stream) {
  if (stream is _StreamImpl && stream._generator != null) {
    _moveNextDebuggerStepCheck(stream._generator);
  }
}
/// Controller behind the stream of an async generator.
///
/// Wraps a [StreamController] and decides when the generator body
/// ([asyncStarBody]) runs: the body is scheduled as a microtask when the
/// stream is listened to, after an event has been added, when a paused
/// subscription resumes, and when the subscription is cancelled while the
/// generator is suspended at a yield.
class _AsyncStarStreamController {
// The underlying controller; created in the constructor with this object's
// onListen/onResume/onCancel methods as its callbacks.
StreamController controller;
// The generator body. Invoked with no arguments by [runBody].
Function asyncStarBody;
// True from the start of [addStream] until the added stream is done.
bool isAdding = false;
// True once [onListen] has been called.
bool onListenReceived = false;
// True while a [runBody] microtask is pending; cleared when it starts.
bool isScheduled = false;
// True after an event was added via [add] (or after [addStream] finished
// without being able to schedule the body); cleared when the body runs.
bool isSuspendedAtYield = false;
// Created on the first effective [onCancel]; completed by [close] or by
// [addError].
Completer cancellationCompleter = null;
/// The controller's stream.
///
/// When that stream is a `_StreamImpl`, [asyncStarBody] is stored on it as
/// its `_generator` before it is returned.
Stream get stream {
Stream local = controller.stream;
if (local is! _StreamImpl) {
return local;
}
local._generator = asyncStarBody;
return local;
}
/// Clears the scheduling flags and invokes the generator body.
void runBody() {
isScheduled = false;
isSuspendedAtYield = false;
asyncStarBody();
}
/// Schedules [runBody] as a microtask.
///
/// Does nothing when a run is already scheduled, the controller is paused,
/// or an [addStream] is still in progress.
void scheduleGenerator() {
if (isScheduled || controller.isPaused || isAdding) {
return;
}
isScheduled = true;
scheduleMicrotask(runBody);
}
// Adds element to stream, returns true if the caller should terminate
// execution of the generator.
//
// TODO(hausner): Per spec, the generator should be suspended before
// exiting when the stream is closed. We could add a getter like this:
// get isCancelled => controller.hasListener;
// The generator would translate a 'yield e' statement to
// controller.add(e);
// suspend;
// if (controller.isCancelled) return;
bool add(event) {
if (!onListenReceived) _fatal("yield before stream is listened to");
if (isSuspendedAtYield) _fatal("unexpected yield");
// If stream is cancelled, tell caller to exit the async generator.
if (!controller.hasListener) {
return true;
}
controller.add(event);
// Request the next run of the body first, then record that the generator
// is now suspended at a yield. If scheduling was refused (e.g. the
// controller is paused), [onResume] and [onCancel] use that flag to
// schedule the body later.
scheduleGenerator();
isSuspendedAtYield = true;
return false;
}
// Adds the elements of stream into this controller's stream.
// The generator will be scheduled again when all of the
// elements of the added stream have been consumed.
// Returns true if the caller should terminate
// execution of the generator.
bool addStream(Stream stream) {
if (!onListenReceived) _fatal("yield before stream is listened to");
// If stream is cancelled, tell caller to exit the async generator.
if (!controller.hasListener) return true;
// While set, [scheduleGenerator] refuses to schedule the body.
isAdding = true;
var whenDoneAdding =
controller.addStream(stream as Stream, cancelOnError: false);
whenDoneAdding.then((_) {
isAdding = false;
scheduleGenerator();
// If the body could not be scheduled, mark the generator as suspended
// at a yield so that [onResume] or [onCancel] schedules it later.
if (!isScheduled) isSuspendedAtYield = true;
});
return false;
}
/// Reports an error raised by the generator body.
///
/// If a cancellation is pending, the error completes the cancellation
/// future instead of being added to the stream.
void addError(error, stackTrace) {
if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) {
// If the stream has been cancelled, complete the cancellation future
// with the error.
cancellationCompleter.completeError(error, stackTrace);
return;
}
// If the stream has no listener, drop the error.
if (!controller.hasListener) return;
controller.addError(error, stackTrace);
// No need to schedule the generator body here. This code is only
// called from the catch clause of the implicit try-catch-finally
// around the generator body. That is, we are on the error path out
// of the generator and do not need to run the generator again.
}
/// Closes the underlying controller, first completing a pending
/// cancellation future if there is one.
close() {
if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) {
// If the stream has been cancelled, complete the cancellation future
// (without a value or an error).
cancellationCompleter.complete();
}
controller.close();
}
/// Creates a controller for the generator body [asyncStarBody] and wires
/// the listen, resume and cancel callbacks of the underlying
/// [StreamController] to this object.
_AsyncStarStreamController(this.asyncStarBody) {
controller = new StreamController(
onListen: this.onListen,
onResume: this.onResume,
onCancel: this.onCancel);
}
// Listen callback: records that the stream has a listener and schedules
// the first run of the generator body.
onListen() {
assert(!onListenReceived);
onListenReceived = true;
scheduleGenerator();
}
// Resume callback: schedules the body again, but only if the generator is
// suspended at a yield.
onResume() {
if (isSuspendedAtYield) {
scheduleGenerator();
}
}
// Cancel callback.
//
// Returns null when the controller is already closed. Otherwise returns
// the future of [cancellationCompleter], creating the completer on the
// first call; that future is completed by [close] or [addError].
onCancel() {
if (controller.isClosed) {
return null;
}
if (cancellationCompleter == null) {
cancellationCompleter = new Completer();
// Only resume the generator if it is suspended at a yield.
// Cancellation does not affect an async generator that is
// suspended at an await.
if (isSuspendedAtYield) {
scheduleGenerator();
}
}
return cancellationCompleter.future;
}
}
/// Completes [completer] with [value].
///
/// [completer] is declared as [Object], so the `complete` call goes through
/// a `dynamic` local. Invoking `complete` directly on an `Object`-typed
/// receiver is a static warning in Dart 1 and a compile-time error under
/// sound typing, while the runtime dispatch is the same either way.
///
/// Whatever `complete` throws (for example a [StateError] from a
/// [Completer] that has already been completed) propagates to the caller.
void _completeOnAsyncReturn(Object completer, Object value) {
  // Deliberately dynamic: keeps the declared parameter type unchanged and
  // accepts any object that exposes a one-argument `complete` method.
  dynamic target = completer;
  target.complete(value);
}
/// Returns a [StackTrace] object containing the synchronous prefix for this
/// asynchronous method.
///
/// Native entry point bound to "StackTrace_asyncStackTraceHelper".
/// NOTE(review): [async_op] is presumably the closure implementing the body
/// of the asynchronous method; confirm against the native implementation.
Object _asyncStackTraceHelper(Function async_op)
native "StackTrace_asyncStackTraceHelper";
/// Native entry point bound to "StackTrace_clearAsyncThreadStackTrace".
///
/// NOTE(review): presumably resets the stack trace previously installed
/// with [_setAsyncThreadStackTrace]; confirm against the native
/// implementation.
void _clearAsyncThreadStackTrace()
native "StackTrace_clearAsyncThreadStackTrace";
/// Native entry point bound to "StackTrace_setAsyncThreadStackTrace".
///
/// NOTE(review): presumably records [stackTrace] as the asynchronous stack
/// trace associated with the current thread; confirm against the native
/// implementation.
void _setAsyncThreadStackTrace(StackTrace stackTrace)
native "StackTrace_setAsyncThreadStackTrace";
/// Native entry point bound to "AsyncStarMoveNext_debuggerStepCheck".
///
/// Within this file it is called from `_asyncStarMoveNextHelper` with the
/// generator function registered on an async* stream.
/// NOTE(review): presumably lets the debugger keep stepping into
/// [async_op]; confirm against the native implementation.
void _moveNextDebuggerStepCheck(Function async_op)
native "AsyncStarMoveNext_debuggerStepCheck";
// Library-private set of strings, initially empty.
// NOTE(review): presumably tracks libraries already loaded by the
// deferred-loading support (see the 'deferred_load.dart' part); confirm at
// the use sites, which are not in this section of the file.
final Set<String> _loadedLibraries = new Set<String>();
/// Signature of a function that accepts a zero-argument callback.
// NOTE(review): presumably the function is expected to run `callback`
// asynchronously as soon as possible; confirm with whoever installs it.
typedef void _ScheduleImmediateClosure(void callback());
/// Static holder for the currently installed [_ScheduleImmediateClosure].
///
/// The closure is assigned through `_setScheduleImmediateClosure`.
class _ScheduleImmediate {
static _ScheduleImmediateClosure _closure;
}
/// Installs [closure] as `_ScheduleImmediate._closure`, replacing whatever
/// was stored there before.
// NOTE(review): presumably called by the embedder during setup; confirm.
void _setScheduleImmediateClosure(_ScheduleImmediateClosure closure) {
_ScheduleImmediate._closure = closure;
}
/// Signature of a function that creates a [Timer] from a duration in
/// milliseconds, a callback that receives the timer, and a flag named
/// `repeating`.
// NOTE(review): `repeating` presumably selects a periodic timer over a
// one-shot timer; confirm with the factory implementation.
typedef Timer _TimerFactoryClosure(
int milliseconds, void callback(Timer timer), bool repeating);
/// Static holder for the currently installed [_TimerFactoryClosure].
// NOTE(review): nothing in this section assigns `_factory`; it is
// presumably set from outside this file. Confirm.
class _TimerFactory {
static _TimerFactoryClosure _factory;
}