Add StreamGroup.mergeBroadcast (#90)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0b7518b..9071526 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 2.4.0
+
+* Add `StreamGroup.mergeBroadcast()` utility.
+
## 2.3.0
* Implement `RestartableTimer.tick`.
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
index d647ce3..1a6d15d 100644
--- a/lib/src/stream_group.dart
+++ b/lib/src/stream_group.dart
@@ -49,7 +49,7 @@
/// re-subscribed.
final _subscriptions = Map<Stream<T>, StreamSubscription<T>>();
- /// Merges the events from [streams] into a single (single-subscriber) stream.
+ /// Merges the events from [streams] into a single single-subscription stream.
///
/// This is equivalent to adding [streams] to a group, closing that group, and
/// returning its stream.
@@ -60,6 +60,17 @@
return group.stream;
}
+ /// Merges the events from [streams] into a single broadcast stream.
+ ///
+ /// This is equivalent to adding [streams] to a broadcast group, closing that
+ /// group, and returning its stream.
+ static Stream<T> mergeBroadcast<T>(Iterable<Stream<T>> streams) {
+ var group = StreamGroup<T>.broadcast();
+ streams.forEach(group.add);
+ group.close();
+ return group.stream;
+ }
+
/// Creates a new stream group where [stream] is single-subscriber.
StreamGroup() {
_controller = StreamController<T>(
diff --git a/pubspec.yaml b/pubspec.yaml
index b1013ac..aec53cf 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 2.3.0
+version: 2.4.0
description: Utility functions and classes related to the 'dart:async' library.
author: Dart Team <misc@dartlang.org>
diff --git a/test/stream_group_test.dart b/test/stream_group_test.dart
index 3c4a3a8..96b19db 100644
--- a/test/stream_group_test.dart
+++ b/test/stream_group_test.dart
@@ -435,6 +435,23 @@
expect(merged.toList(), completion(unorderedEquals(["first", "second"])));
});
+
+ test("mergeBroadcast() emits events from all components streams", () {
+ var controller1 = StreamController<String>();
+ var controller2 = StreamController<String>();
+
+ var merged =
+ StreamGroup.mergeBroadcast([controller1.stream, controller2.stream]);
+
+ controller1.add("first");
+ controller1.close();
+ controller2.add("second");
+ controller2.close();
+
+ expect(merged.isBroadcast, isTrue);
+
+ expect(merged.toList(), completion(unorderedEquals(["first", "second"])));
+ });
}
void regardlessOfType(StreamGroup<String> newStreamGroup()) {