Upgrade `package:tar` to 0.5.6 (#3540)

diff --git a/lib/src/third_party/tar/README.md b/lib/src/third_party/tar/README.md
index a2fbf7b..5e12e5a 100644
--- a/lib/src/third_party/tar/README.md
+++ b/lib/src/third_party/tar/README.md
@@ -4,4 +4,4 @@
 tar-archives.
 
  * Repository: `https://github.com/simolus3/tar/`
- * Revision: `901ae404e0a225d9b08e5253415ca092f5c08706`
+ * Revision: `23ee71d667f003fba8c80ee126d5e1330d17c141`
diff --git a/lib/src/third_party/tar/src/reader.dart b/lib/src/third_party/tar/src/reader.dart
index b9bc3d3..8502a26 100644
--- a/lib/src/third_party/tar/src/reader.dart
+++ b/lib/src/third_party/tar/src/reader.dart
@@ -3,7 +3,6 @@
 import 'dart:convert';
 import 'dart:typed_data';
 
-import 'package:async/async.dart';
 import 'package:meta/meta.dart';
 import 'package:typed_data/typed_data.dart';
 
@@ -27,21 +26,7 @@
   final int _maxSpecialFileSize;
 
   TarEntry? _current;
-
-  /// The underlying content stream for the [_current] entry. Draining this
-  /// stream will move the tar reader to the beginning of the next file.
-  ///
-  /// This is not the same as `_current.stream` for sparse files, which are
-  /// reported as expanded through [TarEntry.contents].
-  /// For that reason, we prefer to drain this stream when skipping a tar entry.
-  /// When we know we're skipping data, there's no point expanding sparse holes.
-  ///
-  /// This stream is always set to null after being drained, and there can only
-  /// be one [_underlyingContentStream] at a time.
-  Stream<List<int>>? _underlyingContentStream;
-
-  /// Whether [_current] has ever been listened to.
-  bool _listenedToContentsOnce = false;
+  _CurrentEntryStream? _currentStream;
 
   /// Whether we're in the process of reading tar headers.
   bool _isReadingHeaders = false;
@@ -220,7 +205,9 @@
         nextHeader.format = format;
 
         _current = TarEntry(nextHeader, content);
-        _listenedToContentsOnce = false;
+        final currentStreams = _currentStream;
+        assert(currentStreams == null ||
+            currentStreams.state == _EntryStreamState.preListen);
         _isReadingHeaders = false;
         return true;
       }
@@ -233,8 +220,7 @@
 
     _isDone = true;
     _current = null;
-    _underlyingContentStream = null;
-    _listenedToContentsOnce = false;
+    _currentStream = null;
     _isReadingHeaders = false;
 
     // Note: Calling cancel is safe when the stream has already been completed.
@@ -276,18 +262,51 @@
     }
     _isReadingHeaders = true;
 
-    final underlyingStream = _underlyingContentStream;
+    final underlyingStream = _currentStream;
     if (underlyingStream != null) {
-      if (_listenedToContentsOnce) {
-        throw StateError(
+      switch (underlyingStream.state) {
+        case _EntryStreamState.preListen:
+          await underlyingStream.drain<void>();
+          // The stream should reset when drained (we do this in _publishStream)
+          assert(_currentStream == null);
+
+          break;
+        case _EntryStreamState.subscriptionActive:
+          throw StateError(
             'Illegal call to TarReader.moveNext() while a previous stream was '
             'active.\n'
             'When listening to tar contents, make sure the stream is '
-            'complete or cancelled before calling TarReader.moveNext() again.');
-      } else {
-        await underlyingStream.drain<void>();
-        // The stream should reset when drained (we do this in _publishStream)
-        assert(_underlyingContentStream == null);
+            'complete or cancelled before calling TarReader.moveNext() again.',
+          );
+        case _EntryStreamState.cancelled:
+          // ignore: cancel_subscriptions
+          final subscription = underlyingStream._sourceSubscription!;
+
+          // Re-purpose the existing subscription to drain the stream
+          assert(subscription.isPaused);
+
+          subscription
+            ..onData(null)
+            ..resume();
+
+          try {
+            await subscription.asFuture<void>();
+          } on Object {
+            await cancel();
+            rethrow;
+          } finally {
+            // This also draines the stream
+            _currentStream = null;
+          }
+
+          break;
+        case _EntryStreamState.done:
+          assert(
+            false,
+            'Unreachable: There should not be a currentStream in a done state, '
+            'as the stream is no longer current at that point.',
+          );
+          break;
       }
     }
   }
@@ -418,59 +437,14 @@
   /// Publishes an library-internal stream for users.
   ///
   /// This adds a check to ensure that the stream we're exposing has the
-  /// expected length. It also sets the [_underlyingContentStream] field when
-  /// the stream starts and resets it when it's done.
-  Stream<List<int>> _publishStream(Stream<List<int>> stream, int length) {
+  /// expected length. It also sets the [_currentStream] field and resets it
+  /// when it's done.
+  Stream<List<int>> _publishStream(Stream<Uint8List> stream, int length) {
     // There can only be one content stream at a time. This precondition is
     // checked by _prepareToReadHeaders.
-    assert(_underlyingContentStream == null);
-    Stream<List<int>>? thisStream;
+    assert(_currentStream == null);
 
-    return thisStream =
-        _underlyingContentStream = Stream.eventTransformed(stream, (sink) {
-      // This callback is called when we have a listener. Make sure that, at
-      // this point, this stream is still the active content stream.
-      // If users store the contents of a tar header, then read more tar
-      // entries, and finally try to read the stream of the old contents, they'd
-      // get an exception about the straem already being listened to.
-      // This can be a bit confusing, so this check enables a better error UX.
-      if (thisStream != _underlyingContentStream) {
-        throw StateError(
-          'Tried listening to an outdated tar entry. \n'
-          'As all tar entries found by a reader are backed by a single source '
-          'stream, only the latest tar entry can be read. It looks like you '
-          'stored the results of `tarEntry.contents` somewhere, called '
-          '`reader.moveNext()` and then read the contents of the previous '
-          'entry.\n'
-          'For more details, including a discussion of workarounds, see '
-          'https://github.com/simolus3/tar/issues/18',
-        );
-      } else if (_listenedToContentsOnce) {
-        throw StateError(
-          'A tar entry has been listened to multiple times. \n'
-          'As all tar entries are read from what\'s likely a single-'
-          'subscription stream, this is unsupported. If you didn\'t read a tar '
-          'entry multiple times yourself, perhaps you\'ve called `moveNext()` '
-          'before reading contents?',
-        );
-      }
-
-      _listenedToContentsOnce = true;
-
-      late _OutgoingStreamGuard guard;
-      return guard = _OutgoingStreamGuard(
-        length,
-        sink,
-        // Reset state when the stream is done. This will only be called when
-        // the stream is done, not when a listener cancels.
-        () {
-          _underlyingContentStream = null;
-          if (guard.hadError) {
-            cancel();
-          }
-        },
-      );
-    });
+    return _currentStream = _CurrentEntryStream(this, stream, length);
   }
 
   /// Checks the PAX headers for GNU sparse headers.
@@ -881,23 +855,96 @@
   }
 }
 
-/// Event-sink tracking the length of emitted tar entry streams.
+enum _EntryStreamState {
+  preListen,
+  subscriptionActive,
+  cancelled,
+  done,
+}
+
+/// The underlying content stream for the [TarReader._current] entry. Draining
+/// this stream will move the tar reader to the beginning of the next file.
 ///
-/// [ChunkedStreamReader.readStream] might return a stream shorter than
-/// expected. That indicates an invalid tar file though, since the correct size
-/// is stored in the header.
-class _OutgoingStreamGuard extends EventSink<Uint8List> {
-  int remainingContentSize;
-  int remainingPaddingSize;
+/// This is not the same as `_current.stream` for sparse files, which are
+/// reported as expanded through [TarEntry.contents].
+/// For that reason, we prefer to drain this stream when skipping a tar entry.
+/// When we know we're skipping data, there's no point expanding sparse holes.
+///
+/// Draining this stream will set the [TarReader._currentStream] field back to
+/// null. There can only be one content stream at the time.
+class _CurrentEntryStream extends Stream<List<int>> {
+  _EntryStreamState state = _EntryStreamState.preListen;
 
-  final EventSink<List<int>> out;
-  void Function() onDone;
+  final TarReader _reader;
+  final Stream<Uint8List> _source;
 
-  bool hadError = false;
-  bool isInContent = true;
+  final StreamController<List<int>> _listener = StreamController(sync: true);
+  // ignore: cancel_subscriptions
+  StreamSubscription<List<int>>? _sourceSubscription;
 
-  _OutgoingStreamGuard(this.remainingContentSize, this.out, this.onDone)
-      : remainingPaddingSize = _paddingFor(remainingContentSize);
+  int _remainingContentSize;
+  int _remainingPaddingSize;
+  bool _hadError = false;
+  bool _isInContent = true;
+
+  _CurrentEntryStream(this._reader, this._source, this._remainingContentSize)
+      : _remainingPaddingSize = _paddingFor(_remainingContentSize);
+
+  @override
+  StreamSubscription<List<int>> listen(void Function(List<int> event)? onData,
+      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
+    // Make sure that this entry is still the current one: If users store the
+    // contents of a tar entry, then read more tar entries, and finally try to
+    // read the stream of the old contents, they'd get an exception about the
+    // stream already being listened to.
+    // This can be a bit confusing, so this check enables a better error UX.
+    if (_reader._currentStream != this) {
+      throw StateError(
+        'Tried listening to an outdated tar entry. \n'
+        'As all tar entries found by a reader are backed by a single source '
+        'stream, only the latest tar entry can be read. It looks like you '
+        'stored the results of `tarEntry.contents` somewhere, called '
+        '`reader.moveNext()` and then read the contents of the previous '
+        'entry.\n'
+        'For more details, including a discussion of workarounds, see '
+        'https://github.com/simolus3/tar/issues/18',
+      );
+    } else if (state != _EntryStreamState.preListen) {
+      throw StateError(
+        'A tar entry has been listened to multiple times. \n'
+        'As all tar entries are read from what\'s likely a single-'
+        'subscription stream, this is unsupported. If you didn\'t read a tar '
+        'entry multiple times yourself, perhaps you\'ve called `moveNext()` '
+        'before reading contents?',
+      );
+    }
+
+    // Now we have a listener, so
+    state = _EntryStreamState.subscriptionActive;
+    // ignore: cancel_subscriptions
+    final sub = _sourceSubscription = _source.listen(
+      _forwardData,
+      onError: _forwardError,
+      onDone: _forwardDone,
+    );
+
+    _listener
+      ..onPause = sub.pause
+      ..onResume = sub.resume
+      ..onCancel = () {
+        // Pause the source subscription. When reading the next entry, the tar
+        // reader will drain the remaining source stream.
+        sub.pause();
+        state = _EntryStreamState.cancelled;
+      };
+
+    return _listener.stream.listen(
+      onData,
+      onError: onError,
+      onDone: onDone,
+      cancelOnError: cancelOnError,
+    );
+  }
 
   static int _paddingFor(int contentSize) {
     final offsetInLastBlock = contentSize.toUnsigned(blockSizeLog2);
@@ -907,47 +954,59 @@
     return 0;
   }
 
-  @override
-  void add(Uint8List event) {
-    if (isInContent) {
-      if (event.length <= remainingContentSize) {
+  void _assertInStateForForwarding() {
+    assert(state == _EntryStreamState.subscriptionActive &&
+        _listener.hasListener &&
+        !_listener.isPaused);
+  }
+
+  void _forwardData(Uint8List event) {
+    _assertInStateForForwarding();
+
+    if (_isInContent) {
+      if (event.length <= _remainingContentSize) {
         // We can fully add this chunk as it consists entirely of data
-        out.add(event);
-        remainingContentSize -= event.length;
+        _listener.add(event);
+        _remainingContentSize -= event.length;
       } else {
         // We can add the first bytes as content, the others are padding that we
         // shouldn't emit
-        out.add(event.sublistView(0, remainingContentSize));
-        isInContent = false;
-        remainingPaddingSize -= event.length - remainingContentSize;
-        remainingContentSize = 0;
+        _listener.add(event.sublistView(0, _remainingContentSize));
+        _isInContent = false;
+        _remainingPaddingSize -= event.length - _remainingContentSize;
+        _remainingContentSize = 0;
       }
     } else {
       // Ok, the entire event is padding
-      remainingPaddingSize -= event.length;
+      _remainingPaddingSize -= event.length;
     }
 
     // The underlying stream comes from pkg:tar, so if we get too many bytes
     // that's a bug in this package.
-    assert(remainingPaddingSize >= 0, 'Stream emitted to many bytes');
+    assert(_remainingPaddingSize >= 0, 'Stream emitted to many bytes');
   }
 
-  @override
-  void addError(Object error, [StackTrace? stackTrace]) {
-    hadError = true;
-    out.addError(error, stackTrace);
+  void _forwardError(Object error, StackTrace trace) {
+    _assertInStateForForwarding();
+
+    _hadError = true;
+    _listener.addError(error, trace);
   }
 
-  @override
-  void close() {
+  void _forwardDone() {
+    _assertInStateForForwarding();
+
+    // Now that the source stream is done, reset the stream state on the reader.
+    state = _EntryStreamState.done;
+    _sourceSubscription = null;
+    _reader._currentStream = null;
+
     // If the stream stopped after an error, the user is already aware that
     // something is wrong.
-    if (remainingContentSize > 0 && !hadError) {
-      out.addError(
+    if (_remainingContentSize > 0 && !_hadError) {
+      _listener.addError(
           TarException('Unexpected end of tar file'), StackTrace.current);
     }
-
-    onDone();
-    out.close();
+    _listener.close();
   }
 }