Add retry logic for failed workers (#15)

Also updated changelog/pubspec for 0.1.7 release
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b9c373a..77cd606 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 0.1.7
+
+* Update the `BazelWorkerDriver` class to handle worker crashes and retry failed
+  work requests. The number of retries is set by the new optional named
+  `int maxRetries` constructor argument, which defaults to 4.
+
 ## 0.1.6
 
 * Update the worker_protocol.pb.dart file with the latest proto generator.
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
index 9d0bb85..78bec3b 100644
--- a/lib/src/driver/driver.dart
+++ b/lib/src/driver/driver.dart
@@ -17,18 +17,21 @@
 /// This allows you to use any binary that supports the bazel worker protocol in
 /// the same way that bazel would, but from another dart process instead.
 class BazelWorkerDriver {
+  /// Idle worker processes.
+  final _idleWorkers = <Process>[];
+
   /// The maximum number of idle workers at any given time.
   final int _maxIdleWorkers;
 
+  /// The maximum number of times to retry a [_WorkAttempt] after an error.
+  final int _maxRetries;
+
   /// The maximum number of concurrent workers to run at any given time.
   final int _maxWorkers;
 
   /// The number of currently active workers.
   int get _numWorkers => _readyWorkers.length + _spawningWorkers.length;
 
-  /// Idle worker processes.
-  final _idleWorkers = <Process>[];
-
   /// All workers that are fully spawned and ready to handle work.
   final _readyWorkers = <Process>[];
 
@@ -36,21 +39,22 @@
   final _spawningWorkers = <Future<Process>>[];
 
   /// Work requests that haven't been started yet.
-  final _workQueue = new Queue<WorkRequest>();
+  final _workQueue = new Queue<_WorkAttempt>();
 
   /// Factory method that spawns a worker process.
   final SpawnWorker _spawnWorker;
 
-  BazelWorkerDriver(this._spawnWorker, {int maxIdleWorkers, int maxWorkers})
+  BazelWorkerDriver(this._spawnWorker,
+      {int maxIdleWorkers, int maxWorkers, int maxRetries})
       : this._maxIdleWorkers = maxIdleWorkers ?? 4,
-        this._maxWorkers = maxWorkers ?? 4;
+        this._maxWorkers = maxWorkers ?? 4,
+        this._maxRetries = maxRetries ?? 4;
 
   Future<WorkResponse> doWork(WorkRequest request) {
-    var responseCompleter = new Completer<WorkResponse>();
-    _responseCompleters[request] = responseCompleter;
-    _workQueue.add(request);
+    var attempt = new _WorkAttempt(request);
+    _workQueue.add(attempt);
     _runWorkQueue();
-    return responseCompleter.future;
+    return attempt.response;
   }
 
   /// Calls `kill` on all worker processes.
@@ -83,9 +87,9 @@
 
     // At this point we definitely want to run a task, we just need to decide
     // whether or not we need to start up a new worker.
-    var request = _workQueue.removeFirst();
+    var attempt = _workQueue.removeFirst();
     if (_idleWorkers.isNotEmpty) {
-      _runWorker(_idleWorkers.removeLast(), request);
+      _runWorker(_idleWorkers.removeLast(), attempt);
     } else {
       // No need to block here, we want to continue to synchronously drain the
       // work queue.
@@ -96,13 +100,15 @@
         _readyWorkers.add(worker);
 
         _workerConnections[worker] = new StdDriverConnection.forWorker(worker);
-        _runWorker(worker, request);
+        _runWorker(worker, attempt);
 
         // When the worker exits we should retry running the work queue in case
         // there is more work to be done. This is primarily just a defensive
         // thing but is cheap to do.
-        worker.exitCode.then((_) {
+        worker.exitCode.then((exitCode) {
+          _idleWorkers.remove(worker);
           _readyWorkers.remove(worker);
+          _spawningWorkers.remove(worker);
           _runWorkQueue();
         });
       });
@@ -115,39 +121,92 @@
   ///
   /// Once the worker responds then it will be added back to the pool of idle
   /// workers.
-  Future _runWorker(Process worker, WorkRequest request) async {
-    try {
+  void _runWorker(Process worker, _WorkAttempt attempt) {
+    bool rescheduled = false;
+
+    runZoned(() async {
       var connection = _workerConnections[worker];
-      connection.writeRequest(request);
+      connection.writeRequest(attempt.request);
       var response = await connection.readResponse();
-      _responseCompleters[request].complete(response);
 
-      // Do additional work if available.
-      _idleWorkers.add(worker);
-      _runWorkQueue();
-
-      // If the worker wasn't immediately used we might have to many idle
-      // workers now, kill one if necessary.
-      if (_idleWorkers.length > _maxIdleWorkers) {
-        // Note that whenever we spawn a worker we listen for its exit code
-        // and clean it up so we don't need to do that here.
-        var worker = _idleWorkers.removeLast();
-        _readyWorkers.remove(worker);
-        worker.kill();
+      // It is possible for us to complete with an error response due to an
+      // unhandled async error before we get here.
+      if (!attempt.responseCompleter.isCompleted) {
+        if (response == null) {
+          rescheduled = _tryReschedule(attempt);
+          if (rescheduled) return;
+          stderr.writeln('Failed to run request ${attempt.request}');
+          response = new WorkResponse()
+            ..exitCode = EXIT_CODE_ERROR
+            ..output =
+                'Invalid response from worker, this probably means it wrote '
+                'invalid output or died.';
+        }
+        attempt.responseCompleter.complete(response);
+        _cleanUp(worker);
       }
-    } catch (e, s) {
+    }, onError: (e, s) {
       // Note that we don't need to do additional cleanup here on failures. If
       // the worker dies that is already handled in a generic fashion, we just
       // need to make sure we complete with a valid response.
-      if (!_responseCompleters[request].isCompleted) {
+      if (!attempt.responseCompleter.isCompleted) {
+        rescheduled = _tryReschedule(attempt);
+        if (rescheduled) return;
         var response = new WorkResponse()
           ..exitCode = EXIT_CODE_ERROR
           ..output = 'Error running worker:\n$e\n$s';
-        _responseCompleters[request].complete(response);
+        attempt.responseCompleter.complete(response);
+        _cleanUp(worker);
       }
+    });
+  }
+
+  /// Performs post-work cleanup for [worker]: re-idles it if it is still
+  /// ready, runs the work queue, and kills one idle worker if there are too many.
+  void _cleanUp(Process worker) {
+    // A crashed worker is no longer in `_readyWorkers`, and it must not be
+    // put back into `_idleWorkers`.
+    var stillReady = _readyWorkers.contains(worker);
+    if (stillReady) _idleWorkers.add(worker);
+
+    // Pick up more work if any is queued.
+    _runWorkQueue();
+
+    // If the worker wasn't immediately reused there may now be too many idle
+    // workers; if the pool is within its limit there is nothing left to do.
+    if (_idleWorkers.length <= _maxIdleWorkers) return;
+
+    // Kill one surplus worker. Every spawned worker has an exit code listener
+    // that cleans up after it, so that doesn't need to happen here.
+    var surplus = _idleWorkers.removeLast();
+    _readyWorkers.remove(surplus);
+    surplus.kill();
+  }
+
+  /// Re-queues a failed [attempt] unless it has used up its [_maxRetries].
+  ///
+  /// Returns `true` if the attempt was rescheduled and `false` otherwise.
+  bool _tryReschedule(_WorkAttempt attempt) {
+    var outOfRetries = attempt.timesRetried >= _maxRetries;
+    if (outOfRetries) return false;
+    stderr.writeln('Rescheduling failed request...');
+    _workQueue.add(attempt..timesRetried += 1);
+    _runWorkQueue();
+    return true;
+  }
 }
 
-final _responseCompleters = new Expando<Completer<WorkResponse>>('response');
-final _workerConnections = new Expando<DriverConnection>('connectin');
+/// A single [WorkRequest] together with the completer for its [WorkResponse]
+/// and a count of how many times it has been retried so far.
+class _WorkAttempt {
+  final WorkRequest request;
+  final responseCompleter = new Completer<WorkResponse>();
+
+  int timesRetried = 0;
+
+  _WorkAttempt(this.request);
+
+  Future<WorkResponse> get response => responseCompleter.future;
+}
+
+final _workerConnections = new Expando<DriverConnection>('connection');
diff --git a/pubspec.yaml b/pubspec.yaml
index f4e6514..397ee9b 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: bazel_worker
-version: 0.1.6
+version: 0.1.7
 description: Tools for creating a bazel persistent worker.
 author: Dart Team <misc@dartlang.org>
 homepage: https://github.com/dart-lang/bazel_worker
diff --git a/test/driver_test.dart b/test/driver_test.dart
index 0191c3a..ede5f99 100644
--- a/test/driver_test.dart
+++ b/test/driver_test.dart
@@ -69,10 +69,61 @@
       }
     });
 
+    group('failing workers', () {
+      /// A driver which spawns [numBadWorkers] failing workers and then good
+      /// ones after that, and which will retry [maxRetries] times.
+      void createDriver({int maxRetries = 2, int numBadWorkers = 2}) {
+        int numSpawned = 0;
+        driver = new BazelWorkerDriver(
+            () async => new MockWorker(workerLoopFactory: (MockWorker worker) {
+                  var connection = new StdAsyncWorkerConnection(
+                      inputStream: worker._stdinController.stream,
+                      outputStream: worker._stdoutController.sink);
+                  if (numSpawned < numBadWorkers) {
+                    numSpawned++;
+                    return new ThrowingMockWorkerLoop(
+                        worker, MockWorker.responseQueue, connection);
+                  } else {
+                    return new MockWorkerLoop(MockWorker.responseQueue,
+                        connection: connection);
+                  }
+                }),
+            maxRetries: maxRetries);
+      }
+
+      test('should retry up to maxRetries times', () async {
+        createDriver();
+        var expectedResponse = new WorkResponse();
+        MockWorker.responseQueue.addAll([null, null, expectedResponse]);
+        var actualResponse = await driver.doWork(new WorkRequest());
+        // The first 2 null responses are thrown away, and we should get the
+        // third one.
+        expect(actualResponse, expectedResponse);
+
+        expect(MockWorker.deadWorkers.length, 2);
+        expect(MockWorker.liveWorkers.length, 1);
+      });
+
+      test('should fail if it exceeds maxRetries failures', () async {
+        createDriver(maxRetries: 2, numBadWorkers: 3);
+        MockWorker.responseQueue.addAll([null, null, new WorkResponse()]);
+        var actualResponse = await driver.doWork(new WorkRequest());
+        // Should actually get a bad response.
+        expect(actualResponse.exitCode, 15);
+        expect(
+            actualResponse.output,
+            'Invalid response from worker, this probably means it wrote '
+            'invalid output or died.');
+
+        expect(MockWorker.deadWorkers.length, 3);
+      });
+    });
+
     tearDown(() async {
       await driver?.terminateWorkers();
       expect(MockWorker.liveWorkers, isEmpty);
       MockWorker.deadWorkers.clear();
+      MockWorker.responseQueue.clear();
     });
   });
 }
@@ -108,6 +159,27 @@
   }
 }
 
+/// A mock worker loop whose `run` kills its [MockWorker] after each request.
+class ThrowingMockWorkerLoop extends MockWorkerLoop {
+  final MockWorker _mockWorker;
+
+  ThrowingMockWorkerLoop(this._mockWorker, Queue<WorkResponse> responseQueue,
+      AsyncWorkerConnection connection)
+      : super(responseQueue, connection: connection);
+
+  /// Runs the worker loop, killing [_mockWorker] after each handled request.
+  /// The returned [Future] completes once a `null` request is read.
+  @override
+  Future run() async {
+    var request = await connection.readRequest();
+    while (request != null) {
+      await performRequest(request);
+      _mockWorker.kill();
+      request = await connection.readRequest();
+    }
+  }
+}
+
 /// A mock worker process.
 ///
 /// Items in [responseQueue] will be returned in order based on requests.
@@ -132,13 +204,15 @@
   static final deadWorkers = <MockWorker>[];
 
   /// Standard constructor, creates the [_workerLoop].
-  MockWorker() {
+  MockWorker({WorkerLoop workerLoopFactory(MockWorker mockWorker)}) {
     liveWorkers.add(this);
-    _workerLoop = new MockWorkerLoop(responseQueue,
-        connection: new StdAsyncWorkerConnection(
-            inputStream: this._stdinController.stream,
-            outputStream: this._stdoutController.sink))
-      ..run();
+    var workerLoop = workerLoopFactory != null
+        ? workerLoopFactory(this)
+        : new MockWorkerLoop(responseQueue,
+            connection: new StdAsyncWorkerConnection(
+                inputStream: this._stdinController.stream,
+                outputStream: this._stdoutController.sink));
+    _workerLoop = workerLoop..run();
   }
 
   Future<int> get exitCode => _exitCodeCompleter.future;