Add ChunkedStreamReader (#161)

* Added ChunkedStreamReader

* Updated changelog

* Prepare 2.6.0 release

* Address review comments, fix boundary issue

* Cleanup tests

* More tests and const empty list
diff --git a/CHANGELOG.md b/CHANGELOG.md
index c7256ac..84938d0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
-## 2.5.1-dev
+## 2.6.0
+
+* Added `ChunkedStreamReader` for reading _chunked streams_ without managing
+  buffers.
 
 ## 2.5.0
 
diff --git a/lib/async.dart b/lib/async.dart
index a97fc65..611d137 100644
--- a/lib/async.dart
+++ b/lib/async.dart
@@ -37,3 +37,4 @@
 export 'src/stream_zip.dart';
 export 'src/subscription_stream.dart';
 export 'src/typed_stream_transformer.dart';
+export 'src/chunked_stream_reader.dart';
diff --git a/lib/src/chunked_stream_reader.dart b/lib/src/chunked_stream_reader.dart
new file mode 100644
index 0000000..855a772
--- /dev/null
+++ b/lib/src/chunked_stream_reader.dart
@@ -0,0 +1,177 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:typed_data';
+
+import 'byte_collector.dart' show collectBytes;
+
+/// Utility class for reading elements from a _chunked stream_.
+///
+/// A _chunked stream_ is a stream where each event is a chunk of elements.
+/// Byte-streams with the type `Stream<List<int>>` is common of example of this.
+/// As illustrated in the example below, this utility class makes it easy to
+/// read a _chunked stream_ using custom chunk sizes and sub-stream sizes,
+/// without managing partially read chunks.
+///
+/// ```dart
+/// final r = ChunkedStreamReader(File('myfile.txt').openRead());
+/// try {
+///   // Read the first 4 bytes
+///   final firstBytes = await r.readChunk(4);
+///   if (firstBytes.length < 4) {
+///     throw Exception('myfile.txt has less than 4 bytes');
+///   }
+///
+///   // Read next 8 kilobytes as a substream
+///   Stream<List<int>> substream = r.readStream(8 * 1024);
+///
+///   ...
+/// } finally {
+///   // We always cancel the ChunkedStreamReader, this ensures the underlying
+///   // stream is cancelled.
+///   r.cancel();
+/// }
+/// ```
+///
+/// The read-operations [readChunk] and [readStream] must not be invoked until
+/// the future from a previous call has completed.
+class ChunkedStreamReader<T> {
+  final StreamIterator<List<T>> _input;
+  final List<T> _emptyList = const [];
+  List<T> _buffer = <T>[];
+  bool _reading = false;
+
+  factory ChunkedStreamReader(Stream<List<T>> stream) =>
+      ChunkedStreamReader._(StreamIterator(stream));
+
+  ChunkedStreamReader._(this._input);
+
+  /// Read next [size] elements from _chunked stream_, buffering to create a
+  /// chunk with [size] elements.
+  ///
+  /// This will read _chunks_ from the underlying _chunked stream_ until [size]
+  /// elements have been buffered, or end-of-stream, then it returns the first
+  /// [size] buffered elements.
+  ///
+  /// If end-of-stream is encountered before [size] elements is read, this
+  /// returns a list with fewer than [size] elements (indicating end-of-stream).
+  ///
+  /// If the underlying stream throws, the stream is cancelled, the exception is
+  /// propogated and further read operations will fail.
+  ///
+  /// Throws, if another read operation is on-going.
+  Future<List<T>> readChunk(int size) async {
+    final result = <T>[];
+    await for (final chunk in readStream(size)) {
+      result.addAll(chunk);
+    }
+    return result;
+  }
+
+  /// Read next [size] elements from _chunked stream_ as a sub-stream.
+  ///
+  /// This will pass-through _chunks_ from the underlying _chunked stream_ until
+  /// [size] elements have been returned, or end-of-stream has been encountered.
+  ///
+  /// If end-of-stream is encountered before [size] elements is read, this
+  /// returns a list with fewer than [size] elements (indicating end-of-stream).
+  ///
+  /// If the underlying stream throws, the stream is cancelled, the exception is
+  /// propogated and further read operations will fail.
+  ///
+  /// If the sub-stream returned from [readStream] is cancelled the remaining
+  /// unread elements up-to [size] are drained, allowing subsequent
+  /// read-operations to proceed after cancellation.
+  ///
+  /// Throws, if another read-operation is on-going.
+  Stream<List<T>> readStream(int size) {
+    RangeError.checkNotNegative(size, 'size');
+    if (_reading) {
+      throw StateError('Concurrent read operations are not allowed!');
+    }
+    _reading = true;
+
+    final substream = () async* {
+      // While we have data to read
+      while (size > 0) {
+        // Read something into the buffer, if it's empty
+        if (_buffer.isEmpty) {
+          if (!(await _input.moveNext())) {
+            // Don't attempt to read more data, as there is no more data.
+            size = 0;
+            _reading = false;
+            break;
+          }
+          _buffer = _input.current;
+        }
+
+        if (_buffer.isNotEmpty) {
+          if (size < _buffer.length) {
+            final output = _buffer.sublist(0, size);
+            _buffer = _buffer.sublist(size);
+            size = 0;
+            yield output;
+            _reading = false;
+            break;
+          }
+
+          final output = _buffer;
+          size -= _buffer.length;
+          _buffer = _emptyList;
+          yield output;
+        }
+      }
+    };
+
+    final c = StreamController<List<T>>();
+    c.onListen = () => c.addStream(substream()).whenComplete(c.close);
+    c.onCancel = () async {
+      while (size > 0) {
+        if (_buffer.isEmpty) {
+          if (!await _input.moveNext()) {
+            size = 0; // no more data
+            break;
+          }
+          _buffer = _input.current;
+        }
+
+        if (size < _buffer.length) {
+          _buffer = _buffer.sublist(size);
+          size = 0;
+          break;
+        }
+
+        size -= _buffer.length;
+        _buffer = _emptyList;
+      }
+      _reading = false;
+    };
+
+    return c.stream;
+  }
+
+  /// Cancel the underlying _chunked stream_.
+  ///
+  /// If a future from [readChunk] or [readStream] is still pending then
+  /// [cancel] behaves as if the underlying stream ended early. That is a future
+  /// from [readChunk] may return a partial chunk smaller than the request size.
+  ///
+  /// It is always safe to call [cancel], even if the underlying stream was read
+  /// to completion.
+  ///
+  /// It can be a good idea to call [cancel] in a `finally`-block when done
+  /// using the [ChunkedStreamReader], this mitigates risk of leaking resources.
+  Future<void> cancel() async => await _input.cancel();
+}
+
+/// Extensions for using [ChunkedStreamReader] with byte-streams.
+extension ChunkedStreamReaderByteStreamExt on ChunkedStreamReader<int> {
+  /// Read bytes into a [Uint8List].
+  ///
+  /// This does the same as [readChunk], except it uses [collectBytes] to create
+  /// a [Uint8List], which offers better performance.
+  Future<Uint8List> readBytes(int size) async =>
+      await collectBytes(readStream(size));
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index e24947d..5bfe9b1 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 2.5.1-dev
+version: 2.6.0
 
 description: Utility functions and classes related to the 'dart:async' library.
 repository: https://github.com/dart-lang/async
diff --git a/test/chunked_stream_reader.dart b/test/chunked_stream_reader.dart
new file mode 100644
index 0000000..7dcd408
--- /dev/null
+++ b/test/chunked_stream_reader.dart
@@ -0,0 +1,380 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:typed_data';
+
+import 'package:test/test.dart';
+import 'package:async/async.dart';
+
+void main() {
+  test('readChunk() chunk by chunk', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(2), equals([1, 2]));
+    expect(await r.readChunk(3), equals([3, 4, 5]));
+    expect(await r.readChunk(4), equals([6, 7, 8, 9]));
+    expect(await r.readChunk(1), equals([10]));
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() element by element', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    for (var i = 0; i < 10; i++) {
+      expect(await r.readChunk(1), equals([i + 1]));
+    }
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() exact elements', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(10), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() past end', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(20), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() chunks of 2 elements', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(2), equals([1, 2]));
+    expect(await r.readChunk(2), equals([3, 4]));
+    expect(await r.readChunk(2), equals([5, 6]));
+    expect(await r.readChunk(2), equals([7, 8]));
+    expect(await r.readChunk(2), equals([9, 10]));
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() chunks of 3 elements', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(3), equals([1, 2, 3]));
+    expect(await r.readChunk(3), equals([4, 5, 6]));
+    expect(await r.readChunk(3), equals([7, 8, 9]));
+    expect(await r.readChunk(3), equals([10]));
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() cancel half way', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(5), equals([1, 2, 3, 4, 5]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() propagates exception', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      throw Exception('stopping here');
+    }());
+
+    expect(await r.readChunk(3), equals([1, 2, 3]));
+    await expectLater(r.readChunk(3), throwsException);
+
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readStream() forwards chunks', () async {
+    final chunk2 = [3, 4, 5];
+    final chunk3 = [6, 7, 8, 9];
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield chunk2;
+      yield chunk3;
+      yield [10];
+    }());
+
+    expect(await r.readChunk(1), equals([1]));
+    final i = StreamIterator(r.readStream(9));
+    expect(await i.moveNext(), isTrue);
+    expect(i.current, equals([2]));
+
+    // We must forward the exact chunks otherwise it's not efficient!
+    // Hence, we have a reference equality check here.
+    expect(await i.moveNext(), isTrue);
+    expect(i.current, equals([3, 4, 5]));
+    expect(i.current == chunk2, isTrue);
+
+    expect(await i.moveNext(), isTrue);
+    expect(i.current, equals([6, 7, 8, 9]));
+    expect(i.current == chunk3, isTrue);
+
+    expect(await i.moveNext(), isTrue);
+    expect(i.current, equals([10]));
+    expect(await i.moveNext(), isFalse);
+
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readStream() cancel at the exact end', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(1), equals([1]));
+    final i = StreamIterator(r.readStream(7));
+    expect(await i.moveNext(), isTrue);
+    expect(i.current, equals([2]));
+
+    expect(await i.moveNext(), isTrue);
+    expect(i.current, equals([3, 4, 5]));
+
+    expect(await i.moveNext(), isTrue);
+    expect(i.current, equals([6, 7, 8]));
+
+    await i.cancel(); // cancel substream just as it's ending
+
+    expect(await r.readChunk(2), equals([9, 10]));
+
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readStream() cancel at the exact end on chunk boundary', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(1), equals([1]));
+    final i = StreamIterator(r.readStream(8));
+    expect(await i.moveNext(), isTrue);
+    expect(i.current, equals([2]));
+
+    expect(await i.moveNext(), isTrue);
+    expect(i.current, equals([3, 4, 5]));
+
+    expect(await i.moveNext(), isTrue);
+    expect(i.current, equals([6, 7, 8, 9]));
+
+    await i.cancel(); // cancel substream just as it's ending
+
+    expect(await r.readChunk(2), equals([10]));
+
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readStream() is drained when canceled', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(1), equals([1]));
+    final i = StreamIterator(r.readStream(7));
+    expect(await i.moveNext(), isTrue);
+    expect(i.current, equals([2]));
+    // Cancelling here should skip the remainder of the substream
+    // and we continue to read 9 and 10 from r
+    await i.cancel();
+
+    expect(await r.readChunk(2), equals([9, 10]));
+
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readStream() concurrent reads is forbidden', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(1), equals([1]));
+    // Notice we are not reading this substream:
+    r.readStream(7);
+
+    expectLater(r.readChunk(2), throwsStateError);
+  });
+
+  test('readStream() supports draining', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(1), equals([1]));
+    await r.readStream(7).drain();
+    expect(await r.readChunk(2), equals([9, 10]));
+
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('nested ChunkedStreamReader', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readChunk(1), equals([1]));
+    final r2 = ChunkedStreamReader(r.readStream(7));
+    expect(await r2.readChunk(2), equals([2, 3]));
+    expect(await r2.readChunk(1), equals([4]));
+    await r2.cancel();
+
+    expect(await r.readChunk(2), equals([9, 10]));
+
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readBytes() chunks of 3 elements', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2];
+      yield [3, 4, 5];
+      yield [6, 7, 8, 9];
+      yield [10];
+    }());
+
+    expect(await r.readBytes(3), allOf(equals([1, 2, 3]), isA<Uint8List>()));
+    expect(await r.readBytes(3), allOf(equals([4, 5, 6]), isA<Uint8List>()));
+    expect(await r.readBytes(3), allOf(equals([7, 8, 9]), isA<Uint8List>()));
+    expect(await r.readBytes(3), allOf(equals([10]), isA<Uint8List>()));
+    expect(await r.readBytes(1), equals([]));
+    expect(await r.readBytes(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readBytes(1), equals([]));
+  });
+
+  test('readChunk() until exact end of stream', () async {
+    final stream = Stream.fromIterable(Iterable.generate(
+      10,
+      (_) => Uint8List(512),
+    ));
+
+    final r = ChunkedStreamReader(stream);
+    while (true) {
+      final c = await r.readBytes(1024);
+      if (c.isEmpty) {
+        break;
+      }
+    }
+  });
+
+  test('cancel while readChunk() is pending', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2, 3];
+      // This will hang forever, so we will call cancel()
+      await Completer().future;
+      yield [4]; // this should never be reachable
+      fail('unreachable!');
+    }());
+
+    expect(await r.readBytes(2), equals([1, 2]));
+
+    final future = r.readChunk(2);
+
+    // Wait a tiny bit and cancel
+    await Future.microtask(() => null);
+    r.cancel();
+
+    expect(await future, hasLength(lessThan(2)));
+  });
+
+  test('cancel while readStream() is pending', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield [1, 2, 3];
+      // This will hang forever, so we will call cancel()
+      await Completer().future;
+      yield [4]; // this should never be reachable
+      fail('unreachable!');
+    }());
+
+    expect(await collectBytes(r.readStream(2)), equals([1, 2]));
+
+    final stream = r.readStream(2);
+
+    // Wait a tiny bit and cancel
+    await Future.microtask(() => null);
+    r.cancel();
+
+    expect(await collectBytes(stream), hasLength(lessThan(2)));
+  });
+}