|  | // Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file | 
|  | // for details. All rights reserved. Use of this source code is governed by a | 
|  | // BSD-style license that can be found in the LICENSE file. | 
|  |  | 
|  | part of dart.async; | 
|  |  | 
|  | /** Abstract and private interface for a place to put events. */ | 
|  | abstract class _EventSink<T> { | 
|  | void _add(T data); | 
|  | void _addError(Object error); | 
|  | void _close(); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Abstract and private interface for a place to send events. | 
|  | * | 
|  | * Used by event buffering to finally dispatch the pending event, where | 
|  | * [_EventSink] is where the event first enters the stream subscription, | 
|  | * and may yet be buffered. | 
|  | */ | 
|  | abstract class _EventDispatch<T> { | 
|  | void _sendData(T data); | 
|  | void _sendError(Object error); | 
|  | void _sendDone(); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Default implementation of stream subscription of buffering events. | 
|  | * | 
|  | * The only public methods are those of [StreamSubscription], so instances of | 
|  | * [_BufferingStreamSubscription] can be returned directly as a | 
|  | * [StreamSubscription] without exposing internal functionality. | 
|  | * | 
|  | * The [StreamController] is a public facing version of [Stream] and this class, | 
|  | * with some methods made public. | 
|  | * | 
|  | * The user interface of [_BufferingStreamSubscription] are the following | 
|  | * methods: | 
|  | * * [_add]: Add a data event to the stream. | 
|  | * * [_addError]: Add an error event to the stream. | 
|  | * * [_close]: Request to close the stream. | 
|  | * * [_onCancel]: Called when the subscription will provide no more events, | 
|  | *     either due to being actively canceled, or after sending a done event. | 
|  | * * [_onPause]: Called when the subscription wants the event source to pause. | 
|  | * * [_onResume]: Called when allowing new events after a pause. | 
|  | * The user should not add new events when the subscription requests a paused, | 
|  | * but if it happens anyway, the subscription will enqueue the events just as | 
|  | * when new events arrive while still firing an old event. | 
|  | */ | 
|  | class _BufferingStreamSubscription<T> implements StreamSubscription<T>, | 
|  | _EventSink<T>, | 
|  | _EventDispatch<T> { | 
|  | /** The `cancelOnError` flag from the `listen` call. */ | 
|  | static const int _STATE_CANCEL_ON_ERROR = 1; | 
|  | /** | 
|  | * Whether the "done" event has been received. | 
|  | * No further events are accepted after this. | 
|  | */ | 
|  | static const int _STATE_CLOSED = 2; | 
|  | /** | 
|  | * Set if the input has been asked not to send events. | 
|  | * | 
|  | * This is not the same as being paused, since the input will remain paused | 
|  | * after a call to [resume] if there are pending events. | 
|  | */ | 
|  | static const int _STATE_INPUT_PAUSED = 4; | 
|  | /** | 
|  | * Whether the subscription has been canceled. | 
|  | * | 
|  | * Set by calling [cancel], or by handling a "done" event, or an "error" event | 
|  | * when `cancelOnError` is true. | 
|  | */ | 
|  | static const int _STATE_CANCELED = 8; | 
|  | static const int _STATE_IN_CALLBACK = 16; | 
|  | static const int _STATE_HAS_PENDING = 32; | 
|  | static const int _STATE_PAUSE_COUNT = 64; | 
|  | static const int _STATE_PAUSE_COUNT_SHIFT = 6; | 
|  |  | 
|  | /* Event handlers provided in constructor. */ | 
|  | _DataHandler<T> _onData; | 
|  | _ErrorHandler _onError; | 
|  | _DoneHandler _onDone; | 
|  | final _Zone _zone = _Zone.current; | 
|  |  | 
|  | /** Bit vector based on state-constants above. */ | 
|  | int _state; | 
|  |  | 
|  | /** | 
|  | * Queue of pending events. | 
|  | * | 
|  | * Is created when necessary, or set in constructor for preconfigured events. | 
|  | */ | 
|  | _PendingEvents _pending; | 
|  |  | 
|  | _BufferingStreamSubscription(this._onData, | 
|  | this._onError, | 
|  | this._onDone, | 
|  | bool cancelOnError) | 
|  | : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { | 
|  | assert(_onData != null); | 
|  | assert(_onError != null); | 
|  | assert(_onDone != null); | 
|  | _zone.expectCallback(); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Sets the subscription's pending events object. | 
|  | * | 
|  | * This can only be done once. The pending events object is used for the | 
|  | * rest of the subscription's life cycle. | 
|  | */ | 
|  | void _setPendingEvents(_PendingEvents pendingEvents) { | 
|  | assert(_pending == null); | 
|  | if (pendingEvents == null) return; | 
|  | _pending = pendingEvents; | 
|  | if (!pendingEvents.isEmpty) { | 
|  | _state |= _STATE_HAS_PENDING; | 
|  | _pending.schedule(this); | 
|  | } | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Extracts the pending events from a canceled stream. | 
|  | * | 
|  | * This can only be done during the [_onCancel] method call. After that, | 
|  | * any remaining pending events will be cleared. | 
|  | */ | 
|  | _PendingEvents _extractPending() { | 
|  | assert(_isCanceled); | 
|  | _PendingEvents events = _pending; | 
|  | _pending = null; | 
|  | return events; | 
|  | } | 
|  |  | 
|  | // StreamSubscription interface. | 
|  |  | 
|  | void onData(void handleData(T event)) { | 
|  | if (handleData == null) handleData = _nullDataHandler; | 
|  | _onData = handleData; | 
|  | } | 
|  |  | 
|  | void onError(void handleError(error)) { | 
|  | if (handleError == null) handleError = _nullErrorHandler; | 
|  | _onError = handleError; | 
|  | } | 
|  |  | 
|  | void onDone(void handleDone()) { | 
|  | if (handleDone == null) handleDone = _nullDoneHandler; | 
|  | _onDone = handleDone; | 
|  | } | 
|  |  | 
|  | void pause([Future resumeSignal]) { | 
|  | if (_isCanceled) return; | 
|  | bool wasPaused = _isPaused; | 
|  | bool wasInputPaused = _isInputPaused; | 
|  | // Increment pause count and mark input paused (if it isn't already). | 
|  | _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | 
|  | if (resumeSignal != null) resumeSignal.whenComplete(resume); | 
|  | if (!wasPaused && _pending != null) _pending.cancelSchedule(); | 
|  | if (!wasInputPaused && !_inCallback) _guardCallback(_onPause); | 
|  | } | 
|  |  | 
|  | void resume() { | 
|  | if (_isCanceled) return; | 
|  | if (_isPaused) { | 
|  | _decrementPauseCount(); | 
|  | if (!_isPaused) { | 
|  | if (_hasPending && !_pending.isEmpty) { | 
|  | // Input is still paused. | 
|  | _pending.schedule(this); | 
|  | } else { | 
|  | assert(_mayResumeInput); | 
|  | _state &= ~_STATE_INPUT_PAUSED; | 
|  | if (!_inCallback) _guardCallback(_onResume); | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | void cancel() { | 
|  | if (_isCanceled) return; | 
|  | _cancel(); | 
|  | if (!_inCallback) { | 
|  | // otherwise checkState will be called after firing or callback completes. | 
|  | _state |= _STATE_IN_CALLBACK; | 
|  | _onCancel(); | 
|  | _pending = null; | 
|  | _state &= ~_STATE_IN_CALLBACK; | 
|  | } | 
|  | } | 
|  |  | 
|  | Future asFuture([var futureValue]) { | 
|  | _FutureImpl<T> result = new _FutureImpl<T>(); | 
|  |  | 
|  | // Overwrite the onDone and onError handlers. | 
|  | _onDone = () { result._setValue(futureValue); }; | 
|  | _onError = (error) { | 
|  | cancel(); | 
|  | result._setError(error); | 
|  | }; | 
|  |  | 
|  | return result; | 
|  | } | 
|  |  | 
|  | // State management. | 
|  |  | 
|  | bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; | 
|  | bool get _isClosed => (_state & _STATE_CLOSED) != 0; | 
|  | bool get _isCanceled => (_state & _STATE_CANCELED) != 0; | 
|  | bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; | 
|  | bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; | 
|  | bool get _isPaused => _state >= _STATE_PAUSE_COUNT; | 
|  | bool get _canFire => _state < _STATE_IN_CALLBACK; | 
|  | bool get _mayResumeInput => | 
|  | !_isPaused && (_pending == null || _pending.isEmpty); | 
|  | bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; | 
|  |  | 
|  | bool get isPaused => _isPaused; | 
|  |  | 
|  | void _cancel() { | 
|  | _state |= _STATE_CANCELED; | 
|  | _zone.cancelCallbackExpectation(); | 
|  | if (_hasPending) { | 
|  | _pending.cancelSchedule(); | 
|  | } | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Increment the pause count. | 
|  | * | 
|  | * Also marks input as paused. | 
|  | */ | 
|  | void _incrementPauseCount() { | 
|  | _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Decrements the pause count. | 
|  | * | 
|  | * Does not automatically unpause the input (call [_onResume]) when | 
|  | * the pause count reaches zero. This is handled elsewhere, and only | 
|  | * if there are no pending events buffered. | 
|  | */ | 
|  | void _decrementPauseCount() { | 
|  | assert(_isPaused); | 
|  | _state -= _STATE_PAUSE_COUNT; | 
|  | } | 
|  |  | 
|  | // _EventSink interface. | 
|  |  | 
|  | void _add(T data) { | 
|  | assert(!_isClosed); | 
|  | if (_isCanceled) return; | 
|  | if (_canFire) { | 
|  | _sendData(data); | 
|  | } else { | 
|  | _addPending(new _DelayedData(data)); | 
|  | } | 
|  | } | 
|  |  | 
|  | void _addError(Object error) { | 
|  | if (_isCanceled) return; | 
|  | if (_canFire) { | 
|  | _sendError(error);  // Reports cancel after sending. | 
|  | } else { | 
|  | _addPending(new _DelayedError(error)); | 
|  | } | 
|  | } | 
|  |  | 
|  | void _close() { | 
|  | assert(!_isClosed); | 
|  | if (_isCanceled) return; | 
|  | _state |= _STATE_CLOSED; | 
|  | if (_canFire) { | 
|  | _sendDone(); | 
|  | } else { | 
|  | _addPending(const _DelayedDone()); | 
|  | } | 
|  | } | 
|  |  | 
|  | // Hooks called when the input is paused, unpaused or canceled. | 
|  | // These must not throw. If overwritten to call user code, include suitable | 
|  | // try/catch wrapping and send any errors to | 
|  | // [_Zone.current.handleUncaughtError]. | 
|  | void _onPause() { | 
|  | assert(_isInputPaused); | 
|  | } | 
|  |  | 
|  | void _onResume() { | 
|  | assert(!_isInputPaused); | 
|  | } | 
|  |  | 
|  | void _onCancel() { | 
|  | assert(_isCanceled); | 
|  | } | 
|  |  | 
|  | // Handle pending events. | 
|  |  | 
|  | /** | 
|  | * Add a pending event. | 
|  | * | 
|  | * If the subscription is not paused, this also schedules a firing | 
|  | * of pending events later (if necessary). | 
|  | */ | 
|  | void _addPending(_DelayedEvent event) { | 
|  | _StreamImplEvents pending = _pending; | 
|  | if (_pending == null) pending = _pending = new _StreamImplEvents(); | 
|  | pending.add(event); | 
|  | if (!_hasPending) { | 
|  | _state |= _STATE_HAS_PENDING; | 
|  | if (!_isPaused) { | 
|  | _pending.schedule(this); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /* _EventDispatch interface. */ | 
|  |  | 
|  | void _sendData(T data) { | 
|  | assert(!_isCanceled); | 
|  | assert(!_isPaused); | 
|  | assert(!_inCallback); | 
|  | bool wasInputPaused = _isInputPaused; | 
|  | _state |= _STATE_IN_CALLBACK; | 
|  | _zone.executePeriodicCallbackGuarded(() => _onData(data)); | 
|  | _state &= ~_STATE_IN_CALLBACK; | 
|  | _checkState(wasInputPaused); | 
|  | } | 
|  |  | 
|  | void _sendError(var error) { | 
|  | assert(!_isCanceled); | 
|  | assert(!_isPaused); | 
|  | assert(!_inCallback); | 
|  | bool wasInputPaused = _isInputPaused; | 
|  | _state |= _STATE_IN_CALLBACK; | 
|  | if (!_zone.inSameErrorZone(_Zone.current)) { | 
|  | // Errors are not allowed to traverse zone boundaries. | 
|  | _Zone.current.handleUncaughtError(error); | 
|  | } else { | 
|  | _zone.executePeriodicCallbackGuarded(() => _onError(error)); | 
|  | } | 
|  | _state &= ~_STATE_IN_CALLBACK; | 
|  | if (_cancelOnError) { | 
|  | _cancel(); | 
|  | } | 
|  | _checkState(wasInputPaused); | 
|  | } | 
|  |  | 
|  | void _sendDone() { | 
|  | assert(!_isCanceled); | 
|  | assert(!_isPaused); | 
|  | assert(!_inCallback); | 
|  | _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); | 
|  | _zone.executeCallbackGuarded(_onDone); | 
|  | _onCancel();  // No checkState after cancel, it is always the last event. | 
|  | _state &= ~_STATE_IN_CALLBACK; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Call a hook function. | 
|  | * | 
|  | * The call is properly wrapped in code to avoid other callbacks | 
|  | * during the call, and it checks for state changes after the call | 
|  | * that should cause further callbacks. | 
|  | */ | 
|  | void _guardCallback(callback) { | 
|  | assert(!_inCallback); | 
|  | bool wasInputPaused = _isInputPaused; | 
|  | _state |= _STATE_IN_CALLBACK; | 
|  | callback(); | 
|  | _state &= ~_STATE_IN_CALLBACK; | 
|  | _checkState(wasInputPaused); | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Check if the input needs to be informed of state changes. | 
|  | * | 
|  | * State changes are pausing, resuming and canceling. | 
|  | * | 
|  | * After canceling, no further callbacks will happen. | 
|  | * | 
|  | * The cancel callback is called after a user cancel, or after | 
|  | * the final done event is sent. | 
|  | */ | 
|  | void _checkState(bool wasInputPaused) { | 
|  | assert(!_inCallback); | 
|  | if (_hasPending && _pending.isEmpty) { | 
|  | _state &= ~_STATE_HAS_PENDING; | 
|  | if (_isInputPaused && _mayResumeInput) { | 
|  | _state &= ~_STATE_INPUT_PAUSED; | 
|  | } | 
|  | } | 
|  | // If the state changes during a callback, we immediately | 
|  | // make a new state-change callback. Loop until the state didn't change. | 
|  | while (true) { | 
|  | if (_isCanceled) { | 
|  | _onCancel(); | 
|  | _pending = null; | 
|  | return; | 
|  | } | 
|  | bool isInputPaused = _isInputPaused; | 
|  | if (wasInputPaused == isInputPaused) break; | 
|  | _state ^= _STATE_IN_CALLBACK; | 
|  | if (isInputPaused) { | 
|  | _onPause(); | 
|  | } else { | 
|  | _onResume(); | 
|  | } | 
|  | _state &= ~_STATE_IN_CALLBACK; | 
|  | wasInputPaused = isInputPaused; | 
|  | } | 
|  | if (_hasPending && !_isPaused) { | 
|  | _pending.schedule(this); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // ------------------------------------------------------------------- | 
|  | // Common base class for single and multi-subscription streams. | 
|  | // ------------------------------------------------------------------- | 
|  | abstract class _StreamImpl<T> extends Stream<T> { | 
|  | // ------------------------------------------------------------------ | 
|  | // Stream interface. | 
|  |  | 
|  | StreamSubscription<T> listen(void onData(T data), | 
|  | { void onError(error), | 
|  | void onDone(), | 
|  | bool cancelOnError }) { | 
|  | if (onData == null) onData = _nullDataHandler; | 
|  | if (onError == null) onError = _nullErrorHandler; | 
|  | if (onDone == null) onDone = _nullDoneHandler; | 
|  | cancelOnError = identical(true, cancelOnError); | 
|  | StreamSubscription subscription = | 
|  | _createSubscription(onData, onError, onDone, cancelOnError); | 
|  | _onListen(subscription); | 
|  | return subscription; | 
|  | } | 
|  |  | 
|  | // ------------------------------------------------------------------- | 
|  | /** Create a subscription object. Called by [subcribe]. */ | 
|  | _BufferingStreamSubscription<T> _createSubscription( | 
|  | void onData(T data), | 
|  | void onError(error), | 
|  | void onDone(), | 
|  | bool cancelOnError) { | 
|  | return new _BufferingStreamSubscription<T>( | 
|  | onData, onError, onDone, cancelOnError); | 
|  | } | 
|  |  | 
|  | /** Hook called when the subscription has been created. */ | 
|  | void _onListen(StreamSubscription subscription) {} | 
|  | } | 
|  |  | 
|  | typedef _PendingEvents _EventGenerator(); | 
|  |  | 
|  | /** Stream that generates its own events. */ | 
|  | class _GeneratedStreamImpl<T> extends _StreamImpl<T> { | 
|  | final _EventGenerator _pending; | 
|  | /** | 
|  | * Initializes the stream to have only the events provided by a | 
|  | * [_PendingEvents]. | 
|  | * | 
|  | * A new [_PendingEvents] must be generated for each listen. | 
|  | */ | 
|  | _GeneratedStreamImpl(this._pending); | 
|  |  | 
|  | StreamSubscription _createSubscription(void onData(T data), | 
|  | void onError(Object error), | 
|  | void onDone(), | 
|  | bool cancelOnError) { | 
|  | _BufferingStreamSubscription<T> subscription = | 
|  | new _BufferingStreamSubscription( | 
|  | onData, onError, onDone, cancelOnError); | 
|  | subscription._setPendingEvents(_pending()); | 
|  | return subscription; | 
|  | } | 
|  | } | 
|  |  | 
|  |  | 
|  | /** Pending events object that gets its events from an [Iterable]. */ | 
|  | class _IterablePendingEvents<T> extends _PendingEvents { | 
|  | // The iterator providing data for data events. | 
|  | // Set to null when iteration has completed. | 
|  | Iterator<T> _iterator; | 
|  |  | 
|  | _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; | 
|  |  | 
|  | bool get isEmpty => _iterator == null; | 
|  |  | 
|  | void handleNext(_EventDispatch dispatch) { | 
|  | if (_iterator == null) { | 
|  | throw new StateError("No events pending."); | 
|  | } | 
|  | // Send one event per call to moveNext. | 
|  | // If moveNext returns true, send the current element as data. | 
|  | // If moveNext returns false, send a done event and clear the _iterator. | 
|  | // If moveNext throws an error, send an error and clear the _iterator. | 
|  | // After an error, no further events will be sent. | 
|  | bool isDone; | 
|  | try { | 
|  | isDone = !_iterator.moveNext(); | 
|  | } catch (e, s) { | 
|  | _iterator = null; | 
|  | dispatch._sendError(_asyncError(e, s)); | 
|  | return; | 
|  | } | 
|  | if (!isDone) { | 
|  | dispatch._sendData(_iterator.current); | 
|  | } else { | 
|  | _iterator = null; | 
|  | dispatch._sendDone(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void clear() { | 
|  | if (isScheduled) cancelSchedule(); | 
|  | _iterator = null; | 
|  | } | 
|  | } | 
|  |  | 
|  |  | 
|  | // Internal helpers. | 
|  |  | 
|  | // Types of the different handlers on a stream. Types used to type fields. | 
|  | typedef void _DataHandler<T>(T value); | 
|  | typedef void _ErrorHandler(error); | 
|  | typedef void _DoneHandler(); | 
|  |  | 
|  |  | 
|  | /** Default data handler, does nothing. */ | 
|  | void _nullDataHandler(var value) {} | 
|  |  | 
|  | /** Default error handler, reports the error to the global handler. */ | 
|  | void _nullErrorHandler(error) { | 
|  | _Zone.current.handleUncaughtError(error); | 
|  | } | 
|  |  | 
|  | /** Default done handler, does nothing. */ | 
|  | void _nullDoneHandler() {} | 
|  |  | 
|  |  | 
|  | /** A delayed event on a buffering stream subscription. */ | 
|  | abstract class _DelayedEvent { | 
|  | /** Added as a linked list on the [StreamController]. */ | 
|  | _DelayedEvent next; | 
|  | /** Execute the delayed event on the [StreamController]. */ | 
|  | void perform(_EventDispatch dispatch); | 
|  | } | 
|  |  | 
|  | /** A delayed data event. */ | 
|  | class _DelayedData<T> extends _DelayedEvent { | 
|  | final T value; | 
|  | _DelayedData(this.value); | 
|  | void perform(_EventDispatch<T> dispatch) { | 
|  | dispatch._sendData(value); | 
|  | } | 
|  | } | 
|  |  | 
|  | /** A delayed error event. */ | 
|  | class _DelayedError extends _DelayedEvent { | 
|  | final error; | 
|  | _DelayedError(this.error); | 
|  | void perform(_EventDispatch dispatch) { | 
|  | dispatch._sendError(error); | 
|  | } | 
|  | } | 
|  |  | 
|  | /** A delayed done event. */ | 
|  | class _DelayedDone implements _DelayedEvent { | 
|  | const _DelayedDone(); | 
|  | void perform(_EventDispatch dispatch) { | 
|  | dispatch._sendDone(); | 
|  | } | 
|  |  | 
|  | _DelayedEvent get next => null; | 
|  |  | 
|  | void set next(_DelayedEvent _) { | 
|  | throw new StateError("No events after a done."); | 
|  | } | 
|  | } | 
|  |  | 
|  | /** Superclass for provider of pending events. */ | 
|  | abstract class _PendingEvents { | 
|  | // No async event has been scheduled. | 
|  | static const int _STATE_UNSCHEDULED = 0; | 
|  | // An async event has been scheduled to run a function. | 
|  | static const int _STATE_SCHEDULED = 1; | 
|  | // An async event has been scheduled, but it will do nothing when it runs. | 
|  | // Async events can't be preempted. | 
|  | static const int _STATE_CANCELED = 3; | 
|  |  | 
|  | /** | 
|  | * State of being scheduled. | 
|  | * | 
|  | * Set to [_STATE_SCHEDULED] when pending events are scheduled for | 
|  | * async dispatch. Since we can't cancel a [runAsync] call, if schduling | 
|  | * is "canceled", the _state is simply set to [_STATE_CANCELED] which will | 
|  | * make the async code do nothing except resetting [_state]. | 
|  | * | 
|  | * If events are scheduled while the state is [_STATE_CANCELED], it is | 
|  | * merely switched back to [_STATE_SCHEDULED], but no new call to [runAsync] | 
|  | * is performed. | 
|  | */ | 
|  | int _state = _STATE_UNSCHEDULED; | 
|  |  | 
|  | bool get isEmpty; | 
|  |  | 
|  | bool get isScheduled => _state == _STATE_SCHEDULED; | 
|  | bool get _eventScheduled => _state >= _STATE_SCHEDULED; | 
|  |  | 
|  | /** | 
|  | * Schedule an event to run later. | 
|  | * | 
|  | * If called more than once, it should be called with the same dispatch as | 
|  | * argument each time. It may reuse an earlier argument in some cases. | 
|  | */ | 
|  | void schedule(_EventDispatch dispatch) { | 
|  | if (isScheduled) return; | 
|  | assert(!isEmpty); | 
|  | if (_eventScheduled) { | 
|  | assert(_state == _STATE_CANCELED); | 
|  | _state = _STATE_SCHEDULED; | 
|  | return; | 
|  | } | 
|  | runAsync(() { | 
|  | int oldState = _state; | 
|  | _state = _STATE_UNSCHEDULED; | 
|  | if (oldState == _STATE_CANCELED) return; | 
|  | handleNext(dispatch); | 
|  | }); | 
|  | _state = _STATE_SCHEDULED; | 
|  | } | 
|  |  | 
|  | void cancelSchedule() { | 
|  | if (isScheduled) _state = _STATE_CANCELED; | 
|  | } | 
|  |  | 
|  | void handleNext(_EventDispatch dispatch); | 
|  |  | 
|  | /** Throw away any pending events and cancel scheduled events. */ | 
|  | void clear(); | 
|  | } | 
|  |  | 
|  |  | 
|  | /** Class holding pending events for a [_StreamImpl]. */ | 
|  | class _StreamImplEvents extends _PendingEvents { | 
|  | /// Single linked list of [_DelayedEvent] objects. | 
|  | _DelayedEvent firstPendingEvent = null; | 
|  | /// Last element in the list of pending events. New events are added after it. | 
|  | _DelayedEvent lastPendingEvent = null; | 
|  |  | 
|  | bool get isEmpty => lastPendingEvent == null; | 
|  |  | 
|  | void add(_DelayedEvent event) { | 
|  | if (lastPendingEvent == null) { | 
|  | firstPendingEvent = lastPendingEvent = event; | 
|  | } else { | 
|  | lastPendingEvent = lastPendingEvent.next = event; | 
|  | } | 
|  | } | 
|  |  | 
|  | void handleNext(_EventDispatch dispatch) { | 
|  | assert(!isScheduled); | 
|  | _DelayedEvent event = firstPendingEvent; | 
|  | firstPendingEvent = event.next; | 
|  | if (firstPendingEvent == null) { | 
|  | lastPendingEvent = null; | 
|  | } | 
|  | event.perform(dispatch); | 
|  | } | 
|  |  | 
|  | void clear() { | 
|  | if (isScheduled) cancelSchedule(); | 
|  | firstPendingEvent = lastPendingEvent = null; | 
|  | } | 
|  | } | 
|  |  | 
|  | class _BroadcastLinkedList { | 
|  | _BroadcastLinkedList _next; | 
|  | _BroadcastLinkedList _previous; | 
|  |  | 
|  | void _unlink() { | 
|  | _previous._next = _next; | 
|  | _next._previous = _previous; | 
|  | _next = _previous = this; | 
|  | } | 
|  |  | 
|  | void _insertBefore(_BroadcastLinkedList newNext) { | 
|  | _BroadcastLinkedList newPrevious = newNext._previous; | 
|  | newPrevious._next = this; | 
|  | newNext._previous = _previous; | 
|  | _previous._next = newNext; | 
|  | _previous = newPrevious; | 
|  | } | 
|  | } | 
|  |  | 
|  | typedef void _broadcastCallback(StreamSubscription subscription); | 
|  |  | 
|  | /** | 
|  | * Dummy subscription that will never receive any events. | 
|  | */ | 
|  | class _DummyStreamSubscription<T> implements StreamSubscription<T> { | 
|  | int _pauseCounter = 0; | 
|  |  | 
|  | void onData(void handleData(T data)) {} | 
|  | void onError(void handleError(Object data)) {} | 
|  | void onDone(void handleDone()) {} | 
|  |  | 
|  | void pause([Future resumeSignal]) { | 
|  | _pauseCounter++; | 
|  | if (resumeSignal != null) resumeSignal.then((_) { resume(); }); | 
|  | } | 
|  | void resume() { | 
|  | if (_pauseCounter > 0) _pauseCounter--; | 
|  | } | 
|  | void cancel() {} | 
|  | bool get isPaused => _pauseCounter > 0; | 
|  |  | 
|  | Future asFuture([futureValue]) => new _FutureImpl(); | 
|  | } | 
|  |  | 
|  | class _AsBroadcastStream<T> extends Stream<T> { | 
|  | final Stream<T> _source; | 
|  | final _broadcastCallback _onListenHandler; | 
|  | final _broadcastCallback _onCancelHandler; | 
|  | final _Zone _zone; | 
|  |  | 
|  | _AsBroadcastStreamController<T> _controller; | 
|  | StreamSubscription<T> _subscription; | 
|  |  | 
|  | _AsBroadcastStream(this._source, | 
|  | this._onListenHandler, | 
|  | this._onCancelHandler) | 
|  | : _zone = _Zone.current { | 
|  | _controller = new _AsBroadcastStreamController<T>(_onListen, _onCancel); | 
|  | // Keep zone alive until we are done doing callbacks. | 
|  | _zone.expectCallback(); | 
|  | } | 
|  |  | 
|  | bool get isBroadcast => true; | 
|  |  | 
|  | StreamSubscription<T> listen(void onData(T data), | 
|  | { void onError(Object error), | 
|  | void onDone(), | 
|  | bool cancelOnError}) { | 
|  | if (_controller == null) { | 
|  | // Return a dummy subscription backed by nothing, since | 
|  | // it won't ever receive any events. | 
|  | return new _DummyStreamSubscription<T>(); | 
|  | } | 
|  | if (_subscription == null) { | 
|  | _subscription = _source.listen(_controller.add, | 
|  | onError: _controller.addError, | 
|  | onDone: _controller.close); | 
|  | } | 
|  | if (onData == null) onData = _nullDataHandler; | 
|  | if (onError == null) onError = _nullErrorHandler; | 
|  | if (onDone == null) onDone = _nullDoneHandler; | 
|  | cancelOnError = identical(true, cancelOnError); | 
|  | return _controller._subscribe(onData, onError, onDone, cancelOnError); | 
|  | } | 
|  |  | 
|  | void _onCancel() { | 
|  | bool shutdown = (_controller == null) || _controller.isClosed; | 
|  | if (_onCancelHandler != null) { | 
|  | _zone.executePeriodicCallbackGuarded( | 
|  | () => _onCancelHandler(new _BroadcastSubscriptionWrapper(this))); | 
|  | } | 
|  | if (shutdown) { | 
|  | if (_subscription != null) { | 
|  | _subscription.cancel(); | 
|  | _subscription = null; | 
|  | } | 
|  | _zone.cancelCallbackExpectation(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void _onListen() { | 
|  | if (_onListenHandler != null) { | 
|  | _zone.executePeriodicCallbackGuarded( | 
|  | () => _onListenHandler(new _BroadcastSubscriptionWrapper(this))); | 
|  | } | 
|  | } | 
|  |  | 
|  | // Methods called from _BroadcastSubscriptionWrapper. | 
|  | void _cancelSubscription() { | 
|  | if (_subscription == null) return; | 
|  | // Called by [_controller] when it has no subscribers left. | 
|  | StreamSubscription subscription = _subscription; | 
|  | _subscription = null; | 
|  | if (_controller._isEmpty) { | 
|  | _zone.cancelCallbackExpectation(); | 
|  | } | 
|  | _controller = null;  // Marks the stream as no longer listenable. | 
|  | subscription.cancel(); | 
|  | } | 
|  |  | 
|  | void _pauseSubscription(Future resumeSignal) { | 
|  | if (_subscription == null) return; | 
|  | _subscription.pause(resumeSignal); | 
|  | } | 
|  |  | 
|  | void _resumeSubscription() { | 
|  | if (_subscription == null) return; | 
|  | _subscription.resume(); | 
|  | } | 
|  |  | 
|  | bool get _isSubscriptionPaused { | 
|  | if (_subscription == null) return false; | 
|  | return _subscription.isPaused; | 
|  | } | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Wrapper for subscription that disallows changing handlers. | 
|  | */ | 
|  | class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> { | 
|  | final _AsBroadcastStream _stream; | 
|  |  | 
|  | _BroadcastSubscriptionWrapper(this._stream); | 
|  |  | 
|  | void onData(void handleData(T data)) { | 
|  | throw new UnsupportedError( | 
|  | "Cannot change handlers of asBroadcastStream source subscription."); | 
|  | } | 
|  |  | 
|  | void onError(void handleError(Object data)) { | 
|  | throw new UnsupportedError( | 
|  | "Cannot change handlers of asBroadcastStream source subscription."); | 
|  | } | 
|  |  | 
|  | void onDone(void handleDone()) { | 
|  | throw new UnsupportedError( | 
|  | "Cannot change handlers of asBroadcastStream source subscription."); | 
|  | } | 
|  |  | 
|  | void pause([Future resumeSignal]) { | 
|  | _stream._pauseSubscription(resumeSignal); | 
|  | } | 
|  |  | 
|  | void resume() { | 
|  | _stream._resumeSubscription(); | 
|  | } | 
|  |  | 
|  | void cancel() { | 
|  | _stream._cancelSubscription(); | 
|  | } | 
|  |  | 
|  | bool get isPaused { | 
|  | return _stream._isSubscriptionPaused; | 
|  | } | 
|  |  | 
|  | Future asFuture([var futureValue]) { | 
|  | throw new UnsupportedError( | 
|  | "Cannot change handlers of asBroadcastStream source subscription."); | 
|  | } | 
|  | } | 
|  |  | 
|  |  | 
|  | /** | 
|  | * Simple implementation of [StreamIterator]. | 
|  | */ | 
|  | class _StreamIteratorImpl<T> implements StreamIterator<T> { | 
|  | // Internal state of the stream iterator. | 
|  | // At any time, it is in one of these states. | 
|  | // The interpretation of the [_futureOrPrefecth] field depends on the state. | 
|  | // In _STATE_MOVING, the _data field holds the most recently returned | 
|  | // future. | 
|  | // When in one of the _STATE_EXTRA_* states, the it may hold the | 
|  | // next data/error object, and the subscription is paused. | 
|  |  | 
|  | /// The simple state where [_data] holds the data to return, and [moveNext] | 
|  | /// is allowed. The subscription is actively listening. | 
|  | static const int _STATE_FOUND = 0; | 
|  | /// State set after [moveNext] has returned false or an error, | 
|  | /// or after calling [cancel]. The subscription is always canceled. | 
|  | static const int _STATE_DONE = 1; | 
|  | /// State set after calling [moveNext], but before its returned future has | 
|  | /// completed. Calling [moveNext] again is not allowed in this state. | 
|  | /// The subscription is actively listening. | 
|  | static const int _STATE_MOVING = 2; | 
|  | /// States set when another event occurs while in _STATE_FOUND. | 
|  | /// This extra overflow event is cached until the next call to [moveNext], | 
|  | /// which will complete as if it received the event normally. | 
|  | /// The subscription is paused in these states, so we only ever get one | 
|  | /// event too many. | 
|  | static const int _STATE_EXTRA_DATA = 3; | 
|  | static const int _STATE_EXTRA_ERROR = 4; | 
|  | static const int _STATE_EXTRA_DONE = 5; | 
|  |  | 
|  | /// Subscription being listened to. | 
|  | StreamSubscription _subscription; | 
|  |  | 
|  | /// The current element represented by the most recent call to moveNext. | 
|  | /// | 
|  | /// Is null between the time moveNext is called and its future completes. | 
|  | T _current = null; | 
|  |  | 
|  | /// The future returned by the most recent call to [moveNext]. | 
|  | /// | 
|  | /// Also used to store the next value/error in case the stream provides an | 
|  | /// event before [moveNext] is called again. In that case, the stream will | 
|  | /// be paused to prevent further events. | 
|  | var _futureOrPrefetch = null; | 
|  |  | 
|  | /// The current state. | 
|  | int _state = _STATE_FOUND; | 
|  |  | 
|  | _StreamIteratorImpl(final Stream<T> stream) { | 
|  | _subscription = stream.listen(_onData, | 
|  | onError: _onError, | 
|  | onDone: _onDone, | 
|  | cancelOnError: true); | 
|  | } | 
|  |  | 
|  | T get current => _current; | 
|  |  | 
|  | Future<bool> moveNext() { | 
|  | if (_state == _STATE_DONE) { | 
|  | return new _FutureImpl<bool>.immediate(false); | 
|  | } | 
|  | if (_state == _STATE_MOVING) { | 
|  | throw new StateError("Already waiting for next."); | 
|  | } | 
|  | if (_state == _STATE_FOUND) { | 
|  | _state = _STATE_MOVING; | 
|  | _futureOrPrefetch = new _FutureImpl<bool>(); | 
|  | return _futureOrPrefetch; | 
|  | } else { | 
|  | assert(_state >= _STATE_EXTRA_DATA); | 
|  | switch (_state) { | 
|  | case _STATE_EXTRA_DATA: | 
|  | _state = _STATE_FOUND; | 
|  | _current = _futureOrPrefetch; | 
|  | _futureOrPrefetch = null; | 
|  | _subscription.resume(); | 
|  | return new _FutureImpl<bool>.immediate(true); | 
|  | case _STATE_EXTRA_ERROR: | 
|  | Object prefetch = _futureOrPrefetch; | 
|  | _clear(); | 
|  | return new _FutureImpl<bool>.immediateError(prefetch); | 
|  | case _STATE_EXTRA_DONE: | 
|  | _clear(); | 
|  | return new _FutureImpl<bool>.immediate(false); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /** Clears up the internal state when the iterator ends. */ | 
|  | void _clear() { | 
|  | _subscription = null; | 
|  | _futureOrPrefetch = null; | 
|  | _current = null; | 
|  | _state = _STATE_DONE; | 
|  | } | 
|  |  | 
|  | void cancel() { | 
|  | StreamSubscription subscription = _subscription; | 
|  | if (_state == _STATE_MOVING) { | 
|  | _FutureImpl<bool> hasNext = _futureOrPrefetch; | 
|  | _clear(); | 
|  | hasNext._setValue(false); | 
|  | } else { | 
|  | _clear(); | 
|  | } | 
|  | subscription.cancel(); | 
|  | } | 
|  |  | 
|  | void _onData(T data) { | 
|  | if (_state == _STATE_MOVING) { | 
|  | _current = data; | 
|  | _FutureImpl<bool> hasNext = _futureOrPrefetch; | 
|  | _futureOrPrefetch = null; | 
|  | _state = _STATE_FOUND; | 
|  | hasNext._setValue(true); | 
|  | return; | 
|  | } | 
|  | _subscription.pause(); | 
|  | assert(_futureOrPrefetch == null); | 
|  | _futureOrPrefetch = data; | 
|  | _state = _STATE_EXTRA_DATA; | 
|  | } | 
|  |  | 
|  | void _onError(Object error) { | 
|  | if (_state == _STATE_MOVING) { | 
|  | _FutureImpl<bool> hasNext = _futureOrPrefetch; | 
|  | // We have cancelOnError: true, so the subscription is canceled. | 
|  | _clear(); | 
|  | hasNext._setError(error); | 
|  | return; | 
|  | } | 
|  | _subscription.pause(); | 
|  | assert(_futureOrPrefetch == null); | 
|  | _futureOrPrefetch = error; | 
|  | _state = _STATE_EXTRA_ERROR; | 
|  | } | 
|  |  | 
|  | void _onDone() { | 
|  | if (_state == _STATE_MOVING) { | 
|  | _FutureImpl<bool> hasNext = _futureOrPrefetch; | 
|  | _clear(); | 
|  | hasNext._setValue(false); | 
|  | return; | 
|  | } | 
|  | _subscription.pause(); | 
|  | _futureOrPrefetch = null; | 
|  | _state = _STATE_EXTRA_DONE; | 
|  | } | 
|  | } |