Add StreamGroup.onIdle and StreamGroup.isIdle (#164)
These match the corresponding APIs on FutureGroup.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 421db06..0b23130 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,8 @@
* Added `ChunkedStreamReader` for reading _chunked streams_ without managing
buffers.
+* Add `StreamGroup.isIdle` and `StreamGroup.onIdle`.
+
* Add `StreamGroup.isClosed` and `FutureGroup.isClosed` getters.
## 2.5.0
diff --git a/lib/src/future_group.dart b/lib/src/future_group.dart
index 1742d6a..c98fbc3 100644
--- a/lib/src/future_group.dart
+++ b/lib/src/future_group.dart
@@ -35,14 +35,28 @@
Future<List<T>> get future => _completer.future;
final _completer = Completer<List<T>>();
- /// Whether this group has no pending futures.
+ /// Whether this group contains no futures.
+ ///
+ /// A [FutureGroup] is idle when it contains no futures, which is the case for
+ /// a newly created group or one where all added futures have been removed or
+ /// completed.
bool get isIdle => _pending == 0;
- /// A broadcast stream that emits a `null` event whenever the last pending
- /// future in this group completes.
+ /// A broadcast stream that emits an event whenever this group becomes idle.
///
- /// Once this group isn't waiting on any futures *and* [close] has been
- /// called, this stream will close.
+ /// A [FutureGroup] is idle when it contains no futures, which is the case for
+ /// a newly created group or one where all added futures have been removed or
+ /// completed.
+ ///
+ /// This stream will close when this group is idle *and* [close] has been
+ /// called.
+ ///
+ /// Note that:
+ ///
+ /// * Events won't be emitted on this stream until [stream] has been listened
+ /// to.
+ /// * Events are delivered asynchronously, so it's possible for the group to
+ /// become active again before the event is delivered.
Stream get onIdle =>
(_onIdleController ??= StreamController.broadcast(sync: true)).stream;
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index c864987..49baaad 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -41,6 +41,39 @@
/// See [_StreamGroupState] for detailed descriptions of each state.
var _state = _StreamGroupState.dormant;
+ /// Whether this group contains no streams.
+ ///
+ /// A [StreamGroup] is idle when it contains no streams, which is the case for
+ /// a newly created group or one where all added streams have been emitted
+ /// done events (or been [remove]d).
+ ///
+ /// If this is a single-subscription group, then cancelling the subscription
+ /// to [stream] will also remove all streams.
+ bool get isIdle => _subscriptions.isEmpty;
+
+ /// A broadcast stream that emits an event whenever this group becomes idle.
+ ///
+ /// A [StreamGroup] is idle when it contains no streams, which is the case for
+ /// a newly created group or one where all added streams have been emitted
+ /// done events (or been [remove]d).
+ ///
+ /// This stream will close when either:
+ ///
+ /// * This group is idle *and* [close] has been called, or
+ /// * [stream]'s subscription has been cancelled (if this is a
+ /// single-subscription group).
+ ///
+ /// Note that:
+ ///
+ /// * Events won't be emitted on this stream until [stream] has been listened
+ /// to.
+ /// * Events are delivered asynchronously, so it's possible for the group to
+ /// become active again before the event is delivered.
+ Stream<void> get onIdle =>
+ (_onIdleController ??= StreamController.broadcast()).stream;
+
+ StreamController<Null>? _onIdleController;
+
/// Streams that have been added to the group, and their subscriptions if they
/// have been subscribed to.
///
@@ -135,7 +168,15 @@
Future? remove(Stream<T> stream) {
var subscription = _subscriptions.remove(stream);
var future = subscription == null ? null : subscription.cancel();
- if (_closed && _subscriptions.isEmpty) _controller.close();
+
+ if (_subscriptions.isEmpty) {
+ _onIdleController?.add(null);
+ if (_closed) {
+ _onIdleController?.close();
+ scheduleMicrotask(_controller.close);
+ }
+ }
+
return future;
}
@@ -180,6 +221,13 @@
.toList();
_subscriptions.clear();
+
+ var onIdleController = _onIdleController;
+ if (onIdleController != null && !onIdleController.isClosed) {
+ onIdleController.add(null);
+ onIdleController.close();
+ }
+
return futures.isEmpty ? null : Future.wait(futures);
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 59ee5f2..5bfe9b1 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 2.6.0-dev
+version: 2.6.0
description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
index 43f9bfc..7ea93f8 100644
--- a/test/stream_group_test.dart
+++ b/test/stream_group_test.dart
@@ -736,6 +736,109 @@
expect(events, equals(['one', 'two', 'three', 'four', 'five', 'six']));
});
});
+
+ group('onIdle', () {
+ test('emits an event when the last pending stream emits done', () async {
+ streamGroup.stream.listen(null);
+
+ var idle = false;
+ streamGroup.onIdle.listen((_) => idle = true);
+
+ var controller1 = StreamController<String>();
+ var controller2 = StreamController<String>();
+ var controller3 = StreamController<String>();
+
+ streamGroup.add(controller1.stream);
+ streamGroup.add(controller2.stream);
+ streamGroup.add(controller3.stream);
+
+ await flushMicrotasks();
+ expect(idle, isFalse);
+ expect(streamGroup.isIdle, isFalse);
+
+ controller1.close();
+ await flushMicrotasks();
+ expect(idle, isFalse);
+ expect(streamGroup.isIdle, isFalse);
+
+ controller2.close();
+ await flushMicrotasks();
+ expect(idle, isFalse);
+ expect(streamGroup.isIdle, isFalse);
+
+ controller3.close();
+ await flushMicrotasks();
+ expect(idle, isTrue);
+ expect(streamGroup.isIdle, isTrue);
+ });
+
+ test('emits an event each time it becomes idle', () async {
+ streamGroup.stream.listen(null);
+
+ var idle = false;
+ streamGroup.onIdle.listen((_) => idle = true);
+
+ var controller = StreamController<String>();
+ streamGroup.add(controller.stream);
+
+ controller.close();
+ await flushMicrotasks();
+ expect(idle, isTrue);
+ expect(streamGroup.isIdle, isTrue);
+
+ idle = false;
+ controller = StreamController<String>();
+ streamGroup.add(controller.stream);
+
+ await flushMicrotasks();
+ expect(idle, isFalse);
+ expect(streamGroup.isIdle, isFalse);
+
+ controller.close();
+ await flushMicrotasks();
+ expect(idle, isTrue);
+ expect(streamGroup.isIdle, isTrue);
+ });
+
+ test('emits an event when the group closes', () async {
+ // It's important that the order of events here stays consistent over
+ // time, since code may rely on it in subtle ways. Note that this is *not*
+ // an official guarantee, so the authors of `async` are free to change
+ // this behavior if they need to.
+ var idle = false;
+ var onIdleDone = false;
+ var streamClosed = false;
+
+ streamGroup.onIdle.listen(expectAsync1((_) {
+ expect(streamClosed, isFalse);
+ idle = true;
+ }), onDone: expectAsync0(() {
+ expect(idle, isTrue);
+ expect(streamClosed, isTrue);
+ onIdleDone = true;
+ }));
+
+ streamGroup.stream.drain().then(expectAsync1((_) {
+ expect(idle, isTrue);
+ expect(onIdleDone, isFalse);
+ streamClosed = true;
+ }));
+
+ var controller = StreamController<String>();
+ streamGroup.add(controller.stream);
+ streamGroup.close();
+
+ await flushMicrotasks();
+ expect(idle, isFalse);
+ expect(streamGroup.isIdle, isFalse);
+
+ controller.close();
+ await flushMicrotasks();
+ expect(idle, isTrue);
+ expect(streamGroup.isIdle, isTrue);
+ expect(streamClosed, isTrue);
+ });
+ });
}
/// Wait for all microtasks to complete.