Upgrade `package:tar` to version `0.5.4`. (#3313)
diff --git a/lib/src/third_party/tar/README.md b/lib/src/third_party/tar/README.md index 11a8f37..a2a220f 100644 --- a/lib/src/third_party/tar/README.md +++ b/lib/src/third_party/tar/README.md
@@ -1,7 +1,7 @@ # package:tar -Vendored elements from `package:tar` for use in creation and extration of +Vendored elements from `package:tar` for use in creation and extraction of tar-archives. * Repository: `https://github.com/simolus3/tar/` - * Revision: `b5c5a11d8969f458ccdeb8cf01615f692fed3e97` + * Revision: `7cdb563c9894600c6a739ec268f8673d6122006f`
diff --git a/lib/src/third_party/tar/src/constants.dart b/lib/src/third_party/tar/src/constants.dart index aac7669..05accb0 100644 --- a/lib/src/third_party/tar/src/constants.dart +++ b/lib/src/third_party/tar/src/constants.dart
@@ -216,13 +216,6 @@ /// Sticky bit const c_ISVTX = 512; -/// ********************** -/// Convenience constants -/// ********************** -/// 64-bit integer max and min values -const int64MaxValue = 9223372036854775807; -const int64MinValue = -9223372036854775808; - /// Constants to determine file modes. const modeType = 2401763328; const modeSymLink = 134217728;
diff --git a/lib/src/third_party/tar/src/entry.dart b/lib/src/third_party/tar/src/entry.dart index f6b0a5a..160974b 100644 --- a/lib/src/third_party/tar/src/entry.dart +++ b/lib/src/third_party/tar/src/entry.dart
@@ -52,8 +52,17 @@ TarEntry._(this.header, this.contents); /// Creates an in-memory tar entry from the [header] and the [data] to store. - factory TarEntry.data(TarHeader header, List<int> data) { + static SynchronousTarEntry data(TarHeader header, List<int> data) { (header as HeaderImpl).size = data.length; - return TarEntry(header, Stream.value(data)); + return SynchronousTarEntry._(header, data); } } + +/// A tar entry stored in memory. +class SynchronousTarEntry extends TarEntry { + /// The contents of this tar entry as a byte array. + final List<int> data; + + SynchronousTarEntry._(TarHeader header, this.data) + : super._(header, Stream.value(data)); +}
diff --git a/lib/src/third_party/tar/src/reader.dart b/lib/src/third_party/tar/src/reader.dart index 713235a..b9bc3d3 100644 --- a/lib/src/third_party/tar/src/reader.dart +++ b/lib/src/third_party/tar/src/reader.dart
@@ -22,14 +22,10 @@ /// to read each archive where possible. @sealed class TarReader implements StreamIterator<TarEntry> { - /// A chunked stream iterator to enable us to get our data. - final ChunkedStreamReader<int> _chunkedStream; + final BlockReader _reader; final PaxHeaders _paxHeaders = PaxHeaders(); final int _maxSpecialFileSize; - /// Skip the next [_skipNext] elements when reading in the stream. - int _skipNext = 0; - TarEntry? _current; /// The underlying content stream for the [_current] entry. Draining this @@ -88,7 +84,7 @@ TarReader(Stream<List<int>> tarStream, {int maxSpecialFileSize = defaultSpecialLength, bool disallowTrailingData = false}) - : _chunkedStream = ChunkedStreamReader(tarStream), + : _reader = BlockReader(tarStream), _checkNoTrailingData = disallowTrailingData, _maxSpecialFileSize = maxSpecialFileSize; @@ -152,13 +148,7 @@ // iterates through one or more "header files" until it finds a // "normal file". while (true) { - if (_skipNext > 0) { - await _readFullBlock(_skipNext); - _skipNext = 0; - } - - final rawHeader = - await _readFullBlock(blockSize, allowEmpty: eofAcceptable); + final rawHeader = await _readFullBlock(allowEmpty: eofAcceptable); nextHeader = await _readHeader(rawHeader); if (nextHeader == null) { @@ -180,19 +170,21 @@ nextHeader.typeFlag == TypeFlag.xGlobalHeader) { format = format.mayOnlyBe(TarFormat.pax); final paxHeaderSize = _checkSpecialSize(nextHeader.size); - final rawPaxHeaders = await _readFullBlock(paxHeaderSize); + + final rawPaxHeaders = + (await _readFullBlock(amount: numBlocks(paxHeaderSize))) + .sublistView(0, paxHeaderSize); _paxHeaders.readPaxHeaders( rawPaxHeaders, nextHeader.typeFlag == TypeFlag.xGlobalHeader); - _markPaddingToSkip(paxHeaderSize); // This is a meta header affecting the next header. 
continue; } else if (nextHeader.typeFlag == TypeFlag.gnuLongLink || nextHeader.typeFlag == TypeFlag.gnuLongName) { format = format.mayOnlyBe(TarFormat.gnu); - final realName = await _readFullBlock( - _checkSpecialSize(nextBlockSize(nextHeader.size))); + final size = _checkSpecialSize(nextHeader.size); + final realName = await _readFullBlock(amount: numBlocks(size)); final readName = realName.readString(0, realName.length); if (nextHeader.typeFlag == TypeFlag.gnuLongName) { @@ -247,7 +239,7 @@ // Note: Calling cancel is safe when the stream has already been completed. // It's a noop in that case, which is what we want. - return _chunkedStream.cancel(); + return _reader.close(); } /// Utility function for quickly iterating through all entries in [tarStream]. @@ -317,7 +309,7 @@ Uint8List block; do { - block = await _chunkedStream.readBytes(blockSize); + block = await _reader.nextBlock(); if (!block.isAllZeroes) { throw TarException( 'Illegal content after the end of the tar archive.'); @@ -333,15 +325,24 @@ throw TarException.header('Unexpected end of file'); } - /// Reads a block with the requested [size], or throws an unexpected EoF - /// exception. - Future<Uint8List> _readFullBlock(int size, {bool allowEmpty = false}) async { - final block = await _chunkedStream.readBytes(size); - if (block.length != size && !(allowEmpty && block.isEmpty)) { - _unexpectedEof(); - } + /// Reads [amount] blocks from the input stream, or throws an exception if + /// the stream ends prematurely. 
+ Future<Uint8List> _readFullBlock({bool allowEmpty = false, int amount = 1}) { + final blocks = Uint8List(amount * blockSize); + var offset = 0; - return block; + return _reader.nextBlocks(amount).forEach((chunk) { + blocks.setAll(offset, chunk); + offset += chunk.length; + }).then((void _) { + if (allowEmpty && offset == 0) { + return Uint8List(0); + } else if (offset < blocks.length) { + _unexpectedEof(); + } else { + return blocks; + } + }); } /// Reads the next block header and assumes that the underlying reader @@ -357,7 +358,7 @@ if (rawHeader.isEmpty) return null; if (rawHeader.isAllZeroes) { - rawHeader = await _chunkedStream.readBytes(blockSize); + rawHeader = await _reader.nextBlock(); // Exactly 1 block of zeroes is read and EOF is hit. if (rawHeader.isEmpty) return null; @@ -393,9 +394,9 @@ final sparseDataLength = sparseData.fold<int>(0, (value, element) => value + element.length); - final streamLength = nextBlockSize(sparseDataLength); - final safeStream = - _publishStream(_chunkedStream.readStream(streamLength), streamLength); + final streamBlockCount = numBlocks(sparseDataLength); + final safeStream = _publishStream( + _reader.nextBlocks(streamBlockCount), streamBlockCount * blockSize); return sparseStream(safeStream, sparseHoles, header.size); } else { var size = header.size; @@ -408,9 +409,8 @@ if (size == 0) { return _publishStream(const Stream<Never>.empty(), 0); } else { - _markPaddingToSkip(size); - return _publishStream( - _chunkedStream.readStream(header.size), header.size); + final blockCount = numBlocks(header.size); + return _publishStream(_reader.nextBlocks(blockCount), header.size); } } } @@ -424,7 +424,37 @@ // There can only be one content stream at a time. This precondition is // checked by _prepareToReadHeaders. assert(_underlyingContentStream == null); - return _underlyingContentStream = Stream.eventTransformed(stream, (sink) { + Stream<List<int>>? 
thisStream; + + return thisStream = + _underlyingContentStream = Stream.eventTransformed(stream, (sink) { + // This callback is called when we have a listener. Make sure that, at + // this point, this stream is still the active content stream. + // If users store the contents of a tar header, then read more tar + // entries, and finally try to read the stream of the old contents, they'd + // get an exception about the stream already being listened to. + // This can be a bit confusing, so this check enables a better error UX. + if (thisStream != _underlyingContentStream) { + throw StateError( + 'Tried listening to an outdated tar entry. \n' + 'As all tar entries found by a reader are backed by a single source ' + 'stream, only the latest tar entry can be read. It looks like you ' + 'stored the results of `tarEntry.contents` somewhere, called ' + '`reader.moveNext()` and then read the contents of the previous ' + 'entry.\n' + 'For more details, including a discussion of workarounds, see ' + 'https://github.com/simolus3/tar/issues/18', + ); + } else if (_listenedToContentsOnce) { + throw StateError( + 'A tar entry has been listened to multiple times. \n' + 'As all tar entries are read from what\'s likely a single-' + 'subscription stream, this is unsupported. If you didn\'t read a tar ' + 'entry multiple times yourself, perhaps you\'ve called `moveNext()` ' + 'before reading contents?', + ); + } + _listenedToContentsOnce = true; late _OutgoingStreamGuard guard; @@ -432,7 +462,7 @@ length, sink, // Reset state when the stream is done. This will only be called when - // the sream is done, not when a listener cancels. + // the stream is done, not when a listener cancels. () { _underlyingContentStream = null; if (guard.hadError) { @@ -443,15 +473,6 @@ }); } - /// Skips to the next block after reading [readSize] bytes from the beginning - /// of a previous block. 
- void _markPaddingToSkip(int readSize) { - final offsetInLastBlock = readSize.toUnsigned(blockSizeLog2); - if (offsetInLastBlock != 0) { - _skipNext = blockSize - offsetInLastBlock; - } - } - /// Checks the PAX headers for GNU sparse headers. /// If they are found, then this function reads the sparse map and returns it. /// This assumes that 0.0 headers have already been converted to 0.1 headers /// @@ -519,7 +540,7 @@ /// Ensures that [block] has at least [n] tokens. Future<void> feedTokens(int n) async { while (newLineCount < n) { - final newBlock = await _chunkedStream.readBytes(blockSize); + final newBlock = await _readFullBlock(); if (newBlock.length < blockSize) { throw TarException.header( 'GNU Sparse Map does not have enough lines!'); @@ -639,54 +660,44 @@ throw TarException.header('Tried to read sparse map of non-GNU header'); } + // Read the real size of the file when sparse holes are expanded. header.size = rawHeader.readNumeric(483, 12); - final sparseMaps = <Uint8List>[]; + final sparseEntries = <SparseEntry>[]; - var sparse = rawHeader.sublistView(386, 483); - sparseMaps.add(sparse); + bool readEntry(Uint8List source, int offset) { + // If a sparse header starts with a null byte, it marks the end of the + // sparse structures. + if (rawHeader[offset] == 0) return false; - while (true) { - final maxEntries = sparse.length ~/ 24; - if (sparse[24 * maxEntries] > 0) { - // If there are more entries, read an extension header and parse its - // entries. 
- sparse = await _chunkedStream.readBytes(blockSize); - sparseMaps.add(sparse); - continue; + final fileOffset = source.readNumeric(offset, 12); + final length = source.readNumeric(offset + 12, 12); + + sparseEntries.add(SparseEntry(fileOffset, length)); + return true; + } + + // The first four sparse headers are stored in the tar header itself + for (var i = 0; i < 4; i++) { + final offset = 386 + 24 * i; + if (!readEntry(rawHeader, offset)) break; + } + + var isExtended = rawHeader[482] != 0; + + while (isExtended) { + // Ok, we have a new block of sparse headers to process + final block = await _readFullBlock(); + + // A full block of sparse data contains up to 21 entries + for (var i = 0; i < 21; i++) { + if (!readEntry(block, i * 24)) break; } - break; + // The last bytes indicates whether another sparse header block follows. + isExtended = block[504] != 0; } - try { - return _processOldGNUSparseMap(sparseMaps); - } on FormatException { - throw TarException('Invalid old GNU Sparse Map'); - } - } - - /// Process [sparseMaps], which is known to be an OLD GNU v0.1 sparse map. - /// - /// For details, see https://www.gnu.org/software/tar/manual/html_section/tar_94.html#SEC191 - List<SparseEntry> _processOldGNUSparseMap(List<Uint8List> sparseMaps) { - final sparseData = <SparseEntry>[]; - - for (final sparseMap in sparseMaps) { - final maxEntries = sparseMap.length ~/ 24; - for (var i = 0; i < maxEntries; i++) { - // This termination condition is identical to GNU and BSD tar. 
- if (sparseMap[i * 24] == 0) { - // Don't return, need to process extended headers (even if empty) - break; - } - - final offset = sparseMap.readNumeric(i * 24, 12); - final length = sparseMap.readNumeric(i * 24 + 12, 12); - - sparseData.add(SparseEntry(offset, length)); - } - } - return sparseData; + return sparseEntries; } } @@ -703,10 +714,6 @@ _globalHeaders.addAll(headers); } - void addLocal(String key, String value) => _localHeaders[key] = value; - - void removeLocal(String key) => _localHeaders.remove(key); - /// Applies new local PAX-headers from the map. /// /// This replaces all currently active local headers. @@ -792,18 +799,20 @@ // Skip over the equals sign offset = nextEquals + 1; - // Subtract one for trailing newline + // Subtract one for trailing newline for value final endOfValue = endOfEntry - 1; - final value = utf8.decoder.convert(data, offset, endOfValue); - if (!_isValidPaxRecord(key, value)) { + if (!_isValidPaxKey(key)) { error(); } // If we're seeing weird PAX Version 0.0 sparse keys, expect alternating // GNU.sparse.offset and GNU.sparse.numbytes headers. if (key == paxGNUSparseNumBytes || key == paxGNUSparseOffset) { - if ((sparseMap.length.isEven && key != paxGNUSparseOffset) || + final value = utf8.decoder.convert(data, offset, endOfValue); + + if (!_isValidPaxRecord(key, value) || + (sparseMap.length.isEven && key != paxGNUSparseOffset) || (sparseMap.length.isOdd && key != paxGNUSparseNumBytes) || value.contains(',')) { error(); @@ -813,6 +822,12 @@ } else if (!ignoreUnknown || supportedPaxHeaders.contains(key)) { // Ignore unrecognized headers to avoid unbounded growth of the global // header map. 
+ final value = unsafeUtf8Decoder.convert(data, offset, endOfValue); + + if (!_isValidPaxRecord(key, value)) { + error(); + } + map[key] = value; } @@ -837,16 +852,23 @@ } } + // NB: Some Tar files have malformed UTF-8 data in the headers, we should + // decode them anyways even if they're broken + static const unsafeUtf8Decoder = Utf8Decoder(allowMalformed: true); + + static bool _isValidPaxKey(String key) { + // These limitations are documented in the PAX standard. + return key.isNotEmpty && !key.contains('=') && !key.codeUnits.contains(0); + } + /// Checks whether [key], [value] is a valid entry in a pax header. /// /// This is adopted from the Golang tar reader (`validPAXRecord`), which says /// that "Keys and values should be UTF-8, but the number of bad writers out /// there forces us to be a more liberal." static bool _isValidPaxRecord(String key, String value) { - // These limitations are documented in the PAX standard. - if (key.isEmpty || key.contains('=')) return false; - - // These aren't, but Golangs's tar has them and got away with it. + // These aren't documented in any standard, but Golangs's tar has them and + // got away with it. switch (key) { case paxPath: case paxLinkpath: @@ -854,7 +876,7 @@ case paxGname: return !value.codeUnits.contains(0); default: - return !key.codeUnits.contains(0); + return true; } } } @@ -864,27 +886,50 @@ /// [ChunkedStreamReader.readStream] might return a stream shorter than /// expected. That indicates an invalid tar file though, since the correct size /// is stored in the header. 
-class _OutgoingStreamGuard extends EventSink<List<int>> { - final int expectedSize; +class _OutgoingStreamGuard extends EventSink<Uint8List> { + int remainingContentSize; + int remainingPaddingSize; + final EventSink<List<int>> out; void Function() onDone; - int emittedSize = 0; bool hadError = false; + bool isInContent = true; - _OutgoingStreamGuard(this.expectedSize, this.out, this.onDone); + _OutgoingStreamGuard(this.remainingContentSize, this.out, this.onDone) + : remainingPaddingSize = _paddingFor(remainingContentSize); + + static int _paddingFor(int contentSize) { + final offsetInLastBlock = contentSize.toUnsigned(blockSizeLog2); + if (offsetInLastBlock != 0) { + return blockSize - offsetInLastBlock; + } + return 0; + } @override - void add(List<int> event) { - emittedSize += event.length; - // We have checks limiting the length of outgoing streams. If the stream is - // larger than expected, that's a bug in pkg:tar. - assert( - emittedSize <= expectedSize, - 'Stream now emitted $emittedSize bytes, but only expected ' - '$expectedSize'); + void add(Uint8List event) { + if (isInContent) { + if (event.length <= remainingContentSize) { + // We can fully add this chunk as it consists entirely of data + out.add(event); + remainingContentSize -= event.length; + } else { + // We can add the first bytes as content, the others are padding that we + // shouldn't emit + out.add(event.sublistView(0, remainingContentSize)); + isInContent = false; + remainingPaddingSize -= event.length - remainingContentSize; + remainingContentSize = 0; + } + } else { + // Ok, the entire event is padding + remainingPaddingSize -= event.length; + } - out.add(event); + // The underlying stream comes from pkg:tar, so if we get too many bytes + // that's a bug in this package. 
+ assert(remainingPaddingSize >= 0, 'Stream emitted too many bytes'); } @override @@ -895,15 +940,14 @@ @override void close() { - onDone(); - // If the stream stopped after an error, the user is already aware that // something is wrong. - if (emittedSize < expectedSize && !hadError) { + if (remainingContentSize > 0 && !hadError) { out.addError( TarException('Unexpected end of tar file'), StackTrace.current); } + onDone(); out.close(); } }
diff --git a/lib/src/third_party/tar/src/utils.dart b/lib/src/third_party/tar/src/utils.dart index a6b7e13..4fa75b1 100644 --- a/lib/src/third_party/tar/src/utils.dart +++ b/lib/src/third_party/tar/src/utils.dart
@@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:convert'; import 'dart:math'; import 'dart:typed_data'; @@ -92,25 +93,31 @@ } int computeUnsignedHeaderChecksum() { - var result = 0; + // Accessing the last element first helps the VM eliminate bounds checks in + // the loops below. + this[blockSize - 1]; + var result = checksumLength * _checksumPlaceholder; - for (var i = 0; i < length; i++) { - result += (i < checksumOffset || i >= _checksumEnd) - ? this[i] // Not in range of where the checksum is written - : _checksumPlaceholder; + for (var i = 0; i < checksumOffset; i++) { + result += this[i]; + } + for (var i = _checksumEnd; i < blockSize; i++) { + result += this[i]; } return result; } int computeSignedHeaderChecksum() { - var result = 0; + this[blockSize - 1]; + // Note that _checksumPlaceholder.toSigned(8) == _checksumPlaceholder + var result = checksumLength * _checksumPlaceholder; - for (var i = 0; i < length; i++) { - // Note that _checksumPlaceholder.toSigned(8) == _checksumPlaceholder - result += (i < checksumOffset || i >= _checksumEnd) - ? this[i].toSigned(8) - : _checksumPlaceholder; + for (var i = 0; i < checksumOffset; i++) { + result += this[i].toSigned(8); + } + for (var i = _checksumEnd; i < blockSize; i++) { + result += this[i].toSigned(8); } return result; @@ -123,6 +130,14 @@ return true; } + + bool get isAllZeroes { + for (var i = 0; i < length; i++) { + if (this[i] != 0) return false; + } + + return true; + } } bool isNotAscii(int i) => i > 128; @@ -200,14 +215,6 @@ final $this = this; return $this is Uint8List ? $this : Uint8List.fromList(this); } - - bool get isAllZeroes { - for (var i = 0; i < length; i++) { - if (this[i] != 0) return false; - } - - return true; - } } /// Generates a chunked stream of [length] zeroes. @@ -229,3 +236,325 @@ yield Uint8List(remainingBytes); } } + +/// An optimized reader reading 512-byte blocks from an input stream. +class BlockReader { + final Stream<List<int>> _input; + StreamSubscription<List<int>>? 
_subscription; + bool _isClosed = false; + + /// If a request is active, returns the current stream that we're reporting. + /// This controller is synchronous. + StreamController<Uint8List>? _outgoing; + + /// The amount of (512-byte) blocks remaining before [_outgoing] should close. + int _remainingBlocksInOutgoing = 0; + + /// A pending tar block that has not been emitted yet. + /// + /// This can happen if we receive small chunks of data in [_onData] that + /// aren't enough to form a full block. + final Uint8List _pendingBlock = Uint8List(blockSize); + + /// The offset in [_pendingBlock] at which new data should start. + /// + /// For instance, if this value is `502`, we're missing `10` additional bytes + /// to complete the [_pendingBlock]. + /// When this value is `0`, there is no active pending block. + int _offsetInPendingBlock = 0; + + /// Additional data that we received, but were unable to dispatch to a + /// downstream listener yet. + /// + /// This can happen if we receive a large chunk of data and a listener is + /// only interested in a small chunk. + /// + /// We will never have trailing data and a pending block at the same time. + /// When we have fewer than 512 bytes of trailing data, it should be stored + /// as a pending block instead. + Uint8List? _trailingData; + + /// The offset in the [_trailingData] byte array. + /// + /// When a new listener attaches, we can start by emitting the sublist + /// starting at this offset. + int _offsetInTrailingData = 0; + + BlockReader(this._input); + + /// Emits full blocks. + /// + /// Returns `true` if the listener detached in response to emitting these + /// blocks. In this case, remaining data must be saved in [_trailingData]. 
+ bool _emitBlocks(Uint8List data, {int amount = 1}) { + assert(_remainingBlocksInOutgoing >= amount); + final outgoing = _outgoing!; + + if (!outgoing.isClosed) outgoing.add(data); + + final remainingNow = _remainingBlocksInOutgoing -= amount; + if (remainingNow == 0) { + _outgoing = null; + _pause(); + + scheduleMicrotask(() { + outgoing.close(); + }); + return true; + } else if (outgoing.isPaused || outgoing.isClosed) { + _pause(); + return true; + } + + return false; + } + + void _onData(List<int> data) { + assert(_outgoing != null && _trailingData == null); + + final typedData = data.asUint8List(); + var offsetInData = 0; + + /// Saves parts of the current chunks that couldn't be emitted. + void saveTrailingState() { + assert(_trailingData == null && _offsetInPendingBlock == 0); + + final remaining = typedData.length - offsetInData; + + if (remaining == 0) { + return; // Nothing to save, the chunk has been consumed fully. + } else if (remaining < blockSize) { + // Store remaining data as a pending block. + _pendingBlock.setAll(0, typedData.sublistView(offsetInData)); + _offsetInPendingBlock = remaining; + } else { + _trailingData = typedData; + _offsetInTrailingData = offsetInData; + } + } + + // Try to complete a pending block first + var offsetInPending = _offsetInPendingBlock; + final canWriteIntoPending = min(blockSize - offsetInPending, data.length); + + if (offsetInPending != 0 && canWriteIntoPending > 0) { + _pendingBlock.setAll( + offsetInPending, typedData.sublistView(0, canWriteIntoPending)); + offsetInPending = _offsetInPendingBlock += canWriteIntoPending; + offsetInData += canWriteIntoPending; + + // Did this finish the pending block? + if (offsetInPending == blockSize) { + _offsetInPendingBlock = 0; + if (_emitBlocks(_pendingBlock)) { + // Emitting the pending block completed all pending requests. + saveTrailingState(); + return; + } + } else { + // The chunk we received didn't fill up the pending block, so just stop + // here. 
+ assert(offsetInData == data.length); + return; + } + } + + // At this point, the pending block should have been served. + assert(_offsetInPendingBlock == 0); + + final fullBlocksToEmit = min(_remainingBlocksInOutgoing, + (typedData.length - offsetInData) ~/ blockSize); + + if (fullBlocksToEmit > 0) { + _emitBlocks( + typedData.sublistView( + offsetInData, offsetInData += fullBlocksToEmit * blockSize), + amount: fullBlocksToEmit, + ); + } + + saveTrailingState(); + } + + void _onError(Object error, StackTrace trace) { + assert(_outgoing != null && _trailingData == null); + + _outgoing!.addError(error, trace); + } + + void _onDone() { + assert(_outgoing != null && _trailingData == null); + final outgoing = _outgoing!; + + // Add pending data, then close + if (_offsetInPendingBlock != 0) { + outgoing.add(_pendingBlock.sublistView(0, _offsetInPendingBlock)); + } + + _isClosed = true; + _subscription?.cancel(); + outgoing.close(); + } + + void _subscribeOrResume() { + // We should not resume the subscription if there is trailing data ready to + // be emitted. 
+ assert(_trailingData == null); + + final sub = _subscription; + if (sub == null) { + _subscription = _input.listen(_onData, + onError: _onError, onDone: _onDone, cancelOnError: true); + } else { + sub.resume(); + } + } + + void _pause() { + final sub = _subscription!; // ignore: cancel_subscriptions + + if (!sub.isPaused) sub.pause(); + } + + Future<Uint8List> nextBlock() { + final result = Uint8List(blockSize); + var offset = 0; + + return nextBlocks(1).forEach((chunk) { + result.setAll(offset, chunk); + offset += chunk.length; + }).then((void _) => result.sublistView(0, offset)); + } + + Stream<Uint8List> nextBlocks(int amount) { + if (_isClosed || amount == 0) { + return const Stream.empty(); + } + if (_outgoing != null) { + throw StateError( + 'Cannot call nextBlocks() before the previous stream completed.'); + } + assert(_remainingBlocksInOutgoing == 0); + + // We're making this synchronous because we will mostly add events in + // response to receiving chunks from the source stream. We manually ensure + // that other emits are happening asynchronously. + final controller = StreamController<Uint8List>(sync: true); + _outgoing = controller; + _remainingBlocksInOutgoing = amount; + + var state = _StreamState.initial; + + /// Sends trailing data to the stream. Returns true if the subscription + /// should still be resumed afterwards. + bool emitTrailing() { + // Attempt to serve requests from pending data first. 
+ final trailing = _trailingData; + if (trailing != null) { + // There should never be trailing data and a pending block at the + // same time + assert(_offsetInPendingBlock == 0); + + var remaining = trailing.length - _offsetInTrailingData; + // If there is trailing data, it should contain a full block + // (otherwise we would have stored it as a pending block) + assert(remaining >= blockSize); + + final blocks = min(_remainingBlocksInOutgoing, remaining ~/ blockSize); + assert(blocks > 0); + + final done = _emitBlocks( + trailing.sublistView(_offsetInTrailingData, + _offsetInTrailingData + blocks * blockSize), + amount: blocks); + + remaining -= blocks * blockSize; + _offsetInTrailingData += blocks * blockSize; + + if (remaining == 0) { + _trailingData = null; + _offsetInTrailingData = 0; + } else if (remaining < blockSize) { + assert(_offsetInPendingBlock == 0); + + // Move trailing data into a pending block + _pendingBlock.setAll(0, trailing.sublistView(_offsetInTrailingData)); + _offsetInPendingBlock = remaining; + _trailingData = null; + _offsetInTrailingData = 0; + } else { + // If there is still more than a full block of data waiting, we + // should not listen. This implies that the stream is done already. + assert(done); + } + + // The listener detached in response to receiving the event. 
+ if (done) { + if (_remainingBlocksInOutgoing == 0) state = _StreamState.done; + return false; + } + } + + return true; + } + + void scheduleInitialEmit() { + scheduleMicrotask(() { + if (state != _StreamState.initial) return; + state = _StreamState.attached; + + if (emitTrailing()) { + _subscribeOrResume(); + } + }); + } + + controller + ..onListen = scheduleInitialEmit + ..onPause = () { + assert(state == _StreamState.initial || state == _StreamState.attached); + + if (state == _StreamState.initial) { + state = _StreamState.pausedAfterInitial; + } else { + _pause(); + state = _StreamState.pausedAfterAttached; + } + } + ..onResume = () { + // We're done already + if (_remainingBlocksInOutgoing == 0) return; + + assert(state == _StreamState.pausedAfterAttached || + state == _StreamState.pausedAfterInitial); + + if (state == _StreamState.pausedAfterInitial) { + state = _StreamState.initial; + scheduleInitialEmit(); + } else { + state = _StreamState.attached; + if (emitTrailing()) { + _subscribeOrResume(); + } + } + } + ..onCancel = () { + state = _StreamState.done; + }; + + return controller.stream; + } + + FutureOr<void> close() { + _isClosed = true; + return _subscription?.cancel(); + } +} + +enum _StreamState { + initial, + attached, + pausedAfterInitial, + pausedAfterAttached, + done, +}
diff --git a/lib/src/third_party/tar/src/writer.dart b/lib/src/third_party/tar/src/writer.dart index 71ab368..5ff92b9 100644 --- a/lib/src/third_party/tar/src/writer.dart +++ b/lib/src/third_party/tar/src/writer.dart
@@ -99,6 +99,33 @@ return _WritingSink(output, format); } +/// A synchronous encoder for in-memory tar files. +/// +/// The default [tarWriter] creates an asynchronous conversion from a stream of +/// tar entries to a byte stream. +/// When all tar entries are in-memory ([SynchronousTarEntry]), it is possible +/// to write them synchronously too. +/// +/// To create a tar archive consisting of a single entry, use +/// [Converter.convert] on this [tarConverter]. +/// To create a tar archive consisting of any number of entries, first call +/// [Converter.startChunkedConversion] with a suitable output sink. Next, call +/// [Sink.add] for each tar entry and finish the archive by calling +/// [Sink.close]. +/// +/// To change the output format of the tar converter, use [tarConverterWith]. +/// To encode any kind of tar entries, use the asynchronous [tarWriter]. +const Converter<SynchronousTarEntry, List<int>> tarConverter = + _SynchronousTarConverter(OutputFormat.pax); + +/// A synchronous encoder for in-memory tar files, with custom encoding options. +/// +/// For more information on how to use the converter, see [tarConverter]. +Converter<SynchronousTarEntry, List<int>> tarConverterWith( + {OutputFormat format = OutputFormat.pax}) { + return _SynchronousTarConverter(format); +} + /// This option controls how long file and link names should be written. /// /// This option can be passed to writer in [tarWritingSink] or[tarWriterWith]. 
@@ -127,16 +154,15 @@ class _WritingSink extends StreamSink<TarEntry> { final StreamSink<List<int>> _output; - final OutputFormat format; - - int _paxHeaderCount = 0; + final _SynchronousTarSink _synchronousWriter; bool _closed = false; final Completer<Object?> _done = Completer(); int _pendingOperations = 0; Future<void> _ready = Future.value(); - _WritingSink(this._output, this.format); + _WritingSink(this._output, OutputFormat format) + : _synchronousWriter = _SynchronousTarSink(_output, format); @override Future<void> get done => _done.future; @@ -175,6 +201,120 @@ size = bufferedData.length; } + _synchronousWriter._writeHeader(header, size); + + // Write content. + if (bufferedData != null) { + _output.add(bufferedData); + } else { + await _output.addStream(event.contents); + } + + _output.add(_paddingBytes(size)); + } + + @override + void addError(Object error, [StackTrace? stackTrace]) { + _output.addError(error, stackTrace); + } + + @override + Future<void> addStream(Stream<TarEntry> stream) async { + await for (final entry in stream) { + await add(entry); + } + } + + @override + Future<void> close() async { + if (!_closed) { + _closed = true; + + // Add two empty blocks at the end. 
+ await _doWork(_synchronousWriter.close); + } + + return done; + } +} + +Uint8List _paddingBytes(int size) { + final padding = -size % blockSize; + assert((size + padding) % blockSize == 0 && + padding <= blockSize && + padding >= 0); + + return Uint8List(padding); +} + +class _SynchronousTarConverter + extends Converter<SynchronousTarEntry, List<int>> { + final OutputFormat format; + + const _SynchronousTarConverter(this.format); + + @override + Sink<SynchronousTarEntry> startChunkedConversion(Sink<List<int>> sink) { + return _SynchronousTarSink(sink, format); + } + + @override + List<int> convert(SynchronousTarEntry input) { + final output = BytesBuilder(copy: false); + startChunkedConversion(ByteConversionSink.withCallback(output.add)) + ..add(input) + ..close(); + + return output.takeBytes(); + } +} + +class _SynchronousTarSink extends Sink<SynchronousTarEntry> { + final OutputFormat _format; + final Sink<List<int>> _output; + + bool _closed = false; + int _paxHeaderCount = 0; + + _SynchronousTarSink(this._output, this._format); + + @override + void add(SynchronousTarEntry data) { + addHeaderAndData(data.header, data.data); + } + + void addHeaderAndData(TarHeader header, List<int> data) { + _throwIfClosed(); + + _writeHeader(header, data.length); + _output + ..add(data) + ..add(_paddingBytes(data.length)); + } + + @override + void close() { + if (_closed) return; + + // End the tar archive by writing two zero blocks. + _output + ..add(UnmodifiableUint8ListView(zeroBlock)) + ..add(UnmodifiableUint8ListView(zeroBlock)); + _output.close(); + + _closed = true; + } + + void _throwIfClosed() { + if (_closed) { + throw StateError('Encoder is closed. ' + 'After calling `endOfArchive()`, encoder must not be used.'); + } + } + + void _writeHeader(TarHeader header, int size) { + assert(header.size < 0 || header.size == size); + var nameBytes = utf8.encode(header.name); var linkBytes = utf8.encode(header.linkName ?? ''); var gnameBytes = utf8.encode(header.groupName ?? 
''); @@ -209,10 +349,10 @@ } if (paxHeader.isNotEmpty) { - if (format == OutputFormat.pax) { - await _writePaxHeader(paxHeader); + if (_format == OutputFormat.pax) { + _writePaxHeader(paxHeader); } else { - await _writeGnuLongName(paxHeader); + _writeGnuLongName(paxHeader); } } @@ -238,24 +378,13 @@ checksum += byte; } headerBlock.setUint(checksum, 148, 8); - _output.add(headerBlock); - - // Write content. - if (bufferedData != null) { - _output.add(bufferedData); - } else { - await event.contents.forEach(_output.add); - } - - final padding = -size % blockSize; - _output.add(Uint8List(padding)); } - /// Writes an extended pax header. + /// Encodes an extended pax header. /// /// https://pubs.opengroup.org/onlinepubs/9699919799/utilities/pax.html#tag_20_92_13_03 - Future<void> _writePaxHeader(Map<String, List<int>> values) { + void _writePaxHeader(Map<String, List<int>> values) { final buffer = BytesBuilder(); // format of each entry: "%d %s=%s\n", <length>, <keyword>, <value> // note that the length includes the trailing \n and the length description @@ -287,7 +416,7 @@ }); final paxData = buffer.takeBytes(); - final file = TarEntry.data( + addHeaderAndData( HeaderImpl.internal( format: TarFormat.pax, modified: millisecondsSinceEpoch(0), @@ -298,10 +427,9 @@ ), paxData, ); - return _safeAdd(file); } - Future<void> _writeGnuLongName(Map<String, List<int>> values) async { + void _writeGnuLongName(Map<String, List<int>> values) { // Ensure that a file that can't be written in the GNU format is not written const allowedKeys = {paxPath, paxLinkpath}; final invalidOptions = values.keys.toSet()..removeAll(allowedKeys); @@ -316,54 +444,25 @@ final name = values[paxPath]; final linkName = values[paxLinkpath]; - Future<void> write(List<int> name, TypeFlag flag) { - return _safeAdd( - TarEntry.data( - HeaderImpl.internal( - name: '././@LongLink', - modified: millisecondsSinceEpoch(0), - format: TarFormat.gnu, - typeFlag: flag, - ), - name, + void create(List<int> name, 
TypeFlag flag) { + return addHeaderAndData( + HeaderImpl.internal( + name: '././@LongLink', + modified: millisecondsSinceEpoch(0), + format: TarFormat.gnu, + typeFlag: flag, ), + name, ); } if (name != null) { - await write(name, TypeFlag.gnuLongName); + create(name, TypeFlag.gnuLongName); } if (linkName != null) { - await write(linkName, TypeFlag.gnuLongLink); + create(linkName, TypeFlag.gnuLongLink); } } - - @override - void addError(Object error, [StackTrace? stackTrace]) { - _output.addError(error, stackTrace); - } - - @override - Future<void> addStream(Stream<TarEntry> stream) async { - await for (final entry in stream) { - await add(entry); - } - } - - @override - Future<void> close() async { - if (!_closed) { - _closed = true; - - // Add two empty blocks at the end. - await _doWork(() { - _output.add(zeroBlock); - _output.add(zeroBlock); - }); - } - - return done; - } } extension on Uint8List {
diff --git a/lib/src/third_party/tar/tar.dart b/lib/src/third_party/tar/tar.dart index 218a6a2..14247bd 100644 --- a/lib/src/third_party/tar/tar.dart +++ b/lib/src/third_party/tar/tar.dart
@@ -9,7 +9,7 @@ import 'src/writer.dart'; export 'src/constants.dart' show TypeFlag; -export 'src/entry.dart'; +export 'src/entry.dart' show TarEntry, SynchronousTarEntry; export 'src/exception.dart'; export 'src/format.dart'; export 'src/header.dart' show TarHeader;
diff --git a/test/io_test.dart b/test/io_test.dart index 2c529cd..0657dd6 100644 --- a/test/io_test.dart +++ b/test/io_test.dart
@@ -291,7 +291,7 @@ test( 'applies executable bits from tar file', () => withTempDir((tempDir) async { - final entry = Stream.value(TarEntry.data( + final entry = Stream<TarEntry>.value(TarEntry.data( TarHeader( name: 'weird_exe', typeFlag: TypeFlag.reg, @@ -309,7 +309,7 @@ test('extracts files and links', () { return withTempDir((tempDir) async { - final entries = Stream.fromIterable([ + final entries = Stream<TarEntry>.fromIterable([ TarEntry.data( TarHeader(name: 'lib/main.txt', typeFlag: TypeFlag.reg), utf8.encode('text content'), @@ -349,7 +349,7 @@ test('preserves empty directories', () { return withTempDir((tempDir) async { - final entry = Stream.value(TarEntry.data( + final entry = Stream<TarEntry>.value(TarEntry.data( TarHeader( name: 'bin/', typeFlag: TypeFlag.dir, @@ -368,7 +368,7 @@ test('throws for entries escaping the tar file', () { return withTempDir((tempDir) async { - final entry = Stream.value(TarEntry.data( + final entry = Stream<TarEntry>.value(TarEntry.data( TarHeader( name: '../other_package-1.2.3/lib/file.dart', typeFlag: TypeFlag.reg, @@ -386,7 +386,7 @@ test('skips symlinks escaping the tar file', () { return withTempDir((tempDir) async { - final entry = Stream.value(TarEntry.data( + final entry = Stream<TarEntry>.value(TarEntry.data( TarHeader( name: 'nested/bad_link', typeFlag: TypeFlag.symlink, @@ -403,7 +403,7 @@ test('skips hardlinks escaping the tar file', () { return withTempDir((tempDir) async { - final entry = Stream.value(TarEntry.data( + final entry = Stream<TarEntry>.value(TarEntry.data( TarHeader( name: 'nested/bad_link', typeFlag: TypeFlag.link,