| // Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file | 
 | // for details. All rights reserved. Use of this source code is governed by a | 
 | // BSD-style license that can be found in the LICENSE file. | 
 |  | 
 | part of dart.async; | 
 |  | 
 | // ------------------------------------------------------------------- | 
 | // Controller for creating and adding events to a stream. | 
 | // ------------------------------------------------------------------- | 
 |  | 
 | /** | 
 |  * Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks. | 
 |  */ | 
 | typedef void ControllerCallback(); | 
 |  | 
 | /** | 
 |  * Type of stream controller `onCancel` callbacks. | 
 |  * | 
 |  * The callback may return either `void` or a future. | 
 |  */ | 
 | typedef ControllerCancelCallback(); | 
 |  | 
 | /** | 
 |  * A controller with the stream it controls. | 
 |  * | 
 |  * This controller allows sending data, error and done events on | 
 |  * its [stream]. | 
 |  * This class can be used to create a simple stream that others | 
 |  * can listen on, and to push events to that stream. | 
 |  * | 
 |  * It's possible to check whether the stream is paused or not, and whether | 
 |  * it has subscribers or not, as well as getting a callback when either of | 
 |  * these change. | 
 |  * | 
 |  * If the stream starts or stops having listeners (first listener subscribing, | 
 |  * last listener unsubscribing), the `onSubscriptionStateChange` callback | 
 |  * is notified as soon as possible. If the subscription stat changes during | 
 |  * an event firing or a callback being executed, the change will not be reported | 
 |  * until the current event or callback has finished. | 
 |  * If the pause state has also changed during an event or callback, only the | 
 |  * subscription state callback is notified. | 
 |  * | 
 |  * If the subscriber state has not changed, but the pause state has, the | 
 |  * `onPauseStateChange` callback is notified as soon as possible, after firing | 
 |  * a current event or completing another callback. This happens if the stream | 
 |  * is not paused, and a listener pauses it, or if the stream has been resumed | 
 |  * from pause and has no pending events. If the listeners resume a paused stream | 
 |  * while it still has queued events, the controller will still consider the | 
 |  * stream paused until all queued events have been dispatched. | 
 |  * | 
 |  * Whether to invoke a callback depends only on the state before and after | 
 |  * a stream action, for example firing an event. If the state changes multiple | 
 |  * times during the action, and then ends up in the same state as before, no | 
 |  * callback is performed. | 
 |  * | 
 |  * If listeners are added after the stream has completed (sent a "done" event), | 
 |  * the listeners will be sent a "done" event eventually, but they won't affect | 
 |  * the stream at all, and won't trigger callbacks. From the controller's point | 
 |  * of view, the stream is completely inert when has completed. | 
 |  */ | 
 | abstract class StreamController<T> implements StreamSink<T> { | 
 |   /** The stream that this controller is controlling. */ | 
 |   Stream<T> get stream; | 
 |  | 
 |   /** | 
 |    * A controller with a [stream] that supports only one single subscriber. | 
 |    * | 
 |    * If [sync] is true, the returned stream controller is a | 
 |    * [SynchronousStreamController], and must be used with the care | 
 |    * and attention necessary to not break the [Stream] contract. | 
 |    * See [Completer.sync] for some explanations on when a synchronous | 
 |    * dispatching can be used. | 
 |    * If in doubt, keep the controller non-sync. | 
 |    * | 
 |    * A Stream should be inert until a subscriber starts listening on it (using | 
 |    * the [onListen] callback to start producing events). Streams should not | 
 |    * leak resources (like websockets) when no user ever listens on the stream. | 
 |    * | 
 |    * The controller buffers all incoming events until a subscriber is | 
 |    * registered, but this feature should only be used in rare circumstances. | 
 |    * | 
 |    * The [onPause] function is called when the stream becomes | 
 |    * paused. [onResume] is called when the stream resumed. | 
 |    * | 
 |    * The [onListen] callback is called when the stream | 
 |    * receives its listener and [onCancel] when the listener ends | 
 |    * its subscription. If [onCancel] needs to perform an asynchronous operation, | 
 |    * [onCancel] should return a future that completes when the cancel operation | 
 |    * is done. | 
 |    * | 
 |    * If the stream is canceled before the controller needs new data the | 
 |    * [onResume] call might not be executed. | 
 |    */ | 
 |   factory StreamController({void onListen(), | 
 |                             void onPause(), | 
 |                             void onResume(), | 
 |                             onCancel(), | 
 |                             bool sync: false}) { | 
 |     return sync | 
 |          ? new _SyncStreamController<T>(onListen, onPause, onResume, onCancel) | 
 |          : new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); | 
 |   } | 
 |  | 
 |   /** | 
 |    * A controller where [stream] can be listened to more than once. | 
 |    * | 
 |    * The [Stream] returned by [stream] is a broadcast stream. | 
 |    * It can be listened to more than once. | 
 |    * | 
 |    * A Stream should be inert until a subscriber starts listening on it (using | 
 |    * the [onListen] callback to start producing events). Streams should not | 
 |    * leak resources (like websockets) when no user ever listens on the stream. | 
 |    * | 
 |    * Broadcast streams do not buffer events when there is no listener. | 
 |    * | 
 |    * The controller distributes any events to all currently subscribed | 
 |    * listeners at the time when [add], [addError] or [close] is called. | 
 |    * It is not allowed to call `add`, `addError`, or `close` before a previous | 
 |    * call has returned. The controller does not have any internal queue of | 
 |    * events, and if there are no listeners at the time the event is added, | 
 |    * it will just be dropped, or, if it is an error, be reported as uncaught. | 
 |    * | 
 |    * Each listener subscription is handled independently, | 
 |    * and if one pauses, only the pausing listener is affected. | 
 |    * A paused listener will buffer events internally until unpaused or canceled. | 
 |    * | 
 |    * If [sync] is true, events may be fired directly by the stream's | 
 |    * subscriptions during an [add], [addError] or [close] call. | 
 |    * The returned stream controller is a [SynchronousStreamController], | 
 |    * and must be used with the care and attention necessary to not break | 
 |    * the [Stream] contract. | 
 |    * See [Completer.sync] for some explanations on when a synchronous | 
 |    * dispatching can be used. | 
 |    * If in doubt, keep the controller non-sync. | 
 |    * | 
 |    * If [sync] is false, the event will always be fired at a later time, | 
 |    * after the code adding the event has completed. | 
 |    * In that case, no guarantees are given with regard to when | 
 |    * multiple listeners get the events, except that each listener will get | 
 |    * all events in the correct order. Each subscription handles the events | 
 |    * individually. | 
 |    * If two events are sent on an async controller with two listeners, | 
 |    * one of the listeners may get both events | 
 |    * before the other listener gets any. | 
 |    * A listener must be subscribed both when the event is initiated | 
 |    * (that is, when [add] is called) | 
 |    * and when the event is later delivered, | 
 |    * in order to receive the event. | 
 |    * | 
 |    * The [onListen] callback is called when the first listener is subscribed, | 
 |    * and the [onCancel] is called when there are no longer any active listeners. | 
 |    * If a listener is added again later, after the [onCancel] was called, | 
 |    * the [onListen] will be called again. | 
 |    */ | 
 |   factory StreamController.broadcast({void onListen(), | 
 |                                       void onCancel(), | 
 |                                       bool sync: false}) { | 
 |     return sync | 
 |         ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | 
 |         : new _AsyncBroadcastStreamController<T>(onListen, onCancel); | 
 |   } | 
 |  | 
 |   /** | 
 |    * The callback which is called when the stream is listened to. | 
 |    * | 
 |    * May be set to `null`, in which case no callback will happen. | 
 |    */ | 
 |   ControllerCallback get onListen; | 
 |  | 
 |   void set onListen(void onListenHandler()); | 
 |  | 
 |   /** | 
 |    * The callback which is called when the stream is paused. | 
 |    * | 
 |    * May be set to `null`, in which case no callback will happen. | 
 |    * | 
 |    * Pause related callbacks are not supported on broadcast stream controllers. | 
 |    */ | 
 |   ControllerCallback get onPause; | 
 |  | 
 |   void set onPause(void onPauseHandler()); | 
 |  | 
 |   /** | 
 |    * The callback which is called when the stream is resumed. | 
 |    * | 
 |    * May be set to `null`, in which case no callback will happen. | 
 |    * | 
 |    * Pause related callbacks are not supported on broadcast stream controllers. | 
 |    */ | 
 |   ControllerCallback get onResume; | 
 |  | 
 |   void set onResume(void onResumeHandler()); | 
 |  | 
 |   /** | 
 |    * The callback which is called when the stream is canceled. | 
 |    * | 
 |    * May be set to `null`, in which case no callback will happen. | 
 |    */ | 
 |   ControllerCancelCallback get onCancel; | 
 |  | 
 |   void set onCancel(onCancelHandler()); | 
 |  | 
 |   /** | 
 |    * Returns a view of this object that only exposes the [StreamSink] interface. | 
 |    */ | 
 |   StreamSink<T> get sink; | 
 |  | 
 |   /** | 
 |    * Whether the stream controller is closed for adding more events. | 
 |    * | 
 |    * The controller becomes closed by calling the [close] method. | 
 |    * New events cannot be added, by calling [add] or [addError], | 
 |    * to a closed controller. | 
 |    * | 
 |    * If the controller is closed, | 
 |    * the "done" event might not have been delivered yet, | 
 |    * but it has been scheduled, and it is too late to add more events. | 
 |    */ | 
 |   bool get isClosed; | 
 |  | 
 |   /** | 
 |    * Whether the subscription would need to buffer events. | 
 |    * | 
 |    * This is the case if the controller's stream has a listener and it is | 
 |    * paused, or if it has not received a listener yet. In that case, the | 
 |    * controller is considered paused as well. | 
 |    * | 
 |    * A broadcast stream controller is never considered paused. It always | 
 |    * forwards its events to all uncanceled subscriptions, if any, | 
 |    * and let the subscriptions handle their own pausing and buffering. | 
 |    */ | 
 |   bool get isPaused; | 
 |  | 
 |   /** Whether there is a subscriber on the [Stream]. */ | 
 |   bool get hasListener; | 
 |  | 
 |   /** | 
 |    * Send or enqueue an error event. | 
 |    * | 
 |    * If [error] is `null`, it is replaced by a [NullThrownError]. | 
 |    */ | 
 |   void addError(Object error, [StackTrace stackTrace]); | 
 |  | 
 |   /** | 
 |    * Receives events from [source] and puts them into this controller's stream. | 
 |    * | 
 |    * Returns a future which completes when the source stream is done. | 
 |    * | 
 |    * Events must not be added directly to this controller using [add], | 
 |    * [addError], [close] or [addStream], until the returned future | 
 |    * is complete. | 
 |    * | 
 |    * Data and error events are forwarded to this controller's stream. A done | 
 |    * event on the source will end the `addStream` operation and complete the | 
 |    * returned future. | 
 |    * | 
 |    * If [cancelOnError] is true, only the first error on [source] is | 
 |    * forwarded to the controller's stream, and the `addStream` ends | 
 |    * after this. If [cancelOnError] is false, all errors are forwarded | 
 |    * and only a done event will end the `addStream`. | 
 |    */ | 
 |   Future addStream(Stream<T> source, {bool cancelOnError: true}); | 
 | } | 
 |  | 
 |  | 
 | /** | 
 |  * A stream controller that delivers its events synchronously. | 
 |  * | 
 |  * A synchronous stream controller is intended for cases where | 
 |  * an already asynchronous event triggers an event on a stream. | 
 |  * | 
 |  * Instead of adding the event to the stream in a later microtask, | 
 |  * causing extra latency, the event is instead fired immediately by the | 
 |  * synchronous stream controller, as if the stream event was | 
 |  * the current event or microtask. | 
 |  * | 
 |  * The synchronous stream controller can be used to break the contract | 
 |  * on [Stream], and it must be used carefully to avoid doing so. | 
 |  * | 
 |  * The only advantage to using a [SynchronousStreamController] over a | 
 |  * normal [StreamController] is the improved latency. | 
 |  * Only use the synchronous version if the improvement is significant, | 
 |  * and if its use is safe. Otherwise just use a normal stream controller, | 
 |  * which will always have the correct behavior for a [Stream], and won't | 
 |  * accidentally break other code. | 
 |  * | 
 |  * Adding events to a synchronous controller should only happen as the | 
 |  * very last part of a the handling of the original event. | 
 |  * At that point, adding an event to the stream is equivalent to | 
 |  * returning to the event loop and adding the event in the next microtask. | 
 |  * | 
 |  * Each listener callback will be run as if it was a top-level event | 
 |  * or microtask. This means that if it throws, the error will be reported as | 
 |  * uncaught as soon as possible. | 
 |  * This is one reason to add the event as the last thing in the original event | 
 |  * handler - any action done after adding the event will delay the report of | 
 |  * errors in the event listener callbacks. | 
 |  * | 
 |  * If an event is added in a setting that isn't known to be another event, | 
 |  * it may cause the stream's listener to get that event before the listener | 
 |  * is ready to handle it. We promise that after calling [Stream.listen], | 
 |  * you won't get any events until the code doing the listen has completed. | 
 |  * Calling [add] in response to a function call of unknown origin may break | 
 |  * that promise. | 
 |  * | 
 |  * An [onListen] callback from the controller is *not* an asynchronous event, | 
 |  * and adding events to the controller in the `onListen` callback is always | 
 |  * wrong. The events will be delivered before the listener has even received | 
 |  * the subscription yet. | 
 |  * | 
 |  * The synchronous broadcast stream controller also has a restrictions that a | 
 |  * normal stream controller does not: | 
 |  * The [add], [addError], [close] and [addStream] methods *must not* be | 
 |  * called while an event is being delivered. | 
 |  * That is, if a callback on a subscription on the controller's stream causes | 
 |  * a call to any of the functions above, the call will fail. | 
 |  * A broadcast stream may have more than one listener, and if an | 
 |  * event is added synchronously while another is being also in the process | 
 |  * of being added, the latter event might reach some listeners before | 
 |  * the former. To prevent that, an event cannot be added while a previous | 
 |  * event is being fired. | 
 |  * This guarantees that an event is fully delivered when the | 
 |  * first [add], [addError] or [close] returns, | 
 |  * and further events will be delivered in the correct order. | 
 |  * | 
 |  * This still only guarantees that the event is delivered to the subscription. | 
 |  * If the subscription is paused, the actual callback may still happen later, | 
 |  * and the event will instead be buffered by the subscription. | 
 |  * Barring pausing, and the following buffered events that haven't been | 
 |  * delivered yet, callbacks will be called synchronously when an event is added. | 
 |  * | 
 |  * Adding an event to a synchronous non-broadcast stream controller while | 
 |  * another event is in progress may cause the second event to be delayed | 
 |  * and not be delivered synchronously, and until that event is delivered, | 
 |  * the controller will not act synchronously. | 
 |  */ | 
 | abstract class SynchronousStreamController<T> implements StreamController<T> { | 
 |   /** | 
 |    * Adds event to the controller's stream. | 
 |    * | 
 |    * As [StreamController.add], but must not be called while an event is | 
 |    * being added by [add], [addError] or [close]. | 
 |    */ | 
 |   void add(T data); | 
 |  | 
 |   /** | 
 |    * Adds error to the controller's stream. | 
 |    * | 
 |    * As [StreamController.addError], but must not be called while an event is | 
 |    * being added by [add], [addError] or [close]. | 
 |    */ | 
 |   void addError(Object error, [StackTrace stackTrace]); | 
 |  | 
 |   /** | 
 |    * Closes the controller's stream. | 
 |    * | 
 |    * As [StreamController.close], but must not be called while an event is | 
 |    * being added by [add], [addError] or [close]. | 
 |    */ | 
 |   Future close(); | 
 | } | 
 |  | 
 | abstract class _StreamControllerLifecycle<T> { | 
 |   StreamSubscription<T> _subscribe( | 
 |       void onData(T data), | 
 |       Function onError, | 
 |       void onDone(), | 
 |       bool cancelOnError); | 
 |   void _recordPause(StreamSubscription<T> subscription) {} | 
 |   void _recordResume(StreamSubscription<T> subscription) {} | 
 |   Future _recordCancel(StreamSubscription<T> subscription) => null; | 
 | } | 
 |  | 
 | /** | 
 |  * Default implementation of [StreamController]. | 
 |  * | 
 |  * Controls a stream that only supports a single controller. | 
 |  */ | 
 | abstract class _StreamController<T> implements StreamController<T>, | 
 |                                                _StreamControllerLifecycle<T>, | 
 |                                                _EventSink<T>, | 
 |                                                _EventDispatch<T> { | 
 |   // The states are bit-flags. More than one can be set at a time. | 
 |   // | 
 |   // The "subscription state" goes through the states: | 
 |   //   initial -> subscribed -> canceled. | 
 |   // These are mutually exclusive. | 
 |   // The "closed" state records whether the [close] method has been called | 
 |   // on the controller. This can be done at any time. If done before | 
 |   // subscription, the done event is queued. If done after cancel, the done | 
 |   // event is ignored (just as any other event after a cancel). | 
 |  | 
 |   /** The controller is in its initial state with no subscription. */ | 
 |   static const int _STATE_INITIAL = 0; | 
 |   /** The controller has a subscription, but hasn't been closed or canceled. */ | 
 |   static const int _STATE_SUBSCRIBED = 1; | 
 |   /** The subscription is canceled. */ | 
 |   static const int _STATE_CANCELED = 2; | 
 |   /** Mask for the subscription state. */ | 
 |   static const int _STATE_SUBSCRIPTION_MASK = 3; | 
 |  | 
 |   // The following state relate to the controller, not the subscription. | 
 |   // If closed, adding more events is not allowed. | 
 |   // If executing an [addStream], new events are not allowed either, but will | 
 |   // be added by the stream. | 
 |  | 
 |   /** | 
 |    * The controller is closed due to calling [close]. | 
 |    * | 
 |    * When the stream is closed, you can neither add new events nor add new | 
 |    * listeners. | 
 |    */ | 
 |   static const int _STATE_CLOSED = 4; | 
 |   /** | 
 |    * The controller is in the middle of an [addStream] operation. | 
 |    * | 
 |    * While adding events from a stream, no new events can be added directly | 
 |    * on the controller. | 
 |    */ | 
 |   static const int _STATE_ADDSTREAM = 8; | 
 |  | 
 |   /** | 
 |    * Field containing different data depending on the current subscription | 
 |    * state. | 
 |    * | 
 |    * If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents] | 
 |    * for events added to the controller before a subscription. | 
 |    * | 
 |    * While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription. | 
 |    * | 
 |    * When [_state] is [_STATE_CANCELED] the field is currently not used. | 
 |    */ | 
 |   var _varData; | 
 |  | 
 |   /** Current state of the controller. */ | 
 |   int _state = _STATE_INITIAL; | 
 |  | 
 |   /** | 
 |    * Future completed when the stream sends its last event. | 
 |    * | 
 |    * This is also the future returned by [close]. | 
 |    */ | 
 |   // TODO(lrn): Could this be stored in the varData field too, if it's not | 
 |   // accessed until the call to "close"? Then we need to special case if it's | 
 |   // accessed earlier, or if close is called before subscribing. | 
 |   _Future _doneFuture; | 
 |  | 
 |   ControllerCallback onListen; | 
 |   ControllerCallback onPause; | 
 |   ControllerCallback onResume; | 
 |   ControllerCancelCallback onCancel; | 
 |  | 
 |   _StreamController(this.onListen, | 
 |                     this.onPause, | 
 |                     this.onResume, | 
 |                     this.onCancel); | 
 |  | 
 |   // Return a new stream every time. The streams are equal, but not identical. | 
 |   Stream<T> get stream => new _ControllerStream<T>(this); | 
 |  | 
 |   /** | 
 |    * Returns a view of this object that only exposes the [StreamSink] interface. | 
 |    */ | 
 |   StreamSink<T> get sink => new _StreamSinkWrapper<T>(this); | 
 |  | 
 |   /** | 
 |    * Whether a listener has existed and been canceled. | 
 |    * | 
 |    * After this, adding more events will be ignored. | 
 |    */ | 
 |   bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 
 |  | 
 |   /** Whether there is an active listener. */ | 
 |   bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; | 
 |  | 
 |   /** Whether there has not been a listener yet. */ | 
 |   bool get _isInitialState => | 
 |       (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL; | 
 |  | 
 |   bool get isClosed => (_state & _STATE_CLOSED) != 0; | 
 |  | 
 |   bool get isPaused => hasListener ? _subscription._isInputPaused | 
 |                                    : !_isCanceled; | 
 |  | 
 |   bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; | 
 |  | 
 |   /** New events may not be added after close, or during addStream. */ | 
 |   bool get _mayAddEvent => (_state < _STATE_CLOSED); | 
 |  | 
 |   // Returns the pending events. | 
 |   // Pending events are events added before a subscription exists. | 
 |   // They are added to the subscription when it is created. | 
 |   // Pending events, if any, are kept in the _varData field until the | 
 |   // stream is listened to. | 
 |   // While adding a stream, pending events are moved into the | 
 |   // state object to allow the state object to use the _varData field. | 
 |   _PendingEvents<T> get _pendingEvents { | 
 |     assert(_isInitialState); | 
 |     if (!_isAddingStream) { | 
 |       return _varData as Object /*=_PendingEvents<T>*/; | 
 |     } | 
 |     _StreamControllerAddStreamState<T> state = | 
 |       _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 
 |     return state.varData as Object /*=_PendingEvents<T>*/; | 
 |   } | 
 |  | 
 |   // Returns the pending events, and creates the object if necessary. | 
 |   _StreamImplEvents<T> _ensurePendingEvents() { | 
 |     assert(_isInitialState); | 
 |     if (!_isAddingStream) { | 
 |       if (_varData == null) _varData = new _StreamImplEvents<T>(); | 
 |       return _varData as Object /*=_StreamImplEvents<T>*/; | 
 |     } | 
 |     _StreamControllerAddStreamState<T> state = | 
 |         _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 
 |     if (state.varData == null) state.varData = new _StreamImplEvents<T>(); | 
 |     return state.varData as Object /*=_StreamImplEvents<T>*/; | 
 |   } | 
 |  | 
 |   // Get the current subscription. | 
 |   // If we are adding a stream, the subscription is moved into the state | 
 |   // object to allow the state object to use the _varData field. | 
 |   _ControllerSubscription<T> get _subscription { | 
 |     assert(hasListener); | 
 |     if (_isAddingStream) { | 
 |       _StreamControllerAddStreamState<T> addState = | 
 |           _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 
 |       return addState.varData as Object /*=_ControllerSubscription<T>*/; | 
 |     } | 
 |     return _varData as Object /*=_ControllerSubscription<T>*/; | 
 |   } | 
 |  | 
 |   /** | 
 |    * Creates an error describing why an event cannot be added. | 
 |    * | 
 |    * The reason, and therefore the error message, depends on the current state. | 
 |    */ | 
 |   Error _badEventState() { | 
 |     if (isClosed) { | 
 |       return new StateError("Cannot add event after closing"); | 
 |     } | 
 |     assert(_isAddingStream); | 
 |     return new StateError("Cannot add event while adding a stream"); | 
 |   } | 
 |  | 
 |   // StreamSink interface. | 
 |   Future addStream(Stream<T> source, {bool cancelOnError: true}) { | 
 |     if (!_mayAddEvent) throw _badEventState(); | 
 |     if (_isCanceled) return new _Future.immediate(null); | 
 |     _StreamControllerAddStreamState<T> addState = | 
 |         new _StreamControllerAddStreamState<T>(this, | 
 |                                                _varData, | 
 |                                                source, | 
 |                                                cancelOnError); | 
 |     _varData = addState; | 
 |     _state |= _STATE_ADDSTREAM; | 
 |     return addState.addStreamFuture; | 
 |   } | 
 |  | 
 |   /** | 
 |    * Returns a future that is completed when the stream is done | 
 |    * processing events. | 
 |    * | 
 |    * This happens either when the done event has been sent, or if the | 
 |    * subscriber of a single-subscription stream is cancelled. | 
 |    */ | 
 |   Future get done => _ensureDoneFuture(); | 
 |  | 
 |   Future _ensureDoneFuture() { | 
 |     if (_doneFuture == null) { | 
 |       _doneFuture = _isCanceled ? Future._nullFuture : new _Future(); | 
 |     } | 
 |     return _doneFuture; | 
 |   } | 
 |  | 
 |   /** | 
 |    * Send or enqueue a data event. | 
 |    */ | 
 |   void add(T value) { | 
 |     if (!_mayAddEvent) throw _badEventState(); | 
 |     _add(value); | 
 |   } | 
 |  | 
 |   /** | 
 |    * Send or enqueue an error event. | 
 |    */ | 
 |   void addError(Object error, [StackTrace stackTrace]) { | 
 |     if (!_mayAddEvent) throw _badEventState(); | 
 |     error = _nonNullError(error); | 
 |     AsyncError replacement = Zone.current.errorCallback(error, stackTrace); | 
 |     if (replacement != null) { | 
 |       error = _nonNullError(replacement.error); | 
 |       stackTrace = replacement.stackTrace; | 
 |     } | 
 |     _addError(error, stackTrace); | 
 |   } | 
 |  | 
 |   /** | 
 |    * Closes this controller and sends a done event on the stream. | 
 |    * | 
 |    * The first time a controller is closed, a "done" event is added to its | 
 |    * stream. | 
 |    * | 
 |    * You are allowed to close the controller more than once, but only the first | 
 |    * call has any effect. | 
 |    * | 
 |    * After closing, no further events may be added using [add], [addError] | 
 |    * or [addStream]. | 
 |    * | 
 |    * The returned future is completed when the done event has been delivered. | 
 |    */ | 
 |   Future close() { | 
 |     if (isClosed) { | 
 |       return _ensureDoneFuture(); | 
 |     } | 
 |     if (!_mayAddEvent) throw _badEventState(); | 
 |     _closeUnchecked(); | 
 |     return _ensureDoneFuture(); | 
 |   } | 
 |  | 
 |   void _closeUnchecked() { | 
 |     _state |= _STATE_CLOSED; | 
 |     if (hasListener) { | 
 |       _sendDone(); | 
 |     } else if (_isInitialState) { | 
 |       _ensurePendingEvents().add(const _DelayedDone()); | 
 |     } | 
 |   } | 
 |  | 
 |   // EventSink interface. Used by the [addStream] events. | 
 |  | 
 |   // Add data event, used both by the [addStream] events and by [add]. | 
 |   void _add(T value) { | 
 |     if (hasListener) { | 
 |       _sendData(value); | 
 |     } else if (_isInitialState) { | 
 |       _ensurePendingEvents().add(new _DelayedData<T>(value)); | 
 |     } | 
 |   } | 
 |  | 
 |   void _addError(Object error, StackTrace stackTrace) { | 
 |     if (hasListener) { | 
 |       _sendError(error, stackTrace); | 
 |     } else if (_isInitialState) { | 
 |       _ensurePendingEvents().add(new _DelayedError(error, stackTrace)); | 
 |     } | 
 |   } | 
 |  | 
 |   void _close() { | 
 |     // End of addStream stream. | 
 |     assert(_isAddingStream); | 
 |     _StreamControllerAddStreamState<T> addState = | 
 |         _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 
 |     _varData = addState.varData; | 
 |     _state &= ~_STATE_ADDSTREAM; | 
 |     addState.complete(); | 
 |   } | 
 |  | 
 |   // _StreamControllerLifeCycle interface | 
 |  | 
 |   StreamSubscription<T> _subscribe( | 
 |       void onData(T data), | 
 |       Function onError, | 
 |       void onDone(), | 
 |       bool cancelOnError) { | 
 |     if (!_isInitialState) { | 
 |       throw new StateError("Stream has already been listened to."); | 
 |     } | 
 |     _ControllerSubscription<T> subscription = | 
 |         new _ControllerSubscription<T>(this, onData, onError, onDone, | 
 |                                        cancelOnError); | 
 |  | 
 |     _PendingEvents<T> pendingEvents = _pendingEvents; | 
 |     _state |= _STATE_SUBSCRIBED; | 
 |     if (_isAddingStream) { | 
 |       _StreamControllerAddStreamState<T> addState = | 
 |           _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 
 |       addState.varData = subscription; | 
 |       addState.resume(); | 
 |     } else { | 
 |       _varData = subscription; | 
 |     } | 
 |     subscription._setPendingEvents(pendingEvents); | 
 |     subscription._guardCallback(() { | 
 |       _runGuarded(onListen); | 
 |     }); | 
 |  | 
 |     return subscription; | 
 |   } | 
 |  | 
 |   Future _recordCancel(StreamSubscription<T> subscription) { | 
 |     // When we cancel, we first cancel any stream being added, | 
 |     // Then we call `onCancel`, and finally the _doneFuture is completed. | 
 |     // If either of addStream's cancel or `onCancel` returns a future, | 
 |     // we wait for it before continuing. | 
 |     // Any error during this process ends up in the returned future. | 
 |     // If more errors happen, we act as if it happens inside nested try/finallys | 
 |     // or whenComplete calls, and only the last error ends up in the | 
 |     // returned future. | 
 |     Future result; | 
 |     if (_isAddingStream) { | 
 |       _StreamControllerAddStreamState<T> addState = | 
 |           _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 
 |       result = addState.cancel(); | 
 |     } | 
 |     _varData = null; | 
 |     _state = | 
 |         (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; | 
 |  | 
 |     if (onCancel != null) { | 
 |       if (result == null) { | 
 |         // Only introduce a future if one is needed. | 
 |         // If _onCancel returns null, no future is needed. | 
 |         try { | 
 |           result = onCancel(); | 
 |         } catch (e, s) { | 
 |           // Return the error in the returned future. | 
 |           // Complete it asynchronously, so there is time for a listener | 
 |           // to handle the error. | 
 |           result = new _Future().._asyncCompleteError(e, s); | 
 |         } | 
 |       } else { | 
 |         // Simpler case when we already know that we will return a future. | 
 |         result = result.whenComplete(onCancel); | 
 |       } | 
 |     } | 
 |  | 
 |     void complete() { | 
 |       if (_doneFuture != null && _doneFuture._mayComplete) { | 
 |         _doneFuture._asyncComplete(null); | 
 |       } | 
 |     } | 
 |  | 
 |     if (result != null) { | 
 |       result = result.whenComplete(complete); | 
 |     } else { | 
 |       complete(); | 
 |     } | 
 |  | 
 |     return result; | 
 |   } | 
 |  | 
 |   void _recordPause(StreamSubscription<T> subscription) { | 
 |     if (_isAddingStream) { | 
 |       _StreamControllerAddStreamState<T> addState = | 
 |           _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 
 |       addState.pause(); | 
 |     } | 
 |     _runGuarded(onPause); | 
 |   } | 
 |  | 
 |   void _recordResume(StreamSubscription<T> subscription) { | 
 |     if (_isAddingStream) { | 
 |       _StreamControllerAddStreamState<T> addState = | 
 |           _varData as Object /*=_StreamControllerAddStreamState<T>*/; | 
 |       addState.resume(); | 
 |     } | 
 |     _runGuarded(onResume); | 
 |   } | 
 | } | 
 |  | 
 | abstract class _SyncStreamControllerDispatch<T> | 
 |     implements _StreamController<T>, SynchronousStreamController<T> { | 
 |   int get _state; | 
 |   void set _state(int state); | 
 |  | 
 |   void _sendData(T data) { | 
 |     _subscription._add(data); | 
 |   } | 
 |  | 
 |   void _sendError(Object error, StackTrace stackTrace) { | 
 |     _subscription._addError(error, stackTrace); | 
 |   } | 
 |  | 
 |   void _sendDone() { | 
 |     _subscription._close(); | 
 |   } | 
 | } | 
 |  | 
 | abstract class _AsyncStreamControllerDispatch<T> | 
 |     implements _StreamController<T> { | 
 |   void _sendData(T data) { | 
 |     _subscription._addPending(new _DelayedData<dynamic /*=T*/>(data)); | 
 |   } | 
 |  | 
 |   void _sendError(Object error, StackTrace stackTrace) { | 
 |     _subscription._addPending(new _DelayedError(error, stackTrace)); | 
 |   } | 
 |  | 
 |   void _sendDone() { | 
 |     _subscription._addPending(const _DelayedDone()); | 
 |   } | 
 | } | 
 |  | 
 | // TODO(lrn): Use common superclass for callback-controllers when VM supports | 
 | // constructors in mixin superclasses. | 
 |  | 
 | class _AsyncStreamController<T> = _StreamController<T> | 
 |                                   with _AsyncStreamControllerDispatch<T>; | 
 |  | 
 | class _SyncStreamController<T> = _StreamController<T> | 
 |                                  with _SyncStreamControllerDispatch<T>; | 
 |  | 
 | typedef _NotificationHandler(); | 
 |  | 
 | Future _runGuarded(_NotificationHandler notificationHandler) { | 
 |   if (notificationHandler == null) return null; | 
 |   try { | 
 |     var result = notificationHandler(); | 
 |     if (result is Future) return result; | 
 |     return null; | 
 |   } catch (e, s) { | 
 |     Zone.current.handleUncaughtError(e, s); | 
 |   } | 
 | } | 
 |  | 
 | class _ControllerStream<T> extends _StreamImpl<T> { | 
 |   _StreamControllerLifecycle<T> _controller; | 
 |  | 
 |   _ControllerStream(this._controller); | 
 |  | 
 |   StreamSubscription<T> _createSubscription( | 
 |       void onData(T data), | 
 |       Function onError, | 
 |       void onDone(), | 
 |       bool cancelOnError) => | 
 |     _controller._subscribe(onData, onError, onDone, cancelOnError); | 
 |  | 
 |   // Override == and hashCode so that new streams returned by the same | 
 |   // controller are considered equal. The controller returns a new stream | 
 |   // each time it's queried, but doesn't have to cache the result. | 
 |  | 
 |   int get hashCode => _controller.hashCode ^ 0x35323532; | 
 |  | 
 |   bool operator==(Object other) { | 
 |     if (identical(this, other)) return true; | 
 |     if (other is! _ControllerStream) return false; | 
 |     _ControllerStream otherStream = other; | 
 |     return identical(otherStream._controller, this._controller); | 
 |   } | 
 | } | 
 |  | 
 | class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { | 
 |   final _StreamControllerLifecycle<T> _controller; | 
 |  | 
 |   _ControllerSubscription(this._controller, void onData(T data), | 
 |                           Function onError, void onDone(), bool cancelOnError) | 
 |       : super(onData, onError, onDone, cancelOnError); | 
 |  | 
 |   Future _onCancel() { | 
 |     return _controller._recordCancel(this); | 
 |   } | 
 |  | 
 |   void _onPause() { | 
 |     _controller._recordPause(this); | 
 |   } | 
 |  | 
 |   void _onResume() { | 
 |     _controller._recordResume(this); | 
 |   } | 
 | } | 
 |  | 
 |  | 
 | /** A class that exposes only the [StreamSink] interface of an object. */ | 
 | class _StreamSinkWrapper<T> implements StreamSink<T> { | 
 |   final StreamController _target; | 
 |   _StreamSinkWrapper(this._target); | 
 |   void add(T data) { _target.add(data); } | 
 |   void addError(Object error, [StackTrace stackTrace]) { | 
 |     _target.addError(error, stackTrace); | 
 |   } | 
 |   Future close() => _target.close(); | 
 |   Future addStream(Stream<T> source, {bool cancelOnError: true}) => | 
 |       _target.addStream(source, cancelOnError: cancelOnError); | 
 |   Future get done => _target.done; | 
 | } | 
 |  | 
 | /** | 
 |  * Object containing the state used to handle [StreamController.addStream]. | 
 |  */ | 
 | class _AddStreamState<T> { | 
 |   // [_Future] returned by call to addStream. | 
 |   final _Future addStreamFuture; | 
 |  | 
 |   // Subscription on stream argument to addStream. | 
 |   final StreamSubscription addSubscription; | 
 |  | 
 |   _AddStreamState(_EventSink<T> controller, Stream source, bool cancelOnError) | 
 |       : addStreamFuture = new _Future(), | 
 |         addSubscription = source.listen(controller._add, | 
 |                                         onError: cancelOnError | 
 |                                              ? makeErrorHandler(controller) | 
 |                                              : controller._addError, | 
 |                                         onDone: controller._close, | 
 |                                         cancelOnError: cancelOnError); | 
 |  | 
 |   static makeErrorHandler(_EventSink controller) => | 
 |       (e, StackTrace s) { | 
 |         controller._addError(e, s); | 
 |         controller._close(); | 
 |       }; | 
 |  | 
 |   void pause() { | 
 |     addSubscription.pause(); | 
 |   } | 
 |  | 
 |   void resume() { | 
 |     addSubscription.resume(); | 
 |   } | 
 |  | 
 |   /** | 
 |    * Stop adding the stream. | 
 |    * | 
 |    * Complete the future returned by `StreamController.addStream` when | 
 |    * the cancel is complete. | 
 |    * | 
 |    * Return a future if the cancel takes time, otherwise return `null`. | 
 |    */ | 
 |   Future cancel() { | 
 |     var cancel = addSubscription.cancel(); | 
 |     if (cancel == null) { | 
 |       addStreamFuture._asyncComplete(null); | 
 |       return null; | 
 |     } | 
 |     return cancel.whenComplete(() { addStreamFuture._asyncComplete(null); }); | 
 |   } | 
 |  | 
 |   void complete() { | 
 |     addStreamFuture._asyncComplete(null); | 
 |   } | 
 | } | 
 |  | 
 | class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { | 
 |   // The subscription or pending data of a _StreamController. | 
 |   // Stored here because we reuse the `_varData` field  in the _StreamController | 
 |   // to store this state object. | 
 |   var varData; | 
 |  | 
 |   _StreamControllerAddStreamState(_StreamController<T> controller, | 
 |                                   this.varData, | 
 |                                   Stream source, | 
 |                                   bool cancelOnError) | 
 |       : super(controller, source, cancelOnError) { | 
 |     if (controller.isPaused) { | 
 |       addSubscription.pause(); | 
 |     } | 
 |   } | 
 | } |