add trackWork argument to BazelWorkerDriver.doWork (#35)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1b018f5..7ae5208 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 0.1.18
+
+* Add a `trackWork` optional named argument to `BazelDriver.doWork`. This allows
+ the caller to know when a work request is actually sent to a worker.
+
## 0.1.17
* Allow protobuf 0.13.0.
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
index 80d82a9..8049546 100644
--- a/lib/src/driver/driver.dart
+++ b/lib/src/driver/driver.dart
@@ -50,8 +50,15 @@
this._maxWorkers = maxWorkers ?? 4,
this._maxRetries = maxRetries ?? 4;
- Future<WorkResponse> doWork(WorkRequest request) {
- var attempt = new _WorkAttempt(request);
+ /// Waits for an available worker, and then sends [WorkRequest] to it.
+ ///
+ /// If [trackWork] is provided it will be invoked with a [Future] once the
+ /// [request] has been actually sent to the worker. This allows the caller
+ /// to determine when actual work is being done versus just waiting for an
+ /// available worker.
+ Future<WorkResponse> doWork(WorkRequest request,
+ {Function(Future<WorkResponse>) trackWork}) {
+ var attempt = new _WorkAttempt(request, trackWork: trackWork);
_workQueue.add(attempt);
_runWorkQueue();
return attempt.response;
@@ -125,8 +132,13 @@
runZoned(() async {
var connection = _workerConnections[worker];
+
connection.writeRequest(attempt.request);
- var response = await connection.readResponse();
+ var responseFuture = connection.readResponse();
+ if (attempt.trackWork != null) {
+ attempt.trackWork(responseFuture);
+ }
+ var response = await responseFuture;
// It is possible for us to complete with an error response due to an
// unhandled async error before we get here.
@@ -206,12 +218,13 @@
class _WorkAttempt {
final WorkRequest request;
final responseCompleter = new Completer<WorkResponse>();
+ final Function(Future<WorkResponse>) trackWork;
Future<WorkResponse> get response => responseCompleter.future;
int timesRetried = 0;
- _WorkAttempt(this.request);
+ _WorkAttempt(this.request, {this.trackWork});
}
final _workerConnections = new Expando<DriverConnection>('connection');
diff --git a/pubspec.yaml b/pubspec.yaml
index b8c5766..3f4462d 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: bazel_worker
-version: 0.1.17
+version: 0.1.18
description: Tools for creating a bazel persistent worker.
author: Dart Team <misc@dartlang.org>
diff --git a/test/driver_test.dart b/test/driver_test.dart
index 1b48ce7..9b664e9 100644
--- a/test/driver_test.dart
+++ b/test/driver_test.dart
@@ -69,6 +69,18 @@
}
});
+ test('trackWork gets invoked when a worker is actually ready', () async {
+ var maxWorkers = 2;
+ driver = new BazelWorkerDriver(MockWorker.spawn, maxWorkers: maxWorkers);
+ var tracking = <Future>[];
+ await _doRequests(driver: driver, count: 10, trackWork: (Future response) {
+ // We should never be tracking more than `maxWorkers` jobs at a time.
+ expect(tracking.length, lessThan(maxWorkers));
+ tracking.add(response);
+ response.then((_) => tracking.remove(response));
+ });
+ });
+
group('failing workers', () {
/// A driver which spawns [numBadWorkers] failing workers and then good
/// ones after that, and which will retry [maxRetries] times.
@@ -130,7 +142,10 @@
/// Runs [count] of fake work requests through [driver], and asserts that they
/// all completed.
-Future _doRequests({BazelWorkerDriver driver, int count}) async {
+Future _doRequests(
+ {BazelWorkerDriver driver,
+ int count,
+ Function(Future<WorkResponse>) trackWork}) async {
// If we create a driver, we need to make sure and terminate it.
var terminateDriver = driver == null;
driver ??= new BazelWorkerDriver(MockWorker.spawn);
@@ -139,7 +154,8 @@
var requests = new List.generate(count, (_) => new WorkRequest());
var responses = new List.generate(count, (_) => new WorkResponse());
MockWorker.responseQueue.addAll(responses);
- var actualResponses = await Future.wait(requests.map(driver.doWork));
+ var actualResponses = await Future.wait(
+ requests.map((request) => driver.doWork(request, trackWork: trackWork)));
expect(actualResponses, unorderedEquals(responses));
if (terminateDriver) await driver.terminateWorkers();
}
@@ -163,7 +179,9 @@
class ThrowingMockWorkerLoop extends MockWorkerLoop {
final MockWorker _mockWorker;
- ThrowingMockWorkerLoop(this._mockWorker, Queue<WorkResponse> responseQueue,
+ ThrowingMockWorkerLoop(
+ this._mockWorker,
+ Queue<WorkResponse> responseQueue,
AsyncWorkerConnection connection)
: super(responseQueue, connection: connection);