blob: 8bd4b017cedd4ef8a1a3530f06dd4974019460bc [file] [log] [blame]
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
/// Utility extensions on [Stream].
extension StreamExtensions<T> on Stream<T> {
Stream<List<T>> slices(int length) {
if (length < 1) throw RangeError.range(length, 1, null, 'length');
var slice = <T>[];
return transform(StreamTransformer.fromHandlers(handleData: (data, sink) {
slice.add(data);
if (slice.length == length) {
sink.add(slice);
slice = [];
}
}, handleDone: (sink) {
if (slice.isNotEmpty) sink.add(slice);
sink.close();
}));
}
}