Add files.
diff --git a/lib/src/sink_adapter.dart b/lib/src/sink_adapter.dart
new file mode 100644
index 0000000..c9cb6ec
--- /dev/null
+++ b/lib/src/sink_adapter.dart
@@ -0,0 +1,211 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// An [EventSink] which forwards event sink calls to an [EventHandler].
+///
+/// The [close] method rof [EventHandler.onDone] can return a future,
+/// but [EventSink.close] doesn't.
+/// Code expecting an [EventSink] won't check for such a future,
+/// and an [EventHandler] used with an [EventSinkAdapter] should never
+/// return a future.
+class EventSinkAdapter<T> implements EventSink<T> {
+  final EventHandler<T> _handler;
+  bool _closed = false;
+
+  /// Creates an event sink forwarding events to [handler].
+  EventSinkAdapter(EventHandler<T> handler) : _handler = handler;
+
+  @override
+  void add(T event) {
+    _checkCanAdd();
+    _handler.onData(event);
+  }
+
+  @override
+  void addError(Object error, [StackTrace? stackTrace]) {
+    _checkCanAdd();
+    _handler.onError(error, stackTrace ?? _defaultStack(error));
+  }
+
+  @override
+  void close() {
+    if (!_closed) {
+      _closed = true;
+      _handler.onDone();
+    }
+  }
+
+  void _checkCanAdd() {
+    if (_closed) {
+      throw StateError('Cannot add event after closing');
+    }
+  }
+}
+
+/// A [StreamSink] which forwards stream sink calls to an [EventHandler].
+class StreamSinkAdapter<T> implements StreamSink<T> {
+  final EventHandler<T> _handler;
+  final Completer<void> _doneFuture = Completer();
+  bool _addingStream = false;
+
+  /// Creates a stream sink which forwards events to [handler].
+  StreamSinkAdapter(EventHandler<T> handler) : _handler = handler;
+
+  @override
+  Future<void> addStream(Stream<T> stream) async {
+    _checkCanAdd();
+    var completer = Completer<void>.sync();
+    _addingStream = true;
+    stream.listen(_handler.onData, onError: _handler.onError, onDone: () {
+      _addingStream = false;
+      completer.complete(null);
+    });
+    return completer.future;
+  }
+
+  void _checkCanAdd() {
+    if (_addingStream) {
+      throw StateError('Cannot add event while adding a stream');
+    }
+    if (_doneFuture.isCompleted) {
+      throw StateError('Cannot add event after closing');
+    }
+  }
+
+  @override
+  Future<void> close() {
+    if (_addingStream) {
+      throw StateError('Cannot add event while adding a stream');
+    }
+    if (!_doneFuture.isCompleted) _doneFuture.complete(_handler.onDone());
+    return _doneFuture.future;
+  }
+
+  @override
+  Future get done => _doneFuture.future;
+
+  @override
+  void add(T event) {
+    _checkCanAdd();
+    _handler.onData(event);
+  }
+
+  @override
+  void addError(Object error, [StackTrace? stackTrace]) {
+    _checkCanAdd();
+    _handler.onError(error, stackTrace ?? _defaultStack(error));
+  }
+}
+
+/// Helper function to ensure a stack trace is available.
+///
+/// If [error] is an [Error], its [Error.stackTrace] is used.
+/// If [error] is not an [Error], or its [Error.stackTrace] is `null`,
+/// the [StackTrace.current] trace is used instead.
+StackTrace _defaultStack(Object error) =>
+    (error is Error ? error.stackTrace : null) ?? StackTrace.current;
+
+/// Generalized unified callback handler for events from a stream.
+///
+/// A subclass can extend this class an override any of the
+/// [onData], [onError] or [onDone] methods.
+/// The default implementation simply ignores the events.
+class EventHandler<T> {
+  EventHandler();
+
+  /// Creates an event handler from the provided callbacks.
+  ///
+  /// Any callback not provided means that the corresponding event
+  /// has no effect.
+  factory EventHandler.fromCallbacks(
+      {void Function(T)? onData,
+      void Function(Object, StackTrace)? onError,
+      Future<void>? Function()? onDone}) = CallbackEventHandler<T>;
+
+  /// Called for data events.
+  void onData(T value) {}
+
+  /// Called for error events.
+  void onError(Object error, StackTrace stackTrace) {}
+
+  /// Called for the done event.
+  Future<void>? onDone() {
+    return null;
+  }
+}
+
+/// An event handler which forwards events to provided callback functions.
+class CallbackEventHandler<T> implements EventHandler<T> {
+  final void Function(T)? _onData;
+  final void Function(Object, StackTrace)? _onError;
+  final Future<void>? Function()? _onDone;
+
+  /// Creates an event handler forwarding to the provided callback fcunctions.
+  CallbackEventHandler(
+      {void Function(T)? onData,
+      void Function(Object, StackTrace)? onError,
+      Future<void>? Function()? onDone})
+      : _onData = onData,
+        _onError = onError,
+        _onDone = onDone;
+
+  @override
+  void onData(T value) {
+    _onData?.call(value);
+  }
+
+  @override
+  Future<void>? onDone() => _onDone?.call();
+
+  @override
+  void onError(Object error, StackTrace stackTrace) {
+    _onError?.call(error, stackTrace);
+  }
+}
+
+/// An [EventHandler] where the function called for events can be changed.
+///
+/// This class is not zone aware. The functions stored in [dataHandler],
+/// [errorHandler] and [doneHandler] are called in the current zone
+/// at the time when [onData], [onError] or [onDone] is called.
+class MutableCallbackEventHandler<T> implements EventHandler<T> {
+  /// The data handler called by [onData], if any.
+  ///
+  /// When set to `null`, data events have no effect.
+  void Function(T)? dataHandler;
+
+  /// The error handler called by [onError], if any.
+  ///
+  /// When set to `null`, error events have no effect.
+  void Function(Object, StackTrace)? errorHandler;
+
+  /// The done handler called by [onDone], if any.
+  ///
+  /// When set to `null`, done events have no effect.
+  Future<void>? Function()? doneHandler;
+
+  /// Creates a mutable event handler initialized with any provided callbacks.
+  MutableCallbackEventHandler(
+      {void Function(T)? onData,
+      void Function(Object, StackTrace)? onError,
+      Future<void>? Function()? onDone})
+      : dataHandler = onData,
+        errorHandler = onError,
+        doneHandler = onDone;
+
+  @override
+  void onData(T value) {
+    dataHandler?.call(value);
+  }
+
+  @override
+  void onError(Object error, StackTrace stackTrace) {
+    errorHandler?.call(error, stackTrace);
+  }
+
+  @override
+  Future<void>? onDone() => doneHandler?.call();
+}
diff --git a/test/sink_adapter_test.dart b/test/sink_adapter_test.dart
new file mode 100644
index 0000000..9c10170
--- /dev/null
+++ b/test/sink_adapter_test.dart
@@ -0,0 +1,258 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+@deprecated
+library sink_base_test;
+
+import 'dart:async';
+
+import 'package:test/test.dart';
+
+import 'package:async/async.dart';
+
+void main() {
+  group('CallbackEventHandler', () {
+    test('no callbacks', () {
+      // Nothing happens, but doesn't crash.
+      var handler = CallbackEventHandler<int>();
+      handler.onData(0);
+      handler.onError('error', StackTrace.current);
+      handler.onDone();
+    });
+    test('callbacks', () {
+      var list = [];
+      var handler = CallbackEventHandler<int>(onData: (o) {
+        list.add('data:$o');
+      }, onError: (e, s) {
+        list.add('error:$e');
+      }, onDone: () {
+        list.add('done');
+      });
+      handler.onData(0);
+      handler.onError('e1', StackTrace.current);
+      handler.onData(2);
+      handler.onError('e3', StackTrace.current);
+      handler.onDone();
+      expect(list, ['data:0', 'error:e1', 'data:2', 'error:e3', 'done']);
+    });
+  });
+
+  group('MutableCallbackEventHandler', () {
+    test('no initial callbacks', () {
+      // Nothing happens, but doesn't crash.
+      var list = [];
+      var handler = MutableCallbackEventHandler<int>();
+      handler.onData(0);
+      handler.onError('e1', StackTrace.current);
+      handler.onDone();
+      expect(list, []);
+      handler.dataHandler = (o) {
+        list.add('data:$o');
+      };
+      handler.errorHandler = (e, s) {
+        list.add('error:$e');
+      };
+      handler.doneHandler = () {
+        list.add('done');
+      };
+      handler.onData(0);
+      handler.onError('e1', StackTrace.current);
+      handler.onDone();
+      expect(list, ['data:0', 'error:e1', 'done']);
+    });
+
+    test('initial callbacks', () {
+      var list = [];
+      var list2 = [];
+      var handler = MutableCallbackEventHandler<int>(onData: (o) {
+        list.add('data:$o');
+      }, onError: (e, s) {
+        list.add('error:$e');
+      }, onDone: () {
+        list.add('done');
+      });
+      handler.onData(0);
+      handler.onError('e1', StackTrace.current);
+      handler.onData(2);
+      handler.onError('e3', StackTrace.current);
+      handler.onDone();
+      expect(list, ['data:0', 'error:e1', 'data:2', 'error:e3', 'done']);
+      expect(list2, []);
+      handler.dataHandler = (o) {
+        list2.add('data:$o');
+      };
+      handler.errorHandler = (e, s) {
+        list2.add('error:$e');
+      };
+      handler.doneHandler = () {
+        list2.add('done');
+      };
+      handler.onData(0);
+      handler.onError('e1', StackTrace.current);
+      handler.onDone();
+      expect(list, ['data:0', 'error:e1', 'data:2', 'error:e3', 'done']);
+      expect(list2, ['data:0', 'error:e1', 'done']);
+    });
+  });
+
+  // We don't explicitly test [EventSinkBase] because it shares all the relevant
+  // implementation with [StreamSinkBase].
+  void testAdapter(EventSink<T> Function<T>(EventHandler<T>) create) {
+    test('forwards add() to onAdd()', () {
+      Object? value;
+      var sink = create<int>(CallbackEventHandler(onError: (error, stack) {
+        expect(stack, isA<StackTrace>());
+        value = error;
+      }));
+      sink.addError('error1', StackTrace.current);
+      expect(value, 'error1');
+      sink.addError('error2');
+      expect(value, 'error2');
+    });
+
+    test('forwards close() to onClose()', () {
+      var closed = false;
+      var sink = create<int>(CallbackEventHandler(onDone: () {
+        closed = true;
+        return null;
+      }));
+      sink.close();
+      expect(closed, true);
+    });
+
+    test('onClose() is only invoked once', () {
+      var closed = 0;
+      var sink = create<int>(CallbackEventHandler(onDone: () {
+        closed++;
+      }));
+      expect(closed, 0);
+      sink.close();
+      expect(closed, 1);
+      sink.close();
+      sink.close();
+      sink.close();
+      expect(closed, 1);
+    });
+
+    group("once it's closed", () {
+      test('add() throws an error', () {
+        var sink = create<int>(EventHandler());
+        sink.close();
+        expect(() => sink.add(1), throwsStateError);
+      });
+
+      test('addError() throws an error', () {
+        var sink = create<int>(EventHandler());
+        sink.close();
+        expect(() => sink.addError('error'), throwsStateError);
+      });
+    });
+  }
+
+  group('EventSinkAdapter', () {
+    testAdapter(<T>(handler) => EventSinkAdapter<T>(handler));
+  });
+  group('StreamSinkAdapter', () {
+    testAdapter(<T>(handler) => StreamSinkAdapter<T>(handler));
+    test('forwards addStream() to onAdd() and onError()', () {
+      var list = [];
+      var sink = StreamSinkAdapter<int>(CallbackEventHandler(onData: (value) {
+        list.add('data:$value');
+      }, onError: (error, stackTrace) {
+        list.add('error:$error');
+        expect(stackTrace, isA<StackTrace>());
+      }));
+      var controller = StreamController<int>(sync: true);
+      sink.addStream(controller.stream);
+
+      controller.add(123);
+      controller.addError('error1', StackTrace.current);
+      controller.add(456);
+      expect(list, ['data:123', 'error:error1', 'data:456']);
+    });
+
+    test('addStream() returns once the stream closes', () async {
+      var sink = StreamSinkAdapter<int>(EventHandler()); // Ignores all events.
+      var controller = StreamController<int>();
+      var addStreamCompleted = false;
+      var addStreamTask = sink.addStream(controller.stream);
+      addStreamTask.whenComplete(() {
+        addStreamCompleted = true;
+      });
+      await pumpEventQueue();
+
+      controller.addError('error', StackTrace.current);
+      await pumpEventQueue();
+      expect(addStreamCompleted, false);
+
+      // Will close the stream and complete the future as a microtask.
+      controller.close();
+      await pumpEventQueue();
+      expect(addStreamCompleted, true);
+    });
+
+    test('all invocations of close() return the same future', () async {
+      var completer = Completer();
+      var sink = StreamSinkAdapter<int>(
+          CallbackEventHandler(onDone: () => completer.future));
+      var close1 = sink.close();
+      var close2 = sink.close();
+      expect(close1, same(close2));
+    });
+
+    test('done returns the same future as close()', () async {
+      var completer = Completer<void>();
+      var sink = StreamSinkAdapter<int>(
+          CallbackEventHandler(onDone: () => completer.future));
+
+      var done = sink.done;
+      var close = sink.close();
+      expect(done, same(close));
+
+      var doneCompleted = false;
+      done.whenComplete(() => doneCompleted = true);
+
+      await pumpEventQueue();
+      expect(doneCompleted, false);
+
+      completer.complete();
+      await pumpEventQueue();
+      expect(doneCompleted, true);
+    });
+
+    group('during addStream()', () {
+      test('add() throws an error', () {
+        var sink = StreamSinkAdapter<int>(EventHandler());
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.add(1), throwsStateError);
+      });
+
+      test('addError() throws an error', () {
+        var sink = StreamSinkAdapter<int>(EventHandler());
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.addError('error'), throwsStateError);
+      });
+
+      test('addStream() throws an error', () {
+        var sink = StreamSinkAdapter<int>(EventHandler());
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.addStream(Stream.value(123)), throwsStateError);
+      });
+
+      test('close() throws an error', () {
+        var sink = StreamSinkAdapter<int>(EventHandler());
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.close(), throwsStateError);
+      });
+    });
+
+    group("once it's closed", () {
+      test('addStream() throws an error', () {
+        var sink = StreamSinkAdapter<int>(EventHandler());
+        expect(sink.close(), completes);
+        expect(() => sink.addStream(Stream.value(123)), throwsStateError);
+      });
+    });
+  });
+}