blob: 1146a13fa837313663812b65410cd96b8f9c6d80 [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
extension TransformByHandlers<S> on Stream<S> {
/// Transform a stream by callbacks.
///
/// This is similar to `transform(StreamTransformer.fromHandler(...))` except
/// that the handlers are called once per event rather than called for the
/// same event for each listener on a broadcast stream.
Stream<T> transformByHandlers<T>(
{required void Function(S, EventSink<T>) onData,
void Function(Object, StackTrace, EventSink<T>)? onError,
void Function(EventSink<T>)? onDone}) {
final handleError = onError ?? _defaultHandleError;
final handleDone = onDone ?? _defaultHandleDone;
var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
StreamSubscription<S>? subscription;
controller.onListen = () {
assert(subscription == null);
var valuesDone = false;
subscription = listen((value) => onData(value, controller),
onError: (Object error, StackTrace stackTrace) {
handleError(error, stackTrace, controller);
}, onDone: () {
valuesDone = true;
handleDone(controller);
});
if (!isBroadcast) {
controller
..onPause = subscription!.pause
..onResume = subscription!.resume;
}
controller.onCancel = () {
var toCancel = subscription;
subscription = null;
if (!valuesDone) return toCancel!.cancel();
return null;
};
};
return controller.stream;
}
static void _defaultHandleError<T>(
Object error, StackTrace stackTrace, EventSink<T> sink) {
sink.addError(error, stackTrace);
}
static void _defaultHandleDone<T>(EventSink<T> sink) {
sink.close();
}
}