| // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| |
| import '../stream_sink_transformer.dart'; |
| import '../delegate/stream_sink.dart'; |
| |
| /// The type of the callback for handling data events. |
| typedef HandleData<S, T> = void Function(S data, EventSink<T> sink); |
| |
| /// The type of the callback for handling error events. |
| // |
| // TODO: Update to take a non-nullable StackTrace once that change lands in |
| // the sdk. |
| typedef HandleError<T> = void Function( |
| Object error, StackTrace stackTrace, EventSink<T> sink); |
| |
| /// The type of the callback for handling done events. |
| typedef HandleDone<T> = void Function(EventSink<T> sink); |
| |
| /// A [StreamSinkTransformer] that delegates events to the given handlers. |
| class HandlerTransformer<S, T> implements StreamSinkTransformer<S, T> { |
| /// The handler for data events. |
| final HandleData<S, T>? _handleData; |
| |
| /// The handler for error events. |
| final HandleError<T>? _handleError; |
| |
| /// The handler for done events. |
| final HandleDone<T>? _handleDone; |
| |
| HandlerTransformer(this._handleData, this._handleError, this._handleDone); |
| |
| @override |
| StreamSink<S> bind(StreamSink<T> sink) => _HandlerSink<S, T>(this, sink); |
| } |
| |
| /// A sink created by [HandlerTransformer]. |
| class _HandlerSink<S, T> implements StreamSink<S> { |
| /// The transformer that created this sink. |
| final HandlerTransformer<S, T> _transformer; |
| |
| /// The original sink that's being transformed. |
| final StreamSink<T> _inner; |
| |
| /// The wrapper for [_inner] whose [StreamSink.close] method can't emit |
| /// errors. |
| final StreamSink<T> _safeCloseInner; |
| |
| @override |
| Future get done => _inner.done; |
| |
| _HandlerSink(this._transformer, StreamSink<T> inner) |
| : _inner = inner, |
| _safeCloseInner = _SafeCloseSink<T>(inner); |
| |
| @override |
| void add(S event) { |
| var handleData = _transformer._handleData; |
| if (handleData == null) { |
| _inner.add(event as T); |
| } else { |
| handleData(event, _safeCloseInner); |
| } |
| } |
| |
| @override |
| void addError(error, [StackTrace? stackTrace]) { |
| var handleError = _transformer._handleError; |
| if (handleError == null) { |
| _inner.addError(error, stackTrace); |
| } else { |
| handleError(error, stackTrace ?? AsyncError.defaultStackTrace(error), |
| _safeCloseInner); |
| } |
| } |
| |
| @override |
| Future addStream(Stream<S> stream) { |
| return _inner.addStream(stream.transform( |
| StreamTransformer<S, T>.fromHandlers( |
| handleData: _transformer._handleData, |
| handleError: _transformer._handleError, |
| handleDone: _closeSink))); |
| } |
| |
| @override |
| Future close() { |
| var handleDone = _transformer._handleDone; |
| if (handleDone == null) return _inner.close(); |
| |
| handleDone(_safeCloseInner); |
| return _inner.done; |
| } |
| } |
| |
| /// A wrapper for [StreamSink]s that swallows any errors returned by [close]. |
| /// |
| /// [HandlerTransformer] passes this to its handlers to ensure that when they |
| /// call [close], they don't leave any dangling [Future]s behind that might emit |
| /// unhandleable errors. |
| class _SafeCloseSink<T> extends DelegatingStreamSink<T> { |
| _SafeCloseSink(StreamSink<T> inner) : super(inner); |
| |
| @override |
| Future close() => super.close().catchError((_) {}); |
| } |
| |
| /// A function to pass as a [StreamTransformer]'s `handleDone` callback. |
| void _closeSink(EventSink sink) { |
| sink.close(); |
| } |