Add *SinkBase classes for implementing custom sinks (#188)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 51c067d..102ad59 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,7 @@
-## 2.7.1-dev
+## 2.8.0
 
+* Add `EventSinkBase`, `StreamSinkBase`, and `IOSinkBase` classes to make it
+  easier to implement custom sinks.
 * Improve performance for `ChunkedStreamReader` by creating fewer internal
   sublists and specializing to create views for `Uint8List` chunks.
 
diff --git a/lib/async.dart b/lib/async.dart
index 2170442..10e7fe7 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -27,6 +27,7 @@
 export 'src/result/future.dart';
 export 'src/result/value.dart';
 export 'src/single_subscription_transformer.dart';
+export 'src/sink_base.dart';
 export 'src/stream_closer.dart';
 export 'src/stream_completer.dart';
 export 'src/stream_extensions.dart';
diff --git a/lib/src/sink_base.dart b/lib/src/sink_base.dart
new file mode 100644
index 0000000..873ce82
--- /dev/null
+++ b/lib/src/sink_base.dart
@@ -0,0 +1,168 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:meta/meta.dart';
+
+import 'async_memoizer.dart';
+
+/// An abstract class that implements [EventSink] in terms of [onData],
+/// [onError], and [onClose] methods.
+///
+/// This takes care of ensuring that events can't be added after [close] is
+/// called.
+abstract class EventSinkBase<T> implements EventSink<T> {
+  /// Whether [close] has been called and no more events may be written.
+  bool get _closed => _closeMemo.hasRun;
+
+  @override
+  void add(T data) {
+    _checkCanAddEvent();
+    onAdd(data);
+  }
+
+  /// A method that handles data events that are passed to the sink.
+  @visibleForOverriding
+  void onAdd(T data);
+
+  @override
+  void addError(Object error, [StackTrace? stackTrace]) {
+    _checkCanAddEvent();
+    onError(error, stackTrace);
+  }
+
+  /// A method that handles error events that are passed to the sink.
+  @visibleForOverriding
+  void onError(Object error, [StackTrace? stackTrace]);
+
+  @override
+  Future<void> close() => _closeMemo.runOnce(onClose);
+  final _closeMemo = AsyncMemoizer<void>();
+
+  /// A method that handles the sink being closed.
+  ///
+  /// This may return a future that completes once the stream sink has shut
+  /// down. If cleaning up can fail, the error may be reported in the returned
+  /// future.
+  @visibleForOverriding
+  FutureOr<void> onClose();
+
+  /// Asserts that the sink is in a state where adding an event is valid.
+  void _checkCanAddEvent() {
+    if (_closed) throw StateError('Cannot add event after closing');
+  }
+}
+
+/// An abstract class that implements [StreamSink] in terms of [onData],
+/// [onError], and [onClose] methods.
+///
+/// This takes care of ensuring that events can't be added after [close] is
+/// called or during a call to [onStream].
+abstract class StreamSinkBase<T> extends EventSinkBase<T>
+    implements StreamSink<T> {
+  /// Whether a call to [addStream] is ongoing.
+  bool _addingStream = false;
+
+  @override
+  Future<void> get done => _closeMemo.future;
+
+  @override
+  Future<void> addStream(Stream<T> stream) {
+    _checkCanAddEvent();
+
+    _addingStream = true;
+    var completer = Completer<void>.sync();
+    stream.listen(onAdd, onError: onError, onDone: () {
+      _addingStream = false;
+      completer.complete();
+    });
+    return completer.future;
+  }
+
+  @override
+  Future<void> close() {
+    if (_addingStream) throw StateError('StreamSink is bound to a stream');
+    return super.close();
+  }
+
+  @override
+  void _checkCanAddEvent() {
+    super._checkCanAddEvent();
+    if (_addingStream) throw StateError('StreamSink is bound to a stream');
+  }
+}
+
+/// An abstract class that implements `dart:io`'s [IOSink]'s API in terms of
+/// [onData], [onError], [onClose], and [onFlush] methods.
+///
+/// Because [IOSink] is defined in `dart:io`, this can't officially implement
+/// it. However, it's designed to match its API exactly so that subclasses can
+/// implement [IOSink] without any additional modifications.
+///
+/// This takes care of ensuring that events can't be added after [close] is
+/// called or during a call to [onStream].
+abstract class IOSinkBase extends StreamSinkBase<List<int>> {
+  /// See [IOSink.encoding] from `dart:io`.
+  Encoding encoding;
+
+  IOSinkBase([this.encoding = utf8]);
+
+  /// See [IOSink.flush] from `dart:io`.
+  ///
+  /// Because this base class doesn't do any buffering of its own, [flush]
+  /// always completes immediately.
+  ///
+  /// Subclasses that do buffer events should override [flush] to complete once
+  /// all events are delivered. They should also call `super.flush()` at the
+  /// beginning of the method to throw a [StateError] if the sink is currently
+  /// adding a stream.
+  Future<void> flush() {
+    if (_addingStream) throw StateError('StreamSink is bound to a stream');
+    if (_closed) return Future.value();
+
+    _addingStream = true;
+    return onFlush().whenComplete(() {
+      _addingStream = false;
+    });
+  }
+
+  /// Flushes any buffered data to the underlying consumer, and returns a future
+  /// that completes once the consumer has accepted all data.
+  @visibleForOverriding
+  Future<void> onFlush();
+
+  /// See [IOSink.write] from `dart:io`.
+  void write(Object? object) {
+    var string = object.toString();
+    if (string.isEmpty) return;
+    add(encoding.encode(string));
+  }
+
+  /// See [IOSink.writeAll] from `dart:io`.
+  void writeAll(Iterable<Object?> objects, [String separator = '']) {
+    var first = true;
+    for (var object in objects) {
+      if (first) {
+        first = false;
+      } else {
+        write(separator);
+      }
+
+      write(object);
+    }
+  }
+
+  /// See [IOSink.writeln] from `dart:io`.
+  void writeln([Object? object = '']) {
+    write(object);
+    write('\n');
+  }
+
+  /// See [IOSink.writeCharCode] from `dart:io`.
+  void writeCharCode(int charCode) {
+    write(String.fromCharCode(charCode));
+  }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 79dc278..b0a68a6 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 2.7.1-dev
+version: 2.8.0
 
 description: Utility functions and classes related to the 'dart:async' library.
 repository: https://github.com/dart-lang/async
@@ -12,6 +12,7 @@
   meta: ^1.1.7
 
 dev_dependencies:
+  charcode: ^1.3.0
   fake_async: ^1.2.0
   pedantic: ^1.10.0
   stack_trace: ^1.10.0
diff --git a/test/io_sink_impl.dart b/test/io_sink_impl.dart
new file mode 100644
index 0000000..63fa289
--- /dev/null
+++ b/test/io_sink_impl.dart
@@ -0,0 +1,23 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:io';
+
+import 'package:async/async.dart';
+
+/// This class isn't used, it's just used to verify that [IOSinkBase] produces a
+/// valid implementation of [IOSink].
+class IOSinkImpl extends IOSinkBase implements IOSink {
+  @override
+  void onAdd(List<int> data) {}
+
+  @override
+  void onError(Object error, [StackTrace? stackTrace]) {}
+
+  @override
+  void onClose() {}
+
+  @override
+  Future<void> onFlush() => Future.value();
+}
diff --git a/test/sink_base_test.dart b/test/sink_base_test.dart
new file mode 100644
index 0000000..da3ca9f
--- /dev/null
+++ b/test/sink_base_test.dart
@@ -0,0 +1,403 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:charcode/charcode.dart';
+import 'package:test/test.dart';
+
+import 'package:async/async.dart';
+
+void main() {
+  // We don't explicitly test [EventSinkBase] because it shares all the relevant
+  // implementation with [StreamSinkBase].
+  group('StreamSinkBase', () {
+    test('forwards add() to onAdd()', () {
+      var sink = _StreamSink(onAdd: expectAsync1((value) {
+        expect(value, equals(123));
+      }));
+      sink.add(123);
+    });
+
+    test('forwards addError() to onError()', () {
+      var sink = _StreamSink(onError: expectAsync2((error, [stackTrace]) {
+        expect(error, equals('oh no'));
+        expect(stackTrace, isA<StackTrace>());
+      }));
+      sink.addError('oh no', StackTrace.current);
+    });
+
+    test('forwards addStream() to onAdd() and onError()', () {
+      var sink = _StreamSink(
+          onAdd: expectAsync1((value) {
+            expect(value, equals(123));
+          }, count: 1),
+          onError: expectAsync2((error, [stackTrace]) {
+            expect(error, equals('oh no'));
+            expect(stackTrace, isA<StackTrace>());
+          }));
+
+      var controller = StreamController<int>();
+      sink.addStream(controller.stream);
+
+      controller.add(123);
+      controller.addError('oh no', StackTrace.current);
+    });
+
+    test('addStream() returns once the stream closes', () async {
+      var sink = _StreamSink();
+      var controller = StreamController<int>();
+      var addStreamCompleted = false;
+      sink.addStream(controller.stream).then((_) => addStreamCompleted = true);
+      ;
+
+      await pumpEventQueue();
+      expect(addStreamCompleted, isFalse);
+
+      controller.addError('oh no', StackTrace.current);
+      await pumpEventQueue();
+      expect(addStreamCompleted, isFalse);
+
+      controller.close();
+      await pumpEventQueue();
+      expect(addStreamCompleted, isTrue);
+    });
+
+    test('forwards close() to onClose()', () {
+      var sink = _StreamSink(onClose: expectAsync0(() {}));
+      expect(sink.close(), completes);
+    });
+
+    test('onClose() is only invoked once', () {
+      var sink = _StreamSink(onClose: expectAsync0(() {}, count: 1));
+      expect(sink.close(), completes);
+      expect(sink.close(), completes);
+      expect(sink.close(), completes);
+    });
+
+    test('all invocations of close() return the same future', () async {
+      var completer = Completer();
+      var sink = _StreamSink(onClose: expectAsync0(() => completer.future));
+
+      var close1Completed = false;
+      sink.close().then((_) => close1Completed = true);
+
+      var close2Completed = false;
+      sink.close().then((_) => close2Completed = true);
+
+      var doneCompleted = false;
+      sink.done.then((_) => doneCompleted = true);
+
+      await pumpEventQueue();
+      expect(close1Completed, isFalse);
+      expect(close2Completed, isFalse);
+      expect(doneCompleted, isFalse);
+
+      completer.complete();
+      await pumpEventQueue();
+      expect(close1Completed, isTrue);
+      expect(close2Completed, isTrue);
+      expect(doneCompleted, isTrue);
+    });
+
+    test('done returns a future that completes once close() completes',
+        () async {
+      var completer = Completer();
+      var sink = _StreamSink(onClose: expectAsync0(() => completer.future));
+
+      var doneCompleted = false;
+      sink.done.then((_) => doneCompleted = true);
+
+      await pumpEventQueue();
+      expect(doneCompleted, isFalse);
+
+      expect(sink.close(), completes);
+      await pumpEventQueue();
+      expect(doneCompleted, isFalse);
+
+      completer.complete();
+      await pumpEventQueue();
+      expect(doneCompleted, isTrue);
+    });
+
+    group('during addStream()', () {
+      test('add() throws an error', () {
+        var sink = _StreamSink(onAdd: expectAsync1((_) {}, count: 0));
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.add(1), throwsStateError);
+      });
+
+      test('addError() throws an error', () {
+        var sink = _StreamSink(onError: expectAsync2((_, [__]) {}, count: 0));
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.addError('oh no'), throwsStateError);
+      });
+
+      test('addStream() throws an error', () {
+        var sink = _StreamSink(onAdd: expectAsync1((_) {}, count: 0));
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.addStream(Stream.value(123)), throwsStateError);
+      });
+
+      test('close() throws an error', () {
+        var sink = _StreamSink(onClose: expectAsync0(() {}, count: 0));
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.close(), throwsStateError);
+      });
+    });
+
+    group("once it's closed", () {
+      test('add() throws an error', () {
+        var sink = _StreamSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.add(1), throwsStateError);
+      });
+
+      test('addError() throws an error', () {
+        var sink = _StreamSink(onError: expectAsync2((_, [__]) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.addError('oh no'), throwsStateError);
+      });
+
+      test('addStream() throws an error', () {
+        var sink = _StreamSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.addStream(Stream.value(123)), throwsStateError);
+      });
+    });
+  });
+
+  group('IOSinkBase', () {
+    group('write()', () {
+      test("doesn't call add() for the empty string", () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        sink.write('');
+      });
+
+      test('converts the text to data and passes it to add', () async {
+        var sink = _IOSink(onAdd: expectAsync1((data) {
+          expect(data, equals(utf8.encode('hello')));
+        }));
+        sink.write('hello');
+      });
+
+      test('calls Object.toString()', () async {
+        var sink = _IOSink(onAdd: expectAsync1((data) {
+          expect(data, equals(utf8.encode('123')));
+        }));
+        sink.write(123);
+      });
+
+      test('respects the encoding', () async {
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+              expect(data, equals(latin1.encode('Æ')));
+            }),
+            encoding: latin1);
+        sink.write('Æ');
+      });
+
+      test('throws if the sink is closed', () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.write('hello'), throwsStateError);
+      });
+    });
+
+    group('writeAll()', () {
+      test('writes nothing for an empty iterable', () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        sink.writeAll([]);
+      });
+
+      test('writes each object in the iterable', () async {
+        var chunks = <List<int>>[];
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+          chunks.add(data);
+        }, count: 3));
+
+        sink.writeAll(['hello', null, 123]);
+        expect(chunks, equals(['hello', 'null', '123'].map(utf8.encode)));
+      });
+
+      test('writes separators between each object', () async {
+        var chunks = <List<int>>[];
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+          chunks.add(data);
+        }, count: 5));
+
+        sink.writeAll(['hello', null, 123], '/');
+        expect(chunks,
+            equals(['hello', '/', 'null', '/', '123'].map(utf8.encode)));
+      });
+
+      test('throws if the sink is closed', () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.writeAll(['hello']), throwsStateError);
+      });
+    });
+
+    group('writeln()', () {
+      test('only writes a newline by default', () async {
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+          expect(data, equals(utf8.encode('\n')));
+        }, count: 1));
+        sink.writeln();
+      });
+
+      test('writes the object followed by a newline', () async {
+        var chunks = <List<int>>[];
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+          chunks.add(data);
+        }, count: 2));
+        sink.writeln(123);
+
+        expect(chunks, equals(['123', '\n'].map(utf8.encode)));
+      });
+
+      test('throws if the sink is closed', () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.writeln(), throwsStateError);
+      });
+    });
+
+    group('writeCharCode()', () {
+      test('writes the character code', () async {
+        var sink = _IOSink(onAdd: expectAsync1((data) {
+          expect(data, equals(utf8.encode('A')));
+        }));
+        sink.writeCharCode($A);
+      });
+
+      test('respects the encoding', () async {
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+              expect(data, equals(latin1.encode('Æ')));
+            }),
+            encoding: latin1);
+        sink.writeCharCode('Æ'.runes.first);
+      });
+
+      test('throws if the sink is closed', () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.writeCharCode($A), throwsStateError);
+      });
+    });
+
+    group('flush()', () {
+      test('returns a future that completes when onFlush() is done', () async {
+        var completer = Completer();
+        var sink = _IOSink(onFlush: expectAsync0(() => completer.future));
+
+        var flushDone = false;
+        sink.flush().then((_) => flushDone = true);
+
+        await pumpEventQueue();
+        expect(flushDone, isFalse);
+
+        completer.complete();
+        await pumpEventQueue();
+        expect(flushDone, isTrue);
+      });
+
+      test('does nothing after close() is called', () {
+        var sink =
+            _IOSink(onFlush: expectAsync0(() => Future.value(), count: 0));
+        expect(sink.close(), completes);
+        expect(sink.flush(), completes);
+      });
+
+      test("can't be called during addStream()", () {
+        var sink =
+            _IOSink(onFlush: expectAsync0(() => Future.value(), count: 0));
+        sink.addStream(StreamController<List<int>>().stream);
+        expect(() => sink.flush(), throwsStateError);
+      });
+
+      test('locks the sink as though a stream was being added', () {
+        var sink = _IOSink(onFlush: expectAsync0(() => Completer().future));
+        sink.flush();
+        expect(() => sink.add([0]), throwsStateError);
+        expect(() => sink.addError('oh no'), throwsStateError);
+        expect(() => sink.addStream(Stream.empty()), throwsStateError);
+        expect(() => sink.flush(), throwsStateError);
+        expect(() => sink.close(), throwsStateError);
+      });
+    });
+  });
+}
+
+/// A subclass of [StreamSinkBase] that takes all the overridable methods as
+/// callbacks, for ease of testing.
+class _StreamSink extends StreamSinkBase<int> {
+  final void Function(int value) _onAdd;
+  final void Function(Object error, [StackTrace? stackTrace]) _onError;
+  final FutureOr<void> Function() _onClose;
+
+  _StreamSink(
+      {void Function(int value)? onAdd,
+      void Function(Object error, [StackTrace? stackTrace])? onError,
+      FutureOr<void> Function()? onClose})
+      : _onAdd = onAdd ?? ((_) {}),
+        _onError = onError ?? ((_, [__]) {}),
+        _onClose = onClose ?? (() {});
+
+  @override
+  void onAdd(int value) {
+    _onAdd(value);
+  }
+
+  @override
+  void onError(Object error, [StackTrace? stackTrace]) {
+    _onError(error, stackTrace);
+  }
+
+  @override
+  FutureOr<void> onClose() => _onClose();
+}
+
+/// A subclass of [IOSinkBase] that takes all the overridable methods as
+/// callbacks, for ease of testing.
+class _IOSink extends IOSinkBase {
+  final void Function(List<int> value) _onAdd;
+  final void Function(Object error, [StackTrace? stackTrace]) _onError;
+  final FutureOr<void> Function() _onClose;
+  final Future<void> Function() _onFlush;
+
+  _IOSink(
+      {void Function(List<int> value)? onAdd,
+      void Function(Object error, [StackTrace? stackTrace])? onError,
+      FutureOr<void> Function()? onClose,
+      Future<void> Function()? onFlush,
+      Encoding encoding = utf8})
+      : _onAdd = onAdd ?? ((_) {}),
+        _onError = onError ?? ((_, [__]) {}),
+        _onClose = onClose ?? (() {}),
+        _onFlush = onFlush ?? (() => Future.value()),
+        super(encoding);
+
+  @override
+  void onAdd(List<int> value) {
+    _onAdd(value);
+  }
+
+  @override
+  void onError(Object error, [StackTrace? stackTrace]) {
+    _onError(error, stackTrace);
+  }
+
+  @override
+  FutureOr<void> onClose() => _onClose();
+
+  @override
+  Future<void> onFlush() => _onFlush();
+}