| // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:typed_data'; |
| |
| import 'package:protobuf/protobuf.dart'; |
| |
| /// State held by the [MessageGrouper] while waiting for additional data to |
| /// arrive. |
| class MessageGrouperState { |
| /// Reads the initial length. |
| _LengthReader _lengthReader; |
| |
| /// Reads messages from a stream of bytes. |
| _MessageReader _messageReader; |
| |
| MessageGrouperState() { |
| reset(); |
| } |
| |
| /// Handle one byte at a time. |
| /// |
| /// Returns a [List<int>] of message bytes if [byte] was the last byte in a |
| /// message, otherwise returns [null]. |
| List<int> handleInput(int byte) { |
| if (!_lengthReader.done) { |
| _lengthReader.readByte(byte); |
| if (_lengthReader.done) { |
| _messageReader = new _MessageReader(_lengthReader.length); |
| } |
| } else { |
| assert(_messageReader != null); |
| _messageReader.readByte(byte); |
| } |
| |
| if (_lengthReader.done && _messageReader.done) { |
| var message = _messageReader.message; |
| reset(); |
| return message; |
| } |
| |
| return null; |
| } |
| |
| /// Reset the state so that we are ready to receive the next message. |
| void reset() { |
| _lengthReader = new _LengthReader(); |
| _messageReader = null; |
| } |
| } |
| |
| /// Reads a length one byte at a time. |
| /// |
| /// The base-128 encoding is in little-endian order, with the high bit set on |
| /// all bytes but the last. This was chosen since it's the same as the |
| /// base-128 encoding used by protobufs, so it allows a modest amount of code |
| /// reuse at the other end of the protocol. |
| class _LengthReader { |
| /// Whether or not we are done reading the length. |
| bool get done => _done; |
| bool _done = false; |
| |
| /// If [_done] is `true`, the decoded value of the length bytes received so |
| /// far (if any). If [_done] is `false`, the decoded length that was most |
| /// recently received. |
| int _length; |
| |
| /// The length read in. You are only allowed to read this if [_done] is |
| /// `true`. |
| int get length { |
| assert(_done); |
| return _length; |
| } |
| |
| final List<int> _buffer = <int>[]; |
| |
| /// Read a single byte into [_length]. |
| void readByte(int byte) { |
| assert(!_done); |
| _buffer.add(byte); |
| |
| // Check for the last byte in the length, and then read it. |
| if ((byte & 0x80) == 0) { |
| _done = true; |
| var reader = new CodedBufferReader(_buffer); |
| _length = reader.readInt32(); |
| } |
| } |
| } |
| |
| /// Reads some number of bytes from a stream, one byte at a time. |
| class _MessageReader { |
| /// Whether or not we are done reading bytes from the stream. |
| bool get done => _done; |
| bool _done; |
| |
| /// The total length of the message to be read. |
| final int _length; |
| |
| /// A [Uint8List] which holds the message data. You are only allowed to read |
| /// this if [_done] is `true`. |
| Uint8List get message { |
| assert(_done); |
| return _message; |
| } |
| |
| Uint8List _message; |
| |
| /// If [_done] is `false`, the number of message bytes that have been received |
| /// so far. Otherwise zero. |
| int _numMessageBytesReceived = 0; |
| |
| _MessageReader(int length) |
| : _message = new Uint8List(length), |
| _length = length, |
| _done = length == 0; |
| |
| /// Reads [byte] into [_message]. |
| void readByte(int byte) { |
| assert(!done); |
| |
| _message[_numMessageBytesReceived++] = byte; |
| if (_numMessageBytesReceived == _length) _done = true; |
| } |
| } |