|  | // Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file | 
|  | // for details. All rights reserved. Use of this source code is governed by a | 
|  | // BSD-style license that can be found in the LICENSE file. | 
|  |  | 
|  | part of dart.async; | 
|  |  | 
|  | // ------------------------------------------------------------------- | 
|  | // Controller for creating and adding events to a stream. | 
|  | // ------------------------------------------------------------------- | 
|  |  | 
|  | /** | 
|  | * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks. | 
|  | */ | 
|  | typedef void ControllerCallback(); | 
|  |  | 
|  | /** | 
|  | * Type of stream controller `onCancel` callbacks. | 
|  | * | 
|  | * The callback may return either `void` or a future. | 
|  | */ | 
|  | typedef ControllerCancelCallback(); | 
|  |  | 
|  | /** | 
|  | * A controller with the stream it controls. | 
|  | * | 
|  | * This controller allows sending data, error and done events on | 
|  | * its [stream]. | 
|  | * This class can be used to create a simple stream that others | 
|  | * can listen on, and to push events to that stream. | 
|  | * | 
|  | * It's possible to check whether the stream is paused or not, and whether | 
|  | * it has subscribers or not, as well as getting a callback when either of | 
|  | * these change. | 
|  | */ | 
|  | abstract class StreamController<T> implements StreamSink<T> { | 
|  | /** The stream that this controller is controlling. */ | 
|  | Stream<T> get stream; | 
|  |  | 
|  | /** | 
|  | * A controller with a [stream] that supports only one single subscriber. | 
|  | * | 
|  | * If [sync] is true, the returned stream controller is a | 
|  | * [SynchronousStreamController], and must be used with the care | 
|  | * and attention necessary to not break the [Stream] contract. If in doubt, | 
|  | * use the non-sync version. | 
|  | * | 
|  | * Using an asynchronous controller will never give the wrong | 
|  | * behavior, but using a synchronous controller incorrectly can cause | 
|  | * otherwise correct programs to break. | 
|  | * | 
|  | * A synchronous controller is only intended for optimizing event | 
|  | * propagation when one asynchronous event immediately triggers another. | 
|  | * It should not be used unless the calls to [add] or [addError] | 
|  | * are guaranteed to occur in places where it won't break `Stream` invariants. | 
|  | * | 
|  | * Use synchronous controllers only to forward (potentially transformed) | 
|  | * events from another stream or a future. | 
|  | * | 
|  | * A Stream should be inert until a subscriber starts listening on it (using | 
|  | * the [onListen] callback to start producing events). Streams should not | 
|  | * leak resources (like websockets) when no user ever listens on the stream. | 
|  | * | 
|  | * The controller buffers all incoming events until a subscriber is | 
|  | * registered, but this feature should only be used in rare circumstances. | 
|  | * | 
|  | * The [onPause] function is called when the stream becomes | 
|  | * paused. [onResume] is called when the stream resumed. | 
|  | * | 
|  | * The [onListen] callback is called when the stream | 
|  | * receives its listener and [onCancel] when the listener ends | 
|  | * its subscription. If [onCancel] needs to perform an asynchronous operation, | 
|  | * [onCancel] should return a future that completes when the cancel operation | 
|  | * is done. | 
|  | * | 
|  | * If the stream is canceled before the controller needs new data the | 
|  | * [onResume] call might not be executed. | 
|  | */ | 
|  | factory StreamController( | 
|  | {void onListen(), | 
|  | void onPause(), | 
|  | void onResume(), | 
|  | onCancel(), | 
|  | bool sync: false}) { | 
|  | return sync | 
|  | ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | 
|  | : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * A controller where [stream] can be listened to more than once. | 
|  | * | 
|  | * The [Stream] returned by [stream] is a broadcast stream. | 
|  | * It can be listened to more than once. | 
|  | * | 
|  | * A Stream should be inert until a subscriber starts listening on it (using | 
|  | * the [onListen] callback to start producing events). Streams should not | 
|  | * leak resources (like websockets) when no user ever listens on the stream. | 
|  | * | 
|  | * Broadcast streams do not buffer events when there is no listener. | 
|  | * | 
|  | * The controller distributes any events to all currently subscribed | 
|  | * listeners at the time when [add], [addError] or [close] is called. | 
|  | * It is not allowed to call `add`, `addError`, or `close` before a previous | 
|  | * call has returned. The controller does not have any internal queue of | 
|  | * events, and if there are no listeners at the time the event is added, | 
|  | * it will just be dropped, or, if it is an error, be reported as uncaught. | 
|  | * | 
|  | * Each listener subscription is handled independently, | 
|  | * and if one pauses, only the pausing listener is affected. | 
|  | * A paused listener will buffer events internally until unpaused or canceled. | 
|  | * | 
|  | * If [sync] is true, events may be fired directly by the stream's | 
|  | * subscriptions during an [add], [addError] or [close] call. | 
|  | * The returned stream controller is a [SynchronousStreamController], | 
|  | * and must be used with the care and attention necessary to not break | 
|  | * the [Stream] contract. | 
|  | * See [Completer.sync] for some explanations on when a synchronous | 
|  | * dispatching can be used. | 
|  | * If in doubt, keep the controller non-sync. | 
|  | * | 
|  | * If [sync] is false, the event will always be fired at a later time, | 
|  | * after the code adding the event has completed. | 
|  | * In that case, no guarantees are given with regard to when | 
|  | * multiple listeners get the events, except that each listener will get | 
|  | * all events in the correct order. Each subscription handles the events | 
|  | * individually. | 
|  | * If two events are sent on an async controller with two listeners, | 
|  | * one of the listeners may get both events | 
|  | * before the other listener gets any. | 
|  | * A listener must be subscribed both when the event is initiated | 
|  | * (that is, when [add] is called) | 
|  | * and when the event is later delivered, | 
|  | * in order to receive the event. | 
|  | * | 
|  | * The [onListen] callback is called when the first listener is subscribed, | 
|  | * and the [onCancel] is called when there are no longer any active listeners. | 
|  | * If a listener is added again later, after the [onCancel] was called, | 
|  | * the [onListen] will be called again. | 
|  | */ | 
|  | factory StreamController.broadcast( | 
|  | {void onListen(), void onCancel(), bool sync: false}) { | 
|  | return sync | 
|  | ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | 
|  | : new _AsyncBroadcastStreamController<T>(onListen, onCancel); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * The callback which is called when the stream is listened to. | 
|  | * | 
|  | * May be set to `null`, in which case no callback will happen. | 
|  | */ | 
|  | ControllerCallback get onListen; | 
|  |  | 
|  | void set onListen(void onListenHandler()); | 
|  |  | 
|  | /** | 
|  | * The callback which is called when the stream is paused. | 
|  | * | 
|  | * May be set to `null`, in which case no callback will happen. | 
|  | * | 
|  | * Pause related callbacks are not supported on broadcast stream controllers. | 
|  | */ | 
|  | ControllerCallback get onPause; | 
|  |  | 
|  | void set onPause(void onPauseHandler()); | 
|  |  | 
|  | /** | 
|  | * The callback which is called when the stream is resumed. | 
|  | * | 
|  | * May be set to `null`, in which case no callback will happen. | 
|  | * | 
|  | * Pause related callbacks are not supported on broadcast stream controllers. | 
|  | */ | 
|  | ControllerCallback get onResume; | 
|  |  | 
|  | void set onResume(void onResumeHandler()); | 
|  |  | 
|  | /** | 
|  | * The callback which is called when the stream is canceled. | 
|  | * | 
|  | * May be set to `null`, in which case no callback will happen. | 
|  | */ | 
|  | ControllerCancelCallback get onCancel; | 
|  |  | 
|  | void set onCancel(onCancelHandler()); | 
|  |  | 
|  | /** | 
|  | * Returns a view of this object that only exposes the [StreamSink] interface. | 
|  | */ | 
|  | StreamSink<T> get sink; | 
|  |  | 
|  | /** | 
|  | * Whether the stream controller is closed for adding more events. | 
|  | * | 
|  | * The controller becomes closed by calling the [close] method. | 
|  | * New events cannot be added, by calling [add] or [addError], | 
|  | * to a closed controller. | 
|  | * | 
|  | * If the controller is closed, | 
|  | * the "done" event might not have been delivered yet, | 
|  | * but it has been scheduled, and it is too late to add more events. | 
|  | */ | 
|  | bool get isClosed; | 
|  |  | 
|  | /** | 
|  | * Whether the subscription would need to buffer events. | 
|  | * | 
|  | * This is the case if the controller's stream has a listener and it is | 
|  | * paused, or if it has not received a listener yet. In that case, the | 
|  | * controller is considered paused as well. | 
|  | * | 
|  | * A broadcast stream controller is never considered paused. It always | 
|  | * forwards its events to all uncanceled subscriptions, if any, | 
|  | * and let the subscriptions handle their own pausing and buffering. | 
|  | */ | 
|  | bool get isPaused; | 
|  |  | 
|  | /** Whether there is a subscriber on the [Stream]. */ | 
|  | bool get hasListener; | 
|  |  | 
|  | /** | 
|  | * Sends a data [event]. | 
|  | * | 
|  | * Listeners receive this event in a later microtask. | 
|  | * | 
|  | * Note that a synchronous controller (created by passing true to the `sync` | 
|  | * parameter of the `StreamController` constructor) delivers events | 
|  | * immediately. Since this behavior violates the contract mentioned here, | 
|  | * synchronous controllers should only be used as described in the | 
|  | * documentation to ensure that the delivered events always *appear* as if | 
|  | * they were delivered in a separate microtask. | 
|  | */ | 
|  | void add(T event); | 
|  |  | 
|  | /** | 
|  | * Sends or enqueues an error event. | 
|  | * | 
|  | * If [error] is `null`, it is replaced by a [NullThrownError]. | 
|  | * | 
|  | * Listeners receive this event at a later microtask. This behavior can be | 
|  | * overridden by using `sync` controllers. Note, however, that sync | 
|  | * controllers have to satisfy the preconditions mentioned in the | 
|  | * documentation of the constructors. | 
|  | */ | 
|  | void addError(Object error, [StackTrace stackTrace]); | 
|  |  | 
|  | /** | 
|  | * Closes the stream. | 
|  | * | 
|  | * Listeners receive the done event at a later microtask. This behavior can be | 
|  | * overridden by using `sync` controllers. Note, however, that sync | 
|  | * controllers have to satisfy the preconditions mentioned in the | 
|  | * documentation of the constructors. | 
|  | */ | 
|  | Future close(); | 
|  |  | 
|  | /** | 
|  | * Receives events from [source] and puts them into this controller's stream. | 
|  | * | 
|  | * Returns a future which completes when the source stream is done. | 
|  | * | 
|  | * Events must not be added directly to this controller using [add], | 
|  | * [addError], [close] or [addStream], until the returned future | 
|  | * is complete. | 
|  | * | 
|  | * Data and error events are forwarded to this controller's stream. A done | 
|  | * event on the source will end the `addStream` operation and complete the | 
|  | * returned future. | 
|  | * | 
|  | * If [cancelOnError] is true, only the first error on [source] is | 
|  | * forwarded to the controller's stream, and the `addStream` ends | 
|  | * after this. If [cancelOnError] is false, all errors are forwarded | 
|  | * and only a done event will end the `addStream`. | 
|  | * If [cancelOnError] is omitted, it defaults to false. | 
|  | */ | 
|  | Future addStream(Stream<T> source, {bool cancelOnError}); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * A stream controller that delivers its events synchronously. | 
|  | * | 
|  | * A synchronous stream controller is intended for cases where | 
|  | * an already asynchronous event triggers an event on a stream. | 
|  | * | 
|  | * Instead of adding the event to the stream in a later microtask, | 
|  | * causing extra latency, the event is instead fired immediately by the | 
|  | * synchronous stream controller, as if the stream event was | 
|  | * the current event or microtask. | 
|  | * | 
|  | * The synchronous stream controller can be used to break the contract | 
|  | * on [Stream], and it must be used carefully to avoid doing so. | 
|  | * | 
|  | * The only advantage to using a [SynchronousStreamController] over a | 
|  | * normal [StreamController] is the improved latency. | 
|  | * Only use the synchronous version if the improvement is significant, | 
|  | * and if its use is safe. Otherwise just use a normal stream controller, | 
|  | * which will always have the correct behavior for a [Stream], and won't | 
|  | * accidentally break other code. | 
|  | * | 
|  | * Adding events to a synchronous controller should only happen as the | 
|  | * very last part of the handling of the original event. | 
|  | * At that point, adding an event to the stream is equivalent to | 
|  | * returning to the event loop and adding the event in the next microtask. | 
|  | * | 
|  | * Each listener callback will be run as if it was a top-level event | 
|  | * or microtask. This means that if it throws, the error will be reported as | 
|  | * uncaught as soon as possible. | 
|  | * This is one reason to add the event as the last thing in the original event | 
|  | * handler - any action done after adding the event will delay the report of | 
|  | * errors in the event listener callbacks. | 
|  | * | 
|  | * If an event is added in a setting that isn't known to be another event, | 
|  | * it may cause the stream's listener to get that event before the listener | 
|  | * is ready to handle it. We promise that after calling [Stream.listen], | 
|  | * you won't get any events until the code doing the listen has completed. | 
|  | * Calling [add] in response to a function call of unknown origin may break | 
|  | * that promise. | 
|  | * | 
|  | * An [onListen] callback from the controller is *not* an asynchronous event, | 
|  | * and adding events to the controller in the `onListen` callback is always | 
|  | * wrong. The events will be delivered before the listener has even received | 
|  | * the subscription yet. | 
|  | * | 
|  | * The synchronous broadcast stream controller also has a restrictions that a | 
|  | * normal stream controller does not: | 
|  | * The [add], [addError], [close] and [addStream] methods *must not* be | 
|  | * called while an event is being delivered. | 
|  | * That is, if a callback on a subscription on the controller's stream causes | 
|  | * a call to any of the functions above, the call will fail. | 
|  | * A broadcast stream may have more than one listener, and if an | 
|  | * event is added synchronously while another is being also in the process | 
|  | * of being added, the latter event might reach some listeners before | 
|  | * the former. To prevent that, an event cannot be added while a previous | 
|  | * event is being fired. | 
|  | * This guarantees that an event is fully delivered when the | 
|  | * first [add], [addError] or [close] returns, | 
|  | * and further events will be delivered in the correct order. | 
|  | * | 
|  | * This still only guarantees that the event is delivered to the subscription. | 
|  | * If the subscription is paused, the actual callback may still happen later, | 
|  | * and the event will instead be buffered by the subscription. | 
|  | * Barring pausing, and the following buffered events that haven't been | 
|  | * delivered yet, callbacks will be called synchronously when an event is added. | 
|  | * | 
|  | * Adding an event to a synchronous non-broadcast stream controller while | 
|  | * another event is in progress may cause the second event to be delayed | 
|  | * and not be delivered synchronously, and until that event is delivered, | 
|  | * the controller will not act synchronously. | 
|  | */ | 
|  | abstract class SynchronousStreamController<T> implements StreamController<T> { | 
|  | /** | 
|  | * Adds event to the controller's stream. | 
|  | * | 
|  | * As [StreamController.add], but must not be called while an event is | 
|  | * being added by [add], [addError] or [close]. | 
|  | */ | 
|  | void add(T data); | 
|  |  | 
|  | /** | 
|  | * Adds error to the controller's stream. | 
|  | * | 
|  | * As [StreamController.addError], but must not be called while an event is | 
|  | * being added by [add], [addError] or [close]. | 
|  | */ | 
|  | void addError(Object error, [StackTrace stackTrace]); | 
|  |  | 
|  | /** | 
|  | * Closes the controller's stream. | 
|  | * | 
|  | * As [StreamController.close], but must not be called while an event is | 
|  | * being added by [add], [addError] or [close]. | 
|  | */ | 
|  | Future close(); | 
|  | } | 
|  |  | 
|  | abstract class _StreamControllerLifecycle<T> { | 
|  | StreamSubscription<T> _subscribe( | 
|  | void onData(T data), Function onError, void onDone(), bool cancelOnError); | 
|  | void _recordPause(StreamSubscription<T> subscription) {} | 
|  | void _recordResume(StreamSubscription<T> subscription) {} | 
|  | Future _recordCancel(StreamSubscription<T> subscription) => null; | 
|  | } | 
|  |  | 
|  | // Base type for implementations of stream controllers. | 
|  | abstract class _StreamControllerBase<T> | 
|  | implements | 
|  | StreamController<T>, | 
|  | _StreamControllerLifecycle<T>, | 
|  | _EventSink<T>, | 
|  | _EventDispatch<T> {} | 
|  |  | 
|  | /** | 
|  | * Default implementation of [StreamController]. | 
|  | * | 
|  | * Controls a stream that only supports a single controller. | 
|  | */ | 
|  | abstract class _StreamController<T> implements _StreamControllerBase<T> { | 
|  | // The states are bit-flags. More than one can be set at a time. | 
|  | // | 
|  | // The "subscription state" goes through the states: | 
|  | //   initial -> subscribed -> canceled. | 
|  | // These are mutually exclusive. | 
|  | // The "closed" state records whether the [close] method has been called | 
|  | // on the controller. This can be done at any time. If done before | 
|  | // subscription, the done event is queued. If done after cancel, the done | 
|  | // event is ignored (just as any other event after a cancel). | 
|  |  | 
|  | /** The controller is in its initial state with no subscription. */ | 
|  | static const int _STATE_INITIAL = 0; | 
|  | /** The controller has a subscription, but hasn't been closed or canceled. */ | 
|  | static const int _STATE_SUBSCRIBED = 1; | 
|  | /** The subscription is canceled. */ | 
|  | static const int _STATE_CANCELED = 2; | 
|  | /** Mask for the subscription state. */ | 
|  | static const int _STATE_SUBSCRIPTION_MASK = 3; | 
|  |  | 
|  | // The following state relate to the controller, not the subscription. | 
|  | // If closed, adding more events is not allowed. | 
|  | // If executing an [addStream], new events are not allowed either, but will | 
|  | // be added by the stream. | 
|  |  | 
|  | /** | 
|  | * The controller is closed due to calling [close]. | 
|  | * | 
|  | * When the stream is closed, you can neither add new events nor add new | 
|  | * listeners. | 
|  | */ | 
|  | static const int _STATE_CLOSED = 4; | 
|  | /** | 
|  | * The controller is in the middle of an [addStream] operation. | 
|  | * | 
|  | * While adding events from a stream, no new events can be added directly | 
|  | * on the controller. | 
|  | */ | 
|  | static const int _STATE_ADDSTREAM = 8; | 
|  |  | 
|  | /** | 
|  | * Field containing different data depending on the current subscription | 
|  | * state. | 
|  | * | 
|  | * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents] | 
|  | * for events added to the controller before a subscription. | 
|  | * | 
|  | * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription. | 
|  | * | 
|  | * When [_state] is [_STATE_CANCELED] the field is currently not used. | 
|  | */ | 
|  | var _varData; | 
|  |  | 
|  | /** Current state of the controller. */ | 
|  | int _state = _STATE_INITIAL; | 
|  |  | 
|  | /** | 
|  | * Future completed when the stream sends its last event. | 
|  | * | 
|  | * This is also the future returned by [close]. | 
|  | */ | 
|  | // TODO(lrn): Could this be stored in the varData field too, if it's not | 
|  | // accessed until the call to "close"? Then we need to special case if it's | 
|  | // accessed earlier, or if close is called before subscribing. | 
|  | _Future _doneFuture; | 
|  |  | 
|  | ControllerCallback onListen; | 
|  | ControllerCallback onPause; | 
|  | ControllerCallback onResume; | 
|  | ControllerCancelCallback onCancel; | 
|  |  | 
|  | _StreamController(this.onListen, this.onPause, this.onResume, this.onCancel); | 
|  |  | 
|  | // Return a new stream every time. The streams are equal, but not identical. | 
|  | Stream<T> get stream => new _ControllerStream<T>(this); | 
|  |  | 
|  | /** | 
|  | * Returns a view of this object that only exposes the [StreamSink] interface. | 
|  | */ | 
|  | StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | 
|  |  | 
|  | /** | 
|  | * Whether a listener has existed and been canceled. | 
|  | * | 
|  | * After this, adding more events will be ignored. | 
|  | */ | 
|  | bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 
|  |  | 
|  | /** Whether there is an active listener. */ | 
|  | bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; | 
|  |  | 
|  | /** Whether there has not been a listener yet. */ | 
|  | bool get _isInitialState => | 
|  | (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL; | 
|  |  | 
|  | bool get isClosed => (_state & _STATE_CLOSED) != 0; | 
|  |  | 
|  | bool get isPaused => | 
|  | hasListener ? _subscription._isInputPaused : !_isCanceled; | 
|  |  | 
|  | bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; | 
|  |  | 
|  | /** New events may not be added after close, or during addStream. */ | 
|  | bool get _mayAddEvent => (_state < _STATE_CLOSED); | 
|  |  | 
|  | // Returns the pending events. | 
|  | // Pending events are events added before a subscription exists. | 
|  | // They are added to the subscription when it is created. | 
|  | // Pending events, if any, are kept in the _varData field until the | 
|  | // stream is listened to. | 
|  | // While adding a stream, pending events are moved into the | 
|  | // state object to allow the state object to use the _varData field. | 
|  | _PendingEvents<T> get _pendingEvents { | 
|  | assert(_isInitialState); | 
|  | if (!_isAddingStream) { | 
|  | return _varData; | 
|  | } | 
|  | _StreamControllerAddStreamState<T> state = _varData; | 
|  | return state.varData; | 
|  | } | 
|  |  | 
|  | // Returns the pending events, and creates the object if necessary. | 
|  | _StreamImplEvents<T> _ensurePendingEvents() { | 
|  | assert(_isInitialState); | 
|  | if (!_isAddingStream) { | 
|  | _varData ??= new _StreamImplEvents<T>(); | 
|  | return _varData; | 
|  | } | 
|  | _StreamControllerAddStreamState<T> state = _varData; | 
|  | if (state.varData == null) state.varData = new _StreamImplEvents<T>(); | 
|  | return state.varData; | 
|  | } | 
|  |  | 
|  | // Get the current subscription. | 
|  | // If we are adding a stream, the subscription is moved into the state | 
|  | // object to allow the state object to use the _varData field. | 
|  | _ControllerSubscription<T> get _subscription { | 
|  | assert(hasListener); | 
|  | if (_isAddingStream) { | 
|  | _StreamControllerAddStreamState<T> addState = _varData; | 
|  | return addState.varData; | 
|  | } | 
|  | return _varData; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Creates an error describing why an event cannot be added. | 
|  | * | 
|  | * The reason, and therefore the error message, depends on the current state. | 
|  | */ | 
|  | Error _badEventState() { | 
|  | if (isClosed) { | 
|  | return new StateError("Cannot add event after closing"); | 
|  | } | 
|  | assert(_isAddingStream); | 
|  | return new StateError("Cannot add event while adding a stream"); | 
|  | } | 
|  |  | 
|  | // StreamSink interface. | 
|  | Future addStream(Stream<T> source, {bool cancelOnError}) { | 
|  | if (!_mayAddEvent) throw _badEventState(); | 
|  | if (_isCanceled) return new _Future.immediate(null); | 
|  | _StreamControllerAddStreamState<T> addState = | 
|  | new _StreamControllerAddStreamState<T>( | 
|  | this, _varData, source, cancelOnError ?? false); | 
|  | _varData = addState; | 
|  | _state |= _STATE_ADDSTREAM; | 
|  | return addState.addStreamFuture; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Returns a future that is completed when the stream is done | 
|  | * processing events. | 
|  | * | 
|  | * This happens either when the done event has been sent, or if the | 
|  | * subscriber of a single-subscription stream is cancelled. | 
|  | */ | 
|  | Future get done => _ensureDoneFuture(); | 
|  |  | 
|  | Future _ensureDoneFuture() { | 
|  | _doneFuture ??= _isCanceled ? Future._nullFuture : new _Future(); | 
|  | return _doneFuture; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Send or enqueue a data event. | 
|  | */ | 
|  | void add(T value) { | 
|  | if (!_mayAddEvent) throw _badEventState(); | 
|  | _add(value); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Send or enqueue an error event. | 
|  | */ | 
|  | void addError(Object error, [StackTrace stackTrace]) { | 
|  | if (!_mayAddEvent) throw _badEventState(); | 
|  | error = _nonNullError(error); | 
|  | AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | 
|  | if (replacement != null) { | 
|  | error = _nonNullError(replacement.error); | 
|  | stackTrace = replacement.stackTrace; | 
|  | } | 
|  | _addError(error, stackTrace); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Closes this controller and sends a done event on the stream. | 
|  | * | 
|  | * The first time a controller is closed, a "done" event is added to its | 
|  | * stream. | 
|  | * | 
|  | * You are allowed to close the controller more than once, but only the first | 
|  | * call has any effect. | 
|  | * | 
|  | * After closing, no further events may be added using [add], [addError] | 
|  | * or [addStream]. | 
|  | * | 
|  | * The returned future is completed when the done event has been delivered. | 
|  | */ | 
|  | Future close() { | 
|  | if (isClosed) { | 
|  | return _ensureDoneFuture(); | 
|  | } | 
|  | if (!_mayAddEvent) throw _badEventState(); | 
|  | _closeUnchecked(); | 
|  | return _ensureDoneFuture(); | 
|  | } | 
|  |  | 
|  | void _closeUnchecked() { | 
|  | _state |= _STATE_CLOSED; | 
|  | if (hasListener) { | 
|  | _sendDone(); | 
|  | } else if (_isInitialState) { | 
|  | _ensurePendingEvents().add(const _DelayedDone()); | 
|  | } | 
|  | } | 
|  |  | 
|  | // EventSink interface. Used by the [addStream] events. | 
|  |  | 
|  | // Add data event, used both by the [addStream] events and by [add]. | 
|  | void _add(T value) { | 
|  | if (hasListener) { | 
|  | _sendData(value); | 
|  | } else if (_isInitialState) { | 
|  | _ensurePendingEvents().add(new _DelayedData<T>(value)); | 
|  | } | 
|  | } | 
|  |  | 
|  | void _addError(Object error, StackTrace stackTrace) { | 
|  | if (hasListener) { | 
|  | _sendError(error, stackTrace); | 
|  | } else if (_isInitialState) { | 
|  | _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); | 
|  | } | 
|  | } | 
|  |  | 
|  | void _close() { | 
|  | // End of addStream stream. | 
|  | assert(_isAddingStream); | 
|  | _StreamControllerAddStreamState<T> addState = _varData; | 
|  | _varData = addState.varData; | 
|  | _state &= ~_STATE_ADDSTREAM; | 
|  | addState.complete(); | 
|  | } | 
|  |  | 
|  | // _StreamControllerLifeCycle interface | 
|  |  | 
|  | StreamSubscription<T> _subscribe(void onData(T data), Function onError, | 
|  | void onDone(), bool cancelOnError) { | 
|  | if (!_isInitialState) { | 
|  | throw new StateError("Stream has already been listened to."); | 
|  | } | 
|  | _ControllerSubscription<T> subscription = new _ControllerSubscription<T>( | 
|  | this, onData, onError, onDone, cancelOnError); | 
|  |  | 
|  | _PendingEvents<T> pendingEvents = _pendingEvents; | 
|  | _state |= _STATE_SUBSCRIBED; | 
|  | if (_isAddingStream) { | 
|  | _StreamControllerAddStreamState<T> addState = _varData; | 
|  | addState.varData = subscription; | 
|  | addState.resume(); | 
|  | } else { | 
|  | _varData = subscription; | 
|  | } | 
|  | subscription._setPendingEvents(pendingEvents); | 
|  | subscription._guardCallback(() { | 
|  | _runGuarded(onListen); | 
|  | }); | 
|  |  | 
|  | return subscription; | 
|  | } | 
|  |  | 
|  | Future _recordCancel(StreamSubscription<T> subscription) { | 
|  | // When we cancel, we first cancel any stream being added, | 
|  | // Then we call `onCancel`, and finally the _doneFuture is completed. | 
|  | // If either of addStream's cancel or `onCancel` returns a future, | 
|  | // we wait for it before continuing. | 
|  | // Any error during this process ends up in the returned future. | 
|  | // If more errors happen, we act as if it happens inside nested try/finallys | 
|  | // or whenComplete calls, and only the last error ends up in the | 
|  | // returned future. | 
|  | Future result; | 
|  | if (_isAddingStream) { | 
|  | _StreamControllerAddStreamState<T> addState = _varData; | 
|  | result = addState.cancel(); | 
|  | } | 
|  | _varData = null; | 
|  | _state = | 
|  | (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | 
|  |  | 
|  | if (onCancel != null) { | 
|  | if (result == null) { | 
|  | // Only introduce a future if one is needed. | 
|  | // If _onCancel returns null, no future is needed. | 
|  | try { | 
|  | result = onCancel(); | 
|  | } catch (e, s) { | 
|  | // Return the error in the returned future. | 
|  | // Complete it asynchronously, so there is time for a listener | 
|  | // to handle the error. | 
|  | result = new _Future().._asyncCompleteError(e, s); | 
|  | } | 
|  | } else { | 
|  | // Simpler case when we already know that we will return a future. | 
|  | result = result.whenComplete(onCancel); | 
|  | } | 
|  | } | 
|  |  | 
|  | void complete() { | 
|  | if (_doneFuture != null && _doneFuture._mayComplete) { | 
|  | _doneFuture._asyncComplete(null); | 
|  | } | 
|  | } | 
|  |  | 
|  | if (result != null) { | 
|  | result = result.whenComplete(complete); | 
|  | } else { | 
|  | complete(); | 
|  | } | 
|  |  | 
|  | return result; | 
|  | } | 
|  |  | 
|  | void _recordPause(StreamSubscription<T> subscription) { | 
|  | if (_isAddingStream) { | 
|  | _StreamControllerAddStreamState<T> addState = _varData; | 
|  | addState.pause(); | 
|  | } | 
|  | _runGuarded(onPause); | 
|  | } | 
|  |  | 
|  | void _recordResume(StreamSubscription<T> subscription) { | 
|  | if (_isAddingStream) { | 
|  | _StreamControllerAddStreamState<T> addState = _varData; | 
|  | addState.resume(); | 
|  | } | 
|  | _runGuarded(onResume); | 
|  | } | 
|  | } | 
|  |  | 
|  | abstract class _SyncStreamControllerDispatch<T> | 
|  | implements _StreamController<T>, SynchronousStreamController<T> { | 
|  | int get _state; | 
|  | void set _state(int state); | 
|  |  | 
|  | void _sendData(T data) { | 
|  | _subscription._add(data); | 
|  | } | 
|  |  | 
|  | void _sendError(Object error, StackTrace stackTrace) { | 
|  | _subscription._addError(error, stackTrace); | 
|  | } | 
|  |  | 
|  | void _sendDone() { | 
|  | _subscription._close(); | 
|  | } | 
|  | } | 
|  |  | 
|  | abstract class _AsyncStreamControllerDispatch<T> | 
|  | implements _StreamController<T> { | 
|  | void _sendData(T data) { | 
|  | _subscription._addPending(new _DelayedData<T>(data)); | 
|  | } | 
|  |  | 
|  | void _sendError(Object error, StackTrace stackTrace) { | 
|  | _subscription._addPending(new _DelayedError(error, stackTrace)); | 
|  | } | 
|  |  | 
|  | void _sendDone() { | 
|  | _subscription._addPending(const _DelayedDone()); | 
|  | } | 
|  | } | 
|  |  | 
|  | // TODO(lrn): Use common superclass for callback-controllers when VM supports | 
|  | // constructors in mixin superclasses. | 
|  |  | 
|  | class _AsyncStreamController<T> = _StreamController<T> | 
|  | with _AsyncStreamControllerDispatch<T>; | 
|  |  | 
|  | class _SyncStreamController<T> = _StreamController<T> | 
|  | with _SyncStreamControllerDispatch<T>; | 
|  |  | 
|  | typedef _NotificationHandler(); | 
|  |  | 
|  | void _runGuarded(_NotificationHandler notificationHandler) { | 
|  | if (notificationHandler == null) return; | 
|  | try { | 
|  | notificationHandler(); | 
|  | } catch (e, s) { | 
|  | Zone.current.handleUncaughtError(e, s); | 
|  | } | 
|  | } | 
|  |  | 
|  | class _ControllerStream<T> extends _StreamImpl<T> { | 
|  | _StreamControllerLifecycle<T> _controller; | 
|  |  | 
|  | _ControllerStream(this._controller); | 
|  |  | 
|  | StreamSubscription<T> _createSubscription(void onData(T data), | 
|  | Function onError, void onDone(), bool cancelOnError) => | 
|  | _controller._subscribe(onData, onError, onDone, cancelOnError); | 
|  |  | 
|  | // Override == and hashCode so that new streams returned by the same | 
|  | // controller are considered equal. The controller returns a new stream | 
|  | // each time it's queried, but doesn't have to cache the result. | 
|  |  | 
|  | int get hashCode => _controller.hashCode ^ 0x35323532; | 
|  |  | 
|  | bool operator ==(Object other) { | 
|  | if (identical(this, other)) return true; | 
|  | return other is _ControllerStream && | 
|  | identical(other._controller, this._controller); | 
|  | } | 
|  | } | 
|  |  | 
|  | class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 
|  | final _StreamControllerLifecycle<T> _controller; | 
|  |  | 
|  | _ControllerSubscription(this._controller, void onData(T data), | 
|  | Function onError, void onDone(), bool cancelOnError) | 
|  | : super(onData, onError, onDone, cancelOnError); | 
|  |  | 
|  | Future _onCancel() { | 
|  | return _controller._recordCancel(this); | 
|  | } | 
|  |  | 
|  | void _onPause() { | 
|  | _controller._recordPause(this); | 
|  | } | 
|  |  | 
|  | void _onResume() { | 
|  | _controller._recordResume(this); | 
|  | } | 
|  | } | 
|  |  | 
|  | /** A class that exposes only the [StreamSink] interface of an object. */ | 
|  | class _StreamSinkWrapper<T> implements StreamSink<T> { | 
|  | final StreamController _target; | 
|  | _StreamSinkWrapper(this._target); | 
|  | void add(T data) { | 
|  | _target.add(data); | 
|  | } | 
|  |  | 
|  | void addError(Object error, [StackTrace stackTrace]) { | 
|  | _target.addError(error, stackTrace); | 
|  | } | 
|  |  | 
|  | Future close() => _target.close(); | 
|  |  | 
|  | Future addStream(Stream<T> source) => _target.addStream(source); | 
|  |  | 
|  | Future get done => _target.done; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Object containing the state used to handle [StreamController.addStream]. | 
|  | */ | 
|  | class _AddStreamState<T> { | 
|  | // [_Future] returned by call to addStream. | 
|  | final _Future addStreamFuture; | 
|  |  | 
|  | // Subscription on stream argument to addStream. | 
|  | final StreamSubscription addSubscription; | 
|  |  | 
|  | _AddStreamState( | 
|  | _EventSink<T> controller, Stream<T> source, bool cancelOnError) | 
|  | : addStreamFuture = new _Future(), | 
|  | addSubscription = source.listen(controller._add, | 
|  | onError: cancelOnError | 
|  | ? makeErrorHandler(controller) | 
|  | : controller._addError, | 
|  | onDone: controller._close, | 
|  | cancelOnError: cancelOnError); | 
|  |  | 
|  | static makeErrorHandler(_EventSink controller) => (e, StackTrace s) { | 
|  | controller._addError(e, s); | 
|  | controller._close(); | 
|  | }; | 
|  |  | 
|  | void pause() { | 
|  | addSubscription.pause(); | 
|  | } | 
|  |  | 
|  | void resume() { | 
|  | addSubscription.resume(); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Stop adding the stream. | 
|  | * | 
|  | * Complete the future returned by `StreamController.addStream` when | 
|  | * the cancel is complete. | 
|  | * | 
|  | * Return a future if the cancel takes time, otherwise return `null`. | 
|  | */ | 
|  | Future cancel() { | 
|  | var cancel = addSubscription.cancel(); | 
|  | if (cancel == null) { | 
|  | addStreamFuture._asyncComplete(null); | 
|  | return null; | 
|  | } | 
|  | return cancel.whenComplete(() { | 
|  | addStreamFuture._asyncComplete(null); | 
|  | }); | 
|  | } | 
|  |  | 
|  | void complete() { | 
|  | addStreamFuture._asyncComplete(null); | 
|  | } | 
|  | } | 
|  |  | 
|  | class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { | 
|  | // The subscription or pending data of a _StreamController. | 
|  | // Stored here because we reuse the `_varData` field  in the _StreamController | 
|  | // to store this state object. | 
|  | var varData; | 
|  |  | 
|  | _StreamControllerAddStreamState(_StreamController<T> controller, this.varData, | 
|  | Stream<T> source, bool cancelOnError) | 
|  | : super(controller, source, cancelOnError) { | 
|  | if (controller.isPaused) { | 
|  | addSubscription.pause(); | 
|  | } | 
|  | } | 
|  | } |