blob: 3801a025ff0638ce45b2668925c1700cdb727415 [file] [log] [blame]
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
/// Utility extensions on [Stream].
extension StreamExtensions<T> on Stream<T> {
  /// Creates a stream whose elements are contiguous slices of [this].
  ///
  /// Each slice is [length] elements long, except for the last one which may be
  /// shorter if [this] emits too few elements. Each slice begins after the
  /// last one ends.
  ///
  /// For example, `Stream.fromIterable([1, 2, 3, 4, 5]).slices(2)` emits
  /// `([1, 2], [3, 4], [5])`.
  ///
  /// Errors are forwarded to the result stream immediately when they occur,
  /// even if previous data events have not been emitted because the next slice
  /// is not complete yet.
  ///
  /// Every listener on the returned stream accumulates its own slices, so
  /// listening more than once to the result of slicing a broadcast stream
  /// does not mix elements between listeners.
  ///
  /// Throws a [RangeError] if [length] is less than 1.
  Stream<List<T>> slices(int length) {
    if (length < 1) throw RangeError.range(length, 1, null, 'length');

    // The sink mapper runs once per listen, which gives every subscription
    // a fresh buffer instead of one buffer shared by all of them.
    return Stream<List<T>>.eventTransformed(
        this, (sink) => _SlicesSink<T>(sink, length));
  }
}

/// Groups the data events of one subscription into fixed-size slices.
///
/// Completed slices are added to the wrapped sink; errors are passed through
/// unchanged, and a non-empty partial slice is emitted when the sink closes.
class _SlicesSink<T> implements EventSink<T> {
  /// The sink receiving the completed slices.
  final EventSink<List<T>> _outer;

  /// The number of elements in a complete slice.
  final int _length;

  /// The elements collected for the slice currently being built.
  var _slice = <T>[];

  _SlicesSink(this._outer, this._length);

  @override
  void add(T data) {
    _slice.add(data);
    if (_slice.length == _length) {
      _outer.add(_slice);
      // Start a new list rather than clearing, since the emitted list now
      // belongs to the receiver.
      _slice = [];
    }
  }

  @override
  void addError(Object error, [StackTrace? stackTrace]) {
    _outer.addError(error, stackTrace);
  }

  @override
  void close() {
    if (_slice.isNotEmpty) _outer.add(_slice);
    _outer.close();
  }
}