Add retry logic for failed workers (#15)
Also updated changelog/pubspec for 0.1.7 release
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b9c373a..77cd606 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 0.1.7
+
+* Update the `BazelWorkerDriver` class to handle worker crashes, and retry work
+ requests. The number of retries is configurable with the new `int maxRetries`
+ optional arg to the `BazelWorkerDriver` constructor.
+
## 0.1.6
* Update the worker_protocol.pb.dart file with the latest proto generator.
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
index 9d0bb85..78bec3b 100644
--- a/lib/src/driver/driver.dart
+++ b/lib/src/driver/driver.dart
@@ -17,18 +17,21 @@
/// This allows you to use any binary that supports the bazel worker protocol in
/// the same way that bazel would, but from another dart process instead.
class BazelWorkerDriver {
+ /// Idle worker processes.
+ final _idleWorkers = <Process>[];
+
/// The maximum number of idle workers at any given time.
final int _maxIdleWorkers;
+ /// The maximum number of times to retry a [WorkAttempt] if there is an error.
+ final int _maxRetries;
+
/// The maximum number of concurrent workers to run at any given time.
final int _maxWorkers;
/// The number of currently active workers.
int get _numWorkers => _readyWorkers.length + _spawningWorkers.length;
- /// Idle worker processes.
- final _idleWorkers = <Process>[];
-
/// All workers that are fully spawned and ready to handle work.
final _readyWorkers = <Process>[];
@@ -36,21 +39,22 @@
final _spawningWorkers = <Future<Process>>[];
/// Work requests that haven't been started yet.
- final _workQueue = new Queue<WorkRequest>();
+ final _workQueue = new Queue<_WorkAttempt>();
/// Factory method that spawns a worker process.
final SpawnWorker _spawnWorker;
- BazelWorkerDriver(this._spawnWorker, {int maxIdleWorkers, int maxWorkers})
+ BazelWorkerDriver(this._spawnWorker,
+ {int maxIdleWorkers, int maxWorkers, int maxRetries})
: this._maxIdleWorkers = maxIdleWorkers ?? 4,
- this._maxWorkers = maxWorkers ?? 4;
+ this._maxWorkers = maxWorkers ?? 4,
+ this._maxRetries = maxRetries ?? 4;
Future<WorkResponse> doWork(WorkRequest request) {
- var responseCompleter = new Completer<WorkResponse>();
- _responseCompleters[request] = responseCompleter;
- _workQueue.add(request);
+ var attempt = new _WorkAttempt(request);
+ _workQueue.add(attempt);
_runWorkQueue();
- return responseCompleter.future;
+ return attempt.response;
}
/// Calls `kill` on all worker processes.
@@ -83,9 +87,9 @@
// At this point we definitely want to run a task, we just need to decide
// whether or not we need to start up a new worker.
- var request = _workQueue.removeFirst();
+ var attempt = _workQueue.removeFirst();
if (_idleWorkers.isNotEmpty) {
- _runWorker(_idleWorkers.removeLast(), request);
+ _runWorker(_idleWorkers.removeLast(), attempt);
} else {
// No need to block here, we want to continue to synchronously drain the
// work queue.
@@ -96,13 +100,15 @@
_readyWorkers.add(worker);
_workerConnections[worker] = new StdDriverConnection.forWorker(worker);
- _runWorker(worker, request);
+ _runWorker(worker, attempt);
// When the worker exits we should retry running the work queue in case
// there is more work to be done. This is primarily just a defensive
// thing but is cheap to do.
- worker.exitCode.then((_) {
+ worker.exitCode.then((exitCode) {
+ _idleWorkers.remove(worker);
_readyWorkers.remove(worker);
+ _spawningWorkers.remove(worker);
_runWorkQueue();
});
});
@@ -115,39 +121,92 @@
///
/// Once the worker responds then it will be added back to the pool of idle
/// workers.
- Future _runWorker(Process worker, WorkRequest request) async {
- try {
+ void _runWorker(Process worker, _WorkAttempt attempt) {
+ bool rescheduled = false;
+
+ runZoned(() async {
var connection = _workerConnections[worker];
- connection.writeRequest(request);
+ connection.writeRequest(attempt.request);
var response = await connection.readResponse();
- _responseCompleters[request].complete(response);
- // Do additional work if available.
- _idleWorkers.add(worker);
- _runWorkQueue();
-
- // If the worker wasn't immediately used we might have to many idle
- // workers now, kill one if necessary.
- if (_idleWorkers.length > _maxIdleWorkers) {
- // Note that whenever we spawn a worker we listen for its exit code
- // and clean it up so we don't need to do that here.
- var worker = _idleWorkers.removeLast();
- _readyWorkers.remove(worker);
- worker.kill();
+ // It is possible for us to complete with an error response due to an
+ // unhandled async error before we get here.
+ if (!attempt.responseCompleter.isCompleted) {
+ if (response == null) {
+ rescheduled = _tryReschedule(attempt);
+ if (rescheduled) return;
+ stderr.writeln('Failed to run request ${attempt.request}');
+ response = new WorkResponse()
+ ..exitCode = EXIT_CODE_ERROR
+ ..output =
+ 'Invalid response from worker, this probably means it wrote '
+ 'invalid output or died.';
+ }
+ attempt.responseCompleter.complete(response);
+ _cleanUp(worker);
}
- } catch (e, s) {
+ }, onError: (e, s) {
// Note that we don't need to do additional cleanup here on failures. If
// the worker dies that is already handled in a generic fashion, we just
// need to make sure we complete with a valid response.
- if (!_responseCompleters[request].isCompleted) {
+ if (!attempt.responseCompleter.isCompleted) {
+ rescheduled = _tryReschedule(attempt);
+ if (rescheduled) return;
var response = new WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = 'Error running worker:\n$e\n$s';
- _responseCompleters[request].complete(response);
+ attempt.responseCompleter.complete(response);
+ _cleanUp(worker);
}
+ });
+ }
+
+ /// Performs post-work cleanup for [worker].
+ void _cleanUp(Process worker) {
+ // If the worker crashes, it won't be in `_readyWorkers` any more, and
+  // we don't want to add it to `_idleWorkers`.
+ if (_readyWorkers.contains(worker)) {
+ _idleWorkers.add(worker);
}
+
+ // Do additional work if available.
+ _runWorkQueue();
+
+  // If the worker wasn't immediately used we might have too many idle
+ // workers now, kill one if necessary.
+ if (_idleWorkers.length > _maxIdleWorkers) {
+ // Note that whenever we spawn a worker we listen for its exit code
+ // and clean it up so we don't need to do that here.
+ var worker = _idleWorkers.removeLast();
+ _readyWorkers.remove(worker);
+ worker.kill();
+ }
+ }
+
+ /// Attempts to reschedule a failed [attempt].
+ ///
+ /// Returns whether or not the job was successfully rescheduled.
+ bool _tryReschedule(_WorkAttempt attempt) {
+ if (attempt.timesRetried >= _maxRetries) return false;
+ stderr.writeln('Rescheduling failed request...');
+ attempt.timesRetried++;
+ _workQueue.add(attempt);
+ _runWorkQueue();
+ return true;
}
}
-final _responseCompleters = new Expando<Completer<WorkResponse>>('response');
-final _workerConnections = new Expando<DriverConnection>('connectin');
+/// Encapsulates an attempt to fulfill a [WorkRequest], a completer for the
+/// [WorkResponse], and the number of times it has been retried.
+class _WorkAttempt {
+ final WorkRequest request;
+ final responseCompleter = new Completer<WorkResponse>();
+
+ Future<WorkResponse> get response => responseCompleter.future;
+
+ int timesRetried = 0;
+
+ _WorkAttempt(this.request);
+}
+
+final _workerConnections = new Expando<DriverConnection>('connection');
diff --git a/pubspec.yaml b/pubspec.yaml
index f4e6514..397ee9b 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: bazel_worker
-version: 0.1.6
+version: 0.1.7
description: Tools for creating a bazel persistent worker.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/bazel_worker
diff --git a/test/driver_test.dart b/test/driver_test.dart
index 0191c3a..ede5f99 100644
--- a/test/driver_test.dart
+++ b/test/driver_test.dart
@@ -69,10 +69,61 @@
}
});
+ group('failing workers', () {
+ /// A driver which spawns [numBadWorkers] failing workers and then good
+ /// ones after that, and which will retry [maxRetries] times.
+ void createDriver({int maxRetries = 2, int numBadWorkers = 2}) {
+ int numSpawned = 0;
+ driver = new BazelWorkerDriver(
+ () async => new MockWorker(workerLoopFactory: (MockWorker worker) {
+ var connection = new StdAsyncWorkerConnection(
+ inputStream: worker._stdinController.stream,
+ outputStream: worker._stdoutController.sink);
+ if (numSpawned < numBadWorkers) {
+ numSpawned++;
+ return new ThrowingMockWorkerLoop(
+ worker, MockWorker.responseQueue, connection);
+ } else {
+ return new MockWorkerLoop(MockWorker.responseQueue,
+ connection: connection);
+ }
+ }),
+ maxRetries: maxRetries);
+ }
+
+ test('should retry up to maxRetries times', () async {
+ createDriver();
+ var expectedResponse = new WorkResponse();
+ MockWorker.responseQueue.addAll([null, null, expectedResponse]);
+ var actualResponse = await driver.doWork(new WorkRequest());
+ // The first 2 null responses are thrown away, and we should get the
+ // third one.
+ expect(actualResponse, expectedResponse);
+
+ expect(MockWorker.deadWorkers.length, 2);
+ expect(MockWorker.liveWorkers.length, 1);
+ });
+
+ test('should fail if it exceeds maxRetries failures', () async {
+ createDriver(maxRetries: 2, numBadWorkers: 3);
+ MockWorker.responseQueue.addAll([null, null, new WorkResponse()]);
+ var actualResponse = await driver.doWork(new WorkRequest());
+ // Should actually get a bad response.
+ expect(actualResponse.exitCode, 15);
+ expect(
+ actualResponse.output,
+ 'Invalid response from worker, this probably means it wrote '
+ 'invalid output or died.');
+
+ expect(MockWorker.deadWorkers.length, 3);
+ });
+ });
+
tearDown(() async {
await driver?.terminateWorkers();
expect(MockWorker.liveWorkers, isEmpty);
MockWorker.deadWorkers.clear();
+ MockWorker.responseQueue.clear();
});
});
}
@@ -108,6 +159,27 @@
}
}
+/// A mock worker loop with a custom `run` function that throws.
+class ThrowingMockWorkerLoop extends MockWorkerLoop {
+ final MockWorker _mockWorker;
+
+ ThrowingMockWorkerLoop(this._mockWorker, Queue<WorkResponse> responseQueue,
+ AsyncWorkerConnection connection)
+ : super(responseQueue, connection: connection);
+
+ /// Run the worker loop. The returned [Future] doesn't complete until
+  /// [connection.readRequest] returns `null`.
+ @override
+ Future run() async {
+ while (true) {
+ var request = await connection.readRequest();
+ if (request == null) break;
+ await performRequest(request);
+ _mockWorker.kill();
+ }
+ }
+}
+
/// A mock worker process.
///
/// Items in [responseQueue] will be returned in order based on requests.
@@ -132,13 +204,15 @@
static final deadWorkers = <MockWorker>[];
/// Standard constructor, creates the [_workerLoop].
- MockWorker() {
+ MockWorker({WorkerLoop workerLoopFactory(MockWorker mockWorker)}) {
liveWorkers.add(this);
- _workerLoop = new MockWorkerLoop(responseQueue,
- connection: new StdAsyncWorkerConnection(
- inputStream: this._stdinController.stream,
- outputStream: this._stdoutController.sink))
- ..run();
+ var workerLoop = workerLoopFactory != null
+ ? workerLoopFactory(this)
+ : new MockWorkerLoop(responseQueue,
+ connection: new StdAsyncWorkerConnection(
+ inputStream: this._stdinController.stream,
+ outputStream: this._stdoutController.sink));
+ _workerLoop = workerLoop..run();
}
Future<int> get exitCode => _exitCodeCompleter.future;