// blob: a2198e88782998de1e9cf0bf96c6886df9880b2f [file] [log] [blame]
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library input_stream;
import 'archive.dart' as archive;
import 'entry.dart';
import 'read_request.dart';
import 'utils.dart';
/**
* A stream of [ArchiveEntry]s being read from an archive.
*
* This is accessible via [ArchiveReader].
*/
class ArchiveInputStream {
  /**
   * The id of the underlying archive.
   *
   * This is set to null by [close], i.e. once the input stream has finished
   * reading from the archive. The id is wrapped in a [Reference] so that the
   * finalizer registered in the constructor sees the same, current value and
   * does not free an archive that [close] has already freed.
   */
  final Reference<int> _id;

  /**
   * A [Completer] that will fire once the [onEntry] callback is set.
   *
   * It is completed with null by [close] if no callback was set by then.
   */
  final Completer<Function> _onEntryCompleter;

  /** The callback to call when the input stream is closed. */
  Function _onClosed;

  /** The callback to call when an error occurs. */
  Function _onError;

  /** The entry that is currently eligible to read data from the archive. */
  ArchiveEntry _currentEntry;

  /**
   * Creates a stream over the archive identified by [id] and immediately
   * starts consuming its headers.
   *
   * Entries are not emitted until [onEntry] has been set, because [_emit]
   * waits on [_onEntryCompleter].
   */
  ArchiveInputStream(int id)
      : _id = new Reference<int>(id),
        _onEntryCompleter = new Completer<Function>() {
    var future = _consumeHeaders();

    future.handleException((e) {
      // Once header consumption has failed no further entries will be
      // emitted, so release the archive now instead of leaving it to the
      // finalizer. [close] is a no-op if the stream is already closed.
      close();

      if (_onError != null) {
        _onError(e, future.stackTrace);
        return true;
      } else {
        throw e;
      }
    });

    future.then((_) {
      close();
      if (_onClosed != null) _onClosed();
    });

    // Safety net for streams that are dropped without being closed.
    // NOTE(review): presumably `attachFinalizer` runs the callback with the
    // third argument when `this` is garbage collected -- confirm in utils.dart.
    attachFinalizer(this, (id) {
      // `then` passes the completion value to its callback, so the callback
      // must accept one argument (matching the FREE call in [close]).
      if (id.value != null) call(FREE, id.value).then((_) {});
    }, _id);
  }

  /** Whether this stream has finished reading entries. */
  bool get closed => _id.value == null;

  /**
   * Reads the entire contents of the archive at once.
   *
   * Returns a [Future] that completes with every entry of the archive once
   * the stream has closed and each entry's own `readAll` future has
   * completed. If an error is reported through [onError], the returned
   * future completes with that error instead.
   *
   * Note that this is mutually exclusive with reading individual entries
   * using [onEntry], since it installs its own [onEntry], [onError] and
   * [onClosed] callbacks.
   */
  Future<List<CompleteArchiveEntry>> readAll() {
    var completer = new Completer<List<Future<CompleteArchiveEntry>>>();
    var result = <Future<CompleteArchiveEntry>>[];

    this.onEntry = (entry) => result.add(entry.readAll());
    this.onError = (e, stack) => completer.completeException(e, stack);
    this.onClosed = () => completer.complete(result);

    // Turn the list of per-entry futures into a single future of entries.
    return completer.future.chain(Futures.wait);
  }

  /**
   * Sets a callback to call when a new entry is read from the archive.
   *
   * The [ArchiveEntry] that's read from an archive initially only contains
   * header information such as the filename and permissions. To get the
   * actual data contained in the entry, use [ArchiveEntry.openInputStream].
   *
   * Since the entries are read in sequence from the archive, the data stream
   * for one entry must be opened before the next entry is read from the
   * archive. The next entry will not be read until the return value of
   * [callback] has completed.
   *
   * If [callback] calls [ArchiveEntry.openInputStream] before it returns, or
   * if it doesn't want to read the contents of [entry], it can return null.
   *
   * The callback is delivered by completing a [Completer], so this setter is
   * meant to be used once, before the stream is closed.
   * NOTE(review): a second assignment, or one made after [close], completes
   * an already-completed completer and presumably throws -- confirm.
   */
  void set onEntry(Future callback(ArchiveEntry entry)) {
    _onEntryCompleter.complete(callback);
  }

  /**
   * Sets a callback to call when the input stream is done emitting entries.
   */
  void set onClosed(void callback()) {
    _onClosed = callback;
  }

  /**
   * Sets a callback to call if an error occurs while extracting the archive.
   *
   * [e] is the error that occurred and [stack] is the stack trace of the
   * error.
   */
  void set onError(void callback(e, stack)) {
    _onError = callback;
  }

  /**
   * Closes the input stream. No more entries will be emitted.
   *
   * Does nothing if the stream is already closed.
   */
  void close() {
    if (closed) return;

    // Fire-and-forget request to free the archive.
    call(FREE, _id.value).then((_) {});
    // Clearing the id marks the stream as closed and tells the finalizer
    // that there is nothing left to free.
    _id.value = null;

    if (_currentEntry != null) _currentEntry.close();

    // Unblock any [_emit] still waiting for an [onEntry] callback; it checks
    // [closed] before using the (null) callback.
    if (!_onEntryCompleter.future.isComplete) _onEntryCompleter.complete(null);
  }

  /**
   * Consumes and emits all [ArchiveEntry]s in this archive.
   *
   * Requests the next header, emits the resulting entry through [_emit] and
   * then recurses. The returned [Future] completes when the stream is closed
   * or when a header request yields null.
   */
  Future _consumeHeaders() {
    if (closed) return new Future.immediate(null);

    return call(NEXT_HEADER, _id.value).chain((data) {
      // A null header ends the iteration.
      // NOTE(review): presumably this signals the end of the archive.
      if (data == null) return new Future.immediate(null);

      return _emit(new archive.ArchiveEntry.internal(data, _id.value))
          .chain((_) => _consumeHeaders());
    });
  }

  /**
   * Emits [entry] to the [onEntry] callback. Returns a [Future] that will
   * complete once the callback's return value completes and the entry's data
   * has been fully consumed.
   *
   * While the returned future is pending, [entry] is the [_currentEntry];
   * when it completes the entry is closed and [_currentEntry] is cleared.
   */
  Future _emit(ArchiveEntry entry) {
    _currentEntry = entry;

    var future = _onEntryCompleter.future.chain((onEntry) {
      // The stream may have been closed while waiting for the callback, in
      // which case `onEntry` can be null and must not be invoked.
      if (closed) return new Future.immediate(null);

      var result = onEntry(entry);
      if (result is Future) return result;
      return new Future.immediate(null);
    }).chain((_) {
      // If the callback opened the entry's input, wait until it is drained
      // before moving on to the next header.
      if (entry.isInputOpen) return entry.inputComplete;
      return new Future.immediate(null);
    });

    future.onComplete((_) {
      _currentEntry = null;
      entry.close();
    });

    return future;
  }
}