blob: 2ea461130874877fd83ee55a8fb47edfa0d73378 [file] [log] [blame]
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
import 'package:charcode/ascii.dart';
import 'common.dart';
import 'entry.dart';
import 'package:synchronized/synchronized.dart';
class _WritingTransformer extends StreamTransformerBase<Entry, List<int>> {
const _WritingTransformer();
@override
Stream<List<int>> bind(Stream<Entry> stream) {
// sync because the controller proxies another stream
final controller = StreamController<List<int>>(sync: true);
controller.onListen = () {
stream.pipe(WritingSink(controller));
};
return controller.stream;
}
}
/// A stream transformer writing tar entries as byte streams.
///
/// When piping the resulting stream into a [StreamConsumer], consider using
/// [WritingSink] directly.
const writer = _WritingTransformer();
/// A sink emitting encoded tar files.
///
/// For instance, you can use this to write a tar file:
///
/// ```dart
/// import 'package:tar/tar.dart' as tar;
///
/// Future<void> main() async {
/// Stream<tar.TarEntry> entries = Stream.value(
/// tar.MemoryEntry(
/// tar.Header(
/// name: 'example.txt',
/// mode: int.parse('644', radix: 8),
/// ),
/// utf8.encode('This is the content of the tar file'),
/// ),
/// );
///
/// final output = File('/tmp/test.tar').openWrite();
/// await entries.pipe(tar.WritingSink(output));
/// }
/// ```
///
/// Note that, if you don't set the [Header.size], outgoing tar entries need to
/// be buffered once, which decreases performance.
///
/// See also:
/// - [writer], a stream transformer using this sink
/// - [StreamSink]
class WritingSink extends StreamSink<Entry> {
final StreamSink<List<int>> _output;
int _paxHeaderCount = 0;
bool _closed = false;
final Completer<void> _done = Completer();
int _pendingOperations = 0;
final Lock _lock = Lock();
WritingSink(this._output);
@override
Future get done => _done.future;
@override
Future<void> add(Entry event) {
if (_closed) {
throw StateError('Cannot add event after close was called');
}
return _doWork(() => _safeAdd(event));
}
Future<void> _doWork(FutureOr Function() work) async {
_pendingOperations++;
try {
await _lock.synchronized(work);
} catch (e, s) {
_output.addError(e, s);
} finally {
_pendingOperations--;
}
if (_closed && _pendingOperations == 0) {
_done.complete(_output.close());
}
}
Future<void> _safeAdd(Entry event) async {
final header = event.header;
var size = header.size;
Uint8List? bufferedData;
if (size < 0) {
final builder = BytesBuilder();
await event.forEach(builder.add);
bufferedData = builder.takeBytes();
size = bufferedData.length;
}
var nameBytes = utf8.encode(header.name);
var linkBytes = utf8.encode(header.linkName ?? '');
// We only get 100 chars for the name and link name. If they are longer, we
// have to insert an entry just to store the names.
final paxHeader = <String, List<int>>{};
if (nameBytes.length > 100) {
paxHeader['path'] = nameBytes;
nameBytes = nameBytes.sublist(0, 100);
}
if (linkBytes.length > 100) {
paxHeader['linkpath'] = linkBytes;
linkBytes = linkBytes.sublist(0, 100);
}
if (paxHeader.isNotEmpty) {
await _writePaxHeader(paxHeader);
}
final headerBlock = Uint8List(blockSize)
..setAll(0, nameBytes)
..setUint(header.mode, 100, 8)
..setUint(header.uid, 108, 8)
..setUint(header.gid, 116, 8)
..setUint(size, 124, 12)
..setUint(header.lastModified.millisecondsSinceEpoch ~/ 1000, 136, 12)
..[156] = header.type.char
..setAll(157, linkBytes)
..setAll(257, magic)
..setUint(0, 263, 2)
// To calculate the checksum, we first fill the checksum range with spaces
..setAll(148, List.filled(8, $space));
// Then, we take the sum of the header
var checksum = 0;
for (final byte in headerBlock) {
checksum += byte;
}
headerBlock..setUint(checksum, 148, 8);
_output.add(headerBlock);
// Write content.
if (bufferedData != null) {
_output.add(bufferedData);
} else {
await for (final chunk in event) {
_output.add(chunk);
}
}
final padding = -size % blockSize;
_output.add(Uint8List(padding));
}
/// Writes an extended pax header.
///
/// https://pubs.opengroup.org/onlinepubs/9699919799/utilities/pax.html#tag_20_92_13_03
Future<void> _writePaxHeader(Map<String, List<int>> values) {
final buffer = BytesBuilder();
// format of each entry: "%d %s=%s\n", <length>, <keyword>, <value>
// note that the length includes the trailing \n and the length description
// itself.
values.forEach((key, value) {
final encodedKey = utf8.encode(key);
// +3 for the whitespace, the equals and the \n
final payloadLength = encodedKey.length + value.length + 3;
var indicatedLength = payloadLength;
// The indicated length contains the length (in decimals) itself. So if
// we had payloadLength=9, then we'd prefix a 9 at which point the whole
// string would have a length of 10. If that happens, increment length.
var actualLength = payloadLength + indicatedLength.toString().length;
while (actualLength != indicatedLength) {
indicatedLength++;
indicatedLength.toString().length;
}
// With that sorted out, let's add the line
buffer
..add(utf8.encode(indicatedLength.toString()))
..addByte($space)
..add(encodedKey)
..addByte($equal)
..add(value)
..addByte($lf); // \n
});
final file = MemoryEntry(
Header(
name: 'PaxHeader/${_paxHeaderCount++}',
mode: 0,
type: FileType.extendedHeader,
),
buffer.takeBytes(),
);
return _safeAdd(file);
}
@override
void addError(Object error, [StackTrace? stackTrace]) {
_output.addError(error, stackTrace);
}
@override
Future addStream(Stream<Entry> stream) async {
await for (final entry in stream) {
await add(entry);
}
}
@override
Future close() async {
if (!_closed) {
_closed = true;
// Add two empty blocks at the end.
await _doWork(() {
final empty = Uint8List(blockSize);
_output.add(empty);
_output.add(empty);
});
}
return done;
}
}
extension on Uint8List {
void setUint(int value, int position, int length) {
// Values are encoded as octal string, terminated and left-padded with
// space chars.
// Set terminating space char.
this[position + length - 1] = $space;
// Write as octal value, we write from right to left
var number = value;
var needsExplicitZero = number == 0;
for (var pos = position + length - 2; pos >= position; pos--) {
if (number != 0) {
// Write the last octal digit of the number (e.g. the last 4 bits)
this[pos] = (number & 7) + $0;
// then drop the last digit (divide by 8 = 2³)
number >>= 3;
} else if (needsExplicitZero) {
this[pos] = $0;
needsExplicitZero = false;
} else {
// done, left-pad with spaces
this[pos] = $space;
}
}
}
}
extension FileTypeChar on FileType {
int get char {
switch (this) {
case FileType.regular:
return aregtype;
case FileType.link:
return linktype;
case FileType.directory:
return dirtype;
case FileType.extendedHeader:
return extendedHeader;
case FileType.globalExtended:
return globalExtended;
case FileType.unsupported:
case FileType.gnuLongLinkName:
case FileType.gnuLongName:
throw UnsupportedError('Unsupported file type');
}
}
}