Fix a StreamGroup bug when a component stream's listen() throws (#173)

This would put the StreamGroup into an inconsistent state where it
would believe itself to be active, but only some streams would have
subscriptions. This was exacerbated by dart-lang/sdk#45815, which
meant that even though _onListen threw an error a StreamSubscription
was created and returned, so further callbacks could still be called.

Now instead of going into an inconsistent state, the StreamGroup
simply cancels itself.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d5a22f3..6dff198 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 2.6.1
+
+* When `StreamGroup.stream.listen()` is called, gracefully handle component
+  streams throwing errors when their `Stream.listen()` methods are called.
+
 ## 2.6.0
 
 * Add a `StreamCloser` class, which is a `StreamTransformer` that allows the
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index 49baaad..6270d9a 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -4,6 +4,8 @@
 
 import 'dart:async';
 
+import 'package:collection/collection.dart';
+
 /// A collection of streams whose events are unified and sent through a central
 /// stream.
 ///
@@ -185,13 +187,24 @@
   /// This is called for both single-subscription and broadcast groups.
   void _onListen() {
     _state = _StreamGroupState.listening;
-    _subscriptions.forEach((stream, subscription) {
+
+    for (var entry in _subscriptions.entries.toList()) {
       // If this is a broadcast group and this isn't the first time it's been
       // listened to, there may still be some subscriptions to
       // single-subscription streams.
-      if (subscription != null) return;
-      _subscriptions[stream] = _listenToStream(stream);
-    });
+      if (entry.value != null) return;
+
+      var stream = entry.key;
+      try {
+        _subscriptions[stream] = _listenToStream(stream);
+      } catch (error) {
+        // If [Stream.listen] throws a synchronous error (for example because
+        // the stream has already been listened to), cancel all subscriptions
+        // and rethrow the error.
+        _onCancel()?.catchError((_) {});
+        rethrow;
+      }
+    }
   }
 
   /// A callback called when [stream] is paused.
@@ -216,8 +229,17 @@
   Future? _onCancel() {
     _state = _StreamGroupState.canceled;
 
-    var futures = _subscriptions.values
-        .map((subscription) => subscription!.cancel())
+    var futures = _subscriptions.entries
+        .map((entry) {
+          var subscription = entry.value;
+          if (subscription != null) return subscription.cancel();
+          try {
+            return entry.key.listen(null).cancel();
+          } catch (_) {
+            return null;
+          }
+        })
+        .whereNotNull()
         .toList();
 
     _subscriptions.clear();
diff --git a/pubspec.yaml b/pubspec.yaml
index d645c47..6d5a06d 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 2.6.0
+version: 2.6.1
 
 description: Utility functions and classes related to the 'dart:async' library.
 repository: https://github.com/dart-lang/async
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
index 7ea93f8..11c66c1 100644
--- a/test/stream_group_test.dart
+++ b/test/stream_group_test.dart
@@ -246,6 +246,69 @@
         expect(fired, isTrue);
       });
     });
+
+    group('when listen() throws an error', () {
+      late Stream<String> alreadyListened;
+      setUp(() {
+        alreadyListened = Stream.value('foo')..listen(null);
+      });
+
+      group('listen()', () {
+        test('rethrows that error', () {
+          streamGroup.add(alreadyListened);
+
+          // We can't use expect(..., throwsStateError) here bceause of
+          // dart-lang/sdk#45815.
+          runZonedGuarded(
+              () => streamGroup.stream.listen(expectAsync1((_) {}, count: 0)),
+              expectAsync2((error, _) => expect(error, isStateError)));
+        });
+
+        test('cancels other subscriptions', () async {
+          var firstCancelled = false;
+          var first =
+              StreamController<String>(onCancel: () => firstCancelled = true);
+          streamGroup.add(first.stream);
+
+          streamGroup.add(alreadyListened);
+
+          var lastCancelled = false;
+          var last =
+              StreamController<String>(onCancel: () => lastCancelled = true);
+          streamGroup.add(last.stream);
+
+          runZonedGuarded(() => streamGroup.stream.listen(null), (_, __) {});
+
+          expect(firstCancelled, isTrue);
+          expect(lastCancelled, isTrue);
+        });
+
+        // There really shouldn't even be a subscription here, but due to
+        // dart-lang/sdk#45815 there is.
+        group('canceling the subscription is a no-op', () {
+          test('synchronously', () {
+            streamGroup.add(alreadyListened);
+
+            var subscription = runZonedGuarded(
+                () => streamGroup.stream.listen(null),
+                expectAsync2((_, __) {}, count: 1));
+
+            expect(subscription!.cancel(), completes);
+          });
+
+          test('asynchronously', () async {
+            streamGroup.add(alreadyListened);
+
+            var subscription = runZonedGuarded(
+                () => streamGroup.stream.listen(null),
+                expectAsync2((_, __) {}, count: 1));
+
+            await pumpEventQueue();
+            expect(subscription!.cancel(), completes);
+          });
+        });
+      });
+    });
   });
 
   group('broadcast', () {