Add a StreamCloser transformer (#166)

This is essentially the Stream equivalent of the Disconnector class in
stream_channel.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0b23130..a0f15c1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
 ## 2.6.0
 
+* Add a `StreamCloser` class, which is a `StreamTransformer` that allows the
+  caller to force the stream to emit a done event.
+
 * Added `ChunkedStreamReader` for reading _chunked streams_ without managing
   buffers.
 
diff --git a/lib/async.dart b/lib/async.dart
index 611d137..7b9d942 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -27,6 +27,7 @@
 export 'src/result/future.dart';
 export 'src/result/value.dart';
 export 'src/single_subscription_transformer.dart';
+export 'src/stream_closer.dart';
 export 'src/stream_completer.dart';
 export 'src/stream_group.dart';
 export 'src/stream_queue.dart';
diff --git a/lib/src/stream_closer.dart b/lib/src/stream_closer.dart
new file mode 100644
index 0000000..9154624
--- /dev/null
+++ b/lib/src/stream_closer.dart
@@ -0,0 +1,108 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:meta/meta.dart';
+
+/// A [StreamTransformer] that allows the caller to forcibly close the
+/// transformed [Stream](s).
+///
+/// When [close] is called, any stream (or streams) transformed by this
+/// transformer that haven't already completed or been cancelled will emit a
+/// done event and cancel their underlying subscriptions.
+///
+/// Note that unlike most [StreamTransformer]s, each instance of [StreamCloser]
+/// has its own state (whether or not it's been closed), so it's a good idea to
+/// construct a new one for each use unless you need to close multiple streams
+/// at the same time.
+@sealed
+class StreamCloser<T> extends StreamTransformerBase<T, T> {
+  /// The subscriptions to streams passed to [bind].
+  final _subscriptions = <StreamSubscription<T>>{};
+
+  /// The controllers for streams returned by [bind].
+  final _controllers = <StreamController<T>>{};
+
+  /// Closes all transformed streams.
+  ///
+  /// Returns a future that completes when all inner subscriptions'
+  /// [StreamSubscription.cancel] futures have completed. Note that a stream's
+  /// subscription won't be canceled until the transformed stream has a
+  /// listener.
+  ///
+  /// If a transformed stream is listened to after [close] is called, the
+  /// original stream will be listened to and then the subscription immediately
+  /// canceled. If that cancellation throws an error, it will be silently
+  /// ignored.
+  Future<void> close() => _closeFuture ??= () {
+        var futures = [
+          for (var subscription in _subscriptions) subscription.cancel()
+        ];
+        _subscriptions.clear();
+
+        var controllers = _controllers.toList();
+        _controllers.clear();
+        scheduleMicrotask(() {
+          for (var controller in controllers) {
+            scheduleMicrotask(controller.close);
+          }
+        });
+
+        return Future.wait(futures, eagerError: true);
+      }();
+  Future<void>? _closeFuture;
+
+  /// Whether [close] has been called.
+  bool get isClosed => _closeFuture != null;
+
+  @override
+  Stream<T> bind(Stream<T> stream) {
+    var controller = stream.isBroadcast
+        ? StreamController<T>.broadcast(sync: true)
+        : StreamController<T>(sync: true);
+
+    controller.onListen = () {
+      if (isClosed) {
+        // Ignore errors here, because otherwise there would be no way for the
+        // user to handle them gracefully.
+        stream.listen(null).cancel().catchError((_) {});
+        return;
+      }
+
+      var subscription =
+          stream.listen(controller.add, onError: controller.addError);
+      subscription.onDone(() {
+        _subscriptions.remove(subscription);
+        _controllers.remove(controller);
+        controller.close();
+      });
+      _subscriptions.add(subscription);
+
+      if (!stream.isBroadcast) {
+        controller.onPause = subscription.pause;
+        controller.onResume = subscription.resume;
+      }
+
+      controller.onCancel = () {
+        _controllers.remove(controller);
+
+        // If the subscription has already been removed, that indicates that the
+        // underlying stream has been cancelled by [close] and its cancellation
+        // future has been handled there. In that case, we shouldn't forward it
+        // here as well.
+        if (_subscriptions.remove(subscription)) return subscription.cancel();
+        return null;
+      };
+    };
+
+    if (isClosed) {
+      controller.close();
+    } else {
+      _controllers.add(controller);
+    }
+
+    return controller.stream;
+  }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 5bfe9b1..d645c47 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -8,6 +8,7 @@
   sdk: ">=2.12.0 <3.0.0"
 
 dependencies:
+  meta: ^1.1.7
   collection: ^1.15.0
 
 dev_dependencies:
diff --git a/test/stream_closer_test.dart b/test/stream_closer_test.dart
new file mode 100644
index 0000000..28ab970
--- /dev/null
+++ b/test/stream_closer_test.dart
@@ -0,0 +1,208 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+  late StreamCloser<int> closer;
+  setUp(() {
+    closer = StreamCloser();
+  });
+
+  group('when the closer is never closed', () {
+    test('forwards data and done events', () {
+      expect(
+          createStream().transform(closer).toList(), completion([1, 2, 3, 4]));
+    });
+
+    test('forwards error events', () {
+      expect(Stream<int>.error('oh no').transform(closer).toList(),
+          throwsA('oh no'));
+    });
+
+    test('transforms a broadcast stream into a broadcast stream', () {
+      expect(Stream<int>.empty().transform(closer).isBroadcast, isTrue);
+    });
+
+    test("doesn't eagerly listen", () {
+      var controller = StreamController<int>();
+      var transformed = controller.stream.transform(closer);
+      expect(controller.hasListener, isFalse);
+
+      transformed.listen(null);
+      expect(controller.hasListener, isTrue);
+    });
+
+    test('forwards pause and resume', () {
+      var controller = StreamController<int>();
+      var transformed = controller.stream.transform(closer);
+
+      var subscription = transformed.listen(null);
+      expect(controller.isPaused, isFalse);
+      subscription.pause();
+      expect(controller.isPaused, isTrue);
+      subscription.resume();
+      expect(controller.isPaused, isFalse);
+    });
+
+    test('forwards cancel', () {
+      var isCancelled = false;
+      var controller =
+          StreamController<int>(onCancel: () => isCancelled = true);
+      var transformed = controller.stream.transform(closer);
+
+      expect(isCancelled, isFalse);
+      var subscription = transformed.listen(null);
+      expect(isCancelled, isFalse);
+      subscription.cancel();
+      expect(isCancelled, isTrue);
+    });
+
+    test('forwards errors from cancel', () {
+      var controller = StreamController<int>(onCancel: () => throw 'oh no');
+
+      expect(controller.stream.transform(closer).listen(null).cancel(),
+          throwsA('oh no'));
+    });
+  });
+
+  group('when a stream is added before the closer is closed', () {
+    test('the stream emits a close event once the closer is closed', () async {
+      var queue = StreamQueue(createStream().transform(closer));
+      await expectLater(queue, emits(1));
+      await expectLater(queue, emits(2));
+      expect(closer.close(), completes);
+      expect(queue, emitsDone);
+    });
+
+    test('the inner subscription is canceled once the closer is closed', () {
+      var isCancelled = false;
+      var controller =
+          StreamController<int>(onCancel: () => isCancelled = true);
+
+      expect(controller.stream.transform(closer), emitsDone);
+      expect(closer.close(), completes);
+      expect(isCancelled, isTrue);
+    });
+
+    test('closer.close() forwards errors from StreamSubscription.cancel()', () {
+      var controller = StreamController<int>(onCancel: () => throw 'oh no');
+
+      expect(controller.stream.transform(closer), emitsDone);
+      expect(closer.close(), throwsA('oh no'));
+    });
+
+    test('closer.close() works even if a stream has already completed',
+        () async {
+      expect(await createStream().transform(closer).toList(),
+          equals([1, 2, 3, 4]));
+      expect(closer.close(), completes);
+    });
+
+    test('closer.close() works even if a stream has already been canceled',
+        () async {
+      createStream().transform(closer).listen(null).cancel();
+      expect(closer.close(), completes);
+    });
+
+    group('but listened afterwards', () {
+      test('the output stream immediately emits done', () {
+        var stream = createStream().transform(closer);
+        expect(closer.close(), completes);
+        expect(stream, emitsDone);
+      });
+
+      test(
+          'the underlying subscription is never listened if the stream is '
+          'never listened', () async {
+        var controller =
+            StreamController<int>(onListen: expectAsync0(() {}, count: 0));
+        controller.stream.transform(closer);
+
+        expect(closer.close(), completes);
+
+        await pumpEventQueue();
+      });
+
+      test(
+          'the underlying subscription is listened and then canceled once the '
+          'stream is listened', () {
+        var controller = StreamController<int>(
+            onListen: expectAsync0(() {}), onCancel: expectAsync0(() {}));
+        var stream = controller.stream.transform(closer);
+
+        expect(closer.close(), completes);
+
+        stream.listen(null);
+      });
+
+      test('Subscription.cancel() errors are silently ignored', () async {
+        var controller =
+            StreamController<int>(onCancel: expectAsync0(() => throw 'oh no'));
+        var stream = controller.stream.transform(closer);
+
+        expect(closer.close(), completes);
+
+        stream.listen(null);
+        await pumpEventQueue();
+      });
+    });
+  });
+
+  group('when a stream is added after the closer is closed', () {
+    test('the output stream immediately emits done', () {
+      expect(closer.close(), completes);
+      expect(createStream().transform(closer), emitsDone);
+    });
+
+    test(
+        'the underlying subscription is never listened if the stream is never '
+        'listened', () async {
+      expect(closer.close(), completes);
+
+      var controller =
+          StreamController<int>(onListen: expectAsync0(() {}, count: 0));
+      controller.stream.transform(closer);
+
+      await pumpEventQueue();
+    });
+
+    test(
+        'the underlying subscription is listened and then canceled once the '
+        'stream is listened', () {
+      expect(closer.close(), completes);
+
+      var controller = StreamController<int>(
+          onListen: expectAsync0(() {}), onCancel: expectAsync0(() {}));
+
+      controller.stream.transform(closer).listen(null);
+    });
+
+    test('Subscription.cancel() errors are silently ignored', () async {
+      expect(closer.close(), completes);
+
+      var controller =
+          StreamController<int>(onCancel: expectAsync0(() => throw 'oh no'));
+
+      controller.stream.transform(closer).listen(null);
+
+      await pumpEventQueue();
+    });
+  });
+}
+
+Stream<int> createStream() async* {
+  yield 1;
+  await flushMicrotasks();
+  yield 2;
+  await flushMicrotasks();
+  yield 3;
+  await flushMicrotasks();
+  yield 4;
+}