blob: 8157ea1695466ba2203b9faa0028f7a82298c5bc [file] [log] [blame]
import 'dart:async';
/// Signature of a callback that receives one data event [value] of type [S]
/// together with the [sink] on which transformed events of type [T] are
/// emitted.
typedef HandleData<S, T> = void Function(S value, EventSink<T> sink);

/// Signature of a callback that receives the output [sink] once the source
/// stream has finished.
typedef HandleDone<T> = void Function(EventSink<T> sink);
/// Creates a [StreamTransformer] driven by [handleData] and [handleDone].
///
/// Works like [new StreamTransformer.fromHandlers], except that on broadcast
/// streams each handler runs once per event instead of once per listener.
///
/// When [handleData] is omitted, every value is forwarded to the output cast
/// to [T]. When [handleDone] is omitted, the output is closed as soon as the
/// source stream is done.
StreamTransformer<S, T> fromHandlers<S, T>(
    {HandleData<S, T> handleData, HandleDone<T> handleDone}) {
  return new _StreamTransformer<S, T>(
      handleData: handleData, handleDone: handleDone);
}
/// A [StreamTransformer] that keeps at most one subscription to the source
/// stream and routes its data and done events through a single pair of
/// handlers, no matter how many listeners the resulting stream has.
class _StreamTransformer<S, T> implements StreamTransformer<S, T> {
  /// Called with every data event of the source stream.
  final HandleData<S, T> _handleData;

  /// Called when the source stream reports that it is done.
  final HandleDone<T> _handleDone;

  /// Stores the given handlers, substituting pass-through defaults for any
  /// handler that is left null.
  _StreamTransformer({HandleData<S, T> handleData, HandleDone<T> handleDone})
      : _handleData = handleData ?? _forwardData,
        _handleDone = handleDone ?? _closeSink;

  /// Default data handler: emits [value] unchanged, cast to [T].
  static void _forwardData<S, T>(S value, EventSink<T> sink) =>
      sink.add(value as T);

  /// Default done handler: closes [sink].
  static void _closeSink<T>(EventSink<T> sink) => sink.close();

  /// Returns a stream of the same kind (broadcast or single-subscription) as
  /// [values] whose events are produced by the handlers.
  ///
  /// The source is listened to when the returned stream gets a listener and
  /// is cancelled once the returned stream has no listeners left. Errors
  /// from the source are forwarded unchanged.
  @override
  Stream<T> bind(Stream<S> values) {
    final sourceIsBroadcast = values.isBroadcast;
    final output = sourceIsBroadcast
        ? new StreamController<T>.broadcast()
        : new StreamController<T>();

    // The one subscription to the source; null while nothing is listening.
    StreamSubscription<S> upstream;

    output.onListen = () {
      // Subscribe only when there is no live source subscription yet.
      upstream ??= values.listen((event) => _handleData(event, output),
          onError: output.addError, onDone: () => _handleDone(output));
    };

    if (!sourceIsBroadcast) {
      // Pause and resume are only forwarded for single-subscription sources.
      output
        ..onPause = () {
          upstream?.pause();
        }
        ..onResume = () {
          upstream?.resume();
        };
    }

    output.onCancel = () {
      final active = upstream;
      // Nothing to tear down if the source was never listened to, or if
      // other listeners are still attached to the output.
      if (active == null || output.hasListener) {
        return new Future.value();
      }
      upstream = null;
      return active.cancel();
    };

    return output.stream;
  }
}