| // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| /** |
| * Support for asynchronous programming, |
| * with classes such as Future and Stream. |
| * |
| * Understanding [Future]s and [Stream]s is a prerequisite for |
| * writing just about any Dart program. |
| * |
| * To use this library in your code: |
| * |
| * import 'dart:async'; |
| * |
| * ## Future |
| * |
| * A Future object represents a computation whose return value |
| * might not yet be available. |
| * The Future returns the value of the computation |
| * when it completes at some time in the future. |
| * Futures are often used for potentially lengthy computations |
| * such as I/O and interaction with users. |
| * |
| * Many methods in the Dart libraries return Futures when |
| * performing tasks. For example, when binding an HttpServer |
| * to a host and port, the `bind()` method returns a Future. |
| * |
| * HttpServer.bind('127.0.0.1', 4444) |
| * .then((server) => print('${server.isBroadcast}')) |
| * .catchError(print); |
| * |
| * [Future.then] registers a callback function that runs |
| * when the Future's operation, in this case the `bind()` method, |
| * completes successfully. |
| * The value returned by the operation |
| * is passed into the callback function. |
| * In this example, the `bind()` method returns the HttpServer |
| * object. The callback function prints one of its properties. |
| * [Future.catchError] registers a callback function that |
| * runs if an error occurs within the Future. |
| * |
| * ## Stream |
| * |
| * A Stream provides an asynchronous sequence of data. |
| * Examples of data sequences include individual events, like mouse clicks, |
| * or sequential chunks of larger data, like multiple byte lists with the |
| * contents of a file |
| * such as mouse clicks, and a stream of byte lists read from a file. |
| * The following example opens a file for reading. |
| * [Stream.listen] registers a callback function that runs |
| * each time more data is available. |
| * |
| * Stream<List<int>> stream = new File('quotes.txt').openRead(); |
| * stream.transform(UTF8.decoder).listen(print); |
| * |
| * The stream emits a sequence of a list of bytes. |
| * The program must interpret the bytes or handle the raw byte data. |
| * Here, the code uses a UTF8 decoder (provided in the `dart:convert` library) |
| * to convert the sequence of bytes into a sequence |
| * of Dart strings. |
| * |
| * Another common use of streams is for user-generated events |
| * in a web app: The following code listens for mouse clicks on a button. |
| * |
| * querySelector('#myButton').onClick.listen((_) => print('Click.')); |
| * |
| * ## Other resources |
| * |
| * * The [dart:async section of the library tour][asynchronous-programming]: |
| * A brief overview of asynchronous programming. |
| * |
| * * [Use Future-Based APIs][futures-tutorial]: A closer look at Futures and |
| * how to use them to write asynchronous Dart code. |
| * |
| * * [Futures and Error Handling][futures-error-handling]: Everything you |
| * wanted to know about handling errors and exceptions when working with |
| * Futures (but were afraid to ask). |
| * |
| * * [The Event Loop and Dart](https://www.dartlang.org/articles/event-loop/): |
| * Learn how Dart handles the event queue and microtask queue, so you can |
| * write better asynchronous code with fewer surprises. |
| * |
| * * [test package: Asynchronous Tests][test-readme]: How to test asynchronous |
| * code. |
| * |
| * [asynchronous-programming]: https://www.dartlang.org/docs/dart-up-and-running/ch03.html#dartasync---asynchronous-programming |
| * [futures-tutorial]: https://www.dartlang.org/docs/tutorials/futures/ |
| * [futures-error-handling]: https://www.dartlang.org/articles/futures-and-error-handling/ |
| * [test-readme]: https://pub.dartlang.org/packages/test |
| */ |
| library dart.async; |
| |
| import "dart:collection"; |
| import "dart:_internal" show printToZone, printToConsole, IterableElementError; |
| import "dart:_internal" hide Symbol; |
| |
| part 'async_error.dart'; |
| part 'broadcast_stream_controller.dart'; |
| part 'deferred_load.dart'; |
| part 'future.dart'; |
| part 'future_impl.dart'; |
| part 'schedule_microtask.dart'; |
| part 'stream.dart'; |
| part 'stream_controller.dart'; |
| part 'stream_impl.dart'; |
| part 'stream_pipe.dart'; |
| part 'stream_transformers.dart'; |
| part 'timer.dart'; |
| part 'zone.dart'; |
| |
| _fatal(msg) native "DartAsync_fatal"; |
| Function _asyncThenWrapperHelper(continuation) { |
| // Any function that is used as an asynchronous callback must be registered |
| // in the current Zone. Normally, this is done by the future when a |
| // callback is registered (for example with `.then` or `.catchError`). In our |
| // case we want to reuse the same callback multiple times and therefore avoid |
| // the multiple registrations. For our internal futures (`_Future`) we can |
| // use the shortcut-version of `.then`, and skip the registration. However, |
| // that means that the continuation must be registered by us. |
| // |
| // Furthermore, we know that the root-zone doesn't actually do anything and |
| // we can therefore skip the registration call for it. |
| // |
| // Note, that the continuation accepts up to three arguments. If the current |
| // zone is the root zone, we don't wrap the continuation, and a bad |
| // `Future` implementation could potentially invoke the callback with the |
| // wrong number of arguments. |
| if (Zone.current == Zone.ROOT) return continuation; |
| return Zone.current.registerUnaryCallback((x) => continuation(x, null, null)); |
| } |
| Function _asyncErrorWrapperHelper(continuation) { |
| // See comments of `_asyncThenWrapperHelper`. |
| var errorCallback = (e, s) => continuation(null, e, s); |
| if (Zone.current == Zone.ROOT) return errorCallback; |
| return Zone.current.registerBinaryCallback(errorCallback); |
| } |
| /// Registers the [thenCallback] and [errorCallback] on the given [object]. |
| /// |
| /// If [object] is not a future, then it is wrapped into one. |
| /// |
| /// Returns the result of registering with `.then`. |
| Future _awaitHelper( |
| var object, Function thenCallback, Function errorCallback, var awaiter) { |
| if (object is! Future) { |
| object = new _Future().._setValue(object); |
| } else if (object is! _Future) { |
| return object.then(thenCallback, onError: errorCallback); |
| } |
| // `object` is a `_Future`. |
| // |
| // Since the callbacks have been registered in the current zone (see |
| // [_asyncThenWrapperHelper] and [_asyncErrorWrapperHelper]), we can avoid |
| // another registration and directly invoke the no-zone-registration `.then`. |
| // |
| // We can only do this for our internal futures (the default implementation of |
| // all futures that are constructed by the `dart:async` library). |
| object._awaiter = awaiter; |
| return object._thenNoZoneRegistration(thenCallback, errorCallback); |
| } |
| void _asyncStarListenHelper(var object, var awaiter) { |
| if (object is! _StreamImpl) { |
| return; |
| } |
| // `object` is a `_StreamImpl`. |
| object._awaiter = awaiter; |
| } |
| void _asyncStarMoveNextHelper(var stream) { |
| if (stream is! _StreamImpl) { |
| return; |
| } |
| // stream is a _StreamImpl. |
| if (stream._generator == null) { |
| // No generator registered, this isn't an async* Stream. |
| return; |
| } |
| _moveNextDebuggerStepCheck(stream._generator); |
| } |
| class _AsyncStarStreamController { |
| StreamController controller; |
| Function asyncStarBody; |
| bool isAdding = false; |
| bool onListenReceived = false; |
| bool isScheduled = false; |
| bool isSuspendedAtYield = false; |
| Completer cancellationCompleter = null; |
| |
| Stream get stream { |
| Stream local = controller.stream; |
| if (local is! _StreamImpl) { |
| return local; |
| } |
| local._generator = asyncStarBody; |
| return local; |
| } |
| |
| void runBody() { |
| isScheduled = false; |
| isSuspendedAtYield = false; |
| asyncStarBody(); |
| } |
| |
| void scheduleGenerator() { |
| if (isScheduled || controller.isPaused || isAdding) { |
| return; |
| } |
| isScheduled = true; |
| scheduleMicrotask(runBody); |
| } |
| |
| // Adds element to stream, returns true if the caller should terminate |
| // execution of the generator. |
| // |
| // TODO(hausner): Per spec, the generator should be suspended before |
| // exiting when the stream is closed. We could add a getter like this: |
| // get isCancelled => controller.hasListener; |
| // The generator would translate a 'yield e' statement to |
| // controller.add(e); |
| // suspend; |
| // if (controller.isCancelled) return; |
| bool add(event) { |
| if (!onListenReceived) _fatal("yield before stream is listened to"); |
| if (isSuspendedAtYield) _fatal("unexpected yield"); |
| // If stream is cancelled, tell caller to exit the async generator. |
| if (!controller.hasListener) { |
| return true; |
| } |
| controller.add(event); |
| scheduleGenerator(); |
| isSuspendedAtYield = true; |
| return false; |
| } |
| |
| // Adds the elements of stream into this controller's stream. |
| // The generator will be scheduled again when all of the |
| // elements of the added stream have been consumed. |
| // Returns true if the caller should terminate |
| // execution of the generator. |
| bool addStream(Stream stream) { |
| if (!onListenReceived) _fatal("yield before stream is listened to"); |
| // If stream is cancelled, tell caller to exit the async generator. |
| if (!controller.hasListener) return true; |
| isAdding = true; |
| var whenDoneAdding = |
| controller.addStream(stream as Stream, cancelOnError: false); |
| whenDoneAdding.then((_) { |
| isAdding = false; |
| scheduleGenerator(); |
| if (!isScheduled) isSuspendedAtYield = true; |
| }); |
| return false; |
| } |
| |
| void addError(error, stackTrace) { |
| if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) { |
| // If the stream has been cancelled, complete the cancellation future |
| // with the error. |
| cancellationCompleter.completeError(error, stackTrace); |
| return; |
| } |
| // If stream is cancelled, tell caller to exit the async generator. |
| if (!controller.hasListener) return; |
| controller.addError(error, stackTrace); |
| // No need to schedule the generator body here. This code is only |
| // called from the catch clause of the implicit try-catch-finally |
| // around the generator body. That is, we are on the error path out |
| // of the generator and do not need to run the generator again. |
| } |
| |
| close() { |
| if ((cancellationCompleter != null) && !cancellationCompleter.isCompleted) { |
| // If the stream has been cancelled, complete the cancellation future |
| // with the error. |
| cancellationCompleter.complete(); |
| } |
| controller.close(); |
| } |
| |
| _AsyncStarStreamController(this.asyncStarBody) { |
| controller = new StreamController( |
| onListen: this.onListen, |
| onResume: this.onResume, |
| onCancel: this.onCancel); |
| } |
| |
| onListen() { |
| assert(!onListenReceived); |
| onListenReceived = true; |
| scheduleGenerator(); |
| } |
| |
| onResume() { |
| if (isSuspendedAtYield) { |
| scheduleGenerator(); |
| } |
| } |
| |
| onCancel() { |
| if (controller.isClosed) { |
| return null; |
| } |
| if (cancellationCompleter == null) { |
| cancellationCompleter = new Completer(); |
| // Only resume the generator if it is suspended at a yield. |
| // Cancellation does not affect an async generator that is |
| // suspended at an await. |
| if (isSuspendedAtYield) { |
| scheduleGenerator(); |
| } |
| } |
| return cancellationCompleter.future; |
| } |
| } |
| void _completeOnAsyncReturn(Object completer, Object value) { |
| completer.complete(value); |
| } |
| /// Returns a [StackTrace] object containing the synchronous prefix for this |
| /// asynchronous method. |
| Object _asyncStackTraceHelper(Function async_op) |
| native "StackTrace_asyncStackTraceHelper"; |
| void _clearAsyncThreadStackTrace() |
| native "StackTrace_clearAsyncThreadStackTrace"; |
| void _setAsyncThreadStackTrace(StackTrace stackTrace) |
| native "StackTrace_setAsyncThreadStackTrace"; |
| void _moveNextDebuggerStepCheck(Function async_op) |
| native "AsyncStarMoveNext_debuggerStepCheck"; |
| final Set<String> _loadedLibraries = new Set<String>(); |
| typedef void _ScheduleImmediateClosure(void callback()); |
| class _ScheduleImmediate { |
| static _ScheduleImmediateClosure _closure; |
| } |
| void _setScheduleImmediateClosure(_ScheduleImmediateClosure closure) { |
| _ScheduleImmediate._closure = closure; |
| } |
| typedef Timer _TimerFactoryClosure( |
| int milliseconds, void callback(Timer timer), bool repeating); |
| class _TimerFactory { |
| static _TimerFactoryClosure _factory; |
| } |