blob: 8650c7c2d4445178ff6b93ba9a7e85c1ef6d003d [file] [log] [blame]
import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';
import 'charcodes.dart';
import 'constants.dart';
import 'exception.dart';
const _checksumEnd = checksumOffset + checksumLength;
const _checksumPlaceholder = $space;
extension ByteBufferUtils on Uint8List {
String readString(int offset, int maxLength) {
return readStringOrNullIfEmpty(offset, maxLength) ?? '';
}
Uint8List sublistView(int start, [int? end]) {
return Uint8List.sublistView(this, start, end);
}
String? readStringOrNullIfEmpty(int offset, int maxLength) {
var data = sublistView(offset, offset + maxLength);
var contentLength = data.indexOf(0);
// If there's no \0, assume that the string fills the whole segment
if (contentLength.isNegative) contentLength = maxLength;
if (contentLength == 0) return null;
data = data.sublistView(0, contentLength);
try {
return utf8.decode(data);
} on FormatException {
return String.fromCharCodes(data).trim();
}
}
/// Parse an octal string encoded from index [offset] with the maximum length
/// [length].
int readOctal(int offset, int length) {
var result = 0;
var multiplier = 1;
for (var i = length - 1; i >= 0; i--) {
final charCode = this[offset + i];
// Some tar implementations add a \0 or space at the end, ignore that
if (charCode == 0 || charCode == $space) continue;
if (charCode < $0 || charCode > $9) {
throw TarException('Invalid octal value');
}
// Obtain the numerical value of this digit
final digit = charCode - $0;
result += digit * multiplier;
multiplier <<= 3; // Multiply by the base, 8
}
return result;
}
/// Parses an encoded int, either as base-256 or octal.
///
/// This function may return negative numbers.
int readNumeric(int offset, int length) {
if (length == 0) return 0;
// Check for base-256 (binary) format first. If the first bit is set, then
// all following bits constitute a two's complement encoded number in big-
// endian byte order.
final firstByte = this[offset];
if (firstByte & 0x80 != 0) {
// Handling negative numbers relies on the following identity:
// -a-1 == ~a
//
// If the number is negative, we use an inversion mask to invert the
// date bytes and treat the value as an unsigned number.
final inverseMask = firstByte & 0x40 != 0 ? 0xff : 0x00;
// Ignore signal bit in the first byte
var x = (firstByte ^ inverseMask) & 0x7f;
for (var i = 1; i < length; i++) {
var byte = this[offset + i];
byte ^= inverseMask;
x = x << 8 | byte;
}
return inverseMask == 0xff ? ~x : x;
}
return readOctal(offset, length);
}
int computeUnsignedHeaderChecksum() {
// Accessing the last element first helps the VM eliminate bounds checks in
// the loops below.
this[blockSize - 1];
var result = checksumLength * _checksumPlaceholder;
for (var i = 0; i < checksumOffset; i++) {
result += this[i];
}
for (var i = _checksumEnd; i < blockSize; i++) {
result += this[i];
}
return result;
}
int computeSignedHeaderChecksum() {
this[blockSize - 1];
// Note that _checksumPlaceholder.toSigned(8) == _checksumPlaceholder
var result = checksumLength * _checksumPlaceholder;
for (var i = 0; i < checksumOffset; i++) {
result += this[i];
}
for (var i = _checksumEnd; i < blockSize; i++) {
result += this[i];
}
return result;
}
bool get isAllZeroes {
for (var i = 0; i < length; i++) {
if (this[i] != 0) return false;
}
return true;
}
bool matchesHeader(List<int> header, {int offset = magicOffset}) {
for (var i = 0; i < header.length; i++) {
if (this[offset + i] != header[i]) return false;
}
return true;
}
}
bool isNotAscii(int i) => i > 128;
/// Like [int.parse], but throwing a [TarException] instead of the more-general
/// [FormatException] when it fails.
int parseInt(String source) {
return int.tryParse(source, radix: 10) ??
(throw TarException('Not an int: $source'));
}
/// Takes a [paxTimeString] of the form %d.%d as described in the PAX
/// specification. Note that this implementation allows for negative timestamps,
/// which is allowed for by the PAX specification, but not always portable.
///
/// Note that Dart's [DateTime] class only allows us to give up to microsecond
/// precision, which implies that we cannot parse all the digits in since PAX
/// allows for nanosecond level encoding.
DateTime parsePaxTime(String paxTimeString) {
const maxMicroSecondDigits = 6;
/// Split [paxTimeString] into seconds and sub-seconds parts.
var secondsString = paxTimeString;
var microSecondsString = '';
final position = paxTimeString.indexOf('.');
if (position >= 0) {
secondsString = paxTimeString.substring(0, position);
microSecondsString = paxTimeString.substring(position + 1);
}
/// Parse the seconds.
final seconds = int.tryParse(secondsString);
if (seconds == null) {
throw TarException.header('Invalid PAX time $paxTimeString detected!');
}
if (microSecondsString.replaceAll(RegExp('[0-9]'), '') != '') {
throw TarException.header(
'Invalid nanoseconds $microSecondsString detected');
}
microSecondsString = microSecondsString.padRight(maxMicroSecondDigits, '0');
microSecondsString = microSecondsString.substring(0, maxMicroSecondDigits);
var microSeconds =
microSecondsString.isEmpty ? 0 : int.parse(microSecondsString);
if (paxTimeString.startsWith('-')) microSeconds = -microSeconds;
return microsecondsSinceEpoch(microSeconds + seconds * pow(10, 6).toInt());
}
DateTime secondsSinceEpoch(int timestamp) {
return DateTime.fromMillisecondsSinceEpoch(timestamp * 1000, isUtc: true);
}
DateTime millisecondsSinceEpoch(int milliseconds) {
return DateTime.fromMillisecondsSinceEpoch(milliseconds, isUtc: true);
}
DateTime microsecondsSinceEpoch(int microseconds) {
return DateTime.fromMicrosecondsSinceEpoch(microseconds, isUtc: true);
}
int numBlocks(int fileSize) {
if (fileSize % blockSize == 0) return fileSize ~/ blockSize;
return fileSize ~/ blockSize + 1;
}
int nextBlockSize(int fileSize) => numBlocks(fileSize) * blockSize;
extension ToTyped on List<int> {
Uint8List asUint8List() {
// Flow analysis doesn't work on this.
final $this = this;
return $this is Uint8List ? $this : Uint8List.fromList(this);
}
}
/// Generates a chunked stream of [length] zeroes.
Stream<List<int>> zeroes(int length) async* {
// Emit data in chunks for efficiency
const chunkSize = 4 * 1024;
if (length < chunkSize) {
yield Uint8List(length);
return;
}
final chunk = Uint8List(chunkSize);
for (var i = 0; i < length ~/ chunkSize; i++) {
yield chunk;
}
final remainingBytes = length % chunkSize;
if (remainingBytes != 0) {
yield Uint8List(remainingBytes);
}
}
class StreamBlockReader {
final Stream<List<int>> _source;
StreamSubscription<List<int>>? _inputSubscription;
Completer<Uint8List>? _nextBlockCompleter;
StreamController<Uint8List>? _nextBlocksStream;
int _remainingBlocksInStream = 0;
Uint8List? _pendingBlock;
int _missingInPendingBlock = 0;
Uint8List? _waitingForDispatch;
int _offsetInWaiting = 0;
bool _closed = false;
StreamBlockReader(this._source);
int get _blocksToServe {
if (_nextBlockCompleter != null) {
return 1;
} else {
return _remainingBlocksInStream;
}
}
void _listenOrResume() {
final subsription = _inputSubscription;
if (subsription == null) {
_inputSubscription =
_source.listen(_onData, onError: _onError, onDone: _onDone);
} else {
subsription.resume();
}
}
/// Emits ready [blocks].
///
/// Returns whether we should pause emitting further data afterwards.
bool _emitBlocks(Uint8List blocks, {int amount = 1}) {
final stream = _nextBlocksStream;
final future = _nextBlockCompleter;
final subscription = _inputSubscription;
assert((future == null) ^ (stream == null));
assert(subscription != null);
final bool shouldPause;
if (stream != null) {
stream.add(blocks);
final remaining = _remainingBlocksInStream -= amount;
assert(remaining >= 0);
final done = remaining == 0;
if (done) {
stream.close();
_nextBlocksStream = null;
}
shouldPause = done || stream.isPausedOrClosed;
} else if (future != null) {
assert(amount == 1);
future.complete(blocks);
_nextBlockCompleter = null;
shouldPause = true;
} else {
// Exactly one of them must be non-null
throw AssertionError();
}
if (shouldPause && !subscription!.isPaused) {
subscription.pause();
}
return shouldPause;
}
void _startPendingBlock(Uint8List source, int startOffset) {
assert(_pendingBlock == null);
final availableData = source.length - startOffset;
assert(availableData < blockSize);
_pendingBlock = Uint8List(blockSize)
..setAll(0, source.sublistView(startOffset));
_missingInPendingBlock = blockSize - availableData;
}
void _saveNewUndispatchedState(Uint8List chunk, int offsetInChunk) {
assert(_waitingForDispatch == null);
_saveUndispatchedState(chunk, offsetInChunk);
}
void _saveUndispatchedState(Uint8List chunk, int offsetInChunk) {
assert(_pendingBlock == null);
final remaining = chunk.length - offsetInChunk;
if (remaining < blockSize) {
_waitingForDispatch = null;
_offsetInWaiting = 0;
_startPendingBlock(chunk, offsetInChunk);
} else {
_waitingForDispatch = chunk;
_offsetInWaiting = offsetInChunk;
}
}
void _onData(List<int> chunk) {
assert(_waitingForDispatch == null,
'Had pending data while receiving new event');
final typedChunk = chunk.asUint8List();
// The start offset of the new data that we didn't process yet.
var offsetInData = 0;
void savePendingChunk() {
_saveNewUndispatchedState(typedChunk, offsetInData);
}
// Try finishing the pending block first, if it exists.
if (_pendingBlock != null) {
final started = _pendingBlock!;
final missing = _missingInPendingBlock;
final startOffsetInStarted = blockSize - missing;
if (typedChunk.length >= missing) {
// We can complete the block
started.setAll(
startOffsetInStarted, typedChunk.sublistView(0, missing));
offsetInData = missing;
final done = _emitBlocks(started);
// We just finished serving the started block, so reset that
_pendingBlock = null;
_missingInPendingBlock = 0;
if (done) {
savePendingChunk();
return;
}
} else {
// We can't complete the pending block with the new data, but at least
// we can continue filling it up
started.setAll(startOffsetInStarted, typedChunk);
_missingInPendingBlock -= typedChunk.length;
assert(_missingInPendingBlock > 0);
return;
}
}
// When we get to this point, the started chunk has been completed and we
// can continue adding chunks as they come.
assert(_pendingBlock == null);
// Serve as many blocks as we can from the current chunk
final blocksToServe = _blocksToServe;
assert(blocksToServe >= 1);
final availableBlocksInChunk =
(typedChunk.length - offsetInData) ~/ blockSize;
final blocksToDeliverNow = min(blocksToServe, availableBlocksInChunk);
if (blocksToDeliverNow > 0) {
final length = blockSize * blocksToDeliverNow;
final end = offsetInData + length;
final done = _emitBlocks(typedChunk.sublistView(offsetInData, end),
amount: blocksToDeliverNow);
offsetInData = end;
// Once again, stop and save state if this has completed the output
if (done) {
savePendingChunk();
return;
}
}
// If we're here, the remaining data in the chunk was not large enough to
// serve a full block. Store the remaining data in a pending block.
_startPendingBlock(typedChunk, offsetInData);
}
void _onError(Object error, [StackTrace? trace]) {
final stream = _nextBlocksStream;
final future = _nextBlockCompleter;
if (stream != null) {
stream.addError(error, trace);
} else if (future != null) {
future.completeError(error, trace);
_nextBlockCompleter = null;
}
final subscription = _inputSubscription;
if (!subscription!.isPaused) {
subscription.pause();
}
}
void _onDone() {
if (_blocksToServe > 0) {
if (_pendingBlock != null) {
_emitBlocks(
_pendingBlock!.sublistView(0, blockSize - _missingInPendingBlock));
_pendingBlock = null;
_missingInPendingBlock = 0;
} else {
// Complete with an empty block
_emitBlocks(emptyUint8List);
}
_inputSubscription = null;
}
close();
}
void _checkIdle(String method) {
if (_nextBlockCompleter != null) {
throw StateError("Can't call $method before a previous call to "
'nextBlock() has completed.');
}
if (_nextBlocksStream != null) {
throw StateError("Can't call $method before a previous call to "
'nextBlocks() was completed.');
}
}
/// Reads the next block from the input stream.
///
/// The returned list may be empty or smaller than [blockSize] if the input
/// stream ends.
Future<Uint8List> nextBlock() {
_checkIdle('nextBlock');
if (_closed) return Future.value(emptyUint8List);
// If we have pending data to serve, do that before we start resuming the
// stream subscription
final waiting = _waitingForDispatch;
if (waiting != null) {
final offset = _offsetInWaiting;
assert(waiting.length - offset >= blockSize);
final end = offset + blockSize;
final result = Future<Uint8List>.value(waiting.sublistView(offset, end));
if (end == waiting.length) {
// We just completed all data waiting for dispatch
_waitingForDispatch = null;
_offsetInWaiting = 0;
} else {
// Store new pending data
_saveUndispatchedState(waiting, offset + blockSize);
}
return result;
}
// Note: Our logic in _emitBlocks depends on this completer NOT being sync
final next = _nextBlockCompleter = Completer();
_listenOrResume();
return next.future;
}
Stream<Uint8List> nextBlocks(int amount) {
_checkIdle('nextBlocks');
if (amount == 0 || _closed) return const Stream.empty();
// This controller must not be sync: We add events in response to onListen
// and expect a delay after closing it in _emitBlocks
final controller = _nextBlocksStream = StreamController();
_remainingBlocksInStream = amount;
void onListenOrResume() {
// Once again, check if we have pending data to serve
final waiting = _waitingForDispatch;
if (waiting != null) {
final offset = _offsetInWaiting;
final blocksWaiting = (waiting.length - offset) ~/ blockSize;
assert(blocksWaiting > 0);
final blocksToEmit = min(blocksWaiting, amount);
final end = offset + blocksToEmit * blockSize;
final done =
_emitBlocks(waiting.sublistView(offset, end), amount: blocksToEmit);
_saveUndispatchedState(waiting, end);
if (!done) {
_listenOrResume();
}
} else {
// No data waiting to be served, so we definitely have to start
// listening
_listenOrResume();
}
}
controller
..onListen = onListenOrResume
..onPause = () {
final sub = _inputSubscription;
if (!sub!.isPaused) sub.pause();
}
..onResume = onListenOrResume;
return controller.stream;
}
Future<void> close() async {
if (_closed) return;
_closed = true;
await _inputSubscription?.cancel();
_nextBlockCompleter?.complete(emptyUint8List);
final stream = _nextBlocksStream;
if (stream != null) {
await stream.close();
}
}
}
extension on StreamController<dynamic> {
bool get isPausedOrClosed => isPaused || isClosed;
}