// blob: 306ada8bf5bc5af82f8b3adeae45c97e84e76db5 [file] [log] [blame]
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dart._internal;
// Casting wrappers for asynchronous classes.
/// A [Stream] of `T` backed by a `Stream<S>`.
///
/// Listening wraps the source subscription in a [CastStreamSubscription]
/// and installs the caller's typed callbacks on that wrapper.
class CastStream<S, T> extends Stream<T> {
  /// The stream whose events are re-exposed as `T`.
  final Stream<S> _source;

  /// Creates a casting view of [_source].
  CastStream(this._source);

  /// Whether the underlying source stream is a broadcast stream.
  @override
  bool get isBroadcast => _source.isBroadcast;

  /// Subscribes to the source and returns a subscription typed as `T`.
  ///
  /// The source is listened to without a data handler; [onData] and
  /// [onError] are installed on the returned wrapper instead, because they
  /// are typed in terms of `T` rather than `S`. [onDone] and [cancelOnError]
  /// are passed straight through to the source.
  @override
  StreamSubscription<T> listen(void Function(T data)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    return CastStreamSubscription<S, T>(
        _source.listen(null, onDone: onDone, cancelOnError: cancelOnError))
      ..onData(onData)
      ..onError(onError);
  }

  /// Returns a view of the original source as `Stream<R>`.
  ///
  /// Wraps [_source] directly rather than this stream, so repeated casts do
  /// not stack wrappers.
  @override
  Stream<R> cast<R>() => CastStream<S, R>(_source);
}
/// A [StreamSubscription] of `T` that wraps a `StreamSubscription<S>`.
///
/// Each data event from the source is cast to `T` before being passed to the
/// user's data handler. A failed cast is reported to the user's error handler
/// if one is set, or to the zone's uncaught-error handler otherwise.
class CastStreamSubscription<S, T> implements StreamSubscription<T> {
  /// The wrapped subscription that actually receives events.
  final StreamSubscription<S> _source;

  /// Zone where listen was called.
  final Zone _zone = Zone.current;

  /// User's data handler.
  void Function(T)? _handleData;

  /// Copy of _source's handleError so we can report errors in onData.
  Function? _handleError;

  /// Wraps [_source] and takes over its data handler.
  CastStreamSubscription(this._source) {
    // All data events go through [_onData] so they can be cast first.
    _source.onData(_onData);
  }

  /// Cancels the underlying subscription.
  @override
  Future cancel() => _source.cancel();

  /// Sets (or clears, when [handleData] is null) the user's data handler.
  ///
  /// The handler is registered with the zone captured at construction.
  @override
  void onData(void Function(T data)? handleData) {
    _handleData = handleData == null
        ? null
        : _zone.registerUnaryCallback<dynamic, T>(handleData);
  }

  /// Sets (or clears, when [handleError] is null) the error handler.
  ///
  /// The handler is forwarded to the source subscription and a
  /// zone-registered copy is also kept locally for reporting cast failures
  /// from [_onData].
  ///
  /// Throws an [ArgumentError] if [handleError] accepts neither an `Object`
  /// nor an `Object` and a `StackTrace`.
  @override
  void onError(Function? handleError) {
    _source.onError(handleError);
    if (handleError == null) {
      _handleError = null;
    } else if (handleError is void Function(Object, StackTrace)) {
      _handleError = _zone
          .registerBinaryCallback<dynamic, Object, StackTrace>(handleError);
    } else if (handleError is void Function(Object)) {
      _handleError = _zone.registerUnaryCallback<dynamic, Object>(handleError);
    } else {
      throw ArgumentError("handleError callback must take either an Object "
          "(the error), or both an Object (the error) and a StackTrace.");
    }
  }

  /// Forwards the done handler to the underlying subscription.
  @override
  void onDone(void handleDone()?) {
    _source.onDone(handleDone);
  }

  /// Receives a raw event from the source, casts it, and delivers it.
  void _onData(S data) {
    // Copy to a local so the null check promotes the field's value.
    final handleData = _handleData;
    if (handleData == null) return;
    T targetData;
    try {
      targetData = data as T;
    } catch (error, stack) {
      var handleError = _handleError;
      if (handleError == null) {
        _zone.handleUncaughtError(error, stack);
      } else if (handleError is void Function(Object, StackTrace)) {
        _zone.runBinaryGuarded<Object, StackTrace>(handleError, error, stack);
      } else {
        _zone.runUnaryGuarded<Object>(
            handleError as void Function(Object), error);
      }
      // The event could not be cast, so there is nothing to deliver.
      return;
    }
    _zone.runUnaryGuarded(handleData, targetData);
  }

  /// Pauses the underlying subscription, optionally until [resumeSignal]
  /// completes.
  @override
  void pause([Future? resumeSignal]) {
    _source.pause(resumeSignal);
  }

  /// Resumes the underlying subscription.
  @override
  void resume() {
    _source.resume();
  }

  /// Whether the underlying subscription is currently paused.
  @override
  bool get isPaused => _source.isPaused;

  /// Delegates to the underlying subscription's `asFuture`.
  @override
  Future<E> asFuture<E>([E? futureValue]) => _source.asFuture<E>(futureValue);
}
/// A `StreamTransformer<TS, TT>` view of a `StreamTransformer<SS, ST>`.
///
/// Input streams are cast to `Stream<SS>` before being bound by the wrapped
/// transformer, and the resulting stream is cast to `Stream<TT>`.
class CastStreamTransformer<SS, ST, TS, TT>
    extends StreamTransformerBase<TS, TT> {
  /// The transformer that does the actual work.
  final StreamTransformer<SS, ST> _source;

  /// Creates a casting view of [_source].
  CastStreamTransformer(this._source);

  /// Returns a view of the original transformer with new type arguments.
  ///
  /// Wraps [_source] directly rather than this object, so repeated casts do
  /// not stack wrappers.
  @override
  StreamTransformer<RS, RT> cast<RS, RT>() =>
      CastStreamTransformer<SS, ST, RS, RT>(_source);

  /// Binds [stream] through the wrapped transformer, casting on both sides.
  @override
  Stream<TT> bind(Stream<TS> stream) =>
      _source.bind(stream.cast<SS>()).cast<TT>();
}
/// A `Converter<TS, TT>` view of a `Converter<SS, ST>`.
///
/// Inputs are cast to `SS` before conversion and results are cast to `TT`.
class CastConverter<SS, ST, TS, TT> extends Converter<TS, TT> {
  /// The converter that does the actual work.
  final Converter<SS, ST> _source;

  /// Creates a casting view of [_source].
  CastConverter(this._source);

  /// Converts [input] with the wrapped converter, casting input and output.
  @override
  TT convert(TS input) => _source.convert(input as SS) as TT;

  /// Binds [stream] through the wrapped converter, casting on both sides.
  @override
  Stream<TT> bind(Stream<TS> stream) =>
      _source.bind(stream.cast<SS>()).cast<TT>();

  /// Returns a view of the original converter with new type arguments.
  ///
  /// Wraps [_source] directly rather than this object, so repeated casts do
  /// not stack wrappers.
  @override
  Converter<RS, RT> cast<RS, RT>() => CastConverter<SS, ST, RS, RT>(_source);
}