detect dead workers by a closed input stream instead of exit code (#46)

There has been a long-standing issue where the exit code for detached processes was null, so we could never detect that they had exited. This can cause builds to hang while the driver keeps trying to schedule work on a dead worker.

This fixes that by instead treating the closing of the input stream (the worker's stdout) as the signal that a worker is dead.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a4a6a35..6a6fed5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 0.1.23+1
+
+* Don't rely on `exitCode` to know when a worker terminates; instead, wait for
+  the input stream to close.
+  * The SDK may also start throwing instead of returning a `null` here, so this
+    pre-emptively guards against that.
+
 ## 0.1.23
 
 * Support protobuf `1.x`.
diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart
index 5d1cbf5..310a7f9 100644
--- a/lib/src/async_message_grouper.dart
+++ b/lib/src/async_message_grouper.dart
@@ -20,12 +20,13 @@
   /// The input stream.
   final StreamQueue<List<int>> _inputQueue;
 
-  // Whether or not the input queue has already been cancelled.
-  bool _inputQueueCancelled = false;
-
   /// The current buffer.
   final Queue<int> _buffer = Queue<int>();
 
+  /// Completes after [cancel] is called or [inputStream] is closed.
+  Future<void> get done => _done.future;
+  final _done = Completer<void>();
+
   AsyncMessageGrouper(Stream<List<int>> inputStream)
       : _inputQueue = StreamQueue(inputStream);
 
@@ -43,7 +44,7 @@
       }
 
       // If there is nothing left in the queue then cancel the subscription.
-      if (message == null) unawaited(_cancel());
+      if (message == null) unawaited(cancel());
 
       return message;
     } catch (e) {
@@ -54,14 +55,12 @@
     }
   }
 
-  Future _cancel() {
-    if (!_inputQueueCancelled) {
-      _inputQueueCancelled = true;
+  /// Stop listening to the stream for further updates.
+  Future cancel() {
+    if (!_done.isCompleted) {
+      _done.complete(null);
       return _inputQueue.cancel();
     }
-    return Future.value(null);
+    return done;
   }
-
-  /// Stop listening to the stream for further updates.
-  Future cancel() => _cancel();
 }
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
index 06b7d3a..16cbc96 100644
--- a/lib/src/driver/driver.dart
+++ b/lib/src/driver/driver.dart
@@ -106,14 +106,17 @@
         _spawningWorkers.remove(futureWorker);
         _readyWorkers.add(worker);
 
-        _workerConnections[worker] = StdDriverConnection.forWorker(worker);
+        var connection = StdDriverConnection.forWorker(worker);
+        _workerConnections[worker] = connection;
         _runWorker(worker, attempt);
 
         // When the worker exits we should retry running the work queue in case
         // there is more work to be done. This is primarily just a defensive
         // thing but is cheap to do.
-        // exitCode can be null: https://github.com/dart-lang/sdk/issues/35874
-        worker.exitCode?.then((exitCode) {
+        //
+        // We don't use `exitCode` because it is null for detached processes
+        // (which is common for workers).
+        connection.done.then((_) {
           _idleWorkers.remove(worker);
           _readyWorkers.remove(worker);
           _runWorkQueue();
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
index 0a3cb78..b282aec 100644
--- a/lib/src/driver/driver_connection.dart
+++ b/lib/src/driver/driver_connection.dart
@@ -31,6 +31,8 @@
   final AsyncMessageGrouper _messageGrouper;
   final StreamSink<List<int>> _outputStream;
 
+  Future<void> get done => _messageGrouper.done;
+
   StdDriverConnection(
       {Stream<List<int>> inputStream, StreamSink<List<int>> outputStream})
       : _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
diff --git a/pubspec.yaml b/pubspec.yaml
index 0814989..dbae4cd 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: bazel_worker
-version: 0.1.24-dev
+version: 0.1.23+1
 
 description: Tools for creating a bazel persistent worker.
 homepage: https://github.com/dart-lang/bazel_worker
diff --git a/test/driver_test.dart b/test/driver_test.dart
index 9dcbe53..56c9086 100644
--- a/test/driver_test.dart
+++ b/test/driver_test.dart
@@ -209,9 +209,6 @@
   /// Spawns a new [MockWorker].
   static Future<MockWorker> spawn() async => MockWorker();
 
-  /// Worker loop that handles reading requests and responding.
-  AsyncWorkerLoop _workerLoop;
-
   /// Static queue of pending responses, these are shared by all workers.
   ///
   /// If this is empty and a request is received then it will throw.
@@ -223,7 +220,8 @@
   /// Static list of all the dead workers.
   static final deadWorkers = <MockWorker>[];
 
-  /// Standard constructor, creates the [_workerLoop].
+  /// Standard constructor, creates a [WorkerLoop] from [workerLoopFactory] or
+  /// a [MockWorkerLoop] if no factory is provided.
   MockWorker({WorkerLoop Function(MockWorker) workerLoopFactory}) {
     liveWorkers.add(this);
     var workerLoop = workerLoopFactory != null
@@ -232,12 +230,11 @@
             connection: StdAsyncWorkerConnection(
                 inputStream: _stdinController.stream,
                 outputStream: _stdoutController.sink));
-    _workerLoop = workerLoop..run();
+    workerLoop.run();
   }
 
   @override
-  Future<int> get exitCode => _exitCodeCompleter.future;
-  final _exitCodeCompleter = Completer<int>();
+  Future<int> get exitCode => throw UnsupportedError('Not needed.');
 
   @override
   Stream<List<int>> get stdout => _stdoutController.stream;
@@ -266,7 +263,6 @@
       await _stdoutController.close();
       await _stderrController.close();
       await _stdinController.close();
-      _exitCodeCompleter.complete(exitCode);
     }();
     deadWorkers.add(this);
     liveWorkers.remove(this);