Add StreamGroup.onIdle and StreamGroup.isIdle (#164)

These match the corresponding APIs on FutureGroup.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 421db06..0b23130 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,8 @@
 * Added `ChunkedStreamReader` for reading _chunked streams_ without managing
   buffers.
 
+* Add `StreamGroup.isIdle` and `StreamGroup.onIdle`.
+
 * Add `StreamGroup.isClosed` and `FutureGroup.isClosed` getters.
 
 ## 2.5.0
diff --git a/lib/src/future_group.dart b/lib/src/future_group.dart
index 1742d6a..c98fbc3 100644
--- a/lib/src/future_group.dart
+++ b/lib/src/future_group.dart
@@ -35,14 +35,28 @@
   Future<List<T>> get future => _completer.future;
   final _completer = Completer<List<T>>();
 
-  /// Whether this group has no pending futures.
+  /// Whether this group contains no futures.
+  ///
+  /// A [FutureGroup] is idle when it contains no futures, which is the case for
+  /// a newly created group or one where all added futures have been removed or
+  /// completed.
   bool get isIdle => _pending == 0;
 
-  /// A broadcast stream that emits a `null` event whenever the last pending
-  /// future in this group completes.
+  /// A broadcast stream that emits an event whenever this group becomes idle.
   ///
-  /// Once this group isn't waiting on any futures *and* [close] has been
-  /// called, this stream will close.
+  /// A [FutureGroup] is idle when it contains no futures, which is the case for
+  /// a newly created group or one where all added futures have been removed or
+  /// completed.
+  ///
+  /// This stream will close when this group is idle *and* [close] has been
+  /// called.
+  ///
+  /// Note that:
+  ///
+  /// * Events won't be emitted on this stream until [stream] has been listened
+  ///   to.
+  /// * Events are delivered asynchronously, so it's possible for the group to
+  ///   become active again before the event is delivered.
   Stream get onIdle =>
       (_onIdleController ??= StreamController.broadcast(sync: true)).stream;
 
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index c864987..49baaad 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -41,6 +41,39 @@
   /// See [_StreamGroupState] for detailed descriptions of each state.
   var _state = _StreamGroupState.dormant;
 
+  /// Whether this group contains no streams.
+  ///
+  /// A [StreamGroup] is idle when it contains no streams, which is the case for
+  /// a newly created group or one where all added streams have been emitted
+  /// done events (or been [remove]d).
+  ///
+  /// If this is a single-subscription group, then cancelling the subscription
+  /// to [stream] will also remove all streams.
+  bool get isIdle => _subscriptions.isEmpty;
+
+  /// A broadcast stream that emits an event whenever this group becomes idle.
+  ///
+  /// A [StreamGroup] is idle when it contains no streams, which is the case for
+  /// a newly created group or one where all added streams have been emitted
+  /// done events (or been [remove]d).
+  ///
+  /// This stream will close when either:
+  ///
+  /// * This group is idle *and* [close] has been called, or
+  /// * [stream]'s subscription has been cancelled (if this is a
+  ///   single-subscription group).
+  ///
+  /// Note that:
+  ///
+  /// * Events won't be emitted on this stream until [stream] has been listened
+  ///   to.
+  /// * Events are delivered asynchronously, so it's possible for the group to
+  ///   become active again before the event is delivered.
+  Stream<void> get onIdle =>
+      (_onIdleController ??= StreamController.broadcast()).stream;
+
+  StreamController<Null>? _onIdleController;
+
   /// Streams that have been added to the group, and their subscriptions if they
   /// have been subscribed to.
   ///
@@ -135,7 +168,15 @@
   Future? remove(Stream<T> stream) {
     var subscription = _subscriptions.remove(stream);
     var future = subscription == null ? null : subscription.cancel();
-    if (_closed && _subscriptions.isEmpty) _controller.close();
+
+    if (_subscriptions.isEmpty) {
+      _onIdleController?.add(null);
+      if (_closed) {
+        _onIdleController?.close();
+        scheduleMicrotask(_controller.close);
+      }
+    }
+
     return future;
   }
 
@@ -180,6 +221,13 @@
         .toList();
 
     _subscriptions.clear();
+
+    var onIdleController = _onIdleController;
+    if (onIdleController != null && !onIdleController.isClosed) {
+      onIdleController.add(null);
+      onIdleController.close();
+    }
+
     return futures.isEmpty ? null : Future.wait(futures);
   }
 
diff --git a/pubspec.yaml b/pubspec.yaml
index 59ee5f2..5bfe9b1 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 2.6.0-dev
+version: 2.6.0
 
 description: Utility functions and classes related to the 'dart:async' library.
 repository: https://github.com/dart-lang/async
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
index 43f9bfc..7ea93f8 100644
--- a/test/stream_group_test.dart
+++ b/test/stream_group_test.dart
@@ -736,6 +736,109 @@
       expect(events, equals(['one', 'two', 'three', 'four', 'five', 'six']));
     });
   });
+
+  group('onIdle', () {
+    test('emits an event when the last pending stream emits done', () async {
+      streamGroup.stream.listen(null);
+
+      var idle = false;
+      streamGroup.onIdle.listen((_) => idle = true);
+
+      var controller1 = StreamController<String>();
+      var controller2 = StreamController<String>();
+      var controller3 = StreamController<String>();
+
+      streamGroup.add(controller1.stream);
+      streamGroup.add(controller2.stream);
+      streamGroup.add(controller3.stream);
+
+      await flushMicrotasks();
+      expect(idle, isFalse);
+      expect(streamGroup.isIdle, isFalse);
+
+      controller1.close();
+      await flushMicrotasks();
+      expect(idle, isFalse);
+      expect(streamGroup.isIdle, isFalse);
+
+      controller2.close();
+      await flushMicrotasks();
+      expect(idle, isFalse);
+      expect(streamGroup.isIdle, isFalse);
+
+      controller3.close();
+      await flushMicrotasks();
+      expect(idle, isTrue);
+      expect(streamGroup.isIdle, isTrue);
+    });
+
+    test('emits an event each time it becomes idle', () async {
+      streamGroup.stream.listen(null);
+
+      var idle = false;
+      streamGroup.onIdle.listen((_) => idle = true);
+
+      var controller = StreamController<String>();
+      streamGroup.add(controller.stream);
+
+      controller.close();
+      await flushMicrotasks();
+      expect(idle, isTrue);
+      expect(streamGroup.isIdle, isTrue);
+
+      idle = false;
+      controller = StreamController<String>();
+      streamGroup.add(controller.stream);
+
+      await flushMicrotasks();
+      expect(idle, isFalse);
+      expect(streamGroup.isIdle, isFalse);
+
+      controller.close();
+      await flushMicrotasks();
+      expect(idle, isTrue);
+      expect(streamGroup.isIdle, isTrue);
+    });
+
+    test('emits an event when the group closes', () async {
+      // It's important that the order of events here stays consistent over
+      // time, since code may rely on it in subtle ways. Note that this is *not*
+      // an official guarantee, so the authors of `async` are free to change
+      // this behavior if they need to.
+      var idle = false;
+      var onIdleDone = false;
+      var streamClosed = false;
+
+      streamGroup.onIdle.listen(expectAsync1((_) {
+        expect(streamClosed, isFalse);
+        idle = true;
+      }), onDone: expectAsync0(() {
+        expect(idle, isTrue);
+        expect(streamClosed, isTrue);
+        onIdleDone = true;
+      }));
+
+      streamGroup.stream.drain().then(expectAsync1((_) {
+        expect(idle, isTrue);
+        expect(onIdleDone, isFalse);
+        streamClosed = true;
+      }));
+
+      var controller = StreamController<String>();
+      streamGroup.add(controller.stream);
+      streamGroup.close();
+
+      await flushMicrotasks();
+      expect(idle, isFalse);
+      expect(streamGroup.isIdle, isFalse);
+
+      controller.close();
+      await flushMicrotasks();
+      expect(idle, isTrue);
+      expect(streamGroup.isIdle, isTrue);
+      expect(streamClosed, isTrue);
+    });
+  });
 }
 
 /// Wait for all microtasks to complete.