Add cancel method to `BazelWorkerConnection` and use it (#16)
Also update pubspec/changelog for release
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 77cd606..b8b0031 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 0.1.8
+
+* Add `Future cancel()` method to `DriverConnection`, which in the case of a
+ `StdDriverConnection` closes the input stream.
+ * The `terminateWorkers` method on `BazelWorkerDriver` now calls `cancel` on
+ all worker connections to ensure the vm can exit correctly.
+
## 0.1.7
* Update the `BazelWorkerDriver` class to handle worker crashes, and retry work
diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart
index 93caf49..3044bbe 100644
--- a/lib/src/async_message_grouper.dart
+++ b/lib/src/async_message_grouper.dart
@@ -19,6 +19,9 @@
/// The input stream.
final StreamQueue<List<int>> _inputQueue;
+ // Whether or not the input queue has already been cancelled.
+ bool _inputQueueCancelled = false;
+
/// The current buffer.
final Queue<int> _buffer = new Queue<int>();
@@ -38,7 +41,7 @@
}
// If there is nothing left in the queue then cancel the subscription.
- if (message == null) _inputQueue.cancel();
+ if (message == null) _cancel();
return message;
} catch (e) {
@@ -49,6 +52,14 @@
}
}
+ Future _cancel() {
+ if (!_inputQueueCancelled) {
+ _inputQueueCancelled = true;
+ return _inputQueue.cancel();
+ }
+ return new Future.value(null);
+ }
+
/// Stop listening to the stream for further updates.
- Future cancel() => _inputQueue.cancel();
+ Future cancel() => _cancel();
}
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
index 78bec3b..80d82a9 100644
--- a/lib/src/driver/driver.dart
+++ b/lib/src/driver/driver.dart
@@ -59,11 +59,11 @@
/// Calls `kill` on all worker processes.
Future terminateWorkers() async {
- for (var worker in _readyWorkers) {
- worker.kill();
+ for (var worker in _readyWorkers.toList()) {
+ _killWorker(worker);
}
await Future.wait(_spawningWorkers.map((worker) async {
- (await worker).kill();
+ _killWorker(await worker);
}));
}
@@ -108,7 +108,6 @@
worker.exitCode.then((exitCode) {
_idleWorkers.remove(worker);
_readyWorkers.remove(worker);
- _spawningWorkers.remove(worker);
_runWorkQueue();
});
});
@@ -178,8 +177,7 @@
// Note that whenever we spawn a worker we listen for its exit code
// and clean it up so we don't need to do that here.
var worker = _idleWorkers.removeLast();
- _readyWorkers.remove(worker);
- worker.kill();
+ _killWorker(worker);
}
}
@@ -194,6 +192,13 @@
_runWorkQueue();
return true;
}
+
+ void _killWorker(Process worker) {
+ _workerConnections[worker].cancel();
+ _readyWorkers.remove(worker);
+ _idleWorkers.remove(worker);
+ worker.kill();
+ }
}
/// Encapsulates an attempt to fulfill a [WorkRequest], a completer for the
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
index b170586..29a00a1 100644
--- a/lib/src/driver/driver_connection.dart
+++ b/lib/src/driver/driver_connection.dart
@@ -20,6 +20,8 @@
Future<WorkResponse> readResponse();
void writeRequest(WorkRequest request);
+
+ Future cancel();
}
/// Default implementation of [DriverConnection] that works with [Stdin]
@@ -72,4 +74,7 @@
void writeRequest(WorkRequest request) {
_outputStream.add(protoToDelimitedBuffer(request));
}
+
+ @override
+ Future cancel() => _messageGrouper.cancel();
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 397ee9b..d2007e5 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: bazel_worker
-version: 0.1.7
+version: 0.1.8
description: Tools for creating a bazel persistent worker.
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/bazel_worker