blob: 3de0d582c69d41386ce7bf725ce9f7549fbb8bac [file] [log] [blame]
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dart.async;
/// Abstract and private interface for a place to put events.
abstract class _EventSink<T> {
/// Puts the data event [data] into the sink.
void _add(T data);
/// Puts the error event [error], with its [stackTrace], into the sink.
void _addError(Object error, StackTrace stackTrace);
/// Requests that the sink be closed.
void _close();
}
/// Abstract and private interface for a place to send events.
///
/// Used by event buffering to finally dispatch the pending event, where
/// [_EventSink] is where the event first enters the stream subscription,
/// and may yet be buffered.
abstract class _EventDispatch<T> {
/// Dispatches the data event [data].
void _sendData(T data);
/// Dispatches the error event [error] with its [stackTrace].
void _sendError(Object error, StackTrace stackTrace);
/// Dispatches the done event.
void _sendDone();
}
/// Default implementation of stream subscription of buffering events.
///
/// The only public methods are those of [StreamSubscription], so instances of
/// [_BufferingStreamSubscription] can be returned directly as a
/// [StreamSubscription] without exposing internal functionality.
///
/// The [StreamController] is a public facing version of [Stream] and this class,
/// with some methods made public.
///
/// The user interface of [_BufferingStreamSubscription] consists of the
/// following methods:
///
/// * [_add]: Add a data event to the stream.
/// * [_addError]: Add an error event to the stream.
/// * [_close]: Request to close the stream.
/// * [_onCancel]: Called when the subscription will provide no more events,
/// either due to being actively canceled, or after sending a done event.
/// * [_onPause]: Called when the subscription wants the event source to pause.
/// * [_onResume]: Called when allowing new events after a pause.
///
/// The user should not add new events while the subscription has requested a
/// pause, but if it happens anyway, the subscription will enqueue the events
/// just as when new events arrive while still firing an old event.
class _BufferingStreamSubscription<T>
implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> {
/// The `cancelOnError` flag from the `listen` call.
static const int _STATE_CANCEL_ON_ERROR = 1;
/// Whether the "done" event has been received.
/// No further events are accepted after this.
static const int _STATE_CLOSED = 2;
/// Set if the input has been asked not to send events.
///
/// This is not the same as being paused, since the input will remain paused
/// after a call to [resume] if there are pending events.
static const int _STATE_INPUT_PAUSED = 4;
/// Whether the subscription has been canceled.
///
/// Set by calling [cancel], or by handling a "done" event, or an "error" event
/// when `cancelOnError` is true.
static const int _STATE_CANCELED = 8;
/// Set when either:
///
/// * an error is sent, and [cancelOnError] is true, or
/// * a done event is sent.
///
/// If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the
/// state is unset, and no further events must be delivered.
static const int _STATE_WAIT_FOR_CANCEL = 16;
/// Set while a handler or hook is being run by this subscription.
///
/// Events added while this bit is set are enqueued instead of being
/// delivered directly (see [_canFire]).
static const int _STATE_IN_CALLBACK = 32;
/// Set while the subscription has a non-empty pending events object.
///
/// Cleared again by [_checkState] when the pending events are empty.
static const int _STATE_HAS_PENDING = 64;
/// The value of a single pause.
///
/// The pause count is stored in the bits from this one and up: [pause] adds
/// this value to the state, [_decrementPauseCount] subtracts it, and the
/// subscription is paused whenever the state is at least this value.
static const int _STATE_PAUSE_COUNT = 128;
/* Event handlers provided in constructor. */
@pragma("vm:entry-point")
_DataHandler<T> _onData;
Function _onError;
_DoneHandler _onDone;
final Zone _zone;
/// Bit vector based on state-constants above.
int _state;
// TODO(floitsch): reuse another field
/// The future [_onCancel] may return.
Future? _cancelFuture;
/// Queue of pending events.
///
/// Is created when necessary, or set in constructor for preconfigured events.
_PendingEvents<T>? _pending;
/// Creates a subscription bound to the current zone.
///
/// Forwards to [_BufferingStreamSubscription.zoned] with [Zone.current].
_BufferingStreamSubscription(void onData(T data)?, Function? onError,
void onDone()?, bool cancelOnError)
: this.zoned(Zone.current, onData, onError, onDone, cancelOnError);
/// Creates a subscription bound to the given zone.
///
/// The initial state contains only the cancel-on-error flag, set when
/// [cancelOnError] is true. Each handler is registered in the zone; a `null`
/// handler is replaced by the corresponding default handler by the
/// `_register...Handler` helpers.
_BufferingStreamSubscription.zoned(this._zone, void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError)
: _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0),
_onData = _registerDataHandler<T>(_zone, onData),
_onError = _registerErrorHandler(_zone, onError),
_onDone = _registerDoneHandler(_zone, onDone);
/// Sets the subscription's pending events object.
///
/// This can only be done once. The pending events object is used for the
/// rest of the subscription's life cycle.
void _setPendingEvents(_PendingEvents<T>? pendingEvents) {
  assert(_pending == null);
  if (pendingEvents != null) {
    _pending = pendingEvents;
    // An empty events object is stored, but nothing is scheduled for it.
    if (pendingEvents.isEmpty) return;
    _state |= _STATE_HAS_PENDING;
    pendingEvents.schedule(this);
  }
}
// StreamSubscription interface.
/// Replaces the data handler with [handleData], registered in this
/// subscription's zone.
void onData(void Function(T event)? handleData) {
  final registered = _registerDataHandler<T>(_zone, handleData);
  _onData = registered;
}
/// Registers [handleData] in [zone] and returns the registered callback.
///
/// A `null` handler is replaced by [_nullDataHandler].
static void Function(T) _registerDataHandler<T>(
    Zone zone, void Function(T)? handleData) {
  final void Function(T) handler = handleData ?? _nullDataHandler;
  return zone.registerUnaryCallback<void, T>(handler);
}
/// Replaces the error handler with [handleError], registered in this
/// subscription's zone.
void onError(Function? handleError) {
  final registered = _registerErrorHandler(_zone, handleError);
  _onError = registered;
}
/// Registers [handleError] in [zone] and returns the registered callback.
///
/// A `null` handler is replaced by [_nullErrorHandler]. The handler must
/// accept either an error and a stack trace, or just an error; any other
/// function shape throws an [ArgumentError].
static Function _registerErrorHandler(Zone zone, Function? handleError) {
  // TODO(lrn): Consider whether we need to register the null handler.
  final Function handler = handleError ?? _nullErrorHandler;
  if (handler is void Function(Object, StackTrace)) {
    return zone.registerBinaryCallback<dynamic, Object, StackTrace>(handler);
  } else if (handler is void Function(Object)) {
    return zone.registerUnaryCallback<dynamic, Object>(handler);
  }
  throw ArgumentError("handleError callback must take either an Object "
      "(the error), or both an Object (the error) and a StackTrace.");
}
/// Replaces the done handler with [handleDone], registered in this
/// subscription's zone.
void onDone(void Function()? handleDone) {
  final registered = _registerDoneHandler(_zone, handleDone);
  _onDone = registered;
}
/// Registers [handleDone] in [zone] and returns the registered callback.
///
/// A `null` handler is replaced by [_nullDoneHandler].
static void Function() _registerDoneHandler(
    Zone zone, void Function()? handleDone) {
  final void Function() handler = handleDone ?? _nullDoneHandler;
  return zone.registerCallback(handler);
}
/// Pauses the subscription, optionally until [resumeSignal] completes.
///
/// Does nothing if the subscription has been canceled. Each call adds one to
/// the pause count. If [resumeSignal] is provided, [resume] is called when it
/// completes.
void pause([Future<void>? resumeSignal]) {
if (_isCanceled) return;
// Capture the flags before changing the state, to detect transitions below.
bool wasPaused = _isPaused;
bool wasInputPaused = _isInputPaused;
// Increment pause count and mark input paused (if it isn't already).
_state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
resumeSignal?.whenComplete(resume);
// On the first pause, stop an already scheduled delivery of pending events.
if (!wasPaused) _pending?.cancelSchedule();
// Run the pause hook only when the input-paused flag was newly set and no
// callback is running; [_guardCallback] asserts that it is not re-entered.
if (!wasInputPaused && !_inCallback) _guardCallback(_onPause);
}
/// Undoes one call to [pause].
///
/// Does nothing if the subscription is canceled or not paused. When the pause
/// count reaches zero, buffered events are scheduled for delivery if there
/// are any; otherwise the input-paused flag is cleared and the resume hook is
/// run (unless a callback is currently running).
void resume() {
if (_isCanceled) return;
if (_isPaused) {
_decrementPauseCount();
if (!_isPaused) {
if (_hasPending && !_pending!.isEmpty) {
// Input is still paused.
_pending!.schedule(this);
} else {
assert(_mayResumeInput);
_state &= ~_STATE_INPUT_PAUSED;
if (!_inCallback) _guardCallback(_onResume);
}
}
}
}
/// Cancels the subscription.
///
/// Returns the future stored by [_cancel], or [Future._nullFuture] if there
/// is none.
Future cancel() {
  // The user no longer wants events. Clearing the wait-for-cancel flag makes
  // an error or done event that is waiting for the cancel future get
  // discarded instead of delivered.
  _state &= ~_STATE_WAIT_FOR_CANCEL;
  if (!_isCanceled) _cancel();
  final pendingCancel = _cancelFuture;
  return pendingCancel ?? Future._nullFuture;
}
/// Returns a future that completes when this subscription ends.
///
/// Overwrites the done and error handlers. On a done event the future
/// completes with [futureValue]; on an error event the subscription is
/// canceled and the future completes with that error once the cancel has
/// finished.
///
/// Throws an [ArgumentError] if [futureValue] is `null` (or omitted) and `E`
/// does not accept `null`.
Future<E> asFuture<E>([E? futureValue]) {
  if (futureValue == null && !typeAcceptsNull<E>()) {
    throw ArgumentError.notNull("futureValue");
  }
  // Past the check above, `futureValue` is a valid `E`: either non-null, or
  // null with an `E` that accepts null.
  final E resultValue = futureValue as dynamic;

  final result = _Future<E>();
  // Overwrite the onDone and onError handlers.
  _onDone = () {
    result._complete(resultValue);
  };
  _onError = (Object error, StackTrace stackTrace) {
    void completeWithError() {
      result._completeError(error, stackTrace);
    }

    final cancelFuture = cancel();
    if (identical(cancelFuture, Future._nullFuture)) {
      completeWithError();
    } else {
      cancelFuture.whenComplete(completeWithError);
    }
  };
  return result;
}
// State management.
/// Whether the input has been asked not to send events.
bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
/// Whether a close has been requested or a done event has been sent.
bool get _isClosed => (_state & _STATE_CLOSED) != 0;
/// Whether the subscription has been canceled.
bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
/// Whether an error or done event is waiting for the cancel to complete.
bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0;
/// Whether a handler or hook is currently being run.
bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
/// Whether the has-pending flag is set.
bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
/// Whether the pause count is non-zero.
///
/// The pause count occupies the highest bits of the state, so any non-zero
/// count makes the state at least [_STATE_PAUSE_COUNT].
bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
/// Whether an event may be delivered directly rather than enqueued.
///
/// True only when none of the in-callback, has-pending and pause-count bits
/// are set, since those are all the bits at or above [_STATE_IN_CALLBACK].
bool get _canFire => _state < _STATE_IN_CALLBACK;
/// Whether the subscription is unpaused and has no buffered events.
bool get _mayResumeInput => !_isPaused && (_pending?.isEmpty ?? true);
/// The `cancelOnError` flag from the `listen` call.
bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;
/// Whether the pause count is non-zero (public view of [_isPaused]).
bool get isPaused => _isPaused;
/// Marks the subscription as canceled and runs the [_onCancel] hook.
///
/// The value returned by [_onCancel] (possibly `null`) is stored in
/// [_cancelFuture].
void _cancel() {
_state |= _STATE_CANCELED;
// Make an already scheduled delivery of pending events do nothing.
if (_hasPending) {
_pending!.cancelSchedule();
}
// While a callback is running the pending events object is kept;
// [_checkState] clears it once it sees the canceled flag.
if (!_inCallback) _pending = null;
_cancelFuture = _onCancel();
}
/// Decrements the pause count.
///
/// Does not automatically unpause the input (call [_onResume]) when
/// the pause count reaches zero. This is handled elsewhere, and only
/// if there are no pending events buffered.
void _decrementPauseCount() {
  // Only valid while paused, i.e. while the pause count is non-zero.
  assert(_state >= _STATE_PAUSE_COUNT);
  _state = _state - _STATE_PAUSE_COUNT;
}
// _EventSink interface.
/// Adds a data event.
///
/// The event is dropped if the subscription is canceled, delivered directly
/// if [_canFire] is true, and enqueued as a [_DelayedData] otherwise.
void _add(T data) {
  assert(!_isClosed);
  if (_isCanceled) return;
  if (!_canFire) {
    _addPending(_DelayedData<T>(data));
    return;
  }
  _sendData(data);
}
/// Adds an error event.
///
/// The event is dropped if the subscription is canceled, delivered directly
/// if [_canFire] is true, and enqueued as a [_DelayedError] otherwise.
void _addError(Object error, StackTrace stackTrace) {
  if (_isCanceled) return;
  if (!_canFire) {
    _addPending(_DelayedError(error, stackTrace));
    return;
  }
  _sendError(error, stackTrace); // Reports cancel after sending.
}
/// Requests that the stream be closed.
///
/// Does nothing if the subscription is canceled. Otherwise marks the
/// subscription as closed and either delivers the done event directly (if
/// [_canFire] is true) or enqueues a [_DelayedDone].
void _close() {
  assert(!_isClosed);
  if (_isCanceled) return;
  _state |= _STATE_CLOSED;
  if (!_canFire) {
    _addPending(const _DelayedDone());
    return;
  }
  _sendDone();
}
// Hooks called when the input is paused, unpaused or canceled.
// These must not throw. If overridden to call user code, include suitable
// try/catch wrapping and send any errors to
// [Zone.current.handleUncaughtError].
/// Hook run when the input has been marked as paused.
///
/// The default implementation only checks the state.
void _onPause() {
assert(_isInputPaused);
}
/// Hook run when the input is no longer marked as paused.
///
/// The default implementation only checks the state.
void _onResume() {
assert(!_isInputPaused);
}
/// Hook run by [_cancel] after the subscription has been marked as canceled.
///
/// The returned future, if any, is stored as the subscription's cancel
/// future. The default implementation returns `null`.
Future<void>? _onCancel() {
assert(_isCanceled);
return null;
}
// Handle pending events.
/// Add a pending event.
///
/// If the subscription is not paused, this also schedules a firing
/// of pending events later (if necessary).
void _addPending(_DelayedEvent event) {
  // The cast goes through `dynamic`, so it fails at run-time if the existing
  // pending events object is not a `_StreamImplEvents<T>`.
  _StreamImplEvents<T>? existing = _pending as dynamic;
  final queue = existing ?? _StreamImplEvents<T>();
  _pending = queue;
  queue.add(event);
  if (_hasPending) return;
  // First buffered event: flag it, and schedule delivery unless paused.
  _state |= _STATE_HAS_PENDING;
  if (!_isPaused) queue.schedule(this);
}
/* _EventDispatch interface. */
/// Delivers [data] to the data handler, in the subscription's zone.
///
/// The in-callback flag is set while the handler runs. Afterwards
/// [_checkState] reacts to state changes made during the callback.
void _sendData(T data) {
assert(!_isCanceled);
assert(!_isPaused);
assert(!_inCallback);
// Remember the input-paused flag so [_checkState] can detect a change.
bool wasInputPaused = _isInputPaused;
_state |= _STATE_IN_CALLBACK;
_zone.runUnaryGuarded(_onData, data);
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
/// Delivers an error event to the error handler, in the subscription's zone.
///
/// The handler is called with both [error] and [stackTrace] if it accepts
/// two arguments, otherwise with [error] only.
///
/// With `cancelOnError` set, the subscription is canceled first, and the
/// error is delivered after the cancel future (if there is one) completes,
/// unless the user cancels the subscription in the meantime.
void _sendError(Object error, StackTrace stackTrace) {
assert(!_isCanceled);
assert(!_isPaused);
assert(!_inCallback);
// Remember the input-paused flag so [_checkState] can detect a change.
bool wasInputPaused = _isInputPaused;
// Runs the error handler with the in-callback flag set.
void sendError() {
// If the subscription has been canceled while waiting for the cancel
// future to finish we must not report the error.
if (_isCanceled && !_waitsForCancel) return;
_state |= _STATE_IN_CALLBACK;
// TODO(floitsch): this dynamic should be 'void'.
var onError = _onError;
if (onError is void Function(Object, StackTrace)) {
_zone.runBinaryGuarded<Object, StackTrace>(onError, error, stackTrace);
} else {
_zone.runUnaryGuarded<Object>(_onError as dynamic, error);
}
_state &= ~_STATE_IN_CALLBACK;
}
if (_cancelOnError) {
// Flag that an event is waiting for the cancel; [cancel] clears the flag,
// which makes `sendError` above return without calling the handler.
_state |= _STATE_WAIT_FOR_CANCEL;
_cancel();
var cancelFuture = _cancelFuture;
if (cancelFuture != null &&
!identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(sendError);
} else {
sendError();
}
} else {
sendError();
// Only check state if not cancelOnError.
_checkState(wasInputPaused);
}
}
/// Delivers the done event to the done handler, in the subscription's zone.
///
/// The subscription is canceled first, and the done event is delivered after
/// the cancel future (if there is one) completes, unless the user cancels
/// the subscription in the meantime.
void _sendDone() {
assert(!_isCanceled);
assert(!_isPaused);
assert(!_inCallback);
// Runs the done handler with the in-callback flag set.
void sendDone() {
// If the subscription has been canceled while waiting for the cancel
// future to finish we must not report the done event.
if (!_waitsForCancel) return;
_state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
_zone.runGuarded(_onDone);
_state &= ~_STATE_IN_CALLBACK;
}
_cancel();
// Flag that the done event is waiting for the cancel; [cancel] clears the
// flag, which makes `sendDone` above return without calling the handler.
_state |= _STATE_WAIT_FOR_CANCEL;
var cancelFuture = _cancelFuture;
if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(sendDone);
} else {
sendDone();
}
}
/// Call a hook function.
///
/// The call is properly wrapped in code to avoid other callbacks
/// during the call, and it checks for state changes after the call
/// that should cause further callbacks.
void _guardCallback(void Function() callback) {
assert(!_inCallback);
// Remember the input-paused flag so [_checkState] can detect a change.
bool wasInputPaused = _isInputPaused;
// Events added while the hook runs are enqueued, since [_canFire] is false
// while the in-callback flag is set.
_state |= _STATE_IN_CALLBACK;
callback();
_state &= ~_STATE_IN_CALLBACK;
_checkState(wasInputPaused);
}
/// Check if the input needs to be informed of state changes.
///
/// State changes are pausing, resuming and canceling.
///
/// After canceling, no further callbacks will happen.
///
/// The cancel callback is called after a user cancel, or after
/// the final done event is sent.
void _checkState(bool wasInputPaused) {
assert(!_inCallback);
// If all buffered events have been delivered, clear the has-pending flag,
// and also the input-paused flag if the subscription is not paused.
if (_hasPending && _pending!.isEmpty) {
_state &= ~_STATE_HAS_PENDING;
if (_isInputPaused && _mayResumeInput) {
_state &= ~_STATE_INPUT_PAUSED;
}
}
// If the state changes during a callback, we immediately
// make a new state-change callback. Loop until the state didn't change.
while (true) {
if (_isCanceled) {
// Drop the pending events; nothing more is delivered after a cancel.
_pending = null;
return;
}
bool isInputPaused = _isInputPaused;
if (wasInputPaused == isInputPaused) break;
// The in-callback flag is clear here (asserted on entry, and cleared at
// the end of every iteration), so this XOR sets it.
_state ^= _STATE_IN_CALLBACK;
if (isInputPaused) {
_onPause();
} else {
_onResume();
}
_state &= ~_STATE_IN_CALLBACK;
wasInputPaused = isInputPaused;
}
// Schedule delivery of events that are still buffered, unless paused.
if (_hasPending && !_isPaused) {
_pending!.schedule(this);
}
}
}
// -------------------------------------------------------------------
// Common base class for single and multi-subscription streams.
// -------------------------------------------------------------------
abstract class _StreamImpl<T> extends Stream<T> {
  /// Subscribes to this stream.
  ///
  /// Creates the subscription with [_createSubscription], passes it to the
  /// [_onListen] hook, and returns it. An omitted or `null` [cancelOnError]
  /// is treated as `false`.
  StreamSubscription<T> listen(void Function(T data)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    final subscription =
        _createSubscription(onData, onError, onDone, cancelOnError ?? false);
    _onListen(subscription);
    return subscription;
  }

  /// Creates a subscription object. Called by [listen].
  StreamSubscription<T> _createSubscription(void Function(T data)? onData,
          Function? onError, void Function()? onDone, bool cancelOnError) =>
      _BufferingStreamSubscription<T>(onData, onError, onDone, cancelOnError);

  /// Hook called by [listen] when the subscription has been created.
  void _onListen(StreamSubscription subscription) {}
}
/// Function creating a [_PendingEvents] object.
typedef _EventGenerator<T> = _PendingEvents<T> Function();
/// Stream that generates its own events.
class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
  /// Creates the pending events for the subscription.
  final _EventGenerator<T> _pending;

  /// Whether the stream has been listened to.
  bool _isUsed = false;

  /// Initializes the stream to have only the events provided by a
  /// [_PendingEvents].
  ///
  /// A new [_PendingEvents] must be generated for each listen.
  _GeneratedStreamImpl(this._pending);

  /// Creates a subscription pre-configured with the generated events.
  ///
  /// Throws a [StateError] if the stream has already been listened to.
  StreamSubscription<T> _createSubscription(void Function(T data)? onData,
      Function? onError, void Function()? onDone, bool cancelOnError) {
    if (_isUsed) throw StateError("Stream has already been listened to.");
    _isUsed = true;
    final subscription =
        _BufferingStreamSubscription<T>(onData, onError, onDone, cancelOnError);
    subscription._setPendingEvents(_pending());
    return subscription;
  }
}
/// Pending events object that gets its events from an [Iterable].
class _IterablePendingEvents<T> extends _PendingEvents<T> {
// The iterator providing data for data events.
// Set to null when iteration has completed.
Iterator<T>? _iterator;
/// Creates pending events from the elements of [data].
_IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;
/// Whether iteration has completed, or the events have been cleared.
bool get isEmpty => _iterator == null;
/// Sends the next event to [dispatch].
///
/// Throws a [StateError] if there is no iterator left.
void handleNext(_EventDispatch<T> dispatch) {
var iterator = _iterator;
if (iterator == null) {
throw new StateError("No events pending.");
}
// Send one event per call to moveNext.
// If moveNext returns true, send the current element as data.
// If current throws, send that error, but keep iterating.
// If moveNext returns false, send a done event and clear the _iterator.
// If moveNext throws an error, send an error and prepare to send a done
// event afterwards.
// Tracks whether moveNext returned true, so the catch block can tell
// whether the error came from before or after that point.
bool movedNext = false;
try {
if (iterator.moveNext()) {
movedNext = true;
dispatch._sendData(iterator.current);
} else {
_iterator = null;
dispatch._sendDone();
}
} catch (e, s) {
if (!movedNext) {
// Threw in .moveNext().
// Ensure that we send a done afterwards.
_iterator = const EmptyIterator<Never>();
}
// Else threw in .current.
dispatch._sendError(e, s);
}
}
/// Cancels a scheduled event, if any, and drops the iterator.
void clear() {
if (isScheduled) cancelSchedule();
_iterator = null;
}
}
// Internal helpers.
// Types of the different handlers on a stream. Types used to type fields.
/// Type of a data event handler.
typedef _DataHandler<T> = void Function(T value);

/// Type of a done event handler.
typedef _DoneHandler = void Function();

/// Data handler used when none is provided; ignores the value.
void _nullDataHandler(dynamic value) {}

/// Error handler used when none is provided.
///
/// Passes the error on to the current zone's uncaught error handler.
void _nullErrorHandler(Object error, StackTrace stackTrace) {
  final zone = Zone.current;
  zone.handleUncaughtError(error, stackTrace);
}

/// Done handler used when none is provided; does nothing.
void _nullDoneHandler() {}
/// A delayed event on a buffering stream subscription.
abstract class _DelayedEvent<T> {
/// The event following this one in a singly linked list of events.
///
/// `null` while no event has been added after this one.
_DelayedEvent? next;
/// Delivers this event by calling the matching method on [dispatch].
void perform(_EventDispatch<T> dispatch);
}
/// A delayed data event.
class _DelayedData<T> extends _DelayedEvent<T> {
  /// The data value to deliver.
  final T value;

  _DelayedData(this.value);

  /// Delivers [value] as a data event on [dispatch].
  void perform(_EventDispatch<T> dispatch) => dispatch._sendData(value);
}
/// A delayed error event.
class _DelayedError extends _DelayedEvent {
  /// The error to deliver.
  final Object error;

  /// The stack trace delivered along with [error].
  final StackTrace stackTrace;

  _DelayedError(this.error, this.stackTrace);

  /// Delivers [error] and [stackTrace] as an error event on [dispatch].
  void perform(_EventDispatch dispatch) =>
      dispatch._sendError(error, stackTrace);
}
/// A delayed done event.
class _DelayedDone implements _DelayedEvent {
  const _DelayedDone();

  /// Delivers a done event on [dispatch].
  void perform(_EventDispatch dispatch) => dispatch._sendDone();

  /// Always `null`; no event can follow a done event.
  _DelayedEvent? get next => null;

  /// Always throws a [StateError]; no event can follow a done event.
  set next(_DelayedEvent? _) {
    throw StateError("No events after a done.");
  }
}
/// Superclass for provider of pending events.
abstract class _PendingEvents<T> {
// No async event has been scheduled.
static const int _STATE_UNSCHEDULED = 0;
// An async event has been scheduled to run a function.
static const int _STATE_SCHEDULED = 1;
// An async event has been scheduled, but it will do nothing when it runs.
// Async events can't be preempted.
static const int _STATE_CANCELED = 3;
/// State of being scheduled.
///
/// Set to [_STATE_SCHEDULED] when pending events are scheduled for
/// async dispatch. Since we can't cancel a [scheduleMicrotask] call, if
/// scheduling is "canceled", the _state is simply set to [_STATE_CANCELED]
/// which will make the async code do nothing except resetting [_state].
///
/// If events are scheduled while the state is [_STATE_CANCELED], it is
/// merely switched back to [_STATE_SCHEDULED], but no new call to
/// [scheduleMicrotask] is performed.
int _state = _STATE_UNSCHEDULED;
/// Whether there are no pending events.
bool get isEmpty;
/// Whether a microtask is outstanding that will deliver an event.
bool get isScheduled => _state == _STATE_SCHEDULED;
/// Whether a microtask is outstanding at all, whether live or canceled.
bool get _eventScheduled => _state >= _STATE_SCHEDULED;
/// Schedule an event to run later.
///
/// If called more than once, it should be called with the same dispatch as
/// argument each time. It may reuse an earlier argument in some cases.
void schedule(_EventDispatch<T> dispatch) {
if (isScheduled) return;
assert(!isEmpty);
if (_eventScheduled) {
// A canceled microtask is still outstanding; revive it instead of
// scheduling a second one. It uses the dispatch it was created with.
assert(_state == _STATE_CANCELED);
_state = _STATE_SCHEDULED;
return;
}
scheduleMicrotask(() {
// Reset the state before dispatching, so that the dispatch can schedule
// again.
int oldState = _state;
_state = _STATE_UNSCHEDULED;
if (oldState == _STATE_CANCELED) return;
handleNext(dispatch);
});
_state = _STATE_SCHEDULED;
}
/// Makes an outstanding scheduled microtask do nothing when it runs.
///
/// Does nothing if no live microtask is scheduled.
void cancelSchedule() {
if (isScheduled) _state = _STATE_CANCELED;
}
/// Delivers the next pending event to [dispatch].
void handleNext(_EventDispatch<T> dispatch);
/// Throw away any pending events and cancel scheduled events.
void clear();
}
/// Class holding pending events for a [_StreamImpl].
class _StreamImplEvents<T> extends _PendingEvents<T> {
  /// Head of the singly linked list of [_DelayedEvent] objects.
  _DelayedEvent? firstPendingEvent;

  /// Tail of the list of pending events. New events are linked in after it.
  _DelayedEvent? lastPendingEvent;

  /// Whether the list has no events.
  bool get isEmpty => lastPendingEvent == null;

  /// Appends [event] to the end of the list.
  void add(_DelayedEvent event) {
    final tail = lastPendingEvent;
    if (tail != null) {
      // Link first: if setting `next` throws, the tail is left unchanged.
      tail.next = event;
    } else {
      firstPendingEvent = event;
    }
    lastPendingEvent = event;
  }

  /// Removes the first event from the list and performs it on [dispatch].
  void handleNext(_EventDispatch<T> dispatch) {
    assert(!isScheduled);
    assert(!isEmpty);
    final head = firstPendingEvent!;
    final rest = head.next;
    firstPendingEvent = rest;
    if (rest == null) lastPendingEvent = null;
    // Unlink before performing, so the list is consistent during delivery.
    head.perform(dispatch);
  }

  /// Cancels a scheduled event, if any, and drops all pending events.
  void clear() {
    if (isScheduled) cancelSchedule();
    firstPendingEvent = null;
    lastPendingEvent = null;
  }
}
/// Type of a callback receiving a stream subscription.
typedef _BroadcastCallback<T> = void Function(
    StreamSubscription<T> subscription);
/// Done subscription that will send one done event as soon as possible.
class _DoneStreamSubscription<T> implements StreamSubscription<T> {
  /// Set when the done event has been delivered, or the subscription has been
  /// canceled. In either case no done event is delivered afterwards.
  static const int _DONE_SENT = 1;

  /// Set while a microtask that will call [_sendDone] is outstanding.
  static const int _SCHEDULED = 2;

  /// The value of a single pause.
  ///
  /// The pause count is stored in the bits from this one and up, so the
  /// subscription is paused whenever the state is at least this value.
  static const int _PAUSED = 4;

  /// The zone that was current when the subscription was created.
  final Zone _zone;

  /// Bit vector based on the state constants above.
  int _state = 0;

  /// The done handler, if any.
  _DoneHandler? _onDone;

  /// Creates the subscription and schedules the done event.
  _DoneStreamSubscription(this._onDone) : _zone = Zone.current {
    _schedule();
  }

  bool get _isSent => (_state & _DONE_SENT) != 0;
  bool get _isScheduled => (_state & _SCHEDULED) != 0;
  bool get isPaused => _state >= _PAUSED;

  /// Schedules a microtask calling [_sendDone], unless one is outstanding.
  void _schedule() {
    if (_isScheduled) return;
    _zone.scheduleMicrotask(_sendDone);
    _state |= _SCHEDULED;
  }

  /// Ignored; this subscription never delivers data events.
  void onData(void handleData(T data)?) {}

  /// Ignored; this subscription never delivers error events.
  void onError(Function? handleError) {}

  /// Replaces the done handler.
  void onDone(void handleDone()?) {
    _onDone = handleDone;
  }

  /// Increments the pause count, delaying the done event while paused.
  ///
  /// If [resumeSignal] is provided, [resume] is called when it completes.
  void pause([Future<void>? resumeSignal]) {
    _state += _PAUSED;
    if (resumeSignal != null) resumeSignal.whenComplete(resume);
  }

  /// Decrements the pause count, if it is non-zero.
  ///
  /// When the count reaches zero and the done event has not been sent (and
  /// the subscription has not been canceled), the done event is scheduled.
  void resume() {
    if (isPaused) {
      _state -= _PAUSED;
      if (!isPaused && !_isSent) {
        _schedule();
      }
    }
  }

  /// Cancels the subscription, so that the done handler is never called.
  ///
  /// A microtask for the done event may already be outstanding and cannot be
  /// unscheduled. Marking the event as sent makes [_sendDone] and [resume]
  /// skip delivery, and dropping the handler releases it right away.
  Future cancel() {
    _state |= _DONE_SENT;
    _onDone = null;
    return Future._nullFuture;
  }

  /// Returns a future completed with [futureValue] by the done event.
  ///
  /// Overwrites the done handler. Throws an [ArgumentError] if [futureValue]
  /// is `null` (or omitted) and `E` does not accept `null`.
  Future<E> asFuture<E>([E? futureValue]) {
    E resultValue;
    if (futureValue == null) {
      if (!typeAcceptsNull<E>()) {
        throw ArgumentError.notNull("futureValue");
      }
      resultValue = futureValue as dynamic;
    } else {
      resultValue = futureValue;
    }
    _Future<E> result = new _Future<E>();
    _onDone = () {
      result._completeWithValue(resultValue);
    };
    return result;
  }

  /// Microtask callback delivering the done event.
  ///
  /// Does nothing while paused ([resume] schedules a new microtask), or when
  /// the event has already been sent or the subscription has been canceled.
  void _sendDone() {
    _state &= ~_SCHEDULED;
    if (isPaused || _isSent) return;
    _state |= _DONE_SENT;
    var doneHandler = _onDone;
    if (doneHandler != null) _zone.runGuarded(doneHandler);
  }
}
/// Broadcast stream forwarding the events of a source stream through an
/// [_AsBroadcastStreamController].
class _AsBroadcastStream<T> extends Stream<T> {
  /// The stream whose events are forwarded.
  final Stream<T> _source;

  /// The zone-registered listen callback, if one was provided.
  final _BroadcastCallback<T>? _onListenHandler;

  /// The zone-registered cancel callback, if one was provided.
  final _BroadcastCallback<T>? _onCancelHandler;

  /// The zone that was current when this stream was created.
  final Zone _zone;

  /// The controller events are forwarded through.
  ///
  /// Set to `null` by [_cancelSubscription].
  _AsBroadcastStreamController<T>? _controller;

  /// The subscription on [_source], created by the first [listen] call.
  StreamSubscription<T>? _subscription;

  /// Creates the stream, registering the provided handlers in the current
  /// zone.
  _AsBroadcastStream(
      this._source,
      void Function(StreamSubscription<T> subscription)? onListenHandler,
      void Function(StreamSubscription<T> subscription)? onCancelHandler)
      : _onListenHandler = (onListenHandler != null)
            ? Zone.current.registerUnaryCallback<void, StreamSubscription<T>>(
                onListenHandler)
            : null,
        _onCancelHandler = (onCancelHandler != null)
            ? Zone.current.registerUnaryCallback<void, StreamSubscription<T>>(
                onCancelHandler)
            : null,
        _zone = Zone.current {
    _controller = _AsBroadcastStreamController<T>(_onListen, _onCancel);
  }

  bool get isBroadcast => true;

  /// Adds a subscriber to the controller.
  ///
  /// Starts listening on the source stream if that has not happened yet. If
  /// the controller is gone or closed, a subscription that only sends a done
  /// event is returned instead.
  StreamSubscription<T> listen(void Function(T data)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    final controller = _controller;
    if (controller == null || controller.isClosed) {
      // Return a dummy subscription backed by nothing, since
      // it will only ever send one done event.
      return _DoneStreamSubscription<T>(onDone);
    }
    if (_subscription == null) {
      _subscription = _source.listen(controller.add,
          onError: controller.addError, onDone: controller.close);
    }
    return controller._subscribe(
        onData, onError, onDone, cancelOnError ?? false);
  }

  /// Cancel callback given to the controller.
  ///
  /// Runs the cancel handler, if any, with a wrapper of the source
  /// subscription. If the controller was gone or closed when this method was
  /// entered, the source subscription is canceled afterwards.
  void _onCancel() {
    final controller = _controller;
    // Evaluated before running the handler, which may change the state.
    final shutdown = controller == null || controller.isClosed;
    final cancelHandler = _onCancelHandler;
    if (cancelHandler != null) {
      _zone.runUnary(cancelHandler, _BroadcastSubscriptionWrapper<T>(this));
    }
    if (!shutdown) return;
    final subscription = _subscription;
    if (subscription == null) return;
    subscription.cancel();
    _subscription = null;
  }

  /// Listen callback given to the controller.
  ///
  /// Runs the listen handler, if any, with a wrapper of the source
  /// subscription.
  void _onListen() {
    final listenHandler = _onListenHandler;
    if (listenHandler == null) return;
    _zone.runUnary(listenHandler, _BroadcastSubscriptionWrapper<T>(this));
  }

  // Methods called from _BroadcastSubscriptionWrapper.

  /// Cancels the source subscription, if any, and drops the controller.
  void _cancelSubscription() {
    final subscription = _subscription;
    if (subscription == null) return;
    _subscription = null;
    _controller = null; // Marks the stream as no longer listenable.
    subscription.cancel();
  }

  /// Pauses the source subscription, if any.
  void _pauseSubscription(Future<void>? resumeSignal) {
    _subscription?.pause(resumeSignal);
  }

  /// Resumes the source subscription, if any.
  void _resumeSubscription() {
    _subscription?.resume();
  }

  /// Whether the source subscription exists and is paused.
  bool get _isSubscriptionPaused => _subscription?.isPaused ?? false;
}
/// Wrapper for subscription that disallows changing handlers.
class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
  /// The stream whose source subscription is controlled by this wrapper.
  final _AsBroadcastStream _stream;

  _BroadcastSubscriptionWrapper(this._stream);

  /// Not supported; always throws an [UnsupportedError].
  void onData(void Function(T data)? handleData) {
    throw UnsupportedError(
        "Cannot change handlers of asBroadcastStream source subscription.");
  }

  /// Not supported; always throws an [UnsupportedError].
  void onError(Function? handleError) {
    throw UnsupportedError(
        "Cannot change handlers of asBroadcastStream source subscription.");
  }

  /// Not supported; always throws an [UnsupportedError].
  void onDone(void Function()? handleDone) {
    throw UnsupportedError(
        "Cannot change handlers of asBroadcastStream source subscription.");
  }

  /// Pauses the source subscription of the wrapped stream.
  void pause([Future<void>? resumeSignal]) =>
      _stream._pauseSubscription(resumeSignal);

  /// Resumes the source subscription of the wrapped stream.
  void resume() => _stream._resumeSubscription();

  /// Cancels the source subscription of the wrapped stream.
  Future cancel() {
    _stream._cancelSubscription();
    return Future._nullFuture;
  }

  /// Whether the source subscription of the wrapped stream is paused.
  bool get isPaused => _stream._isSubscriptionPaused;

  /// Not supported; always throws an [UnsupportedError].
  Future<E> asFuture<E>([E? futureValue]) {
    throw UnsupportedError(
        "Cannot change handlers of asBroadcastStream source subscription.");
  }
}
/// Simple implementation of [StreamIterator].
///
/// Pauses the stream between calls to [moveNext].
class _StreamIterator<T> implements StreamIterator<T> {
// The stream iterator is always in one of five states.
// The value of the [_stateData] field depends on the state.
//
// When `_subscription == null`, `_stateData != null`, and not listened yet:
// The stream iterator has been created, but [moveNext] has not been called
// yet. The [_stateData] field contains the stream to listen to on the first
// call to [moveNext] and [current] returns `null`.
//
// When `_subscription == null`, `_stateData != null`, during `listen` call.
// The `listen` call has not returned a subscription yet.
// The `_stateData` contains the future returned by the first [moveNext]
// call. This state is only detected inside the stream event callbacks,
// since it's the only case where they can get called while `_subscription`
// is `null`. (A well-behaved stream should not be emitting events during
// the `listen` call, but some do anyway). The [current] is `null`.
//
// When `_subscription != null` and `!_hasValue`:
// The user has called [moveNext] and the iterator is waiting for the next
// event. The [_stateData] field contains the [_Future] returned by the
// [moveNext] call and [current] returns `null`.
//
// When `_subscription != null` and `_hasValue`:
// The most recent call to [moveNext] has completed with a `true` value
// and [current] provides the value of the data event.
// The [_stateData] field contains the [current] value.
//
// When `_subscription == null` and `_stateData == null`:
// The stream has completed or been canceled using [cancel].
// The stream completes on either a done event or an error event.
// The last call to [moveNext] has completed with `false` and [current]
// returns `null`.
/// Subscription being listened to.
///
/// Set to `null` when the stream subscription is done or canceled.
StreamSubscription<T>? _subscription;
/// Data value depending on the current state.
///
/// Before first call to [moveNext]: The stream to listen to.
///
/// After calling [moveNext] but before the returned future completes:
/// The returned future.
///
/// After calling [moveNext] and the returned future has completed
/// with `true`: The value of [current].
///
/// After calling [moveNext] and the returned future has completed
/// with `false`, or after calling [cancel]: `null`.
@pragma("vm:entry-point")
Object? _stateData;
/// Whether the iterator is between calls to `moveNext`.
/// This will usually cause the [_subscription] to be paused, but as an
/// optimization, we only pause after the [moveNext] future has been
/// completed.
bool _hasValue = false;
/// Creates an iterator over [stream].
///
/// The stream is stored in [_stateData] after being passed through
/// `checkNotNullable`; nothing is listened to here.
_StreamIterator(final Stream<T> stream)
: _stateData = checkNotNullable(stream, "stream");
/// The value of the most recent data event, while there is one.
///
/// Otherwise evaluates to `null` cast to `T`.
T get current {
  final Object? value = _hasValue ? _stateData : null;
  return value as dynamic;
}
/// Requests the next event.
///
/// Without an active subscription this defers to [_initializeOrDone].
/// Otherwise the current value is given up, the subscription is resumed, and
/// a new future for the next event is returned.
///
/// Throws a [StateError] if an earlier call is still waiting for an event.
Future<bool> moveNext() {
  final subscription = _subscription;
  if (subscription == null) return _initializeOrDone();
  if (!_hasValue) throw StateError("Already waiting for next.");
  final future = _Future<bool>();
  _stateData = future;
  _hasValue = false;
  subscription.resume();
  return future;
}
/// Called if there is no active subscription when [moveNext] is called.
///
/// Either starts listening on the stream if this is the first call to
/// [moveNext], or returns a `false` future because the stream has already
/// ended.
Future<bool> _initializeOrDone() {
  assert(_subscription == null);
  final stateData = _stateData;
  // No state data and no subscription: the iteration has already ended.
  if (stateData == null) return Future._falseFuture;

  Stream<T> stream = stateData as dynamic;
  final future = _Future<bool>();
  _stateData = future;
  // Listening may run user code, and that code might emit events before
  // `listen` returns. Data events arriving that early are ignored, while
  // error and done events complete the future asynchronously and reset
  // `_stateData` to null.
  // That way the only user callback run during `listen` is `onListen`
  // itself. Should that code call `moveNext` on this iterator again, the
  // call ends up here and fails, because `_stateData` now holds a future
  // rather than a stream.
  final subscription = stream.listen(_onData,
      onError: _onError, onDone: _onDone, cancelOnError: true);
  // Keep the subscription only if the stream did not end during `listen`.
  if (_stateData != null) {
    _subscription = subscription;
  }
  return future;
}
/// Cancels the iteration.
///
/// Clears the state data. If there is an active subscription, it is
/// canceled and the result of that cancel is returned; a `moveNext` future
/// still waiting for an event is completed with `false`.
Future cancel() {
  final subscription = _subscription;
  final stateData = _stateData;
  _stateData = null;
  if (subscription == null) return Future._nullFuture;

  _subscription = null;
  if (_hasValue) {
    _hasValue = false;
  } else {
    // Without a value, the state data held the pending `moveNext` future.
    _Future<bool> future = stateData as dynamic;
    future._asyncComplete(false);
  }
  return subscription.cancel();
}
/// Data handler for the stream subscription.
///
/// Stores [data] as the current value and completes the pending `moveNext`
/// future with `true`.
void _onData(T data) {
// Ignore events sent during the `listen` call
// (which can happen if misusing synchronous broadcast stream controllers),
// or after `cancel` or `done` (for *really* misbehaving streams).
if (_subscription == null) return;
_Future<bool> moveNextFuture = _stateData as dynamic;
_stateData = data;
_hasValue = true;
moveNextFuture._complete(true);
// Pause only if the value is still held after completing the future.
// NOTE(review): presumably completing the future can run code that calls
// [moveNext] or [cancel], which clear `_hasValue` - confirm.
if (_hasValue) _subscription?.pause();
}
/// Error handler for the stream subscription.
///
/// Ends the iteration and completes the pending `moveNext` future with the
/// error.
void _onError(Object error, StackTrace stackTrace) {
  final subscription = _subscription;
  _Future<bool> moveNextFuture = _stateData as dynamic;
  _subscription = null;
  _stateData = null;
  if (subscription == null) {
    // Event delivered during `listen` call.
    moveNextFuture._asyncCompleteError(error, stackTrace);
  } else {
    moveNextFuture._completeError(error, stackTrace);
  }
}
/// Done handler for the stream subscription.
///
/// Ends the iteration and completes the pending `moveNext` future with
/// `false`.
void _onDone() {
  final subscription = _subscription;
  _Future<bool> moveNextFuture = _stateData as dynamic;
  _subscription = null;
  _stateData = null;
  if (subscription == null) {
    // Event delivered during `listen` call.
    moveNextFuture._asyncCompleteWithValue(false);
  } else {
    moveNextFuture._completeWithValue(false);
  }
}
}
/// An empty broadcast stream, sending a done event as soon as possible.
class _EmptyStream<T> extends Stream<T> {
  const _EmptyStream() : super._internal();

  bool get isBroadcast => true;

  /// Returns a subscription that only sends a done event to [onDone].
  ///
  /// The other arguments are not used.
  StreamSubscription<T> listen(void Function(T data)? onData,
          {Function? onError, void Function()? onDone, bool? cancelOnError}) =>
      _DoneStreamSubscription<T>(onDone);
}
/// A stream which creates a new controller for each listener.
class _MultiStream<T> extends Stream<T> {
  final bool isBroadcast;

  /// The callback called for each listen.
  final void Function(MultiStreamController<T>) _onListen;

  _MultiStream(this._onListen, this.isBroadcast);

  /// Subscribes through a fresh [_MultiStreamController].
  ///
  /// The controller's `onListen` callback passes the controller itself to
  /// [_onListen]. An omitted or `null` [cancelOnError] is treated as `false`.
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    final controller = _MultiStreamController<T>();
    controller.onListen = () {
      _onListen(controller);
    };
    final shouldCancelOnError = cancelOnError ?? false;
    return controller._subscribe(onData, onError, onDone, shouldCancelOnError);
  }
}
class _MultiStreamController<T> extends _AsyncStreamController<T>
    implements MultiStreamController<T> {
  _MultiStreamController() : super(null, null, null, null);

  /// Passes [data] directly to the subscription, if there is a listener.
  ///
  /// Throws the error from `_badEventState` if events may not be added.
  void addSync(T data) {
    if (!_mayAddEvent) throw _badEventState();
    if (!hasListener) return;
    _subscription._add(data);
  }

  /// Passes [error] directly to the subscription, if there is a listener.
  ///
  /// An omitted [stackTrace] is replaced by [StackTrace.empty]. Throws the
  /// error from `_badEventState` if events may not be added.
  void addErrorSync(Object error, [StackTrace? stackTrace]) {
    if (!_mayAddEvent) throw _badEventState();
    if (!hasListener) return;
    _subscription._addError(error, stackTrace ?? StackTrace.empty);
  }

  /// Marks the controller as closed and closes the subscription directly,
  /// if there is a listener.
  ///
  /// Does nothing if the controller is already closed. Throws the error from
  /// `_badEventState` if events may not be added.
  void closeSync() {
    if (isClosed) return;
    if (!_mayAddEvent) throw _badEventState();
    _state |= _StreamController._STATE_CLOSED;
    if (!hasListener) return;
    _subscription._close();
  }

  /// Not available; always throws an [UnsupportedError].
  Stream<T> get stream => throw UnsupportedError("Not available");
}