Upgrade `package:tar` to 0.5.6 (#3540)
diff --git a/lib/src/third_party/tar/README.md b/lib/src/third_party/tar/README.md
index a2fbf7b..5e12e5a 100644
--- a/lib/src/third_party/tar/README.md
+++ b/lib/src/third_party/tar/README.md
@@ -4,4 +4,4 @@
tar-archives.
* Repository: `https://github.com/simolus3/tar/`
- * Revision: `901ae404e0a225d9b08e5253415ca092f5c08706`
+ * Revision: `23ee71d667f003fba8c80ee126d5e1330d17c141`
diff --git a/lib/src/third_party/tar/src/reader.dart b/lib/src/third_party/tar/src/reader.dart
index b9bc3d3..8502a26 100644
--- a/lib/src/third_party/tar/src/reader.dart
+++ b/lib/src/third_party/tar/src/reader.dart
@@ -3,7 +3,6 @@
import 'dart:convert';
import 'dart:typed_data';
-import 'package:async/async.dart';
import 'package:meta/meta.dart';
import 'package:typed_data/typed_data.dart';
@@ -27,21 +26,7 @@
final int _maxSpecialFileSize;
TarEntry? _current;
-
- /// The underlying content stream for the [_current] entry. Draining this
- /// stream will move the tar reader to the beginning of the next file.
- ///
- /// This is not the same as `_current.stream` for sparse files, which are
- /// reported as expanded through [TarEntry.contents].
- /// For that reason, we prefer to drain this stream when skipping a tar entry.
- /// When we know we're skipping data, there's no point expanding sparse holes.
- ///
- /// This stream is always set to null after being drained, and there can only
- /// be one [_underlyingContentStream] at a time.
- Stream<List<int>>? _underlyingContentStream;
-
- /// Whether [_current] has ever been listened to.
- bool _listenedToContentsOnce = false;
+ _CurrentEntryStream? _currentStream;
/// Whether we're in the process of reading tar headers.
bool _isReadingHeaders = false;
@@ -220,7 +205,9 @@
nextHeader.format = format;
_current = TarEntry(nextHeader, content);
- _listenedToContentsOnce = false;
+ final currentStreams = _currentStream;
+ assert(currentStreams == null ||
+ currentStreams.state == _EntryStreamState.preListen);
_isReadingHeaders = false;
return true;
}
@@ -233,8 +220,7 @@
_isDone = true;
_current = null;
- _underlyingContentStream = null;
- _listenedToContentsOnce = false;
+ _currentStream = null;
_isReadingHeaders = false;
// Note: Calling cancel is safe when the stream has already been completed.
@@ -276,18 +262,51 @@
}
_isReadingHeaders = true;
- final underlyingStream = _underlyingContentStream;
+ final underlyingStream = _currentStream;
if (underlyingStream != null) {
- if (_listenedToContentsOnce) {
- throw StateError(
+ switch (underlyingStream.state) {
+ case _EntryStreamState.preListen:
+ await underlyingStream.drain<void>();
+ // The stream should reset when drained (we do this in _publishStream)
+ assert(_currentStream == null);
+
+ break;
+ case _EntryStreamState.subscriptionActive:
+ throw StateError(
'Illegal call to TarReader.moveNext() while a previous stream was '
'active.\n'
'When listening to tar contents, make sure the stream is '
- 'complete or cancelled before calling TarReader.moveNext() again.');
- } else {
- await underlyingStream.drain<void>();
- // The stream should reset when drained (we do this in _publishStream)
- assert(_underlyingContentStream == null);
+ 'complete or cancelled before calling TarReader.moveNext() again.',
+ );
+ case _EntryStreamState.cancelled:
+ // ignore: cancel_subscriptions
+ final subscription = underlyingStream._sourceSubscription!;
+
+ // Re-purpose the existing subscription to drain the stream
+ assert(subscription.isPaused);
+
+ subscription
+ ..onData(null)
+ ..resume();
+
+ try {
+ await subscription.asFuture<void>();
+ } on Object {
+ await cancel();
+ rethrow;
+ } finally {
+ // This also draines the stream
+ _currentStream = null;
+ }
+
+ break;
+ case _EntryStreamState.done:
+ assert(
+ false,
+ 'Unreachable: There should not be a currentStream in a done state, '
+ 'as the stream is no longer current at that point.',
+ );
+ break;
}
}
}
@@ -418,59 +437,14 @@
/// Publishes an library-internal stream for users.
///
/// This adds a check to ensure that the stream we're exposing has the
- /// expected length. It also sets the [_underlyingContentStream] field when
- /// the stream starts and resets it when it's done.
- Stream<List<int>> _publishStream(Stream<List<int>> stream, int length) {
+ /// expected length. It also sets the [_currentStream] field and resets it
+ /// when it's done.
+ Stream<List<int>> _publishStream(Stream<Uint8List> stream, int length) {
// There can only be one content stream at a time. This precondition is
// checked by _prepareToReadHeaders.
- assert(_underlyingContentStream == null);
- Stream<List<int>>? thisStream;
+ assert(_currentStream == null);
- return thisStream =
- _underlyingContentStream = Stream.eventTransformed(stream, (sink) {
- // This callback is called when we have a listener. Make sure that, at
- // this point, this stream is still the active content stream.
- // If users store the contents of a tar header, then read more tar
- // entries, and finally try to read the stream of the old contents, they'd
- // get an exception about the straem already being listened to.
- // This can be a bit confusing, so this check enables a better error UX.
- if (thisStream != _underlyingContentStream) {
- throw StateError(
- 'Tried listening to an outdated tar entry. \n'
- 'As all tar entries found by a reader are backed by a single source '
- 'stream, only the latest tar entry can be read. It looks like you '
- 'stored the results of `tarEntry.contents` somewhere, called '
- '`reader.moveNext()` and then read the contents of the previous '
- 'entry.\n'
- 'For more details, including a discussion of workarounds, see '
- 'https://github.com/simolus3/tar/issues/18',
- );
- } else if (_listenedToContentsOnce) {
- throw StateError(
- 'A tar entry has been listened to multiple times. \n'
- 'As all tar entries are read from what\'s likely a single-'
- 'subscription stream, this is unsupported. If you didn\'t read a tar '
- 'entry multiple times yourself, perhaps you\'ve called `moveNext()` '
- 'before reading contents?',
- );
- }
-
- _listenedToContentsOnce = true;
-
- late _OutgoingStreamGuard guard;
- return guard = _OutgoingStreamGuard(
- length,
- sink,
- // Reset state when the stream is done. This will only be called when
- // the stream is done, not when a listener cancels.
- () {
- _underlyingContentStream = null;
- if (guard.hadError) {
- cancel();
- }
- },
- );
- });
+ return _currentStream = _CurrentEntryStream(this, stream, length);
}
/// Checks the PAX headers for GNU sparse headers.
@@ -881,23 +855,96 @@
}
}
-/// Event-sink tracking the length of emitted tar entry streams.
+enum _EntryStreamState {
+ preListen,
+ subscriptionActive,
+ cancelled,
+ done,
+}
+
+/// The underlying content stream for the [TarReader._current] entry. Draining
+/// this stream will move the tar reader to the beginning of the next file.
///
-/// [ChunkedStreamReader.readStream] might return a stream shorter than
-/// expected. That indicates an invalid tar file though, since the correct size
-/// is stored in the header.
-class _OutgoingStreamGuard extends EventSink<Uint8List> {
- int remainingContentSize;
- int remainingPaddingSize;
+/// This is not the same as `_current.stream` for sparse files, which are
+/// reported as expanded through [TarEntry.contents].
+/// For that reason, we prefer to drain this stream when skipping a tar entry.
+/// When we know we're skipping data, there's no point expanding sparse holes.
+///
+/// Draining this stream will set the [TarReader._currentStream] field back to
+/// null. There can only be one content stream at the time.
+class _CurrentEntryStream extends Stream<List<int>> {
+ _EntryStreamState state = _EntryStreamState.preListen;
- final EventSink<List<int>> out;
- void Function() onDone;
+ final TarReader _reader;
+ final Stream<Uint8List> _source;
- bool hadError = false;
- bool isInContent = true;
+ final StreamController<List<int>> _listener = StreamController(sync: true);
+ // ignore: cancel_subscriptions
+ StreamSubscription<List<int>>? _sourceSubscription;
- _OutgoingStreamGuard(this.remainingContentSize, this.out, this.onDone)
- : remainingPaddingSize = _paddingFor(remainingContentSize);
+ int _remainingContentSize;
+ int _remainingPaddingSize;
+ bool _hadError = false;
+ bool _isInContent = true;
+
+ _CurrentEntryStream(this._reader, this._source, this._remainingContentSize)
+ : _remainingPaddingSize = _paddingFor(_remainingContentSize);
+
+ @override
+ StreamSubscription<List<int>> listen(void Function(List<int> event)? onData,
+ {Function? onError, void Function()? onDone, bool? cancelOnError}) {
+ // Make sure that this entry is still the current one: If users store the
+ // contents of a tar entry, then read more tar entries, and finally try to
+ // read the stream of the old contents, they'd get an exception about the
+ // stream already being listened to.
+ // This can be a bit confusing, so this check enables a better error UX.
+ if (_reader._currentStream != this) {
+ throw StateError(
+ 'Tried listening to an outdated tar entry. \n'
+ 'As all tar entries found by a reader are backed by a single source '
+ 'stream, only the latest tar entry can be read. It looks like you '
+ 'stored the results of `tarEntry.contents` somewhere, called '
+ '`reader.moveNext()` and then read the contents of the previous '
+ 'entry.\n'
+ 'For more details, including a discussion of workarounds, see '
+ 'https://github.com/simolus3/tar/issues/18',
+ );
+ } else if (state != _EntryStreamState.preListen) {
+ throw StateError(
+ 'A tar entry has been listened to multiple times. \n'
+ 'As all tar entries are read from what\'s likely a single-'
+ 'subscription stream, this is unsupported. If you didn\'t read a tar '
+ 'entry multiple times yourself, perhaps you\'ve called `moveNext()` '
+ 'before reading contents?',
+ );
+ }
+
+ // Now we have a listener, so
+ state = _EntryStreamState.subscriptionActive;
+ // ignore: cancel_subscriptions
+ final sub = _sourceSubscription = _source.listen(
+ _forwardData,
+ onError: _forwardError,
+ onDone: _forwardDone,
+ );
+
+ _listener
+ ..onPause = sub.pause
+ ..onResume = sub.resume
+ ..onCancel = () {
+ // Pause the source subscription. When reading the next entry, the tar
+ // reader will drain the remaining source stream.
+ sub.pause();
+ state = _EntryStreamState.cancelled;
+ };
+
+ return _listener.stream.listen(
+ onData,
+ onError: onError,
+ onDone: onDone,
+ cancelOnError: cancelOnError,
+ );
+ }
static int _paddingFor(int contentSize) {
final offsetInLastBlock = contentSize.toUnsigned(blockSizeLog2);
@@ -907,47 +954,59 @@
return 0;
}
- @override
- void add(Uint8List event) {
- if (isInContent) {
- if (event.length <= remainingContentSize) {
+ void _assertInStateForForwarding() {
+ assert(state == _EntryStreamState.subscriptionActive &&
+ _listener.hasListener &&
+ !_listener.isPaused);
+ }
+
+ void _forwardData(Uint8List event) {
+ _assertInStateForForwarding();
+
+ if (_isInContent) {
+ if (event.length <= _remainingContentSize) {
// We can fully add this chunk as it consists entirely of data
- out.add(event);
- remainingContentSize -= event.length;
+ _listener.add(event);
+ _remainingContentSize -= event.length;
} else {
// We can add the first bytes as content, the others are padding that we
// shouldn't emit
- out.add(event.sublistView(0, remainingContentSize));
- isInContent = false;
- remainingPaddingSize -= event.length - remainingContentSize;
- remainingContentSize = 0;
+ _listener.add(event.sublistView(0, _remainingContentSize));
+ _isInContent = false;
+ _remainingPaddingSize -= event.length - _remainingContentSize;
+ _remainingContentSize = 0;
}
} else {
// Ok, the entire event is padding
- remainingPaddingSize -= event.length;
+ _remainingPaddingSize -= event.length;
}
// The underlying stream comes from pkg:tar, so if we get too many bytes
// that's a bug in this package.
- assert(remainingPaddingSize >= 0, 'Stream emitted to many bytes');
+ assert(_remainingPaddingSize >= 0, 'Stream emitted to many bytes');
}
- @override
- void addError(Object error, [StackTrace? stackTrace]) {
- hadError = true;
- out.addError(error, stackTrace);
+ void _forwardError(Object error, StackTrace trace) {
+ _assertInStateForForwarding();
+
+ _hadError = true;
+ _listener.addError(error, trace);
}
- @override
- void close() {
+ void _forwardDone() {
+ _assertInStateForForwarding();
+
+ // Now that the source stream is done, reset the stream state on the reader.
+ state = _EntryStreamState.done;
+ _sourceSubscription = null;
+ _reader._currentStream = null;
+
// If the stream stopped after an error, the user is already aware that
// something is wrong.
- if (remainingContentSize > 0 && !hadError) {
- out.addError(
+ if (_remainingContentSize > 0 && !_hadError) {
+ _listener.addError(
TarException('Unexpected end of tar file'), StackTrace.current);
}
-
- onDone();
- out.close();
+ _listener.close();
}
}