| // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| import 'dart:math'; |
| import 'dart:typed_data'; |
| |
| import 'package:async/async.dart'; |
| import 'package:pedantic/pedantic.dart'; |
| import 'package:protobuf/protobuf.dart'; |
| |
| import 'message_grouper.dart'; |
| |
| /// Collects stream data into messages by interpreting it as |
| /// base-128 encoded lengths interleaved with raw data. |
| class AsyncMessageGrouper implements MessageGrouper { |
| /// The input stream. |
| final StreamQueue<List<int>> _inputQueue; |
| |
| /// The current input buffer. |
| List<int> _inputBuffer = []; |
| |
| /// Position in the current input buffer. |
| int _inputBufferPos = 0; |
| |
| /// Completes after [cancel] is called or [inputStream] is closed. |
| Future<void> get done => _done.future; |
| final _done = Completer<void>(); |
| |
| /// Whether currently reading length or raw data. |
| bool _readingLength = true; |
| |
| /// If reading length, buffer to build up length one byte at a time until |
| /// done. |
| List<int> _lengthBuffer = []; |
| |
| /// If reading raw data, buffer for the data. |
| Uint8List _message = Uint8List(0); |
| |
| /// If reading raw data, position in the buffer. |
| int _messagePos = 0; |
| |
| AsyncMessageGrouper(Stream<List<int>> inputStream) |
| : _inputQueue = StreamQueue(inputStream); |
| |
| /// Returns the next full message that is received, or null if none are left. |
| @override |
| Future<List<int>?> get next async { |
| try { |
| // Loop while there is data in the input buffer or the input stream. |
| while ( |
| _inputBufferPos != _inputBuffer.length || await _inputQueue.hasNext) { |
| // If the input buffer is empty fill it from the input stream. |
| if (_inputBufferPos == _inputBuffer.length) { |
| _inputBuffer = await _inputQueue.next; |
| _inputBufferPos = 0; |
| } |
| |
| // Loop over the input buffer. Might return without reading the full |
| // buffer if a message completes. Then, this is tracked in |
| // `_inputBufferPos`. |
| while (_inputBufferPos != _inputBuffer.length) { |
| if (_readingLength) { |
| // Reading message length byte by byte. |
| var byte = _inputBuffer[_inputBufferPos++]; |
| _lengthBuffer.add(byte); |
| // Check for the last byte in the length, and then read it. |
| if ((byte & 0x80) == 0) { |
| var reader = CodedBufferReader(_lengthBuffer); |
| var length = reader.readInt32(); |
| _lengthBuffer = []; |
| |
| // Special case: don't keep reading an empty message, return it |
| // and `_readingLength` stays true. |
| if (length == 0) { |
| return Uint8List(0); |
| } |
| |
| // Switch to reading raw data. Allocate message buffer and reset |
| // `_messagePos`. |
| _readingLength = false; |
| _message = Uint8List(length); |
| _messagePos = 0; |
| } |
| } else { |
| // Copy as much as possible from the input buffer. Limit is the |
| // smaller of the remaining length to fill in the message and the |
| // remaining length in the buffer. |
| var lengthToCopy = min(_message.length - _messagePos, |
| _inputBuffer.length - _inputBufferPos); |
| _message.setRange( |
| _messagePos, |
| _messagePos + lengthToCopy, |
| _inputBuffer.sublist( |
| _inputBufferPos, _inputBufferPos + lengthToCopy)); |
| _messagePos += lengthToCopy; |
| _inputBufferPos += lengthToCopy; |
| |
| // If there is a complete message to return, return it and switch |
| // back to reading length. |
| if (_messagePos == _message.length) { |
| var result = _message; |
| // Don't keep a reference to the message. |
| _message = Uint8List(0); |
| _readingLength = true; |
| return result; |
| } |
| } |
| } |
| } |
| |
| // If there is nothing left in the queue then cancel the subscription. |
| unawaited(cancel()); |
| } catch (e) { |
| // It appears we sometimes get an exception instead of -1 as expected when |
| // stdin closes, this handles that in the same way (returning a null |
| // message) |
| return null; |
| } |
| } |
| |
| /// Stop listening to the stream for further updates. |
| Future cancel() { |
| if (!_done.isCompleted) { |
| _done.complete(null); |
| return _inputQueue.cancel()!; |
| } |
| return done; |
| } |
| } |