Add a Stream.slices extension method (#175)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 47effed..712352e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,6 @@
-## 2.6.2-dev
+## 2.7.0
+
+* Add a `Stream.slices()` extension method.
* Fix a bug where `CancelableOperation.then` may invoke the `onValue` callback,
even if it had been canceled before `CancelableOperation.value` completes.
diff --git a/lib/async.dart b/lib/async.dart
index 2d5876a..2170442 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -29,6 +29,7 @@
export 'src/single_subscription_transformer.dart';
export 'src/stream_closer.dart';
export 'src/stream_completer.dart';
+export 'src/stream_extensions.dart';
export 'src/stream_group.dart';
export 'src/stream_queue.dart';
export 'src/stream_sink_completer.dart';
diff --git a/lib/src/stream_extensions.dart b/lib/src/stream_extensions.dart
new file mode 100644
index 0000000..8bd4b01
--- /dev/null
+++ b/lib/src/stream_extensions.dart
@@ -0,0 +1,24 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+/// Utility extensions on [Stream].
+extension StreamExtensions<T> on Stream<T> {
+ Stream<List<T>> slices(int length) {
+ if (length < 1) throw RangeError.range(length, 1, null, 'length');
+
+ var slice = <T>[];
+ return transform(StreamTransformer.fromHandlers(handleData: (data, sink) {
+ slice.add(data);
+ if (slice.length == length) {
+ sink.add(slice);
+ slice = [];
+ }
+ }, handleDone: (sink) {
+ if (slice.isNotEmpty) sink.add(slice);
+ sink.close();
+ }));
+ }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 4fc5b92..b6b6bca 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 2.6.2-dev
+version: 2.7.0
description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
diff --git a/test/stream_extensions_test.dart b/test/stream_extensions_test.dart
new file mode 100644
index 0000000..85a3cee
--- /dev/null
+++ b/test/stream_extensions_test.dart
@@ -0,0 +1,58 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE filevents.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:test/test.dart';
+
+void main() {
+ group('.slices', () {
+ test('empty', () {
+ expect(Stream.empty().slices(1).toList(), completion(equals([])));
+ });
+
+ test('with the same length as the iterable', () {
+ expect(
+ Stream.fromIterable([1, 2, 3]).slices(3).toList(),
+ completion(equals([
+ [1, 2, 3]
+ ])));
+ });
+
+ test('with a longer length than the iterable', () {
+ expect(
+ Stream.fromIterable([1, 2, 3]).slices(5).toList(),
+ completion(equals([
+ [1, 2, 3]
+ ])));
+ });
+
+ test('with a shorter length than the iterable', () {
+ expect(
+ Stream.fromIterable([1, 2, 3]).slices(2).toList(),
+ completion(equals([
+ [1, 2],
+ [3]
+ ])));
+ });
+
+ test('with length divisible by the iterable\'s', () {
+ expect(
+ Stream.fromIterable([1, 2, 3, 4]).slices(2).toList(),
+ completion(equals([
+ [1, 2],
+ [3, 4]
+ ])));
+ });
+
+ test('refuses negative length', () {
+ expect(() => Stream.fromIterable([1]).slices(-1), throwsRangeError);
+ });
+
+ test('refuses length 0', () {
+ expect(() => Stream.fromIterable([1]).slices(0), throwsRangeError);
+ });
+ });
+}