Optimize `AsyncMessageGrouper`. (#54)

Add a benchmark for a large (~600k) request.

Before this change, grouping the message takes around 14ms. After this change it takes around 1ms.

Deserializing is not affected, but is benchmarked too as it is always needed and so is useful to compare. It takes around 2ms.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 74cd91b..b84eebc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 1.0.0
+
+* Improve `AsyncMessageGrouper` performance.
+* Add `benchmark/benchmark.dart` measuring `AsyncMessageGrouper` performance.
+
 ## 1.0.0-nullsafety.0
 
 * Migrate to null safety.
diff --git a/benchmark/benchmark.dart b/benchmark/benchmark.dart
new file mode 100644
index 0000000..984d8aa
--- /dev/null
+++ b/benchmark/benchmark.dart
@@ -0,0 +1,56 @@
+import 'dart:math';
+import 'dart:typed_data';
+
+import 'package:bazel_worker/bazel_worker.dart';
+import 'package:bazel_worker/src/async_message_grouper.dart';
+
+/// Benchmark for `AsyncMessageGrouper`.
+Future<void> main() async {
+  // Create a large work request with 10,000 inputs.
+  var workRequest = WorkRequest();
+  for (var i = 0; i != 10000; ++i) {
+    var path = 'blaze-bin/some/path/to/a/file/that/is/an/input/$i';
+    workRequest
+      ..arguments.add('--input=$path')
+      ..inputs.add(Input()
+        ..path = ''
+        ..digest.addAll(List.filled(70, 0x11)));
+  }
+
+  // Serialize it.
+  var requestBytes = workRequest.writeToBuffer();
+  var length = requestBytes.length;
+  print('Request has $length requestBytes.');
+
+  // Add the length in front base 128 encoded as in the worker protocol.
+  requestBytes =
+      Uint8List.fromList(requestBytes.toList()..insertAll(0, _varInt(length)));
+
+  // Split into 10000 byte chunks.
+  var lists = <Uint8List>[];
+  for (var i = 0; i < requestBytes.length; i += 10000) {
+    lists.add(Uint8List.sublistView(
+        requestBytes, i, min(i + 10000, requestBytes.length)));
+  }
+
+  // Time `AsyncMessageGrouper` and deserialization.
+  for (var i = 0; i != 30; ++i) {
+    var stopwatch = Stopwatch()..start();
+    var asyncGrouper = AsyncMessageGrouper(Stream.fromIterable(lists));
+    var message = (await asyncGrouper.next)!;
+    print('Grouped in ${stopwatch.elapsedMilliseconds}ms');
+    stopwatch.reset();
+    WorkRequest.fromBuffer(message);
+    print('Deserialized in ${stopwatch.elapsedMilliseconds}ms');
+  }
+}
+
+Uint8List _varInt(int value) {
+  var result = <int>[];
+  while (value >= 0x80) {
+    result.add(0x80 | (value & 0x7f));
+    value >>= 7;
+  }
+  result.add(value);
+  return Uint8List.fromList(result);
+}
diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart
index b2ac9c3..d171a5d 100644
--- a/lib/src/async_message_grouper.dart
+++ b/lib/src/async_message_grouper.dart
@@ -3,30 +3,44 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import 'dart:async';
-import 'dart:collection';
+import 'dart:math';
+import 'dart:typed_data';
 
 import 'package:async/async.dart';
 import 'package:pedantic/pedantic.dart';
+import 'package:protobuf/protobuf.dart';
 
 import 'message_grouper.dart';
-import 'message_grouper_state.dart';
 
 /// Collects stream data into messages by interpreting it as
 /// base-128 encoded lengths interleaved with raw data.
 class AsyncMessageGrouper implements MessageGrouper {
-  /// Current state for reading in messages;
-  final _state = MessageGrouperState();
-
   /// The input stream.
   final StreamQueue<List<int>> _inputQueue;
 
-  /// The current buffer.
-  final Queue<int> _buffer = Queue<int>();
+  /// The current input buffer.
+  List<int> _inputBuffer = [];
+
+  /// Position in the current input buffer.
+  int _inputBufferPos = 0;
 
   /// Completes after [cancel] is called or [inputStream] is closed.
   Future<void> get done => _done.future;
   final _done = Completer<void>();
 
+  /// Whether currently reading length or raw data.
+  bool _readingLength = true;
+
+  /// If reading length, buffer to build up length one byte at a time until
+  /// done.
+  List<int> _lengthBuffer = [];
+
+  /// If reading raw data, buffer for the data.
+  Uint8List _message = Uint8List(0);
+
+  /// If reading raw data, position in the buffer.
+  int _messagePos = 0;
+
   AsyncMessageGrouper(Stream<List<int>> inputStream)
       : _inputQueue = StreamQueue(inputStream);
 
@@ -34,19 +48,70 @@
   @override
   Future<List<int>?> get next async {
     try {
-      List<int>? message;
-      while (message == null &&
-          (_buffer.isNotEmpty || await _inputQueue.hasNext)) {
-        if (_buffer.isEmpty) _buffer.addAll(await _inputQueue.next);
-        var nextByte = _buffer.removeFirst();
-        if (nextByte == -1) return null;
-        message = _state.handleInput(nextByte);
+      // Loop while there is data in the input buffer or the input stream.
+      while (
+          _inputBufferPos != _inputBuffer.length || await _inputQueue.hasNext) {
+        // If the input buffer is empty fill it from the input stream.
+        if (_inputBufferPos == _inputBuffer.length) {
+          _inputBuffer = await _inputQueue.next;
+          _inputBufferPos = 0;
+        }
+
+        // Loop over the input buffer. Might return without reading the full
+        // buffer if a message completes. Then, this is tracked in
+        // `_inputBufferPos`.
+        while (_inputBufferPos != _inputBuffer.length) {
+          if (_readingLength) {
+            // Reading message length byte by byte.
+            var byte = _inputBuffer[_inputBufferPos++];
+            _lengthBuffer.add(byte);
+            // Check for the last byte in the length, and then read it.
+            if ((byte & 0x80) == 0) {
+              var reader = CodedBufferReader(_lengthBuffer);
+              var length = reader.readInt32();
+              _lengthBuffer = [];
+
+              // Special case: don't keep reading an empty message, return it
+              // and `_readingLength` stays true.
+              if (length == 0) {
+                return Uint8List(0);
+              }
+
+              // Switch to reading raw data. Allocate message buffer and reset
+              // `_messagePos`.
+              _readingLength = false;
+              _message = Uint8List(length);
+              _messagePos = 0;
+            }
+          } else {
+            // Copy as much as possible from the input buffer. Limit is the
+            // smaller of the remaining length to fill in the message and the
+            // remaining length in the buffer.
+            var lengthToCopy = min(_message.length - _messagePos,
+                _inputBuffer.length - _inputBufferPos);
+            _message.setRange(
+                _messagePos,
+                _messagePos + lengthToCopy,
+                _inputBuffer.sublist(
+                    _inputBufferPos, _inputBufferPos + lengthToCopy));
+            _messagePos += lengthToCopy;
+            _inputBufferPos += lengthToCopy;
+
+            // If there is a complete message to return, return it and switch
+            // back to reading length.
+            if (_messagePos == _message.length) {
+              var result = _message;
+              // Don't keep a reference to the message.
+              _message = Uint8List(0);
+              _readingLength = true;
+              return result;
+            }
+          }
+        }
       }
 
       // If there is nothing left in the queue then cancel the subscription.
-      if (message == null) unawaited(cancel());
-
-      return message;
+      unawaited(cancel());
     } catch (e) {
       // It appears we sometimes get an exception instead of -1 as expected when
       // stdin closes, this handles that in the same way (returning a null
diff --git a/pubspec.yaml b/pubspec.yaml
index 7b549bc..92c903c 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,16 +1,16 @@
 name: bazel_worker
-version: 1.0.0-nullsafety.0
+version: 1.0.0
 
 description: Tools for creating a bazel persistent worker.
 homepage: https://github.com/dart-lang/bazel_worker
 
 environment:
-  sdk: '>=2.12.0-0 <3.0.0'
+  sdk: '>=2.12.0 <3.0.0'
 
 dependencies:
-  async: '>=2.5.0-nullsafety <3.0.0'
-  pedantic: ^1.10.0-nullsafety
-  protobuf: '>=2.0.0-nullsafety <3.0.0'
+  async: '>=2.5.0 <3.0.0'
+  pedantic: ^1.10.0
+  protobuf: '>=2.0.0 <3.0.0'
 
 dev_dependencies:
-  test: ^1.16.0-nullsafety
+  test: ^1.16.0