Detect dead workers by a closed input stream

Cherry picks the fix from a45a104

This should allow projects pinned to an older `protobuf` to use newer SDKs,
which require this fix due to a backwards-incompatible change.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ab64309..2599680 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 0.1.21+1
+
+* Don't rely on `exitCode` to know when a worker terminates; instead, wait for
+  the input stream to close. Backport of the fix from `0.1.23+1` to a version
+  that does not require a newer protobuf.
+
 ## 0.1.21
 
 * Make `TestStdinAsync` behave like a `Stream<Uint8List>`
diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart
index 3044bbe..7fca060 100644
--- a/lib/src/async_message_grouper.dart
+++ b/lib/src/async_message_grouper.dart
@@ -19,12 +19,13 @@
   /// The input stream.
   final StreamQueue<List<int>> _inputQueue;
 
-  // Whether or not the input queue has already been cancelled.
-  bool _inputQueueCancelled = false;
-
   /// The current buffer.
   final Queue<int> _buffer = new Queue<int>();
 
+  /// Completes after [cancel] is called or [inputStream] is closed.
+  Future<void> get done => _done.future;
+  final _done = Completer<void>();
+
   AsyncMessageGrouper(Stream<List<int>> inputStream)
       : _inputQueue = new StreamQueue(inputStream);
 
@@ -41,7 +42,7 @@
       }
 
       // If there is nothing left in the queue then cancel the subscription.
-      if (message == null) _cancel();
+      if (message == null) cancel();
 
       return message;
     } catch (e) {
@@ -52,14 +53,12 @@
     }
   }
 
-  Future _cancel() {
-    if (!_inputQueueCancelled) {
-      _inputQueueCancelled = true;
+  /// Stop listening to the stream for further updates.
+  Future cancel() {
+    if (!_done.isCompleted) {
+      _done.complete(null);
       return _inputQueue.cancel();
     }
-    return new Future.value(null);
+    return done;
   }
-
-  /// Stop listening to the stream for further updates.
-  Future cancel() => _cancel();
 }
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
index 8a3fb04..ba3eec8 100644
--- a/lib/src/driver/driver.dart
+++ b/lib/src/driver/driver.dart
@@ -106,14 +106,17 @@
         _spawningWorkers.remove(futureWorker);
         _readyWorkers.add(worker);
 
-        _workerConnections[worker] = new StdDriverConnection.forWorker(worker);
+        var connection = StdDriverConnection.forWorker(worker);
+        _workerConnections[worker] = connection;
         _runWorker(worker, attempt);
 
         // When the worker exits we should retry running the work queue in case
         // there is more work to be done. This is primarily just a defensive
         // thing but is cheap to do.
-        // exitCode can be null: https://github.com/dart-lang/sdk/issues/35874
-        worker.exitCode?.then((exitCode) {
+        //
+        // We don't use `exitCode` because it is null for detached processes
+        // (which is common for workers).
+        connection.done.then((_) {
           _idleWorkers.remove(worker);
           _readyWorkers.remove(worker);
           _runWorkQueue();
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
index f30bf84..360897b 100644
--- a/lib/src/driver/driver_connection.dart
+++ b/lib/src/driver/driver_connection.dart
@@ -31,6 +31,8 @@
   final AsyncMessageGrouper _messageGrouper;
   final StreamSink<List<int>> _outputStream;
 
+  Future<void> get done => _messageGrouper.done;
+
   StdDriverConnection(
       {Stream<List<int>> inputStream, StreamSink<List<int>> outputStream})
       : _messageGrouper = new AsyncMessageGrouper(inputStream ?? stdin),
diff --git a/pubspec.yaml b/pubspec.yaml
index f7da685..f8ffa33 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: bazel_worker
-version: 0.1.21
+version: 0.1.21+1
 
 description: Tools for creating a bazel persistent worker.
 author: Dart Team <misc@dartlang.org>
diff --git a/test/driver_test.dart b/test/driver_test.dart
index 9b664e9..5b1f57a 100644
--- a/test/driver_test.dart
+++ b/test/driver_test.dart
@@ -73,12 +73,15 @@
       var maxWorkers = 2;
       driver = new BazelWorkerDriver(MockWorker.spawn, maxWorkers: maxWorkers);
       var tracking = <Future>[];
-      await _doRequests(driver: driver, count: 10, trackWork: (Future response) {
-          // We should never be tracking more than `maxWorkers` jobs at a time.
-          expect(tracking.length, lessThan(maxWorkers));
-          tracking.add(response);
-          response.then((_) => tracking.remove(response));
-        });
+      await _doRequests(
+          driver: driver,
+          count: 10,
+          trackWork: (Future response) {
+            // We should never be tracking more than `maxWorkers` jobs at a time.
+            expect(tracking.length, lessThan(maxWorkers));
+            tracking.add(response);
+            response.then((_) => tracking.remove(response));
+          });
     });
 
     group('failing workers', () {
@@ -179,9 +182,7 @@
 class ThrowingMockWorkerLoop extends MockWorkerLoop {
   final MockWorker _mockWorker;
 
-  ThrowingMockWorkerLoop(
-      this._mockWorker,
-      Queue<WorkResponse> responseQueue,
+  ThrowingMockWorkerLoop(this._mockWorker, Queue<WorkResponse> responseQueue,
       AsyncWorkerConnection connection)
       : super(responseQueue, connection: connection);
 
@@ -207,9 +208,6 @@
   /// Spawns a new [MockWorker].
   static Future<MockWorker> spawn() async => new MockWorker();
 
-  /// Worker loop that handles reading requests and responding.
-  AsyncWorkerLoop _workerLoop;
-
   /// Static queue of pending responses, these are shared by all workers.
   ///
   /// If this is empty and a request is received then it will throw.
@@ -221,20 +219,21 @@
   /// Static list of all the dead workers.
   static final deadWorkers = <MockWorker>[];
 
-  /// Standard constructor, creates the [_workerLoop].
-  MockWorker({WorkerLoop workerLoopFactory(MockWorker mockWorker)}) {
+  /// Standard constructor, creates a [WorkerLoop] from [workerLoopFactory] or
+  /// a [MockWorkerLoop] if no factory is provided.
+  MockWorker({WorkerLoop Function(MockWorker) workerLoopFactory}) {
     liveWorkers.add(this);
     var workerLoop = workerLoopFactory != null
         ? workerLoopFactory(this)
-        : new MockWorkerLoop(responseQueue,
-            connection: new StdAsyncWorkerConnection(
-                inputStream: this._stdinController.stream,
-                outputStream: this._stdoutController.sink));
-    _workerLoop = workerLoop..run();
+        : MockWorkerLoop(responseQueue,
+            connection: StdAsyncWorkerConnection(
+                inputStream: _stdinController.stream,
+                outputStream: _stdoutController.sink));
+    workerLoop.run();
   }
 
-  Future<int> get exitCode => _exitCodeCompleter.future;
-  final _exitCodeCompleter = new Completer<int>();
+  @override
+  Future<int> get exitCode => throw UnsupportedError('Not needed.');
 
   @override
   Stream<List<int>> get stdout => _stdoutController.stream;
@@ -262,7 +261,6 @@
       await _stdoutController.close();
       await _stderrController.close();
       await _stdinController.close();
-      _exitCodeCompleter.complete(exitCode);
     }();
     deadWorkers.add(this);
     liveWorkers.remove(this);