Upgrade `package:tar` to version `0.5.4`. (#3313)

diff --git a/lib/src/third_party/tar/README.md b/lib/src/third_party/tar/README.md
index 11a8f37..a2a220f 100644
--- a/lib/src/third_party/tar/README.md
+++ b/lib/src/third_party/tar/README.md
@@ -1,7 +1,7 @@
 # package:tar
 
-Vendored elements from `package:tar` for use in creation and extration of
+Vendored elements from `package:tar` for use in creation and extraction of
 tar-archives.
 
  * Repository: `https://github.com/simolus3/tar/`
- * Revision: `b5c5a11d8969f458ccdeb8cf01615f692fed3e97`
+ * Revision: `7cdb563c9894600c6a739ec268f8673d6122006f`
diff --git a/lib/src/third_party/tar/src/constants.dart b/lib/src/third_party/tar/src/constants.dart
index aac7669..05accb0 100644
--- a/lib/src/third_party/tar/src/constants.dart
+++ b/lib/src/third_party/tar/src/constants.dart
@@ -216,13 +216,6 @@
 /// Sticky bit
 const c_ISVTX = 512;
 
-/// **********************
-///  Convenience constants
-/// **********************
-/// 64-bit integer max and min values
-const int64MaxValue = 9223372036854775807;
-const int64MinValue = -9223372036854775808;
-
 /// Constants to determine file modes.
 const modeType = 2401763328;
 const modeSymLink = 134217728;
diff --git a/lib/src/third_party/tar/src/entry.dart b/lib/src/third_party/tar/src/entry.dart
index f6b0a5a..160974b 100644
--- a/lib/src/third_party/tar/src/entry.dart
+++ b/lib/src/third_party/tar/src/entry.dart
@@ -52,8 +52,17 @@
   TarEntry._(this.header, this.contents);
 
   /// Creates an in-memory tar entry from the [header] and the [data] to store.
-  factory TarEntry.data(TarHeader header, List<int> data) {
+  static SynchronousTarEntry data(TarHeader header, List<int> data) {
     (header as HeaderImpl).size = data.length;
-    return TarEntry(header, Stream.value(data));
+    return SynchronousTarEntry._(header, data);
   }
 }
+
+/// A tar entry stored in memory.
+class SynchronousTarEntry extends TarEntry {
+  /// The contents of this tar entry as a byte array.
+  final List<int> data;
+
+  SynchronousTarEntry._(TarHeader header, this.data)
+      : super._(header, Stream.value(data));
+}
diff --git a/lib/src/third_party/tar/src/reader.dart b/lib/src/third_party/tar/src/reader.dart
index 713235a..b9bc3d3 100644
--- a/lib/src/third_party/tar/src/reader.dart
+++ b/lib/src/third_party/tar/src/reader.dart
@@ -22,14 +22,10 @@
 /// to read each archive where possible.
 @sealed
 class TarReader implements StreamIterator<TarEntry> {
-  /// A chunked stream iterator to enable us to get our data.
-  final ChunkedStreamReader<int> _chunkedStream;
+  final BlockReader _reader;
   final PaxHeaders _paxHeaders = PaxHeaders();
   final int _maxSpecialFileSize;
 
-  /// Skip the next [_skipNext] elements when reading in the stream.
-  int _skipNext = 0;
-
   TarEntry? _current;
 
   /// The underlying content stream for the [_current] entry. Draining this
@@ -88,7 +84,7 @@
   TarReader(Stream<List<int>> tarStream,
       {int maxSpecialFileSize = defaultSpecialLength,
       bool disallowTrailingData = false})
-      : _chunkedStream = ChunkedStreamReader(tarStream),
+      : _reader = BlockReader(tarStream),
         _checkNoTrailingData = disallowTrailingData,
         _maxSpecialFileSize = maxSpecialFileSize;
 
@@ -152,13 +148,7 @@
     // iterates through one or more "header files" until it finds a
     // "normal file".
     while (true) {
-      if (_skipNext > 0) {
-        await _readFullBlock(_skipNext);
-        _skipNext = 0;
-      }
-
-      final rawHeader =
-          await _readFullBlock(blockSize, allowEmpty: eofAcceptable);
+      final rawHeader = await _readFullBlock(allowEmpty: eofAcceptable);
 
       nextHeader = await _readHeader(rawHeader);
       if (nextHeader == null) {
@@ -180,19 +170,21 @@
           nextHeader.typeFlag == TypeFlag.xGlobalHeader) {
         format = format.mayOnlyBe(TarFormat.pax);
         final paxHeaderSize = _checkSpecialSize(nextHeader.size);
-        final rawPaxHeaders = await _readFullBlock(paxHeaderSize);
+
+        final rawPaxHeaders =
+            (await _readFullBlock(amount: numBlocks(paxHeaderSize)))
+                .sublistView(0, paxHeaderSize);
 
         _paxHeaders.readPaxHeaders(
             rawPaxHeaders, nextHeader.typeFlag == TypeFlag.xGlobalHeader);
-        _markPaddingToSkip(paxHeaderSize);
 
         // This is a meta header affecting the next header.
         continue;
       } else if (nextHeader.typeFlag == TypeFlag.gnuLongLink ||
           nextHeader.typeFlag == TypeFlag.gnuLongName) {
         format = format.mayOnlyBe(TarFormat.gnu);
-        final realName = await _readFullBlock(
-            _checkSpecialSize(nextBlockSize(nextHeader.size)));
+        final size = _checkSpecialSize(nextHeader.size);
+        final realName = await _readFullBlock(amount: numBlocks(size));
 
         final readName = realName.readString(0, realName.length);
         if (nextHeader.typeFlag == TypeFlag.gnuLongName) {
@@ -247,7 +239,7 @@
 
     // Note: Calling cancel is safe when the stream has already been completed.
     // It's a noop in that case, which is what we want.
-    return _chunkedStream.cancel();
+    return _reader.close();
   }
 
   /// Utility function for quickly iterating through all entries in [tarStream].
@@ -317,7 +309,7 @@
       Uint8List block;
 
       do {
-        block = await _chunkedStream.readBytes(blockSize);
+        block = await _reader.nextBlock();
         if (!block.isAllZeroes) {
           throw TarException(
               'Illegal content after the end of the tar archive.');
@@ -333,15 +325,24 @@
     throw TarException.header('Unexpected end of file');
   }
 
-  /// Reads a block with the requested [size], or throws an unexpected EoF
-  /// exception.
-  Future<Uint8List> _readFullBlock(int size, {bool allowEmpty = false}) async {
-    final block = await _chunkedStream.readBytes(size);
-    if (block.length != size && !(allowEmpty && block.isEmpty)) {
-      _unexpectedEof();
-    }
+  /// Reads [amount] blocks from the input stream, or throws an exception if
+  /// the stream ends prematurely.
+  Future<Uint8List> _readFullBlock({bool allowEmpty = false, int amount = 1}) {
+    final blocks = Uint8List(amount * blockSize);
+    var offset = 0;
 
-    return block;
+    return _reader.nextBlocks(amount).forEach((chunk) {
+      blocks.setAll(offset, chunk);
+      offset += chunk.length;
+    }).then((void _) {
+      if (allowEmpty && offset == 0) {
+        return Uint8List(0);
+      } else if (offset < blocks.length) {
+        _unexpectedEof();
+      } else {
+        return blocks;
+      }
+    });
   }
 
   /// Reads the next block header and assumes that the underlying reader
@@ -357,7 +358,7 @@
     if (rawHeader.isEmpty) return null;
 
     if (rawHeader.isAllZeroes) {
-      rawHeader = await _chunkedStream.readBytes(blockSize);
+      rawHeader = await _reader.nextBlock();
 
       // Exactly 1 block of zeroes is read and EOF is hit.
       if (rawHeader.isEmpty) return null;
@@ -393,9 +394,9 @@
       final sparseDataLength =
           sparseData.fold<int>(0, (value, element) => value + element.length);
 
-      final streamLength = nextBlockSize(sparseDataLength);
-      final safeStream =
-          _publishStream(_chunkedStream.readStream(streamLength), streamLength);
+      final streamBlockCount = numBlocks(sparseDataLength);
+      final safeStream = _publishStream(
+          _reader.nextBlocks(streamBlockCount), streamBlockCount * blockSize);
       return sparseStream(safeStream, sparseHoles, header.size);
     } else {
       var size = header.size;
@@ -408,9 +409,8 @@
       if (size == 0) {
         return _publishStream(const Stream<Never>.empty(), 0);
       } else {
-        _markPaddingToSkip(size);
-        return _publishStream(
-            _chunkedStream.readStream(header.size), header.size);
+        final blockCount = numBlocks(header.size);
+        return _publishStream(_reader.nextBlocks(blockCount), header.size);
       }
     }
   }
@@ -424,7 +424,37 @@
     // There can only be one content stream at a time. This precondition is
     // checked by _prepareToReadHeaders.
     assert(_underlyingContentStream == null);
-    return _underlyingContentStream = Stream.eventTransformed(stream, (sink) {
+    Stream<List<int>>? thisStream;
+
+    return thisStream =
+        _underlyingContentStream = Stream.eventTransformed(stream, (sink) {
+      // This callback is called when we have a listener. Make sure that, at
+      // this point, this stream is still the active content stream.
+      // If users store the contents of a tar header, then read more tar
+      // entries, and finally try to read the stream of the old contents, they'd
+      // get an exception about the stream already being listened to.
+      // This can be a bit confusing, so this check enables a better error UX.
+      if (thisStream != _underlyingContentStream) {
+        throw StateError(
+          'Tried listening to an outdated tar entry. \n'
+          'As all tar entries found by a reader are backed by a single source '
+          'stream, only the latest tar entry can be read. It looks like you '
+          'stored the results of `tarEntry.contents` somewhere, called '
+          '`reader.moveNext()` and then read the contents of the previous '
+          'entry.\n'
+          'For more details, including a discussion of workarounds, see '
+          'https://github.com/simolus3/tar/issues/18',
+        );
+      } else if (_listenedToContentsOnce) {
+        throw StateError(
+          'A tar entry has been listened to multiple times. \n'
+          'As all tar entries are read from what\'s likely a single-'
+          'subscription stream, this is unsupported. If you didn\'t read a tar '
+          'entry multiple times yourself, perhaps you\'ve called `moveNext()` '
+          'before reading contents?',
+        );
+      }
+
       _listenedToContentsOnce = true;
 
       late _OutgoingStreamGuard guard;
@@ -432,7 +462,7 @@
         length,
         sink,
         // Reset state when the stream is done. This will only be called when
-        // the sream is done, not when a listener cancels.
+        // the stream is done, not when a listener cancels.
         () {
           _underlyingContentStream = null;
           if (guard.hadError) {
@@ -443,15 +473,6 @@
     });
   }
 
-  /// Skips to the next block after reading [readSize] bytes from the beginning
-  /// of a previous block.
-  void _markPaddingToSkip(int readSize) {
-    final offsetInLastBlock = readSize.toUnsigned(blockSizeLog2);
-    if (offsetInLastBlock != 0) {
-      _skipNext = blockSize - offsetInLastBlock;
-    }
-  }
-
   /// Checks the PAX headers for GNU sparse headers.
   /// If they are found, then this function reads the sparse map and returns it.
   /// This assumes that 0.0 headers have already been converted to 0.1 headers
@@ -519,7 +540,7 @@
     /// Ensures that [block] h as at least [n] tokens.
     Future<void> feedTokens(int n) async {
       while (newLineCount < n) {
-        final newBlock = await _chunkedStream.readBytes(blockSize);
+        final newBlock = await _readFullBlock();
         if (newBlock.length < blockSize) {
           throw TarException.header(
               'GNU Sparse Map does not have enough lines!');
@@ -639,54 +660,44 @@
       throw TarException.header('Tried to read sparse map of non-GNU header');
     }
 
+    // Read the real size of the file when sparse holes are expanded.
     header.size = rawHeader.readNumeric(483, 12);
-    final sparseMaps = <Uint8List>[];
+    final sparseEntries = <SparseEntry>[];
 
-    var sparse = rawHeader.sublistView(386, 483);
-    sparseMaps.add(sparse);
+    bool readEntry(Uint8List source, int offset) {
+      // If a sparse header starts with a null byte, it marks the end of the
+      // sparse structures.
+      if (source[offset] == 0) return false;
 
-    while (true) {
-      final maxEntries = sparse.length ~/ 24;
-      if (sparse[24 * maxEntries] > 0) {
-        // If there are more entries, read an extension header and parse its
-        // entries.
-        sparse = await _chunkedStream.readBytes(blockSize);
-        sparseMaps.add(sparse);
-        continue;
+      final fileOffset = source.readNumeric(offset, 12);
+      final length = source.readNumeric(offset + 12, 12);
+
+      sparseEntries.add(SparseEntry(fileOffset, length));
+      return true;
+    }
+
+    // The first four sparse headers are stored in the tar header itself
+    for (var i = 0; i < 4; i++) {
+      final offset = 386 + 24 * i;
+      if (!readEntry(rawHeader, offset)) break;
+    }
+
+    var isExtended = rawHeader[482] != 0;
+
+    while (isExtended) {
+      // Ok, we have a new block of sparse headers to process
+      final block = await _readFullBlock();
+
+      // A full block of sparse data contains up to 21 entries
+      for (var i = 0; i < 21; i++) {
+        if (!readEntry(block, i * 24)) break;
       }
 
-      break;
+      // The last byte indicates whether another sparse header block follows.
+      isExtended = block[504] != 0;
     }
 
-    try {
-      return _processOldGNUSparseMap(sparseMaps);
-    } on FormatException {
-      throw TarException('Invalid old GNU Sparse Map');
-    }
-  }
-
-  /// Process [sparseMaps], which is known to be an OLD GNU v0.1 sparse map.
-  ///
-  /// For details, see https://www.gnu.org/software/tar/manual/html_section/tar_94.html#SEC191
-  List<SparseEntry> _processOldGNUSparseMap(List<Uint8List> sparseMaps) {
-    final sparseData = <SparseEntry>[];
-
-    for (final sparseMap in sparseMaps) {
-      final maxEntries = sparseMap.length ~/ 24;
-      for (var i = 0; i < maxEntries; i++) {
-        // This termination condition is identical to GNU and BSD tar.
-        if (sparseMap[i * 24] == 0) {
-          // Don't return, need to process extended headers (even if empty)
-          break;
-        }
-
-        final offset = sparseMap.readNumeric(i * 24, 12);
-        final length = sparseMap.readNumeric(i * 24 + 12, 12);
-
-        sparseData.add(SparseEntry(offset, length));
-      }
-    }
-    return sparseData;
+    return sparseEntries;
   }
 }
 
@@ -703,10 +714,6 @@
     _globalHeaders.addAll(headers);
   }
 
-  void addLocal(String key, String value) => _localHeaders[key] = value;
-
-  void removeLocal(String key) => _localHeaders.remove(key);
-
   /// Applies new local PAX-headers from the map.
   ///
   /// This replaces all currently active local headers.
@@ -792,18 +799,20 @@
       // Skip over the equals sign
       offset = nextEquals + 1;
 
-      // Subtract one for trailing newline
+      // Subtract one for trailing newline for value
       final endOfValue = endOfEntry - 1;
-      final value = utf8.decoder.convert(data, offset, endOfValue);
 
-      if (!_isValidPaxRecord(key, value)) {
+      if (!_isValidPaxKey(key)) {
         error();
       }
 
       // If we're seeing weird PAX Version 0.0 sparse keys, expect alternating
       // GNU.sparse.offset and GNU.sparse.numbytes headers.
       if (key == paxGNUSparseNumBytes || key == paxGNUSparseOffset) {
-        if ((sparseMap.length.isEven && key != paxGNUSparseOffset) ||
+        final value = utf8.decoder.convert(data, offset, endOfValue);
+
+        if (!_isValidPaxRecord(key, value) ||
+            (sparseMap.length.isEven && key != paxGNUSparseOffset) ||
             (sparseMap.length.isOdd && key != paxGNUSparseNumBytes) ||
             value.contains(',')) {
           error();
@@ -813,6 +822,12 @@
       } else if (!ignoreUnknown || supportedPaxHeaders.contains(key)) {
         // Ignore unrecognized headers to avoid unbounded growth of the global
         // header map.
+        final value = unsafeUtf8Decoder.convert(data, offset, endOfValue);
+
+        if (!_isValidPaxRecord(key, value)) {
+          error();
+        }
+
         map[key] = value;
       }
 
@@ -837,16 +852,23 @@
     }
   }
 
+  // NB: Some Tar files have malformed UTF-8 data in the headers, we should
+  // decode them anyways even if they're broken
+  static const unsafeUtf8Decoder = Utf8Decoder(allowMalformed: true);
+
+  static bool _isValidPaxKey(String key) {
+    // These limitations are documented in the PAX standard.
+    return key.isNotEmpty && !key.contains('=') && !key.codeUnits.contains(0);
+  }
+
   /// Checks whether [key], [value] is a valid entry in a pax header.
   ///
   /// This is adopted from the Golang tar reader (`validPAXRecord`), which says
   /// that "Keys and values should be UTF-8, but the number of bad writers out
   /// there forces us to be a more liberal."
   static bool _isValidPaxRecord(String key, String value) {
-    // These limitations are documented in the PAX standard.
-    if (key.isEmpty || key.contains('=')) return false;
-
-    // These aren't, but Golangs's tar has them and got away with it.
+    // These aren't documented in any standard, but Golang's tar has them and
+    // got away with it.
     switch (key) {
       case paxPath:
       case paxLinkpath:
@@ -854,7 +876,7 @@
       case paxGname:
         return !value.codeUnits.contains(0);
       default:
-        return !key.codeUnits.contains(0);
+        return true;
     }
   }
 }
@@ -864,27 +886,50 @@
 /// [ChunkedStreamReader.readStream] might return a stream shorter than
 /// expected. That indicates an invalid tar file though, since the correct size
 /// is stored in the header.
-class _OutgoingStreamGuard extends EventSink<List<int>> {
-  final int expectedSize;
+class _OutgoingStreamGuard extends EventSink<Uint8List> {
+  int remainingContentSize;
+  int remainingPaddingSize;
+
   final EventSink<List<int>> out;
   void Function() onDone;
 
-  int emittedSize = 0;
   bool hadError = false;
+  bool isInContent = true;
 
-  _OutgoingStreamGuard(this.expectedSize, this.out, this.onDone);
+  _OutgoingStreamGuard(this.remainingContentSize, this.out, this.onDone)
+      : remainingPaddingSize = _paddingFor(remainingContentSize);
+
+  static int _paddingFor(int contentSize) {
+    final offsetInLastBlock = contentSize.toUnsigned(blockSizeLog2);
+    if (offsetInLastBlock != 0) {
+      return blockSize - offsetInLastBlock;
+    }
+    return 0;
+  }
 
   @override
-  void add(List<int> event) {
-    emittedSize += event.length;
-    // We have checks limiting the length of outgoing streams. If the stream is
-    // larger than expected, that's a bug in pkg:tar.
-    assert(
-        emittedSize <= expectedSize,
-        'Stream now emitted $emittedSize bytes, but only expected '
-        '$expectedSize');
+  void add(Uint8List event) {
+    if (isInContent) {
+      if (event.length <= remainingContentSize) {
+        // We can fully add this chunk as it consists entirely of data
+        out.add(event);
+        remainingContentSize -= event.length;
+      } else {
+        // We can add the first bytes as content, the others are padding that we
+        // shouldn't emit
+        out.add(event.sublistView(0, remainingContentSize));
+        isInContent = false;
+        remainingPaddingSize -= event.length - remainingContentSize;
+        remainingContentSize = 0;
+      }
+    } else {
+      // Ok, the entire event is padding
+      remainingPaddingSize -= event.length;
+    }
 
-    out.add(event);
+    // The underlying stream comes from pkg:tar, so if we get too many bytes
+    // that's a bug in this package.
+    assert(remainingPaddingSize >= 0, 'Stream emitted to many bytes');
   }
 
   @override
@@ -895,15 +940,14 @@
 
   @override
   void close() {
-    onDone();
-
     // If the stream stopped after an error, the user is already aware that
     // something is wrong.
-    if (emittedSize < expectedSize && !hadError) {
+    if (remainingContentSize > 0 && !hadError) {
       out.addError(
           TarException('Unexpected end of tar file'), StackTrace.current);
     }
 
+    onDone();
     out.close();
   }
 }
diff --git a/lib/src/third_party/tar/src/utils.dart b/lib/src/third_party/tar/src/utils.dart
index a6b7e13..4fa75b1 100644
--- a/lib/src/third_party/tar/src/utils.dart
+++ b/lib/src/third_party/tar/src/utils.dart
@@ -1,3 +1,4 @@
+import 'dart:async';
 import 'dart:convert';
 import 'dart:math';
 import 'dart:typed_data';
@@ -92,25 +93,31 @@
   }
 
   int computeUnsignedHeaderChecksum() {
-    var result = 0;
+    // Accessing the last element first helps the VM eliminate bounds checks in
+    // the loops below.
+    this[blockSize - 1];
+    var result = checksumLength * _checksumPlaceholder;
 
-    for (var i = 0; i < length; i++) {
-      result += (i < checksumOffset || i >= _checksumEnd)
-          ? this[i] // Not in range of where the checksum is written
-          : _checksumPlaceholder;
+    for (var i = 0; i < checksumOffset; i++) {
+      result += this[i];
+    }
+    for (var i = _checksumEnd; i < blockSize; i++) {
+      result += this[i];
     }
 
     return result;
   }
 
   int computeSignedHeaderChecksum() {
-    var result = 0;
+    this[blockSize - 1];
+    // Note that _checksumPlaceholder.toSigned(8) == _checksumPlaceholder
+    var result = checksumLength * _checksumPlaceholder;
 
-    for (var i = 0; i < length; i++) {
-      // Note that _checksumPlaceholder.toSigned(8) == _checksumPlaceholder
-      result += (i < checksumOffset || i >= _checksumEnd)
-          ? this[i].toSigned(8)
-          : _checksumPlaceholder;
+    for (var i = 0; i < checksumOffset; i++) {
+      result += this[i].toSigned(8);
+    }
+    for (var i = _checksumEnd; i < blockSize; i++) {
+      result += this[i].toSigned(8);
     }
 
     return result;
@@ -123,6 +130,14 @@
 
     return true;
   }
+
+  bool get isAllZeroes {
+    for (var i = 0; i < length; i++) {
+      if (this[i] != 0) return false;
+    }
+
+    return true;
+  }
 }
 
 bool isNotAscii(int i) => i > 128;
@@ -200,14 +215,6 @@
     final $this = this;
     return $this is Uint8List ? $this : Uint8List.fromList(this);
   }
-
-  bool get isAllZeroes {
-    for (var i = 0; i < length; i++) {
-      if (this[i] != 0) return false;
-    }
-
-    return true;
-  }
 }
 
 /// Generates a chunked stream of [length] zeroes.
@@ -229,3 +236,325 @@
     yield Uint8List(remainingBytes);
   }
 }
+
+/// An optimized reader reading 512-byte blocks from an input stream.
+class BlockReader {
+  final Stream<List<int>> _input;
+  StreamSubscription<List<int>>? _subscription;
+  bool _isClosed = false;
+
+  /// If a request is active, returns the current stream that we're reporting.
+  /// This controler is synchronous.
+  StreamController<Uint8List>? _outgoing;
+
+  /// The amount of (512-byte) blocks remaining before [_outgoing] should close.
+  int _remainingBlocksInOutgoing = 0;
+
+  /// A pending tar block that has not been emitted yet.
+  ///
+  /// This can happen if we receive small chunks of data in [_onData] that
+  /// aren't enough to form a full block.
+  final Uint8List _pendingBlock = Uint8List(blockSize);
+
+  /// The offset in [_pendingBlock] at which new data should start.
+  ///
+  /// For instance, if this value is `502`, we're missing `10` additional bytes
+  /// to complete the [_pendingBlock].
+  /// When this value is `0`, there is no active pending block.
+  int _offsetInPendingBlock = 0;
+
+  /// Additional data that we received, but were unable to dispatch to a
+  /// downstream listener yet.
+  ///
+  /// This can happen if we receive a large chunk of data and a listener is
+  /// only interested in a small chunk.
+  ///
+  /// We will never have trailing data and a pending block at the same time.
+  /// When we have fewer than 512 bytes of trailing data, it should be stored
+  /// as a pending block instead.
+  Uint8List? _trailingData;
+
+  /// The offset in the [_trailingData] byte array.
+  ///
+  /// When a new listener attaches, we can start by emitting the sublist
+  /// starting at this offset.
+  int _offsetInTrailingData = 0;
+
+  BlockReader(this._input);
+
+  /// Emits full blocks.
+  ///
+  /// Returns `true` if the listener detached in response to emitting these
+  /// blocks. In this case, remaining data must be saved in [_trailingData].
+  bool _emitBlocks(Uint8List data, {int amount = 1}) {
+    assert(_remainingBlocksInOutgoing >= amount);
+    final outgoing = _outgoing!;
+
+    if (!outgoing.isClosed) outgoing.add(data);
+
+    final remainingNow = _remainingBlocksInOutgoing -= amount;
+    if (remainingNow == 0) {
+      _outgoing = null;
+      _pause();
+
+      scheduleMicrotask(() {
+        outgoing.close();
+      });
+      return true;
+    } else if (outgoing.isPaused || outgoing.isClosed) {
+      _pause();
+      return true;
+    }
+
+    return false;
+  }
+
+  void _onData(List<int> data) {
+    assert(_outgoing != null && _trailingData == null);
+
+    final typedData = data.asUint8List();
+    var offsetInData = 0;
+
+    /// Saves parts of the current chunks that couldn't be emitted.
+    void saveTrailingState() {
+      assert(_trailingData == null && _offsetInPendingBlock == 0);
+
+      final remaining = typedData.length - offsetInData;
+
+      if (remaining == 0) {
+        return; // Nothing to save, the chunk has been consumed fully.
+      } else if (remaining < blockSize) {
+        // Store remaining data as a pending block.
+        _pendingBlock.setAll(0, typedData.sublistView(offsetInData));
+        _offsetInPendingBlock = remaining;
+      } else {
+        _trailingData = typedData;
+        _offsetInTrailingData = offsetInData;
+      }
+    }
+
+    // Try to complete a pending block first
+    var offsetInPending = _offsetInPendingBlock;
+    final canWriteIntoPending = min(blockSize - offsetInPending, data.length);
+
+    if (offsetInPending != 0 && canWriteIntoPending > 0) {
+      _pendingBlock.setAll(
+          offsetInPending, typedData.sublistView(0, canWriteIntoPending));
+      offsetInPending = _offsetInPendingBlock += canWriteIntoPending;
+      offsetInData += canWriteIntoPending;
+
+      // Did this finish the pending block?
+      if (offsetInPending == blockSize) {
+        _offsetInPendingBlock = 0;
+        if (_emitBlocks(_pendingBlock)) {
+          // Emitting the pending block completed all pending requests.
+          saveTrailingState();
+          return;
+        }
+      } else {
+        // The chunk we received didn't fill up the pending block, so just stop
+        // here.
+        assert(offsetInData == data.length);
+        return;
+      }
+    }
+
+    // At this point, the pending block should have been served.
+    assert(_offsetInPendingBlock == 0);
+
+    final fullBlocksToEmit = min(_remainingBlocksInOutgoing,
+        (typedData.length - offsetInData) ~/ blockSize);
+
+    if (fullBlocksToEmit > 0) {
+      _emitBlocks(
+        typedData.sublistView(
+            offsetInData, offsetInData += fullBlocksToEmit * blockSize),
+        amount: fullBlocksToEmit,
+      );
+    }
+
+    saveTrailingState();
+  }
+
+  void _onError(Object error, StackTrace trace) {
+    assert(_outgoing != null && _trailingData == null);
+
+    _outgoing!.addError(error, trace);
+  }
+
+  void _onDone() {
+    assert(_outgoing != null && _trailingData == null);
+    final outgoing = _outgoing!;
+
+    // Add pending data, then close
+    if (_offsetInPendingBlock != 0) {
+      outgoing.add(_pendingBlock.sublistView(0, _offsetInPendingBlock));
+    }
+
+    _isClosed = true;
+    _subscription?.cancel();
+    outgoing.close();
+  }
+
+  void _subscribeOrResume() {
+    // We should not resume the subscription if there is trailing data ready to
+    // be emitted.
+    assert(_trailingData == null);
+
+    final sub = _subscription;
+    if (sub == null) {
+      _subscription = _input.listen(_onData,
+          onError: _onError, onDone: _onDone, cancelOnError: true);
+    } else {
+      sub.resume();
+    }
+  }
+
+  void _pause() {
+    final sub = _subscription!; // ignore: cancel_subscriptions
+
+    if (!sub.isPaused) sub.pause();
+  }
+
+  Future<Uint8List> nextBlock() {
+    final result = Uint8List(blockSize);
+    var offset = 0;
+
+    return nextBlocks(1).forEach((chunk) {
+      result.setAll(offset, chunk);
+      offset += chunk.length;
+    }).then((void _) => result.sublistView(0, offset));
+  }
+
+  Stream<Uint8List> nextBlocks(int amount) {
+    if (_isClosed || amount == 0) {
+      return const Stream.empty();
+    }
+    if (_outgoing != null) {
+      throw StateError(
+          'Cannot call nextBlocks() before the previous stream completed.');
+    }
+    assert(_remainingBlocksInOutgoing == 0);
+
+    // We're making this synchronous because we will mostly add events in
+    // response to receiving chunks from the source stream. We manually ensure
+    // that other emits are happening asynchronously.
+    final controller = StreamController<Uint8List>(sync: true);
+    _outgoing = controller;
+    _remainingBlocksInOutgoing = amount;
+
+    var state = _StreamState.initial;
+
+    /// Sends trailing data to the stream. Returns true if the subscription
+    /// should still be resumed afterwards.
+    bool emitTrailing() {
+      // Attempt to serve requests from pending data first.
+      final trailing = _trailingData;
+      if (trailing != null) {
+        // There should never be trailing data and a pending block at the
+        // same time
+        assert(_offsetInPendingBlock == 0);
+
+        var remaining = trailing.length - _offsetInTrailingData;
+        // If there is trailing data, it should contain a full block
+        // (otherwise we would have stored it as a pending block)
+        assert(remaining >= blockSize);
+
+        final blocks = min(_remainingBlocksInOutgoing, remaining ~/ blockSize);
+        assert(blocks > 0);
+
+        final done = _emitBlocks(
+            trailing.sublistView(_offsetInTrailingData,
+                _offsetInTrailingData + blocks * blockSize),
+            amount: blocks);
+
+        remaining -= blocks * blockSize;
+        _offsetInTrailingData += blocks * blockSize;
+
+        if (remaining == 0) {
+          _trailingData = null;
+          _offsetInTrailingData = 0;
+        } else if (remaining < blockSize) {
+          assert(_offsetInPendingBlock == 0);
+
+          // Move trailing data into a pending block
+          _pendingBlock.setAll(0, trailing.sublistView(_offsetInTrailingData));
+          _offsetInPendingBlock = remaining;
+          _trailingData = null;
+          _offsetInTrailingData = 0;
+        } else {
+          // If there is still more than a full block of data waiting, we
+          // should not listen. This implies that the stream is done already.
+          assert(done);
+        }
+
+        // The listener detached in response to receiving the event.
+        if (done) {
+          if (_remainingBlocksInOutgoing == 0) state = _StreamState.done;
+          return false;
+        }
+      }
+
+      return true;
+    }
+
+    void scheduleInitialEmit() {
+      scheduleMicrotask(() {
+        if (state != _StreamState.initial) return;
+        state = _StreamState.attached;
+
+        if (emitTrailing()) {
+          _subscribeOrResume();
+        }
+      });
+    }
+
+    controller
+      ..onListen = scheduleInitialEmit
+      ..onPause = () {
+        assert(state == _StreamState.initial || state == _StreamState.attached);
+
+        if (state == _StreamState.initial) {
+          state = _StreamState.pausedAfterInitial;
+        } else {
+          _pause();
+          state = _StreamState.pausedAfterAttached;
+        }
+      }
+      ..onResume = () {
+        // We're done already
+        if (_remainingBlocksInOutgoing == 0) return;
+
+        assert(state == _StreamState.pausedAfterAttached ||
+            state == _StreamState.pausedAfterInitial);
+
+        if (state == _StreamState.pausedAfterInitial) {
+          state = _StreamState.initial;
+          scheduleInitialEmit();
+        } else {
+          state = _StreamState.attached;
+          if (emitTrailing()) {
+            _subscribeOrResume();
+          }
+        }
+      }
+      ..onCancel = () {
+        state = _StreamState.done;
+      };
+
+    return controller.stream;
+  }
+
+  FutureOr<void> close() {
+    _isClosed = true;
+    return _subscription?.cancel();
+  }
+}
+
+enum _StreamState {
+  initial,
+  attached,
+  pausedAfterInitial,
+  pausedAfterAttached,
+  done,
+}
diff --git a/lib/src/third_party/tar/src/writer.dart b/lib/src/third_party/tar/src/writer.dart
index 71ab368..5ff92b9 100644
--- a/lib/src/third_party/tar/src/writer.dart
+++ b/lib/src/third_party/tar/src/writer.dart
@@ -99,6 +99,33 @@
   return _WritingSink(output, format);
 }
 
+/// A synchronous encoder for in-memory tar files.
+///
+/// The default [tarWriter] creates an asynchronous conversion from a stream of
+/// tar entries to a byte stream.
+/// When all tar entries are in-memory ([SynchronousTarEntry]), it is possible
+/// to write them synchronously too.
+///
+/// To create a tar archive consisting of a single entry, use
+/// [Converter.convert] on this [tarConverter].
+/// To create a tar archive consisting of any number of entries, first call
+/// [Converter.startChunkedConversion] with a suitable output sink. Next, call
+/// [Sink.add] for each tar entry and finish the archive by calling
+/// [Sink.close].
+///
+/// To change the output format of the tar converter, use [tarConverterWith].
+/// To encode any kind of tar entries, use the asynchronous [tarWriter].
+const Converter<SynchronousTarEntry, List<int>> tarConverter =
+    _SynchronousTarConverter(OutputFormat.pax);
+
+/// A synchronous encoder for in-memory tar files, with custom encoding options.
+///
+/// For more information on how to use the converter, see [tarConverter].
+Converter<SynchronousTarEntry, List<int>> tarConverterWith(
+    {OutputFormat format = OutputFormat.pax}) {
+  return _SynchronousTarConverter(format);
+}
+
 /// This option controls how long file and link names should be written.
 ///
 /// This option can be passed to writer in [tarWritingSink] or[tarWriterWith].
@@ -127,16 +154,15 @@
 
 class _WritingSink extends StreamSink<TarEntry> {
   final StreamSink<List<int>> _output;
-  final OutputFormat format;
-
-  int _paxHeaderCount = 0;
+  final _SynchronousTarSink _synchronousWriter;
   bool _closed = false;
   final Completer<Object?> _done = Completer();
 
   int _pendingOperations = 0;
   Future<void> _ready = Future.value();
 
-  _WritingSink(this._output, this.format);
+  _WritingSink(this._output, OutputFormat format)
+      : _synchronousWriter = _SynchronousTarSink(_output, format);
 
   @override
   Future<void> get done => _done.future;
@@ -175,6 +201,120 @@
       size = bufferedData.length;
     }
 
+    _synchronousWriter._writeHeader(header, size);
+
+    // Write content.
+    if (bufferedData != null) {
+      _output.add(bufferedData);
+    } else {
+      await _output.addStream(event.contents);
+    }
+
+    _output.add(_paddingBytes(size));
+  }
+
+  @override
+  void addError(Object error, [StackTrace? stackTrace]) {
+    _output.addError(error, stackTrace);
+  }
+
+  @override
+  Future<void> addStream(Stream<TarEntry> stream) async {
+    await for (final entry in stream) {
+      await add(entry);
+    }
+  }
+
+  @override
+  Future<void> close() async {
+    if (!_closed) {
+      _closed = true;
+
+      // Add two empty blocks at the end.
+      await _doWork(_synchronousWriter.close);
+    }
+
+    return done;
+  }
+}
+
+Uint8List _paddingBytes(int size) {
+  final padding = -size % blockSize;
+  assert((size + padding) % blockSize == 0 &&
+      padding <= blockSize &&
+      padding >= 0);
+
+  return Uint8List(padding);
+}
+
+class _SynchronousTarConverter
+    extends Converter<SynchronousTarEntry, List<int>> {
+  final OutputFormat format;
+
+  const _SynchronousTarConverter(this.format);
+
+  @override
+  Sink<SynchronousTarEntry> startChunkedConversion(Sink<List<int>> sink) {
+    return _SynchronousTarSink(sink, format);
+  }
+
+  @override
+  List<int> convert(SynchronousTarEntry input) {
+    final output = BytesBuilder(copy: false);
+    startChunkedConversion(ByteConversionSink.withCallback(output.add))
+      ..add(input)
+      ..close();
+
+    return output.takeBytes();
+  }
+}
+
+class _SynchronousTarSink extends Sink<SynchronousTarEntry> {
+  final OutputFormat _format;
+  final Sink<List<int>> _output;
+
+  bool _closed = false;
+  int _paxHeaderCount = 0;
+
+  _SynchronousTarSink(this._output, this._format);
+
+  @override
+  void add(SynchronousTarEntry data) {
+    addHeaderAndData(data.header, data.data);
+  }
+
+  void addHeaderAndData(TarHeader header, List<int> data) {
+    _throwIfClosed();
+
+    _writeHeader(header, data.length);
+    _output
+      ..add(data)
+      ..add(_paddingBytes(data.length));
+  }
+
+  @override
+  void close() {
+    if (_closed) return;
+
+    // End the tar archive by writing two zero blocks.
+    _output
+      ..add(UnmodifiableUint8ListView(zeroBlock))
+      ..add(UnmodifiableUint8ListView(zeroBlock));
+    _output.close();
+
+    _closed = true;
+  }
+
+  void _throwIfClosed() {
+    if (_closed) {
+      throw StateError('Encoder is closed. '
+          'After calling `endOfArchive()`, encoder must not be used.');
+    }
+  }
+
+  void _writeHeader(TarHeader header, int size) {
+    assert(header.size < 0 || header.size == size);
+
     var nameBytes = utf8.encode(header.name);
     var linkBytes = utf8.encode(header.linkName ?? '');
     var gnameBytes = utf8.encode(header.groupName ?? '');
@@ -209,10 +349,10 @@
     }
 
     if (paxHeader.isNotEmpty) {
-      if (format == OutputFormat.pax) {
-        await _writePaxHeader(paxHeader);
+      if (_format == OutputFormat.pax) {
+        _writePaxHeader(paxHeader);
       } else {
-        await _writeGnuLongName(paxHeader);
+        _writeGnuLongName(paxHeader);
       }
     }
 
@@ -238,24 +378,13 @@
       checksum += byte;
     }
     headerBlock.setUint(checksum, 148, 8);
-
     _output.add(headerBlock);
-
-    // Write content.
-    if (bufferedData != null) {
-      _output.add(bufferedData);
-    } else {
-      await event.contents.forEach(_output.add);
-    }
-
-    final padding = -size % blockSize;
-    _output.add(Uint8List(padding));
   }
 
-  /// Writes an extended pax header.
+  /// Encodes an extended pax header.
   ///
   /// https://pubs.opengroup.org/onlinepubs/9699919799/utilities/pax.html#tag_20_92_13_03
-  Future<void> _writePaxHeader(Map<String, List<int>> values) {
+  void _writePaxHeader(Map<String, List<int>> values) {
     final buffer = BytesBuilder();
     // format of each entry: "%d %s=%s\n", <length>, <keyword>, <value>
     // note that the length includes the trailing \n and the length description
@@ -287,7 +416,7 @@
     });
 
     final paxData = buffer.takeBytes();
-    final file = TarEntry.data(
+    addHeaderAndData(
       HeaderImpl.internal(
         format: TarFormat.pax,
         modified: millisecondsSinceEpoch(0),
@@ -298,10 +427,9 @@
       ),
       paxData,
     );
-    return _safeAdd(file);
   }
 
-  Future<void> _writeGnuLongName(Map<String, List<int>> values) async {
+  void _writeGnuLongName(Map<String, List<int>> values) {
     // Ensure that a file that can't be written in the GNU format is not written
     const allowedKeys = {paxPath, paxLinkpath};
     final invalidOptions = values.keys.toSet()..removeAll(allowedKeys);
@@ -316,54 +444,25 @@
     final name = values[paxPath];
     final linkName = values[paxLinkpath];
 
-    Future<void> write(List<int> name, TypeFlag flag) {
-      return _safeAdd(
-        TarEntry.data(
-          HeaderImpl.internal(
-            name: '././@LongLink',
-            modified: millisecondsSinceEpoch(0),
-            format: TarFormat.gnu,
-            typeFlag: flag,
-          ),
-          name,
+    void create(List<int> name, TypeFlag flag) {
+      return addHeaderAndData(
+        HeaderImpl.internal(
+          name: '././@LongLink',
+          modified: millisecondsSinceEpoch(0),
+          format: TarFormat.gnu,
+          typeFlag: flag,
         ),
+        name,
       );
     }
 
     if (name != null) {
-      await write(name, TypeFlag.gnuLongName);
+      create(name, TypeFlag.gnuLongName);
     }
     if (linkName != null) {
-      await write(linkName, TypeFlag.gnuLongLink);
+      create(linkName, TypeFlag.gnuLongLink);
     }
   }
-
-  @override
-  void addError(Object error, [StackTrace? stackTrace]) {
-    _output.addError(error, stackTrace);
-  }
-
-  @override
-  Future<void> addStream(Stream<TarEntry> stream) async {
-    await for (final entry in stream) {
-      await add(entry);
-    }
-  }
-
-  @override
-  Future<void> close() async {
-    if (!_closed) {
-      _closed = true;
-
-      // Add two empty blocks at the end.
-      await _doWork(() {
-        _output.add(zeroBlock);
-        _output.add(zeroBlock);
-      });
-    }
-
-    return done;
-  }
 }
 
 extension on Uint8List {
diff --git a/lib/src/third_party/tar/tar.dart b/lib/src/third_party/tar/tar.dart
index 218a6a2..14247bd 100644
--- a/lib/src/third_party/tar/tar.dart
+++ b/lib/src/third_party/tar/tar.dart
@@ -9,7 +9,7 @@
 import 'src/writer.dart';
 
 export 'src/constants.dart' show TypeFlag;
-export 'src/entry.dart';
+export 'src/entry.dart' show TarEntry, SynchronousTarEntry;
 export 'src/exception.dart';
 export 'src/format.dart';
 export 'src/header.dart' show TarHeader;
diff --git a/test/io_test.dart b/test/io_test.dart
index 2c529cd..0657dd6 100644
--- a/test/io_test.dart
+++ b/test/io_test.dart
@@ -291,7 +291,7 @@
     test(
       'applies executable bits from tar file',
       () => withTempDir((tempDir) async {
-        final entry = Stream.value(TarEntry.data(
+        final entry = Stream<TarEntry>.value(TarEntry.data(
             TarHeader(
               name: 'weird_exe',
               typeFlag: TypeFlag.reg,
@@ -309,7 +309,7 @@
 
     test('extracts files and links', () {
       return withTempDir((tempDir) async {
-        final entries = Stream.fromIterable([
+        final entries = Stream<TarEntry>.fromIterable([
           TarEntry.data(
             TarHeader(name: 'lib/main.txt', typeFlag: TypeFlag.reg),
             utf8.encode('text content'),
@@ -349,7 +349,7 @@
 
     test('preserves empty directories', () {
       return withTempDir((tempDir) async {
-        final entry = Stream.value(TarEntry.data(
+        final entry = Stream<TarEntry>.value(TarEntry.data(
             TarHeader(
               name: 'bin/',
               typeFlag: TypeFlag.dir,
@@ -368,7 +368,7 @@
 
     test('throws for entries escaping the tar file', () {
       return withTempDir((tempDir) async {
-        final entry = Stream.value(TarEntry.data(
+        final entry = Stream<TarEntry>.value(TarEntry.data(
             TarHeader(
               name: '../other_package-1.2.3/lib/file.dart',
               typeFlag: TypeFlag.reg,
@@ -386,7 +386,7 @@
 
     test('skips symlinks escaping the tar file', () {
       return withTempDir((tempDir) async {
-        final entry = Stream.value(TarEntry.data(
+        final entry = Stream<TarEntry>.value(TarEntry.data(
             TarHeader(
               name: 'nested/bad_link',
               typeFlag: TypeFlag.symlink,
@@ -403,7 +403,7 @@
 
     test('skips hardlinks escaping the tar file', () {
       return withTempDir((tempDir) async {
-        final entry = Stream.value(TarEntry.data(
+        final entry = Stream<TarEntry>.value(TarEntry.data(
             TarHeader(
               name: 'nested/bad_link',
               typeFlag: TypeFlag.link,