Faster ChunkedStreamReader. (#182)

* Faster ChunkedStreamReader.

 * Add an internal `_offset` to track offset in `_buffer`, reducing the
   number of times we need to create a sublist internally.
 * Specialize to handle cases where `_buffer` is a `Uint8List` by
   creating a `Uint8List.sublistView` when we need to split a chunk.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 58d6f9b..2e366fe 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 2.7.0-dev
+
+* Improve performance for `ChunkedStreamReader` by creating fewer internal
+  sublists and specializing to create views for `Uint8List` chunks.
+
 ## 2.7.0
 
 * Add a `Stream.slices()` extension method.
diff --git a/lib/src/chunked_stream_reader.dart b/lib/src/chunked_stream_reader.dart
index 855a772..a896341 100644
--- a/lib/src/chunked_stream_reader.dart
+++ b/lib/src/chunked_stream_reader.dart
@@ -38,9 +38,34 @@
 /// The read-operations [readChunk] and [readStream] must not be invoked until
 /// the future from a previous call has completed.
 class ChunkedStreamReader<T> {
+  /// Iterator over underlying stream.
+  ///
+  /// The reader requests data from this input whenever requests on the
+  /// reader cannot be fulfilled with the already fetched data.
   final StreamIterator<List<T>> _input;
+
+  /// Sentinel value used for [_buffer] when we have no value.
   final List<T> _emptyList = const [];
+
+  /// Last partially consumed chunk received from [_input].
+  ///
+  /// Elements up to [_offset] have already been consumed and should not be
+  /// consumed again.
   List<T> _buffer = <T>[];
+
+  /// Offset into [_buffer] after data which have already been emitted.
+  ///
+  /// The offset is between `0` and `_buffer.length`, both inclusive.
+  /// The data in [_buffer] from [_offset] and forward have not yet been
+  /// emitted by the chunked stream reader, the data before [_offset] has.
+  int _offset = 0;
+
+  /// Whether a read request is currently being processed.
+  ///
+  /// Is `true` while a request is in progress.
+  /// While a read request, like [readChunk] or [readStream], is being processed,
+  /// no new requests can be made.
+  /// New read attempts will throw instead.
   bool _reading = false;
 
   factory ChunkedStreamReader(Stream<List<T>> stream) =>
@@ -96,8 +121,9 @@
     final substream = () async* {
       // While we have data to read
       while (size > 0) {
-        // Read something into the buffer, if it's empty
-        if (_buffer.isEmpty) {
+        // Read something into the buffer, if buffer has been consumed.
+        assert(_offset <= _buffer.length);
+        if (_offset == _buffer.length) {
           if (!(await _input.moveNext())) {
             // Don't attempt to read more data, as there is no more data.
             size = 0;
@@ -105,21 +131,30 @@
             break;
           }
           _buffer = _input.current;
+          _offset = 0;
         }
 
-        if (_buffer.isNotEmpty) {
-          if (size < _buffer.length) {
-            final output = _buffer.sublist(0, size);
-            _buffer = _buffer.sublist(size);
+        final remainingBuffer = _buffer.length - _offset;
+        if (remainingBuffer > 0) {
+          if (remainingBuffer >= size) {
+            List<T> output;
+            if (_buffer is Uint8List) {
+              output = Uint8List.sublistView(
+                  _buffer as Uint8List, _offset, _offset + size) as List<T>;
+            } else {
+              output = _buffer.sublist(_offset, _offset + size);
+            }
+            _offset += size;
             size = 0;
             yield output;
             _reading = false;
             break;
           }
 
-          final output = _buffer;
-          size -= _buffer.length;
+          final output = _offset == 0 ? _buffer : _buffer.sublist(_offset);
+          size -= remainingBuffer;
           _buffer = _emptyList;
+          _offset = 0;
           yield output;
         }
       }
@@ -129,22 +164,26 @@
     c.onListen = () => c.addStream(substream()).whenComplete(c.close);
     c.onCancel = () async {
       while (size > 0) {
-        if (_buffer.isEmpty) {
+        assert(_offset <= _buffer.length);
+        if (_buffer.length == _offset) {
           if (!await _input.moveNext()) {
             size = 0; // no more data
             break;
           }
           _buffer = _input.current;
+          _offset = 0;
         }
 
-        if (size < _buffer.length) {
-          _buffer = _buffer.sublist(size);
+        final remainingBuffer = _buffer.length - _offset;
+        if (remainingBuffer >= size) {
+          _offset += size;
           size = 0;
           break;
         }
 
-        size -= _buffer.length;
+        size -= remainingBuffer;
         _buffer = _emptyList;
+        _offset = 0;
       }
       _reading = false;
     };
diff --git a/pubspec.yaml b/pubspec.yaml
index 13e7281..79dc278 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 2.7.0
+version: 2.7.1-dev
 
 description: Utility functions and classes related to the 'dart:async' library.
 repository: https://github.com/dart-lang/async
diff --git a/test/chunked_stream_reader.dart b/test/chunked_stream_reader.dart
index 7dcd408..d80cf83 100644
--- a/test/chunked_stream_reader.dart
+++ b/test/chunked_stream_reader.dart
@@ -377,4 +377,106 @@
 
     expect(await collectBytes(stream), hasLength(lessThan(2)));
   });
+
+  test('readChunk() chunk by chunk (Uint8List)', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield Uint8List.fromList([1, 2]);
+      yield Uint8List.fromList([3, 4, 5]);
+      yield Uint8List.fromList([6, 7, 8, 9]);
+      yield Uint8List.fromList([10]);
+    }());
+
+    expect(await r.readChunk(2), equals([1, 2]));
+    expect(await r.readChunk(3), equals([3, 4, 5]));
+    expect(await r.readChunk(4), equals([6, 7, 8, 9]));
+    expect(await r.readChunk(1), equals([10]));
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() element by element (Uint8List)', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield Uint8List.fromList([1, 2]);
+      yield Uint8List.fromList([3, 4, 5]);
+      yield Uint8List.fromList([6, 7, 8, 9]);
+      yield Uint8List.fromList([10]);
+    }());
+
+    for (var i = 0; i < 10; i++) {
+      expect(await r.readChunk(1), equals([i + 1]));
+    }
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() exact elements (Uint8List)', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield Uint8List.fromList([1, 2]);
+      yield Uint8List.fromList([3, 4, 5]);
+      yield Uint8List.fromList([6, 7, 8, 9]);
+      yield Uint8List.fromList([10]);
+    }());
+
+    expect(await r.readChunk(10), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() past end (Uint8List)', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield Uint8List.fromList([1, 2]);
+      yield Uint8List.fromList([3, 4, 5]);
+      yield Uint8List.fromList([6, 7, 8, 9]);
+      yield Uint8List.fromList([10]);
+    }());
+
+    expect(await r.readChunk(20), equals([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() chunks of 2 elements (Uint8List)', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield Uint8List.fromList([1, 2]);
+      yield Uint8List.fromList([3, 4, 5]);
+      yield Uint8List.fromList([6, 7, 8, 9]);
+      yield Uint8List.fromList([10]);
+    }());
+
+    expect(await r.readChunk(2), equals([1, 2]));
+    expect(await r.readChunk(2), equals([3, 4]));
+    expect(await r.readChunk(2), equals([5, 6]));
+    expect(await r.readChunk(2), equals([7, 8]));
+    expect(await r.readChunk(2), equals([9, 10]));
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
+
+  test('readChunk() chunks of 3 elements (Uint8List)', () async {
+    final r = ChunkedStreamReader(() async* {
+      yield Uint8List.fromList([1, 2]);
+      yield Uint8List.fromList([3, 4, 5]);
+      yield Uint8List.fromList([6, 7, 8, 9]);
+      yield Uint8List.fromList([10]);
+    }());
+
+    expect(await r.readChunk(3), equals([1, 2, 3]));
+    expect(await r.readChunk(3), equals([4, 5, 6]));
+    expect(await r.readChunk(3), equals([7, 8, 9]));
+    expect(await r.readChunk(3), equals([10]));
+    expect(await r.readChunk(1), equals([]));
+    expect(await r.readChunk(1), equals([]));
+    await r.cancel(); // check this is okay!
+    expect(await r.readChunk(1), equals([]));
+  });
 }