Add a StreamSink.rejectErrors() extension method (#169)
This makes it easy for authors to expose sinks that can't natively
consume errors, but still handle them in a consistent and robust
manner.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a0f15c1..e26b712 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,9 @@
* Added `ChunkedStreamReader` for reading _chunked streams_ without managing
buffers.
+* Add extensions on `StreamSink`, including `StreamSink.transform()` for
+ applying `StreamSinkTransformer`s and `StreamSink.rejectErrors()`.
+
* Add `StreamGroup.isIdle` and `StreamGroup.onIdle`.
* Add `StreamGroup.isClosed` and `FutureGroup.isClosed` getters.
diff --git a/lib/async.dart b/lib/async.dart
index 7b9d942..2d5876a 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -32,6 +32,7 @@
export 'src/stream_group.dart';
export 'src/stream_queue.dart';
export 'src/stream_sink_completer.dart';
+export 'src/stream_sink_extensions.dart';
export 'src/stream_sink_transformer.dart';
export 'src/stream_splitter.dart';
export 'src/stream_subscription_transformer.dart';
diff --git a/lib/src/stream_sink_extensions.dart b/lib/src/stream_sink_extensions.dart
new file mode 100644
index 0000000..ed43341
--- /dev/null
+++ b/lib/src/stream_sink_extensions.dart
@@ -0,0 +1,22 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'stream_sink_transformer.dart';
+import 'stream_sink_transformer/reject_errors.dart';
+
+/// Extensions on [StreamSink] to make stream transformations more fluent.
+extension StreamSinkExtensions<T> on StreamSink<T> {
+ /// Transforms a [StreamSink] using [transformer].
+ StreamSink<S> transform<S>(StreamSinkTransformer<S, T> transformer) =>
+ transformer.bind(this);
+
+ /// Returns a [StreamSink] that forwards to [this] but rejects errors.
+ ///
+ /// If an error is passed (either by [addError] or [addStream]), the
+ /// underlying sink will be closed and the error will be forwarded to the
+ /// returned sink's [StreamSink.done] future. Further events will be ignored.
+ StreamSink<T> rejectErrors() => RejectErrorsSink(this);
+}
diff --git a/lib/src/stream_sink_transformer/reject_errors.dart b/lib/src/stream_sink_transformer/reject_errors.dart
new file mode 100644
index 0000000..a8d130f
--- /dev/null
+++ b/lib/src/stream_sink_transformer/reject_errors.dart
@@ -0,0 +1,127 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// A [StreamSink] wrapper that rejects all errors passed into the sink.
+class RejectErrorsSink<T> implements StreamSink<T> {
+ /// The target sink.
+ final StreamSink<T> _inner;
+
+ @override
+ Future<void> get done => _doneCompleter.future;
+ final _doneCompleter = Completer<void>();
+
+ /// Whether the user has called [close].
+ ///
+ /// If [_closed] is true, [_canceled] must be true and [_inAddStream] must be
+ /// false.
+ bool _closed = false;
+
+ /// The subscription to the stream passed to [addStream], if a stream is
+ /// currently being added.
+ StreamSubscription<T>? _addStreamSubscription;
+
+ /// The completer for the future returned by [addStream], if a stream is
+ /// currently being added.
+ Completer<void>? _addStreamCompleter;
+
+ /// Whether we're currently adding a stream with [addStream].
+ bool get _inAddStream => _addStreamSubscription != null;
+
+ RejectErrorsSink(this._inner) {
+ _inner.done.then((value) {
+ _cancelAddStream();
+ if (!_canceled) _doneCompleter.complete(value);
+ }).onError<Object>((error, stackTrace) {
+ _cancelAddStream();
+ if (!_canceled) _doneCompleter.completeError(error, stackTrace);
+ });
+ }
+
+ /// Whether the underlying sink is no longer receiving events.
+ ///
+ /// This can happen if:
+ ///
+ /// * [close] has been called,
+ /// * an error has been passed,
+ /// * or the underlying [StreamSink.done] has completed.
+ ///
+ /// If [_canceled] is true, [_inAddStream] must be false.
+ bool get _canceled => _doneCompleter.isCompleted;
+
+ @override
+ void add(T data) {
+ if (_closed) throw StateError('Cannot add event after closing.');
+ if (_inAddStream) {
+ throw StateError('Cannot add event while adding stream.');
+ }
+ if (_canceled) return;
+
+ _inner.add(data);
+ }
+
+ @override
+ void addError(error, [StackTrace? stackTrace]) {
+ if (_closed) throw StateError('Cannot add event after closing.');
+ if (_inAddStream) {
+ throw StateError('Cannot add event while adding stream.');
+ }
+ if (_canceled) return;
+
+ _addError(error, stackTrace);
+ }
+
+ /// Like [addError], but doesn't check to ensure that an error can be added.
+ ///
+ /// This is called from [addStream], so it shouldn't fail if a stream is being
+ /// added.
+ void _addError(Object error, [StackTrace? stackTrace]) {
+ _cancelAddStream();
+ _doneCompleter.completeError(error, stackTrace);
+
+ // Ignore errors from the inner sink. We're already surfacing one error, and
+ // if the user handles it we don't want them to have another top-level.
+ _inner.close().catchError((_) {});
+ }
+
+ @override
+ Future<void> addStream(Stream<T> stream) {
+ if (_closed) throw StateError('Cannot add stream after closing.');
+ if (_inAddStream) {
+ throw StateError('Cannot add stream while adding stream.');
+ }
+ if (_canceled) return Future.value();
+
+ var addStreamCompleter = _addStreamCompleter = Completer.sync();
+ _addStreamSubscription = stream.listen(_inner.add,
+ onError: _addError, onDone: addStreamCompleter.complete);
+ return addStreamCompleter.future.then((_) {
+ _addStreamCompleter = null;
+ _addStreamSubscription = null;
+ });
+ }
+
+ @override
+ Future<void> close() {
+ if (_inAddStream) {
+ throw StateError('Cannot close sink while adding stream.');
+ }
+
+ if (_closed) return done;
+ _closed = true;
+
+ if (!_canceled) _doneCompleter.complete(_inner.close());
+ return done;
+ }
+
+ /// If an [addStream] call is active, cancel its subscription and complete its
+ /// completer.
+ void _cancelAddStream() {
+ if (!_inAddStream) return;
+ _addStreamCompleter!.complete(_addStreamSubscription!.cancel());
+ _addStreamCompleter = null;
+ _addStreamSubscription = null;
+ }
+}
diff --git a/test/reject_errors_test.dart b/test/reject_errors_test.dart
new file mode 100644
index 0000000..32bffd1
--- /dev/null
+++ b/test/reject_errors_test.dart
@@ -0,0 +1,205 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE filevents.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+void main() {
+ late StreamController controller;
+ setUp(() {
+ controller = StreamController();
+ });
+
+ test('passes through data events', () {
+ controller.sink.rejectErrors()..add(1)..add(2)..add(3);
+ expect(controller.stream, emitsInOrder([1, 2, 3]));
+ });
+
+ test('passes through close events', () {
+ controller.sink.rejectErrors()
+ ..add(1)
+ ..close();
+ expect(controller.stream, emitsInOrder([1, emitsDone]));
+ });
+
+ test('passes through data events from addStream()', () {
+ controller.sink.rejectErrors().addStream(Stream.fromIterable([1, 2, 3]));
+ expect(controller.stream, emitsInOrder([1, 2, 3]));
+ });
+
+ test('allows multiple addStream() calls', () async {
+ var transformed = controller.sink.rejectErrors();
+ await transformed.addStream(Stream.fromIterable([1, 2, 3]));
+ await transformed.addStream(Stream.fromIterable([4, 5, 6]));
+ expect(controller.stream, emitsInOrder([1, 2, 3, 4, 5, 6]));
+ });
+
+ group('on addError()', () {
+ test('forwards the error to done', () {
+ var transformed = controller.sink.rejectErrors();
+ transformed.addError('oh no');
+ expect(transformed.done, throwsA('oh no'));
+ });
+
+ test('closes the underlying sink', () {
+ var transformed = controller.sink.rejectErrors();
+ transformed.addError('oh no');
+ transformed.done.catchError((_) {});
+
+ expect(controller.stream, emitsDone);
+ });
+
+ test('ignores further events', () async {
+ var transformed = controller.sink.rejectErrors();
+ transformed.addError('oh no');
+ transformed.done.catchError((_) {});
+ expect(controller.stream, emitsDone);
+
+ // Try adding events synchronously and asynchronously and verify that they
+ // don't throw and also aren't passed to the underlying sink.
+ transformed
+ ..add(1)
+ ..addError('another');
+ await pumpEventQueue();
+ transformed
+ ..add(2)
+ ..addError('yet another');
+ });
+
+ test('cancels the current subscription', () async {
+ var inputCanceled = false;
+ var inputController =
+ StreamController(onCancel: () => inputCanceled = true);
+
+ var transformed = controller.sink.rejectErrors()
+ ..addStream(inputController.stream);
+ inputController.addError('oh no');
+ transformed.done.catchError((_) {});
+
+ await pumpEventQueue();
+ expect(inputCanceled, isTrue);
+ });
+ });
+
+ group('when the inner sink\'s done future completes', () {
+ test('done completes', () async {
+ var completer = Completer();
+ var transformed = NullStreamSink(done: completer.future).rejectErrors();
+
+ var doneCompleted = false;
+ transformed.done.then((_) => doneCompleted = true);
+ await pumpEventQueue();
+ expect(doneCompleted, isFalse);
+
+ completer.complete();
+ await pumpEventQueue();
+ expect(doneCompleted, isTrue);
+ });
+
+ test('an outstanding addStream() completes', () async {
+ var completer = Completer();
+ var transformed = NullStreamSink(done: completer.future).rejectErrors();
+
+ var addStreamCompleted = false;
+ transformed
+ .addStream(StreamController().stream)
+ .then((_) => addStreamCompleted = true);
+ await pumpEventQueue();
+ expect(addStreamCompleted, isFalse);
+
+ completer.complete();
+ await pumpEventQueue();
+ expect(addStreamCompleted, isTrue);
+ });
+
+ test('an outstanding addStream()\'s subscription is cancelled', () async {
+ var completer = Completer();
+ var transformed = NullStreamSink(done: completer.future).rejectErrors();
+
+ var addStreamCancelled = false;
+ transformed.addStream(
+ StreamController(onCancel: () => addStreamCancelled = true).stream);
+ await pumpEventQueue();
+ expect(addStreamCancelled, isFalse);
+
+ completer.complete();
+ await pumpEventQueue();
+ expect(addStreamCancelled, isTrue);
+ });
+
+ test('forwards an outstanding addStream()\'s cancellation error', () async {
+ var completer = Completer();
+ var transformed = NullStreamSink(done: completer.future).rejectErrors();
+
+ expect(
+ transformed.addStream(
+ StreamController(onCancel: () => throw 'oh no').stream),
+ throwsA('oh no'));
+ completer.complete();
+ });
+
+ group('forwards its error', () {
+ test('through done', () async {
+ expect(NullStreamSink(done: Future.error('oh no')).rejectErrors().done,
+ throwsA('oh no'));
+ });
+
+ test('through close', () async {
+ expect(
+ NullStreamSink(done: Future.error('oh no')).rejectErrors().close(),
+ throwsA('oh no'));
+ });
+ });
+ });
+
+ group('after closing', () {
+ test('throws on add()', () {
+ var sink = controller.sink.rejectErrors()..close();
+ expect(() => sink.add(1), throwsStateError);
+ });
+
+ test('throws on addError()', () {
+ var sink = controller.sink.rejectErrors()..close();
+ expect(() => sink.addError('oh no'), throwsStateError);
+ });
+
+ test('throws on addStream()', () {
+ var sink = controller.sink.rejectErrors()..close();
+ expect(() => sink.addStream(Stream.empty()), throwsStateError);
+ });
+
+ test('allows close()', () {
+ var sink = controller.sink.rejectErrors()..close();
+ sink.close(); // Shouldn't throw
+ });
+ });
+
+ group('during an active addStream()', () {
+ test('throws on add()', () {
+ var sink = controller.sink.rejectErrors()
+ ..addStream(StreamController().stream);
+ expect(() => sink.add(1), throwsStateError);
+ });
+
+ test('throws on addError()', () {
+ var sink = controller.sink.rejectErrors()
+ ..addStream(StreamController().stream);
+ expect(() => sink.addError('oh no'), throwsStateError);
+ });
+
+ test('throws on addStream()', () {
+ var sink = controller.sink.rejectErrors()
+ ..addStream(StreamController().stream);
+ expect(() => sink.addStream(Stream.empty()), throwsStateError);
+ });
+
+ test('throws on close()', () {
+ var sink = controller.sink.rejectErrors()
+ ..addStream(StreamController().stream);
+ expect(() => sink.close(), throwsStateError);
+ });
+ });
+}