Add a StreamCloser transformer (#166)
This is essentially the Stream equivalent of the Disconnector class in
stream_channel.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0b23130..a0f15c1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
## 2.6.0
+* Add a `StreamCloser` class, which is a `StreamTransformer` that allows the
+ caller to force the stream to emit a done event.
+
* Added `ChunkedStreamReader` for reading _chunked streams_ without managing
buffers.
diff --git a/lib/async.dart b/lib/async.dart
index 611d137..7b9d942 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -27,6 +27,7 @@
export 'src/result/future.dart';
export 'src/result/value.dart';
export 'src/single_subscription_transformer.dart';
+export 'src/stream_closer.dart';
export 'src/stream_completer.dart';
export 'src/stream_group.dart';
export 'src/stream_queue.dart';
diff --git a/lib/src/stream_closer.dart b/lib/src/stream_closer.dart
new file mode 100644
index 0000000..9154624
--- /dev/null
+++ b/lib/src/stream_closer.dart
@@ -0,0 +1,108 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:meta/meta.dart';
+
+/// A [StreamTransformer] that allows the caller to forcibly close the
+/// transformed [Stream](s).
+///
+/// When [close] is called, any stream (or streams) transformed by this
+/// transformer that haven't already completed or been cancelled will emit a
+/// done event and cancel their underlying subscriptions.
+///
+/// Note that unlike most [StreamTransformer]s, each instance of [StreamCloser]
+/// has its own state (whether or not it's been closed), so it's a good idea to
+/// construct a new one for each use unless you need to close multiple streams
+/// at the same time.
+@sealed
+class StreamCloser<T> extends StreamTransformerBase<T, T> {
+ /// The subscriptions to streams passed to [bind].
+ final _subscriptions = <StreamSubscription<T>>{};
+
+ /// The controllers for streams returned by [bind].
+ final _controllers = <StreamController<T>>{};
+
+ /// Closes all transformed streams.
+ ///
+ /// Returns a future that completes when all inner subscriptions'
+ /// [StreamSubscription.cancel] futures have completed. Note that a stream's
+ /// subscription won't be canceled until the transformed stream has a
+ /// listener.
+ ///
+ /// If a transformed stream is listened to after [close] is called, the
+ /// original stream will be listened to and then the subscription immediately
+ /// canceled. If that cancellation throws an error, it will be silently
+ /// ignored.
+ Future<void> close() => _closeFuture ??= () {
+ var futures = [
+ for (var subscription in _subscriptions) subscription.cancel()
+ ];
+ _subscriptions.clear();
+
+ var controllers = _controllers.toList();
+ _controllers.clear();
+ scheduleMicrotask(() {
+ for (var controller in controllers) {
+ scheduleMicrotask(controller.close);
+ }
+ });
+
+ return Future.wait(futures, eagerError: true);
+ }();
+ Future<void>? _closeFuture;
+
+ /// Whether [close] has been called.
+ bool get isClosed => _closeFuture != null;
+
+ @override
+ Stream<T> bind(Stream<T> stream) {
+ var controller = stream.isBroadcast
+ ? StreamController<T>.broadcast(sync: true)
+ : StreamController<T>(sync: true);
+
+ controller.onListen = () {
+ if (isClosed) {
+ // Ignore errors here, because otherwise there would be no way for the
+ // user to handle them gracefully.
+ stream.listen(null).cancel().catchError((_) {});
+ return;
+ }
+
+ var subscription =
+ stream.listen(controller.add, onError: controller.addError);
+ subscription.onDone(() {
+ _subscriptions.remove(subscription);
+ _controllers.remove(controller);
+ controller.close();
+ });
+ _subscriptions.add(subscription);
+
+ if (!stream.isBroadcast) {
+ controller.onPause = subscription.pause;
+ controller.onResume = subscription.resume;
+ }
+
+ controller.onCancel = () {
+ _controllers.remove(controller);
+
+ // If the subscription has already been removed, that indicates that the
+ // underlying stream has been cancelled by [close] and its cancellation
+ // future has been handled there. In that case, we shouldn't forward it
+ // here as well.
+ if (_subscriptions.remove(subscription)) return subscription.cancel();
+ return null;
+ };
+ };
+
+ if (isClosed) {
+ controller.close();
+ } else {
+ _controllers.add(controller);
+ }
+
+ return controller.stream;
+ }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 5bfe9b1..d645c47 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -8,6 +8,7 @@
sdk: ">=2.12.0 <3.0.0"
dependencies:
+ meta: ^1.1.7
collection: ^1.15.0
dev_dependencies:
diff --git a/test/stream_closer_test.dart b/test/stream_closer_test.dart
new file mode 100644
index 0000000..28ab970
--- /dev/null
+++ b/test/stream_closer_test.dart
@@ -0,0 +1,208 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+import 'utils.dart';
+
+void main() {
+ late StreamCloser<int> closer;
+ setUp(() {
+ closer = StreamCloser();
+ });
+
+ group('when the closer is never closed', () {
+ test('forwards data and done events', () {
+ expect(
+ createStream().transform(closer).toList(), completion([1, 2, 3, 4]));
+ });
+
+ test('forwards error events', () {
+ expect(Stream<int>.error('oh no').transform(closer).toList(),
+ throwsA('oh no'));
+ });
+
+ test('transforms a broadcast stream into a broadcast stream', () {
+ expect(Stream<int>.empty().transform(closer).isBroadcast, isTrue);
+ });
+
+ test("doesn't eagerly listen", () {
+ var controller = StreamController<int>();
+ var transformed = controller.stream.transform(closer);
+ expect(controller.hasListener, isFalse);
+
+ transformed.listen(null);
+ expect(controller.hasListener, isTrue);
+ });
+
+ test('forwards pause and resume', () {
+ var controller = StreamController<int>();
+ var transformed = controller.stream.transform(closer);
+
+ var subscription = transformed.listen(null);
+ expect(controller.isPaused, isFalse);
+ subscription.pause();
+ expect(controller.isPaused, isTrue);
+ subscription.resume();
+ expect(controller.isPaused, isFalse);
+ });
+
+ test('forwards cancel', () {
+ var isCancelled = false;
+ var controller =
+ StreamController<int>(onCancel: () => isCancelled = true);
+ var transformed = controller.stream.transform(closer);
+
+ expect(isCancelled, isFalse);
+ var subscription = transformed.listen(null);
+ expect(isCancelled, isFalse);
+ subscription.cancel();
+ expect(isCancelled, isTrue);
+ });
+
+ test('forwards errors from cancel', () {
+ var controller = StreamController<int>(onCancel: () => throw 'oh no');
+
+ expect(controller.stream.transform(closer).listen(null).cancel(),
+ throwsA('oh no'));
+ });
+ });
+
+ group('when a stream is added before the closer is closed', () {
+ test('the stream emits a close event once the closer is closed', () async {
+ var queue = StreamQueue(createStream().transform(closer));
+ await expectLater(queue, emits(1));
+ await expectLater(queue, emits(2));
+ expect(closer.close(), completes);
+ expect(queue, emitsDone);
+ });
+
+ test('the inner subscription is canceled once the closer is closed', () {
+ var isCancelled = false;
+ var controller =
+ StreamController<int>(onCancel: () => isCancelled = true);
+
+ expect(controller.stream.transform(closer), emitsDone);
+ expect(closer.close(), completes);
+ expect(isCancelled, isTrue);
+ });
+
+ test('closer.close() forwards errors from StreamSubscription.cancel()', () {
+ var controller = StreamController<int>(onCancel: () => throw 'oh no');
+
+ expect(controller.stream.transform(closer), emitsDone);
+ expect(closer.close(), throwsA('oh no'));
+ });
+
+ test('closer.close() works even if a stream has already completed',
+ () async {
+ expect(await createStream().transform(closer).toList(),
+ equals([1, 2, 3, 4]));
+ expect(closer.close(), completes);
+ });
+
+ test('closer.close() works even if a stream has already been canceled',
+ () async {
+ createStream().transform(closer).listen(null).cancel();
+ expect(closer.close(), completes);
+ });
+
+ group('but listened afterwards', () {
+ test('the output stream immediately emits done', () {
+ var stream = createStream().transform(closer);
+ expect(closer.close(), completes);
+ expect(stream, emitsDone);
+ });
+
+ test(
+ 'the underlying subscription is never listened if the stream is '
+ 'never listened', () async {
+ var controller =
+ StreamController<int>(onListen: expectAsync0(() {}, count: 0));
+ controller.stream.transform(closer);
+
+ expect(closer.close(), completes);
+
+ await pumpEventQueue();
+ });
+
+ test(
+ 'the underlying subscription is listened and then canceled once the '
+ 'stream is listened', () {
+ var controller = StreamController<int>(
+ onListen: expectAsync0(() {}), onCancel: expectAsync0(() {}));
+ var stream = controller.stream.transform(closer);
+
+ expect(closer.close(), completes);
+
+ stream.listen(null);
+ });
+
+ test('Subscription.cancel() errors are silently ignored', () async {
+ var controller =
+ StreamController<int>(onCancel: expectAsync0(() => throw 'oh no'));
+ var stream = controller.stream.transform(closer);
+
+ expect(closer.close(), completes);
+
+ stream.listen(null);
+ await pumpEventQueue();
+ });
+ });
+ });
+
+ group('when a stream is added after the closer is closed', () {
+ test('the output stream immediately emits done', () {
+ expect(closer.close(), completes);
+ expect(createStream().transform(closer), emitsDone);
+ });
+
+ test(
+ 'the underlying subscription is never listened if the stream is never '
+ 'listened', () async {
+ expect(closer.close(), completes);
+
+ var controller =
+ StreamController<int>(onListen: expectAsync0(() {}, count: 0));
+ controller.stream.transform(closer);
+
+ await pumpEventQueue();
+ });
+
+ test(
+ 'the underlying subscription is listened and then canceled once the '
+ 'stream is listened', () {
+ expect(closer.close(), completes);
+
+ var controller = StreamController<int>(
+ onListen: expectAsync0(() {}), onCancel: expectAsync0(() {}));
+
+ controller.stream.transform(closer).listen(null);
+ });
+
+ test('Subscription.cancel() errors are silently ignored', () async {
+ expect(closer.close(), completes);
+
+ var controller =
+ StreamController<int>(onCancel: expectAsync0(() => throw 'oh no'));
+
+ controller.stream.transform(closer).listen(null);
+
+ await pumpEventQueue();
+ });
+ });
+}
+
+Stream<int> createStream() async* {
+ yield 1;
+ await flushMicrotasks();
+ yield 2;
+ await flushMicrotasks();
+ yield 3;
+ await flushMicrotasks();
+ yield 4;
+}