Add files.
diff --git a/lib/src/sink_adapter.dart b/lib/src/sink_adapter.dart
new file mode 100644
index 0000000..c9cb6ec
--- /dev/null
+++ b/lib/src/sink_adapter.dart
@@ -0,0 +1,211 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// An [EventSink] which forwards event sink calls to an [EventHandler].
+///
+/// The [close] method rof [EventHandler.onDone] can return a future,
+/// but [EventSink.close] doesn't.
+/// Code expecting an [EventSink] won't check for such a future,
+/// and an [EventHandler] used with an [EventSinkAdapter] should never
+/// return a future.
+class EventSinkAdapter<T> implements EventSink<T> {
+ final EventHandler<T> _handler;
+ bool _closed = false;
+
+ /// Creates an event sink forwarding events to [handler].
+ EventSinkAdapter(EventHandler<T> handler) : _handler = handler;
+
+ @override
+ void add(T event) {
+ _checkCanAdd();
+ _handler.onData(event);
+ }
+
+ @override
+ void addError(Object error, [StackTrace? stackTrace]) {
+ _checkCanAdd();
+ _handler.onError(error, stackTrace ?? _defaultStack(error));
+ }
+
+ @override
+ void close() {
+ if (!_closed) {
+ _closed = true;
+ _handler.onDone();
+ }
+ }
+
+ void _checkCanAdd() {
+ if (_closed) {
+ throw StateError('Cannot add event after closing');
+ }
+ }
+}
+
+/// A [StreamSink] which forwards stream sink calls to an [EventHandler].
+class StreamSinkAdapter<T> implements StreamSink<T> {
+ final EventHandler<T> _handler;
+ final Completer<void> _doneFuture = Completer();
+ bool _addingStream = false;
+
+ /// Creates a stream sink which forwards events to [handler].
+ StreamSinkAdapter(EventHandler<T> handler) : _handler = handler;
+
+ @override
+ Future<void> addStream(Stream<T> stream) async {
+ _checkCanAdd();
+ var completer = Completer<void>.sync();
+ _addingStream = true;
+ stream.listen(_handler.onData, onError: _handler.onError, onDone: () {
+ _addingStream = false;
+ completer.complete(null);
+ });
+ return completer.future;
+ }
+
+ void _checkCanAdd() {
+ if (_addingStream) {
+ throw StateError('Cannot add event while adding a stream');
+ }
+ if (_doneFuture.isCompleted) {
+ throw StateError('Cannot add event after closing');
+ }
+ }
+
+ @override
+ Future<void> close() {
+ if (_addingStream) {
+ throw StateError('Cannot add event while adding a stream');
+ }
+ if (!_doneFuture.isCompleted) _doneFuture.complete(_handler.onDone());
+ return _doneFuture.future;
+ }
+
+ @override
+ Future get done => _doneFuture.future;
+
+ @override
+ void add(T event) {
+ _checkCanAdd();
+ _handler.onData(event);
+ }
+
+ @override
+ void addError(Object error, [StackTrace? stackTrace]) {
+ _checkCanAdd();
+ _handler.onError(error, stackTrace ?? _defaultStack(error));
+ }
+}
+
+/// Helper function to ensure a stack trace is available.
+///
+/// If [error] is an [Error], its [Error.stackTrace] is used.
+/// If [error] is not an [Error], or its [Error.stackTrace] is `null`,
+/// the [StackTrace.current] trace is used instead.
+StackTrace _defaultStack(Object error) =>
+ (error is Error ? error.stackTrace : null) ?? StackTrace.current;
+
+/// Generalized unified callback handler for events from a stream.
+///
+/// A subclass can extend this class an override any of the
+/// [onData], [onError] or [onDone] methods.
+/// The default implementation simply ignores the events.
+class EventHandler<T> {
+ EventHandler();
+
+ /// Creates an event handler from the provided callbacks.
+ ///
+ /// Any callback not provided means that the corresponding event
+ /// has no effect.
+ factory EventHandler.fromCallbacks(
+ {void Function(T)? onData,
+ void Function(Object, StackTrace)? onError,
+ Future<void>? Function()? onDone}) = CallbackEventHandler<T>;
+
+ /// Called for data events.
+ void onData(T value) {}
+
+ /// Called for error events.
+ void onError(Object error, StackTrace stackTrace) {}
+
+ /// Called for the done event.
+ Future<void>? onDone() {
+ return null;
+ }
+}
+
+/// An event handler which forwards events to provided callback functions.
+class CallbackEventHandler<T> implements EventHandler<T> {
+ final void Function(T)? _onData;
+ final void Function(Object, StackTrace)? _onError;
+ final Future<void>? Function()? _onDone;
+
+ /// Creates an event handler forwarding to the provided callback fcunctions.
+ CallbackEventHandler(
+ {void Function(T)? onData,
+ void Function(Object, StackTrace)? onError,
+ Future<void>? Function()? onDone})
+ : _onData = onData,
+ _onError = onError,
+ _onDone = onDone;
+
+ @override
+ void onData(T value) {
+ _onData?.call(value);
+ }
+
+ @override
+ Future<void>? onDone() => _onDone?.call();
+
+ @override
+ void onError(Object error, StackTrace stackTrace) {
+ _onError?.call(error, stackTrace);
+ }
+}
+
+/// An [EventHandler] where the function called for events can be changed.
+///
+/// This class is not zone aware. The functions stored in [dataHandler],
+/// [errorHandler] and [doneHandler] are called in the current zone
+/// at the time when [onData], [onError] or [onDone] is called.
+class MutableCallbackEventHandler<T> implements EventHandler<T> {
+ /// The data handler called by [onData], if any.
+ ///
+ /// When set to `null`, data events have no effect.
+ void Function(T)? dataHandler;
+
+ /// The error handler called by [onError], if any.
+ ///
+ /// When set to `null`, error events have no effect.
+ void Function(Object, StackTrace)? errorHandler;
+
+ /// The done handler called by [onDone], if any.
+ ///
+ /// When set to `null`, done events have no effect.
+ Future<void>? Function()? doneHandler;
+
+ /// Creates a mutable event handler initialized with any provided callbacks.
+ MutableCallbackEventHandler(
+ {void Function(T)? onData,
+ void Function(Object, StackTrace)? onError,
+ Future<void>? Function()? onDone})
+ : dataHandler = onData,
+ errorHandler = onError,
+ doneHandler = onDone;
+
+ @override
+ void onData(T value) {
+ dataHandler?.call(value);
+ }
+
+ @override
+ void onError(Object error, StackTrace stackTrace) {
+ errorHandler?.call(error, stackTrace);
+ }
+
+ @override
+ Future<void>? onDone() => doneHandler?.call();
+}
diff --git a/test/sink_adapter_test.dart b/test/sink_adapter_test.dart
new file mode 100644
index 0000000..9c10170
--- /dev/null
+++ b/test/sink_adapter_test.dart
@@ -0,0 +1,258 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+@deprecated
+library sink_base_test;
+
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:async/async.dart';
+
+void main() {
+ group('CallbackEventHandler', () {
+ test('no callbacks', () {
+ // Nothing happens, but doesn't crash.
+ var handler = CallbackEventHandler<int>();
+ handler.onData(0);
+ handler.onError('error', StackTrace.current);
+ handler.onDone();
+ });
+ test('callbacks', () {
+ var list = [];
+ var handler = CallbackEventHandler<int>(onData: (o) {
+ list.add('data:$o');
+ }, onError: (e, s) {
+ list.add('error:$e');
+ }, onDone: () {
+ list.add('done');
+ });
+ handler.onData(0);
+ handler.onError('e1', StackTrace.current);
+ handler.onData(2);
+ handler.onError('e3', StackTrace.current);
+ handler.onDone();
+ expect(list, ['data:0', 'error:e1', 'data:2', 'error:e3', 'done']);
+ });
+ });
+
+ group('MutableCallbackEventHandler', () {
+ test('no initial callbacks', () {
+ // Nothing happens, but doesn't crash.
+ var list = [];
+ var handler = MutableCallbackEventHandler<int>();
+ handler.onData(0);
+ handler.onError('e1', StackTrace.current);
+ handler.onDone();
+ expect(list, []);
+ handler.dataHandler = (o) {
+ list.add('data:$o');
+ };
+ handler.errorHandler = (e, s) {
+ list.add('error:$e');
+ };
+ handler.doneHandler = () {
+ list.add('done');
+ };
+ handler.onData(0);
+ handler.onError('e1', StackTrace.current);
+ handler.onDone();
+ expect(list, ['data:0', 'error:e1', 'done']);
+ });
+
+ test('initial callbacks', () {
+ var list = [];
+ var list2 = [];
+ var handler = MutableCallbackEventHandler<int>(onData: (o) {
+ list.add('data:$o');
+ }, onError: (e, s) {
+ list.add('error:$e');
+ }, onDone: () {
+ list.add('done');
+ });
+ handler.onData(0);
+ handler.onError('e1', StackTrace.current);
+ handler.onData(2);
+ handler.onError('e3', StackTrace.current);
+ handler.onDone();
+ expect(list, ['data:0', 'error:e1', 'data:2', 'error:e3', 'done']);
+ expect(list2, []);
+ handler.dataHandler = (o) {
+ list2.add('data:$o');
+ };
+ handler.errorHandler = (e, s) {
+ list2.add('error:$e');
+ };
+ handler.doneHandler = () {
+ list2.add('done');
+ };
+ handler.onData(0);
+ handler.onError('e1', StackTrace.current);
+ handler.onDone();
+ expect(list, ['data:0', 'error:e1', 'data:2', 'error:e3', 'done']);
+ expect(list2, ['data:0', 'error:e1', 'done']);
+ });
+ });
+
+ // We don't explicitly test [EventSinkBase] because it shares all the relevant
+ // implementation with [StreamSinkBase].
+ void testAdapter(EventSink<T> Function<T>(EventHandler<T>) create) {
+ test('forwards add() to onAdd()', () {
+ Object? value;
+ var sink = create<int>(CallbackEventHandler(onError: (error, stack) {
+ expect(stack, isA<StackTrace>());
+ value = error;
+ }));
+ sink.addError('error1', StackTrace.current);
+ expect(value, 'error1');
+ sink.addError('error2');
+ expect(value, 'error2');
+ });
+
+ test('forwards close() to onClose()', () {
+ var closed = false;
+ var sink = create<int>(CallbackEventHandler(onDone: () {
+ closed = true;
+ return null;
+ }));
+ sink.close();
+ expect(closed, true);
+ });
+
+ test('onClose() is only invoked once', () {
+ var closed = 0;
+ var sink = create<int>(CallbackEventHandler(onDone: () {
+ closed++;
+ }));
+ expect(closed, 0);
+ sink.close();
+ expect(closed, 1);
+ sink.close();
+ sink.close();
+ sink.close();
+ expect(closed, 1);
+ });
+
+ group("once it's closed", () {
+ test('add() throws an error', () {
+ var sink = create<int>(EventHandler());
+ sink.close();
+ expect(() => sink.add(1), throwsStateError);
+ });
+
+ test('addError() throws an error', () {
+ var sink = create<int>(EventHandler());
+ sink.close();
+ expect(() => sink.addError('error'), throwsStateError);
+ });
+ });
+ }
+
+ group('EventSinkAdapter', () {
+ testAdapter(<T>(handler) => EventSinkAdapter<T>(handler));
+ });
+ group('StreamSinkAdapter', () {
+ testAdapter(<T>(handler) => StreamSinkAdapter<T>(handler));
+ test('forwards addStream() to onAdd() and onError()', () {
+ var list = [];
+ var sink = StreamSinkAdapter<int>(CallbackEventHandler(onData: (value) {
+ list.add('data:$value');
+ }, onError: (error, stackTrace) {
+ list.add('error:$error');
+ expect(stackTrace, isA<StackTrace>());
+ }));
+ var controller = StreamController<int>(sync: true);
+ sink.addStream(controller.stream);
+
+ controller.add(123);
+ controller.addError('error1', StackTrace.current);
+ controller.add(456);
+ expect(list, ['data:123', 'error:error1', 'data:456']);
+ });
+
+ test('addStream() returns once the stream closes', () async {
+ var sink = StreamSinkAdapter<int>(EventHandler()); // Ignores all events.
+ var controller = StreamController<int>();
+ var addStreamCompleted = false;
+ var addStreamTask = sink.addStream(controller.stream);
+ addStreamTask.whenComplete(() {
+ addStreamCompleted = true;
+ });
+ await pumpEventQueue();
+
+ controller.addError('error', StackTrace.current);
+ await pumpEventQueue();
+ expect(addStreamCompleted, false);
+
+ // Will close the stream and complete the future as a microtask.
+ controller.close();
+ await pumpEventQueue();
+ expect(addStreamCompleted, true);
+ });
+
+ test('all invocations of close() return the same future', () async {
+ var completer = Completer();
+ var sink = StreamSinkAdapter<int>(
+ CallbackEventHandler(onDone: () => completer.future));
+ var close1 = sink.close();
+ var close2 = sink.close();
+ expect(close1, same(close2));
+ });
+
+ test('done returns the same future as close()', () async {
+ var completer = Completer<void>();
+ var sink = StreamSinkAdapter<int>(
+ CallbackEventHandler(onDone: () => completer.future));
+
+ var done = sink.done;
+ var close = sink.close();
+ expect(done, same(close));
+
+ var doneCompleted = false;
+ done.whenComplete(() => doneCompleted = true);
+
+ await pumpEventQueue();
+ expect(doneCompleted, false);
+
+ completer.complete();
+ await pumpEventQueue();
+ expect(doneCompleted, true);
+ });
+
+ group('during addStream()', () {
+ test('add() throws an error', () {
+ var sink = StreamSinkAdapter<int>(EventHandler());
+ sink.addStream(StreamController<int>().stream);
+ expect(() => sink.add(1), throwsStateError);
+ });
+
+ test('addError() throws an error', () {
+ var sink = StreamSinkAdapter<int>(EventHandler());
+ sink.addStream(StreamController<int>().stream);
+ expect(() => sink.addError('error'), throwsStateError);
+ });
+
+ test('addStream() throws an error', () {
+ var sink = StreamSinkAdapter<int>(EventHandler());
+ sink.addStream(StreamController<int>().stream);
+ expect(() => sink.addStream(Stream.value(123)), throwsStateError);
+ });
+
+ test('close() throws an error', () {
+ var sink = StreamSinkAdapter<int>(EventHandler());
+ sink.addStream(StreamController<int>().stream);
+ expect(() => sink.close(), throwsStateError);
+ });
+ });
+
+ group("once it's closed", () {
+ test('addStream() throws an error', () {
+ var sink = StreamSinkAdapter<int>(EventHandler());
+ expect(sink.close(), completes);
+ expect(() => sink.addStream(Stream.value(123)), throwsStateError);
+ });
+ });
+ });
+}