Tweaks for MessageGrouper.

Change-Id: I1be1d7e998ee28414a455816383e5d0810d4ad9e
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/239403
Reviewed-by: Jake Macdonald <jakemac@google.com>
Commit-Queue: Konstantin Shcheglov <scheglov@google.com>
diff --git a/pkg/_fe_analyzer_shared/lib/src/macros/executor/message_grouper.dart b/pkg/_fe_analyzer_shared/lib/src/macros/executor/message_grouper.dart
index 7e2d390..382dd81 100644
--- a/pkg/_fe_analyzer_shared/lib/src/macros/executor/message_grouper.dart
+++ b/pkg/_fe_analyzer_shared/lib/src/macros/executor/message_grouper.dart
@@ -3,7 +3,6 @@
 // BSD-style license that can be found in the LICENSE file.
 
 import 'dart:async';
-import 'dart:math';
 import 'dart:typed_data';
 
 /// Collects messages from an input stream of bytes.
@@ -14,18 +13,11 @@
   /// The input bytes stream subscription.
   late final StreamSubscription _inputStreamSubscription;
 
-  /// The length of the current message to read, or `-1` if we are currently
-  /// reading the length.
-  int _length = -1;
-
   /// The buffer to store the length bytes in.
-  BytesBuilder _lengthBuffer = new BytesBuilder();
+  final _FixedBuffer _lengthBuffer = new _FixedBuffer(4);
 
   /// If reading raw data, buffer for the data.
-  Uint8List _messageBuffer = new Uint8List(0);
-
-  /// The position to write the next byte in [_messageBuffer].
-  int _messagePos = 0;
+  _FixedBuffer? _messageBuffer;
 
   late StreamController<Uint8List> _messageStreamController =
       new StreamController<Uint8List>(onCancel: () {
@@ -38,45 +30,36 @@
   }
 
   void _handleBytes(List<int> bytes, [int offset = 0]) {
-    if (_length == -1) {
-      while (_lengthBuffer.length < 4 && offset < bytes.length) {
+    final _FixedBuffer? messageBuffer = _messageBuffer;
+    if (messageBuffer == null) {
+      while (offset < bytes.length && !_lengthBuffer.isReady) {
         _lengthBuffer.addByte(bytes[offset++]);
       }
-      if (_lengthBuffer.length >= 4) {
-        Uint8List lengthBytes = _lengthBuffer.takeBytes();
-        _length = lengthBytes[0] << 24 |
-            lengthBytes[1] << 16 |
-            lengthBytes[2] << 8 |
-            lengthBytes[3];
+      if (_lengthBuffer.isReady) {
+        int length = _lengthBuffer[0] << 24 |
+            _lengthBuffer[1] << 16 |
+            _lengthBuffer[2] << 8 |
+            _lengthBuffer[3];
+        // Reset the length reading state.
+        _lengthBuffer.reset();
+        // Switch to the message payload reading state.
+        _messageBuffer = new _FixedBuffer(length);
+        _handleBytes(bytes, offset);
+      } else {
+        // Continue reading the length.
+        return;
       }
-    }
+    } else {
+      // Read the data from `bytes`.
+      while (offset < bytes.length && !messageBuffer.isReady) {
+        messageBuffer.addByte(bytes[offset++]);
+      }
 
-    // Just pass along `bytes` without a copy if we can, and reset our state
-    if (offset == 0 && bytes.length == _length && bytes is Uint8List) {
-      _length = -1;
-      _messageStreamController.add(bytes);
-      return;
-    }
-
-    // Initialize a new buffer.
-    if (_messagePos == 0) {
-      _messageBuffer = new Uint8List(_length);
-    }
-
-    // Read the data from `bytes`.
-    int lenToRead = min(_length - _messagePos, bytes.length - offset);
-    while (lenToRead-- > 0) {
-      _messageBuffer[_messagePos++] = bytes[offset++];
-    }
-
-    // If we completed a message, add it to the output stream, reset our state,
-    // and call ourselves again if we have more data to read.
-    if (_messagePos >= _length) {
-      _messageStreamController.add(_messageBuffer);
-      _length = -1;
-      _messagePos = 0;
-
-      if (offset < bytes.length) {
+      // If we completed a message, add it to the output stream.
+      if (messageBuffer.isReady) {
+        _messageStreamController.add(messageBuffer.bytes);
+        // Switch to the length reading state.
+        _messageBuffer = null;
         _handleBytes(bytes, offset);
       }
     }
@@ -89,3 +72,27 @@
     _messageStreamController.close();
   }
 }
+
+/// A buffer of fixed length.
+class _FixedBuffer {
+  final Uint8List bytes;
+
+  /// The offset in [bytes].
+  int _offset = 0;
+
+  _FixedBuffer(int length) : bytes = new Uint8List(length);
+
+  /// Return `true` when the required number of bytes added.
+  bool get isReady => _offset == bytes.length;
+
+  int operator [](int index) => bytes[index];
+
+  void addByte(int byte) {
+    bytes[_offset++] = byte;
+  }
+
+  /// Reset the number of added bytes to zero.
+  void reset() {
+    _offset = 0;
+  }
+}