blob: 7e2d39088dff241cf2d1802e6141e225ab018934 [file] [log] [blame]
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:math';
import 'dart:typed_data';
/// Collects messages from an input stream of bytes.
///
/// Each message should start with a 32 bit big endian uint indicating its size,
/// followed by that many bytes.
class MessageGrouper {
/// The input bytes stream subscription.
late final StreamSubscription _inputStreamSubscription;
/// The length of the current message to read, or `-1` if we are currently
/// reading the length.
int _length = -1;
/// The buffer to store the length bytes in.
BytesBuilder _lengthBuffer = new BytesBuilder();
/// If reading raw data, buffer for the data.
Uint8List _messageBuffer = new Uint8List(0);
/// The position to write the next byte in [_messageBuffer].
int _messagePos = 0;
late StreamController<Uint8List> _messageStreamController =
new StreamController<Uint8List>(onCancel: () {
_inputStreamSubscription.cancel();
});
Stream<Uint8List> get messageStream => _messageStreamController.stream;
MessageGrouper(Stream<List<int>> inputStream) {
_inputStreamSubscription = inputStream.listen(_handleBytes, onDone: cancel);
}
void _handleBytes(List<int> bytes, [int offset = 0]) {
if (_length == -1) {
while (_lengthBuffer.length < 4 && offset < bytes.length) {
_lengthBuffer.addByte(bytes[offset++]);
}
if (_lengthBuffer.length >= 4) {
Uint8List lengthBytes = _lengthBuffer.takeBytes();
_length = lengthBytes[0] << 24 |
lengthBytes[1] << 16 |
lengthBytes[2] << 8 |
lengthBytes[3];
}
}
// Just pass along `bytes` without a copy if we can, and reset our state
if (offset == 0 && bytes.length == _length && bytes is Uint8List) {
_length = -1;
_messageStreamController.add(bytes);
return;
}
// Initialize a new buffer.
if (_messagePos == 0) {
_messageBuffer = new Uint8List(_length);
}
// Read the data from `bytes`.
int lenToRead = min(_length - _messagePos, bytes.length - offset);
while (lenToRead-- > 0) {
_messageBuffer[_messagePos++] = bytes[offset++];
}
// If we completed a message, add it to the output stream, reset our state,
// and call ourselves again if we have more data to read.
if (_messagePos >= _length) {
_messageStreamController.add(_messageBuffer);
_length = -1;
_messagePos = 0;
if (offset < bytes.length) {
_handleBytes(bytes, offset);
}
}
}
/// Stop listening to the input stream for further updates, and close the
/// output stream.
void cancel() {
_inputStreamSubscription.cancel();
_messageStreamController.close();
}
}