| // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| part of dart.async; |
| |
| /** Abstract and private interface for a place to put events. */ |
| abstract class _EventSink<T> { |
| void _add(T data); |
| void _addError(Object error); |
| void _close(); |
| } |
| |
| /** |
| * Abstract and private interface for a place to send events. |
| * |
| * Used by event buffering to finally dispatch the pending event, where |
| * [_EventSink] is where the event first enters the stream subscription, |
| * and may yet be buffered. |
| */ |
| abstract class _EventDispatch<T> { |
| void _sendData(T data); |
| void _sendError(Object error); |
| void _sendDone(); |
| } |
| |
| /** |
| * Default implementation of stream subscription of buffering events. |
| * |
| * The only public methods are those of [StreamSubscription], so instances of |
| * [_BufferingStreamSubscription] can be returned directly as a |
| * [StreamSubscription] without exposing internal functionality. |
| * |
| * The [StreamController] is a public facing version of [Stream] and this class, |
| * with some methods made public. |
| * |
| * The user interface of [_BufferingStreamSubscription] are the following |
| * methods: |
| * * [_add]: Add a data event to the stream. |
| * * [_addError]: Add an error event to the stream. |
| * * [_close]: Request to close the stream. |
| * * [_onCancel]: Called when the subscription will provide no more events, |
| * either due to being actively canceled, or after sending a done event. |
| * * [_onPause]: Called when the subscription wants the event source to pause. |
| * * [_onResume]: Called when allowing new events after a pause. |
| * The user should not add new events when the subscription requests a paused, |
| * but if it happens anyway, the subscription will enqueue the events just as |
| * when new events arrive while still firing an old event. |
| */ |
| class _BufferingStreamSubscription<T> implements StreamSubscription<T>, |
| _EventSink<T>, |
| _EventDispatch<T> { |
| /** The `cancelOnError` flag from the `listen` call. */ |
| static const int _STATE_CANCEL_ON_ERROR = 1; |
| /** |
| * Whether the "done" event has been received. |
| * No further events are accepted after this. |
| */ |
| static const int _STATE_CLOSED = 2; |
| /** |
| * Set if the input has been asked not to send events. |
| * |
| * This is not the same as being paused, since the input will remain paused |
| * after a call to [resume] if there are pending events. |
| */ |
| static const int _STATE_INPUT_PAUSED = 4; |
| /** |
| * Whether the subscription has been canceled. |
| * |
| * Set by calling [cancel], or by handling a "done" event, or an "error" event |
| * when `cancelOnError` is true. |
| */ |
| static const int _STATE_CANCELED = 8; |
| static const int _STATE_IN_CALLBACK = 16; |
| static const int _STATE_HAS_PENDING = 32; |
| static const int _STATE_PAUSE_COUNT = 64; |
| static const int _STATE_PAUSE_COUNT_SHIFT = 6; |
| |
| /* Event handlers provided in constructor. */ |
| /* TODO(7733): Fix Function->_DataHandler<T> when dart2js understands |
| * parameterized function types. */ |
| Function _onData; |
| _ErrorHandler _onError; |
| _DoneHandler _onDone; |
| final _Zone _zone = _Zone.current; |
| |
| /** Bit vector based on state-constants above. */ |
| int _state; |
| |
| /** |
| * Queue of pending events. |
| * |
| * Is created when necessary, or set in constructor for preconfigured events. |
| */ |
| _PendingEvents _pending; |
| |
| _BufferingStreamSubscription(this._onData, |
| this._onError, |
| this._onDone, |
| bool cancelOnError) |
| : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0) { |
| assert(_onData != null); |
| assert(_onError != null); |
| assert(_onDone != null); |
| _zone.expectCallback(); |
| } |
| |
| /** |
| * Sets the subscription's pending events object. |
| * |
| * This can only be done once. The pending events object is used for the |
| * rest of the subscription's life cycle. |
| */ |
| void _setPendingEvents(_PendingEvents pendingEvents) { |
| assert(_pending == null); |
| if (pendingEvents == null) return; |
| _pending = pendingEvents; |
| if (!pendingEvents.isEmpty) { |
| _state |= _STATE_HAS_PENDING; |
| _pending.schedule(this); |
| } |
| } |
| |
| /** |
| * Extracts the pending events from a canceled stream. |
| * |
| * This can only be done during the [_onCancel] method call. After that, |
| * any remaining pending events will be cleared. |
| */ |
| _PendingEvents _extractPending() { |
| assert(_isCanceled); |
| _PendingEvents events = _pending; |
| _pending = null; |
| return events; |
| } |
| |
| // StreamSubscription interface. |
| |
| void onData(void handleData(T event)) { |
| if (handleData == null) handleData = _nullDataHandler; |
| _onData = handleData; |
| } |
| |
| void onError(void handleError(error)) { |
| if (handleError == null) handleError = _nullErrorHandler; |
| _onError = handleError; |
| } |
| |
| void onDone(void handleDone()) { |
| if (handleDone == null) handleDone = _nullDoneHandler; |
| _onDone = handleDone; |
| } |
| |
| void pause([Future resumeSignal]) { |
| if (_isCanceled) return; |
| bool wasPaused = _isPaused; |
| bool wasInputPaused = _isInputPaused; |
| // Increment pause count and mark input paused (if it isn't already). |
| _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
| if (resumeSignal != null) resumeSignal.whenComplete(resume); |
| if (!wasPaused && _pending != null) _pending.cancelSchedule(); |
| if (!wasInputPaused && !_inCallback) _guardCallback(_onPause); |
| } |
| |
| void resume() { |
| if (_isCanceled) return; |
| if (_isPaused) { |
| _decrementPauseCount(); |
| if (!_isPaused) { |
| if (_hasPending && !_pending.isEmpty) { |
| // Input is still paused. |
| _pending.schedule(this); |
| } else { |
| assert(_mayResumeInput); |
| _state &= ~_STATE_INPUT_PAUSED; |
| if (!_inCallback) _guardCallback(_onResume); |
| } |
| } |
| } |
| } |
| |
| void cancel() { |
| if (_isCanceled) return; |
| _cancel(); |
| if (!_inCallback) { |
| // otherwise checkState will be called after firing or callback completes. |
| _state |= _STATE_IN_CALLBACK; |
| _onCancel(); |
| _pending = null; |
| _state &= ~_STATE_IN_CALLBACK; |
| } |
| } |
| |
| Future asFuture([var futureValue]) { |
| _FutureImpl<T> result = new _FutureImpl<T>(); |
| |
| // Overwrite the onDone and onError handlers. |
| _onDone = () { result._setValue(futureValue); }; |
| _onError = (error) { |
| cancel(); |
| result._setError(error); |
| }; |
| |
| return result; |
| } |
| |
| // State management. |
| |
| bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0; |
| bool get _isClosed => (_state & _STATE_CLOSED) != 0; |
| bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0; |
| bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0; |
| bool get _isPaused => _state >= _STATE_PAUSE_COUNT; |
| bool get _canFire => _state < _STATE_IN_CALLBACK; |
| bool get _mayResumeInput => |
| !_isPaused && (_pending == null || _pending.isEmpty); |
| bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0; |
| |
| bool get isPaused => _isPaused; |
| |
| void _cancel() { |
| _state |= _STATE_CANCELED; |
| _zone.cancelCallbackExpectation(); |
| if (_hasPending) { |
| _pending.cancelSchedule(); |
| } |
| } |
| |
| /** |
| * Increment the pause count. |
| * |
| * Also marks input as paused. |
| */ |
| void _incrementPauseCount() { |
| _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED; |
| } |
| |
| /** |
| * Decrements the pause count. |
| * |
| * Does not automatically unpause the input (call [_onResume]) when |
| * the pause count reaches zero. This is handled elsewhere, and only |
| * if there are no pending events buffered. |
| */ |
| void _decrementPauseCount() { |
| assert(_isPaused); |
| _state -= _STATE_PAUSE_COUNT; |
| } |
| |
| // _EventSink interface. |
| |
| void _add(T data) { |
| assert(!_isClosed); |
| if (_isCanceled) return; |
| if (_canFire) { |
| _sendData(data); |
| } else { |
| _addPending(new _DelayedData(data)); |
| } |
| } |
| |
| void _addError(Object error) { |
| if (_isCanceled) return; |
| if (_canFire) { |
| _sendError(error); // Reports cancel after sending. |
| } else { |
| _addPending(new _DelayedError(error)); |
| } |
| } |
| |
| void _close() { |
| assert(!_isClosed); |
| if (_isCanceled) return; |
| _state |= _STATE_CLOSED; |
| if (_canFire) { |
| _sendDone(); |
| } else { |
| _addPending(const _DelayedDone()); |
| } |
| } |
| |
| // Hooks called when the input is paused, unpaused or canceled. |
| // These must not throw. If overwritten to call user code, include suitable |
| // try/catch wrapping and send any errors to |
| // [_Zone.current.handleUncaughtError]. |
| void _onPause() { |
| assert(_isInputPaused); |
| } |
| |
| void _onResume() { |
| assert(!_isInputPaused); |
| } |
| |
| void _onCancel() { |
| assert(_isCanceled); |
| } |
| |
| // Handle pending events. |
| |
| /** |
| * Add a pending event. |
| * |
| * If the subscription is not paused, this also schedules a firing |
| * of pending events later (if necessary). |
| */ |
| void _addPending(_DelayedEvent event) { |
| _StreamImplEvents pending = _pending; |
| if (_pending == null) pending = _pending = new _StreamImplEvents(); |
| pending.add(event); |
| if (!_hasPending) { |
| _state |= _STATE_HAS_PENDING; |
| if (!_isPaused) { |
| _pending.schedule(this); |
| } |
| } |
| } |
| |
| /* _EventDispatch interface. */ |
| |
| void _sendData(T data) { |
| assert(!_isCanceled); |
| assert(!_isPaused); |
| assert(!_inCallback); |
| bool wasInputPaused = _isInputPaused; |
| _state |= _STATE_IN_CALLBACK; |
| _zone.executePeriodicCallbackGuarded(() => _onData(data)); |
| _state &= ~_STATE_IN_CALLBACK; |
| _checkState(wasInputPaused); |
| } |
| |
| void _sendError(var error) { |
| assert(!_isCanceled); |
| assert(!_isPaused); |
| assert(!_inCallback); |
| bool wasInputPaused = _isInputPaused; |
| _state |= _STATE_IN_CALLBACK; |
| if (!_zone.inSameErrorZone(_Zone.current)) { |
| // Errors are not allowed to traverse zone boundaries. |
| _Zone.current.handleUncaughtError(error); |
| } else { |
| _zone.executePeriodicCallbackGuarded(() => _onError(error)); |
| } |
| _state &= ~_STATE_IN_CALLBACK; |
| if (_cancelOnError) { |
| _cancel(); |
| } |
| _checkState(wasInputPaused); |
| } |
| |
| void _sendDone() { |
| assert(!_isCanceled); |
| assert(!_isPaused); |
| assert(!_inCallback); |
| _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK); |
| _zone.executeCallbackGuarded(_onDone); |
| _onCancel(); // No checkState after cancel, it is always the last event. |
| _state &= ~_STATE_IN_CALLBACK; |
| } |
| |
| /** |
| * Call a hook function. |
| * |
| * The call is properly wrapped in code to avoid other callbacks |
| * during the call, and it checks for state changes after the call |
| * that should cause further callbacks. |
| */ |
| void _guardCallback(callback) { |
| assert(!_inCallback); |
| bool wasInputPaused = _isInputPaused; |
| _state |= _STATE_IN_CALLBACK; |
| callback(); |
| _state &= ~_STATE_IN_CALLBACK; |
| _checkState(wasInputPaused); |
| } |
| |
| /** |
| * Check if the input needs to be informed of state changes. |
| * |
| * State changes are pausing, resuming and canceling. |
| * |
| * After canceling, no further callbacks will happen. |
| * |
| * The cancel callback is called after a user cancel, or after |
| * the final done event is sent. |
| */ |
| void _checkState(bool wasInputPaused) { |
| assert(!_inCallback); |
| if (_hasPending && _pending.isEmpty) { |
| _state &= ~_STATE_HAS_PENDING; |
| if (_isInputPaused && _mayResumeInput) { |
| _state &= ~_STATE_INPUT_PAUSED; |
| } |
| } |
| // If the state changes during a callback, we immediately |
| // make a new state-change callback. Loop until the state didn't change. |
| while (true) { |
| if (_isCanceled) { |
| _onCancel(); |
| _pending = null; |
| return; |
| } |
| bool isInputPaused = _isInputPaused; |
| if (wasInputPaused == isInputPaused) break; |
| _state ^= _STATE_IN_CALLBACK; |
| if (isInputPaused) { |
| _onPause(); |
| } else { |
| _onResume(); |
| } |
| _state &= ~_STATE_IN_CALLBACK; |
| wasInputPaused = isInputPaused; |
| } |
| if (_hasPending && !_isPaused) { |
| _pending.schedule(this); |
| } |
| } |
| } |
| |
| // ------------------------------------------------------------------- |
| // Common base class for single and multi-subscription streams. |
| // ------------------------------------------------------------------- |
| abstract class _StreamImpl<T> extends Stream<T> { |
| // ------------------------------------------------------------------ |
| // Stream interface. |
| |
| StreamSubscription<T> listen(void onData(T data), |
| { void onError(error), |
| void onDone(), |
| bool cancelOnError }) { |
| if (onData == null) onData = _nullDataHandler; |
| if (onError == null) onError = _nullErrorHandler; |
| if (onDone == null) onDone = _nullDoneHandler; |
| cancelOnError = identical(true, cancelOnError); |
| StreamSubscription subscription = |
| _createSubscription(onData, onError, onDone, cancelOnError); |
| _onListen(subscription); |
| return subscription; |
| } |
| |
| // ------------------------------------------------------------------- |
| /** Create a subscription object. Called by [subcribe]. */ |
| _BufferingStreamSubscription<T> _createSubscription( |
| void onData(T data), |
| void onError(error), |
| void onDone(), |
| bool cancelOnError) { |
| return new _BufferingStreamSubscription<T>( |
| onData, onError, onDone, cancelOnError); |
| } |
| |
| /** Hook called when the subscription has been created. */ |
| void _onListen(StreamSubscription subscription) {} |
| } |
| |
| typedef _PendingEvents _EventGenerator(); |
| |
| /** Stream that generates its own events. */ |
| class _GeneratedStreamImpl<T> extends _StreamImpl<T> { |
| final _EventGenerator _pending; |
| /** |
| * Initializes the stream to have only the events provided by a |
| * [_PendingEvents]. |
| * |
| * A new [_PendingEvents] must be generated for each listen. |
| */ |
| _GeneratedStreamImpl(this._pending); |
| |
| StreamSubscription _createSubscription(void onData(T data), |
| void onError(Object error), |
| void onDone(), |
| bool cancelOnError) { |
| _BufferingStreamSubscription<T> subscription = |
| new _BufferingStreamSubscription( |
| onData, onError, onDone, cancelOnError); |
| subscription._setPendingEvents(_pending()); |
| return subscription; |
| } |
| } |
| |
| |
| /** Pending events object that gets its events from an [Iterable]. */ |
| class _IterablePendingEvents<T> extends _PendingEvents { |
| // The iterator providing data for data events. |
| // Set to null when iteration has completed. |
| Iterator<T> _iterator; |
| |
| _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator; |
| |
| bool get isEmpty => _iterator == null; |
| |
| void handleNext(_EventDispatch dispatch) { |
| if (_iterator == null) { |
| throw new StateError("No events pending."); |
| } |
| // Send one event per call to moveNext. |
| // If moveNext returns true, send the current element as data. |
| // If moveNext returns false, send a done event and clear the _iterator. |
| // If moveNext throws an error, send an error and clear the _iterator. |
| // After an error, no further events will be sent. |
| bool isDone; |
| try { |
| isDone = !_iterator.moveNext(); |
| } catch (e, s) { |
| _iterator = null; |
| dispatch._sendError(_asyncError(e, s)); |
| return; |
| } |
| if (!isDone) { |
| dispatch._sendData(_iterator.current); |
| } else { |
| _iterator = null; |
| dispatch._sendDone(); |
| } |
| } |
| |
| void clear() { |
| if (isScheduled) cancelSchedule(); |
| _iterator = null; |
| } |
| } |
| |
| |
| // Internal helpers. |
| |
| // Types of the different handlers on a stream. Types used to type fields. |
| typedef void _DataHandler<T>(T value); |
| typedef void _ErrorHandler(error); |
| typedef void _DoneHandler(); |
| |
| |
| /** Default data handler, does nothing. */ |
| void _nullDataHandler(var value) {} |
| |
| /** Default error handler, reports the error to the global handler. */ |
| void _nullErrorHandler(error) { |
| _Zone.current.handleUncaughtError(error); |
| } |
| |
| /** Default done handler, does nothing. */ |
| void _nullDoneHandler() {} |
| |
| |
| /** A delayed event on a buffering stream subscription. */ |
| abstract class _DelayedEvent { |
| /** Added as a linked list on the [StreamController]. */ |
| _DelayedEvent next; |
| /** Execute the delayed event on the [StreamController]. */ |
| void perform(_EventDispatch dispatch); |
| } |
| |
| /** A delayed data event. */ |
| class _DelayedData<T> extends _DelayedEvent{ |
| final T value; |
| _DelayedData(this.value); |
| void perform(_EventDispatch<T> dispatch) { |
| dispatch._sendData(value); |
| } |
| } |
| |
| /** A delayed error event. */ |
| class _DelayedError extends _DelayedEvent { |
| final error; |
| _DelayedError(this.error); |
| void perform(_EventDispatch dispatch) { |
| dispatch._sendError(error); |
| } |
| } |
| |
| /** A delayed done event. */ |
| class _DelayedDone implements _DelayedEvent { |
| const _DelayedDone(); |
| void perform(_EventDispatch dispatch) { |
| dispatch._sendDone(); |
| } |
| |
| _DelayedEvent get next => null; |
| |
| void set next(_DelayedEvent _) { |
| throw new StateError("No events after a done."); |
| } |
| } |
| |
| /** Superclass for provider of pending events. */ |
| abstract class _PendingEvents { |
| // No async event has been scheduled. |
| static const int _STATE_UNSCHEDULED = 0; |
| // An async event has been scheduled to run a function. |
| static const int _STATE_SCHEDULED = 1; |
| // An async event has been scheduled, but it will do nothing when it runs. |
| // Async events can't be preempted. |
| static const int _STATE_CANCELED = 3; |
| |
| /** |
| * State of being scheduled. |
| * |
| * Set to [_STATE_SCHEDULED] when pending events are scheduled for |
| * async dispatch. Since we can't cancel a [runAsync] call, if schduling |
| * is "canceled", the _state is simply set to [_STATE_CANCELED] which will |
| * make the async code do nothing except resetting [_state]. |
| * |
| * If events are scheduled while the state is [_STATE_CANCELED], it is |
| * merely switched back to [_STATE_SCHEDULED], but no new call to [runAsync] |
| * is performed. |
| */ |
| int _state = _STATE_UNSCHEDULED; |
| |
| bool get isEmpty; |
| |
| bool get isScheduled => _state == _STATE_SCHEDULED; |
| bool get _eventScheduled => _state >= _STATE_SCHEDULED; |
| |
| /** |
| * Schedule an event to run later. |
| * |
| * If called more than once, it should be called with the same dispatch as |
| * argument each time. It may reuse an earlier argument in some cases. |
| */ |
| void schedule(_EventDispatch dispatch) { |
| if (isScheduled) return; |
| assert(!isEmpty); |
| if (_eventScheduled) { |
| assert(_state == _STATE_CANCELED); |
| _state = _STATE_SCHEDULED; |
| return; |
| } |
| runAsync(() { |
| int oldState = _state; |
| _state = _STATE_UNSCHEDULED; |
| if (oldState == _STATE_CANCELED) return; |
| handleNext(dispatch); |
| }); |
| _state = _STATE_SCHEDULED; |
| } |
| |
| void cancelSchedule() { |
| if (isScheduled) _state = _STATE_CANCELED; |
| } |
| |
| void handleNext(_EventDispatch dispatch); |
| |
| /** Throw away any pending events and cancel scheduled events. */ |
| void clear(); |
| } |
| |
| |
| /** Class holding pending events for a [_StreamImpl]. */ |
| class _StreamImplEvents extends _PendingEvents { |
| /// Single linked list of [_DelayedEvent] objects. |
| _DelayedEvent firstPendingEvent = null; |
| /// Last element in the list of pending events. New events are added after it. |
| _DelayedEvent lastPendingEvent = null; |
| |
| bool get isEmpty => lastPendingEvent == null; |
| |
| void add(_DelayedEvent event) { |
| if (lastPendingEvent == null) { |
| firstPendingEvent = lastPendingEvent = event; |
| } else { |
| lastPendingEvent = lastPendingEvent.next = event; |
| } |
| } |
| |
| void handleNext(_EventDispatch dispatch) { |
| assert(!isScheduled); |
| _DelayedEvent event = firstPendingEvent; |
| firstPendingEvent = event.next; |
| if (firstPendingEvent == null) { |
| lastPendingEvent = null; |
| } |
| event.perform(dispatch); |
| } |
| |
| void clear() { |
| if (isScheduled) cancelSchedule(); |
| firstPendingEvent = lastPendingEvent = null; |
| } |
| } |
| |
| class _BroadcastLinkedList { |
| _BroadcastLinkedList _next; |
| _BroadcastLinkedList _previous; |
| |
| void _unlink() { |
| _previous._next = _next; |
| _next._previous = _previous; |
| _next = _previous = this; |
| } |
| |
| void _insertBefore(_BroadcastLinkedList newNext) { |
| _BroadcastLinkedList newPrevious = newNext._previous; |
| newPrevious._next = this; |
| newNext._previous = _previous; |
| _previous._next = newNext; |
| _previous = newPrevious; |
| } |
| } |
| |
| class _AsBroadcastStream<T> extends Stream<T> { |
| final Stream<T> _source; |
| _AsBroadcastStreamController<T> _controller; |
| StreamSubscription<T> _subscription; |
| |
| _AsBroadcastStream(this._source) { |
| _controller = new _AsBroadcastStreamController<T>(null, _onCancel); |
| } |
| |
| bool get isBroadcast => true; |
| |
| StreamSubscription<T> listen(void onData(T data), |
| { void onError(Object error), |
| void onDone(), |
| bool cancelOnError}) { |
| if (_controller == null) { |
| throw new StateError("Source stream has been closed."); |
| } |
| if (_subscription == null) { |
| _subscription = _source.listen(_controller.add, |
| onError: _controller.addError, |
| onDone: _controller.close); |
| } |
| return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
| cancelOnError: cancelOnError); |
| } |
| |
| void _onCancel() { |
| // Called by [_controller] when it has no subscribers left. |
| StreamSubscription subscription = _subscription; |
| _subscription = null; |
| _controller = null; // Marks the stream as no longer listenable. |
| subscription.cancel(); |
| } |
| } |
| |
| /** |
| * Simple implementation of [StreamIterator]. |
| */ |
| class _StreamIteratorImpl<T> implements StreamIterator<T> { |
| // Internal state of the stream iterator. |
| // At any time, it is in one of these states. |
| // The interpretation of the [_futureOrPrefecth] field depends on the state. |
| // In _STATE_MOVING, the _data field holds the most recently returned |
| // future. |
| // When in one of the _STATE_EXTRA_* states, the it may hold the |
| // next data/error object, and the subscription is paused. |
| |
| /// The simple state where [_data] holds the data to return, and [moveNext] |
| /// is allowed. The subscription is actively listening. |
| static const int _STATE_FOUND = 0; |
| /// State set after [moveNext] has returned false or an error, |
| /// or after calling [cancel]. The subscription is always canceled. |
| static const int _STATE_DONE = 1; |
| /// State set after calling [moveNext], but before its returned future has |
| /// completed. Calling [moveNext] again is not allowed in this state. |
| /// The subscription is actively listening. |
| static const int _STATE_MOVING = 2; |
| /// States set when another event occurs while in _STATE_FOUND. |
| /// This extra overflow event is cached until the next call to [moveNext], |
| /// which will complete as if it received the event normally. |
| /// The subscription is paused in these states, so we only ever get one |
| /// event too many. |
| static const int _STATE_EXTRA_DATA = 3; |
| static const int _STATE_EXTRA_ERROR = 4; |
| static const int _STATE_EXTRA_DONE = 5; |
| |
| /// Subscription being listened to. |
| StreamSubscription _subscription; |
| |
| /// The current element represented by the most recent call to moveNext. |
| /// |
| /// Is null between the time moveNext is called and its future completes. |
| T _current = null; |
| |
| /// The future returned by the most recent call to [moveNext]. |
| /// |
| /// Also used to store the next value/error in case the stream provides an |
| /// event before [moveNext] is called again. In that case, the stream will |
| /// be paused to prevent further events. |
| var _futureOrPrefetch = null; |
| |
| /// The current state. |
| int _state = _STATE_FOUND; |
| |
| _StreamIteratorImpl(final Stream<T> stream) { |
| _subscription = stream.listen(_onData, |
| onError: _onError, |
| onDone: _onDone, |
| cancelOnError: true); |
| } |
| |
| T get current => _current; |
| |
| Future<bool> moveNext() { |
| if (_state == _STATE_DONE) { |
| return new _FutureImpl<bool>.immediate(false); |
| } |
| if (_state == _STATE_MOVING) { |
| throw new StateError("Already waiting for next."); |
| } |
| if (_state == _STATE_FOUND) { |
| _state = _STATE_MOVING; |
| _futureOrPrefetch = new _FutureImpl<bool>(); |
| return _futureOrPrefetch; |
| } else { |
| assert(_state >= _STATE_EXTRA_DATA); |
| switch (_state) { |
| case _STATE_EXTRA_DATA: |
| _state = _STATE_FOUND; |
| _current = _futureOrPrefetch; |
| _futureOrPrefetch = null; |
| _subscription.resume(); |
| return new _FutureImpl<bool>.immediate(true); |
| case _STATE_EXTRA_ERROR: |
| Object prefetch = _futureOrPrefetch; |
| _clear(); |
| return new _FutureImpl<bool>.immediateError(prefetch); |
| case _STATE_EXTRA_DONE: |
| _clear(); |
| return new _FutureImpl<bool>.immediate(false); |
| } |
| } |
| } |
| |
| /** Clears up the internal state when the iterator ends. */ |
| void _clear() { |
| _subscription = null; |
| _futureOrPrefetch = null; |
| _current = null; |
| _state = _STATE_DONE; |
| } |
| |
| void cancel() { |
| StreamSubscription subscription = _subscription; |
| if (_state == _STATE_MOVING) { |
| _FutureImpl<bool> hasNext = _futureOrPrefetch; |
| _clear(); |
| hasNext._setValue(false); |
| } else { |
| _clear(); |
| } |
| subscription.cancel(); |
| } |
| |
| void _onData(T data) { |
| if (_state == _STATE_MOVING) { |
| _current = data; |
| _FutureImpl<bool> hasNext = _futureOrPrefetch; |
| _futureOrPrefetch = null; |
| _state = _STATE_FOUND; |
| hasNext._setValue(true); |
| return; |
| } |
| _subscription.pause(); |
| assert(_futureOrPrefetch == null); |
| _futureOrPrefetch = data; |
| _state = _STATE_EXTRA_DATA; |
| } |
| |
| void _onError(Object error) { |
| if (_state == _STATE_MOVING) { |
| _FutureImpl<bool> hasNext = _futureOrPrefetch; |
| // We have cancelOnError: true, so the subscription is canceled. |
| _clear(); |
| hasNext._setError(error); |
| return; |
| } |
| _subscription.pause(); |
| assert(_futureOrPrefetch == null); |
| _futureOrPrefetch = error; |
| _state = _STATE_EXTRA_ERROR; |
| } |
| |
| void _onDone() { |
| if (_state == _STATE_MOVING) { |
| _FutureImpl<bool> hasNext = _futureOrPrefetch; |
| _clear(); |
| hasNext._setValue(false); |
| return; |
| } |
| _subscription.pause(); |
| _futureOrPrefetch = null; |
| _state = _STATE_EXTRA_DONE; |
| } |
| } |