Add ChunkedStreamReader (#161)
* Added ChunkedStreamReader
* Updated changelog
* Prepare 2.6.0 release
* Address review comments, fix boundary issue
* Cleanup tests
* More tests and const empty list
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c7256ac..84938d0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
-## 2.5.1-dev
+## 2.6.0
+
+* Added `ChunkedStreamReader` for reading _chunked streams_ without managing
+ buffers.
## 2.5.0
diff --git a/lib/async.dart b/lib/async.dart
index a97fc65..611d137 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -37,3 +37,4 @@
export 'src/stream_zip.dart';
export 'src/subscription_stream.dart';
export 'src/typed_stream_transformer.dart';
+export 'src/chunked_stream_reader.dart';
diff --git a/lib/src/chunked_stream_reader.dart b/lib/src/chunked_stream_reader.dart
new file mode 100644
index 0000000..855a772
--- /dev/null
+++ b/lib/src/chunked_stream_reader.dart
@@ -0,0 +1,177 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:typed_data';
+
+import 'byte_collector.dart' show collectBytes;
+
+/// Utility class for reading elements from a _chunked stream_.
+///
+/// A _chunked stream_ is a stream where each event is a chunk of elements.
+/// Byte-streams with the type `Stream<List<int>>` are a common example of this.
+/// As illustrated in the example below, this utility class makes it easy to
+/// read a _chunked stream_ using custom chunk sizes and sub-stream sizes,
+/// without managing partially read chunks.
+///
+/// ```dart
+/// final r = ChunkedStreamReader(File('myfile.txt').openRead());
+/// try {
+/// // Read the first 4 bytes
+/// final firstBytes = await r.readChunk(4);
+/// if (firstBytes.length < 4) {
+/// throw Exception('myfile.txt has less than 4 bytes');
+/// }
+///
+/// // Read next 8 kilobytes as a substream
+/// Stream<List<int>> substream = r.readStream(8 * 1024);
+///
+/// ...
+/// } finally {
+/// // We always cancel the ChunkedStreamReader, this ensures the underlying
+/// // stream is cancelled.
+/// r.cancel();
+/// }
+/// ```
+///
+/// The read-operations [readChunk] and [readStream] must not be invoked until
+/// the future from a previous call has completed.
+class ChunkedStreamReader<T> {
+ final StreamIterator<List<T>> _input;
+ final List<T> _emptyList = const [];
+ List<T> _buffer = <T>[];
+ bool _reading = false;
+
+ factory ChunkedStreamReader(Stream<List<T>> stream) =>
+ ChunkedStreamReader._(StreamIterator(stream));
+
+ ChunkedStreamReader._(this._input);
+
+ /// Read next [size] elements from _chunked stream_, buffering to create a
+ /// chunk with [size] elements.
+ ///
+ /// This will read _chunks_ from the underlying _chunked stream_ until [size]
+ /// elements have been buffered, or end-of-stream, then it returns the first
+ /// [size] buffered elements.
+ ///
+  /// If end-of-stream is encountered before [size] elements are read, this
+ /// returns a list with fewer than [size] elements (indicating end-of-stream).
+ ///
+ /// If the underlying stream throws, the stream is cancelled, the exception is
+  /// propagated and further read operations will fail.
+ ///
+ /// Throws, if another read operation is on-going.
+ Future<List<T>> readChunk(int size) async {
+ final result = <T>[];
+ await for (final chunk in readStream(size)) {
+ result.addAll(chunk);
+ }
+ return result;
+ }
+
+ /// Read next [size] elements from _chunked stream_ as a sub-stream.
+ ///
+ /// This will pass-through _chunks_ from the underlying _chunked stream_ until
+ /// [size] elements have been returned, or end-of-stream has been encountered.
+ ///
+  /// If end-of-stream is encountered before [size] elements are read, this
+ /// returns a list with fewer than [size] elements (indicating end-of-stream).
+ ///
+ /// If the underlying stream throws, the stream is cancelled, the exception is
+  /// propagated and further read operations will fail.
+ ///
+ /// If the sub-stream returned from [readStream] is cancelled the remaining
+ /// unread elements up-to [size] are drained, allowing subsequent
+ /// read-operations to proceed after cancellation.
+ ///
+ /// Throws, if another read-operation is on-going.
+ Stream<List<T>> readStream(int size) {
+ RangeError.checkNotNegative(size, 'size');
+ if (_reading) {
+ throw StateError('Concurrent read operations are not allowed!');
+ }
+ _reading = true;
+
+ final substream = () async* {
+ // While we have data to read
+ while (size > 0) {
+ // Read something into the buffer, if it's empty
+ if (_buffer.isEmpty) {
+ if (!(await _input.moveNext())) {
+ // Don't attempt to read more data, as there is no more data.
+ size = 0;
+ _reading = false;
+ break;
+ }
+ _buffer = _input.current;
+ }
+
+ if (_buffer.isNotEmpty) {
+ if (size < _buffer.length) {
+ final output = _buffer.sublist(0, size);
+ _buffer = _buffer.sublist(size);
+ size = 0;
+ yield output;
+ _reading = false;
+ break;
+ }
+
+ final output = _buffer;
+ size -= _buffer.length;
+ _buffer = _emptyList;
+ yield output;
+ }
+ }
+ };
+
+ final c = StreamController<List<T>>();
+ c.onListen = () => c.addStream(substream()).whenComplete(c.close);
+ c.onCancel = () async {
+ while (size > 0) {
+ if (_buffer.isEmpty) {
+ if (!await _input.moveNext()) {
+ size = 0; // no more data
+ break;
+ }
+ _buffer = _input.current;
+ }
+
+ if (size < _buffer.length) {
+ _buffer = _buffer.sublist(size);
+ size = 0;
+ break;
+ }
+
+ size -= _buffer.length;
+ _buffer = _emptyList;
+ }
+ _reading = false;
+ };
+
+ return c.stream;
+ }
+
+ /// Cancel the underlying _chunked stream_.
+ ///
+ /// If a future from [readChunk] or [readStream] is still pending then
+ /// [cancel] behaves as if the underlying stream ended early. That is a future
+  /// from [readChunk] may return a partial chunk smaller than the requested size.
+ ///
+ /// It is always safe to call [cancel], even if the underlying stream was read
+ /// to completion.
+ ///
+ /// It can be a good idea to call [cancel] in a `finally`-block when done
+ /// using the [ChunkedStreamReader], this mitigates risk of leaking resources.
+ Future<void> cancel() async => await _input.cancel();
+}
+
+/// Extensions for using [ChunkedStreamReader] with byte-streams.
+extension ChunkedStreamReaderByteStreamExt on ChunkedStreamReader<int> {
+ /// Read bytes into a [Uint8List].
+ ///
+ /// This does the same as [readChunk], except it uses [collectBytes] to create
+ /// a [Uint8List], which offers better performance.
+ Future<Uint8List> readBytes(int size) async =>
+ await collectBytes(readStream(size));
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index e24947d..5bfe9b1 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 2.5.1-dev
+version: 2.6.0
description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
diff --git a/test/chunked_stream_reader.dart b/test/chunked_stream_reader.dart
new file mode 100644
index 0000000..7dcd408
--- /dev/null
+++ b/test/chunked_stream_reader.dart
@@ -0,0 +1,380 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:typed_data';
+
+import 'package:test/test.dart';
+import 'package:async/async.dart';
+
+void main() {
+ test('readChunk() chunk by chunk', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(2), equals([1, 2]));
+ expect(await r.readChunk(3), equals([3, 4, 5]));
+ expect(await r.readChunk(4), equals([6, 7, 8, 9]));
+ expect(await r.readChunk(1), equals([10]));
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() element by element', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ for (var i = 0; i < 10; i++) {
+ expect(await r.readChunk(1), equals([i + 1]));
+ }
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() exact elements', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(10), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() past end', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(20), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() chunks of 2 elements', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(2), equals([1, 2]));
+ expect(await r.readChunk(2), equals([3, 4]));
+ expect(await r.readChunk(2), equals([5, 6]));
+ expect(await r.readChunk(2), equals([7, 8]));
+ expect(await r.readChunk(2), equals([9, 10]));
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() chunks of 3 elements', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(3), equals([1, 2, 3]));
+ expect(await r.readChunk(3), equals([4, 5, 6]));
+ expect(await r.readChunk(3), equals([7, 8, 9]));
+ expect(await r.readChunk(3), equals([10]));
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() cancel half way', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(5), equals([1, 2, 3, 4, 5]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() propagates exception', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ throw Exception('stopping here');
+ }());
+
+ expect(await r.readChunk(3), equals([1, 2, 3]));
+ await expectLater(r.readChunk(3), throwsException);
+
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readStream() forwards chunks', () async {
+ final chunk2 = [3, 4, 5];
+ final chunk3 = [6, 7, 8, 9];
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield chunk2;
+ yield chunk3;
+ yield [10];
+ }());
+
+ expect(await r.readChunk(1), equals([1]));
+ final i = StreamIterator(r.readStream(9));
+ expect(await i.moveNext(), isTrue);
+ expect(i.current, equals([2]));
+
+ // We must forward the exact chunks otherwise it's not efficient!
+ // Hence, we have a reference equality check here.
+ expect(await i.moveNext(), isTrue);
+ expect(i.current, equals([3, 4, 5]));
+ expect(i.current == chunk2, isTrue);
+
+ expect(await i.moveNext(), isTrue);
+ expect(i.current, equals([6, 7, 8, 9]));
+ expect(i.current == chunk3, isTrue);
+
+ expect(await i.moveNext(), isTrue);
+ expect(i.current, equals([10]));
+ expect(await i.moveNext(), isFalse);
+
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readStream() cancel at the exact end', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(1), equals([1]));
+ final i = StreamIterator(r.readStream(7));
+ expect(await i.moveNext(), isTrue);
+ expect(i.current, equals([2]));
+
+ expect(await i.moveNext(), isTrue);
+ expect(i.current, equals([3, 4, 5]));
+
+ expect(await i.moveNext(), isTrue);
+ expect(i.current, equals([6, 7, 8]));
+
+ await i.cancel(); // cancel substream just as it's ending
+
+ expect(await r.readChunk(2), equals([9, 10]));
+
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readStream() cancel at the exact end on chunk boundary', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(1), equals([1]));
+ final i = StreamIterator(r.readStream(8));
+ expect(await i.moveNext(), isTrue);
+ expect(i.current, equals([2]));
+
+ expect(await i.moveNext(), isTrue);
+ expect(i.current, equals([3, 4, 5]));
+
+ expect(await i.moveNext(), isTrue);
+ expect(i.current, equals([6, 7, 8, 9]));
+
+ await i.cancel(); // cancel substream just as it's ending
+
+ expect(await r.readChunk(2), equals([10]));
+
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readStream() is drained when canceled', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(1), equals([1]));
+ final i = StreamIterator(r.readStream(7));
+ expect(await i.moveNext(), isTrue);
+ expect(i.current, equals([2]));
+ // Cancelling here should skip the remainder of the substream
+ // and we continue to read 9 and 10 from r
+ await i.cancel();
+
+ expect(await r.readChunk(2), equals([9, 10]));
+
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readStream() concurrent reads is forbidden', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(1), equals([1]));
+ // Notice we are not reading this substream:
+ r.readStream(7);
+
+ expectLater(r.readChunk(2), throwsStateError);
+ });
+
+ test('readStream() supports draining', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(1), equals([1]));
+ await r.readStream(7).drain();
+ expect(await r.readChunk(2), equals([9, 10]));
+
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('nested ChunkedStreamReader', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readChunk(1), equals([1]));
+ final r2 = ChunkedStreamReader(r.readStream(7));
+ expect(await r2.readChunk(2), equals([2, 3]));
+ expect(await r2.readChunk(1), equals([4]));
+ await r2.cancel();
+
+ expect(await r.readChunk(2), equals([9, 10]));
+
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readBytes() chunks of 3 elements', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2];
+ yield [3, 4, 5];
+ yield [6, 7, 8, 9];
+ yield [10];
+ }());
+
+ expect(await r.readBytes(3), allOf(equals([1, 2, 3]), isA<Uint8List>()));
+ expect(await r.readBytes(3), allOf(equals([4, 5, 6]), isA<Uint8List>()));
+ expect(await r.readBytes(3), allOf(equals([7, 8, 9]), isA<Uint8List>()));
+ expect(await r.readBytes(3), allOf(equals([10]), isA<Uint8List>()));
+ expect(await r.readBytes(1), equals([]));
+ expect(await r.readBytes(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readBytes(1), equals([]));
+ });
+
+ test('readChunk() until exact end of stream', () async {
+ final stream = Stream.fromIterable(Iterable.generate(
+ 10,
+ (_) => Uint8List(512),
+ ));
+
+ final r = ChunkedStreamReader(stream);
+ while (true) {
+ final c = await r.readBytes(1024);
+ if (c.isEmpty) {
+ break;
+ }
+ }
+ });
+
+ test('cancel while readChunk() is pending', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2, 3];
+ // This will hang forever, so we will call cancel()
+ await Completer().future;
+ yield [4]; // this should never be reachable
+ fail('unreachable!');
+ }());
+
+ expect(await r.readBytes(2), equals([1, 2]));
+
+ final future = r.readChunk(2);
+
+ // Wait a tiny bit and cancel
+ await Future.microtask(() => null);
+ r.cancel();
+
+ expect(await future, hasLength(lessThan(2)));
+ });
+
+ test('cancel while readStream() is pending', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield [1, 2, 3];
+ // This will hang forever, so we will call cancel()
+ await Completer().future;
+ yield [4]; // this should never be reachable
+ fail('unreachable!');
+ }());
+
+ expect(await collectBytes(r.readStream(2)), equals([1, 2]));
+
+ final stream = r.readStream(2);
+
+ // Wait a tiny bit and cancel
+ await Future.microtask(() => null);
+ r.cancel();
+
+ expect(await collectBytes(stream), hasLength(lessThan(2)));
+ });
+}