| // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| part of dart.async; |
| |
| // ------------------------------------------------------------------- |
| // Controller for creating and adding events to a stream. |
| // ------------------------------------------------------------------- |
| |
| /// Type of a stream controller's `onListen`, `onPause` and `onResume` callbacks. |
| typedef void ControllerCallback(); |
| |
| /// Type of stream controller `onCancel` callbacks. |
| typedef FutureOr<void> ControllerCancelCallback(); |
| |
| /// A controller with the stream it controls. |
| /// |
| /// This controller allows sending data, error and done events on |
| /// its [stream]. |
| /// |
| /// This class can be used to create a simple stream that others |
| /// can listen on, and to push events to that stream. |
| /// |
| /// It's possible to check whether the stream is paused or not, and whether |
| /// it has subscribers or not, as well as getting a callback when either of |
| /// these change. |
| /// |
| /// Example: |
| /// ```dart |
| /// final streamController = StreamController( |
| /// onPause: () => print('Paused'), |
| /// onResume: () => print('Resumed'), |
| /// onCancel: () => print('Cancelled'), |
| /// onListen: () => print('Listens'), |
| /// ); |
| /// |
| /// streamController.stream.listen( |
| /// (event) => print('Event: $event'), |
| /// onDone: () => print('Done'), |
| /// onError: (error) => print(error), |
| /// ); |
| /// ``` |
| /// To check whether there is a subscriber on the stream, use [hasListener]. |
| /// ```dart continued |
| /// var hasListener = streamController.hasListener; // true |
| /// ``` |
| /// To send data events to the stream, use [add] or [addStream]. |
| /// ```dart continued |
| /// streamController.add(999); |
| /// final stream = Stream<int>.periodic( |
| /// const Duration(milliseconds: 200), (count) => count * count).take(4); |
| /// await streamController.addStream(stream); |
| /// ``` |
| /// To send an error event to the stream, use [addError] or [addStream]. |
| /// ```dart continued |
| /// streamController.addError(Exception('Issue 101')); |
| /// await streamController.addStream(Stream.error(Exception('Issue 404'))); |
| /// ``` |
| /// To check whether the stream is closed, use [isClosed]. |
| /// ```dart continued |
| /// var isClosed = streamController.isClosed; // false |
| /// ``` |
| /// To close the stream, use [close]. |
| /// ```dart continued |
| /// await streamController.close(); |
| /// isClosed = streamController.isClosed; // true |
| /// ``` |
| abstract class StreamController<T> implements StreamSink<T> { |
| /// The stream that this controller is controlling. |
| Stream<T> get stream; |
| |
| /// A controller with a [stream] that supports only one single subscriber. |
| /// |
| /// If [sync] is true, the returned stream controller is a |
| /// [SynchronousStreamController], and must be used with the care |
| /// and attention necessary to not break the [Stream] contract. If in doubt, |
| /// use the non-sync version. |
| /// |
| /// Using an asynchronous controller will never give the wrong |
| /// behavior, but using a synchronous controller incorrectly can cause |
| /// otherwise correct programs to break. |
| /// |
| /// A synchronous controller is only intended for optimizing event |
| /// propagation when one asynchronous event immediately triggers another. |
| /// It should not be used unless the calls to [add] or [addError] |
| /// are guaranteed to occur in places where it won't break `Stream` invariants. |
| /// |
| /// Use synchronous controllers only to forward (potentially transformed) |
| /// events from another stream or a future. |
| /// |
| /// A Stream should be inert until a subscriber starts listening on it (using |
| /// the [onListen] callback to start producing events). Streams should not |
| /// leak resources (like websockets) when no user ever listens on the stream. |
| /// |
| /// The controller buffers all incoming events until a subscriber is |
| /// registered, but this feature should only be used in rare circumstances. |
| /// |
| /// The [onPause] function is called when the stream becomes |
| /// paused. [onResume] is called when the stream resumed. |
| /// |
| /// The [onListen] callback is called when the stream |
| /// receives its listener and [onCancel] when the listener ends |
| /// its subscription. If [onCancel] needs to perform an asynchronous operation, |
| /// [onCancel] should return a future that completes when the cancel operation |
| /// is done. |
| /// |
| /// If the stream is canceled before the controller needs data the |
| /// [onResume] call might not be executed. |
| factory StreamController( |
| {void onListen()?, |
| void onPause()?, |
| void onResume()?, |
| FutureOr<void> onCancel()?, |
| bool sync = false}) { |
| return sync |
| ? _SyncStreamController<T>(onListen, onPause, onResume, onCancel) |
| : _AsyncStreamController<T>(onListen, onPause, onResume, onCancel); |
| } |
| |
| /// A controller where [stream] can be listened to more than once. |
| /// |
| /// The [Stream] returned by [stream] is a broadcast stream. |
| /// It can be listened to more than once. |
| /// |
| /// A Stream should be inert until a subscriber starts listening on it (using |
| /// the [onListen] callback to start producing events). Streams should not |
| /// leak resources (like websockets) when no user ever listens on the stream. |
| /// |
| /// Broadcast streams do not buffer events when there is no listener. |
| /// |
| /// The controller distributes any events to all currently subscribed |
| /// listeners at the time when [add], [addError] or [close] is called. |
| /// It is not allowed to call `add`, `addError`, or `close` before a previous |
| /// call has returned. The controller does not have any internal queue of |
| /// events, and if there are no listeners at the time the event is added, |
| /// it will just be dropped, or, if it is an error, be reported as uncaught. |
| /// |
| /// Each listener subscription is handled independently, |
| /// and if one pauses, only the pausing listener is affected. |
| /// A paused listener will buffer events internally until unpaused or canceled. |
| /// |
| /// If [sync] is true, events may be fired directly by the stream's |
| /// subscriptions during an [add], [addError] or [close] call. |
| /// The returned stream controller is a [SynchronousStreamController], |
| /// and must be used with the care and attention necessary to not break |
| /// the [Stream] contract. |
| /// See [Completer.sync] for some explanations on when a synchronous |
| /// dispatching can be used. |
| /// If in doubt, keep the controller non-sync. |
| /// |
| /// If [sync] is false, the event will always be fired at a later time, |
| /// after the code adding the event has completed. |
| /// In that case, no guarantees are given with regard to when |
| /// multiple listeners get the events, except that each listener will get |
| /// all events in the correct order. Each subscription handles the events |
| /// individually. |
| /// If two events are sent on an async controller with two listeners, |
| /// one of the listeners may get both events |
| /// before the other listener gets any. |
| /// A listener must be subscribed both when the event is initiated |
| /// (that is, when [add] is called) |
| /// and when the event is later delivered, |
| /// in order to receive the event. |
| /// |
| /// The [onListen] callback is called when the first listener is subscribed, |
| /// and the [onCancel] is called when there are no longer any active listeners. |
| /// If a listener is added again later, after the [onCancel] was called, |
| /// the [onListen] will be called again. |
| factory StreamController.broadcast( |
| {void onListen()?, void onCancel()?, bool sync = false}) { |
| return sync |
| ? _SyncBroadcastStreamController<T>(onListen, onCancel) |
| : _AsyncBroadcastStreamController<T>(onListen, onCancel); |
| } |
| |
| /// The callback which is called when the stream is listened to. |
| /// |
| /// May be set to `null`, in which case no callback will happen. |
| abstract void Function()? onListen; |
| |
| /// The callback which is called when the stream is paused. |
| /// |
| /// May be set to `null`, in which case no callback will happen. |
| /// |
| /// Pause related callbacks are not supported on broadcast stream controllers. |
| abstract void Function()? onPause; |
| |
| /// The callback which is called when the stream is resumed. |
| /// |
| /// May be set to `null`, in which case no callback will happen. |
| /// |
| /// Pause related callbacks are not supported on broadcast stream controllers. |
| abstract void Function()? onResume; |
| |
| /// The callback which is called when the stream is canceled. |
| /// |
| /// May be set to `null`, in which case no callback will happen. |
| abstract FutureOr<void> Function()? onCancel; |
| |
| /// Returns a view of this object that only exposes the [StreamSink] interface. |
| StreamSink<T> get sink; |
| |
| /// Whether the stream controller is closed for adding more events. |
| /// |
| /// The controller becomes closed by calling the [close] method. |
| /// New events cannot be added, by calling [add] or [addError], |
| /// to a closed controller. |
| /// |
| /// If the controller is closed, |
| /// the "done" event might not have been delivered yet, |
| /// but it has been scheduled, and it is too late to add more events. |
| bool get isClosed; |
| |
| /// Whether the subscription would need to buffer events. |
| /// |
| /// This is the case if the controller's stream has a listener and it is |
| /// paused, or if it has not received a listener yet. In that case, the |
| /// controller is considered paused as well. |
| /// |
| /// A broadcast stream controller is never considered paused. It always |
| /// forwards its events to all uncanceled subscriptions, if any, |
| /// and let the subscriptions handle their own pausing and buffering. |
| bool get isPaused; |
| |
| /// Whether there is a subscriber on the [Stream]. |
| bool get hasListener; |
| |
| /// Sends a data [event]. |
| /// |
| /// Listeners receive this event in a later microtask. |
| /// |
| /// Note that a synchronous controller (created by passing true to the `sync` |
| /// parameter of the `StreamController` constructor) delivers events |
| /// immediately. Since this behavior violates the contract mentioned here, |
| /// synchronous controllers should only be used as described in the |
| /// documentation to ensure that the delivered events always *appear* as if |
| /// they were delivered in a separate microtask. |
| void add(T event); |
| |
| /// Sends or enqueues an error event. |
| /// |
| /// Listeners receive this event at a later microtask. This behavior can be |
| /// overridden by using `sync` controllers. Note, however, that sync |
| /// controllers have to satisfy the preconditions mentioned in the |
| /// documentation of the constructors. |
| void addError(Object error, [StackTrace? stackTrace]); |
| |
| /// Closes the stream. |
| /// |
| /// No further events can be added to a closed stream. |
| /// |
| /// The returned future is the same future provided by [done]. |
| /// It is completed when the stream listeners is done sending events, |
| /// This happens either when the done event has been sent, |
| /// or when the subscriber on a single-subscription stream is canceled. |
| /// |
| /// A broadcast stream controller will send the done event |
| /// even if listeners are paused, so some broadcast events may not have been |
| /// received yet when the returned future completes. |
| /// |
| /// If no one listens to a non-broadcast stream, |
| /// or the listener pauses and never resumes, |
| /// the done event will not be sent and this future will never complete. |
| Future close(); |
| |
| /// A future which is completed when the stream controller is done |
| /// sending events. |
| /// |
| /// This happens either when the done event has been sent, or if the |
| /// subscriber on a single-subscription stream is canceled. |
| /// |
| /// A broadcast stream controller will send the done event |
| /// even if listeners are paused, so some broadcast events may not have been |
| /// received yet when the returned future completes. |
| /// |
| /// If there is no listener on a non-broadcast stream, |
| /// or the listener pauses and never resumes, |
| /// the done event will not be sent and this future will never complete. |
| Future get done; |
| |
| /// Receives events from [source] and puts them into this controller's stream. |
| /// |
| /// Returns a future which completes when the source stream is done. |
| /// |
| /// Events must not be added directly to this controller using [add], |
| /// [addError], [close] or [addStream], until the returned future |
| /// is complete. |
| /// |
| /// Data and error events are forwarded to this controller's stream. A done |
| /// event on the source will end the `addStream` operation and complete the |
| /// returned future. |
| /// |
| /// If [cancelOnError] is `true`, only the first error on [source] is |
| /// forwarded to the controller's stream, and the `addStream` ends |
| /// after this. If [cancelOnError] is false, all errors are forwarded |
| /// and only a done event will end the `addStream`. |
| /// If [cancelOnError] is omitted or `null`, it defaults to `false`. |
| Future addStream(Stream<T> source, {bool? cancelOnError}); |
| } |
| |
| /// A stream controller that delivers its events synchronously. |
| /// |
| /// A synchronous stream controller is intended for cases where |
| /// an already asynchronous event triggers an event on a stream. |
| /// |
| /// Instead of adding the event to the stream in a later microtask, |
| /// causing extra latency, the event is instead fired immediately by the |
| /// synchronous stream controller, as if the stream event was |
| /// the current event or microtask. |
| /// |
| /// The synchronous stream controller can be used to break the contract |
| /// on [Stream], and it must be used carefully to avoid doing so. |
| /// |
| /// The only advantage to using a [SynchronousStreamController] over a |
| /// normal [StreamController] is the improved latency. |
| /// Only use the synchronous version if the improvement is significant, |
| /// and if its use is safe. Otherwise just use a normal stream controller, |
| /// which will always have the correct behavior for a [Stream], and won't |
| /// accidentally break other code. |
| /// |
| /// Adding events to a synchronous controller should only happen as the |
| /// very last part of the handling of the original event. |
| /// At that point, adding an event to the stream is equivalent to |
| /// returning to the event loop and adding the event in the next microtask. |
| /// |
| /// Each listener callback will be run as if it was a top-level event |
| /// or microtask. This means that if it throws, the error will be reported as |
| /// uncaught as soon as possible. |
| /// This is one reason to add the event as the last thing in the original event |
| /// handler – any action done after adding the event will delay the report of |
| /// errors in the event listener callbacks. |
| /// |
| /// If an event is added in a setting that isn't known to be another event, |
| /// it may cause the stream's listener to get that event before the listener |
| /// is ready to handle it. We promise that after calling [Stream.listen], |
| /// you won't get any events until the code doing the listen has completed. |
| /// Calling [add] in response to a function call of unknown origin may break |
| /// that promise. |
| /// |
| /// An [onListen] callback from the controller is *not* an asynchronous event, |
| /// and adding events to the controller in the `onListen` callback is always |
| /// wrong. The events will be delivered before the listener has even received |
| /// the subscription yet. |
| /// |
| /// The synchronous broadcast stream controller also has a restrictions that a |
| /// normal stream controller does not: |
| /// The [add], [addError], [close] and [addStream] methods *must not* be |
| /// called while an event is being delivered. |
| /// That is, if a callback on a subscription on the controller's stream causes |
| /// a call to any of the functions above, the call will fail. |
| /// A broadcast stream may have more than one listener, and if an |
| /// event is added synchronously while another is being also in the process |
| /// of being added, the latter event might reach some listeners before |
| /// the former. To prevent that, an event cannot be added while a previous |
| /// event is being fired. |
| /// This guarantees that an event is fully delivered when the |
| /// first [add], [addError] or [close] returns, |
| /// and further events will be delivered in the correct order. |
| /// |
| /// This still only guarantees that the event is delivered to the subscription. |
| /// If the subscription is paused, the actual callback may still happen later, |
| /// and the event will instead be buffered by the subscription. |
| /// Barring pausing, and the following buffered events that haven't been |
| /// delivered yet, callbacks will be called synchronously when an event is added. |
| /// |
| /// Adding an event to a synchronous non-broadcast stream controller while |
| /// another event is in progress may cause the second event to be delayed |
| /// and not be delivered synchronously, and until that event is delivered, |
| /// the controller will not act synchronously. |
| abstract class SynchronousStreamController<T> implements StreamController<T> { |
| /// Adds event to the controller's stream. |
| /// |
| /// As [StreamController.add], but must not be called while an event is |
| /// being added by [add], [addError] or [close]. |
| void add(T data); |
| |
| /// Adds error to the controller's stream. |
| /// |
| /// As [StreamController.addError], but must not be called while an event is |
| /// being added by [add], [addError] or [close]. |
| void addError(Object error, [StackTrace? stackTrace]); |
| |
| /// Closes the controller's stream. |
| /// |
| /// As [StreamController.close], but must not be called while an event is |
| /// being added by [add], [addError] or [close]. |
| Future close(); |
| } |
| |
| abstract class _StreamControllerLifecycle<T> { |
| StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError, |
| void onDone()?, bool cancelOnError); |
| void _recordPause(StreamSubscription<T> subscription) {} |
| void _recordResume(StreamSubscription<T> subscription) {} |
| Future<void>? _recordCancel(StreamSubscription<T> subscription) => null; |
| } |
| |
| // Base type for implementations of stream controllers. |
| abstract class _StreamControllerBase<T> |
| implements |
| StreamController<T>, |
| _StreamControllerLifecycle<T>, |
| _EventSink<T>, |
| _EventDispatch<T> {} |
| |
| /// Default implementation of [StreamController]. |
| /// |
| /// Controls a stream that only supports a single controller. |
| abstract class _StreamController<T> implements _StreamControllerBase<T> { |
| // The states are bit-flags. More than one can be set at a time. |
| // |
| // The "subscription state" goes through the states: |
| // initial -> subscribed -> canceled. |
| // These are mutually exclusive. |
| // The "closed" state records whether the [close] method has been called |
| // on the controller. This can be done at any time. If done before |
| // subscription, the done event is queued. If done after cancel, the done |
| // event is ignored (just as any other event after a cancel). |
| |
| /// The controller is in its initial state with no subscription. |
| static const int _STATE_INITIAL = 0; |
| |
| /// The controller has a subscription, but hasn't been closed or canceled. |
| /// |
| /// Keep in sync with |
| /// runtime/vm/stack_trace.cc:kStreamController_StateSubscribed. |
| static const int _STATE_SUBSCRIBED = 1; |
| |
| /// The subscription is canceled. |
| static const int _STATE_CANCELED = 2; |
| |
| /// Mask for the subscription state. |
| static const int _STATE_SUBSCRIPTION_MASK = 3; |
| |
| // The following state relate to the controller, not the subscription. |
| // If closed, adding more events is not allowed. |
| // If executing an [addStream], new events are not allowed either, but will |
| // be added by the stream. |
| |
| /// The controller is closed due to calling [close]. |
| /// |
| /// When the stream is closed, you can neither add new events nor add new |
| /// listeners. |
| static const int _STATE_CLOSED = 4; |
| |
| /// The controller is in the middle of an [addStream] operation. |
| /// |
| /// While adding events from a stream, no new events can be added directly |
| /// on the controller. |
| static const int _STATE_ADDSTREAM = 8; |
| |
| /// Field containing different data depending on the current subscription |
| /// state. |
| /// |
| /// If [_state] is [_STATE_INITIAL], the field may contain a [_PendingEvents] |
| /// for events added to the controller before a subscription. |
| /// |
| /// While [_state] is [_STATE_SUBSCRIBED], the field contains the subscription. |
| /// |
| /// When [_state] is [_STATE_CANCELED] the field is currently not used, |
| /// and will contain `null`. |
| @pragma("vm:entry-point") |
| Object? _varData; |
| |
| /// Current state of the controller. |
| @pragma("vm:entry-point") |
| int _state = _STATE_INITIAL; |
| |
| /// Future completed when the stream sends its last event. |
| /// |
| /// This is also the future returned by [close]. |
| // TODO(lrn): Could this be stored in the varData field too, if it's not |
| // accessed until the call to "close"? Then we need to special case if it's |
| // accessed earlier, or if close is called before subscribing. |
| _Future<void>? _doneFuture; |
| |
| void Function()? onListen; |
| void Function()? onPause; |
| void Function()? onResume; |
| FutureOr<void> Function()? onCancel; |
| |
| _StreamController(this.onListen, this.onPause, this.onResume, this.onCancel); |
| |
| // Return a new stream every time. The streams are equal, but not identical. |
| Stream<T> get stream => _ControllerStream<T>(this); |
| |
| /// Returns a view of this object that only exposes the [StreamSink] interface. |
| StreamSink<T> get sink => _StreamSinkWrapper<T>(this); |
| |
| /// Whether a listener has existed and been canceled. |
| /// |
| /// After this, adding more events will be ignored. |
| bool get _isCanceled => (_state & _STATE_CANCELED) != 0; |
| |
| /// Whether there is an active listener. |
| bool get hasListener => (_state & _STATE_SUBSCRIBED) != 0; |
| |
| /// Whether there has not been a listener yet. |
| bool get _isInitialState => |
| (_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL; |
| |
| bool get isClosed => (_state & _STATE_CLOSED) != 0; |
| |
| bool get isPaused => |
| hasListener ? _subscription._isInputPaused : !_isCanceled; |
| |
| bool get _isAddingStream => (_state & _STATE_ADDSTREAM) != 0; |
| |
| /// New events may not be added after close, or during addStream. |
| bool get _mayAddEvent => (_state < _STATE_CLOSED); |
| |
| // Returns the pending events. |
| // Pending events are events added before a subscription exists. |
| // They are added to the subscription when it is created. |
| // Pending events, if any, are kept in the _varData field until the |
| // stream is listened to. |
| // While adding a stream, pending events are moved into the |
| // state object to allow the state object to use the _varData field. |
| _PendingEvents<T>? get _pendingEvents { |
| assert(_isInitialState); |
| if (!_isAddingStream) { |
| return _varData as dynamic; |
| } |
| _StreamControllerAddStreamState<T> state = _varData as dynamic; |
| return state.varData; |
| } |
| |
| // Returns the pending events, and creates the object if necessary. |
| _StreamImplEvents<T> _ensurePendingEvents() { |
| assert(_isInitialState); |
| if (!_isAddingStream) { |
| Object? events = _varData; |
| if (events == null) { |
| _varData = events = _StreamImplEvents<T>(); |
| } |
| return events as dynamic; |
| } |
| _StreamControllerAddStreamState<T> state = _varData as dynamic; |
| Object? events = state.varData; |
| if (events == null) { |
| state.varData = events = _StreamImplEvents<T>(); |
| } |
| return events as dynamic; |
| } |
| |
| // Get the current subscription. |
| // If we are adding a stream, the subscription is moved into the state |
| // object to allow the state object to use the _varData field. |
| _ControllerSubscription<T> get _subscription { |
| assert(hasListener); |
| Object? varData = _varData; |
| if (_isAddingStream) { |
| _StreamControllerAddStreamState<Object?> streamState = varData as dynamic; |
| varData = streamState.varData; |
| } |
| return varData as dynamic; |
| } |
| |
| /// Creates an error describing why an event cannot be added. |
| /// |
| /// The reason, and therefore the error message, depends on the current state. |
| Error _badEventState() { |
| if (isClosed) { |
| return StateError("Cannot add event after closing"); |
| } |
| assert(_isAddingStream); |
| return StateError("Cannot add event while adding a stream"); |
| } |
| |
| // StreamSink interface. |
| Future addStream(Stream<T> source, {bool? cancelOnError}) { |
| if (!_mayAddEvent) throw _badEventState(); |
| if (_isCanceled) return _Future.immediate(null); |
| _StreamControllerAddStreamState<T> addState = |
| _StreamControllerAddStreamState<T>( |
| this, _varData, source, cancelOnError ?? false); |
| _varData = addState; |
| _state |= _STATE_ADDSTREAM; |
| return addState.addStreamFuture; |
| } |
| |
| /// Returns a future that is completed when the stream is done |
| /// processing events. |
| /// |
| /// This happens either when the done event has been sent, or if the |
| /// subscriber of a single-subscription stream is cancelled. |
| Future<void> get done => _ensureDoneFuture(); |
| |
| Future<void> _ensureDoneFuture() => |
| _doneFuture ??= _isCanceled ? Future._nullFuture : _Future<void>(); |
| |
| /// Send or enqueue a data event. |
| void add(T value) { |
| if (!_mayAddEvent) throw _badEventState(); |
| _add(value); |
| } |
| |
| /// Send or enqueue an error event. |
| void addError(Object error, [StackTrace? stackTrace]) { |
| checkNotNullable(error, "error"); |
| if (!_mayAddEvent) throw _badEventState(); |
| AsyncError? replacement = Zone.current.errorCallback(error, stackTrace); |
| if (replacement != null) { |
| error = replacement.error; |
| stackTrace = replacement.stackTrace; |
| } else { |
| stackTrace ??= AsyncError.defaultStackTrace(error); |
| } |
| _addError(error, stackTrace); |
| } |
| |
| /// Closes this controller and sends a done event on the stream. |
| /// |
| /// The first time a controller is closed, a "done" event is added to its |
| /// stream. |
| /// |
| /// You are allowed to close the controller more than once, but only the first |
| /// call has any effect. |
| /// |
| /// After closing, no further events may be added using [add], [addError] |
| /// or [addStream]. |
| /// |
| /// The returned future is completed when the done event has been delivered. |
| Future close() { |
| if (isClosed) { |
| return _ensureDoneFuture(); |
| } |
| if (!_mayAddEvent) throw _badEventState(); |
| _closeUnchecked(); |
| return _ensureDoneFuture(); |
| } |
| |
| void _closeUnchecked() { |
| _state |= _STATE_CLOSED; |
| if (hasListener) { |
| _sendDone(); |
| } else if (_isInitialState) { |
| _ensurePendingEvents().add(const _DelayedDone()); |
| } |
| } |
| |
| // EventSink interface. Used by the [addStream] events. |
| |
| // Add data event, used both by the [addStream] events and by [add]. |
| void _add(T value) { |
| if (hasListener) { |
| _sendData(value); |
| } else if (_isInitialState) { |
| _ensurePendingEvents().add(_DelayedData<T>(value)); |
| } |
| } |
| |
| void _addError(Object error, StackTrace stackTrace) { |
| if (hasListener) { |
| _sendError(error, stackTrace); |
| } else if (_isInitialState) { |
| _ensurePendingEvents().add(_DelayedError(error, stackTrace)); |
| } |
| } |
| |
| void _close() { |
| // End of addStream stream. |
| assert(_isAddingStream); |
| _StreamControllerAddStreamState<T> addState = _varData as dynamic; |
| _varData = addState.varData; |
| _state &= ~_STATE_ADDSTREAM; |
| addState.complete(); |
| } |
| |
| // _StreamControllerLifeCycle interface |
| |
| StreamSubscription<T> _subscribe(void onData(T data)?, Function? onError, |
| void onDone()?, bool cancelOnError) { |
| if (!_isInitialState) { |
| throw StateError("Stream has already been listened to."); |
| } |
| _ControllerSubscription<T> subscription = _ControllerSubscription<T>( |
| this, onData, onError, onDone, cancelOnError); |
| |
| _PendingEvents<T>? pendingEvents = _pendingEvents; |
| _state |= _STATE_SUBSCRIBED; |
| if (_isAddingStream) { |
| _StreamControllerAddStreamState<T> addState = _varData as dynamic; |
| addState.varData = subscription; |
| addState.resume(); |
| } else { |
| _varData = subscription; |
| } |
| subscription._setPendingEvents(pendingEvents); |
| subscription._guardCallback(() { |
| _runGuarded(onListen); |
| }); |
| |
| return subscription; |
| } |
| |
| Future<void>? _recordCancel(StreamSubscription<T> subscription) { |
| // When we cancel, we first cancel any stream being added, |
| // Then we call `onCancel`, and finally the _doneFuture is completed. |
| // If either of addStream's cancel or `onCancel` returns a future, |
| // we wait for it before continuing. |
| // Any error during this process ends up in the returned future. |
| // If more errors happen, we act as if it happens inside nested try/finallys |
| // or whenComplete calls, and only the last error ends up in the |
| // returned future. |
| Future<void>? result; |
| if (_isAddingStream) { |
| _StreamControllerAddStreamState<T> addState = _varData as dynamic; |
| result = addState.cancel(); |
| } |
| _varData = null; |
| _state = |
| (_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED; |
| |
| var onCancel = this.onCancel; |
| if (onCancel != null) { |
| if (result == null) { |
| // Only introduce a future if one is needed. |
| // If _onCancel returns null, no future is needed. |
| try { |
| var cancelResult = onCancel(); |
| if (cancelResult is Future<void>) { |
| result = cancelResult; |
| } |
| } catch (e, s) { |
| // Return the error in the returned future. |
| // Complete it asynchronously, so there is time for a listener |
| // to handle the error. |
| result = _Future().._asyncCompleteError(e, s); |
| } |
| } else { |
| // Simpler case when we already know that we will return a future. |
| result = result.whenComplete(onCancel); |
| } |
| } |
| |
| void complete() { |
| var doneFuture = _doneFuture; |
| if (doneFuture != null && doneFuture._mayComplete) { |
| doneFuture._asyncComplete(null); |
| } |
| } |
| |
| if (result != null) { |
| result = result.whenComplete(complete); |
| } else { |
| complete(); |
| } |
| |
| return result; |
| } |
| |
| void _recordPause(StreamSubscription<T> subscription) { |
| if (_isAddingStream) { |
| _StreamControllerAddStreamState<T> addState = _varData as dynamic; |
| addState.pause(); |
| } |
| _runGuarded(onPause); |
| } |
| |
| void _recordResume(StreamSubscription<T> subscription) { |
| if (_isAddingStream) { |
| _StreamControllerAddStreamState<T> addState = _varData as dynamic; |
| addState.resume(); |
| } |
| _runGuarded(onResume); |
| } |
| } |
| |
| abstract class _SyncStreamControllerDispatch<T> |
| implements _StreamController<T>, SynchronousStreamController<T> { |
| void _sendData(T data) { |
| _subscription._add(data); |
| } |
| |
| void _sendError(Object error, StackTrace stackTrace) { |
| _subscription._addError(error, stackTrace); |
| } |
| |
| void _sendDone() { |
| _subscription._close(); |
| } |
| } |
| |
| abstract class _AsyncStreamControllerDispatch<T> |
| implements _StreamController<T> { |
| void _sendData(T data) { |
| _subscription._addPending(_DelayedData<T>(data)); |
| } |
| |
| void _sendError(Object error, StackTrace stackTrace) { |
| _subscription._addPending(_DelayedError(error, stackTrace)); |
| } |
| |
| void _sendDone() { |
| _subscription._addPending(const _DelayedDone()); |
| } |
| } |
| |
| // TODO(lrn): Use common superclass for callback-controllers when VM supports |
| // constructors in mixin superclasses. |
| |
| @pragma("vm:entry-point") |
| class _AsyncStreamController<T> = _StreamController<T> |
| with _AsyncStreamControllerDispatch<T>; |
| |
| class _SyncStreamController<T> = _StreamController<T> |
| with _SyncStreamControllerDispatch<T>; |
| |
| void _runGuarded(void Function()? notificationHandler) { |
| if (notificationHandler == null) return; |
| try { |
| notificationHandler(); |
| } catch (e, s) { |
| Zone.current.handleUncaughtError(e, s); |
| } |
| } |
| |
| class _ControllerStream<T> extends _StreamImpl<T> { |
| _StreamControllerLifecycle<T> _controller; |
| |
| _ControllerStream(this._controller); |
| |
| StreamSubscription<T> _createSubscription(void onData(T data)?, |
| Function? onError, void onDone()?, bool cancelOnError) => |
| _controller._subscribe(onData, onError, onDone, cancelOnError); |
| |
| // Override == and hashCode so that new streams returned by the same |
| // controller are considered equal. The controller returns a new stream |
| // each time it's queried, but doesn't have to cache the result. |
| |
| int get hashCode => _controller.hashCode ^ 0x35323532; |
| |
| bool operator ==(Object other) { |
| if (identical(this, other)) return true; |
| return other is _ControllerStream && |
| identical(other._controller, this._controller); |
| } |
| } |
| |
| class _ControllerSubscription<T> extends _BufferingStreamSubscription<T> { |
| final _StreamControllerLifecycle<T> _controller; |
| |
| _ControllerSubscription(this._controller, void onData(T data)?, |
| Function? onError, void onDone()?, bool cancelOnError) |
| : super(onData, onError, onDone, cancelOnError); |
| |
| Future<void>? _onCancel() { |
| return _controller._recordCancel(this); |
| } |
| |
| void _onPause() { |
| _controller._recordPause(this); |
| } |
| |
| void _onResume() { |
| _controller._recordResume(this); |
| } |
| } |
| |
| /// A class that exposes only the [StreamSink] interface of an object. |
| class _StreamSinkWrapper<T> implements StreamSink<T> { |
| final StreamController _target; |
| _StreamSinkWrapper(this._target); |
| void add(T data) { |
| _target.add(data); |
| } |
| |
| void addError(Object error, [StackTrace? stackTrace]) { |
| _target.addError(error, stackTrace); |
| } |
| |
| Future close() => _target.close(); |
| |
| Future addStream(Stream<T> source) => _target.addStream(source); |
| |
| Future get done => _target.done; |
| } |
| |
| /// Object containing the state used to handle [StreamController.addStream]. |
| class _AddStreamState<T> { |
| // [_Future] returned by call to addStream. |
| final _Future addStreamFuture; |
| |
| // Subscription on stream argument to addStream. |
| final StreamSubscription addSubscription; |
| |
| _AddStreamState( |
| _EventSink<T> controller, Stream<T> source, bool cancelOnError) |
| : addStreamFuture = _Future(), |
| addSubscription = source.listen(controller._add, |
| onError: cancelOnError |
| ? makeErrorHandler(controller) |
| : controller._addError, |
| onDone: controller._close, |
| cancelOnError: cancelOnError); |
| |
| static makeErrorHandler(_EventSink controller) => (Object e, StackTrace s) { |
| controller._addError(e, s); |
| controller._close(); |
| }; |
| |
| void pause() { |
| addSubscription.pause(); |
| } |
| |
| void resume() { |
| addSubscription.resume(); |
| } |
| |
| /// Stop adding the stream. |
| /// |
| /// Complete the future returned by `StreamController.addStream` when |
| /// the cancel is complete. |
| /// |
| /// Return a future if the cancel takes time, otherwise return `null`. |
| Future<void> cancel() { |
| var cancel = addSubscription.cancel(); |
| if (cancel == null) { |
| addStreamFuture._asyncComplete(null); |
| return Future._nullFuture; |
| } |
| return cancel.whenComplete(() { |
| addStreamFuture._asyncComplete(null); |
| }); |
| } |
| |
| void complete() { |
| addStreamFuture._asyncComplete(null); |
| } |
| } |
| |
| class _StreamControllerAddStreamState<T> extends _AddStreamState<T> { |
| // The subscription or pending data of a _StreamController. |
| // Stored here because we reuse the `_varData` field in the _StreamController |
| // to store this state object. |
| var varData; |
| |
| _StreamControllerAddStreamState(_StreamController<T> controller, this.varData, |
| Stream<T> source, bool cancelOnError) |
| : super(controller, source, cancelOnError) { |
| if (controller.isPaused) { |
| addSubscription.pause(); |
| } |
| } |
| } |