Use optimized stream chunker
diff --git a/lib/src/reader.dart b/lib/src/reader.dart
index f1877aa..df89c02 100644
--- a/lib/src/reader.dart
+++ b/lib/src/reader.dart
@@ -22,7 +22,7 @@
@sealed
class TarReader implements StreamIterator<TarEntry> {
/// A chunked stream iterator to enable us to get our data.
- final StreamIterator<Uint8List> _chunkedStream;
+ final StreamBlockReader _reader;
final PaxHeaders _paxHeaders = PaxHeaders();
final int _maxSpecialFileSize;
@@ -84,7 +84,7 @@
TarReader(Stream<List<int>> tarStream,
{int maxSpecialFileSize = defaultSpecialLength,
bool disallowTrailingData = false})
- : _chunkedStream = StreamIterator(tarStream.inChunks),
+ : _reader = StreamBlockReader(tarStream),
_checkNoTrailingData = disallowTrailingData,
_maxSpecialFileSize = maxSpecialFileSize;
@@ -236,7 +236,7 @@
// Note: Calling cancel is safe when the stream has already been completed.
// It's a noop in that case, which is what we want.
- return _chunkedStream.cancel();
+ return _reader.close();
}
/// Utility function for quickly iterating through all entries in [tarStream].
@@ -307,12 +307,12 @@
Uint8List? block;
do {
- block = await _chunkedStream.nextOrNull;
- if (block != null && !block.isAllZeroes) {
+ block = await _reader.nextBlock();
+ if (!block.isAllZeroes) {
throw TarException(
'Illegal content after the end of the tar archive.');
}
- } while (block != null && block.length == blockSize);
+ } while (block.length == blockSize);
// The stream is done when we couldn't read the full block.
}
@@ -325,7 +325,7 @@
/// Reads a 512-byte block or throws if [allowEmpty] is false.
Future<Uint8List> _readFullBlock({bool allowEmpty = false}) async {
- final block = await _chunkedStream.nextOrNull ?? emptyUint8List;
+ final block = await _reader.nextBlock();
if (block.length != blockSize && !(allowEmpty && block.isEmpty)) {
_unexpectedEof();
}
@@ -337,8 +337,8 @@
final result = Uint8List(blockSize * amount);
for (var i = 0; i < amount; i++) {
- final chunk = await _chunkedStream.nextOrNull;
- if (chunk == null || chunk.length != blockSize) {
+ final chunk = await _reader.nextBlock();
+ if (chunk.length != blockSize) {
_unexpectedEof();
}
@@ -361,10 +361,10 @@
if (rawHeader.isEmpty) return null;
if (rawHeader.isAllZeroes) {
- final next = await _chunkedStream.nextOrNull;
+ final next = await _reader.nextBlock();
// Exactly 1 block of zeroes is read and EOF is hit.
- if (next == null) return null;
+ if (next.isEmpty) return null;
if (next.isAllZeroes) {
// Two blocks of zeros are read - Normal EOF.
@@ -399,7 +399,7 @@
final streamBlockCount = numBlocks(sparseDataLength);
final safeStream = _publishStream(
- _chunkedStream.nextElements(streamBlockCount), sparseDataLength);
+ _reader.nextBlocks(streamBlockCount), sparseDataLength);
return sparseStream(safeStream, sparseHoles, header.size);
} else {
var size = header.size;
@@ -413,8 +413,7 @@
return _publishStream(const Stream<Never>.empty(), 0);
} else {
final blockCount = numBlocks(header.size);
- return _publishStream(
- _chunkedStream.nextElements(blockCount), header.size);
+ return _publishStream(_reader.nextBlocks(blockCount), header.size);
}
}
}
@@ -514,8 +513,8 @@
/// Ensures that [block] h as at least [n] tokens.
Future<void> feedTokens(int n) async {
while (newLineCount < n) {
- final newBlock = await _chunkedStream.nextOrNull;
- if (newBlock == null || newBlock.length < blockSize) {
+ final newBlock = await _reader.nextBlock();
+ if (newBlock.length < blockSize) {
throw TarException.header(
'GNU Sparse Map does not have enough lines!');
}
@@ -659,8 +658,8 @@
while (isExtended) {
// Ok, we have a new block of sparse headers to process
- final block = await _chunkedStream.nextOrNull;
- if (block == null || block.length < blockSize) {
+ final block = await _reader.nextBlock();
+ if (block.length < blockSize) {
throw TarException.header('Unexpected EoF while reading sparse maps');
}
@@ -852,40 +851,48 @@
/// expected. That indicates an invalid tar file though, since the correct size
/// is stored in the header.
class _OutgoingStreamGuard extends EventSink<Uint8List> {
- final int size;
- final int expectedBlockCount;
final EventSink<List<int>> out;
void Function() onDone;
- int emittedBlocks = 0;
+ int remainingContentSize;
+ int remainingPaddingSize;
+ bool isInContent = true;
bool hadError = false;
- _OutgoingStreamGuard(this.size, this.out, this.onDone)
- : expectedBlockCount = numBlocks(size);
+ _OutgoingStreamGuard(this.remainingContentSize, this.out, this.onDone)
+ : remainingPaddingSize = _paddingFor(remainingContentSize);
+
+ static int _paddingFor(int contentSize) {
+ final offsetInLastBlock = contentSize.toUnsigned(blockSizeLog2);
+ if (offsetInLastBlock != 0) {
+ return blockSize - offsetInLastBlock;
+ }
+ return 0;
+ }
@override
void add(Uint8List event) {
- if (event.length != blockSize) {
- addError(TarException('Unexpected end of tar file'), StackTrace.current);
- return;
- }
-
- emittedBlocks++;
- // We have checks limiting the length of outgoing streams. If the stream is
- // larger than expected, that's a bug in pkg:tar.
- assert(
- emittedBlocks <= expectedBlockCount,
- 'Stream now emitted $emittedBlocks blocks, but only expected '
- '$expectedBlockCount');
-
- if (emittedBlocks == expectedBlockCount) {
- // This is the last block, which we may have to truncate because tar
- // blocks are padded with zeroes at the end
- out.add(event.sublistView(0, size.toUnsigned(blockSizeLog2)));
+ if (isInContent) {
+ if (event.length < remainingContentSize) {
+ // We can fully add this chunk as it consists entirely of data
+ out.add(event);
+ remainingContentSize -= event.length;
+ } else {
+ // We can add the first bytes as content, the others are padding that we
+ // shouldn't emit
+ out.add(event.sublistView(0, remainingContentSize));
+ isInContent = false;
+ remainingPaddingSize -= event.length - remainingContentSize;
+ remainingContentSize = 0;
+ }
} else {
- // This blocks just goes out fully
- out.add(event);
+ // Ok, the entire event is padding
+ remainingPaddingSize -= event.length;
}
+
+ // The underlying stream comes from pkg:tar, so if we get too many bytes
+ // that's a bug in this package.
+ assert(remainingPaddingSize >= 0, 'Stream emitted to many bytes');
}
@override
@@ -896,15 +903,15 @@
@override
void close() {
- onDone();
-
// If the stream stopped after an error, the user is already aware that
// something is wrong.
- if (emittedBlocks < expectedBlockCount && !hadError) {
+ if (remainingContentSize > 0 && !hadError) {
+ hadError = true;
out.addError(
TarException('Unexpected end of tar file'), StackTrace.current);
}
+ onDone();
out.close();
}
}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index 1a0b018..c55917b 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -231,226 +231,323 @@
}
}
-Stream<Uint8List> _inChunks(Stream<List<int>> input) {
- Uint8List? startedChunk;
- int missingForNextChunk = 0;
+class StreamBlockReader {
+ final Stream<List<int>> _source;
+ StreamSubscription<List<int>>? _inputSubscription;
- Uint8List? pendingEvent;
- int offsetInPendingEvent = 0;
+ Completer<Uint8List>? _nextBlockCompleter;
- late StreamSubscription<List<int>> inputSubscription;
- final controller = StreamController<Uint8List>(sync: true);
+ StreamController<Uint8List>? _nextBlocksStream;
+ int _remainingBlocksInStream = 0;
- var isResuming = false;
+ Uint8List? _pendingBlock;
+ int _missingInPendingBlock = 0;
- void startChunk(Uint8List source, int startOffset) {
- assert(startedChunk == null);
+ Uint8List? _waitingForDispatch;
+ int _offsetInWaiting = 0;
+
+ bool _closed = false;
+
+ StreamBlockReader(this._source);
+
+ int get _blocksToServe {
+ if (_nextBlockCompleter != null) {
+ return 1;
+ } else {
+ return _remainingBlocksInStream;
+ }
+ }
+
+ void _listenOrResume() {
+ final subsription = _inputSubscription;
+ if (subsription == null) {
+ _inputSubscription =
+ _source.listen(_onData, onError: _onError, onDone: _onDone);
+ } else {
+ subsription.resume();
+ }
+ }
+
+ /// Emits ready [blocks].
+ ///
+ /// Returns whether we should pause emitting further data afterwards.
+ bool _emitBlocks(Uint8List blocks, {int amount = 1}) {
+ final stream = _nextBlocksStream;
+ final future = _nextBlockCompleter;
+ final subscription = _inputSubscription;
+ assert((future == null) ^ (stream == null));
+ assert(subscription != null);
+
+ final bool shouldPause;
+
+ if (stream != null) {
+ stream.add(blocks);
+ final remaining = _remainingBlocksInStream -= amount;
+ assert(remaining >= 0);
+
+ final done = remaining == 0;
+ if (done) {
+ stream.close();
+ _nextBlocksStream = null;
+ }
+
+ shouldPause = done || stream.isPausedOrClosed;
+ } else if (future != null) {
+ assert(amount == 1);
+ future.complete(blocks);
+ _nextBlockCompleter = null;
+ shouldPause = true;
+ } else {
+ // Exactly one of them must be non-null
+ throw AssertionError();
+ }
+
+ if (shouldPause && !subscription!.isPaused) {
+ subscription.pause();
+ }
+ return shouldPause;
+ }
+
+ void _startPendingBlock(Uint8List source, int startOffset) {
+ assert(_pendingBlock == null);
final availableData = source.length - startOffset;
assert(availableData < blockSize);
- startedChunk = Uint8List(blockSize)
+ _pendingBlock = Uint8List(blockSize)
..setAll(0, source.sublistView(startOffset));
- missingForNextChunk = blockSize - availableData;
+ _missingInPendingBlock = blockSize - availableData;
}
- void handleData(List<int> data) {
- assert(pendingEvent == null,
- 'Had pending events while receiving new data from source.');
- final typedData = data.asUint8List();
+ void _saveNewUndispatchedState(Uint8List chunk, int offsetInChunk) {
+ assert(_waitingForDispatch == null);
+ _saveUndispatchedState(chunk, offsetInChunk);
+ }
+
+ void _saveUndispatchedState(Uint8List chunk, int offsetInChunk) {
+ assert(_pendingBlock == null);
+ final remaining = chunk.length - offsetInChunk;
+ if (remaining < blockSize) {
+ _waitingForDispatch = null;
+ _offsetInWaiting = 0;
+ _startPendingBlock(chunk, offsetInChunk);
+ } else {
+ _waitingForDispatch = chunk;
+ _offsetInWaiting = offsetInChunk;
+ }
+ }
+
+ void _onData(List<int> chunk) {
+ assert(_waitingForDispatch == null,
+ 'Had pending data while receiving new event');
+
+ final typedChunk = chunk.asUint8List();
// The start offset of the new data that we didn't process yet.
var offsetInData = 0;
- bool saveStateIfPaused() {
- if (controller.isPausedOrClosed) {
- pendingEvent = typedData;
- offsetInPendingEvent = offsetInData;
- return true;
- }
- return false;
+
+ void savePendingChunk() {
+ _saveNewUndispatchedState(typedChunk, offsetInData);
}
- // Try completing the pending chunk first, if it exists
- if (startedChunk != null) {
- final started = startedChunk!;
- final startOffsetInStarted = blockSize - missingForNextChunk;
+ // Try finishing the pending block first, if it exists.
+ if (_pendingBlock != null) {
+ final started = _pendingBlock!;
+ final missing = _missingInPendingBlock;
+ final startOffsetInStarted = blockSize - missing;
- if (data.length >= missingForNextChunk) {
- // Fill up the chunk, then go on with the remaining data
- started.setAll(startOffsetInStarted,
- typedData.sublistView(0, missingForNextChunk));
- controller.add(started);
- offsetInData += missingForNextChunk;
+ if (typedChunk.length >= missing) {
+ // We can complete the block
+ started.setAll(
+ startOffsetInStarted, typedChunk.sublistView(0, missing));
+ offsetInData = missing;
+ final done = _emitBlocks(started);
- // We just finished serving the started chunk, so reset that
- startedChunk = null;
- missingForNextChunk = 0;
+ // We just finished serving the started block, so reset that
+ _pendingBlock = null;
+ _missingInPendingBlock = 0;
- // If the controller was paused in a response to the add, stop serving
- // events and store the rest of the input for later
- if (saveStateIfPaused()) return;
+ if (done) {
+ savePendingChunk();
+ return;
+ }
} else {
- // Ok, we can't finish the pending chunk with the new data but at least
+ // We can't complete the pending block with the new data, but at least
// we can continue filling it up
- started.setAll(startOffsetInStarted, typedData);
- missingForNextChunk -= typedData.length;
+ started.setAll(startOffsetInStarted, typedChunk);
+ _missingInPendingBlock -= typedChunk.length;
+ assert(_missingInPendingBlock > 0);
return;
}
}
- // The started chunk has been completed, continue by adding chunks as they
- // come.
- assert(startedChunk == null);
+ // When we get to this point, the started chunk has been completed and we
+ // can continue adding chunks as they come.
+ assert(_pendingBlock == null);
- while (offsetInData < typedData.length) {
- // Can we serve a full block from the new event
- if (offsetInData <= typedData.length - blockSize) {
- // Yup, then let's do that
- final end = offsetInData + blockSize;
- controller.add(typedData.sublistView(offsetInData, end));
- offsetInData = end;
+ // Serve as many blocks as we can from the current chunk
+ final blocksToServe = _blocksToServe;
+ assert(blocksToServe >= 1);
+ final availableBlocksInChunk =
+ (typedChunk.length - offsetInData) ~/ blockSize;
+ final blocksToDeliverNow = min(blocksToServe, availableBlocksInChunk);
- // Once again, stop and save state if the controller was paused.
- if (saveStateIfPaused()) return;
+ if (blocksToDeliverNow > 0) {
+ final length = blockSize * blocksToDeliverNow;
+ final end = offsetInData + length;
+ final done = _emitBlocks(typedChunk.sublistView(offsetInData, end),
+ amount: blocksToDeliverNow);
+ offsetInData = end;
+
+ // Once again, stop and save state if this has completed the output
+ if (done) {
+ savePendingChunk();
+ return;
+ }
+ }
+
+ // If we're here, the remaining data in the chunk was not large enough to
+ // serve a full block. Store the remaining data in a pending block.
+ _startPendingBlock(typedChunk, offsetInData);
+ }
+
+ void _onError(Object error, [StackTrace? trace]) {
+ final stream = _nextBlocksStream;
+ final future = _nextBlockCompleter;
+
+ if (stream != null) {
+ stream.addError(error, trace);
+ } else if (future != null) {
+ future.completeError(error, trace);
+ _nextBlockCompleter = null;
+ }
+
+ final subscription = _inputSubscription;
+ if (!subscription!.isPaused) {
+ subscription.pause();
+ }
+ }
+
+ void _onDone() {
+ if (_blocksToServe > 0) {
+ if (_pendingBlock != null) {
+ _emitBlocks(
+ _pendingBlock!.sublistView(0, blockSize - _missingInPendingBlock));
+ _pendingBlock = null;
+ _missingInPendingBlock = 0;
} else {
- // Ok, no full block but we can start a new pending chunk
- startChunk(typedData, offsetInData);
- break;
+ // Complete with an empty block
+ _emitBlocks(emptyUint8List);
}
+
+ _inputSubscription = null;
+ }
+ close();
+ }
+
+ void _checkIdle(String method) {
+ if (_nextBlockCompleter != null) {
+ throw StateError("Can't call $method before a previous call to "
+ 'nextBlock() has completed.');
+ }
+ if (_nextBlocksStream != null) {
+ throw StateError("Can't call $method before a previous call to "
+ 'nextBlocks() was completed.');
}
}
- void startResume() {
- isResuming = false;
- if (controller.isPausedOrClosed) return;
-
- // Start dispatching pending events before we resume the subscription on the
- // main stream.
- if (pendingEvent != null) {
- final pending = pendingEvent!;
- assert(startedChunk == null, 'Had pending events and a started chunk');
-
- while (offsetInPendingEvent < pending.length) {
- // Can we serve a full block from pending data?
- if (offsetInPendingEvent <= pending.length - blockSize) {
- final end = offsetInPendingEvent + blockSize;
- controller.add(pending.sublistView(offsetInPendingEvent, end));
- offsetInPendingEvent = end;
-
- // Pause if we got a pause request in response to the add
- if (controller.isPausedOrClosed) return;
- } else {
- // Store pending block that we can't finish with the pending chunk.
- startChunk(pending, offsetInPendingEvent);
- break;
- }
- }
-
- // Pending events have been dispatched
- offsetInPendingEvent = 0;
- pendingEvent = null;
- }
-
- // When we're here, all pending events have been dispatched and the
- // controller is not paused. Let's continue then!
- inputSubscription.resume();
- }
-
- void onDone() {
- // Input stream is done. If we have a block left over, emit that now
- if (startedChunk != null) {
- controller
- .add(startedChunk!.sublistView(0, blockSize - missingForNextChunk));
- }
- controller.close();
- }
-
- controller
- ..onListen = () {
- inputSubscription = input.listen(handleData,
- onError: controller.addError, onDone: onDone);
- }
- ..onPause = () {
- if (!inputSubscription.isPaused) {
- inputSubscription.pause();
- }
- }
- ..onResume = () {
- // This is a bit hacky. Our subscription is most likely going to be a
- // _BufferingStreamSubscription which will buffer events that were added
- // in this callback. However, we really want to know when the subscription
- // has been paused in response to a new event because we can handle that
- // very efficiently.
- if (!isResuming) {
- scheduleMicrotask(startResume);
- }
- isResuming = true;
- }
- ..onCancel = () {
- inputSubscription.cancel();
- };
-
- return controller.stream;
-}
-
-extension on StreamController<dynamic> {
- bool get isPausedOrClosed => isPaused || isClosed;
-}
-
-extension ReadInChunks on Stream<List<int>> {
- /// A stream emitting values in chunks of `512` byte blocks.
+ /// Reads the next block from the input stream.
///
- /// The last emitted chunk may be shorter than the regular block length.
- Stream<Uint8List> get inChunks => _inChunks(this);
-}
+ /// The returned list may be empty or smaller than [blockSize] if the input
+ /// stream ends.
+ Future<Uint8List> nextBlock() {
+ _checkIdle('nextBlock');
+ if (_closed) return Future.value(emptyUint8List);
-extension NextOrNull<T extends Object> on StreamIterator<T> {
- Future<T?> get nextOrNull async {
- if (await moveNext()) {
- return current;
- } else {
- return null;
+ // If we have pending data to serve, do that before we start resuming the
+ // stream subscription
+ final waiting = _waitingForDispatch;
+ if (waiting != null) {
+ final offset = _offsetInWaiting;
+ assert(waiting.length - offset >= blockSize);
+
+ final end = offset + blockSize;
+ final result = Future<Uint8List>.value(waiting.sublistView(offset, end));
+
+ if (end == waiting.length) {
+ // We just completed all data waiting for dispatch
+ _waitingForDispatch = null;
+ _offsetInWaiting = 0;
+ } else {
+ // Store new pending data
+ _saveUndispatchedState(waiting, offset + blockSize);
+ }
+ return result;
}
+
+ // Note: Our logic in _emitBlocks depends on this completer NOT being sync
+ final next = _nextBlockCompleter = Completer();
+ _listenOrResume();
+ return next.future;
}
- Stream<T> nextElements(int count) {
- if (count == 0) return const Stream<Never>.empty();
+ Stream<Uint8List> nextBlocks(int amount) {
+ _checkIdle('nextBlocks');
- var remaining = count;
- Future<Object?>? pendingOperation;
- final controller = StreamController<T>(sync: true);
+ if (amount == 0 || _closed) return const Stream.empty();
- void addNewEvent() {
- if (pendingOperation == null && remaining > 0) {
- // Start fetching data
- final fetchNext = nextOrNull.then<Object?>((value) {
- remaining--;
- if (value != null) {
- // Got data, let's add it
- controller.add(value);
- }
+ // This controller must not be sync: We add events in response to onListen
+ // and expect a delay after closing it in _emitBlocks
+ final controller = _nextBlocksStream = StreamController();
+ _remainingBlocksInStream = amount;
- if (value == null || remaining == 0) {
- // Finished, so close the controller
- return controller.close();
- } else {
- return Future.value();
- }
- }, onError: controller.addError);
+ void onListenOrResume() {
+ // Once again, check if we have pending data to serve
+ final waiting = _waitingForDispatch;
+ if (waiting != null) {
+ final offset = _offsetInWaiting;
+ final blocksWaiting = (waiting.length - offset) ~/ blockSize;
+ assert(blocksWaiting > 0);
- pendingOperation = fetchNext.whenComplete(() {
- pendingOperation = null;
- if (!controller.isPaused && remaining > 0) {
- // Controller isn't paused, so add another event
- addNewEvent();
- }
- });
+ final blocksToEmit = min(blocksWaiting, amount);
+ final end = offset + blocksToEmit * blockSize;
+ final done =
+ _emitBlocks(waiting.sublistView(offset, end), amount: blocksToEmit);
+ _saveUndispatchedState(waiting, end);
+ if (!done) {
+ _listenOrResume();
+ }
+ } else {
+ // No data waiting to be served, so we definitely have to start
+ // listening
+ _listenOrResume();
}
}
controller
- ..onListen = addNewEvent
- ..onResume = addNewEvent;
-
+ ..onListen = onListenOrResume
+ ..onPause = () {
+ final sub = _inputSubscription;
+ if (!sub!.isPaused) sub.pause();
+ }
+ ..onResume = onListenOrResume;
return controller.stream;
}
+
+ Future<void> close() async {
+ if (_closed) return;
+
+ _closed = true;
+ await _inputSubscription?.cancel();
+
+ _nextBlockCompleter?.complete(emptyUint8List);
+
+ final stream = _nextBlocksStream;
+ if (stream != null) {
+ await stream.close();
+ }
+ }
+}
+
+extension on StreamController<dynamic> {
+ bool get isPausedOrClosed => isPaused || isClosed;
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 8886a5d..1e60694 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -10,7 +10,6 @@
async: ^2.6.0
meta: ^1.3.0
typed_data: ^1.3.0
- stream_transform: ^2.0.0
dev_dependencies:
charcode: ^1.2.0