blob: 8a3fb048b0b27f26132e91812d07da7b54907705 [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
import 'dart:io';
import '../constants.dart';
import '../worker_protocol.pb.dart';
import 'driver_connection.dart';
/// Factory signature for starting a new worker process.
typedef SpawnWorker = Future<Process> Function();
/// A driver for talking to a bazel worker.
///
/// This allows you to use any binary that supports the bazel worker protocol in
/// the same way that bazel would, but from another dart process instead.
class BazelWorkerDriver {
  /// Idle worker processes.
  final _idleWorkers = <Process>[];

  /// The maximum number of idle workers at any given time.
  final int _maxIdleWorkers;

  /// The maximum number of times to retry a [_WorkAttempt] if there is an
  /// error.
  final int _maxRetries;

  /// The maximum number of concurrent workers to run at any given time.
  final int _maxWorkers;

  /// The number of currently active workers: ready workers plus workers that
  /// are still being spawned.
  int get _numWorkers => _readyWorkers.length + _spawningWorkers.length;

  /// All workers that are fully spawned and ready to handle work.
  final _readyWorkers = <Process>[];

  /// All workers that are in the process of being spawned.
  final _spawningWorkers = <Future<Process>>[];

  /// Work requests that haven't been started yet.
  final _workQueue = Queue<_WorkAttempt>();

  /// Factory method that spawns a worker process.
  final SpawnWorker _spawnWorker;

  /// Creates a driver that starts workers on demand using [_spawnWorker].
  ///
  /// [maxIdleWorkers], [maxWorkers] and [maxRetries] each default to 4 when
  /// omitted or `null`.
  BazelWorkerDriver(this._spawnWorker,
      {int maxIdleWorkers, int maxWorkers, int maxRetries})
      : _maxIdleWorkers = maxIdleWorkers ?? 4,
        _maxWorkers = maxWorkers ?? 4,
        _maxRetries = maxRetries ?? 4;

  /// Waits for an available worker, and then sends [WorkRequest] to it.
  ///
  /// If [trackWork] is provided it will be invoked with a [Future] once the
  /// [request] has been actually sent to the worker. This allows the caller
  /// to determine when actual work is being done versus just waiting for an
  /// available worker.
  Future<WorkResponse> doWork(WorkRequest request,
      {Function(Future<WorkResponse>) trackWork}) {
    var attempt = _WorkAttempt(request, trackWork: trackWork);
    _workQueue.add(attempt);
    _runWorkQueue();
    return attempt.response;
  }

  /// Calls `kill` on all worker processes, including workers that are still
  /// being spawned.
  ///
  /// Workers whose spawn fails are skipped: there is nothing to kill, and the
  /// failure is reported through the work request they were spawned for.
  Future terminateWorkers() async {
    for (var worker in _readyWorkers.toList()) {
      _killWorker(worker);
    }
    await Future.wait(_spawningWorkers.map(
        (futureWorker) => futureWorker.then(_killWorker, onError: (_) {})));
  }

  /// Runs as many items in [_workQueue] as possible given the number of
  /// available workers.
  ///
  /// Will spawn additional workers until [_maxWorkers] has been reached.
  ///
  /// This method synchronously drains the [_workQueue] and [_idleWorkers], but
  /// some tasks may not actually start right away if they need to wait for a
  /// worker to spin up.
  void _runWorkQueue() {
    // Bail out conditions, we will continue to call ourselves indefinitely
    // until one of these is met.
    if (_workQueue.isEmpty) return;
    if (_numWorkers == _maxWorkers && _idleWorkers.isEmpty) return;
    if (_numWorkers > _maxWorkers) {
      throw StateError('Internal error, created too many workers. Please '
          'file a bug at https://github.com/dart-lang/bazel_worker/issues/new');
    }

    // At this point we definitely want to run a task, we just need to decide
    // whether or not we need to start up a new worker.
    var attempt = _workQueue.removeFirst();
    if (_idleWorkers.isNotEmpty) {
      _runWorker(_idleWorkers.removeLast(), attempt);
    } else {
      // No need to block here, we want to continue to synchronously drain the
      // work queue.
      _spawnWorkerFor(attempt);
    }
    // Recursively calls itself until one of the bail out conditions are met.
    _runWorkQueue();
  }

  /// Spawns a new worker and runs [attempt] on it once it is ready.
  ///
  /// While spawning, the worker's future sits in [_spawningWorkers] so it
  /// counts against [_maxWorkers]. If spawning fails, that slot is released
  /// and [attempt] is rescheduled or completed with an error response, so
  /// callers awaiting it are never left hanging.
  void _spawnWorkerFor(_WorkAttempt attempt) {
    // `Future.sync` turns a synchronous throw from the factory into a failed
    // future, so both kinds of spawn failure take the `onError` path below.
    var futureWorker = Future<Process>.sync(_spawnWorker);
    _spawningWorkers.add(futureWorker);
    futureWorker.then((worker) {
      _spawningWorkers.remove(futureWorker);
      _readyWorkers.add(worker);
      _workerConnections[worker] = StdDriverConnection.forWorker(worker);
      _runWorker(worker, attempt);

      // When the worker exits we should retry running the work queue in case
      // there is more work to be done. This is primarily just a defensive
      // thing but is cheap to do.
      // exitCode can be null: https://github.com/dart-lang/sdk/issues/35874
      worker.exitCode?.then((_) {
        _idleWorkers.remove(worker);
        _readyWorkers.remove(worker);
        _runWorkQueue();
      });
    }, onError: (e, s) {
      _spawningWorkers.remove(futureWorker);
      if (_tryReschedule(attempt)) return;
      attempt.responseCompleter.complete(WorkResponse()
        ..exitCode = EXIT_CODE_ERROR
        ..output = 'Failed to spawn worker:\n$e\n$s');
      // The freed slot may let other queued work start.
      _runWorkQueue();
    });
  }

  /// Sends [attempt]'s request to [worker] and completes [attempt] with the
  /// result.
  ///
  /// On success the worker is handed to [_cleanUp], which returns it to the
  /// pool of idle workers. If the worker gives no response, or an error is
  /// raised while talking to it, the worker is discarded. The attempt is then
  /// either rescheduled (see [_tryReschedule]) or completed with an
  /// [EXIT_CODE_ERROR] response.
  void _runWorker(Process worker, _WorkAttempt attempt) {
    // Set once this run has handed [attempt] back to the work queue. It
    // prevents the body and the error handler below from both acting on the
    // same attempt. Otherwise the attempt could be queued twice, and the
    // losing run would skip `_cleanUp` and leak its worker.
    var rescheduled = false;

    runZoned(() async {
      var connection = _workerConnections[worker];

      connection.writeRequest(attempt.request);
      var responseFuture = connection.readResponse();
      if (attempt.trackWork != null) {
        attempt.trackWork(responseFuture);
      }
      var response = await responseFuture;

      // The error handler may already have rescheduled or completed the
      // attempt (due to an unhandled async error) while we were waiting; if
      // so, this run no longer owns it.
      if (rescheduled || attempt.responseCompleter.isCompleted) return;

      if (response == null) {
        // The worker wrote invalid output or died, so it is not reused.
        _discardWorker(worker);
        rescheduled = _tryReschedule(attempt);
        if (rescheduled) return;
        stderr.writeln('Failed to run request ${attempt.request}');
        response = WorkResponse()
          ..exitCode = EXIT_CODE_ERROR
          ..output =
              'Invalid response from worker, this probably means it wrote '
              'invalid output or died.';
      }
      attempt.responseCompleter.complete(response);
      _cleanUp(worker);
    }, onError: (e, s) {
      if (rescheduled || attempt.responseCompleter.isCompleted) return;
      // Don't reuse a worker we failed to talk to. If it already exited, the
      // exit handler has removed it and this is a no-op.
      _discardWorker(worker);
      rescheduled = _tryReschedule(attempt);
      if (rescheduled) return;
      var response = WorkResponse()
        ..exitCode = EXIT_CODE_ERROR
        ..output = 'Error running worker:\n$e\n$s';
      attempt.responseCompleter.complete(response);
      _cleanUp(worker);
    });
  }

  /// Performs post-work cleanup for [worker].
  void _cleanUp(Process worker) {
    // If the worker crashed or was discarded, it won't be in `_readyWorkers`
    // any more, and we don't want to add it to _idleWorkers.
    if (_readyWorkers.contains(worker)) {
      _idleWorkers.add(worker);
    }

    // Do additional work if available.
    _runWorkQueue();

    // If the worker wasn't immediately used we might have too many idle
    // workers now, kill one if necessary.
    if (_idleWorkers.length > _maxIdleWorkers) {
      // Note that whenever we spawn a worker we listen for its exit code
      // and clean it up so we don't need to do that here.
      var worker = _idleWorkers.removeLast();
      _killWorker(worker);
    }
  }

  /// Attempts to reschedule a failed [attempt].
  ///
  /// Returns whether or not the job was successfully rescheduled.
  bool _tryReschedule(_WorkAttempt attempt) {
    if (attempt.timesRetried >= _maxRetries) return false;
    stderr.writeln('Rescheduling failed request...');
    attempt.timesRetried++;
    _workQueue.add(attempt);
    _runWorkQueue();
    return true;
  }

  /// Kills [worker] if it is still tracked in [_readyWorkers].
  ///
  /// Workers that already exited or were already killed have been removed
  /// from [_readyWorkers], so they are not killed a second time.
  void _discardWorker(Process worker) {
    if (_readyWorkers.contains(worker)) _killWorker(worker);
  }

  /// Cancels [worker]'s connection, stops tracking it, and kills the process.
  void _killWorker(Process worker) {
    // Defensive: only registered workers have a connection.
    _workerConnections[worker]?.cancel();
    _readyWorkers.remove(worker);
    _idleWorkers.remove(worker);
    worker.kill();
  }
}
/// Bundles a single [WorkRequest] with everything the driver tracks for it.
///
/// This covers the request itself, the completer that eventually receives its
/// [WorkResponse], the optional [trackWork] callback, and how many times it
/// has been retried so far.
class _WorkAttempt {
  /// Creates an attempt for [request] with no retries used yet.
  _WorkAttempt(this.request, {this.trackWork});

  /// The request to hand to a worker.
  final WorkRequest request;

  /// Optional callback that receives the response future for [request].
  final Function(Future<WorkResponse>) trackWork;

  /// Completed with the final response for [request].
  final Completer<WorkResponse> responseCompleter = Completer<WorkResponse>();

  /// Number of times this attempt has been retried.
  int timesRetried = 0;

  /// The eventual response for [request].
  Future<WorkResponse> get response => responseCompleter.future;
}
/// The [DriverConnection] associated with each worker process.
///
/// An [Expando] attaches the connection to the [Process] object without
/// keeping that process object alive.
final Expando<DriverConnection> _workerConnections = Expando('connection');