Avoid missing streams when relistening to broadcast StreamGroup (#184)
In a broadcast `StreamGroup` when all listeners have canceled the inner
subscriptions are kept only for single subscriber streams. When a new
listener is added and subscriptions need to be recreated for inner
broadcast streams the presence of a single subscriber subscription in
the collection of subscriptions would cause the listen to bail out early
and fail to subscribe to later broadcast streams. Change `return` to
`continue` to keep going with the rest of the streams.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 102ad59..2c5a3eb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,9 @@
easier to implement custom sinks.
* Improve performance for `ChunkedStreamReader` by creating fewer internal
sublists and specializing to create views for `Uint8List` chunks.
+* Don't ignore broadcast streams added to a `StreamGroup` that doesn't have an
+ active listener but previously had listeners and contains a single
+ subscription inner stream.
## 2.7.0
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index f7819bf..c8414f4 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -192,7 +192,7 @@
// If this is a broadcast group and this isn't the first time it's been
// listened to, there may still be some subscriptions to
// single-subscription streams.
- if (entry.value != null) return;
+ if (entry.value != null) continue;
var stream = entry.key;
try {
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
index 11c66c1..256056e 100644
--- a/test/stream_group_test.dart
+++ b/test/stream_group_test.dart
@@ -417,6 +417,24 @@
expect(controller.hasListener, isTrue);
});
+ test(
+ 'listens on streams that follow single-subscription streams when '
+ 'relistening after a cancel', () async {
+ var controller1 = StreamController<String>();
+ streamGroup.add(controller1.stream);
+ streamGroup.stream.listen(null).cancel();
+
+ var controller2 = StreamController<String>();
+ streamGroup.add(controller2.stream);
+
+ var emitted = <String>[];
+ streamGroup.stream.listen(emitted.add);
+ controller1.add('one');
+ controller2.add('two');
+ await flushMicrotasks();
+ expect(emitted, ['one', 'two']);
+ });
+
test('never cancels single-subscription streams', () async {
var subscription = streamGroup.stream.listen(null);