|  | // Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file | 
|  | // for details. All rights reserved. Use of this source code is governed by a | 
|  | // BSD-style license that can be found in the LICENSE file. | 
|  |  | 
|  | part of dart.async; | 
|  |  | 
|  | /// Runs user code and takes actions depending on success or failure. | 
|  | _runUserCode<T>( | 
|  | T userCode(), | 
|  | onSuccess(T value), | 
|  | onError(Object error, StackTrace stackTrace), | 
|  | ) { | 
|  | try { | 
|  | onSuccess(userCode()); | 
|  | } catch (error, stackTrace) { | 
|  | AsyncError? replacement = _interceptError(error, stackTrace); | 
|  | if (replacement != null) { | 
|  | onError(replacement.error, replacement.stackTrace); | 
|  | } else { | 
|  | onError(error, stackTrace); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /** Helper function to cancel a subscription and wait for the potential future, | 
|  | before completing with an error. */ | 
|  | void _cancelAndError( | 
|  | StreamSubscription subscription, | 
|  | _Future future, | 
|  | AsyncError error, | 
|  | ) { | 
|  | var cancelFuture = subscription.cancel(); | 
|  | if (!identical(cancelFuture, Future._nullFuture)) { | 
|  | cancelFuture.whenComplete(() => future._completeErrorObject(error)); | 
|  | } else { | 
|  | future._completeErrorObject(error); | 
|  | } | 
|  | } | 
|  |  | 
|  | void _cancelAndErrorWithReplacement( | 
|  | StreamSubscription subscription, | 
|  | _Future future, | 
|  | Object error, | 
|  | StackTrace stackTrace, | 
|  | ) { | 
|  | _cancelAndError( | 
|  | subscription, | 
|  | future, | 
|  | _interceptCaughtError(error, stackTrace), | 
|  | ); | 
|  | } | 
|  |  | 
|  | /// Helper function to make an onError argument to [_runUserCode]. | 
|  | /// | 
|  | /// The error is already an asynchronous error, so is not intercepted. | 
|  | void Function(Object error, StackTrace stackTrace) _cancelAndErrorClosure( | 
|  | StreamSubscription subscription, | 
|  | _Future future, | 
|  | ) { | 
|  | return (Object error, StackTrace stackTrace) { | 
|  | _cancelAndError(subscription, future, AsyncError(error, stackTrace)); | 
|  | }; | 
|  | } | 
|  |  | 
|  | // Helper function to cancel a subscription and wait for the potential future, | 
|  | // before completing with a value. | 
|  | void _cancelAndValue(StreamSubscription subscription, _Future future, value) { | 
|  | var cancelFuture = subscription.cancel(); | 
|  | if (!identical(cancelFuture, Future._nullFuture)) { | 
|  | cancelFuture.whenComplete(() => future._complete(value)); | 
|  | } else { | 
|  | future._complete(value); | 
|  | } | 
|  | } | 
|  |  | 
|  | /// A [Stream] that forwards subscriptions to another stream. | 
|  | /// | 
|  | /// This stream implements [Stream], but forwards all subscriptions | 
|  | /// to an underlying stream, and wraps the returned subscription to | 
|  | /// modify the events on the way. | 
|  | /// | 
|  | /// This class is intended for internal use only. | 
|  | abstract class _ForwardingStream<S, T> extends Stream<T> { | 
|  | final Stream<S> _source; | 
|  |  | 
|  | _ForwardingStream(this._source); | 
|  |  | 
|  | bool get isBroadcast => _source.isBroadcast; | 
|  |  | 
|  | StreamSubscription<T> listen( | 
|  | void onData(T value)?, { | 
|  | Function? onError, | 
|  | void onDone()?, | 
|  | bool? cancelOnError, | 
|  | }) { | 
|  | return _createSubscription(onData, onError, onDone, cancelOnError ?? false); | 
|  | } | 
|  |  | 
|  | StreamSubscription<T> _createSubscription( | 
|  | void onData(T data)?, | 
|  | Function? onError, | 
|  | void onDone()?, | 
|  | bool cancelOnError, | 
|  | ) { | 
|  | return new _ForwardingStreamSubscription<S, T>( | 
|  | this, | 
|  | onData, | 
|  | onError, | 
|  | onDone, | 
|  | cancelOnError, | 
|  | ); | 
|  | } | 
|  |  | 
|  | // Override the following methods in subclasses to change the behavior. | 
|  |  | 
|  | void _handleData(S data, _EventSink<T> sink); | 
|  |  | 
|  | void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { | 
|  | sink._addError(error, stackTrace); | 
|  | } | 
|  |  | 
|  | void _handleDone(_EventSink<T> sink) { | 
|  | sink._close(); | 
|  | } | 
|  | } | 
|  |  | 
|  | /// Abstract superclass for subscriptions that forward to other subscriptions. | 
|  | class _ForwardingStreamSubscription<S, T> | 
|  | extends _BufferingStreamSubscription<T> { | 
|  | final _ForwardingStream<S, T> _stream; | 
|  |  | 
|  | StreamSubscription<S>? _subscription; | 
|  |  | 
|  | _ForwardingStreamSubscription( | 
|  | this._stream, | 
|  | void onData(T data)?, | 
|  | Function? onError, | 
|  | void onDone()?, | 
|  | bool cancelOnError, | 
|  | ) : super(onData, onError, onDone, cancelOnError) { | 
|  | _subscription = _stream._source.listen( | 
|  | _handleData, | 
|  | onError: _handleError, | 
|  | onDone: _handleDone, | 
|  | ); | 
|  | } | 
|  |  | 
|  | // _StreamSink interface. | 
|  | // Transformers sending more than one event have no way to know if the stream | 
|  | // is canceled or closed after the first, so we just ignore remaining events. | 
|  |  | 
|  | void _add(T data) { | 
|  | if (_isClosed) return; | 
|  | super._add(data); | 
|  | } | 
|  |  | 
|  | void _addError(Object error, StackTrace stackTrace) { | 
|  | if (_isClosed) return; | 
|  | super._addError(error, stackTrace); | 
|  | } | 
|  |  | 
|  | // StreamSubscription callbacks. | 
|  |  | 
|  | void _onPause() { | 
|  | _subscription?.pause(); | 
|  | } | 
|  |  | 
|  | void _onResume() { | 
|  | _subscription?.resume(); | 
|  | } | 
|  |  | 
|  | Future<void>? _onCancel() { | 
|  | var subscription = _subscription; | 
|  | if (subscription != null) { | 
|  | _subscription = null; | 
|  | return subscription.cancel(); | 
|  | } | 
|  | return null; | 
|  | } | 
|  |  | 
|  | // Methods used as listener on source subscription. | 
|  |  | 
|  | void _handleData(S data) { | 
|  | _stream._handleData(data, this); | 
|  | } | 
|  |  | 
|  | void _handleError(error, StackTrace stackTrace) { | 
|  | _stream._handleError(error, stackTrace, this); | 
|  | } | 
|  |  | 
|  | void _handleDone() { | 
|  | _stream._handleDone(this); | 
|  | } | 
|  | } | 
|  |  | 
|  | // ------------------------------------------------------------------- | 
|  | // Stream transformers used by the default Stream implementation. | 
|  | // ------------------------------------------------------------------- | 
|  |  | 
|  | void _addErrorWithReplacement( | 
|  | _EventSink sink, | 
|  | Object error, | 
|  | StackTrace stackTrace, | 
|  | ) { | 
|  | var replacement = _interceptError(error, stackTrace); | 
|  | if (replacement != null) { | 
|  | error = replacement.error; | 
|  | stackTrace = replacement.stackTrace; | 
|  | } | 
|  | sink._addError(error, stackTrace); | 
|  | } | 
|  |  | 
|  | class _WhereStream<T> extends _ForwardingStream<T, T> { | 
|  | final bool Function(T) _test; | 
|  |  | 
|  | _WhereStream(Stream<T> source, bool test(T value)) | 
|  | : _test = test, | 
|  | super(source); | 
|  |  | 
|  | void _handleData(T inputEvent, _EventSink<T> sink) { | 
|  | bool satisfies; | 
|  | try { | 
|  | satisfies = _test(inputEvent); | 
|  | } catch (e, s) { | 
|  | _addErrorWithReplacement(sink, e, s); | 
|  | return; | 
|  | } | 
|  | if (satisfies) { | 
|  | sink._add(inputEvent); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | typedef T _Transformation<S, T>(S value); | 
|  |  | 
|  | /// A stream pipe that converts data events before passing them on. | 
|  | class _MapStream<S, T> extends _ForwardingStream<S, T> { | 
|  | final _Transformation<S, T> _transform; | 
|  |  | 
|  | _MapStream(Stream<S> source, T transform(S event)) | 
|  | : this._transform = transform, | 
|  | super(source); | 
|  |  | 
|  | void _handleData(S inputEvent, _EventSink<T> sink) { | 
|  | T outputEvent; | 
|  | try { | 
|  | outputEvent = _transform(inputEvent); | 
|  | } catch (e, s) { | 
|  | _addErrorWithReplacement(sink, e, s); | 
|  | return; | 
|  | } | 
|  | sink._add(outputEvent); | 
|  | } | 
|  | } | 
|  |  | 
|  | /// A stream pipe that converts data events before passing them on. | 
|  | class _ExpandStream<S, T> extends _ForwardingStream<S, T> { | 
|  | final _Transformation<S, Iterable<T>> _expand; | 
|  |  | 
|  | _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) | 
|  | : this._expand = expand, | 
|  | super(source); | 
|  |  | 
|  | void _handleData(S inputEvent, _EventSink<T> sink) { | 
|  | try { | 
|  | for (T value in _expand(inputEvent)) { | 
|  | sink._add(value); | 
|  | } | 
|  | } catch (e, s) { | 
|  | // If either _expand or iterating the generated iterator throws, | 
|  | // we abort the iteration. | 
|  | _addErrorWithReplacement(sink, e, s); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /// A stream pipe that converts or disposes error events | 
|  | /// before passing them on. | 
|  | class _HandleErrorStream<T> extends _ForwardingStream<T, T> { | 
|  | final void Function(Object, StackTrace) _onError; | 
|  | final bool Function(Object)? _test; | 
|  |  | 
|  | _HandleErrorStream(Stream<T> source, this._onError, this._test) | 
|  | : super(source); | 
|  |  | 
|  | void _handleData(T data, _EventSink<T> sink) { | 
|  | sink._add(data); | 
|  | } | 
|  |  | 
|  | void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { | 
|  | bool matches = true; | 
|  | var test = _test; | 
|  | if (test != null) { | 
|  | try { | 
|  | matches = test(error); | 
|  | } catch (e, s) { | 
|  | _addErrorWithReplacement(sink, e, s); | 
|  | return; | 
|  | } | 
|  | } | 
|  | if (matches) { | 
|  | try { | 
|  | _onError(error, stackTrace); | 
|  | } catch (e, s) { | 
|  | if (identical(e, error)) { | 
|  | sink._addError(error, stackTrace); | 
|  | } else { | 
|  | _addErrorWithReplacement(sink, e, s); | 
|  | } | 
|  | return; | 
|  | } | 
|  | } else { | 
|  | sink._addError(error, stackTrace); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | class _TakeStream<T> extends _ForwardingStream<T, T> { | 
|  | final int _count; | 
|  |  | 
|  | _TakeStream(Stream<T> source, int count) : this._count = count, super(source); | 
|  |  | 
|  | StreamSubscription<T> _createSubscription( | 
|  | void onData(T data)?, | 
|  | Function? onError, | 
|  | void onDone()?, | 
|  | bool cancelOnError, | 
|  | ) { | 
|  | if (_count == 0) { | 
|  | _source.listen(null).cancel(); | 
|  | return new _DoneStreamSubscription<T>(onDone); | 
|  | } | 
|  | return new _StateStreamSubscription<int, T>( | 
|  | this, | 
|  | onData, | 
|  | onError, | 
|  | onDone, | 
|  | cancelOnError, | 
|  | _count, | 
|  | ); | 
|  | } | 
|  |  | 
|  | void _handleData(T inputEvent, _EventSink<T> sink) { | 
|  | var subscription = sink as _StateStreamSubscription<int, T>; | 
|  | int count = subscription._subState; | 
|  | if (count > 0) { | 
|  | sink._add(inputEvent); | 
|  | count -= 1; | 
|  | subscription._subState = count; | 
|  | if (count == 0) { | 
|  | // Closing also unsubscribes all subscribers, which unsubscribes | 
|  | // this from source. | 
|  | sink._close(); | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | /// A [_ForwardingStreamSubscription] with one extra state field. | 
|  | /// | 
|  | /// Use by several different classes, storing an integer, bool or general. | 
|  | class _StateStreamSubscription<S, T> | 
|  | extends _ForwardingStreamSubscription<T, T> { | 
|  | S _subState; | 
|  |  | 
|  | _StateStreamSubscription( | 
|  | _ForwardingStream<T, T> stream, | 
|  | void onData(T data)?, | 
|  | Function? onError, | 
|  | void onDone()?, | 
|  | bool cancelOnError, | 
|  | this._subState, | 
|  | ) : super(stream, onData, onError, onDone, cancelOnError); | 
|  | } | 
|  |  | 
|  | class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 
|  | final bool Function(T) _test; | 
|  |  | 
|  | _TakeWhileStream(Stream<T> source, bool test(T value)) | 
|  | : this._test = test, | 
|  | super(source); | 
|  |  | 
|  | void _handleData(T inputEvent, _EventSink<T> sink) { | 
|  | bool satisfies; | 
|  | try { | 
|  | satisfies = _test(inputEvent); | 
|  | } catch (e, s) { | 
|  | _addErrorWithReplacement(sink, e, s); | 
|  | // The test didn't say true. Didn't say false either, but we stop anyway. | 
|  | sink._close(); | 
|  | return; | 
|  | } | 
|  | if (satisfies) { | 
|  | sink._add(inputEvent); | 
|  | } else { | 
|  | sink._close(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | class _SkipStream<T> extends _ForwardingStream<T, T> { | 
|  | final int _count; | 
|  |  | 
|  | _SkipStream(Stream<T> source, int count) | 
|  | : this._count = count, | 
|  | super(source) { | 
|  | // This test is done early to avoid handling an async error | 
|  | // in the _handleData method. | 
|  | RangeError.checkNotNegative(count, "count"); | 
|  | } | 
|  |  | 
|  | StreamSubscription<T> _createSubscription( | 
|  | void onData(T data)?, | 
|  | Function? onError, | 
|  | void onDone()?, | 
|  | bool cancelOnError, | 
|  | ) { | 
|  | return new _StateStreamSubscription<int, T>( | 
|  | this, | 
|  | onData, | 
|  | onError, | 
|  | onDone, | 
|  | cancelOnError, | 
|  | _count, | 
|  | ); | 
|  | } | 
|  |  | 
|  | void _handleData(T inputEvent, _EventSink<T> sink) { | 
|  | var subscription = sink as _StateStreamSubscription<int, T>; | 
|  | int count = subscription._subState; | 
|  | if (count > 0) { | 
|  | subscription._subState = count - 1; | 
|  | return; | 
|  | } | 
|  | sink._add(inputEvent); | 
|  | } | 
|  | } | 
|  |  | 
|  | class _SkipWhileStream<T> extends _ForwardingStream<T, T> { | 
|  | final bool Function(T) _test; | 
|  |  | 
|  | _SkipWhileStream(Stream<T> source, bool test(T value)) | 
|  | : this._test = test, | 
|  | super(source); | 
|  |  | 
|  | StreamSubscription<T> _createSubscription( | 
|  | void onData(T data)?, | 
|  | Function? onError, | 
|  | void onDone()?, | 
|  | bool cancelOnError, | 
|  | ) { | 
|  | return new _StateStreamSubscription<bool, T>( | 
|  | this, | 
|  | onData, | 
|  | onError, | 
|  | onDone, | 
|  | cancelOnError, | 
|  | false, | 
|  | ); | 
|  | } | 
|  |  | 
|  | void _handleData(T inputEvent, _EventSink<T> sink) { | 
|  | var subscription = sink as _StateStreamSubscription<bool, T>; | 
|  | bool hasFailed = subscription._subState; | 
|  | if (hasFailed) { | 
|  | sink._add(inputEvent); | 
|  | return; | 
|  | } | 
|  | bool satisfies; | 
|  | try { | 
|  | satisfies = _test(inputEvent); | 
|  | } catch (e, s) { | 
|  | _addErrorWithReplacement(sink, e, s); | 
|  | // A failure to return a boolean is considered "not matching". | 
|  | subscription._subState = true; | 
|  | return; | 
|  | } | 
|  | if (!satisfies) { | 
|  | subscription._subState = true; | 
|  | sink._add(inputEvent); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | class _DistinctStream<T> extends _ForwardingStream<T, T> { | 
|  | static final _SENTINEL = new Object(); | 
|  |  | 
|  | final bool Function(T, T)? _equals; | 
|  |  | 
|  | _DistinctStream(Stream<T> source, bool equals(T a, T b)?) | 
|  | : _equals = equals, | 
|  | super(source); | 
|  |  | 
|  | StreamSubscription<T> _createSubscription( | 
|  | void onData(T data)?, | 
|  | Function? onError, | 
|  | void onDone()?, | 
|  | bool cancelOnError, | 
|  | ) { | 
|  | return new _StateStreamSubscription<Object?, T>( | 
|  | this, | 
|  | onData, | 
|  | onError, | 
|  | onDone, | 
|  | cancelOnError, | 
|  | _SENTINEL, | 
|  | ); | 
|  | } | 
|  |  | 
|  | void _handleData(T inputEvent, _EventSink<T> sink) { | 
|  | var subscription = sink as _StateStreamSubscription<Object?, T>; | 
|  | var previous = subscription._subState; | 
|  | if (identical(previous, _SENTINEL)) { | 
|  | // First event. Cannot use [_equals]. | 
|  | subscription._subState = inputEvent; | 
|  | sink._add(inputEvent); | 
|  | } else { | 
|  | T previousEvent = previous as T; | 
|  | var equals = _equals; | 
|  | bool isEqual; | 
|  | try { | 
|  | if (equals == null) { | 
|  | isEqual = (previousEvent == inputEvent); | 
|  | } else { | 
|  | isEqual = equals(previousEvent, inputEvent); | 
|  | } | 
|  | } catch (e, s) { | 
|  | _addErrorWithReplacement(sink, e, s); | 
|  | return; | 
|  | } | 
|  | if (!isEqual) { | 
|  | sink._add(inputEvent); | 
|  | subscription._subState = inputEvent; | 
|  | } | 
|  | } | 
|  | } | 
|  | } |