reorganize MessageGrouperState
diff --git a/lib/src/message_grouper_state.dart b/lib/src/message_grouper_state.dart
index 995ce5c..2c80fad 100644
--- a/lib/src/message_grouper_state.dart
+++ b/lib/src/message_grouper_state.dart
@@ -7,29 +7,11 @@
/// State held by the [MessageGrouper] while waiting for additional data to
/// arrive.
class MessageGrouperState {
- /// `true` means we are waiting to receive bytes of base-128 encoded length.
- /// Some bytes of length may have been received already.
- ///
- /// `false` means we are waiting to receive more bytes of message data. Some
- /// bytes of message data may have been received already.
- bool waitingForLength = true;
+ /// Reads the initial length.
+ _LengthReader _lengthReader;
- /// If [waitingForLength] is `true`, the decoded value of the length bytes
- /// received so far (if any). If [waitingForLength] is `false`, the decoded
- /// length that was most recently received.
- int length = 0;
-
- /// If [waitingForLength] is `true`, the amount by which the next received
- /// length byte must be left-shifted; otherwise undefined.
- int lengthShift = 0;
-
- /// If [waitingForLength] is `false`, a [Uint8List] which is ready to receive
- /// message data. Otherwise null.
- Uint8List message;
-
- /// If [waitingForLength] is `false`, the number of message bytes that have
- /// been received so far. Otherwise zero.
- int numMessageBytesReceived;
+ /// Reads messages from a stream of bytes.
+ _MessageReader _messageReader;
MessageGrouperState() {
reset();
@@ -40,40 +22,106 @@
/// Returns a [List<int>] of message bytes if [byte] was the last byte in a
/// message, otherwise returns [null].
List<int> handleInput(int byte) {
- if (waitingForLength) {
- length |= (byte & 0x7f) << lengthShift;
- if ((byte & 0x80) == 0) {
- waitingForLength = false;
- message = new Uint8List(length);
- if (length == 0) {
- // There is no message data to wait for, so just go ahead and deliver the
- // empty message.
- var messageToReturn = message;
- reset();
- return messageToReturn;
- }
- } else {
- lengthShift += 7;
+ if (!_lengthReader.done) {
+ _lengthReader.readByte(byte);
+ if (_lengthReader.done) {
+ _messageReader = new _MessageReader(_lengthReader.length);
}
} else {
- message[numMessageBytesReceived] = byte;
- numMessageBytesReceived++;
- if (numMessageBytesReceived == length) {
- var messageToReturn = message;
- reset();
- return messageToReturn;
- }
+ assert(_messageReader != null);
+ _messageReader.readByte(byte);
}
+
+ if (_lengthReader.done && _messageReader.done) {
+ var message = _messageReader.message;
+ reset();
+ return message;
+ }
+
return null;
}
- /// Reset the state so that we are ready to receive the next base-128 encoded
- /// length.
+ /// Reset the state so that we are ready to receive the next message.
void reset() {
- waitingForLength = true;
- length = 0;
- lengthShift = 0;
- message = null;
- numMessageBytesReceived = 0;
+ _lengthReader = new _LengthReader();
+ _messageReader = null;
+ }
+}
+
+/// Reads a length one byte at a time.
+///
+/// The base-128 encoding is in little-endian order, with the high bit set on
+/// all bytes but the last. This was chosen since it's the same as the
+/// base-128 encoding used by protobufs, so it allows a modest amount of code
+/// reuse at the other end of the protocol.
+class _LengthReader {
+ /// Whether or not we are done reading the length.
+ bool get done => _done;
+ bool _done = false;
+
+ /// If [_done] is `true`, the decoded value of the length bytes received so
+ /// far (if any). If [_done] is `false`, the decoded length that was most
+ /// recently received.
+ int _length = 0;
+
+ /// The length read in. You are only allowed to read this if [_done] is
+ /// `true`.
+ int get length {
+ assert(_done);
+ return _length;
+ }
+
+ /// If [_done] is `true`, the amount by which the next received
+ /// length byte must be left-shifted; otherwise undefined.
+ int _lengthShift = 0;
+
+ /// Read a single byte into [_length].
+ void readByte(int byte) {
+ assert(!_done);
+
+ _length |= (byte & 0x7f) << _lengthShift;
+
+ // Check for the last byte in the length.
+ if ((byte & 0x80) == 0) {
+ _done = true;
+ } else {
+ _lengthShift += 7;
+ }
+ }
+}
+
+/// Reads some number of bytes from a stream, one byte at a time.
+class _MessageReader {
+ /// Whether or not we are done reading bytes from the stream.
+ bool get done => _done;
+ bool _done;
+
+ /// The total length of the message to be read.
+ final int _length;
+
+ /// A [Uint8List] which holds the message data. You are only allowed to read
+ /// this if [_done] is `true`.
+ Uint8List get message {
+ assert(_done);
+ return _message;
+ }
+
+ Uint8List _message;
+
+ /// If [_done] is `false`, the number of message bytes that have been received
+ /// so far. Otherwise zero.
+ int _numMessageBytesReceived = 0;
+
+ _MessageReader(int length)
+ : _message = new Uint8List(length),
+ _length = length,
+ _done = length == 0;
+
+ /// Reads [byte] into [_message].
+ void readByte(int byte) {
+ assert(!done);
+
+ _message[_numMessageBytesReceived++] = byte;
+ if (_numMessageBytesReceived == _length) _done = true;
}
}