Add files.
diff --git a/lib/src/sink_adapter.dart b/lib/src/sink_adapter.dart new file mode 100644 index 0000000..c9cb6ec --- /dev/null +++ b/lib/src/sink_adapter.dart
@@ -0,0 +1,246 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// An [EventSink] which forwards event sink calls to an [EventHandler].
+///
+/// The [EventHandler.onDone] method can return a future,
+/// but [EventSink.close] doesn't.
+/// Code expecting an [EventSink] won't check for such a future,
+/// and an [EventHandler] used with an [EventSinkAdapter] should never
+/// return a future.
+class EventSinkAdapter<T> implements EventSink<T> {
+  final EventHandler<T> _handler;
+
+  /// Whether [close] has been called.
+  bool _closed = false;
+
+  /// Creates an event sink forwarding events to [handler].
+  EventSinkAdapter(EventHandler<T> handler) : _handler = handler;
+
+  /// Forwards [event] to [EventHandler.onData].
+  ///
+  /// Throws a [StateError] if the sink has been closed.
+  @override
+  void add(T event) {
+    _checkCanAdd();
+    _handler.onData(event);
+  }
+
+  /// Forwards [error] to [EventHandler.onError].
+  ///
+  /// If [stackTrace] is omitted, a fallback stack trace is supplied.
+  /// Throws a [StateError] if the sink has been closed.
+  @override
+  void addError(Object error, [StackTrace? stackTrace]) {
+    _checkCanAdd();
+    _handler.onError(error, stackTrace ?? _defaultStack(error));
+  }
+
+  /// Calls [EventHandler.onDone] the first time the sink is closed.
+  ///
+  /// Later calls do nothing. Any value returned by the handler is ignored.
+  @override
+  void close() {
+    if (!_closed) {
+      _closed = true;
+      _handler.onDone();
+    }
+  }
+
+  /// Throws a [StateError] if the sink has already been closed.
+  void _checkCanAdd() {
+    if (_closed) {
+      throw StateError('Cannot add event after closing');
+    }
+  }
+}
+
+/// A [StreamSink] which forwards stream sink calls to an [EventHandler].
+class StreamSinkAdapter<T> implements StreamSink<T> {
+  final EventHandler<T> _handler;
+
+  /// Completed by the first [close] with the result of [EventHandler.onDone].
+  final Completer<void> _doneFuture = Completer();
+
+  /// Whether an [addStream] call is still forwarding events.
+  bool _addingStream = false;
+
+  /// Creates a stream sink which forwards events to [handler].
+  StreamSinkAdapter(EventHandler<T> handler) : _handler = handler;
+
+  /// Forwards the events of [stream] to the handler.
+  ///
+  /// The returned future completes when [stream] is done. Until then,
+  /// [add], [addError], [addStream] and [close] throw a [StateError].
+  ///
+  /// Like those methods, this one throws synchronously if the sink
+  /// cannot accept events, rather than returning a failed future.
+  @override
+  Future<void> addStream(Stream<T> stream) {
+    _checkCanAdd();
+    var completer = Completer<void>.sync();
+    _addingStream = true;
+    try {
+      stream.listen(_handler.onData, onError: _handler.onError, onDone: () {
+        _addingStream = false;
+        completer.complete();
+      });
+    } catch (_) {
+      // No done event will arrive to reset the flag if listening failed.
+      _addingStream = false;
+      rethrow;
+    }
+    return completer.future;
+  }
+
+  /// Throws a [StateError] if the sink is adding a stream or is closed.
+  void _checkCanAdd() {
+    if (_addingStream) {
+      throw StateError('Cannot add event while adding a stream');
+    }
+    if (_doneFuture.isCompleted) {
+      throw StateError('Cannot add event after closing');
+    }
+  }
+
+  /// Calls [EventHandler.onDone] the first time the sink is closed.
+  ///
+  /// Every call returns the same future as [done].
+  /// Throws a [StateError] while [addStream] is in progress.
+  @override
+  Future<void> close() {
+    if (_addingStream) {
+      throw StateError('Cannot add event while adding a stream');
+    }
+    if (!_doneFuture.isCompleted) _doneFuture.complete(_handler.onDone());
+    return _doneFuture.future;
+  }
+
+  @override
+  Future get done => _doneFuture.future;
+
+  @override
+  void add(T event) {
+    _checkCanAdd();
+    _handler.onData(event);
+  }
+
+  @override
+  void addError(Object error, [StackTrace? stackTrace]) {
+    _checkCanAdd();
+    _handler.onError(error, stackTrace ?? _defaultStack(error));
+  }
+}
+
+/// Helper function to ensure a stack trace is available.
+///
+/// If [error] is an [Error], its [Error.stackTrace] is used.
+/// If [error] is not an [Error], or its [Error.stackTrace] is `null`,
+/// the [StackTrace.current] trace is used instead.
+StackTrace _defaultStack(Object error) =>
+    (error is Error ? error.stackTrace : null) ?? StackTrace.current;
+
+/// Generalized unified callback handler for events from a stream.
+///
+/// A subclass can extend this class and override any of the
+/// [onData], [onError] or [onDone] methods.
+/// The default implementation simply ignores the events.
+class EventHandler<T> {
+  EventHandler();
+
+  /// Creates an event handler from the provided callbacks.
+  ///
+  /// Any callback not provided means that the corresponding event
+  /// has no effect.
+  factory EventHandler.fromCallbacks(
+      {void Function(T)? onData,
+      void Function(Object, StackTrace)? onError,
+      Future<void>? Function()? onDone}) = CallbackEventHandler<T>;
+
+  /// Called for data events.
+  void onData(T value) {}
+
+  /// Called for error events.
+  void onError(Object error, StackTrace stackTrace) {}
+
+  /// Called for the done event.
+  Future<void>? onDone() {
+    return null;
+  }
+}
+
+/// An event handler which forwards events to provided callback functions.
+class CallbackEventHandler<T> implements EventHandler<T> {
+  final void Function(T)? _onData;
+  final void Function(Object, StackTrace)? _onError;
+  final Future<void>? Function()? _onDone;
+
+  /// Creates an event handler forwarding to the provided callback functions.
+  CallbackEventHandler(
+      {void Function(T)? onData,
+      void Function(Object, StackTrace)? onError,
+      Future<void>? Function()? onDone})
+      : _onData = onData,
+        _onError = onError,
+        _onDone = onDone;
+
+  @override
+  void onData(T value) {
+    _onData?.call(value);
+  }
+
+  @override
+  Future<void>? onDone() => _onDone?.call();
+
+  @override
+  void onError(Object error, StackTrace stackTrace) {
+    _onError?.call(error, stackTrace);
+  }
+}
+
+/// An [EventHandler] where the function called for events can be changed.
+///
+/// This class is not zone aware. The functions stored in [dataHandler],
+/// [errorHandler] and [doneHandler] are called in the current zone
+/// at the time when [onData], [onError] or [onDone] is called.
+class MutableCallbackEventHandler<T> implements EventHandler<T> {
+  /// The data handler called by [onData], if any.
+  ///
+  /// When set to `null`, data events have no effect.
+  void Function(T)? dataHandler;
+
+  /// The error handler called by [onError], if any.
+  ///
+  /// When set to `null`, error events have no effect.
+  void Function(Object, StackTrace)? errorHandler;
+
+  /// The done handler called by [onDone], if any.
+  ///
+  /// When set to `null`, done events have no effect.
+  Future<void>? Function()? doneHandler;
+
+  /// Creates a mutable event handler initialized with any provided callbacks.
+  MutableCallbackEventHandler(
+      {void Function(T)? onData,
+      void Function(Object, StackTrace)? onError,
+      Future<void>? Function()? onDone})
+      : dataHandler = onData,
+        errorHandler = onError,
+        doneHandler = onDone;
+
+  @override
+  void onData(T value) {
+    dataHandler?.call(value);
+  }
+
+  @override
+  void onError(Object error, StackTrace stackTrace) {
+    errorHandler?.call(error, stackTrace);
+  }
+
+  @override
+  Future<void>? onDone() => doneHandler?.call();
+}
diff --git a/test/sink_adapter_test.dart b/test/sink_adapter_test.dart new file mode 100644 index 0000000..9c10170 --- /dev/null +++ b/test/sink_adapter_test.dart
@@ -0,0 +1,266 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+@deprecated
+library sink_base_test;
+
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:async/async.dart';
+
+void main() {
+  group('CallbackEventHandler', () {
+    test('no callbacks', () {
+      // Nothing happens, but doesn't crash.
+      var handler = CallbackEventHandler<int>();
+      handler.onData(0);
+      handler.onError('error', StackTrace.current);
+      handler.onDone();
+    });
+    test('callbacks', () {
+      var list = [];
+      var handler = CallbackEventHandler<int>(onData: (o) {
+        list.add('data:$o');
+      }, onError: (e, s) {
+        list.add('error:$e');
+      }, onDone: () {
+        list.add('done');
+      });
+      handler.onData(0);
+      handler.onError('e1', StackTrace.current);
+      handler.onData(2);
+      handler.onError('e3', StackTrace.current);
+      handler.onDone();
+      expect(list, ['data:0', 'error:e1', 'data:2', 'error:e3', 'done']);
+    });
+  });
+
+  group('MutableCallbackEventHandler', () {
+    test('no initial callbacks', () {
+      // Nothing happens, but doesn't crash.
+      var list = [];
+      var handler = MutableCallbackEventHandler<int>();
+      handler.onData(0);
+      handler.onError('e1', StackTrace.current);
+      handler.onDone();
+      expect(list, []);
+      handler.dataHandler = (o) {
+        list.add('data:$o');
+      };
+      handler.errorHandler = (e, s) {
+        list.add('error:$e');
+      };
+      handler.doneHandler = () {
+        list.add('done');
+      };
+      handler.onData(0);
+      handler.onError('e1', StackTrace.current);
+      handler.onDone();
+      expect(list, ['data:0', 'error:e1', 'done']);
+    });
+
+    test('initial callbacks', () {
+      var list = [];
+      var list2 = [];
+      var handler = MutableCallbackEventHandler<int>(onData: (o) {
+        list.add('data:$o');
+      }, onError: (e, s) {
+        list.add('error:$e');
+      }, onDone: () {
+        list.add('done');
+      });
+      handler.onData(0);
+      handler.onError('e1', StackTrace.current);
+      handler.onData(2);
+      handler.onError('e3', StackTrace.current);
+      handler.onDone();
+      expect(list, ['data:0', 'error:e1', 'data:2', 'error:e3', 'done']);
+      expect(list2, []);
+      handler.dataHandler = (o) {
+        list2.add('data:$o');
+      };
+      handler.errorHandler = (e, s) {
+        list2.add('error:$e');
+      };
+      handler.doneHandler = () {
+        list2.add('done');
+      };
+      handler.onData(0);
+      handler.onError('e1', StackTrace.current);
+      handler.onDone();
+      expect(list, ['data:0', 'error:e1', 'data:2', 'error:e3', 'done']);
+      expect(list2, ['data:0', 'error:e1', 'done']);
+    });
+  });
+
+  // Tests shared by [EventSinkAdapter] and [StreamSinkAdapter]; [create] wraps
+  // an [EventHandler] in the adapter under test.
+  void testAdapter(EventSink<T> Function<T>(EventHandler<T>) create) {
+    test('forwards add() to onData()', () {
+      var values = <int>[];
+      var sink = create<int>(CallbackEventHandler(onData: values.add));
+      sink.add(1);
+      sink.add(2);
+      expect(values, [1, 2]);
+    });
+
+    test('forwards addError() to onError()', () {
+      Object? value;
+      var sink = create<int>(CallbackEventHandler(onError: (error, stack) {
+        expect(stack, isA<StackTrace>());
+        value = error;
+      }));
+      sink.addError('error1', StackTrace.current);
+      expect(value, 'error1');
+      // No stack trace given: the adapter must still supply one.
+      sink.addError('error2');
+      expect(value, 'error2');
+    });
+
+    test('forwards close() to onDone()', () {
+      var closed = false;
+      var sink = create<int>(CallbackEventHandler(onDone: () {
+        closed = true;
+        return null;
+      }));
+      sink.close();
+      expect(closed, true);
+    });
+
+    test('onDone() is only invoked once', () {
+      var closed = 0;
+      var sink = create<int>(CallbackEventHandler(onDone: () {
+        closed++;
+      }));
+      expect(closed, 0);
+      sink.close();
+      expect(closed, 1);
+      sink.close();
+      sink.close();
+      sink.close();
+      expect(closed, 1);
+    });
+
+    group("once it's closed", () {
+      test('add() throws an error', () {
+        var sink = create<int>(EventHandler());
+        sink.close();
+        expect(() => sink.add(1), throwsStateError);
+      });
+
+      test('addError() throws an error', () {
+        var sink = create<int>(EventHandler());
+        sink.close();
+        expect(() => sink.addError('error'), throwsStateError);
+      });
+    });
+  }
+
+  group('EventSinkAdapter', () {
+    testAdapter(<T>(handler) => EventSinkAdapter<T>(handler));
+  });
+  group('StreamSinkAdapter', () {
+    testAdapter(<T>(handler) => StreamSinkAdapter<T>(handler));
+    test('forwards addStream() to onData() and onError()', () {
+      var list = [];
+      var sink = StreamSinkAdapter<int>(CallbackEventHandler(onData: (value) {
+        list.add('data:$value');
+      }, onError: (error, stackTrace) {
+        list.add('error:$error');
+        expect(stackTrace, isA<StackTrace>());
+      }));
+      var controller = StreamController<int>(sync: true);
+      sink.addStream(controller.stream);
+
+      controller.add(123);
+      controller.addError('error1', StackTrace.current);
+      controller.add(456);
+      expect(list, ['data:123', 'error:error1', 'data:456']);
+    });
+
+    test('addStream() returns once the stream closes', () async {
+      var sink = StreamSinkAdapter<int>(EventHandler()); // Ignores all events.
+      var controller = StreamController<int>();
+      var addStreamCompleted = false;
+      var addStreamTask = sink.addStream(controller.stream);
+      addStreamTask.whenComplete(() {
+        addStreamCompleted = true;
+      });
+      await pumpEventQueue();
+
+      controller.addError('error', StackTrace.current);
+      await pumpEventQueue();
+      expect(addStreamCompleted, false);
+
+      // Will close the stream and complete the future as a microtask.
+      controller.close();
+      await pumpEventQueue();
+      expect(addStreamCompleted, true);
+    });
+
+    test('all invocations of close() return the same future', () async {
+      var completer = Completer();
+      var sink = StreamSinkAdapter<int>(
+          CallbackEventHandler(onDone: () => completer.future));
+      var close1 = sink.close();
+      var close2 = sink.close();
+      expect(close1, same(close2));
+    });
+
+    test('done returns the same future as close()', () async {
+      var completer = Completer<void>();
+      var sink = StreamSinkAdapter<int>(
+          CallbackEventHandler(onDone: () => completer.future));
+
+      var done = sink.done;
+      var close = sink.close();
+      expect(done, same(close));
+
+      var doneCompleted = false;
+      done.whenComplete(() => doneCompleted = true);
+
+      await pumpEventQueue();
+      expect(doneCompleted, false);
+
+      completer.complete();
+      await pumpEventQueue();
+      expect(doneCompleted, true);
+    });
+
+    group('during addStream()', () {
+      test('add() throws an error', () {
+        var sink = StreamSinkAdapter<int>(EventHandler());
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.add(1), throwsStateError);
+      });
+
+      test('addError() throws an error', () {
+        var sink = StreamSinkAdapter<int>(EventHandler());
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.addError('error'), throwsStateError);
+      });
+
+      test('addStream() throws an error', () {
+        var sink = StreamSinkAdapter<int>(EventHandler());
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.addStream(Stream.value(123)), throwsStateError);
+      });
+
+      test('close() throws an error', () {
+        var sink = StreamSinkAdapter<int>(EventHandler());
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.close(), throwsStateError);
+      });
+    });
+
+    group("once it's closed", () {
+      test('addStream() throws an error', () {
+        var sink = StreamSinkAdapter<int>(EventHandler());
+        expect(sink.close(), completes);
+        expect(() => sink.addStream(Stream.value(123)), throwsStateError);
+      });
+    });
+  });
+}