Version 1.24.0-dev.6.6
Cherry-pick 33f360cc38c23d82ee0c02a9792466062f3225cf to dev
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b9759dc..553dcce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -63,9 +63,6 @@
* JSON maps are now typed as `Map<String, dynamic>` instead of
`Map<dynamic, dynamic>`. A JSON-map is not a `HashMap` or `LinkedHashMap`
anymore (but just a `Map`).
-* `dart:async`
- * Add `groupBy` to `Stream`. Allows splitting a string into separate streams
- depending on "key" property computed from the individual events.
* `dart:async`, `dart:io`, `dart:core`
* Adding to a closed sink, including `IOSink`, is not allowed anymore. In
1.24, violations are only reported (on stdout or stderr), but a future
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index 77230e8..735dc10 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -386,76 +386,6 @@
return new _MapStream<T, S>(this, convert);
}
- /// Groups events by a computed key.
- ///
- /// A key is extracted from incoming events.
- /// The first time a key is seen, a stream is created for it, and emitted
- /// on the returned stream, along with the key, as a [GroupedEvents] object.
- /// Then the event is emitted on the stream ([GroupedEvents.values])
- /// corresponding to the key.
- ///
- /// An error on the source stream, or when calling the `key` functions,
- /// will emit the error on the returned stream.
- ///
- /// Canceling the subscription on the returned stream will stop processing
- /// and close the streams for all groups.
- ///
- /// Pausing the subscription on the returned stream will pause processing
- /// and no further events are added to streams for the individual groups.
- ///
- /// Pausing or canceling an individual group stream has no effect other than
- /// on that stream. Events will be queued while the group stream
- /// is paused and until it is first listened to.
- /// If the [GroupedEvents.values] stream is never listened to,
- /// it will enqueue all the events unnecessarily.
- Stream<GroupedEvents<K, T>> groupBy<K>(K key(T event)) {
- var controller;
- controller = new StreamController<GroupedEvents<K, T>>(
- sync: true,
- onListen: () {
- var groupControllers = new HashMap<K, StreamController<T>>();
-
- void closeAll() {
- for (var groupController in groupControllers.values) {
- groupController.close();
- }
- }
-
- var subscription = this.listen(
- (data) {
- K theKey;
- try {
- theKey = key(data);
- } catch (error, stackTrace) {
- controller.addError(error, stackTrace);
- return;
- }
- var groupController = groupControllers[theKey];
- if (groupController == null) {
- groupController =
- new StreamController<T>.broadcast(sync: true);
- groupControllers[theKey] = groupController;
- controller.add(
- new GroupedEvents<K, T>(theKey, groupController.stream));
- }
- groupController.add(data);
- },
- onError: controller.addError,
- onDone: () {
- controller.close();
- closeAll();
- });
- controller.onPause = subscription.pause;
- controller.onResume = subscription.resume;
- controller.onCancel = () {
- subscription.cancel();
- // Don't fire sync events in response to a callback.
- scheduleMicrotask(closeAll);
- };
- });
- return controller.stream;
- }
-
/**
* Creates a new stream with each data event of this stream asynchronously
* mapped to a new event.
@@ -1913,36 +1843,3 @@
_sink.close();
}
}
-
-/// A group created by [Stream.groupBy] or [Stream.groupByMapped].
-///
-/// The stream created by `groupBy` emits a `GroupedEvents` for each distinct key
-/// it encounters.
-/// This group contains the [key] itself, along with a stream of the [values]
-/// associated with that key.
-class GroupedEvents<K, V> {
- /// The key that identifiers the values emitted by [values].
- final K key;
-
- /// The [values] that [GroupBy] have grouped by the common [key].
- final Stream<V> values;
-
- factory GroupedEvents(K key, Stream<V> values) = GroupedEvents<K, V>._;
-
- // Don't expose a generative constructor.
- // This class is not intended for subclassing, so we don't want to promise
- // it. We can change that in the future.
- GroupedEvents._(this.key, this.values);
-
- /// Tells [values] to discard values instead of retaining them.
- ///
- /// Must only be used instead of listening to the [values] stream.
- /// If the stream has been listened to, this call fails.
- /// After calling this method, listening on the [values] stream fails.
- Future cancel() {
- // If values has been listened to,
- // this throws a StateError saying that stream has already been listened to,
- // which is a correct error message for this call too.
- return values.listen(null).cancel();
- }
-}
diff --git a/tests/lib/async/stream_group_by_test.dart b/tests/lib/async/stream_group_by_test.dart
deleted file mode 100644
index 2abbf26..0000000
--- a/tests/lib/async/stream_group_by_test.dart
+++ /dev/null
@@ -1,319 +0,0 @@
-// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-library stream_group_by_test;
-
-import "dart:async";
-
-import "package:expect/expect.dart";
-import "package:async_helper/async_helper.dart";
-
-int len(x) => x.length;
-String wrap(x) => "[$x]";
-
-void main() {
- asyncStart();
- // groupBy.
- test("splits", () async {
- var grouped = stringStream.groupBy<int>(len);
- var byLength = <int, Future<List<String>>>{};
- await for (GroupedEvents<int, String> group in grouped) {
- byLength[group.key] = group.values.toList();
- }
- Expect.listEquals([1, 2, 4, 3], byLength.keys.toList());
- expectCompletes(byLength[1], ["a", "b"]);
- expectCompletes(byLength[2], ["ab"]);
- expectCompletes(byLength[3], ["abe", "lea"]);
- expectCompletes(byLength[4], ["abel", "bell", "able", "abba"]);
- });
-
- test("empty", () async {
- var grouped = emptyStream.groupBy<int>(len);
- var byLength = <int, Future<List<String>>>{};
- await for (GroupedEvents<int, String> group in grouped) {
- byLength[group.key] = group.values.toList();
- }
- Expect.isTrue(byLength.isEmpty);
- });
-
- test("single group", () async {
- var grouped = repeatStream(5, "x").groupBy<int>(len);
- var byLength = <int, Future<List<String>>>{};
- await for (GroupedEvents<int, String> group in grouped) {
- byLength[group.key] = group.values.toList();
- }
- Expect.listEquals([1], byLength.keys.toList());
- expectCompletes(byLength[1], ["x", "x", "x", "x", "x"]);
- });
-
- test("with error", () async {
- var grouped = stringErrorStream(3).groupBy<int>(len);
- var byLength = <int, Future<List<String>>>{};
- bool caught = false;
- try {
- await for (GroupedEvents<int, String> group in grouped) {
- byLength[group.key] = group.values.toList();
- }
- } catch (e) {
- Expect.equals("BAD", e);
- caught = true;
- }
- Expect.isTrue(caught);
- Expect.listEquals([1, 2, 4], byLength.keys.toList());
- expectCompletes(byLength[1], ["a", "b"]);
- expectCompletes(byLength[2], ["ab"]);
- expectCompletes(byLength[4], ["abel"]);
- });
-
- // For comparison with later tests.
- test("no pause or cancel", () async {
- var grouped = stringStream.groupBy<int>(len);
- var events = [];
- var futures = [];
- await grouped.forEach((sg) {
- var key = sg.key;
- var sub;
- sub = sg.values.listen((value) {
- events.add("$key:$value");
- });
- var c = new Completer();
- futures.add(c.future);
- sub.onDone(() {
- c.complete(null);
- });
- });
- await Future.wait(futures);
- Expect.listEquals([
- "1:a",
- "2:ab",
- "1:b",
- "4:abel",
- "3:abe",
- "4:bell",
- "4:able",
- "4:abba",
- "3:lea",
- ], events);
- });
-
- test("pause on group", () async {
- // Pausing the individial group's stream just makes it buffer.
- var grouped = stringStream.groupBy<int>(len);
- var events = [];
- var futures = [];
- await grouped.forEach((sg) {
- var key = sg.key;
- var sub;
- sub = sg.values.listen((value) {
- events.add("$key:$value");
- if (value == "a") {
- // Pause until a later timer event, which is after stringStream
- // has delivered all events.
- sub.pause(new Future.delayed(Duration.ZERO, () {}));
- }
- });
- var c = new Completer();
- futures.add(c.future);
- sub.onDone(() {
- c.complete(null);
- });
- });
- await Future.wait(futures);
- Expect.listEquals([
- "1:a",
- "2:ab",
- "4:abel",
- "3:abe",
- "4:bell",
- "4:able",
- "4:abba",
- "3:lea",
- "1:b"
- ], events);
- });
-
- test("pause on group-stream", () async {
- // Pausing the stream returned by groupBy stops everything.
- var grouped = stringStream.groupBy<int>(len);
- var events = [];
- var futures = [];
- var done = new Completer();
- var sub;
- sub = grouped.listen((sg) {
- var key = sg.key;
- futures.add(sg.values.forEach((value) {
- events.add("$key:$value");
- if (value == "a") {
- // Pause everything until a later timer event.
- asyncStart();
- var eventSnapshot = events.toList();
- var delay = new Future.delayed(Duration.ZERO).then((_) {
- // No events added.
- Expect.listEquals(eventSnapshot, events);
- asyncEnd(); // Ensures this test has run.
- });
- sub.pause(delay);
- }
- }));
- });
- sub.onDone(() {
- done.complete(null);
- });
- futures.add(done.future);
- await Future.wait(futures);
- Expect.listEquals([
- "1:a",
- "2:ab",
- "1:b",
- "4:abel",
- "3:abe",
- "4:bell",
- "4:able",
- "4:abba",
- "3:lea",
- ], events);
- });
-
- test("cancel on group", () async {
- // Cancelling the individial group's stream just makes that one stop.
- var grouped = stringStream.groupBy<int>(len);
- var events = [];
- var futures = [];
- await grouped.forEach((sg) {
- var key = sg.key;
- var sub;
- var c = new Completer();
- sub = sg.values.listen((value) {
- events.add("$key:$value");
- if (value == "bell") {
- // Pause until a later timer event, which is after stringStream
- // has delivered all events.
- sub.cancel();
- c.complete(null);
- }
- });
- futures.add(c.future);
- sub.onDone(() {
- c.complete(null);
- });
- });
- await Future.wait(futures);
- Expect.listEquals([
- "1:a",
- "2:ab",
- "1:b",
- "4:abel",
- "3:abe",
- "4:bell",
- "3:lea",
- ], events);
- });
-
- test("cancel on group-stream", () async {
- // Cancel the stream returned by groupBy ends everything.
- var grouped = stringStream.groupBy<int>(len);
- var events = [];
- var futures = [];
- var done = new Completer();
- var sub;
- sub = grouped.listen((sg) {
- var key = sg.key;
- futures.add(sg.values.forEach((value) {
- events.add("$key:$value");
- if (value == "bell") {
- // Pause everything until a later timer event.
- futures.add(sub.cancel());
- done.complete();
- }
- }));
- });
- futures.add(done.future);
- await Future.wait(futures);
- Expect.listEquals([
- "1:a",
- "2:ab",
- "1:b",
- "4:abel",
- "3:abe",
- "4:bell",
- ], events);
- });
-
- asyncEnd();
-}
-
-expectCompletes(future, result) {
- asyncStart();
- future.then((v) {
- if (result is List) {
- Expect.listEquals(result, v);
- } else {
- Expect.equals(v, result);
- }
- asyncEnd();
- }, onError: (e, s) {
- Expect.fail("$e\n$s");
- });
-}
-
-void test(name, func) {
- asyncStart();
- func().then((_) {
- asyncEnd();
- }, onError: (e, s) {
- Expect.fail("$name: $e\n$s");
- });
-}
-
-var strings = const [
- "a",
- "ab",
- "b",
- "abel",
- "abe",
- "bell",
- "able",
- "abba",
- "lea"
-];
-
-Stream<String> get stringStream async* {
- for (var string in strings) {
- yield string;
- }
-}
-
-Stream get emptyStream async* {}
-
-Stream repeatStream(int count, value) async* {
- for (var i = 0; i < count; i++) {
- yield value;
- }
-}
-
-// Just some valid stack trace.
-var stack = StackTrace.current;
-
-Stream<String> stringErrorStream(int errorAfter) async* {
- for (int i = 0; i < strings.length; i++) {
- yield strings[i];
- if (i == errorAfter) {
- // Emit error, but continue afterwards.
- yield* new Future.error("BAD", stack).asStream();
- }
- }
-}
-
-Stream intStream(int count, [int start = 0]) async* {
- for (int i = 0; i < count; i++) {
- yield start++;
- }
-}
-
-Stream timerStream(int count, Duration interval) async* {
- for (int i = 0; i < count; i++) {
- await new Future.delayed(interval);
- yield i;
- }
-}
diff --git a/tools/VERSION b/tools/VERSION
index b6b50ee..1f4f9fb 100644
--- a/tools/VERSION
+++ b/tools/VERSION
@@ -28,4 +28,4 @@
MINOR 24
PATCH 0
PRERELEASE 6
-PRERELEASE_PATCH 5
+PRERELEASE_PATCH 6