Add *SinkBase classes for implementing custom sinks (#188)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 51c067d..102ad59 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,7 @@
-## 2.7.1-dev
+## 2.8.0
+* Add `EventSinkBase`, `StreamSinkBase`, and `IOSinkBase` classes to make it
+ easier to implement custom sinks.
* Improve performance for `ChunkedStreamReader` by creating fewer internal
sublists and specializing to create views for `Uint8List` chunks.
diff --git a/lib/async.dart b/lib/async.dart
index 2170442..10e7fe7 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -27,6 +27,7 @@
export 'src/result/future.dart';
export 'src/result/value.dart';
export 'src/single_subscription_transformer.dart';
+export 'src/sink_base.dart';
export 'src/stream_closer.dart';
export 'src/stream_completer.dart';
export 'src/stream_extensions.dart';
diff --git a/lib/src/sink_base.dart b/lib/src/sink_base.dart
new file mode 100644
index 0000000..873ce82
--- /dev/null
+++ b/lib/src/sink_base.dart
@@ -0,0 +1,168 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:meta/meta.dart';
+
+import 'async_memoizer.dart';
+
+/// An abstract class that implements [EventSink] in terms of [onAdd],
+/// [onError], and [onClose] methods.
+///
+/// This takes care of ensuring that events can't be added after [close] is
+/// called.
+abstract class EventSinkBase<T> implements EventSink<T> {
+ /// Whether [close] has been called and no more events may be written.
+ bool get _closed => _closeMemo.hasRun;
+
+ @override
+ void add(T data) {
+ _checkCanAddEvent();
+ onAdd(data);
+ }
+
+ /// A method that handles data events that are passed to the sink.
+ @visibleForOverriding
+ void onAdd(T data);
+
+ @override
+ void addError(Object error, [StackTrace? stackTrace]) {
+ _checkCanAddEvent();
+ onError(error, stackTrace);
+ }
+
+ /// A method that handles error events that are passed to the sink.
+ @visibleForOverriding
+ void onError(Object error, [StackTrace? stackTrace]);
+
+ @override
+ Future<void> close() => _closeMemo.runOnce(onClose);
+ final _closeMemo = AsyncMemoizer<void>();
+
+ /// A method that handles the sink being closed.
+ ///
+ /// This may return a future that completes once the stream sink has shut
+ /// down. If cleaning up can fail, the error may be reported in the returned
+ /// future.
+ @visibleForOverriding
+ FutureOr<void> onClose();
+
+ /// Throws a [StateError] if [close] has already been called.
+ void _checkCanAddEvent() {
+ if (_closed) throw StateError('Cannot add event after closing');
+ }
+}
+
+/// An abstract class that implements [StreamSink] in terms of [onAdd],
+/// [onError], and [onClose] methods.
+///
+/// This takes care of ensuring that events can't be added after [close] is
+/// called or during a call to [addStream].
+abstract class StreamSinkBase<T> extends EventSinkBase<T>
+ implements StreamSink<T> {
+ /// Whether a call to [addStream] is ongoing.
+ bool _addingStream = false;
+
+ @override
+ Future<void> get done => _closeMemo.future;
+
+ @override
+ Future<void> addStream(Stream<T> stream) {
+ _checkCanAddEvent();
+
+ _addingStream = true;
+ var completer = Completer<void>.sync();
+ stream.listen(onAdd, onError: onError, onDone: () {
+ _addingStream = false;
+ completer.complete();
+ });
+ return completer.future;
+ }
+
+ @override
+ Future<void> close() {
+ if (_addingStream) throw StateError('StreamSink is bound to a stream');
+ return super.close();
+ }
+
+ @override
+ void _checkCanAddEvent() {
+ super._checkCanAddEvent();
+ if (_addingStream) throw StateError('StreamSink is bound to a stream');
+ }
+}
+
+/// An abstract class that implements `dart:io`'s [IOSink]'s API in terms of
+/// [onAdd], [onError], [onClose], and [onFlush] methods.
+///
+/// Because [IOSink] is defined in `dart:io`, this can't officially implement
+/// it. However, it's designed to match its API exactly so that subclasses can
+/// implement [IOSink] without any additional modifications.
+///
+/// This takes care of ensuring that events can't be added after [close] is
+/// called, during a call to [addStream], or while a [flush] is in progress.
+abstract class IOSinkBase extends StreamSinkBase<List<int>> {
+  /// See [IOSink.encoding] from `dart:io`.
+  Encoding encoding;
+
+  IOSinkBase([this.encoding = utf8]);
+
+  /// See [IOSink.flush] from `dart:io`.
+  ///
+  /// Throws a [StateError] if a stream is being added or another flush is
+  /// pending. Once the sink is closed, this completes without flushing.
+  ///
+  /// Otherwise this delegates to [onFlush] and locks the sink, as though a
+  /// stream were being added, until the returned future completes. An error
+  /// thrown synchronously by [onFlush] is reported through that future.
+  Future<void> flush() {
+    if (_addingStream) throw StateError('StreamSink is bound to a stream');
+    if (_closed) return Future.value();
+
+    _addingStream = true;
+    // Future.sync ensures a synchronous throw can't leave the sink locked.
+    return Future<void>.sync(onFlush).whenComplete(() {
+      _addingStream = false;
+    });
+  }
+
+  /// Flushes any buffered data to the underlying consumer, and returns a future
+  /// that completes once the consumer has accepted all data.
+  @visibleForOverriding
+  Future<void> onFlush();
+
+  /// See [IOSink.write] from `dart:io`.
+  void write(Object? object) {
+    var string = object.toString();
+    if (string.isEmpty) return;
+    add(encoding.encode(string));
+  }
+
+  /// See [IOSink.writeAll] from `dart:io`.
+  void writeAll(Iterable<Object?> objects, [String separator = '']) {
+    var first = true;
+    for (var object in objects) {
+      if (first) {
+        first = false;
+      } else {
+        write(separator);
+      }
+
+      write(object);
+    }
+  }
+
+  /// See [IOSink.writeln] from `dart:io`.
+  void writeln([Object? object = '']) {
+    write(object);
+    write('\n');
+  }
+
+  /// See [IOSink.writeCharCode] from `dart:io`.
+  void writeCharCode(int charCode) {
+    write(String.fromCharCode(charCode));
+  }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 79dc278..b0a68a6 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 2.7.1-dev
+version: 2.8.0
description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
@@ -12,6 +12,7 @@
meta: ^1.1.7
dev_dependencies:
+ charcode: ^1.3.0
fake_async: ^1.2.0
pedantic: ^1.10.0
stack_trace: ^1.10.0
diff --git a/test/io_sink_impl.dart b/test/io_sink_impl.dart
new file mode 100644
index 0000000..63fa289
--- /dev/null
+++ b/test/io_sink_impl.dart
@@ -0,0 +1,23 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:io';
+
+import 'package:async/async.dart';
+
+/// This class is never used directly; it exists only to verify that
+/// [IOSinkBase] produces a valid implementation of [IOSink].
+class IOSinkImpl extends IOSinkBase implements IOSink {
+ @override
+ void onAdd(List<int> data) {}
+
+ @override
+ void onError(Object error, [StackTrace? stackTrace]) {}
+
+ @override
+ void onClose() {}
+
+ @override
+ Future<void> onFlush() => Future.value();
+}
diff --git a/test/sink_base_test.dart b/test/sink_base_test.dart
new file mode 100644
index 0000000..da3ca9f
--- /dev/null
+++ b/test/sink_base_test.dart
@@ -0,0 +1,403 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:charcode/charcode.dart';
+import 'package:test/test.dart';
+
+import 'package:async/async.dart';
+
+void main() {
+  // We don't explicitly test [EventSinkBase] because it shares all the relevant
+  // implementation with [StreamSinkBase].
+  group('StreamSinkBase', () {
+    test('forwards add() to onAdd()', () {
+      var sink = _StreamSink(onAdd: expectAsync1((value) {
+        expect(value, equals(123));
+      }));
+      sink.add(123);
+    });
+
+    test('forwards addError() to onError()', () {
+      var sink = _StreamSink(onError: expectAsync2((error, [stackTrace]) {
+        expect(error, equals('oh no'));
+        expect(stackTrace, isA<StackTrace>());
+      }));
+      sink.addError('oh no', StackTrace.current);
+    });
+
+    test('forwards addStream() to onAdd() and onError()', () {
+      var sink = _StreamSink(
+          onAdd: expectAsync1((value) {
+            expect(value, equals(123));
+          }, count: 1),
+          onError: expectAsync2((error, [stackTrace]) {
+            expect(error, equals('oh no'));
+            expect(stackTrace, isA<StackTrace>());
+          }));
+
+      var controller = StreamController<int>();
+      sink.addStream(controller.stream);
+
+      controller.add(123);
+      controller.addError('oh no', StackTrace.current);
+    });
+
+    test('addStream() returns once the stream closes', () async {
+      var sink = _StreamSink();
+      var controller = StreamController<int>();
+      var addStreamCompleted = false;
+      sink.addStream(controller.stream).then((_) => addStreamCompleted = true);
+
+      // The future must stay pending for as long as the source stream is open.
+      await pumpEventQueue();
+      expect(addStreamCompleted, isFalse);
+
+      controller.addError('oh no', StackTrace.current);
+      await pumpEventQueue();
+      expect(addStreamCompleted, isFalse);
+
+      controller.close();
+      await pumpEventQueue();
+      expect(addStreamCompleted, isTrue);
+    });
+
+    test('forwards close() to onClose()', () {
+      var sink = _StreamSink(onClose: expectAsync0(() {}));
+      expect(sink.close(), completes);
+    });
+
+    test('onClose() is only invoked once', () {
+      var sink = _StreamSink(onClose: expectAsync0(() {}, count: 1));
+      expect(sink.close(), completes);
+      expect(sink.close(), completes);
+      expect(sink.close(), completes);
+    });
+
+    test('all invocations of close() return the same future', () async {
+      var completer = Completer();
+      var sink = _StreamSink(onClose: expectAsync0(() => completer.future));
+
+      var close1Completed = false;
+      sink.close().then((_) => close1Completed = true);
+
+      var close2Completed = false;
+      sink.close().then((_) => close2Completed = true);
+
+      var doneCompleted = false;
+      sink.done.then((_) => doneCompleted = true);
+
+      await pumpEventQueue();
+      expect(close1Completed, isFalse);
+      expect(close2Completed, isFalse);
+      expect(doneCompleted, isFalse);
+
+      completer.complete();
+      await pumpEventQueue();
+      expect(close1Completed, isTrue);
+      expect(close2Completed, isTrue);
+      expect(doneCompleted, isTrue);
+    });
+
+    test('done returns a future that completes once close() completes',
+        () async {
+      var completer = Completer();
+      var sink = _StreamSink(onClose: expectAsync0(() => completer.future));
+
+      var doneCompleted = false;
+      sink.done.then((_) => doneCompleted = true);
+
+      await pumpEventQueue();
+      expect(doneCompleted, isFalse);
+
+      expect(sink.close(), completes);
+      await pumpEventQueue();
+      expect(doneCompleted, isFalse);
+
+      completer.complete();
+      await pumpEventQueue();
+      expect(doneCompleted, isTrue);
+    });
+
+    group('during addStream()', () {
+      test('add() throws an error', () {
+        var sink = _StreamSink(onAdd: expectAsync1((_) {}, count: 0));
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.add(1), throwsStateError);
+      });
+
+      test('addError() throws an error', () {
+        var sink = _StreamSink(onError: expectAsync2((_, [__]) {}, count: 0));
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.addError('oh no'), throwsStateError);
+      });
+
+      test('addStream() throws an error', () {
+        var sink = _StreamSink(onAdd: expectAsync1((_) {}, count: 0));
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.addStream(Stream.value(123)), throwsStateError);
+      });
+
+      test('close() throws an error', () {
+        var sink = _StreamSink(onClose: expectAsync0(() {}, count: 0));
+        sink.addStream(StreamController<int>().stream);
+        expect(() => sink.close(), throwsStateError);
+      });
+    });
+
+    group("once it's closed", () {
+      test('add() throws an error', () {
+        var sink = _StreamSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.add(1), throwsStateError);
+      });
+
+      test('addError() throws an error', () {
+        var sink = _StreamSink(onError: expectAsync2((_, [__]) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.addError('oh no'), throwsStateError);
+      });
+
+      test('addStream() throws an error', () {
+        var sink = _StreamSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.addStream(Stream.value(123)), throwsStateError);
+      });
+    });
+  });
+
+  group('IOSinkBase', () {
+    group('write()', () {
+      test("doesn't call add() for the empty string", () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        sink.write('');
+      });
+
+      test('converts the text to data and passes it to add', () async {
+        var sink = _IOSink(onAdd: expectAsync1((data) {
+          expect(data, equals(utf8.encode('hello')));
+        }));
+        sink.write('hello');
+      });
+
+      test('calls Object.toString()', () async {
+        var sink = _IOSink(onAdd: expectAsync1((data) {
+          expect(data, equals(utf8.encode('123')));
+        }));
+        sink.write(123);
+      });
+
+      test('respects the encoding', () async {
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+              expect(data, equals(latin1.encode('Æ')));
+            }),
+            encoding: latin1);
+        sink.write('Æ');
+      });
+
+      test('throws if the sink is closed', () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.write('hello'), throwsStateError);
+      });
+    });
+
+    group('writeAll()', () {
+      test('writes nothing for an empty iterable', () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        sink.writeAll([]);
+      });
+
+      test('writes each object in the iterable', () async {
+        var chunks = <List<int>>[];
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+          chunks.add(data);
+        }, count: 3));
+
+        sink.writeAll(['hello', null, 123]);
+        expect(chunks, equals(['hello', 'null', '123'].map(utf8.encode)));
+      });
+
+      test('writes separators between each object', () async {
+        var chunks = <List<int>>[];
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+          chunks.add(data);
+        }, count: 5));
+
+        sink.writeAll(['hello', null, 123], '/');
+        expect(chunks,
+            equals(['hello', '/', 'null', '/', '123'].map(utf8.encode)));
+      });
+
+      test('throws if the sink is closed', () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.writeAll(['hello']), throwsStateError);
+      });
+    });
+
+    group('writeln()', () {
+      test('only writes a newline by default', () async {
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+          expect(data, equals(utf8.encode('\n')));
+        }, count: 1));
+        sink.writeln();
+      });
+
+      test('writes the object followed by a newline', () async {
+        var chunks = <List<int>>[];
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+          chunks.add(data);
+        }, count: 2));
+        sink.writeln(123);
+
+        expect(chunks, equals(['123', '\n'].map(utf8.encode)));
+      });
+
+      test('throws if the sink is closed', () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.writeln(), throwsStateError);
+      });
+    });
+
+    group('writeCharCode()', () {
+      test('writes the character code', () async {
+        var sink = _IOSink(onAdd: expectAsync1((data) {
+          expect(data, equals(utf8.encode('A')));
+        }));
+        sink.writeCharCode($A);
+      });
+
+      test('respects the encoding', () async {
+        var sink = _IOSink(
+            onAdd: expectAsync1((data) {
+              expect(data, equals(latin1.encode('Æ')));
+            }),
+            encoding: latin1);
+        sink.writeCharCode('Æ'.runes.first);
+      });
+
+      test('throws if the sink is closed', () async {
+        var sink = _IOSink(onAdd: expectAsync1((_) {}, count: 0));
+        expect(sink.close(), completes);
+        expect(() => sink.writeCharCode($A), throwsStateError);
+      });
+    });
+
+    group('flush()', () {
+      test('returns a future that completes when onFlush() is done', () async {
+        var completer = Completer();
+        var sink = _IOSink(onFlush: expectAsync0(() => completer.future));
+
+        var flushDone = false;
+        sink.flush().then((_) => flushDone = true);
+
+        await pumpEventQueue();
+        expect(flushDone, isFalse);
+
+        completer.complete();
+        await pumpEventQueue();
+        expect(flushDone, isTrue);
+      });
+
+      test('does nothing after close() is called', () {
+        var sink =
+            _IOSink(onFlush: expectAsync0(() => Future.value(), count: 0));
+        expect(sink.close(), completes);
+        expect(sink.flush(), completes);
+      });
+
+      test("can't be called during addStream()", () {
+        var sink =
+            _IOSink(onFlush: expectAsync0(() => Future.value(), count: 0));
+        sink.addStream(StreamController<List<int>>().stream);
+        expect(() => sink.flush(), throwsStateError);
+      });
+
+      test('locks the sink as though a stream was being added', () {
+        var sink = _IOSink(onFlush: expectAsync0(() => Completer().future));
+        sink.flush();
+        expect(() => sink.add([0]), throwsStateError);
+        expect(() => sink.addError('oh no'), throwsStateError);
+        expect(() => sink.addStream(Stream.empty()), throwsStateError);
+        expect(() => sink.flush(), throwsStateError);
+        expect(() => sink.close(), throwsStateError);
+      });
+    });
+  });
+}
+
+/// A subclass of [StreamSinkBase] that forwards its overridable methods to
+/// optional constructor callbacks (no-ops when omitted), for ease of testing.
+class _StreamSink extends StreamSinkBase<int> {
+ final void Function(int value) _onAdd;
+ final void Function(Object error, [StackTrace? stackTrace]) _onError;
+ final FutureOr<void> Function() _onClose;
+
+ _StreamSink(
+ {void Function(int value)? onAdd,
+ void Function(Object error, [StackTrace? stackTrace])? onError,
+ FutureOr<void> Function()? onClose})
+ : _onAdd = onAdd ?? ((_) {}),
+ _onError = onError ?? ((_, [__]) {}),
+ _onClose = onClose ?? (() {});
+
+ @override
+ void onAdd(int value) {
+ _onAdd(value);
+ }
+
+ @override
+ void onError(Object error, [StackTrace? stackTrace]) {
+ _onError(error, stackTrace);
+ }
+
+ @override
+ FutureOr<void> onClose() => _onClose();
+}
+
+/// A subclass of [IOSinkBase] that forwards its overridable methods to
+/// optional constructor callbacks (no-ops when omitted), for ease of testing.
+class _IOSink extends IOSinkBase {
+ final void Function(List<int> value) _onAdd;
+ final void Function(Object error, [StackTrace? stackTrace]) _onError;
+ final FutureOr<void> Function() _onClose;
+ final Future<void> Function() _onFlush;
+
+ _IOSink(
+ {void Function(List<int> value)? onAdd,
+ void Function(Object error, [StackTrace? stackTrace])? onError,
+ FutureOr<void> Function()? onClose,
+ Future<void> Function()? onFlush,
+ Encoding encoding = utf8})
+ : _onAdd = onAdd ?? ((_) {}),
+ _onError = onError ?? ((_, [__]) {}),
+ _onClose = onClose ?? (() {}),
+ _onFlush = onFlush ?? (() => Future.value()),
+ super(encoding);
+
+ @override
+ void onAdd(List<int> value) {
+ _onAdd(value);
+ }
+
+ @override
+ void onError(Object error, [StackTrace? stackTrace]) {
+ _onError(error, stackTrace);
+ }
+
+ @override
+ FutureOr<void> onClose() => _onClose();
+
+ @override
+ Future<void> onFlush() => _onFlush();
+}