reorganize MessageGrouperState
diff --git a/lib/src/message_grouper_state.dart b/lib/src/message_grouper_state.dart
index 995ce5c..2c80fad 100644
--- a/lib/src/message_grouper_state.dart
+++ b/lib/src/message_grouper_state.dart
@@ -7,29 +7,11 @@
 /// State held by the [MessageGrouper] while waiting for additional data to
 /// arrive.
 class MessageGrouperState {
-  /// `true` means we are waiting to receive bytes of base-128 encoded length.
-  /// Some bytes of length may have been received already.
-  ///
-  /// `false` means we are waiting to receive more bytes of message data.  Some
-  /// bytes of message data may have been received already.
-  bool waitingForLength = true;
+  /// Reads the initial length.
+  _LengthReader _lengthReader;
 
-  /// If [waitingForLength] is `true`, the decoded value of the length bytes
-  /// received so far (if any).  If [waitingForLength] is `false`, the decoded
-  /// length that was most recently received.
-  int length = 0;
-
-  /// If [waitingForLength] is `true`, the amount by which the next received
-  /// length byte must be left-shifted; otherwise undefined.
-  int lengthShift = 0;
-
-  /// If [waitingForLength] is `false`, a [Uint8List] which is ready to receive
-  /// message data.  Otherwise null.
-  Uint8List message;
-
-  /// If [waitingForLength] is `false`, the number of message bytes that have
-  /// been received so far.  Otherwise zero.
-  int numMessageBytesReceived;
+  /// Reads messages from a stream of bytes.
+  _MessageReader _messageReader;
 
   MessageGrouperState() {
     reset();
@@ -40,40 +22,106 @@
   /// Returns a [List<int>] of message bytes if [byte] was the last byte in a
   /// message, otherwise returns [null].
   List<int> handleInput(int byte) {
-    if (waitingForLength) {
-      length |= (byte & 0x7f) << lengthShift;
-      if ((byte & 0x80) == 0) {
-        waitingForLength = false;
-        message = new Uint8List(length);
-        if (length == 0) {
-          // There is no message data to wait for, so just go ahead and deliver the
-          // empty message.
-          var messageToReturn = message;
-          reset();
-          return messageToReturn;
-        }
-      } else {
-        lengthShift += 7;
+    if (!_lengthReader.done) {
+      _lengthReader.readByte(byte);
+      if (_lengthReader.done) {
+        _messageReader = new _MessageReader(_lengthReader.length);
       }
     } else {
-      message[numMessageBytesReceived] = byte;
-      numMessageBytesReceived++;
-      if (numMessageBytesReceived == length) {
-        var messageToReturn = message;
-        reset();
-        return messageToReturn;
-      }
+      assert(_messageReader != null);
+      _messageReader.readByte(byte);
     }
+
+    if (_lengthReader.done && _messageReader.done) {
+      var message = _messageReader.message;
+      reset();
+      return message;
+    }
+
     return null;
   }
 
-  /// Reset the state so that we are ready to receive the next base-128 encoded
-  /// length.
+  /// Reset the state so that we are ready to receive the next message.
   void reset() {
-    waitingForLength = true;
-    length = 0;
-    lengthShift = 0;
-    message = null;
-    numMessageBytesReceived = 0;
+    _lengthReader = new _LengthReader();
+    _messageReader = null;
+  }
+}
+
+/// Reads a length one byte at a time.
+///
+/// The base-128 encoding is in little-endian order, with the high bit set on
+/// all bytes but the last.  This was chosen since it's the same as the
+/// base-128 encoding used by protobufs, so it allows a modest amount of code
+/// reuse at the other end of the protocol.
+class _LengthReader {
+  /// Whether or not we are done reading the length.
+  bool get done => _done;
+  bool _done = false;
+
+  /// If [_done] is `true`, the decoded value of the length bytes received so
+  /// far (if any).  If [_done] is `false`, the decoded length that was most
+  /// recently received.
+  int _length = 0;
+
+  /// The length read in.  You are only allowed to read this if [_done] is
+  /// `true`.
+  int get length {
+    assert(_done);
+    return _length;
+  }
+
+  /// If [_done] is `true`, the amount by which the next received
+  /// length byte must be left-shifted; otherwise undefined.
+  int _lengthShift = 0;
+
+  /// Read a single byte into [_length].
+  void readByte(int byte) {
+    assert(!_done);
+
+    _length |= (byte & 0x7f) << _lengthShift;
+
+    // Check for the last byte in the length.
+    if ((byte & 0x80) == 0) {
+      _done = true;
+    } else {
+      _lengthShift += 7;
+    }
+  }
+}
+
+/// Reads some number of bytes from a stream, one byte at a time.
+class _MessageReader {
+  /// Whether or not we are done reading bytes from the stream.
+  bool get done => _done;
+  bool _done;
+
+  /// The total length of the message to be read.
+  final int _length;
+
+  /// A [Uint8List] which holds the message data. You are only allowed to read
+  /// this if [_done] is `true`.
+  Uint8List get message {
+    assert(_done);
+    return _message;
+  }
+
+  Uint8List _message;
+
+  /// If [_done] is `false`, the number of message bytes that have been received
+  /// so far.  Otherwise zero.
+  int _numMessageBytesReceived = 0;
+
+  _MessageReader(int length)
+      : _message = new Uint8List(length),
+        _length = length,
+        _done = length == 0;
+
+  /// Reads [byte] into [_message].
+  void readByte(int byte) {
+    assert(!done);
+
+    _message[_numMessageBytesReceived++] = byte;
+    if (_numMessageBytesReceived == _length) _done = true;
   }
 }