blob: 4fa75b1caacb0f6432d2d3494070cf15ef7bc3af [file] [log] [blame]
import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';
import 'charcodes.dart';
import 'constants.dart';
import 'exception.dart';
const _checksumEnd = checksumOffset + checksumLength;
const _checksumPlaceholder = $space;
extension ByteBufferUtils on Uint8List {
String readString(int offset, int maxLength) {
return readStringOrNullIfEmpty(offset, maxLength) ?? '';
}
Uint8List sublistView(int start, [int? end]) {
return Uint8List.sublistView(this, start, end);
}
String? readStringOrNullIfEmpty(int offset, int maxLength) {
var data = sublistView(offset, offset + maxLength);
var contentLength = data.indexOf(0);
// If there's no \0, assume that the string fills the whole segment
if (contentLength.isNegative) contentLength = maxLength;
if (contentLength == 0) return null;
data = data.sublistView(0, contentLength);
try {
return utf8.decode(data);
} on FormatException {
return String.fromCharCodes(data).trim();
}
}
/// Parse an octal string encoded from index [offset] with the maximum length
/// [length].
int readOctal(int offset, int length) {
var result = 0;
var multiplier = 1;
for (var i = length - 1; i >= 0; i--) {
final charCode = this[offset + i];
// Some tar implementations add a \0 or space at the end, ignore that
if (charCode == 0 || charCode == $space) continue;
if (charCode < $0 || charCode > $9) {
throw TarException('Invalid octal value');
}
// Obtain the numerical value of this digit
final digit = charCode - $0;
result += digit * multiplier;
multiplier <<= 3; // Multiply by the base, 8
}
return result;
}
/// Parses an encoded int, either as base-256 or octal.
///
/// This function may return negative numbers.
int readNumeric(int offset, int length) {
if (length == 0) return 0;
// Check for base-256 (binary) format first. If the first bit is set, then
// all following bits constitute a two's complement encoded number in big-
// endian byte order.
final firstByte = this[offset];
if (firstByte & 0x80 != 0) {
// Handling negative numbers relies on the following identity:
// -a-1 == ~a
//
// If the number is negative, we use an inversion mask to invert the
// date bytes and treat the value as an unsigned number.
final inverseMask = firstByte & 0x40 != 0 ? 0xff : 0x00;
// Ignore signal bit in the first byte
var x = (firstByte ^ inverseMask) & 0x7f;
for (var i = 1; i < length; i++) {
var byte = this[offset + i];
byte ^= inverseMask;
x = x << 8 | byte;
}
return inverseMask == 0xff ? ~x : x;
}
return readOctal(offset, length);
}
int computeUnsignedHeaderChecksum() {
// Accessing the last element first helps the VM eliminate bounds checks in
// the loops below.
this[blockSize - 1];
var result = checksumLength * _checksumPlaceholder;
for (var i = 0; i < checksumOffset; i++) {
result += this[i];
}
for (var i = _checksumEnd; i < blockSize; i++) {
result += this[i];
}
return result;
}
int computeSignedHeaderChecksum() {
this[blockSize - 1];
// Note that _checksumPlaceholder.toSigned(8) == _checksumPlaceholder
var result = checksumLength * _checksumPlaceholder;
for (var i = 0; i < checksumOffset; i++) {
result += this[i].toSigned(8);
}
for (var i = _checksumEnd; i < blockSize; i++) {
result += this[i].toSigned(8);
}
return result;
}
bool matchesHeader(List<int> header, {int offset = magicOffset}) {
for (var i = 0; i < header.length; i++) {
if (this[offset + i] != header[i]) return false;
}
return true;
}
bool get isAllZeroes {
for (var i = 0; i < length; i++) {
if (this[i] != 0) return false;
}
return true;
}
}
bool isNotAscii(int i) => i > 128;
/// Like [int.parse], but throwing a [TarException] instead of the more-general
/// [FormatException] when it fails.
int parseInt(String source) {
return int.tryParse(source, radix: 10) ??
(throw TarException('Not an int: $source'));
}
/// Takes a [paxTimeString] of the form %d.%d as described in the PAX
/// specification. Note that this implementation allows for negative timestamps,
/// which is allowed for by the PAX specification, but not always portable.
///
/// Note that Dart's [DateTime] class only allows us to give up to microsecond
/// precision, which implies that we cannot parse all the digits in since PAX
/// allows for nanosecond level encoding.
DateTime parsePaxTime(String paxTimeString) {
const maxMicroSecondDigits = 6;
/// Split [paxTimeString] into seconds and sub-seconds parts.
var secondsString = paxTimeString;
var microSecondsString = '';
final position = paxTimeString.indexOf('.');
if (position >= 0) {
secondsString = paxTimeString.substring(0, position);
microSecondsString = paxTimeString.substring(position + 1);
}
/// Parse the seconds.
final seconds = int.tryParse(secondsString);
if (seconds == null) {
throw TarException.header('Invalid PAX time $paxTimeString detected!');
}
if (microSecondsString.replaceAll(RegExp('[0-9]'), '') != '') {
throw TarException.header(
'Invalid nanoseconds $microSecondsString detected');
}
microSecondsString = microSecondsString.padRight(maxMicroSecondDigits, '0');
microSecondsString = microSecondsString.substring(0, maxMicroSecondDigits);
var microSeconds =
microSecondsString.isEmpty ? 0 : int.parse(microSecondsString);
if (paxTimeString.startsWith('-')) microSeconds = -microSeconds;
return microsecondsSinceEpoch(microSeconds + seconds * pow(10, 6).toInt());
}
DateTime secondsSinceEpoch(int timestamp) {
return DateTime.fromMillisecondsSinceEpoch(timestamp * 1000, isUtc: true);
}
DateTime millisecondsSinceEpoch(int milliseconds) {
return DateTime.fromMillisecondsSinceEpoch(milliseconds, isUtc: true);
}
DateTime microsecondsSinceEpoch(int microseconds) {
return DateTime.fromMicrosecondsSinceEpoch(microseconds, isUtc: true);
}
int numBlocks(int fileSize) {
if (fileSize % blockSize == 0) return fileSize ~/ blockSize;
return fileSize ~/ blockSize + 1;
}
int nextBlockSize(int fileSize) => numBlocks(fileSize) * blockSize;
extension ToTyped on List<int> {
Uint8List asUint8List() {
// Flow analysis doesn't work on this.
final $this = this;
return $this is Uint8List ? $this : Uint8List.fromList(this);
}
}
/// Generates a chunked stream of [length] zeroes.
Stream<List<int>> zeroes(int length) async* {
// Emit data in chunks for efficiency
const chunkSize = 4 * 1024;
if (length < chunkSize) {
yield Uint8List(length);
return;
}
final chunk = Uint8List(chunkSize);
for (var i = 0; i < length ~/ chunkSize; i++) {
yield chunk;
}
final remainingBytes = length % chunkSize;
if (remainingBytes != 0) {
yield Uint8List(remainingBytes);
}
}
/// An optimized reader reading 512-byte blocks from an input stream.
class BlockReader {
final Stream<List<int>> _input;
StreamSubscription<List<int>>? _subscription;
bool _isClosed = false;
/// If a request is active, returns the current stream that we're reporting.
/// This controler is synchronous.
StreamController<Uint8List>? _outgoing;
/// The amount of (512-byte) blocks remaining before [_outgoing] should close.
int _remainingBlocksInOutgoing = 0;
/// A pending tar block that has not been emitted yet.
///
/// This can happen if we receive small chunks of data in [_onData] that
/// aren't enough to form a full block.
final Uint8List _pendingBlock = Uint8List(blockSize);
/// The offset in [_pendingBlock] at which new data should start.
///
/// For instance, if this value is `502`, we're missing `10` additional bytes
/// to complete the [_pendingBlock].
/// When this value is `0`, there is no active pending block.
int _offsetInPendingBlock = 0;
/// Additional data that we received, but were unable to dispatch to a
/// downstream listener yet.
///
/// This can happen if a we receive a large chunk of data and a listener is
/// only interested in a small chunk.
///
/// We will never have trailing data and a pending block at the same time.
/// When we haver fewer than 512 bytes of trailing data, it should be stored
/// as a pending block instead.
Uint8List? _trailingData;
/// The offset in the [_trailingData] byte array.
///
/// When a new listener attaches, we can start by emitting the sublist
/// starting at this offset.
int _offsetInTrailingData = 0;
BlockReader(this._input);
/// Emits full blocks.
///
/// Returns `true` if the listener detached in response to emitting these
/// blocks. In this case, remaining data must be saved in [_trailingData].
bool _emitBlocks(Uint8List data, {int amount = 1}) {
assert(_remainingBlocksInOutgoing >= amount);
final outgoing = _outgoing!;
if (!outgoing.isClosed) outgoing.add(data);
final remainingNow = _remainingBlocksInOutgoing -= amount;
if (remainingNow == 0) {
_outgoing = null;
_pause();
scheduleMicrotask(() {
outgoing.close();
});
return true;
} else if (outgoing.isPaused || outgoing.isClosed) {
_pause();
return true;
}
return false;
}
void _onData(List<int> data) {
assert(_outgoing != null && _trailingData == null);
final typedData = data.asUint8List();
var offsetInData = 0;
/// Saves parts of the current chunks that couldn't be emitted.
void saveTrailingState() {
assert(_trailingData == null && _offsetInPendingBlock == 0);
final remaining = typedData.length - offsetInData;
if (remaining == 0) {
return; // Nothing to save, the chunk has been consumed fully.
} else if (remaining < blockSize) {
// Store remaining data as a pending block.
_pendingBlock.setAll(0, typedData.sublistView(offsetInData));
_offsetInPendingBlock = remaining;
} else {
_trailingData = typedData;
_offsetInTrailingData = offsetInData;
}
}
// Try to complete a pending block first
var offsetInPending = _offsetInPendingBlock;
final canWriteIntoPending = min(blockSize - offsetInPending, data.length);
if (offsetInPending != 0 && canWriteIntoPending > 0) {
_pendingBlock.setAll(
offsetInPending, typedData.sublistView(0, canWriteIntoPending));
offsetInPending = _offsetInPendingBlock += canWriteIntoPending;
offsetInData += canWriteIntoPending;
// Did this finish the pending block?
if (offsetInPending == blockSize) {
_offsetInPendingBlock = 0;
if (_emitBlocks(_pendingBlock)) {
// Emitting the pending block completed all pending requests.
saveTrailingState();
return;
}
} else {
// The chunk we received didn't fill up the pending block, so just stop
// here.
assert(offsetInData == data.length);
return;
}
}
// At this point, the pending block should have been served.
assert(_offsetInPendingBlock == 0);
final fullBlocksToEmit = min(_remainingBlocksInOutgoing,
(typedData.length - offsetInData) ~/ blockSize);
if (fullBlocksToEmit > 0) {
_emitBlocks(
typedData.sublistView(
offsetInData, offsetInData += fullBlocksToEmit * blockSize),
amount: fullBlocksToEmit,
);
}
saveTrailingState();
}
void _onError(Object error, StackTrace trace) {
assert(_outgoing != null && _trailingData == null);
_outgoing!.addError(error, trace);
}
void _onDone() {
assert(_outgoing != null && _trailingData == null);
final outgoing = _outgoing!;
// Add pending data, then close
if (_offsetInPendingBlock != 0) {
outgoing.add(_pendingBlock.sublistView(0, _offsetInPendingBlock));
}
_isClosed = true;
_subscription?.cancel();
outgoing.close();
}
void _subscribeOrResume() {
// We should not resume the subscription if there is trailing data ready to
// be emitted.
assert(_trailingData == null);
final sub = _subscription;
if (sub == null) {
_subscription = _input.listen(_onData,
onError: _onError, onDone: _onDone, cancelOnError: true);
} else {
sub.resume();
}
}
void _pause() {
final sub = _subscription!; // ignore: cancel_subscriptions
if (!sub.isPaused) sub.pause();
}
Future<Uint8List> nextBlock() {
final result = Uint8List(blockSize);
var offset = 0;
return nextBlocks(1).forEach((chunk) {
result.setAll(offset, chunk);
offset += chunk.length;
}).then((void _) => result.sublistView(0, offset));
}
Stream<Uint8List> nextBlocks(int amount) {
if (_isClosed || amount == 0) {
return const Stream.empty();
}
if (_outgoing != null) {
throw StateError(
'Cannot call nextBlocks() before the previous stream completed.');
}
assert(_remainingBlocksInOutgoing == 0);
// We're making this synchronous because we will mostly add events in
// response to receiving chunks from the source stream. We manually ensure
// that other emits are happening asynchronously.
final controller = StreamController<Uint8List>(sync: true);
_outgoing = controller;
_remainingBlocksInOutgoing = amount;
var state = _StreamState.initial;
/// Sends trailing data to the stream. Reeturns true if the subscription
/// should still be resumed afterwards.
bool emitTrailing() {
// Attempt to serve requests from pending data first.
final trailing = _trailingData;
if (trailing != null) {
// There should never be trailing data and a pending block at the
// same time
assert(_offsetInPendingBlock == 0);
var remaining = trailing.length - _offsetInTrailingData;
// If there is trailing data, it should contain a full block
// (otherwise we would have stored it as a pending block)
assert(remaining >= blockSize);
final blocks = min(_remainingBlocksInOutgoing, remaining ~/ blockSize);
assert(blocks > 0);
final done = _emitBlocks(
trailing.sublistView(_offsetInTrailingData,
_offsetInTrailingData + blocks * blockSize),
amount: blocks);
remaining -= blocks * blockSize;
_offsetInTrailingData += blocks * blockSize;
if (remaining == 0) {
_trailingData = null;
_offsetInTrailingData = 0;
} else if (remaining < blockSize) {
assert(_offsetInPendingBlock == 0);
// Move trailing data into a pending block
_pendingBlock.setAll(0, trailing.sublistView(_offsetInTrailingData));
_offsetInPendingBlock = remaining;
_trailingData = null;
_offsetInTrailingData = 0;
} else {
// If there is still more than a full block of data waiting, we
// should not listen. This implies that the stream is done already.
assert(done);
}
// The listener detached in response to receiving the event.
if (done) {
if (_remainingBlocksInOutgoing == 0) state = _StreamState.done;
return false;
}
}
return true;
}
void scheduleInitialEmit() {
scheduleMicrotask(() {
if (state != _StreamState.initial) return;
state = _StreamState.attached;
if (emitTrailing()) {
_subscribeOrResume();
}
});
}
controller
..onListen = scheduleInitialEmit
..onPause = () {
assert(state == _StreamState.initial || state == _StreamState.attached);
if (state == _StreamState.initial) {
state = _StreamState.pausedAfterInitial;
} else {
_pause();
state = _StreamState.pausedAfterAttached;
}
}
..onResume = () {
// We're done already
if (_remainingBlocksInOutgoing == 0) return;
assert(state == _StreamState.pausedAfterAttached ||
state == _StreamState.pausedAfterInitial);
if (state == _StreamState.pausedAfterInitial) {
state = _StreamState.initial;
scheduleInitialEmit();
} else {
state = _StreamState.attached;
if (emitTrailing()) {
_subscribeOrResume();
}
}
}
..onCancel = () {
state = _StreamState.done;
};
return controller.stream;
}
FutureOr<void> close() {
_isClosed = true;
return _subscription?.cancel();
}
}
enum _StreamState {
initial,
attached,
pausedAfterInitial,
pausedAfterAttached,
done,
}