blob: 78bec3bc7e5250e3ab4859431cd5f1432da3f94b [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
import 'dart:io';
import '../constants.dart';
import '../worker_protocol.pb.dart';
import 'driver_connection.dart';
/// Signature of a factory that starts a worker process.
///
/// Takes no arguments and returns a future that completes with the spawned
/// [Process].
typedef Future<Process> SpawnWorker();
/// A driver for talking to a bazel worker.
///
/// This allows you to use any binary that supports the bazel worker protocol in
/// the same way that bazel would, but from another dart process instead.
class BazelWorkerDriver {
/// Idle worker processes.
///
/// [_runWorkQueue] takes a worker from the end of this list before it
/// considers spawning a new one.
final _idleWorkers = <Process>[];
/// The maximum number of idle workers at any given time.
final int _maxIdleWorkers;
/// The maximum number of times to retry a [_WorkAttempt] after an error.
final int _maxRetries;
/// The maximum number of concurrent workers to run at any given time.
final int _maxWorkers;
/// The number of currently active workers.
///
/// Counts both ready workers and workers that are still being spawned.
int get _numWorkers => _readyWorkers.length + _spawningWorkers.length;
/// All workers that are fully spawned and ready to handle work.
final _readyWorkers = <Process>[];
/// All workers that are in the process of being spawned.
final _spawningWorkers = <Future<Process>>[];
/// Work requests that haven't been started yet.
///
/// Consumed in first-in, first-out order by [_runWorkQueue].
final _workQueue = new Queue<_WorkAttempt>();
/// Factory method that spawns a worker process.
final SpawnWorker _spawnWorker;
// NOTE(review): the line that opens this constructor (its name and any
// positional parameters) is not visible in this excerpt.
//
// Each optional named limit falls back to 4 when it is omitted or null.
{int maxIdleWorkers, int maxWorkers, int maxRetries})
: this._maxIdleWorkers = maxIdleWorkers ?? 4,
this._maxWorkers = maxWorkers ?? 4,
this._maxRetries = maxRetries ?? 4;
/// Creates a [_WorkAttempt] for [request] and returns the future for its
/// [WorkResponse].
///
/// NOTE(review): no statement visible here adds the attempt to [_workQueue]
/// or calls [_runWorkQueue]; confirm against the complete file that the
/// attempt is actually scheduled.
Future<WorkResponse> doWork(WorkRequest request) {
var attempt = new _WorkAttempt(request);
return attempt.response;
/// Calls `kill` on all worker processes.
///
/// NOTE(review): this method looks truncated in this excerpt. The body of the
/// loop over [_readyWorkers] and the argument passed to `Future.wait` are not
/// visible; presumably the latter covers workers that are still spawning.
/// Verify against the complete file.
Future terminateWorkers() async {
for (var worker in _readyWorkers) {
await Future.wait( async {
// Awaits `worker` and then kills the resulting process.
(await worker).kill();
/// Runs as many items in [_workQueue] as possible given the number of
/// available workers.
///
/// Will spawn additional workers until [_maxWorkers] has been reached.
///
/// This method synchronously drains the [_workQueue] and [_idleWorkers], but
/// some tasks may not actually start right away if they need to wait for a
/// worker to spin up.
///
/// Throws a [StateError] if [_numWorkers] exceeds [_maxWorkers] while work is
/// still pending.
void _runWorkQueue() {
// Bail out conditions, we will continue to call ourselves indefinitely
// until one of these is met.
// Nothing left to run.
if (_workQueue.isEmpty) return;
// At capacity and there is no idle worker to reuse.
if (_numWorkers == _maxWorkers && _idleWorkers.isEmpty) return;
// Being over the limit is reported as an internal error.
if (_numWorkers > _maxWorkers) {
throw new StateError('Internal error, created to many workers. Please '
'file a bug at');
// At this point we definitely want to run a task, we just need to decide
// whether or not we need to start up a new worker.
var attempt = _workQueue.removeFirst();
if (_idleWorkers.isNotEmpty) {
// Reuse the worker at the end of the idle list.
_runWorker(_idleWorkers.removeLast(), attempt);
} else {
// No need to block here, we want to continue to synchronously drain the
// work queue.
var futureWorker = _spawnWorker();
futureWorker.then((worker) {
// Register the connection for the new worker before handing it the attempt;
// [_runWorker] looks the connection up by worker.
_workerConnections[worker] = new StdDriverConnection.forWorker(worker);
_runWorker(worker, attempt);
// When the worker exits we should retry running the work queue in case
// there is more work to be done. This is primarily just a defensive
// thing but is cheap to do.
worker.exitCode.then((exitCode) {
// NOTE(review): the body of this exit handler and the rest of this branch
// are not visible in this excerpt.
// Recursively calls itself until one of the bail out conditions is met.
/// Runs [attempt] on [worker], reading the worker's response from its
/// connection in [_workerConnections].
///
/// Once the worker responds then it will be added back to the pool of idle
/// workers.
///
/// A null response or an asynchronous error is first offered to
/// [_tryReschedule]; if that is refused, an error [WorkResponse] with
/// `EXIT_CODE_ERROR` is built instead.
///
/// NOTE(review): the statements that write the request to the worker,
/// complete the attempt's `responseCompleter`, and trigger cleanup are not
/// visible in this excerpt; confirm them against the complete file.
void _runWorker(Process worker, _WorkAttempt attempt) {
// Whether [_tryReschedule] accepted the attempt for another try.
bool rescheduled = false;
// Run inside a zone so that asynchronous errors reach `onError` below.
runZoned(() async {
var connection = _workerConnections[worker];
var response = await connection.readResponse();
// It is possible for us to complete with an error response due to an
// unhandled async error before we get here.
if (!attempt.responseCompleter.isCompleted) {
// A null response is treated as a failure: retry if allowed, otherwise
// log it and substitute an error response.
if (response == null) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
stderr.writeln('Failed to run request ${attempt.request}');
response = new WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output =
'Invalid response from worker, this probably means it wrote '
'invalid output or died.';
}, onError: (e, s) {
// Note that we don't need to do additional cleanup here on failures. If
// the worker dies that is already handled in a generic fashion, we just
// need to make sure we complete with a valid response.
if (!attempt.responseCompleter.isCompleted) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
// Out of retries: report the error and its stack trace in the response.
var response = new WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = 'Error running worker:\n$e\n$s';
/// Performs post-work cleanup for [worker].
///
/// NOTE(review): several statements of this method are not visible in this
/// excerpt (for example, where the worker is returned to [_idleWorkers] and
/// what is done with a surplus idle worker); confirm against the complete
/// file.
void _cleanUp(Process worker) {
// If the worker crashes, it won't be in `_readyWorkers` any more, and
// we don't want to add it to _idleWorkers.
if (_readyWorkers.contains(worker)) {
// Do additional work if available.
// If the worker wasn't immediately used we might have too many idle
// workers now, kill one if necessary.
if (_idleWorkers.length > _maxIdleWorkers) {
// Note that whenever we spawn a worker we listen for its exit code
// and clean it up so we don't need to do that here.
// This local shadows the `worker` parameter within this branch.
var worker = _idleWorkers.removeLast();
/// Attempts to reschedule a failed [attempt].
///
/// Returns whether or not the job was successfully rescheduled. An attempt
/// whose `timesRetried` has reached [_maxRetries] is refused.
///
/// NOTE(review): no statement visible here increments `timesRetried` or puts
/// the attempt back on [_workQueue]; confirm against the complete file.
bool _tryReschedule(_WorkAttempt attempt) {
// Out of retries: the caller has to report the failure itself.
if (attempt.timesRetried >= _maxRetries) return false;
stderr.writeln('Rescheduling failed request...');
return true;
/// Encapsulates an attempt to fulfill a [WorkRequest], a completer for the
/// [WorkResponse], and the number of times it has been retried.
///
/// NOTE(review): the constructor is not visible in this excerpt; callers
/// construct instances with a single [WorkRequest] argument.
class _WorkAttempt {
/// The request this attempt is trying to fulfill.
final WorkRequest request;
/// Completer for the [WorkResponse] to [request].
final responseCompleter = new Completer<WorkResponse>();
/// The future of [responseCompleter].
Future<WorkResponse> get response => responseCompleter.future;
/// How many times this attempt has been retried; starts at zero.
int timesRetried = 0;
/// Associates each worker [Process] with the [DriverConnection] used to read
/// its responses.
final _workerConnections = new Expando<DriverConnection>('connection');