Fix a StreamGroup bug when a component stream's listen() throws (#173)
This would put the StreamGroup into an inconsistent state where it
would believe itself to be active, but only some streams would have
subscriptions. This was exacerbated by dart-lang/sdk#45815, which
meant that even though _onListen threw an error a StreamSubscription
was created and returned, so further callbacks could still be called.
Now instead of going into an inconsistent state, the StreamGroup
simply cancels itself.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d5a22f3..6dff198 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 2.6.1
+
+* When `StreamGroup.stream.listen()` is called, gracefully handle component
+ streams throwing errors when their `Stream.listen()` methods are called.
+
## 2.6.0
* Add a `StreamCloser` class, which is a `StreamTransformer` that allows the
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index 49baaad..6270d9a 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -4,6 +4,8 @@
import 'dart:async';
+import 'package:collection/collection.dart';
+
/// A collection of streams whose events are unified and sent through a central
/// stream.
///
@@ -185,13 +187,24 @@
/// This is called for both single-subscription and broadcast groups.
void _onListen() {
_state = _StreamGroupState.listening;
- _subscriptions.forEach((stream, subscription) {
+
+ for (var entry in _subscriptions.entries.toList()) {
// If this is a broadcast group and this isn't the first time it's been
// listened to, there may still be some subscriptions to
// single-subscription streams.
- if (subscription != null) return;
- _subscriptions[stream] = _listenToStream(stream);
- });
+ if (entry.value != null) return;
+
+ var stream = entry.key;
+ try {
+ _subscriptions[stream] = _listenToStream(stream);
+ } catch (error) {
+ // If [Stream.listen] throws a synchronous error (for example because
+ // the stream has already been listened to), cancel all subscriptions
+ // and rethrow the error.
+ _onCancel()?.catchError((_) {});
+ rethrow;
+ }
+ }
}
/// A callback called when [stream] is paused.
@@ -216,8 +229,17 @@
Future? _onCancel() {
_state = _StreamGroupState.canceled;
- var futures = _subscriptions.values
- .map((subscription) => subscription!.cancel())
+ var futures = _subscriptions.entries
+ .map((entry) {
+ var subscription = entry.value;
+ if (subscription != null) return subscription.cancel();
+ try {
+ return entry.key.listen(null).cancel();
+ } catch (_) {
+ return null;
+ }
+ })
+ .whereNotNull()
.toList();
_subscriptions.clear();
diff --git a/pubspec.yaml b/pubspec.yaml
index d645c47..6d5a06d 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 2.6.0
+version: 2.6.1
description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
index 7ea93f8..11c66c1 100644
--- a/test/stream_group_test.dart
+++ b/test/stream_group_test.dart
@@ -246,6 +246,69 @@
expect(fired, isTrue);
});
});
+
+ group('when listen() throws an error', () {
+ late Stream<String> alreadyListened;
+ setUp(() {
+ alreadyListened = Stream.value('foo')..listen(null);
+ });
+
+ group('listen()', () {
+ test('rethrows that error', () {
+ streamGroup.add(alreadyListened);
+
+ // We can't use expect(..., throwsStateError) here bceause of
+ // dart-lang/sdk#45815.
+ runZonedGuarded(
+ () => streamGroup.stream.listen(expectAsync1((_) {}, count: 0)),
+ expectAsync2((error, _) => expect(error, isStateError)));
+ });
+
+ test('cancels other subscriptions', () async {
+ var firstCancelled = false;
+ var first =
+ StreamController<String>(onCancel: () => firstCancelled = true);
+ streamGroup.add(first.stream);
+
+ streamGroup.add(alreadyListened);
+
+ var lastCancelled = false;
+ var last =
+ StreamController<String>(onCancel: () => lastCancelled = true);
+ streamGroup.add(last.stream);
+
+ runZonedGuarded(() => streamGroup.stream.listen(null), (_, __) {});
+
+ expect(firstCancelled, isTrue);
+ expect(lastCancelled, isTrue);
+ });
+
+ // There really shouldn't even be a subscription here, but due to
+ // dart-lang/sdk#45815 there is.
+ group('canceling the subscription is a no-op', () {
+ test('synchronously', () {
+ streamGroup.add(alreadyListened);
+
+ var subscription = runZonedGuarded(
+ () => streamGroup.stream.listen(null),
+ expectAsync2((_, __) {}, count: 1));
+
+ expect(subscription!.cancel(), completes);
+ });
+
+ test('asynchronously', () async {
+ streamGroup.add(alreadyListened);
+
+ var subscription = runZonedGuarded(
+ () => streamGroup.stream.listen(null),
+ expectAsync2((_, __) {}, count: 1));
+
+ await pumpEventQueue();
+ expect(subscription!.cancel(), completes);
+ });
+ });
+ });
+ });
});
group('broadcast', () {