Detect dead workers by a closed input stream
Cherry picks the fix from a45a104
This should allow projects using an older `protobuf` to use newer SDKs
which require this change due to a backwards incompatible change.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ab64309..2599680 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 0.1.21+1
+
+* Don't rely on `exitCode` to know when a worker terminates, instead wait for
+ the input stream to close. Backport of fix in `0.1.23+1` in a version that
+ does not require a newer protobuf.
+
## 0.1.21
* Make `TestStdinAsync` behave like a `Stream<Uint8List>`
diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart
index 3044bbe..7fca060 100644
--- a/lib/src/async_message_grouper.dart
+++ b/lib/src/async_message_grouper.dart
@@ -19,12 +19,13 @@
/// The input stream.
final StreamQueue<List<int>> _inputQueue;
- // Whether or not the input queue has already been cancelled.
- bool _inputQueueCancelled = false;
-
/// The current buffer.
final Queue<int> _buffer = new Queue<int>();
+ /// Completes after [cancel] is called or [inputStream] is closed.
+ Future<void> get done => _done.future;
+ final _done = Completer<void>();
+
AsyncMessageGrouper(Stream<List<int>> inputStream)
: _inputQueue = new StreamQueue(inputStream);
@@ -41,7 +42,7 @@
}
// If there is nothing left in the queue then cancel the subscription.
- if (message == null) _cancel();
+ if (message == null) cancel();
return message;
} catch (e) {
@@ -52,14 +53,12 @@
}
}
- Future _cancel() {
- if (!_inputQueueCancelled) {
- _inputQueueCancelled = true;
+ /// Stop listening to the stream for further updates.
+ Future cancel() {
+ if (!_done.isCompleted) {
+ _done.complete(null);
return _inputQueue.cancel();
}
- return new Future.value(null);
+ return done;
}
-
- /// Stop listening to the stream for further updates.
- Future cancel() => _cancel();
}
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
index 8a3fb04..ba3eec8 100644
--- a/lib/src/driver/driver.dart
+++ b/lib/src/driver/driver.dart
@@ -106,14 +106,17 @@
_spawningWorkers.remove(futureWorker);
_readyWorkers.add(worker);
- _workerConnections[worker] = new StdDriverConnection.forWorker(worker);
+ var connection = StdDriverConnection.forWorker(worker);
+ _workerConnections[worker] = connection;
_runWorker(worker, attempt);
// When the worker exits we should retry running the work queue in case
// there is more work to be done. This is primarily just a defensive
// thing but is cheap to do.
- // exitCode can be null: https://github.com/dart-lang/sdk/issues/35874
- worker.exitCode?.then((exitCode) {
+ //
+ // We don't use `exitCode` because it is null for detached processes (
+ // which is common for workers).
+ connection.done.then((_) {
_idleWorkers.remove(worker);
_readyWorkers.remove(worker);
_runWorkQueue();
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
index f30bf84..360897b 100644
--- a/lib/src/driver/driver_connection.dart
+++ b/lib/src/driver/driver_connection.dart
@@ -31,6 +31,8 @@
final AsyncMessageGrouper _messageGrouper;
final StreamSink<List<int>> _outputStream;
+ Future<void> get done => _messageGrouper.done;
+
StdDriverConnection(
{Stream<List<int>> inputStream, StreamSink<List<int>> outputStream})
: _messageGrouper = new AsyncMessageGrouper(inputStream ?? stdin),
diff --git a/pubspec.yaml b/pubspec.yaml
index f7da685..f8ffa33 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: bazel_worker
-version: 0.1.21
+version: 0.1.21+1
description: Tools for creating a bazel persistent worker.
author: Dart Team <misc@dartlang.org>
diff --git a/test/driver_test.dart b/test/driver_test.dart
index 9b664e9..5b1f57a 100644
--- a/test/driver_test.dart
+++ b/test/driver_test.dart
@@ -73,12 +73,15 @@
var maxWorkers = 2;
driver = new BazelWorkerDriver(MockWorker.spawn, maxWorkers: maxWorkers);
var tracking = <Future>[];
- await _doRequests(driver: driver, count: 10, trackWork: (Future response) {
- // We should never be tracking more than `maxWorkers` jobs at a time.
- expect(tracking.length, lessThan(maxWorkers));
- tracking.add(response);
- response.then((_) => tracking.remove(response));
- });
+ await _doRequests(
+ driver: driver,
+ count: 10,
+ trackWork: (Future response) {
+ // We should never be tracking more than `maxWorkers` jobs at a time.
+ expect(tracking.length, lessThan(maxWorkers));
+ tracking.add(response);
+ response.then((_) => tracking.remove(response));
+ });
});
group('failing workers', () {
@@ -179,9 +182,7 @@
class ThrowingMockWorkerLoop extends MockWorkerLoop {
final MockWorker _mockWorker;
- ThrowingMockWorkerLoop(
- this._mockWorker,
- Queue<WorkResponse> responseQueue,
+ ThrowingMockWorkerLoop(this._mockWorker, Queue<WorkResponse> responseQueue,
AsyncWorkerConnection connection)
: super(responseQueue, connection: connection);
@@ -207,9 +208,6 @@
/// Spawns a new [MockWorker].
static Future<MockWorker> spawn() async => new MockWorker();
- /// Worker loop that handles reading requests and responding.
- AsyncWorkerLoop _workerLoop;
-
/// Static queue of pending responses, these are shared by all workers.
///
/// If this is empty and a request is received then it will throw.
@@ -221,20 +219,21 @@
/// Static list of all the dead workers.
static final deadWorkers = <MockWorker>[];
- /// Standard constructor, creates the [_workerLoop].
- MockWorker({WorkerLoop workerLoopFactory(MockWorker mockWorker)}) {
+ /// Standard constructor, creates a [WorkerLoop] from [workerLoopFactory] or
+ /// a [MockWorkerLoop] if no factory is provided.
+ MockWorker({WorkerLoop Function(MockWorker) workerLoopFactory}) {
liveWorkers.add(this);
var workerLoop = workerLoopFactory != null
? workerLoopFactory(this)
- : new MockWorkerLoop(responseQueue,
- connection: new StdAsyncWorkerConnection(
- inputStream: this._stdinController.stream,
- outputStream: this._stdoutController.sink));
- _workerLoop = workerLoop..run();
+ : MockWorkerLoop(responseQueue,
+ connection: StdAsyncWorkerConnection(
+ inputStream: _stdinController.stream,
+ outputStream: _stdoutController.sink));
+ workerLoop.run();
}
- Future<int> get exitCode => _exitCodeCompleter.future;
- final _exitCodeCompleter = new Completer<int>();
+ @override
+ Future<int> get exitCode => throw UnsupportedError('Not needed.');
@override
Stream<List<int>> get stdout => _stdoutController.stream;
@@ -262,7 +261,6 @@
await _stdoutController.close();
await _stderrController.close();
await _stdinController.close();
- _exitCodeCompleter.complete(exitCode);
}();
deadWorkers.add(this);
liveWorkers.remove(this);