blob: 453304348ff07e0c24067218bc3e6145e36b7dc5 [file] [log] [blame]
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dart.async;
/// Runs user code and takes actions depending on success or failure.
_runUserCode<T>(T userCode(), onSuccess(T value),
onError(Object error, StackTrace stackTrace)) {
try {
onSuccess(userCode());
} catch (e, s) {
AsyncError? replacement = Zone.current.errorCallback(e, s);
if (replacement == null) {
onError(e, s);
} else {
var error = replacement.error;
var stackTrace = replacement.stackTrace;
onError(error, stackTrace);
}
}
}
/** Helper function to cancel a subscription and wait for the potential future,
before completing with an error. */
void _cancelAndError(StreamSubscription subscription, _Future future,
Object error, StackTrace stackTrace) {
var cancelFuture = subscription.cancel();
if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._completeError(error, stackTrace));
} else {
future._completeError(error, stackTrace);
}
}
void _cancelAndErrorWithReplacement(StreamSubscription subscription,
_Future future, Object error, StackTrace stackTrace) {
AsyncError? replacement = Zone.current.errorCallback(error, stackTrace);
if (replacement != null) {
error = replacement.error;
stackTrace = replacement.stackTrace;
}
_cancelAndError(subscription, future, error, stackTrace);
}
/// Helper function to make an onError argument to [_runUserCode].
void Function(Object error, StackTrace stackTrace) _cancelAndErrorClosure(
StreamSubscription subscription, _Future future) {
return (Object error, StackTrace stackTrace) {
_cancelAndError(subscription, future, error, stackTrace);
};
}
/** Helper function to cancel a subscription and wait for the potential future,
before completing with a value. */
void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
var cancelFuture = subscription.cancel();
if (cancelFuture != null && !identical(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._complete(value));
} else {
future._complete(value);
}
}
/// A [Stream] that forwards subscriptions to another stream.
///
/// This stream implements [Stream], but forwards all subscriptions
/// to an underlying stream, and wraps the returned subscription to
/// modify the events on the way.
///
/// This class is intended for internal use only.
abstract class _ForwardingStream<S, T> extends Stream<T> {
final Stream<S> _source;
_ForwardingStream(this._source);
bool get isBroadcast => _source.isBroadcast;
StreamSubscription<T> listen(void onData(T value)?,
{Function? onError, void onDone()?, bool? cancelOnError}) {
return _createSubscription(onData, onError, onDone, cancelOnError ?? false);
}
StreamSubscription<T> _createSubscription(void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError) {
return new _ForwardingStreamSubscription<S, T>(
this, onData, onError, onDone, cancelOnError);
}
// Override the following methods in subclasses to change the behavior.
void _handleData(S data, _EventSink<T> sink);
void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
sink._addError(error, stackTrace);
}
void _handleDone(_EventSink<T> sink) {
sink._close();
}
}
/// Abstract superclass for subscriptions that forward to other subscriptions.
class _ForwardingStreamSubscription<S, T>
extends _BufferingStreamSubscription<T> {
final _ForwardingStream<S, T> _stream;
StreamSubscription<S>? _subscription;
_ForwardingStreamSubscription(this._stream, void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError)
: super(onData, onError, onDone, cancelOnError) {
_subscription = _stream._source
.listen(_handleData, onError: _handleError, onDone: _handleDone);
}
// _StreamSink interface.
// Transformers sending more than one event have no way to know if the stream
// is canceled or closed after the first, so we just ignore remaining events.
void _add(T data) {
if (_isClosed) return;
super._add(data);
}
void _addError(Object error, StackTrace stackTrace) {
if (_isClosed) return;
super._addError(error, stackTrace);
}
// StreamSubscription callbacks.
void _onPause() {
_subscription?.pause();
}
void _onResume() {
_subscription?.resume();
}
Future<void>? _onCancel() {
var subscription = _subscription;
if (subscription != null) {
_subscription = null;
return subscription.cancel();
}
return null;
}
// Methods used as listener on source subscription.
void _handleData(S data) {
_stream._handleData(data, this);
}
void _handleError(error, StackTrace stackTrace) {
_stream._handleError(error, stackTrace, this);
}
void _handleDone() {
_stream._handleDone(this);
}
}
// -------------------------------------------------------------------
// Stream transformers used by the default Stream implementation.
// -------------------------------------------------------------------
void _addErrorWithReplacement(
_EventSink sink, Object error, StackTrace stackTrace) {
AsyncError? replacement = Zone.current.errorCallback(error, stackTrace);
if (replacement != null) {
error = replacement.error;
stackTrace = replacement.stackTrace;
}
sink._addError(error, stackTrace);
}
class _WhereStream<T> extends _ForwardingStream<T, T> {
final bool Function(T) _test;
_WhereStream(Stream<T> source, bool test(T value))
: _test = test,
super(source);
void _handleData(T inputEvent, _EventSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
return;
}
if (satisfies) {
sink._add(inputEvent);
}
}
}
typedef T _Transformation<S, T>(S value);
/// A stream pipe that converts data events before passing them on.
class _MapStream<S, T> extends _ForwardingStream<S, T> {
final _Transformation<S, T> _transform;
_MapStream(Stream<S> source, T transform(S event))
: this._transform = transform,
super(source);
void _handleData(S inputEvent, _EventSink<T> sink) {
T outputEvent;
try {
outputEvent = _transform(inputEvent);
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
return;
}
sink._add(outputEvent);
}
}
/// A stream pipe that converts data events before passing them on.
class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
final _Transformation<S, Iterable<T>> _expand;
_ExpandStream(Stream<S> source, Iterable<T> expand(S event))
: this._expand = expand,
super(source);
void _handleData(S inputEvent, _EventSink<T> sink) {
try {
for (T value in _expand(inputEvent)) {
sink._add(value);
}
} catch (e, s) {
// If either _expand or iterating the generated iterator throws,
// we abort the iteration.
_addErrorWithReplacement(sink, e, s);
}
}
}
/// A stream pipe that converts or disposes error events
/// before passing them on.
class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
final Function _onError;
final bool Function(Object)? _test;
_HandleErrorStream(Stream<T> source, this._onError, this._test)
: super(source);
void _handleData(T data, _EventSink<T> sink) {
sink._add(data);
}
void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
bool matches = true;
var test = _test;
if (test != null) {
try {
matches = test(error);
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
return;
}
}
if (matches) {
try {
_invokeErrorHandler(_onError, error, stackTrace);
} catch (e, s) {
if (identical(e, error)) {
sink._addError(error, stackTrace);
} else {
_addErrorWithReplacement(sink, e, s);
}
return;
}
} else {
sink._addError(error, stackTrace);
}
}
}
class _TakeStream<T> extends _ForwardingStream<T, T> {
final int _count;
_TakeStream(Stream<T> source, int count)
: this._count = count,
super(source);
StreamSubscription<T> _createSubscription(void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError) {
if (_count == 0) {
_source.listen(null).cancel();
return new _DoneStreamSubscription<T>(onDone);
}
return new _StateStreamSubscription<int, T>(
this, onData, onError, onDone, cancelOnError, _count);
}
void _handleData(T inputEvent, _EventSink<T> sink) {
var subscription = sink as _StateStreamSubscription<int, T>;
int count = subscription._subState;
if (count > 0) {
sink._add(inputEvent);
count -= 1;
subscription._subState = count;
if (count == 0) {
// Closing also unsubscribes all subscribers, which unsubscribes
// this from source.
sink._close();
}
}
}
}
/// A [_ForwardingStreamSubscription] with one extra state field.
///
/// Use by several different classes, storing an integer, bool or general.
class _StateStreamSubscription<S, T>
extends _ForwardingStreamSubscription<T, T> {
S _subState;
_StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError, this._subState)
: super(stream, onData, onError, onDone, cancelOnError);
}
class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
final bool Function(T) _test;
_TakeWhileStream(Stream<T> source, bool test(T value))
: this._test = test,
super(source);
void _handleData(T inputEvent, _EventSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
// The test didn't say true. Didn't say false either, but we stop anyway.
sink._close();
return;
}
if (satisfies) {
sink._add(inputEvent);
} else {
sink._close();
}
}
}
class _SkipStream<T> extends _ForwardingStream<T, T> {
final int _count;
_SkipStream(Stream<T> source, int count)
: this._count = count,
super(source) {
// This test is done early to avoid handling an async error
// in the _handleData method.
RangeError.checkNotNegative(count, "count");
}
StreamSubscription<T> _createSubscription(void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError) {
return new _StateStreamSubscription<int, T>(
this, onData, onError, onDone, cancelOnError, _count);
}
void _handleData(T inputEvent, _EventSink<T> sink) {
var subscription = sink as _StateStreamSubscription<int, T>;
int count = subscription._subState;
if (count > 0) {
subscription._subState = count - 1;
return;
}
sink._add(inputEvent);
}
}
class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
final bool Function(T) _test;
_SkipWhileStream(Stream<T> source, bool test(T value))
: this._test = test,
super(source);
StreamSubscription<T> _createSubscription(void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError) {
return new _StateStreamSubscription<bool, T>(
this, onData, onError, onDone, cancelOnError, false);
}
void _handleData(T inputEvent, _EventSink<T> sink) {
var subscription = sink as _StateStreamSubscription<bool, T>;
bool hasFailed = subscription._subState;
if (hasFailed) {
sink._add(inputEvent);
return;
}
bool satisfies;
try {
satisfies = _test(inputEvent);
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
// A failure to return a boolean is considered "not matching".
subscription._subState = true;
return;
}
if (!satisfies) {
subscription._subState = true;
sink._add(inputEvent);
}
}
}
class _DistinctStream<T> extends _ForwardingStream<T, T> {
static final _SENTINEL = new Object();
final bool Function(T, T)? _equals;
_DistinctStream(Stream<T> source, bool equals(T a, T b)?)
: _equals = equals,
super(source);
StreamSubscription<T> _createSubscription(void onData(T data)?,
Function? onError, void onDone()?, bool cancelOnError) {
return new _StateStreamSubscription<Object?, T>(
this, onData, onError, onDone, cancelOnError, _SENTINEL);
}
void _handleData(T inputEvent, _EventSink<T> sink) {
var subscription = sink as _StateStreamSubscription<Object?, T>;
var previous = subscription._subState;
if (identical(previous, _SENTINEL)) {
// First event. Cannot use [_equals].
subscription._subState = inputEvent;
sink._add(inputEvent);
} else {
T previousEvent = previous as T;
var equals = _equals;
bool isEqual;
try {
if (equals == null) {
isEqual = (previousEvent == inputEvent);
} else {
isEqual = equals(previousEvent, inputEvent);
}
} catch (e, s) {
_addErrorWithReplacement(sink, e, s);
return;
}
if (!isEqual) {
sink._add(inputEvent);
subscription._subState = inputEvent;
}
}
}
}