blob: 382dd8107af32745ba1f17f65704b16fe667acad [file] [log] [blame]
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:typed_data';
/// Collects messages from an input stream of bytes.
///
/// Each message should start with a 32 bit big endian uint indicating its size,
/// followed by that many bytes.
///
/// Message boundaries do not need to line up with the chunks delivered by the
/// input stream: a length prefix or a payload may be split across any number
/// of chunks, and a single chunk may contain any number of messages. Each
/// completed payload (without its length prefix) is emitted on
/// [messageStream].
class MessageGrouper {
  /// The input bytes stream subscription.
  late final StreamSubscription<List<int>> _inputStreamSubscription;

  /// The buffer to store the length bytes in.
  final _FixedBuffer _lengthBuffer = _FixedBuffer(4);

  /// If reading raw data, buffer for the data.
  ///
  /// `null` while the 4 byte length prefix of the next message is still being
  /// read.
  _FixedBuffer? _messageBuffer;

  /// The controller backing [messageStream].
  ///
  /// Declared `late` so that the `onCancel` callback can refer to
  /// [_inputStreamSubscription]. When the listener of [messageStream] cancels,
  /// the subscription to the input stream is cancelled as well.
  late final StreamController<Uint8List> _messageStreamController =
      StreamController<Uint8List>(onCancel: () {
    _inputStreamSubscription.cancel();
  });

  /// The stream of complete message payloads, in the order they were read.
  Stream<Uint8List> get messageStream => _messageStreamController.stream;

  /// Starts listening to [inputStream] immediately.
  ///
  /// When [inputStream] is done, [cancel] is invoked, which closes
  /// [messageStream].
  MessageGrouper(Stream<List<int>> inputStream) {
    _inputStreamSubscription = inputStream.listen(_handleBytes, onDone: cancel);
  }

  /// Consumes [bytes] starting at [offset], emitting every message that is
  /// completed along the way.
  ///
  /// Any trailing partial length prefix or partial payload is retained in
  /// [_lengthBuffer] or [_messageBuffer] until more input arrives.
  void _handleBytes(List<int> bytes, [int offset = 0]) {
    // This is a loop rather than a recursive call per state transition so that
    // a single chunk packed with many small (or empty) messages cannot exhaust
    // the call stack.
    while (true) {
      final messageBuffer = _messageBuffer;
      if (messageBuffer == null) {
        // Length reading state.
        while (offset < bytes.length && !_lengthBuffer.isReady) {
          _lengthBuffer.addByte(bytes[offset++]);
        }
        if (!_lengthBuffer.isReady) {
          // Continue reading the length once more input is available.
          return;
        }
        // Decode the big endian 32 bit length.
        final length = (_lengthBuffer[0] << 24) |
            (_lengthBuffer[1] << 16) |
            (_lengthBuffer[2] << 8) |
            _lengthBuffer[3];
        // Reset the length reading state.
        _lengthBuffer.reset();
        // Switch to the message payload reading state. A zero length payload
        // is ready immediately and is emitted on the next iteration.
        _messageBuffer = _FixedBuffer(length);
      } else {
        // Read the data from `bytes`.
        while (offset < bytes.length && !messageBuffer.isReady) {
          messageBuffer.addByte(bytes[offset++]);
        }
        if (!messageBuffer.isReady) {
          // The payload is incomplete; wait for more input.
          return;
        }
        // We completed a message, add it to the output stream.
        _messageStreamController.add(messageBuffer.bytes);
        // Switch to the length reading state.
        _messageBuffer = null;
      }
    }
  }

  /// Stop listening to the input stream for further updates, and close the
  /// output stream.
  ///
  /// The futures returned by the underlying `cancel` and `close` calls are
  /// not awaited.
  void cancel() {
    _inputStreamSubscription.cancel();
    _messageStreamController.close();
  }
}
/// A byte buffer whose capacity is fixed at construction time.
///
/// Bytes are appended one at a time with [addByte]; [isReady] becomes `true`
/// once exactly `bytes.length` bytes have been appended since construction or
/// the last [reset].
class _FixedBuffer {
  /// The backing storage, allocated once with the requested length.
  final Uint8List bytes;

  /// How many bytes have been written into [bytes] so far; also the index at
  /// which the next byte is stored.
  int _count = 0;

  /// Creates a buffer with room for exactly [length] bytes.
  _FixedBuffer(int length) : bytes = Uint8List(length);

  /// Whether the required number of bytes has been added.
  bool get isReady => bytes.length == _count;

  /// Returns the byte stored at [index] in [bytes].
  int operator [](int index) => bytes[index];

  /// Stores [byte] at the next free position and advances past it.
  void addByte(int byte) => bytes[_count++] = byte;

  /// Resets the number of added bytes to zero.
  ///
  /// The contents of [bytes] are left as they are; later [addByte] calls
  /// overwrite them starting from index zero.
  void reset() => _count = 0;
}