detect dead workers by a closed input stream instead of exit code (#46)
There has been a long-standing issue where the exit code for detached processes was `null`, so we could never detect when they exited. This could cause builds to hang while the driver kept trying to schedule work on a dead worker.
This fixes the issue by using the input stream (the worker's stdout) instead of the exit code to detect a dead worker.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a4a6a35..6a6fed5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 0.1.23+1
+
+* Don't rely on `exitCode` to know when a worker terminates; instead, wait for
+ the input stream to close.
+ * The SDK may also start throwing instead of returning `null` here, so this
+ pre-emptively guards against that.
+
## 0.1.23
* Support protobuf `1.x`.
diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart
index 5d1cbf5..310a7f9 100644
--- a/lib/src/async_message_grouper.dart
+++ b/lib/src/async_message_grouper.dart
@@ -20,12 +20,13 @@
/// The input stream.
final StreamQueue<List<int>> _inputQueue;
- // Whether or not the input queue has already been cancelled.
- bool _inputQueueCancelled = false;
-
/// The current buffer.
final Queue<int> _buffer = Queue<int>();
+ /// Completes after [cancel] is called or [inputStream] is closed.
+ Future<void> get done => _done.future;
+ final _done = Completer<void>();
+
AsyncMessageGrouper(Stream<List<int>> inputStream)
: _inputQueue = StreamQueue(inputStream);
@@ -43,7 +44,7 @@
}
// If there is nothing left in the queue then cancel the subscription.
- if (message == null) unawaited(_cancel());
+ if (message == null) unawaited(cancel());
return message;
} catch (e) {
@@ -54,14 +55,12 @@
}
}
- Future _cancel() {
- if (!_inputQueueCancelled) {
- _inputQueueCancelled = true;
+ /// Stop listening to the stream for further updates.
+ Future cancel() {
+ if (!_done.isCompleted) {
+ _done.complete(null);
return _inputQueue.cancel();
}
- return Future.value(null);
+ return done;
}
-
- /// Stop listening to the stream for further updates.
- Future cancel() => _cancel();
}
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
index 06b7d3a..16cbc96 100644
--- a/lib/src/driver/driver.dart
+++ b/lib/src/driver/driver.dart
@@ -106,14 +106,17 @@
_spawningWorkers.remove(futureWorker);
_readyWorkers.add(worker);
- _workerConnections[worker] = StdDriverConnection.forWorker(worker);
+ var connection = StdDriverConnection.forWorker(worker);
+ _workerConnections[worker] = connection;
_runWorker(worker, attempt);
// When the worker exits we should retry running the work queue in case
// there is more work to be done. This is primarily just a defensive
// thing but is cheap to do.
- // exitCode can be null: https://github.com/dart-lang/sdk/issues/35874
- worker.exitCode?.then((exitCode) {
+ //
+ // We don't use `exitCode` because it is null for detached processes
+ // (which are common for workers).
+ connection.done.then((_) {
_idleWorkers.remove(worker);
_readyWorkers.remove(worker);
_runWorkQueue();
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
index 0a3cb78..b282aec 100644
--- a/lib/src/driver/driver_connection.dart
+++ b/lib/src/driver/driver_connection.dart
@@ -31,6 +31,8 @@
final AsyncMessageGrouper _messageGrouper;
final StreamSink<List<int>> _outputStream;
+ Future<void> get done => _messageGrouper.done;
+
StdDriverConnection(
{Stream<List<int>> inputStream, StreamSink<List<int>> outputStream})
: _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
diff --git a/pubspec.yaml b/pubspec.yaml
index 0814989..dbae4cd 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: bazel_worker
-version: 0.1.24-dev
+version: 0.1.23+1
description: Tools for creating a bazel persistent worker.
homepage: https://github.com/dart-lang/bazel_worker
diff --git a/test/driver_test.dart b/test/driver_test.dart
index 9dcbe53..56c9086 100644
--- a/test/driver_test.dart
+++ b/test/driver_test.dart
@@ -209,9 +209,6 @@
/// Spawns a new [MockWorker].
static Future<MockWorker> spawn() async => MockWorker();
- /// Worker loop that handles reading requests and responding.
- AsyncWorkerLoop _workerLoop;
-
/// Static queue of pending responses, these are shared by all workers.
///
/// If this is empty and a request is received then it will throw.
@@ -223,7 +220,8 @@
/// Static list of all the dead workers.
static final deadWorkers = <MockWorker>[];
- /// Standard constructor, creates the [_workerLoop].
+ /// Standard constructor, creates a [WorkerLoop] from [workerLoopFactory] or
+ /// a [MockWorkerLoop] if no factory is provided.
MockWorker({WorkerLoop Function(MockWorker) workerLoopFactory}) {
liveWorkers.add(this);
var workerLoop = workerLoopFactory != null
@@ -232,12 +230,11 @@
connection: StdAsyncWorkerConnection(
inputStream: _stdinController.stream,
outputStream: _stdoutController.sink));
- _workerLoop = workerLoop..run();
+ workerLoop.run();
}
@override
- Future<int> get exitCode => _exitCodeCompleter.future;
- final _exitCodeCompleter = Completer<int>();
+ Future<int> get exitCode => throw UnsupportedError('Not needed.');
@override
Stream<List<int>> get stdout => _stdoutController.stream;
@@ -266,7 +263,6 @@
await _stdoutController.close();
await _stderrController.close();
await _stdinController.close();
- _exitCodeCompleter.complete(exitCode);
}();
deadWorkers.add(this);
liveWorkers.remove(this);