Faster ChunkedStreamReader. (#182)
* Faster ChunkedStreamReader.
* Add an internal `_offset` to track offset in `_buffer`, reducing the
number of times we need to create a sublist internally.
* Specialize to handle cases where `_buffer` is a `Uint8List` by
creating a `Uint8List.sublistView` when we need to split a chunk.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 58d6f9b..2e366fe 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 2.7.0-dev
+
+* Improve performance for `ChunkedStreamReader` by creating fewer internal
+ sublists and specializing to create views for `Uint8List` chunks.
+
## 2.7.0
* Add a `Stream.slices()` extension method.
diff --git a/lib/src/chunked_stream_reader.dart b/lib/src/chunked_stream_reader.dart
index 855a772..a896341 100644
--- a/lib/src/chunked_stream_reader.dart
+++ b/lib/src/chunked_stream_reader.dart
@@ -38,9 +38,34 @@
/// The read-operations [readChunk] and [readStream] must not be invoked until
/// the future from a previous call has completed.
class ChunkedStreamReader<T> {
+ /// Iterator over underlying stream.
+ ///
+ /// The reader requests data from this input whenever requests on the
+ /// reader cannot be fulfilled with the already fetched data.
final StreamIterator<List<T>> _input;
+
+ /// Sentinel value used for [_buffer] when we have no value.
final List<T> _emptyList = const [];
+
+ /// Last partially consumed chunk received from [_input].
+ ///
+ /// Elements up to [_offset] have already been consumed and should not be
+ /// consumed again.
List<T> _buffer = <T>[];
+
+ /// Offset into [_buffer] after data which have already been emitted.
+ ///
+ /// The offset is between `0` and `_buffer.length`, both inclusive.
+ /// The data in [_buffer] from [_offset] and forward have not yet been
+ /// emitted by the chunked stream reader, the data before [_offset] has.
+ int _offset = 0;
+
+ /// Whether a read request is currently being processed.
+ ///
+ /// Is `true` while a request is in progress.
+ /// While a read request, like [readChunk] or [readStream], is being processed,
+ /// no new requests can be made.
+ /// New read attempts will throw instead.
bool _reading = false;
factory ChunkedStreamReader(Stream<List<T>> stream) =>
@@ -96,8 +121,9 @@
final substream = () async* {
// While we have data to read
while (size > 0) {
- // Read something into the buffer, if it's empty
- if (_buffer.isEmpty) {
+ // Read something into the buffer, if buffer has been consumed.
+ assert(_offset <= _buffer.length);
+ if (_offset == _buffer.length) {
if (!(await _input.moveNext())) {
// Don't attempt to read more data, as there is no more data.
size = 0;
@@ -105,21 +131,30 @@
break;
}
_buffer = _input.current;
+ _offset = 0;
}
- if (_buffer.isNotEmpty) {
- if (size < _buffer.length) {
- final output = _buffer.sublist(0, size);
- _buffer = _buffer.sublist(size);
+ final remainingBuffer = _buffer.length - _offset;
+ if (remainingBuffer > 0) {
+ if (remainingBuffer >= size) {
+ List<T> output;
+ if (_buffer is Uint8List) {
+ output = Uint8List.sublistView(
+ _buffer as Uint8List, _offset, _offset + size) as List<T>;
+ } else {
+ output = _buffer.sublist(_offset, _offset + size);
+ }
+ _offset += size;
size = 0;
yield output;
_reading = false;
break;
}
- final output = _buffer;
- size -= _buffer.length;
+ final output = _offset == 0 ? _buffer : _buffer.sublist(_offset);
+ size -= remainingBuffer;
_buffer = _emptyList;
+ _offset = 0;
yield output;
}
}
@@ -129,22 +164,26 @@
c.onListen = () => c.addStream(substream()).whenComplete(c.close);
c.onCancel = () async {
while (size > 0) {
- if (_buffer.isEmpty) {
+ assert(_offset <= _buffer.length);
+ if (_buffer.length == _offset) {
if (!await _input.moveNext()) {
size = 0; // no more data
break;
}
_buffer = _input.current;
+ _offset = 0;
}
- if (size < _buffer.length) {
- _buffer = _buffer.sublist(size);
+ final remainingBuffer = _buffer.length - _offset;
+ if (remainingBuffer >= size) {
+ _offset += size;
size = 0;
break;
}
- size -= _buffer.length;
+ size -= remainingBuffer;
_buffer = _emptyList;
+ _offset = 0;
}
_reading = false;
};
diff --git a/pubspec.yaml b/pubspec.yaml
index 13e7281..79dc278 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 2.7.0
+version: 2.7.1-dev
description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
diff --git a/test/chunked_stream_reader.dart b/test/chunked_stream_reader.dart
index 7dcd408..d80cf83 100644
--- a/test/chunked_stream_reader.dart
+++ b/test/chunked_stream_reader.dart
@@ -377,4 +377,106 @@
expect(await collectBytes(stream), hasLength(lessThan(2)));
});
+
+ test('readChunk() chunk by chunk (Uint8List)', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield Uint8List.fromList([1, 2]);
+ yield Uint8List.fromList([3, 4, 5]);
+ yield Uint8List.fromList([6, 7, 8, 9]);
+ yield Uint8List.fromList([10]);
+ }());
+
+ expect(await r.readChunk(2), equals([1, 2]));
+ expect(await r.readChunk(3), equals([3, 4, 5]));
+ expect(await r.readChunk(4), equals([6, 7, 8, 9]));
+ expect(await r.readChunk(1), equals([10]));
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() element by element (Uint8List)', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield Uint8List.fromList([1, 2]);
+ yield Uint8List.fromList([3, 4, 5]);
+ yield Uint8List.fromList([6, 7, 8, 9]);
+ yield Uint8List.fromList([10]);
+ }());
+
+ for (var i = 0; i < 10; i++) {
+ expect(await r.readChunk(1), equals([i + 1]));
+ }
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() exact elements (Uint8List)', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield Uint8List.fromList([1, 2]);
+ yield Uint8List.fromList([3, 4, 5]);
+ yield Uint8List.fromList([6, 7, 8, 9]);
+ yield Uint8List.fromList([10]);
+ }());
+
+ expect(await r.readChunk(10), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() past end (Uint8List)', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield Uint8List.fromList([1, 2]);
+ yield Uint8List.fromList([3, 4, 5]);
+ yield Uint8List.fromList([6, 7, 8, 9]);
+ yield Uint8List.fromList([10]);
+ }());
+
+ expect(await r.readChunk(20), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() chunks of 2 elements (Uint8List)', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield Uint8List.fromList([1, 2]);
+ yield Uint8List.fromList([3, 4, 5]);
+ yield Uint8List.fromList([6, 7, 8, 9]);
+ yield Uint8List.fromList([10]);
+ }());
+
+ expect(await r.readChunk(2), equals([1, 2]));
+ expect(await r.readChunk(2), equals([3, 4]));
+ expect(await r.readChunk(2), equals([5, 6]));
+ expect(await r.readChunk(2), equals([7, 8]));
+ expect(await r.readChunk(2), equals([9, 10]));
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
+
+ test('readChunk() chunks of 3 elements (Uint8List)', () async {
+ final r = ChunkedStreamReader(() async* {
+ yield Uint8List.fromList([1, 2]);
+ yield Uint8List.fromList([3, 4, 5]);
+ yield Uint8List.fromList([6, 7, 8, 9]);
+ yield Uint8List.fromList([10]);
+ }());
+
+ expect(await r.readChunk(3), equals([1, 2, 3]));
+ expect(await r.readChunk(3), equals([4, 5, 6]));
+ expect(await r.readChunk(3), equals([7, 8, 9]));
+ expect(await r.readChunk(3), equals([10]));
+ expect(await r.readChunk(1), equals([]));
+ expect(await r.readChunk(1), equals([]));
+ await r.cancel(); // check this is okay!
+ expect(await r.readChunk(1), equals([]));
+ });
}