Add a StreamSink.rejectErrors() extension method (#169)

This makes it easy for authors to expose sinks that can't natively
consume errors, but still handle them in a consistent and robust
manner.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a0f15c1..e26b712 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,9 @@
 * Added `ChunkedStreamReader` for reading _chunked streams_ without managing
   buffers.
 
+* Add extensions on `StreamSink`, including `StreamSink.transform()` for
+  applying `StreamSinkTransformer`s and `StreamSink.rejectErrors()`.
+
 * Add `StreamGroup.isIdle` and `StreamGroup.onIdle`.
 
 * Add `StreamGroup.isClosed` and `FutureGroup.isClosed` getters.
diff --git a/lib/async.dart b/lib/async.dart
index 7b9d942..2d5876a 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -32,6 +32,7 @@
 export 'src/stream_group.dart';
 export 'src/stream_queue.dart';
 export 'src/stream_sink_completer.dart';
+export 'src/stream_sink_extensions.dart';
 export 'src/stream_sink_transformer.dart';
 export 'src/stream_splitter.dart';
 export 'src/stream_subscription_transformer.dart';
diff --git a/lib/src/stream_sink_extensions.dart b/lib/src/stream_sink_extensions.dart
new file mode 100644
index 0000000..ed43341
--- /dev/null
+++ b/lib/src/stream_sink_extensions.dart
@@ -0,0 +1,22 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'stream_sink_transformer.dart';
+import 'stream_sink_transformer/reject_errors.dart';
+
+/// Extensions on [StreamSink] to make stream transformations more fluent.
+extension StreamSinkExtensions<T> on StreamSink<T> {
+  /// Transforms a [StreamSink] using [transformer].
+  StreamSink<S> transform<S>(StreamSinkTransformer<S, T> transformer) =>
+      transformer.bind(this);
+
+  /// Returns a [StreamSink] that forwards to [this] but rejects errors.
+  ///
+  /// If an error is passed (either by [addError] or [addStream]), the
+  /// underlying sink will be closed and the error will be forwarded to the
+  /// returned sink's [StreamSink.done] future. Further events will be ignored.
+  StreamSink<T> rejectErrors() => RejectErrorsSink(this);
+}
diff --git a/lib/src/stream_sink_transformer/reject_errors.dart b/lib/src/stream_sink_transformer/reject_errors.dart
new file mode 100644
index 0000000..a8d130f
--- /dev/null
+++ b/lib/src/stream_sink_transformer/reject_errors.dart
@@ -0,0 +1,127 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// A [StreamSink] wrapper that rejects all errors passed into the sink.
+class RejectErrorsSink<T> implements StreamSink<T> {
+  /// The target sink.
+  final StreamSink<T> _inner;
+
+  @override
+  Future<void> get done => _doneCompleter.future;
+  final _doneCompleter = Completer<void>();
+
+  /// Whether the user has called [close].
+  ///
+  /// If [_closed] is true, [_canceled] must be true and [_inAddStream] must be
+  /// false.
+  bool _closed = false;
+
+  /// The subscription to the stream passed to [addStream], if a stream is
+  /// currently being added.
+  StreamSubscription<T>? _addStreamSubscription;
+
+  /// The completer for the future returned by [addStream], if a stream is
+  /// currently being added.
+  Completer<void>? _addStreamCompleter;
+
+  /// Whether we're currently adding a stream with [addStream].
+  bool get _inAddStream => _addStreamSubscription != null;
+
+  RejectErrorsSink(this._inner) {
+    _inner.done.then((value) {
+      _cancelAddStream();
+      if (!_canceled) _doneCompleter.complete(value);
+    }).onError<Object>((error, stackTrace) {
+      _cancelAddStream();
+      if (!_canceled) _doneCompleter.completeError(error, stackTrace);
+    });
+  }
+
+  /// Whether the underlying sink is no longer receiving events.
+  ///
+  /// This can happen if:
+  ///
+  /// * [close] has been called,
+  /// * an error has been passed,
+  /// * or the underlying [StreamSink.done] has completed.
+  ///
+  /// If [_canceled] is true, [_inAddStream] must be false.
+  bool get _canceled => _doneCompleter.isCompleted;
+
+  @override
+  void add(T data) {
+    if (_closed) throw StateError('Cannot add event after closing.');
+    if (_inAddStream) {
+      throw StateError('Cannot add event while adding stream.');
+    }
+    if (_canceled) return;
+
+    _inner.add(data);
+  }
+
+  @override
+  void addError(error, [StackTrace? stackTrace]) {
+    if (_closed) throw StateError('Cannot add event after closing.');
+    if (_inAddStream) {
+      throw StateError('Cannot add event while adding stream.');
+    }
+    if (_canceled) return;
+
+    _addError(error, stackTrace);
+  }
+
+  /// Like [addError], but doesn't check to ensure that an error can be added.
+  ///
+  /// This is called from [addStream], so it shouldn't fail if a stream is being
+  /// added.
+  void _addError(Object error, [StackTrace? stackTrace]) {
+    _cancelAddStream();
+    _doneCompleter.completeError(error, stackTrace);
+
+    // Ignore errors from the inner sink. We're already surfacing one error, and
+    // if the user handles it we don't want them to have another top-level.
+    _inner.close().catchError((_) {});
+  }
+
+  @override
+  Future<void> addStream(Stream<T> stream) {
+    if (_closed) throw StateError('Cannot add stream after closing.');
+    if (_inAddStream) {
+      throw StateError('Cannot add stream while adding stream.');
+    }
+    if (_canceled) return Future.value();
+
+    var addStreamCompleter = _addStreamCompleter = Completer.sync();
+    _addStreamSubscription = stream.listen(_inner.add,
+        onError: _addError, onDone: addStreamCompleter.complete);
+    return addStreamCompleter.future.then((_) {
+      _addStreamCompleter = null;
+      _addStreamSubscription = null;
+    });
+  }
+
+  @override
+  Future<void> close() {
+    if (_inAddStream) {
+      throw StateError('Cannot close sink while adding stream.');
+    }
+
+    if (_closed) return done;
+    _closed = true;
+
+    if (!_canceled) _doneCompleter.complete(_inner.close());
+    return done;
+  }
+
+  /// If an [addStream] call is active, cancel its subscription and complete its
+  /// completer.
+  void _cancelAddStream() {
+    if (!_inAddStream) return;
+    _addStreamCompleter!.complete(_addStreamSubscription!.cancel());
+    _addStreamCompleter = null;
+    _addStreamSubscription = null;
+  }
+}
diff --git a/test/reject_errors_test.dart b/test/reject_errors_test.dart
new file mode 100644
index 0000000..32bffd1
--- /dev/null
+++ b/test/reject_errors_test.dart
@@ -0,0 +1,205 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE filevents.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+void main() {
+  late StreamController controller;
+  setUp(() {
+    controller = StreamController();
+  });
+
+  test('passes through data events', () {
+    controller.sink.rejectErrors()..add(1)..add(2)..add(3);
+    expect(controller.stream, emitsInOrder([1, 2, 3]));
+  });
+
+  test('passes through close events', () {
+    controller.sink.rejectErrors()
+      ..add(1)
+      ..close();
+    expect(controller.stream, emitsInOrder([1, emitsDone]));
+  });
+
+  test('passes through data events from addStream()', () {
+    controller.sink.rejectErrors().addStream(Stream.fromIterable([1, 2, 3]));
+    expect(controller.stream, emitsInOrder([1, 2, 3]));
+  });
+
+  test('allows multiple addStream() calls', () async {
+    var transformed = controller.sink.rejectErrors();
+    await transformed.addStream(Stream.fromIterable([1, 2, 3]));
+    await transformed.addStream(Stream.fromIterable([4, 5, 6]));
+    expect(controller.stream, emitsInOrder([1, 2, 3, 4, 5, 6]));
+  });
+
+  group('on addError()', () {
+    test('forwards the error to done', () {
+      var transformed = controller.sink.rejectErrors();
+      transformed.addError('oh no');
+      expect(transformed.done, throwsA('oh no'));
+    });
+
+    test('closes the underlying sink', () {
+      var transformed = controller.sink.rejectErrors();
+      transformed.addError('oh no');
+      transformed.done.catchError((_) {});
+
+      expect(controller.stream, emitsDone);
+    });
+
+    test('ignores further events', () async {
+      var transformed = controller.sink.rejectErrors();
+      transformed.addError('oh no');
+      transformed.done.catchError((_) {});
+      expect(controller.stream, emitsDone);
+
+      // Try adding events synchronously and asynchronously and verify that they
+      // don't throw and also aren't passed to the underlying sink.
+      transformed
+        ..add(1)
+        ..addError('another');
+      await pumpEventQueue();
+      transformed
+        ..add(2)
+        ..addError('yet another');
+    });
+
+    test('cancels the current subscription', () async {
+      var inputCanceled = false;
+      var inputController =
+          StreamController(onCancel: () => inputCanceled = true);
+
+      var transformed = controller.sink.rejectErrors()
+        ..addStream(inputController.stream);
+      inputController.addError('oh no');
+      transformed.done.catchError((_) {});
+
+      await pumpEventQueue();
+      expect(inputCanceled, isTrue);
+    });
+  });
+
+  group('when the inner sink\'s done future completes', () {
+    test('done completes', () async {
+      var completer = Completer();
+      var transformed = NullStreamSink(done: completer.future).rejectErrors();
+
+      var doneCompleted = false;
+      transformed.done.then((_) => doneCompleted = true);
+      await pumpEventQueue();
+      expect(doneCompleted, isFalse);
+
+      completer.complete();
+      await pumpEventQueue();
+      expect(doneCompleted, isTrue);
+    });
+
+    test('an outstanding addStream() completes', () async {
+      var completer = Completer();
+      var transformed = NullStreamSink(done: completer.future).rejectErrors();
+
+      var addStreamCompleted = false;
+      transformed
+          .addStream(StreamController().stream)
+          .then((_) => addStreamCompleted = true);
+      await pumpEventQueue();
+      expect(addStreamCompleted, isFalse);
+
+      completer.complete();
+      await pumpEventQueue();
+      expect(addStreamCompleted, isTrue);
+    });
+
+    test('an outstanding addStream()\'s subscription is cancelled', () async {
+      var completer = Completer();
+      var transformed = NullStreamSink(done: completer.future).rejectErrors();
+
+      var addStreamCancelled = false;
+      transformed.addStream(
+          StreamController(onCancel: () => addStreamCancelled = true).stream);
+      await pumpEventQueue();
+      expect(addStreamCancelled, isFalse);
+
+      completer.complete();
+      await pumpEventQueue();
+      expect(addStreamCancelled, isTrue);
+    });
+
+    test('forwards an outstanding addStream()\'s cancellation error', () async {
+      var completer = Completer();
+      var transformed = NullStreamSink(done: completer.future).rejectErrors();
+
+      expect(
+          transformed.addStream(
+              StreamController(onCancel: () => throw 'oh no').stream),
+          throwsA('oh no'));
+      completer.complete();
+    });
+
+    group('forwards its error', () {
+      test('through done', () async {
+        expect(NullStreamSink(done: Future.error('oh no')).rejectErrors().done,
+            throwsA('oh no'));
+      });
+
+      test('through close', () async {
+        expect(
+            NullStreamSink(done: Future.error('oh no')).rejectErrors().close(),
+            throwsA('oh no'));
+      });
+    });
+  });
+
+  group('after closing', () {
+    test('throws on add()', () {
+      var sink = controller.sink.rejectErrors()..close();
+      expect(() => sink.add(1), throwsStateError);
+    });
+
+    test('throws on addError()', () {
+      var sink = controller.sink.rejectErrors()..close();
+      expect(() => sink.addError('oh no'), throwsStateError);
+    });
+
+    test('throws on addStream()', () {
+      var sink = controller.sink.rejectErrors()..close();
+      expect(() => sink.addStream(Stream.empty()), throwsStateError);
+    });
+
+    test('allows close()', () {
+      var sink = controller.sink.rejectErrors()..close();
+      sink.close(); // Shouldn't throw
+    });
+  });
+
+  group('during an active addStream()', () {
+    test('throws on add()', () {
+      var sink = controller.sink.rejectErrors()
+        ..addStream(StreamController().stream);
+      expect(() => sink.add(1), throwsStateError);
+    });
+
+    test('throws on addError()', () {
+      var sink = controller.sink.rejectErrors()
+        ..addStream(StreamController().stream);
+      expect(() => sink.addError('oh no'), throwsStateError);
+    });
+
+    test('throws on addStream()', () {
+      var sink = controller.sink.rejectErrors()
+        ..addStream(StreamController().stream);
+      expect(() => sink.addStream(Stream.empty()), throwsStateError);
+    });
+
+    test('throws on close()', () {
+      var sink = controller.sink.rejectErrors()
+        ..addStream(StreamController().stream);
+      expect(() => sink.close(), throwsStateError);
+    });
+  });
+}