// Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of dart.async;

/// Abstract and private interface for a place to put events.
abstract class _EventSink<T> {
  /// Accepts a data event.
  void _add(T data);
  /// Accepts an error event together with its stack trace.
  void _addError(Object error, StackTrace stackTrace);
  /// Accepts a request to close the sink.
  void _close();
}

/// Abstract and private interface for a place to send events.
///
/// Used by event buffering to finally dispatch the pending event, where
/// [_EventSink] is where the event first enters the stream subscription,
/// and may yet be buffered.
abstract class _EventDispatch<T> {
  /// Dispatches a data event.
  void _sendData(T data);
  /// Dispatches an error event together with its stack trace.
  void _sendError(Object error, StackTrace stackTrace);
  /// Dispatches the done event.
  void _sendDone();
}

/// Default implementation of stream subscription of buffering events.
///
/// The only public methods are those of [StreamSubscription], so instances of
/// [_BufferingStreamSubscription] can be returned directly as a
/// [StreamSubscription] without exposing internal functionality.
///
/// The [StreamController] is a public facing version of [Stream] and this class,
/// with some methods made public.
///
/// The user interface of [_BufferingStreamSubscription] consists of the
/// following methods:
///
/// * [_add]: Add a data event to the stream.
/// * [_addError]: Add an error event to the stream.
/// * [_close]: Request to close the stream.
/// * [_onCancel]: Called when the subscription will provide no more events,
///     either due to being actively canceled, or after sending a done event.
/// * [_onPause]: Called when the subscription wants the event source to pause.
/// * [_onResume]: Called when allowing new events after a pause.
///
/// The user should not add new events when the subscription requests a pause,
/// but if it happens anyway, the subscription will enqueue the events just as
/// when new events arrive while still firing an old event.
class _BufferingStreamSubscription<T>
    implements StreamSubscription<T>, _EventSink<T>, _EventDispatch<T> {
  /// The `cancelOnError` flag from the `listen` call.
  static const int _STATE_CANCEL_ON_ERROR = 1;

  /// Whether the "done" event has been received.
  /// No further events are accepted after this.
  static const int _STATE_CLOSED = 2;

  /// Set if the input has been asked not to send events.
  ///
  /// This is not the same as being paused, since the input will remain paused
  /// after a call to [resume] if there are pending events.
  static const int _STATE_INPUT_PAUSED = 4;

  /// Whether the subscription has been canceled.
  ///
  /// Set by calling [cancel], or by handling a "done" event, or an "error" event
  /// when `cancelOnError` is true.
  static const int _STATE_CANCELED = 8;

  /// Set when either:
  ///
  ///   * an error is sent, and [cancelOnError] is true, or
  ///   * a done event is sent.
  ///
  /// If the subscription is canceled while _STATE_WAIT_FOR_CANCEL is set, the
  /// state is unset, and no further events must be delivered.
  static const int _STATE_WAIT_FOR_CANCEL = 16;
  /// Set while this subscription is running a callback.
  ///
  /// The bit is set around the handler invocations in [_sendData],
  /// [_sendError] and [_sendDone], and around the hooks run by
  /// [_guardCallback] and [_checkState].
  static const int _STATE_IN_CALLBACK = 32;
  /// Set when [_pending] holds events that still have to be delivered.
  static const int _STATE_HAS_PENDING = 64;
  /// Unit of the pause count, which is stored in this bit and the bits above.
  ///
  /// [pause] adds this value to [_state] and [_decrementPauseCount] subtracts
  /// it, so the subscription is paused while `_state` is at least this value.
  static const int _STATE_PAUSE_COUNT = 128;

  /* Event handlers provided in constructor. */
  @pragma("vm:entry-point")
  _DataHandler<T> _onData;
  Function _onError;
  _DoneHandler _onDone;

  final Zone _zone;

  /// Bit vector based on state-constants above.
  int _state;

  // TODO(floitsch): reuse another field
  /// The future [_onCancel] may return.
  Future? _cancelFuture;

  /// Queue of pending events.
  ///
  /// Is created when necessary, or set in constructor for preconfigured events.
  _PendingEvents<T>? _pending;

  /// Creates a subscription that is bound to [Zone.current].
  ///
  /// Forwards to [_BufferingStreamSubscription.zoned] with the current zone.
  _BufferingStreamSubscription(void onData(T data)?, Function? onError,
      void onDone()?, bool cancelOnError)
      : this.zoned(Zone.current, onData, onError, onDone, cancelOnError);

  /// Creates a subscription whose handlers are registered in [_zone].
  ///
  /// The initial state only records whether [cancelOnError] was requested.
  /// Each handler goes through [_registerDataHandler],
  /// [_registerErrorHandler] or [_registerDoneHandler], which substitute a
  /// default handler when the argument is `null`.
  _BufferingStreamSubscription.zoned(this._zone, void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError)
      : _state = (cancelOnError ? _STATE_CANCEL_ON_ERROR : 0),
        _onData = _registerDataHandler<T>(_zone, onData),
        _onError = _registerErrorHandler(_zone, onError),
        _onDone = _registerDoneHandler(_zone, onDone);

  /// Installs a preconfigured pending-events object.
  ///
  /// May be called at most once, and only while no pending-events object
  /// exists yet. A `null` argument is ignored. If the installed object
  /// already contains events, delivery of them is scheduled right away.
  void _setPendingEvents(_PendingEvents<T>? pendingEvents) {
    assert(_pending == null);
    if (pendingEvents != null) {
      _pending = pendingEvents;
      // Nothing to deliver yet, so there is nothing to schedule.
      if (pendingEvents.isEmpty) return;
      _state |= _STATE_HAS_PENDING;
      pendingEvents.schedule(this);
    }
  }

  // StreamSubscription interface.

  /// Replaces the data handler.
  ///
  /// The new handler is registered in [_zone]. A `null` [handleData] installs
  /// a handler that ignores data events.
  void onData(void handleData(T event)?) {
    _onData = _registerDataHandler<T>(_zone, handleData);
  }

  /// Registers [handleData] in [zone] and returns the registered callback.
  ///
  /// Falls back to [_nullDataHandler] when [handleData] is `null`.
  static void Function(T) _registerDataHandler<T>(
      Zone zone, void Function(T)? handleData) {
    final void Function(T) handler = handleData ?? _nullDataHandler;
    return zone.registerUnaryCallback<void, T>(handler);
  }

  /// Replaces the error handler.
  ///
  /// The new handler is registered in [_zone] by [_registerErrorHandler],
  /// which throws an [ArgumentError] if [handleError] accepts neither
  /// `(Object)` nor `(Object, StackTrace)`.
  void onError(Function? handleError) {
    _onError = _registerErrorHandler(_zone, handleError);
  }

  /// Registers [handleError] in [zone] and returns the registered callback.
  ///
  /// Falls back to [_nullErrorHandler] when [handleError] is `null`. The
  /// handler is registered as a binary callback if it accepts an error and a
  /// stack trace, and as a unary callback if it accepts only the error.
  ///
  /// Throws an [ArgumentError] if the handler has neither shape.
  static Function _registerErrorHandler(Zone zone, Function? handleError) {
    // TODO(lrn): Consider whether we need to register the null handler.
    final Function handler = handleError ?? _nullErrorHandler;
    if (handler is void Function(Object, StackTrace)) {
      return zone.registerBinaryCallback<dynamic, Object, StackTrace>(handler);
    }
    if (handler is void Function(Object)) {
      return zone.registerUnaryCallback<dynamic, Object>(handler);
    }
    throw ArgumentError("handleError callback must take either an Object "
        "(the error), or both an Object (the error) and a StackTrace.");
  }

  /// Replaces the done handler.
  ///
  /// The new handler is registered in [_zone]. A `null` [handleDone] installs
  /// a handler that does nothing.
  void onDone(void handleDone()?) {
    _onDone = _registerDoneHandler(_zone, handleDone);
  }

  /// Registers [handleDone] in [zone] and returns the registered callback.
  ///
  /// Falls back to [_nullDoneHandler] when [handleDone] is `null`.
  static void Function() _registerDoneHandler(
      Zone zone, void Function()? handleDone) {
    final void Function() handler = handleDone ?? _nullDoneHandler;
    return zone.registerCallback<void>(handler);
  }

  /// Requests that the subscription stops delivering events.
  ///
  /// Does nothing if the subscription is canceled. Otherwise the pause count
  /// is incremented, so each call has to be matched by a [resume] before the
  /// subscription is unpaused. If [resumeSignal] is provided, [resume] is
  /// called when that future completes.
  void pause([Future<void>? resumeSignal]) {
    if (_isCanceled) return;
    bool wasPaused = _isPaused;
    bool wasInputPaused = _isInputPaused;
    // Increment pause count and mark input paused (if it isn't already).
    _state = (_state + _STATE_PAUSE_COUNT) | _STATE_INPUT_PAUSED;
    resumeSignal?.whenComplete(resume);
    // On the transition to paused, disarm a scheduled delivery of pending
    // events.
    if (!wasPaused) _pending?.cancelSchedule();
    // Notify the source only when the input-paused flag was newly set. Inside
    // a callback the notification is left to the [_checkState] call that
    // follows the callback.
    if (!wasInputPaused && !_inCallback) _guardCallback(_onPause);
  }

  /// Undoes one earlier call to [pause].
  ///
  /// Does nothing if the subscription is canceled or not paused. When the
  /// pause count reaches zero, buffered events are scheduled for delivery if
  /// there are any. Otherwise the input-paused flag is cleared and the source
  /// is told to resume, unless a callback is currently running.
  void resume() {
    if (_isCanceled || !_isPaused) return;
    _decrementPauseCount();
    // Other pauses are still outstanding.
    if (_isPaused) return;
    if (_hasPending && !_pending!.isEmpty) {
      // Input is still paused.
      _pending!.schedule(this);
      return;
    }
    assert(_mayResumeInput);
    _state &= ~_STATE_INPUT_PAUSED;
    if (!_inCallback) _guardCallback(_onResume);
  }

  /// Cancels the subscription.
  ///
  /// Returns the future provided by [_onCancel], or the shared null future
  /// if there is none.
  Future cancel() {
    // The listener wants no further events, so an error or done event that
    // is only waiting for the cancel to complete is discarded.
    _state &= ~_STATE_WAIT_FOR_CANCEL;
    if (_isCanceled) return _cancelFuture ?? Future._nullFuture;
    _cancel();
    return _cancelFuture ?? Future._nullFuture;
  }

  /// Returns a future that completes when the subscription ends.
  ///
  /// Overwrites the done and error handlers. The future completes with
  /// [futureValue] on a done event. On an error event the subscription is
  /// canceled and the future completes with that error once the cancel has
  /// finished.
  ///
  /// Throws an [ArgumentError] if [futureValue] is `null` and [E] does not
  /// accept `null`.
  Future<E> asFuture<E>([E? futureValue]) {
    if (futureValue == null && !typeAcceptsNull<E>()) {
      throw ArgumentError.notNull("futureValue");
    }
    // Either non-null, or `null` for an `E` that accepts `null`.
    final E resultValue = futureValue as dynamic;
    // Overwrite the onDone and onError handlers.
    final result = _Future<E>();
    _onDone = () {
      result._complete(resultValue);
    };
    _onError = (Object error, StackTrace stackTrace) {
      void fail() {
        result._completeError(error, stackTrace);
      }

      final cancelFuture = cancel();
      if (identical(cancelFuture, Future._nullFuture)) {
        fail();
      } else {
        cancelFuture.whenComplete(fail);
      }
    };
    return result;
  }

  // State management.

  /// Whether the input has been asked not to send events.
  bool get _isInputPaused => (_state & _STATE_INPUT_PAUSED) != 0;
  /// Whether [_close] has been called or the done event has been sent.
  bool get _isClosed => (_state & _STATE_CLOSED) != 0;
  /// Whether the subscription has been canceled.
  bool get _isCanceled => (_state & _STATE_CANCELED) != 0;
  /// Whether an error or done event is waiting for the cancel to complete.
  bool get _waitsForCancel => (_state & _STATE_WAIT_FOR_CANCEL) != 0;
  /// Whether a callback is currently being run by this subscription.
  bool get _inCallback => (_state & _STATE_IN_CALLBACK) != 0;
  /// Whether the has-pending flag is set.
  bool get _hasPending => (_state & _STATE_HAS_PENDING) != 0;
  /// Whether the pause count is non-zero.
  bool get _isPaused => _state >= _STATE_PAUSE_COUNT;
  /// Whether an event can be sent immediately.
  ///
  /// True only when none of the in-callback, has-pending and pause-count
  /// bits are set.
  bool get _canFire => _state < _STATE_IN_CALLBACK;
  /// Whether the input may be resumed: not paused and no buffered events.
  bool get _mayResumeInput => !_isPaused && (_pending?.isEmpty ?? true);
  /// Whether the subscription was created with `cancelOnError` set.
  bool get _cancelOnError => (_state & _STATE_CANCEL_ON_ERROR) != 0;

  bool get isPaused => _isPaused;

  /// Marks the subscription as canceled and notifies the source.
  ///
  /// Sets [_STATE_CANCELED], disarms a scheduled delivery of pending events
  /// and stores the result of [_onCancel] in [_cancelFuture].
  void _cancel() {
    _state |= _STATE_CANCELED;
    if (_hasPending) {
      _pending!.cancelSchedule();
    }
    // While a callback is running the queue is kept; [_checkState] drops it
    // once it sees the canceled state.
    if (!_inCallback) _pending = null;
    _cancelFuture = _onCancel();
  }

  /// Decrements the pause count.
  ///
  /// Does not automatically unpause the input (call [_onResume]) when
  /// the pause count reaches zero. This is handled elsewhere, and only
  /// if there are no pending events buffered.
  void _decrementPauseCount() {
    assert(_isPaused);
    // Removes one pause unit; the flag bits below [_STATE_PAUSE_COUNT] are
    // left as they are.
    _state -= _STATE_PAUSE_COUNT;
  }

  // _EventSink interface.

  /// Accepts a data event from the source.
  ///
  /// The event is dropped if the subscription is canceled. It is delivered
  /// immediately if possible, and buffered otherwise.
  void _add(T data) {
    assert(!_isClosed);
    if (_isCanceled) return;
    if (!_canFire) {
      _addPending(_DelayedData<T>(data));
      return;
    }
    _sendData(data);
  }

  /// Accepts an error event from the source.
  ///
  /// The event is dropped if the subscription is canceled. It is delivered
  /// immediately if possible, and buffered otherwise.
  void _addError(Object error, StackTrace stackTrace) {
    if (_isCanceled) return;
    if (!_canFire) {
      _addPending(_DelayedError(error, stackTrace));
      return;
    }
    _sendError(error, stackTrace); // Reports cancel after sending.
  }

  /// Accepts the request to close from the source.
  ///
  /// Ignored if the subscription is canceled. Otherwise the subscription is
  /// marked closed, and the done event is delivered immediately if possible
  /// and buffered otherwise.
  void _close() {
    assert(!_isClosed);
    if (_isCanceled) return;
    _state |= _STATE_CLOSED;
    if (!_canFire) {
      _addPending(const _DelayedDone());
      return;
    }
    _sendDone();
  }

  // Hooks called when the input is paused, unpaused or canceled.
  // These must not throw. If overwritten to call user code, include suitable
  // try/catch wrapping and send any errors to
  // [_Zone.current.handleUncaughtError].
  /// Hook called when the input should stop sending events.
  ///
  /// This base implementation only checks that the input-paused flag is set.
  void _onPause() {
    assert(_isInputPaused);
  }

  /// Hook called when the input may send events again.
  ///
  /// This base implementation only checks that the input-paused flag has
  /// been cleared.
  void _onResume() {
    assert(!_isInputPaused);
  }

  /// Hook called when the subscription has been canceled.
  ///
  /// The result is stored in [_cancelFuture] by [_cancel]. This base
  /// implementation returns `null`.
  Future<void>? _onCancel() {
    assert(_isCanceled);
    return null;
  }

  // Handle pending events.

  /// Buffers [event] for later delivery.
  ///
  /// Creates the event queue on first use. When this is the first buffered
  /// event and the subscription is not paused, delivery is scheduled too.
  void _addPending(_DelayedEvent event) {
    _StreamImplEvents<T>? existing = _pending as dynamic;
    final queue = existing ?? _StreamImplEvents<T>();
    _pending = queue;
    queue.add(event);
    // Already flagged as having pending events.
    if (_hasPending) return;
    _state |= _STATE_HAS_PENDING;
    if (!_isPaused) queue.schedule(this);
  }

  /* _EventDispatch interface. */

  /// Delivers [data] to the data handler.
  ///
  /// The handler is run guarded in [_zone] with the in-callback flag set.
  /// Afterwards [_checkState] deals with state changes made by the handler.
  void _sendData(T data) {
    assert(!_isCanceled);
    assert(!_isPaused);
    assert(!_inCallback);
    // Remembered so [_checkState] can tell whether the handler changed it.
    bool wasInputPaused = _isInputPaused;
    _state |= _STATE_IN_CALLBACK;
    _zone.runUnaryGuarded(_onData, data);
    _state &= ~_STATE_IN_CALLBACK;
    _checkState(wasInputPaused);
  }

  /// Delivers an error event to the error handler.
  ///
  /// With `cancelOnError` set, the subscription is canceled first. The error
  /// is then reported once the cancel future completes, or immediately if
  /// there is no such future. Without `cancelOnError`, the error is reported
  /// immediately and [_checkState] runs afterwards.
  void _sendError(Object error, StackTrace stackTrace) {
    assert(!_isCanceled);
    assert(!_isPaused);
    assert(!_inCallback);
    bool wasInputPaused = _isInputPaused;

    // Runs the error handler guarded in [_zone], with the in-callback flag
    // set for the duration of the call.
    void sendError() {
      // If the subscription has been canceled while waiting for the cancel
      // future to finish we must not report the error.
      if (_isCanceled && !_waitsForCancel) return;
      _state |= _STATE_IN_CALLBACK;
      // TODO(floitsch): this dynamic should be 'void'.
      var onError = _onError;
      // Pass the stack trace only to handlers that accept it.
      if (onError is void Function(Object, StackTrace)) {
        _zone.runBinaryGuarded<Object, StackTrace>(onError, error, stackTrace);
      } else {
        _zone.runUnaryGuarded<Object>(_onError as dynamic, error);
      }
      _state &= ~_STATE_IN_CALLBACK;
    }

    if (_cancelOnError) {
      // Flag the pending error, so that a call to [cancel] in the meantime
      // can discard it by clearing the flag.
      _state |= _STATE_WAIT_FOR_CANCEL;
      _cancel();
      var cancelFuture = _cancelFuture;
      if (cancelFuture != null &&
          !identical(cancelFuture, Future._nullFuture)) {
        cancelFuture.whenComplete(sendError);
      } else {
        sendError();
      }
    } else {
      sendError();
      // Only check state if not cancelOnError.
      _checkState(wasInputPaused);
    }
  }

  /// Delivers the done event to the done handler.
  ///
  /// The subscription is canceled first. The done handler then runs once the
  /// cancel future completes, or immediately if there is no such future.
  void _sendDone() {
    assert(!_isCanceled);
    assert(!_isPaused);
    assert(!_inCallback);

    // Runs the done handler guarded in [_zone], with the in-callback flag
    // set for the duration of the call.
    void sendDone() {
      // If the subscription has been canceled while waiting for the cancel
      // future to finish we must not report the done event.
      if (!_waitsForCancel) return;
      _state |= (_STATE_CANCELED | _STATE_CLOSED | _STATE_IN_CALLBACK);
      _zone.runGuarded(_onDone);
      _state &= ~_STATE_IN_CALLBACK;
    }

    _cancel();
    // Flag the pending done event, so that a call to [cancel] in the
    // meantime can discard it by clearing the flag.
    _state |= _STATE_WAIT_FOR_CANCEL;
    var cancelFuture = _cancelFuture;
    if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) {
      cancelFuture.whenComplete(sendDone);
    } else {
      sendDone();
    }
  }

  /// Call a hook function.
  ///
  /// The call is properly wrapped in code to avoid other callbacks
  /// during the call, and it checks for state changes after the call
  /// that should cause further callbacks.
  ///
  /// Must not be called while another callback is running.
  void _guardCallback(void Function() callback) {
    assert(!_inCallback);
    // Remembered so [_checkState] can tell whether the hook changed it.
    bool wasInputPaused = _isInputPaused;
    _state |= _STATE_IN_CALLBACK;
    callback();
    _state &= ~_STATE_IN_CALLBACK;
    _checkState(wasInputPaused);
  }

  /// Check if the input needs to be informed of state changes.
  ///
  /// State changes are pausing, resuming and canceling.
  ///
  /// After canceling, no further callbacks will happen.
  ///
  /// The cancel callback is called after a user cancel, or after
  /// the final done event is sent.
  ///
  /// [wasInputPaused] is the input-paused flag as it was before the callback
  /// that just finished.
  void _checkState(bool wasInputPaused) {
    assert(!_inCallback);
    // The queue has been drained: clear the has-pending flag and, if the
    // subscription is not paused either, the input-paused flag.
    if (_hasPending && _pending!.isEmpty) {
      _state &= ~_STATE_HAS_PENDING;
      if (_isInputPaused && _mayResumeInput) {
        _state &= ~_STATE_INPUT_PAUSED;
      }
    }
    // If the state changes during a callback, we immediately
    // make a new state-change callback. Loop until the state didn't change.
    while (true) {
      if (_isCanceled) {
        _pending = null;
        return;
      }
      bool isInputPaused = _isInputPaused;
      if (wasInputPaused == isInputPaused) break;
      // The flag is known to be clear here, so the toggle sets it.
      _state ^= _STATE_IN_CALLBACK;
      if (isInputPaused) {
        _onPause();
      } else {
        _onResume();
      }
      _state &= ~_STATE_IN_CALLBACK;
      wasInputPaused = isInputPaused;
    }
    // Resume delivery of events that are still buffered.
    if (_hasPending && !_isPaused) {
      _pending!.schedule(this);
    }
  }
}

// -------------------------------------------------------------------
// Common base class for single and multi-subscription streams.
// -------------------------------------------------------------------
abstract class _StreamImpl<T> extends Stream<T> {
  /// Creates a subscription, reports it to [_onListen] and returns it.
  ///
  /// A `null` [cancelOnError] is treated as `false`.
  StreamSubscription<T> listen(void onData(T data)?,
      {Function? onError, void onDone()?, bool? cancelOnError}) {
    final StreamSubscription<T> subscription =
        _createSubscription(onData, onError, onDone, cancelOnError ?? false);
    _onListen(subscription);
    return subscription;
  }

  /// Creates the subscription object. Called by [listen].
  StreamSubscription<T> _createSubscription(void onData(T data)?,
          Function? onError, void onDone()?, bool cancelOnError) =>
      _BufferingStreamSubscription<T>(onData, onError, onDone, cancelOnError);

  /// Hook called by [listen] after the subscription has been created.
  ///
  /// Does nothing by default.
  void _onListen(StreamSubscription subscription) {}
}

/// Signature of a function that produces a [_PendingEvents] object.
typedef _PendingEvents<T> _EventGenerator<T>();

/// Single-listen stream whose events come from a generated [_PendingEvents].
class _GeneratedStreamImpl<T> extends _StreamImpl<T> {
  /// Produces the events object handed to the subscription.
  final _EventGenerator<T> _pending;

  /// Whether the stream has already been listened to.
  bool _isUsed = false;

  /// Initializes the stream to have only the events provided by a
  /// [_PendingEvents].
  ///
  /// A new [_PendingEvents] must be generated for each listen.
  _GeneratedStreamImpl(this._pending);

  /// Creates the subscription and hands it the generated events.
  ///
  /// Throws a [StateError] if the stream has been listened to before.
  StreamSubscription<T> _createSubscription(void onData(T data)?,
      Function? onError, void onDone()?, bool cancelOnError) {
    if (_isUsed) throw StateError("Stream has already been listened to.");
    _isUsed = true;
    final subscription = _BufferingStreamSubscription<T>(
        onData, onError, onDone, cancelOnError);
    subscription._setPendingEvents(_pending());
    return subscription;
  }
}

/// Pending events object that gets its events from an [Iterable].
///
/// Each call to [handleNext] advances the iterator once and dispatches
/// exactly one event.
class _IterablePendingEvents<T> extends _PendingEvents<T> {
  // The iterator providing data for data events.
  // Set to null when iteration has completed.
  Iterator<T>? _iterator;

  /// Creates pending events that take their data from `data.iterator`.
  _IterablePendingEvents(Iterable<T> data) : _iterator = data.iterator;

  /// Whether there is nothing left to send, including the done event.
  bool get isEmpty => _iterator == null;

  /// Dispatches the next event of the iteration to [dispatch].
  ///
  /// Throws a [StateError] if there are no events left.
  void handleNext(_EventDispatch<T> dispatch) {
    var iterator = _iterator;
    if (iterator == null) {
      throw new StateError("No events pending.");
    }
    // Send one event per call to moveNext.
    // If moveNext returns true, send the current element as data.
    // If current throws, send that error, but keep iterating.
    // If moveNext returns false, send a done event and clear the _iterator.
    // If moveNext throws an error, send an error and prepare to send a done
    // event afterwards.
    bool movedNext = false;
    try {
      if (iterator.moveNext()) {
        movedNext = true;
        dispatch._sendData(iterator.current);
      } else {
        _iterator = null;
        dispatch._sendDone();
      }
    } catch (e, s) {
      if (!movedNext) {
        // Threw in .moveNext().
        // Ensure that we send a done afterwards.
        _iterator = const EmptyIterator<Never>();
      }
      // Else threw in .current.
      dispatch._sendError(e, s);
    }
  }

  /// Cancels a scheduled dispatch and drops the iterator.
  void clear() {
    if (isScheduled) cancelSchedule();
    _iterator = null;
  }
}

// Internal helpers.

// Types of the different handlers on a stream. Types used to type fields.
typedef void _DataHandler<T>(T value);
typedef void _DoneHandler();

/// Default data handler, does nothing.
///
/// Used when no data handler is provided to a subscription.
void _nullDataHandler(dynamic value) {}

/// Default error handler, reports the error to the current zone's handler.
///
/// Used when no error handler is provided to a subscription.
void _nullErrorHandler(Object error, StackTrace stackTrace) {
  Zone.current.handleUncaughtError(error, stackTrace);
}

/// Default done handler, does nothing.
///
/// Used when no done handler is provided to a subscription.
void _nullDoneHandler() {}

/// A delayed event on a buffering stream subscription.
///
/// Events are chained into a singly linked list through [next].
abstract class _DelayedEvent<T> {
  /// Added as a linked list on the [StreamController].
  _DelayedEvent? next;

  /// Execute the delayed event on the [StreamController].
  ///
  /// Implementations forward the event to the matching method of [dispatch].
  void perform(_EventDispatch<T> dispatch);
}

/// A buffered data event.
class _DelayedData<T> extends _DelayedEvent<T> {
  /// The data to deliver.
  final T value;

  _DelayedData(this.value);

  /// Sends [value] to [dispatch] as a data event.
  void perform(_EventDispatch<T> dispatch) => dispatch._sendData(value);
}

/// A buffered error event.
class _DelayedError extends _DelayedEvent {
  /// The error to deliver.
  final Object error;

  /// The stack trace delivered along with [error].
  final StackTrace stackTrace;

  _DelayedError(this.error, this.stackTrace);

  /// Sends [error] and [stackTrace] to [dispatch] as an error event.
  void perform(_EventDispatch dispatch) =>
      dispatch._sendError(error, stackTrace);
}

/// A buffered done event.
///
/// It is always the last event of a queue: [next] is always `null` and
/// cannot be set.
class _DelayedDone implements _DelayedEvent {
  const _DelayedDone();

  /// Sends the done event to [dispatch].
  void perform(_EventDispatch dispatch) => dispatch._sendDone();

  /// Always `null`, since nothing follows a done event.
  _DelayedEvent? get next => null;

  /// Always throws a [StateError].
  void set next(_DelayedEvent? _) {
    throw StateError("No events after a done.");
  }
}

/// Superclass for provider of pending events.
///
/// Handles scheduling of the microtask that delivers the next event.
/// Subclasses provide the events through [isEmpty], [handleNext] and [clear].
abstract class _PendingEvents<T> {
  // No async event has been scheduled.
  static const int _STATE_UNSCHEDULED = 0;
  // An async event has been scheduled to run a function.
  static const int _STATE_SCHEDULED = 1;
  // An async event has been scheduled, but it will do nothing when it runs.
  // Async events can't be preempted.
  static const int _STATE_CANCELED = 3;

  /// State of being scheduled.
  ///
  /// Set to [_STATE_SCHEDULED] when pending events are scheduled for
  /// async dispatch. Since we can't cancel a [scheduleMicrotask] call, if
  /// scheduling is "canceled", the _state is simply set to [_STATE_CANCELED]
  /// which will make the async code do nothing except resetting [_state].
  ///
  /// If events are scheduled while the state is [_STATE_CANCELED], it is
  /// merely switched back to [_STATE_SCHEDULED], but no new call to
  /// [scheduleMicrotask] is performed.
  int _state = _STATE_UNSCHEDULED;

  /// Whether there are no events left to deliver.
  bool get isEmpty;

  /// Whether a microtask is scheduled that will deliver an event.
  bool get isScheduled => _state == _STATE_SCHEDULED;
  /// Whether a microtask is outstanding, including one that was canceled.
  bool get _eventScheduled => _state >= _STATE_SCHEDULED;

  /// Schedule an event to run later.
  ///
  /// If called more than once, it should be called with the same dispatch as
  /// argument each time. It may reuse an earlier argument in some cases.
  void schedule(_EventDispatch<T> dispatch) {
    if (isScheduled) return;
    assert(!isEmpty);
    if (_eventScheduled) {
      // A canceled microtask is still outstanding, so re-arm it instead of
      // scheduling another one. It keeps the dispatch it was created with.
      assert(_state == _STATE_CANCELED);
      _state = _STATE_SCHEDULED;
      return;
    }
    scheduleMicrotask(() {
      int oldState = _state;
      // Reset before dispatching, so that the event handler is able to
      // schedule again.
      _state = _STATE_UNSCHEDULED;
      if (oldState == _STATE_CANCELED) return;
      handleNext(dispatch);
    });
    _state = _STATE_SCHEDULED;
  }

  /// Makes an already scheduled microtask do nothing when it runs.
  ///
  /// Has no effect unless [isScheduled] is true.
  void cancelSchedule() {
    if (isScheduled) _state = _STATE_CANCELED;
  }

  /// Delivers the next pending event to [dispatch].
  void handleNext(_EventDispatch<T> dispatch);

  /// Throw away any pending events and cancel scheduled events.
  void clear();
}

/// Queue of pending events for a [_StreamImpl], kept as a linked list.
class _StreamImplEvents<T> extends _PendingEvents<T> {
  /// First event of the singly linked list, or `null` if the list is empty.
  _DelayedEvent? firstPendingEvent;

  /// Last event of the list. New events are appended after it.
  _DelayedEvent? lastPendingEvent;

  /// Whether the queue holds no events.
  bool get isEmpty => lastPendingEvent == null;

  /// Appends [event] to the end of the queue.
  void add(_DelayedEvent event) {
    final tail = lastPendingEvent;
    if (tail != null) {
      tail.next = event;
    } else {
      firstPendingEvent = event;
    }
    lastPendingEvent = event;
  }

  /// Removes the first event of the queue and performs it on [dispatch].
  ///
  /// The event is unlinked before it is performed.
  void handleNext(_EventDispatch<T> dispatch) {
    assert(!isScheduled);
    assert(!isEmpty);
    final _DelayedEvent head = firstPendingEvent!;
    final _DelayedEvent? rest = head.next;
    firstPendingEvent = rest;
    if (rest == null) lastPendingEvent = null;
    head.perform(dispatch);
  }

  /// Cancels a scheduled dispatch and empties the queue.
  void clear() {
    if (isScheduled) cancelSchedule();
    lastPendingEvent = null;
    firstPendingEvent = null;
  }
}

/// Signature of the listen and cancel callbacks of an [_AsBroadcastStream].
typedef void _BroadcastCallback<T>(StreamSubscription<T> subscription);

/// Done subscription that will send one done event as soon as possible.
///
/// The done event is delivered in a microtask, unless the subscription is
/// paused at that time or has been canceled. A paused subscription delivers
/// the event after it has been resumed. A canceled subscription never
/// delivers it.
class _DoneStreamSubscription<T> implements StreamSubscription<T> {
  /// Set when the done event has been delivered, or never will be because
  /// the subscription was canceled.
  static const int _DONE_SENT = 1;

  /// Set while a microtask that calls [_sendDone] is outstanding.
  static const int _SCHEDULED = 2;

  /// Unit of the pause count, stored in this bit and the bits above.
  static const int _PAUSED = 4;

  final Zone _zone;
  int _state = 0;
  _DoneHandler? _onDone;

  /// Creates the subscription in the current zone and schedules the event.
  _DoneStreamSubscription(this._onDone) : _zone = Zone.current {
    _schedule();
  }

  bool get _isSent => (_state & _DONE_SENT) != 0;
  bool get _isScheduled => (_state & _SCHEDULED) != 0;
  bool get isPaused => _state >= _PAUSED;

  /// Schedules a microtask for [_sendDone] unless one is outstanding.
  void _schedule() {
    if (_isScheduled) return;
    _zone.scheduleMicrotask(_sendDone);
    _state |= _SCHEDULED;
  }

  /// Does nothing, since this subscription never has data events.
  void onData(void handleData(T data)?) {}

  /// Does nothing, since this subscription never has error events.
  void onError(Function? handleError) {}

  void onDone(void handleDone()?) {
    _onDone = handleDone;
  }

  void pause([Future<void>? resumeSignal]) {
    _state += _PAUSED;
    if (resumeSignal != null) resumeSignal.whenComplete(resume);
  }

  void resume() {
    if (isPaused) {
      _state -= _PAUSED;
      if (!isPaused && !_isSent) {
        _schedule();
      }
    }
  }

  /// Cancels the subscription, so the done event is never delivered.
  Future cancel() {
    // Marking the event as sent stops both an already scheduled microtask
    // and a later [resume] from delivering it.
    _state |= _DONE_SENT;
    // The handler will never run, so release it.
    _onDone = null;
    return Future._nullFuture;
  }

  /// Returns a future that completes with [futureValue] on the done event.
  ///
  /// Overwrites the done handler.
  ///
  /// Throws an [ArgumentError] if [futureValue] is `null` and [E] does not
  /// accept `null`.
  Future<E> asFuture<E>([E? futureValue]) {
    E resultValue;
    if (futureValue == null) {
      if (!typeAcceptsNull<E>()) {
        throw ArgumentError.notNull("futureValue");
      }
      resultValue = futureValue as dynamic;
    } else {
      resultValue = futureValue;
    }
    _Future<E> result = _Future<E>();
    _onDone = () {
      result._completeWithValue(resultValue);
    };
    return result;
  }

  /// Microtask callback that delivers the done event.
  void _sendDone() {
    _state &= ~_SCHEDULED;
    // Nothing to do after a cancel. While paused, [resume] schedules the
    // delivery again.
    if (_isSent || isPaused) return;
    _state |= _DONE_SENT;
    var doneHandler = _onDone;
    if (doneHandler != null) _zone.runGuarded(doneHandler);
  }
}

/// Broadcast stream that forwards the events of a source stream.
///
/// The source is listened to on the first [listen] call. The optional listen
/// and cancel callbacks are run in the zone where this stream was created.
class _AsBroadcastStream<T> extends Stream<T> {
  final Stream<T> _source;
  final _BroadcastCallback<T>? _onListenHandler;
  final _BroadcastCallback<T>? _onCancelHandler;
  final Zone _zone;

  /// Controller distributing the events; `null` once the source
  /// subscription has been canceled through [_cancelSubscription].
  _AsBroadcastStreamController<T>? _controller;

  /// Subscription on [_source], if it is currently being listened to.
  StreamSubscription<T>? _subscription;

  _AsBroadcastStream(
      this._source,
      void onListenHandler(StreamSubscription<T> subscription)?,
      void onCancelHandler(StreamSubscription<T> subscription)?)
      : _onListenHandler = onListenHandler == null
            ? null
            : Zone.current.registerUnaryCallback<void, StreamSubscription<T>>(
                onListenHandler),
        _onCancelHandler = onCancelHandler == null
            ? null
            : Zone.current.registerUnaryCallback<void, StreamSubscription<T>>(
                onCancelHandler),
        _zone = Zone.current {
    _controller = _AsBroadcastStreamController<T>(_onListen, _onCancel);
  }

  bool get isBroadcast => true;

  /// Adds a listener, starting to listen on the source if necessary.
  ///
  /// If the controller is gone or closed, the returned subscription only
  /// delivers a done event.
  StreamSubscription<T> listen(void onData(T data)?,
      {Function? onError, void onDone()?, bool? cancelOnError}) {
    final controller = _controller;
    if (controller != null && !controller.isClosed) {
      _subscription ??= _source.listen(controller.add,
          onError: controller.addError, onDone: controller.close);
      return controller._subscribe(
          onData, onError, onDone, cancelOnError ?? false);
    }
    // Return a dummy subscription backed by nothing, since
    // it will only ever send one done event.
    return _DoneStreamSubscription<T>(onDone);
  }

  /// Cancel callback given to the controller.
  ///
  /// Runs the cancel handler, if any. If the controller was already gone or
  /// closed before the handler ran, the source subscription is canceled.
  void _onCancel() {
    final controller = _controller;
    // Decided before the handler runs, since the handler can change it.
    final bool shutdown = controller == null || controller.isClosed;
    final cancelHandler = _onCancelHandler;
    if (cancelHandler != null) {
      _zone.runUnary(cancelHandler, _BroadcastSubscriptionWrapper<T>(this));
    }
    if (!shutdown) return;
    final subscription = _subscription;
    if (subscription == null) return;
    subscription.cancel();
    _subscription = null;
  }

  /// Listen callback given to the controller; runs the listen handler.
  void _onListen() {
    final listenHandler = _onListenHandler;
    if (listenHandler == null) return;
    _zone.runUnary(listenHandler, _BroadcastSubscriptionWrapper<T>(this));
  }

  // Methods called from _BroadcastSubscriptionWrapper.

  /// Cancels the source subscription and makes the stream unlistenable.
  void _cancelSubscription() {
    // Called by [_controller] when it has no subscribers left.
    final subscription = _subscription;
    if (subscription == null) return;
    _subscription = null;
    _controller = null; // Marks the stream as no longer listenable.
    subscription.cancel();
  }

  /// Pauses the source subscription, if there is one.
  void _pauseSubscription(Future<void>? resumeSignal) {
    _subscription?.pause(resumeSignal);
  }

  /// Resumes the source subscription, if there is one.
  void _resumeSubscription() {
    _subscription?.resume();
  }

  /// Whether the source subscription exists and is paused.
  bool get _isSubscriptionPaused => _subscription?.isPaused ?? false;
}

/// Subscription handle that can pause, resume and cancel the source
/// subscription of an [_AsBroadcastStream], but not change its handlers.
class _BroadcastSubscriptionWrapper<T> implements StreamSubscription<T> {
  /// Message of the error thrown by every handler-changing member.
  static const String _handlersAreFixed =
      "Cannot change handlers of asBroadcastStream source subscription.";

  final _AsBroadcastStream _stream;

  _BroadcastSubscriptionWrapper(this._stream);

  /// Always throws an [UnsupportedError].
  void onData(void handleData(T data)?) {
    throw UnsupportedError(_handlersAreFixed);
  }

  /// Always throws an [UnsupportedError].
  void onError(Function? handleError) {
    throw UnsupportedError(_handlersAreFixed);
  }

  /// Always throws an [UnsupportedError].
  void onDone(void handleDone()?) {
    throw UnsupportedError(_handlersAreFixed);
  }

  /// Pauses the source subscription.
  void pause([Future<void>? resumeSignal]) =>
      _stream._pauseSubscription(resumeSignal);

  /// Resumes the source subscription.
  void resume() => _stream._resumeSubscription();

  /// Cancels the source subscription.
  Future cancel() {
    _stream._cancelSubscription();
    return Future._nullFuture;
  }

  /// Whether the source subscription is paused.
  bool get isPaused => _stream._isSubscriptionPaused;

  /// Always throws an [UnsupportedError].
  Future<E> asFuture<E>([E? futureValue]) {
    throw UnsupportedError(_handlersAreFixed);
  }
}

/// Simple implementation of [StreamIterator].
///
/// Pauses the stream between calls to [moveNext].
class _StreamIterator<T> implements StreamIterator<T> {
  // The stream iterator is always in one of five states.
  // The value of the [_stateData] field depends on the state.
  //
  // When `_subscription == null`, `_stateData != null`, and not listened yet:
  // The stream iterator has been created, but [moveNext] has not been called
  // yet. The [_stateData] field contains the stream to listen to on the first
  // call to [moveNext] and [current] returns `null`.
  //
  // When `_subscription == null`, `_stateData != null`, during `listen` call.
  // The `listen` call has not returned a subscription yet.
  // The `_stateData` contains the future returned by the first [moveNext]
  // call. This state is only detected inside the stream event callbacks,
  // since it's the only case where they can get called while `_subscription`
  // is `null`. (A well-behaved stream should not be emitting events during
  // the `listen` call, but some do anyway). The [current] is `null`.
  //
  // When `_subscription != null` and `!_hasValue`:
  // The user has called [moveNext] and the iterator is waiting for the next
  // event. The [_stateData] field contains the [_Future] returned by the
  // [moveNext] call and [current] returns `null`.
  //
  // When `_subscription != null` and `_hasValue`:
  // The most recent call to [moveNext] has completed with a `true` value
  // and [current] provides the value of the data event.
  // The [_stateData] field contains the [current] value.
  //
  // When `_subscription == null` and `_stateData == null`:
  // The stream has completed or been canceled using [cancel].
  // The stream completes on either a done event or an error event.
  // The last call to [moveNext] has completed with `false` and [current]
  // returns `null`.

  /// Subscription being listened to.
  ///
  /// Set to `null` when the stream subscription is done or canceled.
  StreamSubscription<T>? _subscription;

  /// Data value depending on the current state.
  ///
  /// Before first call to [moveNext]: The stream to listen to.
  ///
  /// After calling [moveNext] but before the returned future completes:
  /// The returned future.
  ///
  /// After calling [moveNext] and the returned future has completed
  /// with `true`: The value of [current].
  ///
  /// After calling [moveNext] and the returned future has completed
  /// with `false`, or after calling [cancel]: `null`.
  @pragma("vm:entry-point")
  Object? _stateData;

  /// Whether the iterator is between calls to `moveNext`.
  /// This will usually cause the [_subscription] to be paused, but as an
  /// optimization, we only pause after the [moveNext] future has been
  /// completed.
  bool _hasValue = false;

  _StreamIterator(final Stream<T> stream)
      : _stateData = checkNotNullable(stream, "stream");

  /// The value of the latest data event while one is available, and `null`
  /// otherwise.
  T get current => (_hasValue ? _stateData : null) as dynamic;

  /// Requests the next event.
  ///
  /// With no active subscription the work is left to [_initializeOrDone].
  /// Otherwise the subscription is resumed, and the returned future is
  /// completed when the next event arrives.
  ///
  /// Throws a [StateError] if the future of an earlier call has not
  /// completed yet.
  Future<bool> moveNext() {
    final subscription = _subscription;
    if (subscription == null) return _initializeOrDone();
    if (!_hasValue) throw StateError("Already waiting for next.");
    final future = _Future<bool>();
    _stateData = future;
    _hasValue = false;
    subscription.resume();
    return future;
  }

  /// Handles a [moveNext] call made while there is no active subscription.
  ///
  /// On the first call to [moveNext] this starts listening on the stream.
  /// After the stream has ended it returns a future completed with `false`.
  Future<bool> _initializeOrDone() {
    assert(_subscription == null);
    final stateData = _stateData;
    if (stateData == null) return Future._falseFuture;
    Stream<T> stream = stateData as dynamic;
    final future = _Future<bool>();
    _stateData = future;
    // The `listen` call may invoke user code, and it might try to emit
    // events.
    // We ignore data events during `listen`, but error or done events
    // are used to asynchronously complete the future and set `_stateData`
    // to null.
    // This ensures that we do no other user-code callbacks during `listen`
    // than the `onListen` itself. If that code manages to call `moveNext`
    // again on this iterator, then we will get here and fail when the
    // `_stateData` is a future instead of a stream.
    final subscription = stream.listen(_onData,
        onError: _onError, onDone: _onDone, cancelOnError: true);
    // A `null` state means the stream already ended during `listen`.
    if (_stateData != null) _subscription = subscription;
    return future;
  }

  /// Cancels the iteration.
  ///
  /// Clears the state. If a subscription is active it is canceled, and a
  /// pending [moveNext] future is completed with `false`.
  Future cancel() {
    final subscription = _subscription;
    final stateData = _stateData;
    _stateData = null;
    if (subscription == null) return Future._nullFuture;
    _subscription = null;
    if (_hasValue) {
      _hasValue = false;
    } else {
      // A `moveNext` call is waiting; its future is the old state data.
      _Future<bool> future = stateData as dynamic;
      future._asyncComplete(false);
    }
    return subscription.cancel();
  }

  /// Data handler of the stream subscription.
  ///
  /// Stores [data] as the current value, completes the pending [moveNext]
  /// future with `true` and then pauses the subscription.
  void _onData(T data) {
    // Ignore events sent during the `listen` call
    // (which can happen if misusing synchronous broadcast stream controllers),
    // or after `cancel` or `done` (for *really* misbehaving streams).
    if (_subscription == null) return;
    _Future<bool> moveNextFuture = _stateData as dynamic;
    _stateData = data;
    _hasValue = true;
    moveNextFuture._complete(true);
    // Completing the future may have changed the state, so only pause if the
    // value is still the current one and the subscription is still active.
    if (_hasValue) _subscription?.pause();
  }

  /// Error handler of the stream subscription.
  ///
  /// Ends the iteration and completes the pending [moveNext] future with the
  /// error.
  void _onError(Object error, StackTrace stackTrace) {
    final subscription = _subscription;
    _Future<bool> moveNextFuture = _stateData as dynamic;
    _subscription = null;
    _stateData = null;
    if (subscription == null) {
      // Event delivered during `listen` call.
      moveNextFuture._asyncCompleteError(error, stackTrace);
      return;
    }
    moveNextFuture._completeError(error, stackTrace);
  }

  /// Done handler of the stream subscription.
  ///
  /// Ends the iteration and completes the pending [moveNext] future with
  /// `false`.
  void _onDone() {
    final subscription = _subscription;
    _Future<bool> moveNextFuture = _stateData as dynamic;
    _subscription = null;
    _stateData = null;
    if (subscription == null) {
      // Event delivered during `listen` call.
      moveNextFuture._asyncCompleteWithValue(false);
      return;
    }
    moveNextFuture._completeWithValue(false);
  }
}

/// A broadcast stream with no events other than a done event.
class _EmptyStream<T> extends Stream<T> {
  const _EmptyStream() : super._internal();

  bool get isBroadcast => true;

  /// Returns a subscription that only delivers a done event to [onDone].
  ///
  /// The other arguments are ignored.
  StreamSubscription<T> listen(void onData(T data)?,
          {Function? onError, void onDone()?, bool? cancelOnError}) =>
      _DoneStreamSubscription<T>(onDone);
}

/// A stream that gives every listener a controller of its own.
class _MultiStream<T> extends Stream<T> {
  final bool isBroadcast;

  /// The callback called for each listen.
  final void Function(MultiStreamController<T>) _onListen;

  _MultiStream(this._onListen, this.isBroadcast);

  /// Creates a new controller and subscribes the listener to it.
  ///
  /// The controller's `onListen` callback passes the controller to
  /// [_onListen].
  StreamSubscription<T> listen(void onData(T event)?,
      {Function? onError, void onDone()?, bool? cancelOnError}) {
    final controller = _MultiStreamController<T>();
    controller.onListen = () => _onListen(controller);
    return controller._subscribe(
        onData, onError, onDone, cancelOnError ?? false);
  }
}

/// Controller for a single listener of a [_MultiStream].
///
/// Adds members that pass events directly to the subscription.
class _MultiStreamController<T> extends _AsyncStreamController<T>
    implements MultiStreamController<T> {
  _MultiStreamController() : super(null, null, null, null);

  /// Passes [data] directly to the subscription, if there is a listener.
  ///
  /// Throws if events may not be added at this time.
  void addSync(T data) {
    if (!_mayAddEvent) throw _badEventState();
    if (!hasListener) return;
    _subscription._add(data);
  }

  /// Passes [error] directly to the subscription, if there is a listener.
  ///
  /// Uses [StackTrace.empty] when [stackTrace] is omitted. Throws if events
  /// may not be added at this time.
  void addErrorSync(Object error, [StackTrace? stackTrace]) {
    if (!_mayAddEvent) throw _badEventState();
    if (!hasListener) return;
    _subscription._addError(error, stackTrace ?? StackTrace.empty);
  }

  /// Marks the controller closed and passes the close request directly to
  /// the subscription, if there is a listener.
  ///
  /// Does nothing if the controller is already closed. Throws if events may
  /// not be added at this time.
  void closeSync() {
    if (isClosed) return;
    if (!_mayAddEvent) throw _badEventState();
    _state |= _StreamController._STATE_CLOSED;
    if (!hasListener) return;
    _subscription._close();
  }

  /// Always throws an [UnsupportedError].
  Stream<T> get stream {
    throw UnsupportedError("Not available");
  }
}
