Roll our own stream chunked reader
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 561163d..6c54d3b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,7 @@
## 0.4.0
- Support generating tar files with GNU-style long link names
- - Add `format` parameter to `tarWritingSink` and `tarTransformerWith`
+ - Add `format` parameter to `tarWritingSink` and `tarWriterWith`
## 0.3.3
diff --git a/example/main.dart b/example/main.dart
index 65cab46..6dfd101 100644
--- a/example/main.dart
+++ b/example/main.dart
@@ -1,3 +1,4 @@
+import 'dart:async';
import 'dart:convert';
import 'dart:io';
@@ -5,7 +6,8 @@
Future<void> main() async {
// Start reading a tar file
- final reader = TarReader(File('reference/gnu.tar').openRead());
+ final reader =
+ TarReader(File('reference/neats_test/gnu-incremental.tar').openRead());
while (await reader.moveNext()) {
final header = reader.current.header;
@@ -13,7 +15,10 @@
// Print the output if it's a regular file
if (header.typeFlag == TypeFlag.reg) {
- await reader.current.contents.transform(utf8.decoder).forEach(print);
+ await reader.current.contents.forEach((chunk) {
+ print(chunk.length);
+ print(utf8.decode(chunk));
+ });
}
}
diff --git a/lib/src/constants.dart b/lib/src/constants.dart
index aac7669..312a245 100644
--- a/lib/src/constants.dart
+++ b/lib/src/constants.dart
@@ -258,3 +258,4 @@
/// A full TAR block of zeros.
final zeroBlock = Uint8List(blockSize);
+final emptyUint8List = Uint8List(0);
diff --git a/lib/src/reader.dart b/lib/src/reader.dart
index 713235a..f1877aa 100644
--- a/lib/src/reader.dart
+++ b/lib/src/reader.dart
@@ -3,7 +3,6 @@
import 'dart:convert';
import 'dart:typed_data';
-import 'package:async/async.dart';
import 'package:meta/meta.dart';
import 'package:typed_data/typed_data.dart';
@@ -23,13 +22,10 @@
@sealed
class TarReader implements StreamIterator<TarEntry> {
/// A chunked stream iterator to enable us to get our data.
- final ChunkedStreamReader<int> _chunkedStream;
+ final StreamIterator<Uint8List> _chunkedStream;
final PaxHeaders _paxHeaders = PaxHeaders();
final int _maxSpecialFileSize;
- /// Skip the next [_skipNext] elements when reading in the stream.
- int _skipNext = 0;
-
TarEntry? _current;
/// The underlying content stream for the [_current] entry. Draining this
@@ -88,7 +84,7 @@
TarReader(Stream<List<int>> tarStream,
{int maxSpecialFileSize = defaultSpecialLength,
bool disallowTrailingData = false})
- : _chunkedStream = ChunkedStreamReader(tarStream),
+ : _chunkedStream = StreamIterator(tarStream.inChunks),
_checkNoTrailingData = disallowTrailingData,
_maxSpecialFileSize = maxSpecialFileSize;
@@ -152,13 +148,7 @@
// iterates through one or more "header files" until it finds a
// "normal file".
while (true) {
- if (_skipNext > 0) {
- await _readFullBlock(_skipNext);
- _skipNext = 0;
- }
-
- final rawHeader =
- await _readFullBlock(blockSize, allowEmpty: eofAcceptable);
+ final rawHeader = await _readFullBlock(allowEmpty: eofAcceptable);
nextHeader = await _readHeader(rawHeader);
if (nextHeader == null) {
@@ -179,20 +169,19 @@
if (nextHeader.typeFlag == TypeFlag.xHeader ||
nextHeader.typeFlag == TypeFlag.xGlobalHeader) {
format = format.mayOnlyBe(TarFormat.pax);
- final paxHeaderSize = _checkSpecialSize(nextHeader.size);
- final rawPaxHeaders = await _readFullBlock(paxHeaderSize);
+ final paxHeaderBlocks = await _readFullBlocks(
+ _checkSpecialBlocks(numBlocks(nextHeader.size)));
+ final rawPaxHeaders = paxHeaderBlocks.sublistView(0, nextHeader.size);
_paxHeaders.readPaxHeaders(
rawPaxHeaders, nextHeader.typeFlag == TypeFlag.xGlobalHeader);
- _markPaddingToSkip(paxHeaderSize);
-
- // This is a meta header affecting the next header.
+ // This is a meta header affecting the next header
continue;
} else if (nextHeader.typeFlag == TypeFlag.gnuLongLink ||
nextHeader.typeFlag == TypeFlag.gnuLongName) {
format = format.mayOnlyBe(TarFormat.gnu);
- final realName = await _readFullBlock(
- _checkSpecialSize(nextBlockSize(nextHeader.size)));
+ final realName = await _readFullBlocks(
+ _checkSpecialBlocks(numBlocks(nextHeader.size)));
final readName = realName.readString(0, realName.length);
if (nextHeader.typeFlag == TypeFlag.gnuLongName) {
@@ -300,13 +289,14 @@
}
}
- int _checkSpecialSize(int size) {
+ int _checkSpecialBlocks(int count) {
+ final size = blockSize * count;
if (size > _maxSpecialFileSize) {
throw TarException(
'TAR file contains hidden entry with an invalid size of $size.');
}
- return size;
+ return count;
}
/// Ater we detected the end of a tar file, optionally check for trailing
@@ -314,15 +304,15 @@
Future<void> _handleExpectedEof() async {
if (_checkNoTrailingData) {
// Trailing zeroes are okay, but don't allow any more data here.
- Uint8List block;
+ Uint8List? block;
do {
- block = await _chunkedStream.readBytes(blockSize);
- if (!block.isAllZeroes) {
+ block = await _chunkedStream.nextOrNull;
+ if (block != null && !block.isAllZeroes) {
throw TarException(
'Illegal content after the end of the tar archive.');
}
- } while (block.length == blockSize);
+ } while (block != null && block.length == blockSize);
// The stream is done when we couldn't read the full block.
}
@@ -333,17 +323,31 @@
throw TarException.header('Unexpected end of file');
}
- /// Reads a block with the requested [size], or throws an unexpected EoF
- /// exception.
- Future<Uint8List> _readFullBlock(int size, {bool allowEmpty = false}) async {
- final block = await _chunkedStream.readBytes(size);
- if (block.length != size && !(allowEmpty && block.isEmpty)) {
+  /// Reads a 512-byte block, throwing on EoF unless [allowEmpty] is true.
+ Future<Uint8List> _readFullBlock({bool allowEmpty = false}) async {
+ final block = await _chunkedStream.nextOrNull ?? emptyUint8List;
+ if (block.length != blockSize && !(allowEmpty && block.isEmpty)) {
_unexpectedEof();
}
return block;
}
+ Future<Uint8List> _readFullBlocks(int amount) async {
+ final result = Uint8List(blockSize * amount);
+
+ for (var i = 0; i < amount; i++) {
+ final chunk = await _chunkedStream.nextOrNull;
+ if (chunk == null || chunk.length != blockSize) {
+ _unexpectedEof();
+ }
+
+ result.setAll(i * blockSize, chunk);
+ }
+
+ return result;
+ }
+
/// Reads the next block header and assumes that the underlying reader
/// is already aligned to a block boundary. It returns the raw block of the
/// header in case further processing is required.
@@ -357,12 +361,12 @@
if (rawHeader.isEmpty) return null;
if (rawHeader.isAllZeroes) {
- rawHeader = await _chunkedStream.readBytes(blockSize);
+ final next = await _chunkedStream.nextOrNull;
// Exactly 1 block of zeroes is read and EOF is hit.
- if (rawHeader.isEmpty) return null;
+ if (next == null) return null;
- if (rawHeader.isAllZeroes) {
+ if (next.isAllZeroes) {
// Two blocks of zeros are read - Normal EOF.
return null;
}
@@ -393,9 +397,9 @@
final sparseDataLength =
sparseData.fold<int>(0, (value, element) => value + element.length);
- final streamLength = nextBlockSize(sparseDataLength);
- final safeStream =
- _publishStream(_chunkedStream.readStream(streamLength), streamLength);
+ final streamBlockCount = numBlocks(sparseDataLength);
+ final safeStream = _publishStream(
+ _chunkedStream.nextElements(streamBlockCount), sparseDataLength);
return sparseStream(safeStream, sparseHoles, header.size);
} else {
var size = header.size;
@@ -408,9 +412,9 @@
if (size == 0) {
return _publishStream(const Stream<Never>.empty(), 0);
} else {
- _markPaddingToSkip(size);
+ final blockCount = numBlocks(header.size);
return _publishStream(
- _chunkedStream.readStream(header.size), header.size);
+ _chunkedStream.nextElements(blockCount), header.size);
}
}
}
@@ -443,15 +447,6 @@
});
}
- /// Skips to the next block after reading [readSize] bytes from the beginning
- /// of a previous block.
- void _markPaddingToSkip(int readSize) {
- final offsetInLastBlock = readSize.toUnsigned(blockSizeLog2);
- if (offsetInLastBlock != 0) {
- _skipNext = blockSize - offsetInLastBlock;
- }
- }
-
/// Checks the PAX headers for GNU sparse headers.
/// If they are found, then this function reads the sparse map and returns it.
/// This assumes that 0.0 headers have already been converted to 0.1 headers
@@ -519,8 +514,8 @@
/// Ensures that [block] h as at least [n] tokens.
Future<void> feedTokens(int n) async {
while (newLineCount < n) {
- final newBlock = await _chunkedStream.readBytes(blockSize);
- if (newBlock.length < blockSize) {
+ final newBlock = await _chunkedStream.nextOrNull;
+ if (newBlock == null || newBlock.length < blockSize) {
throw TarException.header(
'GNU Sparse Map does not have enough lines!');
}
@@ -629,7 +624,7 @@
/// Thus, this function will read from the chunked stream iterator to fetch
/// extra headers.
///
- /// See also: https://www.gnu.org/software/tar/manual/html_section/tar_94.html#SEC191
+ /// See also: https://www.gnu.org/software/tar/manual/tar.html#Old-GNU-Format
Future<List<SparseEntry>> _readOldGNUSparseMap(
HeaderImpl header, Uint8List rawHeader) async {
// Make sure that the input format is GNU.
@@ -639,54 +634,46 @@
throw TarException.header('Tried to read sparse map of non-GNU header');
}
+ // Read the real size of the file when sparse holes are expanded.
header.size = rawHeader.readNumeric(483, 12);
- final sparseMaps = <Uint8List>[];
+ final sparseEntries = <SparseEntry>[];
- var sparse = rawHeader.sublistView(386, 483);
- sparseMaps.add(sparse);
+ bool readEntry(Uint8List source, int offset) {
+ // If a sparse header starts with a null byte, it marks the end of the
+ // sparse structures.
+    if (source[offset] == 0) return false;
- while (true) {
- final maxEntries = sparse.length ~/ 24;
- if (sparse[24 * maxEntries] > 0) {
- // If there are more entries, read an extension header and parse its
- // entries.
- sparse = await _chunkedStream.readBytes(blockSize);
- sparseMaps.add(sparse);
- continue;
+ final fileOffset = source.readNumeric(offset, 12);
+ final length = source.readNumeric(offset + 12, 12);
+
+ sparseEntries.add(SparseEntry(fileOffset, length));
+ return true;
+ }
+
+ // The first four sparse headers are stored in the tar header itself
+ for (var i = 0; i < 4; i++) {
+ final offset = 386 + 24 * i;
+ if (!readEntry(rawHeader, offset)) break;
+ }
+ var isExtended = rawHeader[482] != 0;
+
+ while (isExtended) {
+ // Ok, we have a new block of sparse headers to process
+ final block = await _chunkedStream.nextOrNull;
+ if (block == null || block.length < blockSize) {
+ throw TarException.header('Unexpected EoF while reading sparse maps');
}
- break;
- }
-
- try {
- return _processOldGNUSparseMap(sparseMaps);
- } on FormatException {
- throw TarException('Invalid old GNU Sparse Map');
- }
- }
-
- /// Process [sparseMaps], which is known to be an OLD GNU v0.1 sparse map.
- ///
- /// For details, see https://www.gnu.org/software/tar/manual/html_section/tar_94.html#SEC191
- List<SparseEntry> _processOldGNUSparseMap(List<Uint8List> sparseMaps) {
- final sparseData = <SparseEntry>[];
-
- for (final sparseMap in sparseMaps) {
- final maxEntries = sparseMap.length ~/ 24;
- for (var i = 0; i < maxEntries; i++) {
- // This termination condition is identical to GNU and BSD tar.
- if (sparseMap[i * 24] == 0) {
- // Don't return, need to process extended headers (even if empty)
- break;
- }
-
- final offset = sparseMap.readNumeric(i * 24, 12);
- final length = sparseMap.readNumeric(i * 24 + 12, 12);
-
- sparseData.add(SparseEntry(offset, length));
+ // A full block of sparse data contains up to 21 entries
+ for (var i = 0; i < 21; i++) {
+ if (!readEntry(block, i * 24)) break;
}
+
+      // The last byte indicates whether another sparse header block follows.
+ isExtended = block[504] != 0;
}
- return sparseData;
+
+ return sparseEntries;
}
}
@@ -864,27 +851,41 @@
/// [ChunkedStreamReader.readStream] might return a stream shorter than
/// expected. That indicates an invalid tar file though, since the correct size
/// is stored in the header.
-class _OutgoingStreamGuard extends EventSink<List<int>> {
- final int expectedSize;
+class _OutgoingStreamGuard extends EventSink<Uint8List> {
+ final int size;
+ final int expectedBlockCount;
final EventSink<List<int>> out;
void Function() onDone;
- int emittedSize = 0;
+ int emittedBlocks = 0;
bool hadError = false;
- _OutgoingStreamGuard(this.expectedSize, this.out, this.onDone);
+ _OutgoingStreamGuard(this.size, this.out, this.onDone)
+ : expectedBlockCount = numBlocks(size);
@override
- void add(List<int> event) {
- emittedSize += event.length;
+ void add(Uint8List event) {
+ if (event.length != blockSize) {
+ addError(TarException('Unexpected end of tar file'), StackTrace.current);
+ return;
+ }
+
+ emittedBlocks++;
// We have checks limiting the length of outgoing streams. If the stream is
// larger than expected, that's a bug in pkg:tar.
assert(
- emittedSize <= expectedSize,
- 'Stream now emitted $emittedSize bytes, but only expected '
- '$expectedSize');
+ emittedBlocks <= expectedBlockCount,
+ 'Stream now emitted $emittedBlocks blocks, but only expected '
+ '$expectedBlockCount');
- out.add(event);
+    final remainder = size.toUnsigned(blockSizeLog2);
+    if (emittedBlocks == expectedBlockCount && remainder != 0) {
+      // The last block is zero-padded up to the block size, so truncate it to
+      // the real [size]. A remainder of 0 means the block is exactly full.
+      out.add(event.sublistView(0, remainder));
+    } else {
+      // This block just goes out fully
+      out.add(event);
+    }
}
@override
@@ -899,7 +900,7 @@
// If the stream stopped after an error, the user is already aware that
// something is wrong.
- if (emittedSize < expectedSize && !hadError) {
+ if (emittedBlocks < expectedBlockCount && !hadError) {
out.addError(
TarException('Unexpected end of tar file'), StackTrace.current);
}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index a6b7e13..1a0b018 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -1,3 +1,4 @@
+import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';
@@ -229,3 +230,227 @@
yield Uint8List(remainingBytes);
}
}
+
+Stream<Uint8List> _inChunks(Stream<List<int>> input) {
+ Uint8List? startedChunk;
+ int missingForNextChunk = 0;
+
+ Uint8List? pendingEvent;
+ int offsetInPendingEvent = 0;
+
+ late StreamSubscription<List<int>> inputSubscription;
+ final controller = StreamController<Uint8List>(sync: true);
+
+ var isResuming = false;
+
+ void startChunk(Uint8List source, int startOffset) {
+ assert(startedChunk == null);
+ final availableData = source.length - startOffset;
+ assert(availableData < blockSize);
+
+ startedChunk = Uint8List(blockSize)
+ ..setAll(0, source.sublistView(startOffset));
+ missingForNextChunk = blockSize - availableData;
+ }
+
+ void handleData(List<int> data) {
+ assert(pendingEvent == null,
+ 'Had pending events while receiving new data from source.');
+ final typedData = data.asUint8List();
+
+ // The start offset of the new data that we didn't process yet.
+ var offsetInData = 0;
+ bool saveStateIfPaused() {
+ if (controller.isPausedOrClosed) {
+ pendingEvent = typedData;
+ offsetInPendingEvent = offsetInData;
+ return true;
+ }
+ return false;
+ }
+
+ // Try completing the pending chunk first, if it exists
+ if (startedChunk != null) {
+ final started = startedChunk!;
+ final startOffsetInStarted = blockSize - missingForNextChunk;
+
+ if (data.length >= missingForNextChunk) {
+ // Fill up the chunk, then go on with the remaining data
+ started.setAll(startOffsetInStarted,
+ typedData.sublistView(0, missingForNextChunk));
+ controller.add(started);
+ offsetInData += missingForNextChunk;
+
+ // We just finished serving the started chunk, so reset that
+ startedChunk = null;
+ missingForNextChunk = 0;
+
+ // If the controller was paused in a response to the add, stop serving
+ // events and store the rest of the input for later
+ if (saveStateIfPaused()) return;
+ } else {
+ // Ok, we can't finish the pending chunk with the new data but at least
+ // we can continue filling it up
+ started.setAll(startOffsetInStarted, typedData);
+ missingForNextChunk -= typedData.length;
+ return;
+ }
+ }
+
+ // The started chunk has been completed, continue by adding chunks as they
+ // come.
+ assert(startedChunk == null);
+
+ while (offsetInData < typedData.length) {
+ // Can we serve a full block from the new event
+ if (offsetInData <= typedData.length - blockSize) {
+ // Yup, then let's do that
+ final end = offsetInData + blockSize;
+ controller.add(typedData.sublistView(offsetInData, end));
+ offsetInData = end;
+
+ // Once again, stop and save state if the controller was paused.
+ if (saveStateIfPaused()) return;
+ } else {
+ // Ok, no full block but we can start a new pending chunk
+ startChunk(typedData, offsetInData);
+ break;
+ }
+ }
+ }
+
+ void startResume() {
+ isResuming = false;
+ if (controller.isPausedOrClosed) return;
+
+ // Start dispatching pending events before we resume the subscription on the
+ // main stream.
+ if (pendingEvent != null) {
+ final pending = pendingEvent!;
+ assert(startedChunk == null, 'Had pending events and a started chunk');
+
+ while (offsetInPendingEvent < pending.length) {
+ // Can we serve a full block from pending data?
+ if (offsetInPendingEvent <= pending.length - blockSize) {
+ final end = offsetInPendingEvent + blockSize;
+ controller.add(pending.sublistView(offsetInPendingEvent, end));
+ offsetInPendingEvent = end;
+
+ // Pause if we got a pause request in response to the add
+ if (controller.isPausedOrClosed) return;
+ } else {
+ // Store pending block that we can't finish with the pending chunk.
+ startChunk(pending, offsetInPendingEvent);
+ break;
+ }
+ }
+
+ // Pending events have been dispatched
+ offsetInPendingEvent = 0;
+ pendingEvent = null;
+ }
+
+ // When we're here, all pending events have been dispatched and the
+ // controller is not paused. Let's continue then!
+ inputSubscription.resume();
+ }
+
+ void onDone() {
+ // Input stream is done. If we have a block left over, emit that now
+ if (startedChunk != null) {
+ controller
+ .add(startedChunk!.sublistView(0, blockSize - missingForNextChunk));
+ }
+ controller.close();
+ }
+
+ controller
+ ..onListen = () {
+ inputSubscription = input.listen(handleData,
+ onError: controller.addError, onDone: onDone);
+ }
+ ..onPause = () {
+ if (!inputSubscription.isPaused) {
+ inputSubscription.pause();
+ }
+ }
+ ..onResume = () {
+ // This is a bit hacky. Our subscription is most likely going to be a
+ // _BufferingStreamSubscription which will buffer events that were added
+ // in this callback. However, we really want to know when the subscription
+ // has been paused in response to a new event because we can handle that
+ // very efficiently.
+ if (!isResuming) {
+ scheduleMicrotask(startResume);
+ }
+ isResuming = true;
+ }
+ ..onCancel = () {
+ inputSubscription.cancel();
+ };
+
+ return controller.stream;
+}
+
+extension on StreamController<dynamic> {
+ bool get isPausedOrClosed => isPaused || isClosed;
+}
+
+extension ReadInChunks on Stream<List<int>> {
+ /// A stream emitting values in chunks of `512` byte blocks.
+ ///
+ /// The last emitted chunk may be shorter than the regular block length.
+ Stream<Uint8List> get inChunks => _inChunks(this);
+}
+
+extension NextOrNull<T extends Object> on StreamIterator<T> {
+ Future<T?> get nextOrNull async {
+ if (await moveNext()) {
+ return current;
+ } else {
+ return null;
+ }
+ }
+
+ Stream<T> nextElements(int count) {
+ if (count == 0) return const Stream<Never>.empty();
+
+ var remaining = count;
+ Future<Object?>? pendingOperation;
+ final controller = StreamController<T>(sync: true);
+
+ void addNewEvent() {
+ if (pendingOperation == null && remaining > 0) {
+ // Start fetching data
+ final fetchNext = nextOrNull.then<Object?>((value) {
+ remaining--;
+ if (value != null) {
+ // Got data, let's add it
+ controller.add(value);
+ }
+
+ if (value == null || remaining == 0) {
+ // Finished, so close the controller
+ return controller.close();
+ } else {
+ return Future.value();
+ }
+ }, onError: controller.addError);
+
+ pendingOperation = fetchNext.whenComplete(() {
+ pendingOperation = null;
+ if (!controller.isPaused && remaining > 0) {
+ // Controller isn't paused, so add another event
+ addNewEvent();
+ }
+ });
+ }
+ }
+
+ controller
+ ..onListen = addNewEvent
+ ..onResume = addNewEvent;
+
+ return controller.stream;
+ }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
index 1e60694..8886a5d 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -10,6 +10,7 @@
async: ^2.6.0
meta: ^1.3.0
typed_data: ^1.3.0
+ stream_transform: ^2.0.0
dev_dependencies:
charcode: ^1.2.0
diff --git a/test/utils_test.dart b/test/utils_test.dart
index 32de010..e8d5ea2 100644
--- a/test/utils_test.dart
+++ b/test/utils_test.dart
@@ -183,6 +183,10 @@
});
}
});
+
+ group('inChunks', () {
+    test('keeps data intact', () {});
+ });
}
Uint8List _bytes(String str) {