Tweaks for MessageGrouper.
Change-Id: I1be1d7e998ee28414a455816383e5d0810d4ad9e
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/239403
Reviewed-by: Jake Macdonald <jakemac@google.com>
Commit-Queue: Konstantin Shcheglov <scheglov@google.com>
diff --git a/pkg/_fe_analyzer_shared/lib/src/macros/executor/message_grouper.dart b/pkg/_fe_analyzer_shared/lib/src/macros/executor/message_grouper.dart
index 7e2d390..382dd81 100644
--- a/pkg/_fe_analyzer_shared/lib/src/macros/executor/message_grouper.dart
+++ b/pkg/_fe_analyzer_shared/lib/src/macros/executor/message_grouper.dart
@@ -3,7 +3,6 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
-import 'dart:math';
import 'dart:typed_data';
/// Collects messages from an input stream of bytes.
@@ -14,18 +13,11 @@
/// The input bytes stream subscription.
late final StreamSubscription _inputStreamSubscription;
- /// The length of the current message to read, or `-1` if we are currently
- /// reading the length.
- int _length = -1;
-
/// The buffer to store the length bytes in.
- BytesBuilder _lengthBuffer = new BytesBuilder();
+ final _FixedBuffer _lengthBuffer = new _FixedBuffer(4);
/// If reading raw data, buffer for the data.
- Uint8List _messageBuffer = new Uint8List(0);
-
- /// The position to write the next byte in [_messageBuffer].
- int _messagePos = 0;
+ _FixedBuffer? _messageBuffer;
late StreamController<Uint8List> _messageStreamController =
new StreamController<Uint8List>(onCancel: () {
@@ -38,45 +30,36 @@
}
void _handleBytes(List<int> bytes, [int offset = 0]) {
- if (_length == -1) {
- while (_lengthBuffer.length < 4 && offset < bytes.length) {
+ final _FixedBuffer? messageBuffer = _messageBuffer;
+ if (messageBuffer == null) {
+ while (offset < bytes.length && !_lengthBuffer.isReady) {
_lengthBuffer.addByte(bytes[offset++]);
}
- if (_lengthBuffer.length >= 4) {
- Uint8List lengthBytes = _lengthBuffer.takeBytes();
- _length = lengthBytes[0] << 24 |
- lengthBytes[1] << 16 |
- lengthBytes[2] << 8 |
- lengthBytes[3];
+ if (_lengthBuffer.isReady) {
+ int length = _lengthBuffer[0] << 24 |
+ _lengthBuffer[1] << 16 |
+ _lengthBuffer[2] << 8 |
+ _lengthBuffer[3];
+ // Reset the length reading state.
+ _lengthBuffer.reset();
+ // Switch to the message payload reading state.
+ _messageBuffer = new _FixedBuffer(length);
+ _handleBytes(bytes, offset);
+ } else {
+ // Continue reading the length.
+ return;
}
- }
+ } else {
+ // Read the data from `bytes`.
+ while (offset < bytes.length && !messageBuffer.isReady) {
+ messageBuffer.addByte(bytes[offset++]);
+ }
- // Just pass along `bytes` without a copy if we can, and reset our state
- if (offset == 0 && bytes.length == _length && bytes is Uint8List) {
- _length = -1;
- _messageStreamController.add(bytes);
- return;
- }
-
- // Initialize a new buffer.
- if (_messagePos == 0) {
- _messageBuffer = new Uint8List(_length);
- }
-
- // Read the data from `bytes`.
- int lenToRead = min(_length - _messagePos, bytes.length - offset);
- while (lenToRead-- > 0) {
- _messageBuffer[_messagePos++] = bytes[offset++];
- }
-
- // If we completed a message, add it to the output stream, reset our state,
- // and call ourselves again if we have more data to read.
- if (_messagePos >= _length) {
- _messageStreamController.add(_messageBuffer);
- _length = -1;
- _messagePos = 0;
-
- if (offset < bytes.length) {
+ // If we completed a message, add it to the output stream.
+ if (messageBuffer.isReady) {
+ _messageStreamController.add(messageBuffer.bytes);
+ // Switch to the length reading state.
+ _messageBuffer = null;
_handleBytes(bytes, offset);
}
}
@@ -89,3 +72,27 @@
_messageStreamController.close();
}
}
+
+/// A buffer of fixed length.
+class _FixedBuffer {
+ final Uint8List bytes;
+
+ /// The offset in [bytes].
+ int _offset = 0;
+
+ _FixedBuffer(int length) : bytes = new Uint8List(length);
+
+ /// Return `true` when the required number of bytes added.
+ bool get isReady => _offset == bytes.length;
+
+ int operator [](int index) => bytes[index];
+
+ void addByte(int byte) {
+ bytes[_offset++] = byte;
+ }
+
+ /// Reset the number of added bytes to zero.
+ void reset() {
+ _offset = 0;
+ }
+}