Roll our own stream chunked reader
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 561163d..6c54d3b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,7 @@
 ## 0.4.0
 
 - Support generating tar files with GNU-style long link names
- - Add `format` parameter to `tarWritingSink` and `tarTransformerWith`
+ - Add `format` parameter to `tarWritingSink` and `tarWriterWith`
 
 ## 0.3.3
 
diff --git a/example/main.dart b/example/main.dart
index 65cab46..6dfd101 100644
--- a/example/main.dart
+++ b/example/main.dart
@@ -1,3 +1,4 @@
+import 'dart:async';
 import 'dart:convert';
 import 'dart:io';
 
@@ -5,7 +6,8 @@
 
 Future<void> main() async {
   // Start reading a tar file
-  final reader = TarReader(File('reference/gnu.tar').openRead());
+  final reader =
+      TarReader(File('reference/neats_test/gnu-incremental.tar').openRead());
 
   while (await reader.moveNext()) {
     final header = reader.current.header;
@@ -13,7 +15,10 @@
 
     // Print the output if it's a regular file
     if (header.typeFlag == TypeFlag.reg) {
-      await reader.current.contents.transform(utf8.decoder).forEach(print);
+      await reader.current.contents.forEach((chunk) {
+        print(chunk.length);
+        print(utf8.decode(chunk));
+      });
     }
   }
 
diff --git a/lib/src/constants.dart b/lib/src/constants.dart
index aac7669..312a245 100644
--- a/lib/src/constants.dart
+++ b/lib/src/constants.dart
@@ -258,3 +258,4 @@
 
 /// A full TAR block of zeros.
 final zeroBlock = Uint8List(blockSize);
+final emptyUint8List = Uint8List(0);
diff --git a/lib/src/reader.dart b/lib/src/reader.dart
index 713235a..f1877aa 100644
--- a/lib/src/reader.dart
+++ b/lib/src/reader.dart
@@ -3,7 +3,6 @@
 import 'dart:convert';
 import 'dart:typed_data';
 
-import 'package:async/async.dart';
 import 'package:meta/meta.dart';
 import 'package:typed_data/typed_data.dart';
 
@@ -23,13 +22,10 @@
 @sealed
 class TarReader implements StreamIterator<TarEntry> {
   /// A chunked stream iterator to enable us to get our data.
-  final ChunkedStreamReader<int> _chunkedStream;
+  final StreamIterator<Uint8List> _chunkedStream;
   final PaxHeaders _paxHeaders = PaxHeaders();
   final int _maxSpecialFileSize;
 
-  /// Skip the next [_skipNext] elements when reading in the stream.
-  int _skipNext = 0;
-
   TarEntry? _current;
 
   /// The underlying content stream for the [_current] entry. Draining this
@@ -88,7 +84,7 @@
   TarReader(Stream<List<int>> tarStream,
       {int maxSpecialFileSize = defaultSpecialLength,
       bool disallowTrailingData = false})
-      : _chunkedStream = ChunkedStreamReader(tarStream),
+      : _chunkedStream = StreamIterator(tarStream.inChunks),
         _checkNoTrailingData = disallowTrailingData,
         _maxSpecialFileSize = maxSpecialFileSize;
 
@@ -152,13 +148,7 @@
     // iterates through one or more "header files" until it finds a
     // "normal file".
     while (true) {
-      if (_skipNext > 0) {
-        await _readFullBlock(_skipNext);
-        _skipNext = 0;
-      }
-
-      final rawHeader =
-          await _readFullBlock(blockSize, allowEmpty: eofAcceptable);
+      final rawHeader = await _readFullBlock(allowEmpty: eofAcceptable);
 
       nextHeader = await _readHeader(rawHeader);
       if (nextHeader == null) {
@@ -179,20 +169,19 @@
       if (nextHeader.typeFlag == TypeFlag.xHeader ||
           nextHeader.typeFlag == TypeFlag.xGlobalHeader) {
         format = format.mayOnlyBe(TarFormat.pax);
-        final paxHeaderSize = _checkSpecialSize(nextHeader.size);
-        final rawPaxHeaders = await _readFullBlock(paxHeaderSize);
+        final paxHeaderBlocks = await _readFullBlocks(
+            _checkSpecialBlocks(numBlocks(nextHeader.size)));
+        final rawPaxHeaders = paxHeaderBlocks.sublistView(0, nextHeader.size);
 
         _paxHeaders.readPaxHeaders(
             rawPaxHeaders, nextHeader.typeFlag == TypeFlag.xGlobalHeader);
-        _markPaddingToSkip(paxHeaderSize);
-
-        // This is a meta header affecting the next header.
+        // This is a meta header affecting the next header
         continue;
       } else if (nextHeader.typeFlag == TypeFlag.gnuLongLink ||
           nextHeader.typeFlag == TypeFlag.gnuLongName) {
         format = format.mayOnlyBe(TarFormat.gnu);
-        final realName = await _readFullBlock(
-            _checkSpecialSize(nextBlockSize(nextHeader.size)));
+        final realName = await _readFullBlocks(
+            _checkSpecialBlocks(numBlocks(nextHeader.size)));
 
         final readName = realName.readString(0, realName.length);
         if (nextHeader.typeFlag == TypeFlag.gnuLongName) {
@@ -300,13 +289,14 @@
     }
   }
 
-  int _checkSpecialSize(int size) {
+  int _checkSpecialBlocks(int count) {
+    final size = blockSize * count;
     if (size > _maxSpecialFileSize) {
       throw TarException(
           'TAR file contains hidden entry with an invalid size of $size.');
     }
 
-    return size;
+    return count;
   }
 
   /// Ater we detected the end of a tar file, optionally check for trailing
@@ -314,15 +304,15 @@
   Future<void> _handleExpectedEof() async {
     if (_checkNoTrailingData) {
       // Trailing zeroes are okay, but don't allow any more data here.
-      Uint8List block;
+      Uint8List? block;
 
       do {
-        block = await _chunkedStream.readBytes(blockSize);
-        if (!block.isAllZeroes) {
+        block = await _chunkedStream.nextOrNull;
+        if (block != null && !block.isAllZeroes) {
           throw TarException(
               'Illegal content after the end of the tar archive.');
         }
-      } while (block.length == blockSize);
+      } while (block != null && block.length == blockSize);
       // The stream is done when we couldn't read the full block.
     }
 
@@ -333,17 +323,31 @@
     throw TarException.header('Unexpected end of file');
   }
 
-  /// Reads a block with the requested [size], or throws an unexpected EoF
-  /// exception.
-  Future<Uint8List> _readFullBlock(int size, {bool allowEmpty = false}) async {
-    final block = await _chunkedStream.readBytes(size);
-    if (block.length != size && !(allowEmpty && block.isEmpty)) {
+  /// Reads a 512-byte block, throwing on a short read unless [allowEmpty] is
+  Future<Uint8List> _readFullBlock({bool allowEmpty = false}) async {
+    final block = await _chunkedStream.nextOrNull ?? emptyUint8List;
+    if (block.length != blockSize && !(allowEmpty && block.isEmpty)) {
       _unexpectedEof();
     }
 
     return block;
   }
 
+  Future<Uint8List> _readFullBlocks(int amount) async {
+    final result = Uint8List(blockSize * amount);
+
+    for (var i = 0; i < amount; i++) {
+      final chunk = await _chunkedStream.nextOrNull;
+      if (chunk == null || chunk.length != blockSize) {
+        _unexpectedEof();
+      }
+
+      result.setAll(i * blockSize, chunk);
+    }
+
+    return result;
+  }
+
   /// Reads the next block header and assumes that the underlying reader
   /// is already aligned to a block boundary. It returns the raw block of the
   /// header in case further processing is required.
@@ -357,12 +361,12 @@
     if (rawHeader.isEmpty) return null;
 
     if (rawHeader.isAllZeroes) {
-      rawHeader = await _chunkedStream.readBytes(blockSize);
+      final next = await _chunkedStream.nextOrNull;
 
       // Exactly 1 block of zeroes is read and EOF is hit.
-      if (rawHeader.isEmpty) return null;
+      if (next == null) return null;
 
-      if (rawHeader.isAllZeroes) {
+      if (next.isAllZeroes) {
         // Two blocks of zeros are read - Normal EOF.
         return null;
       }
@@ -393,9 +397,9 @@
       final sparseDataLength =
           sparseData.fold<int>(0, (value, element) => value + element.length);
 
-      final streamLength = nextBlockSize(sparseDataLength);
-      final safeStream =
-          _publishStream(_chunkedStream.readStream(streamLength), streamLength);
+      final streamBlockCount = numBlocks(sparseDataLength);
+      final safeStream = _publishStream(
+          _chunkedStream.nextElements(streamBlockCount), sparseDataLength);
       return sparseStream(safeStream, sparseHoles, header.size);
     } else {
       var size = header.size;
@@ -408,9 +412,9 @@
       if (size == 0) {
         return _publishStream(const Stream<Never>.empty(), 0);
       } else {
-        _markPaddingToSkip(size);
+        final blockCount = numBlocks(header.size);
         return _publishStream(
-            _chunkedStream.readStream(header.size), header.size);
+            _chunkedStream.nextElements(blockCount), header.size);
       }
     }
   }
@@ -443,15 +447,6 @@
     });
   }
 
-  /// Skips to the next block after reading [readSize] bytes from the beginning
-  /// of a previous block.
-  void _markPaddingToSkip(int readSize) {
-    final offsetInLastBlock = readSize.toUnsigned(blockSizeLog2);
-    if (offsetInLastBlock != 0) {
-      _skipNext = blockSize - offsetInLastBlock;
-    }
-  }
-
   /// Checks the PAX headers for GNU sparse headers.
   /// If they are found, then this function reads the sparse map and returns it.
   /// This assumes that 0.0 headers have already been converted to 0.1 headers
@@ -519,8 +514,8 @@
     /// Ensures that [block] h as at least [n] tokens.
     Future<void> feedTokens(int n) async {
       while (newLineCount < n) {
-        final newBlock = await _chunkedStream.readBytes(blockSize);
-        if (newBlock.length < blockSize) {
+        final newBlock = await _chunkedStream.nextOrNull;
+        if (newBlock == null || newBlock.length < blockSize) {
           throw TarException.header(
               'GNU Sparse Map does not have enough lines!');
         }
@@ -629,7 +624,7 @@
   /// Thus, this function will read from the chunked stream iterator to fetch
   /// extra headers.
   ///
-  /// See also: https://www.gnu.org/software/tar/manual/html_section/tar_94.html#SEC191
+  /// See also: https://www.gnu.org/software/tar/manual/tar.html#Old-GNU-Format
   Future<List<SparseEntry>> _readOldGNUSparseMap(
       HeaderImpl header, Uint8List rawHeader) async {
     // Make sure that the input format is GNU.
@@ -639,54 +634,46 @@
       throw TarException.header('Tried to read sparse map of non-GNU header');
     }
 
+    // Read the real size of the file when sparse holes are expanded.
     header.size = rawHeader.readNumeric(483, 12);
-    final sparseMaps = <Uint8List>[];
+    final sparseEntries = <SparseEntry>[];
 
-    var sparse = rawHeader.sublistView(386, 483);
-    sparseMaps.add(sparse);
+    bool readEntry(Uint8List source, int offset) {
+      // If a sparse header starts with a null byte, it marks the end of the
+      // sparse structures.
+      if (source[offset] == 0) return false;
 
-    while (true) {
-      final maxEntries = sparse.length ~/ 24;
-      if (sparse[24 * maxEntries] > 0) {
-        // If there are more entries, read an extension header and parse its
-        // entries.
-        sparse = await _chunkedStream.readBytes(blockSize);
-        sparseMaps.add(sparse);
-        continue;
+      final fileOffset = source.readNumeric(offset, 12);
+      final length = source.readNumeric(offset + 12, 12);
+
+      sparseEntries.add(SparseEntry(fileOffset, length));
+      return true;
+    }
+
+    // The first four sparse headers are stored in the tar header itself
+    for (var i = 0; i < 4; i++) {
+      final offset = 386 + 24 * i;
+      if (!readEntry(rawHeader, offset)) break;
+    }
+    var isExtended = rawHeader[482] != 0;
+
+    while (isExtended) {
+      // Ok, we have a new block of sparse headers to process
+      final block = await _chunkedStream.nextOrNull;
+      if (block == null || block.length < blockSize) {
+        throw TarException.header('Unexpected EoF while reading sparse maps');
       }
 
-      break;
-    }
-
-    try {
-      return _processOldGNUSparseMap(sparseMaps);
-    } on FormatException {
-      throw TarException('Invalid old GNU Sparse Map');
-    }
-  }
-
-  /// Process [sparseMaps], which is known to be an OLD GNU v0.1 sparse map.
-  ///
-  /// For details, see https://www.gnu.org/software/tar/manual/html_section/tar_94.html#SEC191
-  List<SparseEntry> _processOldGNUSparseMap(List<Uint8List> sparseMaps) {
-    final sparseData = <SparseEntry>[];
-
-    for (final sparseMap in sparseMaps) {
-      final maxEntries = sparseMap.length ~/ 24;
-      for (var i = 0; i < maxEntries; i++) {
-        // This termination condition is identical to GNU and BSD tar.
-        if (sparseMap[i * 24] == 0) {
-          // Don't return, need to process extended headers (even if empty)
-          break;
-        }
-
-        final offset = sparseMap.readNumeric(i * 24, 12);
-        final length = sparseMap.readNumeric(i * 24 + 12, 12);
-
-        sparseData.add(SparseEntry(offset, length));
+      // A full block of sparse data contains up to 21 entries
+      for (var i = 0; i < 21; i++) {
+        if (!readEntry(block, i * 24)) break;
       }
+
+      // The last byte indicates whether another sparse header block follows.
+      isExtended = block[504] != 0;
     }
-    return sparseData;
+
+    return sparseEntries;
   }
 }
 
@@ -864,27 +851,41 @@
 /// [ChunkedStreamReader.readStream] might return a stream shorter than
 /// expected. That indicates an invalid tar file though, since the correct size
 /// is stored in the header.
-class _OutgoingStreamGuard extends EventSink<List<int>> {
-  final int expectedSize;
+class _OutgoingStreamGuard extends EventSink<Uint8List> {
+  final int size;
+  final int expectedBlockCount;
   final EventSink<List<int>> out;
   void Function() onDone;
 
-  int emittedSize = 0;
+  int emittedBlocks = 0;
   bool hadError = false;
 
-  _OutgoingStreamGuard(this.expectedSize, this.out, this.onDone);
+  _OutgoingStreamGuard(this.size, this.out, this.onDone)
+      : expectedBlockCount = numBlocks(size);
 
   @override
-  void add(List<int> event) {
-    emittedSize += event.length;
+  void add(Uint8List event) {
+    if (event.length != blockSize) {
+      addError(TarException('Unexpected end of tar file'), StackTrace.current);
+      return;
+    }
+
+    emittedBlocks++;
     // We have checks limiting the length of outgoing streams. If the stream is
     // larger than expected, that's a bug in pkg:tar.
     assert(
-        emittedSize <= expectedSize,
-        'Stream now emitted $emittedSize bytes, but only expected '
-        '$expectedSize');
+        emittedBlocks <= expectedBlockCount,
+        'Stream now emitted $emittedBlocks blocks, but only expected '
+        '$expectedBlockCount');
 
-    out.add(event);
+    if (emittedBlocks == expectedBlockCount) {
+      // This is the last block, which we may have to truncate because tar
+      // blocks are padded with zeroes at the end of an entry.
+      out.add(
+          event.sublistView(0, size - (expectedBlockCount - 1) * blockSize));
+    } else {
+      // This block just goes out fully.
+      out.add(event);
+    }
   }
 
   @override
@@ -899,7 +900,7 @@
 
     // If the stream stopped after an error, the user is already aware that
     // something is wrong.
-    if (emittedSize < expectedSize && !hadError) {
+    if (emittedBlocks < expectedBlockCount && !hadError) {
       out.addError(
           TarException('Unexpected end of tar file'), StackTrace.current);
     }
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index a6b7e13..1a0b018 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -1,3 +1,4 @@
+import 'dart:async';
 import 'dart:convert';
 import 'dart:math';
 import 'dart:typed_data';
@@ -229,3 +230,227 @@
     yield Uint8List(remainingBytes);
   }
 }
+
+Stream<Uint8List> _inChunks(Stream<List<int>> input) {
+  Uint8List? startedChunk;
+  int missingForNextChunk = 0;
+
+  Uint8List? pendingEvent;
+  int offsetInPendingEvent = 0;
+
+  late StreamSubscription<List<int>> inputSubscription;
+  final controller = StreamController<Uint8List>(sync: true);
+
+  var isResuming = false;
+
+  void startChunk(Uint8List source, int startOffset) {
+    assert(startedChunk == null);
+    final availableData = source.length - startOffset;
+    assert(availableData < blockSize);
+
+    startedChunk = Uint8List(blockSize)
+      ..setAll(0, source.sublistView(startOffset));
+    missingForNextChunk = blockSize - availableData;
+  }
+
+  void handleData(List<int> data) {
+    assert(pendingEvent == null,
+        'Had pending events while receiving new data from source.');
+    final typedData = data.asUint8List();
+
+    // The start offset of the new data that we didn't process yet.
+    var offsetInData = 0;
+    bool saveStateIfPaused() {
+      if (controller.isPausedOrClosed) {
+        pendingEvent = typedData;
+        offsetInPendingEvent = offsetInData;
+        return true;
+      }
+      return false;
+    }
+
+    // Try completing the pending chunk first, if it exists
+    if (startedChunk != null) {
+      final started = startedChunk!;
+      final startOffsetInStarted = blockSize - missingForNextChunk;
+
+      if (data.length >= missingForNextChunk) {
+        // Fill up the chunk, then go on with the remaining data
+        started.setAll(startOffsetInStarted,
+            typedData.sublistView(0, missingForNextChunk));
+        controller.add(started);
+        offsetInData += missingForNextChunk;
+
+        // We just finished serving the started chunk, so reset that
+        startedChunk = null;
+        missingForNextChunk = 0;
+
+        // If the controller was paused in a response to the add, stop serving
+        // events and store the rest of the input for later
+        if (saveStateIfPaused()) return;
+      } else {
+        // Ok, we can't finish the pending chunk with the new data but at least
+        // we can continue filling it up
+        started.setAll(startOffsetInStarted, typedData);
+        missingForNextChunk -= typedData.length;
+        return;
+      }
+    }
+
+    // The started chunk has been completed, continue by adding chunks as they
+    // come.
+    assert(startedChunk == null);
+
+    while (offsetInData < typedData.length) {
+      // Can we serve a full block from the new event
+      if (offsetInData <= typedData.length - blockSize) {
+        // Yup, then let's do that
+        final end = offsetInData + blockSize;
+        controller.add(typedData.sublistView(offsetInData, end));
+        offsetInData = end;
+
+        // Once again, stop and save state if the controller was paused.
+        if (saveStateIfPaused()) return;
+      } else {
+        // Ok, no full block but we can start a new pending chunk
+        startChunk(typedData, offsetInData);
+        break;
+      }
+    }
+  }
+
+  void startResume() {
+    isResuming = false;
+    if (controller.isPausedOrClosed) return;
+
+    // Start dispatching pending events before we resume the subscription on the
+    // main stream.
+    if (pendingEvent != null) {
+      final pending = pendingEvent!;
+      assert(startedChunk == null, 'Had pending events and a started chunk');
+
+      while (offsetInPendingEvent < pending.length) {
+        // Can we serve a full block from pending data?
+        if (offsetInPendingEvent <= pending.length - blockSize) {
+          final end = offsetInPendingEvent + blockSize;
+          controller.add(pending.sublistView(offsetInPendingEvent, end));
+          offsetInPendingEvent = end;
+
+          // Pause if we got a pause request in response to the add
+          if (controller.isPausedOrClosed) return;
+        } else {
+          // Store pending block that we can't finish with the pending chunk.
+          startChunk(pending, offsetInPendingEvent);
+          break;
+        }
+      }
+
+      // Pending events have been dispatched
+      offsetInPendingEvent = 0;
+      pendingEvent = null;
+    }
+
+    // When we're here, all pending events have been dispatched and the
+    // controller is not paused. Let's continue then!
+    inputSubscription.resume();
+  }
+
+  void onDone() {
+    // Input stream is done. If we have a block left over, emit that now
+    if (startedChunk != null) {
+      controller
+          .add(startedChunk!.sublistView(0, blockSize - missingForNextChunk));
+    }
+    controller.close();
+  }
+
+  controller
+    ..onListen = () {
+      inputSubscription = input.listen(handleData,
+          onError: controller.addError, onDone: onDone);
+    }
+    ..onPause = () {
+      if (!inputSubscription.isPaused) {
+        inputSubscription.pause();
+      }
+    }
+    ..onResume = () {
+      // This is a bit hacky. Our subscription is most likely going to be a
+      // _BufferingStreamSubscription which will buffer events that were added
+      // in this callback. However, we really want to know when the subscription
+      // has been paused in response to a new event because we can handle that
+      // very efficiently.
+      if (!isResuming) {
+        scheduleMicrotask(startResume);
+      }
+      isResuming = true;
+    }
+    ..onCancel = () {
+      inputSubscription.cancel();
+    };
+
+  return controller.stream;
+}
+
+extension on StreamController<dynamic> {
+  bool get isPausedOrClosed => isPaused || isClosed;
+}
+
+extension ReadInChunks on Stream<List<int>> {
+  /// A stream emitting values in chunks of `512` byte blocks.
+  ///
+  /// The last emitted chunk may be shorter than the regular block length.
+  Stream<Uint8List> get inChunks => _inChunks(this);
+}
+
+extension NextOrNull<T extends Object> on StreamIterator<T> {
+  Future<T?> get nextOrNull async {
+    if (await moveNext()) {
+      return current;
+    } else {
+      return null;
+    }
+  }
+
+  Stream<T> nextElements(int count) {
+    if (count == 0) return const Stream<Never>.empty();
+
+    var remaining = count;
+    Future<Object?>? pendingOperation;
+    final controller = StreamController<T>(sync: true);
+
+    void addNewEvent() {
+      if (pendingOperation == null && remaining > 0) {
+        // Start fetching data
+        final fetchNext = nextOrNull.then<Object?>((value) {
+          remaining--;
+          if (value != null) {
+            // Got data, let's add it
+            controller.add(value);
+          }
+
+          if (value == null || remaining == 0) {
+            // Finished, so close the controller
+            return controller.close();
+          } else {
+            return Future.value();
+          }
+        }, onError: controller.addError);
+
+        pendingOperation = fetchNext.whenComplete(() {
+          pendingOperation = null;
+          if (!controller.isPaused && remaining > 0) {
+            // Controller isn't paused, so add another event
+            addNewEvent();
+          }
+        });
+      }
+    }
+
+    controller
+      ..onListen = addNewEvent
+      ..onResume = addNewEvent;
+
+    return controller.stream;
+  }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 1e60694..8886a5d 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -10,6 +10,7 @@
   async: ^2.6.0
   meta: ^1.3.0
   typed_data: ^1.3.0
+  stream_transform: ^2.0.0
 
 dev_dependencies:
   charcode: ^1.2.0
diff --git a/test/utils_test.dart b/test/utils_test.dart
index 32de010..e8d5ea2 100644
--- a/test/utils_test.dart
+++ b/test/utils_test.dart
@@ -183,6 +183,10 @@
       });
     }
   });
+
+  group('inChunks', () {
+    test('keeps data intact', () {});
+  });
 }
 
 Uint8List _bytes(String str) {