Use optimized stream chunker
diff --git a/lib/src/reader.dart b/lib/src/reader.dart
index f1877aa..df89c02 100644
--- a/lib/src/reader.dart
+++ b/lib/src/reader.dart
@@ -22,7 +22,7 @@
 @sealed
 class TarReader implements StreamIterator<TarEntry> {
   /// A chunked stream iterator to enable us to get our data.
-  final StreamIterator<Uint8List> _chunkedStream;
+  final StreamBlockReader _reader;
   final PaxHeaders _paxHeaders = PaxHeaders();
   final int _maxSpecialFileSize;
 
@@ -84,7 +84,7 @@
   TarReader(Stream<List<int>> tarStream,
       {int maxSpecialFileSize = defaultSpecialLength,
       bool disallowTrailingData = false})
-      : _chunkedStream = StreamIterator(tarStream.inChunks),
+      : _reader = StreamBlockReader(tarStream),
         _checkNoTrailingData = disallowTrailingData,
         _maxSpecialFileSize = maxSpecialFileSize;
 
@@ -236,7 +236,7 @@
 
     // Note: Calling cancel is safe when the stream has already been completed.
     // It's a noop in that case, which is what we want.
-    return _chunkedStream.cancel();
+    return _reader.close();
   }
 
   /// Utility function for quickly iterating through all entries in [tarStream].
@@ -307,12 +307,12 @@
       Uint8List? block;
 
       do {
-        block = await _chunkedStream.nextOrNull;
-        if (block != null && !block.isAllZeroes) {
+        block = await _reader.nextBlock();
+        if (!block.isAllZeroes) {
           throw TarException(
               'Illegal content after the end of the tar archive.');
         }
-      } while (block != null && block.length == blockSize);
+      } while (block.length == blockSize);
       // The stream is done when we couldn't read the full block.
     }
 
@@ -325,7 +325,7 @@
 
   /// Reads a 512-byte block or throws if [allowEmpty] is false.
   Future<Uint8List> _readFullBlock({bool allowEmpty = false}) async {
-    final block = await _chunkedStream.nextOrNull ?? emptyUint8List;
+    final block = await _reader.nextBlock();
     if (block.length != blockSize && !(allowEmpty && block.isEmpty)) {
       _unexpectedEof();
     }
@@ -337,8 +337,8 @@
     final result = Uint8List(blockSize * amount);
 
     for (var i = 0; i < amount; i++) {
-      final chunk = await _chunkedStream.nextOrNull;
-      if (chunk == null || chunk.length != blockSize) {
+      final chunk = await _reader.nextBlock();
+      if (chunk.length != blockSize) {
         _unexpectedEof();
       }
 
@@ -361,10 +361,10 @@
     if (rawHeader.isEmpty) return null;
 
     if (rawHeader.isAllZeroes) {
-      final next = await _chunkedStream.nextOrNull;
+      final next = await _reader.nextBlock();
 
       // Exactly 1 block of zeroes is read and EOF is hit.
-      if (next == null) return null;
+      if (next.isEmpty) return null;
 
       if (next.isAllZeroes) {
         // Two blocks of zeros are read - Normal EOF.
@@ -399,7 +399,7 @@
 
       final streamBlockCount = numBlocks(sparseDataLength);
       final safeStream = _publishStream(
-          _chunkedStream.nextElements(streamBlockCount), sparseDataLength);
+          _reader.nextBlocks(streamBlockCount), sparseDataLength);
       return sparseStream(safeStream, sparseHoles, header.size);
     } else {
       var size = header.size;
@@ -413,8 +413,7 @@
         return _publishStream(const Stream<Never>.empty(), 0);
       } else {
         final blockCount = numBlocks(header.size);
-        return _publishStream(
-            _chunkedStream.nextElements(blockCount), header.size);
+        return _publishStream(_reader.nextBlocks(blockCount), header.size);
       }
     }
   }
@@ -514,8 +513,8 @@
     /// Ensures that [block] h as at least [n] tokens.
     Future<void> feedTokens(int n) async {
       while (newLineCount < n) {
-        final newBlock = await _chunkedStream.nextOrNull;
-        if (newBlock == null || newBlock.length < blockSize) {
+        final newBlock = await _reader.nextBlock();
+        if (newBlock.length < blockSize) {
           throw TarException.header(
               'GNU Sparse Map does not have enough lines!');
         }
@@ -659,8 +658,8 @@
 
     while (isExtended) {
       // Ok, we have a new block of sparse headers to process
-      final block = await _chunkedStream.nextOrNull;
-      if (block == null || block.length < blockSize) {
+      final block = await _reader.nextBlock();
+      if (block.length < blockSize) {
         throw TarException.header('Unexpected EoF while reading sparse maps');
       }
 
@@ -852,40 +851,48 @@
 /// expected. That indicates an invalid tar file though, since the correct size
 /// is stored in the header.
 class _OutgoingStreamGuard extends EventSink<Uint8List> {
-  final int size;
-  final int expectedBlockCount;
   final EventSink<List<int>> out;
   void Function() onDone;
 
-  int emittedBlocks = 0;
+  int remainingContentSize;
+  int remainingPaddingSize;
+  bool isInContent = true;
   bool hadError = false;
 
-  _OutgoingStreamGuard(this.size, this.out, this.onDone)
-      : expectedBlockCount = numBlocks(size);
+  _OutgoingStreamGuard(this.remainingContentSize, this.out, this.onDone)
+      : remainingPaddingSize = _paddingFor(remainingContentSize);
+
+  static int _paddingFor(int contentSize) {
+    final offsetInLastBlock = contentSize.toUnsigned(blockSizeLog2);
+    if (offsetInLastBlock != 0) {
+      return blockSize - offsetInLastBlock;
+    }
+    return 0;
+  }
 
   @override
   void add(Uint8List event) {
-    if (event.length != blockSize) {
-      addError(TarException('Unexpected end of tar file'), StackTrace.current);
-      return;
-    }
-
-    emittedBlocks++;
-    // We have checks limiting the length of outgoing streams. If the stream is
-    // larger than expected, that's a bug in pkg:tar.
-    assert(
-        emittedBlocks <= expectedBlockCount,
-        'Stream now emitted $emittedBlocks blocks, but only expected '
-        '$expectedBlockCount');
-
-    if (emittedBlocks == expectedBlockCount) {
-      // This is the last block, which we may have to truncate because tar
-      // blocks are padded with zeroes at the end
-      out.add(event.sublistView(0, size.toUnsigned(blockSizeLog2)));
+    if (isInContent) {
+      if (event.length < remainingContentSize) {
+        // We can fully add this chunk as it consists entirely of data
+        out.add(event);
+        remainingContentSize -= event.length;
+      } else {
+        // We can add the first bytes as content, the others are padding that we
+        // shouldn't emit
+        out.add(event.sublistView(0, remainingContentSize));
+        isInContent = false;
+        remainingPaddingSize -= event.length - remainingContentSize;
+        remainingContentSize = 0;
+      }
     } else {
-      // This blocks just goes out fully
-      out.add(event);
+      // Ok, the entire event is padding
+      remainingPaddingSize -= event.length;
     }
+
+    // The underlying stream comes from pkg:tar, so if we get too many bytes
+    // that's a bug in this package.
+    assert(remainingPaddingSize >= 0, 'Stream emitted to many bytes');
   }
 
   @override
@@ -896,15 +903,15 @@
 
   @override
   void close() {
-    onDone();
-
     // If the stream stopped after an error, the user is already aware that
     // something is wrong.
-    if (emittedBlocks < expectedBlockCount && !hadError) {
+    if (remainingContentSize > 0 && !hadError) {
+      hadError = true;
       out.addError(
           TarException('Unexpected end of tar file'), StackTrace.current);
     }
 
+    onDone();
     out.close();
   }
 }
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index 1a0b018..c55917b 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -231,226 +231,323 @@
   }
 }
 
-Stream<Uint8List> _inChunks(Stream<List<int>> input) {
-  Uint8List? startedChunk;
-  int missingForNextChunk = 0;
+class StreamBlockReader {
+  final Stream<List<int>> _source;
+  StreamSubscription<List<int>>? _inputSubscription;
 
-  Uint8List? pendingEvent;
-  int offsetInPendingEvent = 0;
+  Completer<Uint8List>? _nextBlockCompleter;
 
-  late StreamSubscription<List<int>> inputSubscription;
-  final controller = StreamController<Uint8List>(sync: true);
+  StreamController<Uint8List>? _nextBlocksStream;
+  int _remainingBlocksInStream = 0;
 
-  var isResuming = false;
+  Uint8List? _pendingBlock;
+  int _missingInPendingBlock = 0;
 
-  void startChunk(Uint8List source, int startOffset) {
-    assert(startedChunk == null);
+  Uint8List? _waitingForDispatch;
+  int _offsetInWaiting = 0;
+
+  bool _closed = false;
+
+  StreamBlockReader(this._source);
+
+  int get _blocksToServe {
+    if (_nextBlockCompleter != null) {
+      return 1;
+    } else {
+      return _remainingBlocksInStream;
+    }
+  }
+
+  void _listenOrResume() {
+    final subsription = _inputSubscription;
+    if (subsription == null) {
+      _inputSubscription =
+          _source.listen(_onData, onError: _onError, onDone: _onDone);
+    } else {
+      subsription.resume();
+    }
+  }
+
+  /// Emits ready [blocks].
+  ///
+  /// Returns whether we should pause emitting further data afterwards.
+  bool _emitBlocks(Uint8List blocks, {int amount = 1}) {
+    final stream = _nextBlocksStream;
+    final future = _nextBlockCompleter;
+    final subscription = _inputSubscription;
+    assert((future == null) ^ (stream == null));
+    assert(subscription != null);
+
+    final bool shouldPause;
+
+    if (stream != null) {
+      stream.add(blocks);
+      final remaining = _remainingBlocksInStream -= amount;
+      assert(remaining >= 0);
+
+      final done = remaining == 0;
+      if (done) {
+        stream.close();
+        _nextBlocksStream = null;
+      }
+
+      shouldPause = done || stream.isPausedOrClosed;
+    } else if (future != null) {
+      assert(amount == 1);
+      future.complete(blocks);
+      _nextBlockCompleter = null;
+      shouldPause = true;
+    } else {
+      // Exactly one of them must be non-null
+      throw AssertionError();
+    }
+
+    if (shouldPause && !subscription!.isPaused) {
+      subscription.pause();
+    }
+    return shouldPause;
+  }
+
+  void _startPendingBlock(Uint8List source, int startOffset) {
+    assert(_pendingBlock == null);
     final availableData = source.length - startOffset;
     assert(availableData < blockSize);
 
-    startedChunk = Uint8List(blockSize)
+    _pendingBlock = Uint8List(blockSize)
       ..setAll(0, source.sublistView(startOffset));
-    missingForNextChunk = blockSize - availableData;
+    _missingInPendingBlock = blockSize - availableData;
   }
 
-  void handleData(List<int> data) {
-    assert(pendingEvent == null,
-        'Had pending events while receiving new data from source.');
-    final typedData = data.asUint8List();
+  void _saveNewUndispatchedState(Uint8List chunk, int offsetInChunk) {
+    assert(_waitingForDispatch == null);
+    _saveUndispatchedState(chunk, offsetInChunk);
+  }
+
+  void _saveUndispatchedState(Uint8List chunk, int offsetInChunk) {
+    assert(_pendingBlock == null);
+    final remaining = chunk.length - offsetInChunk;
+    if (remaining < blockSize) {
+      _waitingForDispatch = null;
+      _offsetInWaiting = 0;
+      _startPendingBlock(chunk, offsetInChunk);
+    } else {
+      _waitingForDispatch = chunk;
+      _offsetInWaiting = offsetInChunk;
+    }
+  }
+
+  void _onData(List<int> chunk) {
+    assert(_waitingForDispatch == null,
+        'Had pending data while receiving new event');
+
+    final typedChunk = chunk.asUint8List();
 
     // The start offset of the new data that we didn't process yet.
     var offsetInData = 0;
-    bool saveStateIfPaused() {
-      if (controller.isPausedOrClosed) {
-        pendingEvent = typedData;
-        offsetInPendingEvent = offsetInData;
-        return true;
-      }
-      return false;
+
+    void savePendingChunk() {
+      _saveNewUndispatchedState(typedChunk, offsetInData);
     }
 
-    // Try completing the pending chunk first, if it exists
-    if (startedChunk != null) {
-      final started = startedChunk!;
-      final startOffsetInStarted = blockSize - missingForNextChunk;
+    // Try finishing the pending block first, if it exists.
+    if (_pendingBlock != null) {
+      final started = _pendingBlock!;
+      final missing = _missingInPendingBlock;
+      final startOffsetInStarted = blockSize - missing;
 
-      if (data.length >= missingForNextChunk) {
-        // Fill up the chunk, then go on with the remaining data
-        started.setAll(startOffsetInStarted,
-            typedData.sublistView(0, missingForNextChunk));
-        controller.add(started);
-        offsetInData += missingForNextChunk;
+      if (typedChunk.length >= missing) {
+        // We can complete the block
+        started.setAll(
+            startOffsetInStarted, typedChunk.sublistView(0, missing));
+        offsetInData = missing;
+        final done = _emitBlocks(started);
 
-        // We just finished serving the started chunk, so reset that
-        startedChunk = null;
-        missingForNextChunk = 0;
+        // We just finished serving the started block, so reset that
+        _pendingBlock = null;
+        _missingInPendingBlock = 0;
 
-        // If the controller was paused in a response to the add, stop serving
-        // events and store the rest of the input for later
-        if (saveStateIfPaused()) return;
+        if (done) {
+          savePendingChunk();
+          return;
+        }
       } else {
-        // Ok, we can't finish the pending chunk with the new data but at least
+        // We can't complete the pending block with the new data, but at least
         // we can continue filling it up
-        started.setAll(startOffsetInStarted, typedData);
-        missingForNextChunk -= typedData.length;
+        started.setAll(startOffsetInStarted, typedChunk);
+        _missingInPendingBlock -= typedChunk.length;
+        assert(_missingInPendingBlock > 0);
         return;
       }
     }
 
-    // The started chunk has been completed, continue by adding chunks as they
-    // come.
-    assert(startedChunk == null);
+    // When we get to this point, the started chunk has been completed and we
+    // can continue adding chunks as they come.
+    assert(_pendingBlock == null);
 
-    while (offsetInData < typedData.length) {
-      // Can we serve a full block from the new event
-      if (offsetInData <= typedData.length - blockSize) {
-        // Yup, then let's do that
-        final end = offsetInData + blockSize;
-        controller.add(typedData.sublistView(offsetInData, end));
-        offsetInData = end;
+    // Serve as many blocks as we can from the current chunk
+    final blocksToServe = _blocksToServe;
+    assert(blocksToServe >= 1);
+    final availableBlocksInChunk =
+        (typedChunk.length - offsetInData) ~/ blockSize;
+    final blocksToDeliverNow = min(blocksToServe, availableBlocksInChunk);
 
-        // Once again, stop and save state if the controller was paused.
-        if (saveStateIfPaused()) return;
+    if (blocksToDeliverNow > 0) {
+      final length = blockSize * blocksToDeliverNow;
+      final end = offsetInData + length;
+      final done = _emitBlocks(typedChunk.sublistView(offsetInData, end),
+          amount: blocksToDeliverNow);
+      offsetInData = end;
+
+      // Once again, stop and save state if this has completed the output
+      if (done) {
+        savePendingChunk();
+        return;
+      }
+    }
+
+    // If we're here, the remaining data in the chunk was not large enough to
+    // serve a full block. Store the remaining data in a pending block.
+    _startPendingBlock(typedChunk, offsetInData);
+  }
+
+  void _onError(Object error, [StackTrace? trace]) {
+    final stream = _nextBlocksStream;
+    final future = _nextBlockCompleter;
+
+    if (stream != null) {
+      stream.addError(error, trace);
+    } else if (future != null) {
+      future.completeError(error, trace);
+      _nextBlockCompleter = null;
+    }
+
+    final subscription = _inputSubscription;
+    if (!subscription!.isPaused) {
+      subscription.pause();
+    }
+  }
+
+  void _onDone() {
+    if (_blocksToServe > 0) {
+      if (_pendingBlock != null) {
+        _emitBlocks(
+            _pendingBlock!.sublistView(0, blockSize - _missingInPendingBlock));
+        _pendingBlock = null;
+        _missingInPendingBlock = 0;
       } else {
-        // Ok, no full block but we can start a new pending chunk
-        startChunk(typedData, offsetInData);
-        break;
+        // Complete with an empty block
+        _emitBlocks(emptyUint8List);
       }
+
+      _inputSubscription = null;
+    }
+    close();
+  }
+
+  void _checkIdle(String method) {
+    if (_nextBlockCompleter != null) {
+      throw StateError("Can't call $method before a previous call to "
+          'nextBlock() has completed.');
+    }
+    if (_nextBlocksStream != null) {
+      throw StateError("Can't call $method before a previous call to "
+          'nextBlocks() was completed.');
     }
   }
 
-  void startResume() {
-    isResuming = false;
-    if (controller.isPausedOrClosed) return;
-
-    // Start dispatching pending events before we resume the subscription on the
-    // main stream.
-    if (pendingEvent != null) {
-      final pending = pendingEvent!;
-      assert(startedChunk == null, 'Had pending events and a started chunk');
-
-      while (offsetInPendingEvent < pending.length) {
-        // Can we serve a full block from pending data?
-        if (offsetInPendingEvent <= pending.length - blockSize) {
-          final end = offsetInPendingEvent + blockSize;
-          controller.add(pending.sublistView(offsetInPendingEvent, end));
-          offsetInPendingEvent = end;
-
-          // Pause if we got a pause request in response to the add
-          if (controller.isPausedOrClosed) return;
-        } else {
-          // Store pending block that we can't finish with the pending chunk.
-          startChunk(pending, offsetInPendingEvent);
-          break;
-        }
-      }
-
-      // Pending events have been dispatched
-      offsetInPendingEvent = 0;
-      pendingEvent = null;
-    }
-
-    // When we're here, all pending events have been dispatched and the
-    // controller is not paused. Let's continue then!
-    inputSubscription.resume();
-  }
-
-  void onDone() {
-    // Input stream is done. If we have a block left over, emit that now
-    if (startedChunk != null) {
-      controller
-          .add(startedChunk!.sublistView(0, blockSize - missingForNextChunk));
-    }
-    controller.close();
-  }
-
-  controller
-    ..onListen = () {
-      inputSubscription = input.listen(handleData,
-          onError: controller.addError, onDone: onDone);
-    }
-    ..onPause = () {
-      if (!inputSubscription.isPaused) {
-        inputSubscription.pause();
-      }
-    }
-    ..onResume = () {
-      // This is a bit hacky. Our subscription is most likely going to be a
-      // _BufferingStreamSubscription which will buffer events that were added
-      // in this callback. However, we really want to know when the subscription
-      // has been paused in response to a new event because we can handle that
-      // very efficiently.
-      if (!isResuming) {
-        scheduleMicrotask(startResume);
-      }
-      isResuming = true;
-    }
-    ..onCancel = () {
-      inputSubscription.cancel();
-    };
-
-  return controller.stream;
-}
-
-extension on StreamController<dynamic> {
-  bool get isPausedOrClosed => isPaused || isClosed;
-}
-
-extension ReadInChunks on Stream<List<int>> {
-  /// A stream emitting values in chunks of `512` byte blocks.
+  /// Reads the next block from the input stream.
   ///
-  /// The last emitted chunk may be shorter than the regular block length.
-  Stream<Uint8List> get inChunks => _inChunks(this);
-}
+  /// The returned list may be empty or smaller than [blockSize] if the input
+  /// stream ends.
+  Future<Uint8List> nextBlock() {
+    _checkIdle('nextBlock');
+    if (_closed) return Future.value(emptyUint8List);
 
-extension NextOrNull<T extends Object> on StreamIterator<T> {
-  Future<T?> get nextOrNull async {
-    if (await moveNext()) {
-      return current;
-    } else {
-      return null;
+    // If we have pending data to serve, do that before we start resuming the
+    // stream subscription
+    final waiting = _waitingForDispatch;
+    if (waiting != null) {
+      final offset = _offsetInWaiting;
+      assert(waiting.length - offset >= blockSize);
+
+      final end = offset + blockSize;
+      final result = Future<Uint8List>.value(waiting.sublistView(offset, end));
+
+      if (end == waiting.length) {
+        // We just completed all data waiting for dispatch
+        _waitingForDispatch = null;
+        _offsetInWaiting = 0;
+      } else {
+        // Store new pending data
+        _saveUndispatchedState(waiting, offset + blockSize);
+      }
+      return result;
     }
+
+    // Note: Our logic in _emitBlocks depends on this completer NOT being sync
+    final next = _nextBlockCompleter = Completer();
+    _listenOrResume();
+    return next.future;
   }
 
-  Stream<T> nextElements(int count) {
-    if (count == 0) return const Stream<Never>.empty();
+  Stream<Uint8List> nextBlocks(int amount) {
+    _checkIdle('nextBlocks');
 
-    var remaining = count;
-    Future<Object?>? pendingOperation;
-    final controller = StreamController<T>(sync: true);
+    if (amount == 0 || _closed) return const Stream.empty();
 
-    void addNewEvent() {
-      if (pendingOperation == null && remaining > 0) {
-        // Start fetching data
-        final fetchNext = nextOrNull.then<Object?>((value) {
-          remaining--;
-          if (value != null) {
-            // Got data, let's add it
-            controller.add(value);
-          }
+    // This controller must not be sync: We add events in response to onListen
+    // and expect a delay after closing it in _emitBlocks
+    final controller = _nextBlocksStream = StreamController();
+    _remainingBlocksInStream = amount;
 
-          if (value == null || remaining == 0) {
-            // Finished, so close the controller
-            return controller.close();
-          } else {
-            return Future.value();
-          }
-        }, onError: controller.addError);
+    void onListenOrResume() {
+      // Once again, check if we have pending data to serve
+      final waiting = _waitingForDispatch;
+      if (waiting != null) {
+        final offset = _offsetInWaiting;
+        final blocksWaiting = (waiting.length - offset) ~/ blockSize;
+        assert(blocksWaiting > 0);
 
-        pendingOperation = fetchNext.whenComplete(() {
-          pendingOperation = null;
-          if (!controller.isPaused && remaining > 0) {
-            // Controller isn't paused, so add another event
-            addNewEvent();
-          }
-        });
+        final blocksToEmit = min(blocksWaiting, amount);
+        final end = offset + blocksToEmit * blockSize;
+        final done =
+            _emitBlocks(waiting.sublistView(offset, end), amount: blocksToEmit);
+        _saveUndispatchedState(waiting, end);
+        if (!done) {
+          _listenOrResume();
+        }
+      } else {
+        // No data waiting to be served, so we definitely have to start
+        // listening
+        _listenOrResume();
       }
     }
 
     controller
-      ..onListen = addNewEvent
-      ..onResume = addNewEvent;
-
+      ..onListen = onListenOrResume
+      ..onPause = () {
+        final sub = _inputSubscription;
+        if (!sub!.isPaused) sub.pause();
+      }
+      ..onResume = onListenOrResume;
     return controller.stream;
   }
+
+  Future<void> close() async {
+    if (_closed) return;
+
+    _closed = true;
+    await _inputSubscription?.cancel();
+
+    _nextBlockCompleter?.complete(emptyUint8List);
+
+    final stream = _nextBlocksStream;
+    if (stream != null) {
+      await stream.close();
+    }
+  }
+}
+
+extension on StreamController<dynamic> {
+  bool get isPausedOrClosed => isPaused || isClosed;
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index 8886a5d..1e60694 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -10,7 +10,6 @@
   async: ^2.6.0
   meta: ^1.3.0
   typed_data: ^1.3.0
-  stream_transform: ^2.0.0
 
 dev_dependencies:
   charcode: ^1.2.0