blob: 06192e766ea4fc122dc6318637fbe0bd091448dd [file] [log] [blame]
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library barback.file_pool;
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:io';
/// Manages a pool of files that are opened for reading to cope with maximum
/// file descriptor limits.
///
/// If a file cannot be opened because too many files are already open, this
/// will defer the open until a previously opened file is closed and then try
/// again. If this doesn't succeed after a certain amount of time, the open
/// will fail and the original "too many files" exception will be thrown.
class FilePool {
/// [_FileReader]s whose last [listen] call failed and that are waiting for
/// another file to close so they can be retried.
final _pendingListens = new Queue<_FileReader>();
/// Opens [file] for reading.
///
/// When the returned stream is listened to, if there are too many files
/// open, this will wait for a previously opened file to be closed and then
/// try again.
Stream<List<int>> openRead(File file) => new _FileReader(this, file).stream;
/// Reads [file] as a string using [encoding].
///
/// If there are too many files open and the read fails, this will wait for
/// a previously opened file to be closed and then try again.
Future<String> readAsString(File file, Encoding encoding) {
return _readAsBytes(file).then(encoding.decode);
}
/// Reads [file] as a list of bytes, using [openRead] to retry if there are
/// failures.
Future<List<int>> _readAsBytes(File file) {
var completer = new Completer<List<int>>();
var builder = new BytesBuilder();
openRead(file).listen(builder.add, onDone: () {
completer.complete(builder.takeBytes());
}, onError: completer.completeError, cancelOnError: true);
return completer.future;
}
/// Tries to re-listen to the next pending file reader if there are any.
void _retryPendingListen() {
if (_pendingListens.isEmpty) return;
var pending = _pendingListens.removeFirst();
pending._listen();
}
}
/// Wraps a raw file reading stream in a stream that handles "too many files"
/// errors.
///
/// This also notifies the pool when the underlying file stream is closed so
/// that it can try to open a waiting file.
class _FileReader {
final FilePool _pool;
final File _file;
/// The underyling file stream.
Stream<List<int>> _fileStream;
/// The controller for the stream wrapper.
StreamController<List<int>> _controller;
/// The current subscription to the underlying file stream.
///
/// This will only be non-null while the wrapped stream is being listened to.
StreamSubscription _subscription;
/// The timeout timer.
///
/// If this timer fires before the listen is retried, it gives up and throws
/// the original error.
Timer _timer;
/// When a [listen] call has thrown a "too many files" error, this will be
/// the exception object.
Object _exception;
/// When a [listen] call has thrown a "too many files" error, this will be
/// the captured stack trace.
Object _stackTrace;
/// The wrapped stream that the file can be read from.
Stream<List<int>> get stream => _controller.stream;
_FileReader(this._pool, this._file) {
_controller = new StreamController<List<int>>(onListen: _listen,
onPause: () {
_subscription.pause();
}, onResume: () {
_subscription.resume();
}, onCancel: () {
if (_subscription != null) _subscription.cancel();
_subscription = null;
}, sync: true);
}
/// Starts listening to the underlying file stream.
void _listen() {
if (_timer != null) {
_timer.cancel();
_timer = null;
}
_exception = null;
_stackTrace = null;
_fileStream = _file.openRead();
_subscription = _fileStream.listen(_controller.add,
onError: _onError, onDone: _onDone, cancelOnError: true);
}
/// Handles an error from the underlying file stream.
///
/// "Too many file" errors are caught so that we can retry later. Other
/// errors are passed to the wrapped stream and the underlying stream
/// subscription is canceled.
void _onError(Object exception, Object stackTrace) {
assert(_subscription != null);
assert(_exception == null);
// The subscription is canceled after an error.
_subscription = null;
// We only handle "Too many open files errors".
if (exception is! FileException || exception.osError.errorCode != 24) {
_controller.addError(exception, stackTrace);
return;
}
_exception = exception;
_stackTrace = stackTrace;
// We'll try to defer the listen in the hopes that another file will close
// and we can try. If that doesn't happen after a while, give up and just
// throw the original error.
// TODO(rnystrom): The point of this timer is to not get stuck forever in
// a deadlock scenario. But this can also erroneously fire if there is a
// large number of slow reads that do incrementally finish. A file may not
// move to the front of the queue in time even though it is making
// progress. A better solution is to have a single deadlock timer on the
// FilePool itself that starts when a pending listen is enqueued and checks
// to see if progress has been made when it fires.
_timer = new Timer(new Duration(seconds: 60), _onTimeout);
// Tell the pool that this file is waiting.
_pool._pendingListens.add(this);
}
/// Handles the underlying file stream finishing.
void _onDone() {
_subscription = null;
_controller.close();
_pool._retryPendingListen();
}
/// If this file failed to be read because there were too many open files and
/// no file was closed in time to retry, this handles giving up.
void _onTimeout() {
assert(_subscription == null);
assert(_exception != null);
// We failed to open in time, so just fail with the original error.
_pool._pendingListens.remove(this);
_controller.addError(_exception, _stackTrace);
_controller.close();
_timer = null;
_exception = null;
_stackTrace = null;
}
}