Initial version of BazelWorkerDriver
diff --git a/lib/driver.dart b/lib/driver.dart
new file mode 100644
index 0000000..7453491
--- /dev/null
+++ b/lib/driver.dart
@@ -0,0 +1,9 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+export 'src/driver/driver.dart';
+export 'src/driver/driver_connection.dart';
+export 'src/constants.dart';
+export 'src/message_grouper.dart';
+export 'src/worker_protocol.pb.dart';
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
new file mode 100644
index 0000000..c2465e0
--- /dev/null
+++ b/lib/src/driver/driver.dart
@@ -0,0 +1,151 @@
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+import 'dart:io';
+
+import '../constants.dart';
+import '../worker_protocol.pb.dart';
+import 'driver_connection.dart';
+
+typedef Future<Process> SpawnWorker();
+
+/// A driver for talking to a bazel worker.
+///
+/// This allows you to use any binary that supports the bazel worker protocol in
+/// the same way that bazel would, but from another dart process instead.
+class BazelWorkerDriver {
+ /// The maximum number of idle workers at any given time.
+ final int _maxIdleWorkers;
+
+ /// The maximum number of concurrent workers to run at any given time.
+ final int _maxWorkers;
+
+ /// The number of currently active workers.
+ int get _numWorkers => _allWorkers.length;
+
+ /// Idle worker processes.
+ final _idleWorkers = <Process>[];
+
+ /// All workers, even the ones that are in the process of being spawned.
+ final _allWorkers = <FutureOr<Process>>[];
+
+ /// Work requests that haven't been started yet.
+ final _workQueue = new Queue<WorkRequest>();
+
+ /// Factory method that spawns a worker process.
+ final SpawnWorker _spawnWorker;
+
+ BazelWorkerDriver(this._spawnWorker, {int maxIdleWorkers, int maxWorkers})
+ : this._maxIdleWorkers = maxIdleWorkers ?? 4,
+ this._maxWorkers = maxWorkers ?? 4;
+
+ Future<WorkResponse> doWork(WorkRequest request) {
+ var responseCompleter = new Completer<WorkResponse>();
+ _responseCompleters[request] = responseCompleter;
+ _workQueue.add(request);
+ _runWorkQueue();
+ return responseCompleter.future;
+ }
+
+ /// Calls `kill` on all worker processes.
+ Future terminateWorkers() async {
+ var existing = new List.from(_allWorkers);
+ _allWorkers.clear();
+ await Future.wait(existing.map((futureWorker) async {
+ (await futureWorker).kill();
+ }));
+ }
+
+ /// Runs as many items in [_workQueue] as possible given the number of
+ /// available workers.
+ ///
+ /// Will spawn additional workers until [_maxWorkers] has been reached.
+ ///
+ /// This method synchronously drains the [_workQueue] and [_idleWorkers], but
+ /// some tasks may not actually start right away if they need to wait for a
+ /// worker to spin up.
+ void _runWorkQueue() {
+ // Bail out conditions, we will continue to call ourselves indefinitely
+ // until one of these is met.
+ if (_workQueue.isEmpty) return;
+ if (_numWorkers == _maxWorkers && _idleWorkers.isEmpty) return;
+ if (_numWorkers > _maxWorkers) {
+ throw new StateError('Internal error, created to many workers. Please '
+ 'file a bug at https://github.com/dart-lang/bazel_worker/issues/new');
+ }
+
+ // At this point we definitely want to run a task, we just need to decide
+ // whether or not we need to start up a new worker.
+ var request = _workQueue.removeFirst();
+ if (_idleWorkers.isNotEmpty) {
+ _runWorker(_idleWorkers.removeLast(), request);
+ } else {
+ // No need to block here, we want to continue to synchronously drain the
+ // work queue.
+ var futureWorker = _spawnWorker();
+ _allWorkers.add(futureWorker);
+ futureWorker.then((worker) {
+ // Somewhat ugly cleanup, we want to replace the `Future<Process>` in
+ // `_allWorkers` with the real `Process` once we have it.
+ _allWorkers.remove(futureWorker);
+ _allWorkers.add(worker);
+
+ // Set up the connection and run the worker.
+ _workerConnections[worker] = new StdDriverConnection.forWorker(worker);
+ _runWorker(worker, request);
+
+ // Clean up things when the worker exits, and retry running the work
+ // queue in case there is more work to be done.
+ worker.exitCode.then((_) {
+ _allWorkers.remove(worker);
+ _runWorkQueue();
+ });
+ });
+ }
+ // Recursively calls itself until one of the bail out conditions are met.
+ _runWorkQueue();
+ }
+
+ /// Sends [request] to [worker].
+ ///
+ /// Once the worker responds then it will be added back to the pool of idle
+ /// workers.
+ Future _runWorker(Process worker, WorkRequest request) async {
+ try {
+ var connection = _workerConnections[worker];
+ connection.writeRequest(request);
+ var response = await connection.readResponse();
+ _responseCompleters[request].complete(response);
+
+ // Do additional work if available.
+ _idleWorkers.add(worker);
+ _runWorkQueue();
+
+ // If the worker wasn't immediately used we might have to many idle
+ // workers now, kill one if necessary.
+ if (_idleWorkers.length > _maxIdleWorkers) {
+ // Note that whenever we spawn a worker we listen for its exit code
+ // and clean it up so we don't need to do that here.
+ var worker = _idleWorkers.removeLast();
+ _allWorkers.remove(worker);
+ worker.kill();
+ }
+ } catch (e, s) {
+ // Note that we don't need to do additional cleanup here on failures. If
+ // the worker dies that is already handled in a generic fashion, we just
+ // need to make sure we complete with a valid response.
+ if (!_responseCompleters[request].isCompleted) {
+ var response = new WorkResponse()
+ ..exitCode = EXIT_CODE_ERROR
+ ..output = 'Error running worker:\n$e\n$s';
+ _responseCompleters[request].complete(response);
+ }
+ }
+ }
+}
+
+final _responseCompleters = new Expando<Completer<WorkResponse>>('response');
+final _workerConnections = new Expando<DriverConnection>('connectin');
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
new file mode 100644
index 0000000..1624b28
--- /dev/null
+++ b/lib/src/driver/driver_connection.dart
@@ -0,0 +1,48 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:io';
+
+import '../async_message_grouper.dart';
+import '../worker_protocol.pb.dart';
+import '../utils.dart';
+
+/// Interface for a [DriverConnection].
+abstract class DriverConnection {
+ /// Reads a [WorkResponse] asynchronously.
+ Future<WorkResponse> readResponse();
+
+ /// Writes a [WorkRequest].
+ void writeRequest(WorkRequest request);
+}
+
+/// Default implementation of [DriverConnection] that works with [Stdin]
+/// and [Stdout].
+class StdDriverConnection implements DriverConnection {
+ final AsyncMessageGrouper _messageGrouper;
+ final StreamSink<List<int>> _outputStream;
+
+ StdDriverConnection(
+ {Stream<List<int>> inputStream, StreamSink<List<int>> outputStream})
+ : _messageGrouper = new AsyncMessageGrouper(inputStream ?? stdin),
+ _outputStream = outputStream ?? stdout;
+
+ factory StdDriverConnection.forWorker(Process worker) =>
+ new StdDriverConnection(
+ inputStream: worker.stdout, outputStream: worker.stdin);
+
+ @override
+ Future<WorkResponse> readResponse() async {
+ var buffer = await _messageGrouper.next;
+ if (buffer == null) return null;
+
+ return new WorkResponse.fromBuffer(buffer);
+ }
+
+ @override
+ void writeRequest(WorkRequest request) {
+ _outputStream.add(protoToDelimitedBuffer(request));
+ }
+}
diff --git a/test/driver_test.dart b/test/driver_test.dart
new file mode 100644
index 0000000..ac4b791
--- /dev/null
+++ b/test/driver_test.dart
@@ -0,0 +1,181 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+import 'dart:io';
+
+import 'package:test/test.dart';
+
+import 'package:bazel_worker/bazel_worker.dart';
+import 'package:bazel_worker/driver.dart';
+
+void main() {
+ BazelWorkerDriver driver;
+
+ group('basic driver', () {
+ test('can run a single request', () async {
+ await _doRequests(count: 1);
+ await _doRequests(count: 1);
+ });
+
+ test('can run multiple batches of requests through multiple workers',
+ () async {
+ int maxWorkers = 4;
+ int maxIdleWorkers = 2;
+ driver = new BazelWorkerDriver(MockWorker.spawn,
+ maxWorkers: maxWorkers, maxIdleWorkers: maxIdleWorkers);
+ for (int i = 0; i < 10; i++) {
+ await _doRequests(driver: driver);
+ expect(MockWorker.liveWorkers.length, maxIdleWorkers);
+ // No workers should be killed while there is ongoing work, but they
+ // should be cleaned up once there isn't any more work to do.
+ expect(MockWorker.deadWorkers.length,
+ (maxWorkers - maxIdleWorkers) * (i + 1));
+ }
+ });
+
+ test('can run multiple requests through one worker', () async {
+ int maxWorkers = 1;
+ int maxIdleWorkers = 1;
+ driver = new BazelWorkerDriver(MockWorker.spawn,
+ maxWorkers: maxWorkers, maxIdleWorkers: maxIdleWorkers);
+ for (int i = 0; i < 10; i++) {
+ await _doRequests(driver: driver);
+ expect(MockWorker.liveWorkers.length, 1);
+ expect(MockWorker.deadWorkers.length, 0);
+ }
+ });
+
+ test('can run one request through multiple workers', () async {
+ driver = new BazelWorkerDriver(MockWorker.spawn,
+ maxWorkers: 4, maxIdleWorkers: 4);
+ for (int i = 0; i < 10; i++) {
+ await _doRequests(driver: driver, count: 1);
+ expect(MockWorker.liveWorkers.length, 1);
+ expect(MockWorker.deadWorkers.length, 0);
+ }
+ });
+
+ test('can run with maxIdleWorkers == 0', () async {
+ int maxWorkers = 4;
+ driver = new BazelWorkerDriver(MockWorker.spawn,
+ maxWorkers: maxWorkers, maxIdleWorkers: 0);
+ for (int i = 0; i < 10; i++) {
+ await _doRequests(driver: driver);
+ expect(MockWorker.liveWorkers.length, 0);
+ expect(MockWorker.deadWorkers.length, maxWorkers * (i + 1));
+ }
+ });
+
+ tearDown(() async {
+ await driver?.terminateWorkers();
+ expect(MockWorker.liveWorkers, isEmpty);
+ MockWorker.deadWorkers.clear();
+ });
+ });
+}
+
+/// Runs [count] of fake work requests through [driver], and asserts that they
+/// all completed.
+Future _doRequests({BazelWorkerDriver driver, int count}) async {
+ // If we create a driver, we need to make sure and terminate it.
+ var terminateDriver = driver == null;
+ driver ??= new BazelWorkerDriver(MockWorker.spawn);
+ count ??= 100;
+ terminateDriver ??= true;
+ var requests = new List.generate(count, (_) => new WorkRequest());
+ var responses = new List.generate(count, (_) => new WorkResponse());
+ MockWorker.responseQueue.addAll(responses);
+ var actualResponses = await Future.wait(requests.map(driver.doWork));
+ expect(actualResponses, unorderedEquals(responses));
+ if (terminateDriver) await driver.terminateWorkers();
+}
+
+/// A mock worker loop that returns work responses from the provided list.
+///
+/// Throws if it runs out of responses.
+class MockWorkerLoop extends AsyncWorkerLoop {
+ final Queue<WorkResponse> _responseQueue;
+
+ MockWorkerLoop(this._responseQueue, {AsyncWorkerConnection connection})
+ : super(connection: connection);
+
+ Future<WorkResponse> performRequest(WorkRequest request) async {
+ print('Performing request $request');
+ return _responseQueue.removeFirst();
+ }
+}
+
+/// A mock worker process.
+///
+/// Items in [responseQueue] will be returned in order based on requests.
+///
+/// If there are no items left in [responseQueue] then it will throw.
+class MockWorker implements Process {
+ /// Spawns a new [MockWorker].
+ static Future<MockWorker> spawn() async => new MockWorker();
+
+ /// Worker loop that handles reading requests and responding.
+ AsyncWorkerLoop _workerLoop;
+
+ /// Static queue of pending responses, these are shared by all workers.
+ ///
+ /// If this is empty and a request is received then it will throw.
+ static final responseQueue = new Queue<WorkResponse>();
+
+ /// Static list of all live workers.
+ static final liveWorkers = <MockWorker>[];
+
+ /// Static list of all the dead workers.
+ static final deadWorkers = <MockWorker>[];
+
+ /// Standard constructor, creates the [_workerLoop].
+ MockWorker() {
+ liveWorkers.add(this);
+ _workerLoop = new MockWorkerLoop(responseQueue,
+ connection: new StdAsyncWorkerConnection(
+ inputStream: this._stdinController.stream,
+ outputStream: this._stdoutController.sink))
+ ..run();
+ }
+
+ Future<int> get exitCode => _exitCodeCompleter.future;
+ final _exitCodeCompleter = new Completer<int>();
+
+ @override
+ Stream<List<int>> get stdout => _stdoutController.stream;
+ final _stdoutController = new StreamController<List<int>>();
+
+ @override
+ Stream<List<int>> get stderr => _stderrController.stream;
+ final _stderrController = new StreamController<List<int>>();
+
+ @override
+ IOSink get stdin {
+ _stdin ??= new IOSink(_stdinController.sink);
+ return _stdin;
+ }
+
+ IOSink _stdin;
+ final _stdinController = new StreamController<List<int>>();
+
+ int get pid => throw new UnsupportedError('Not needed.');
+
+ @override
+ bool kill([ProcessSignal = ProcessSignal.SIGTERM, int exitCode = 0]) {
+ if (_killed) return false;
+ () async {
+ await _stdoutController.close();
+ await _stderrController.close();
+ await _stdinController.close();
+ _exitCodeCompleter.complete(exitCode);
+ }();
+ deadWorkers.add(this);
+ liveWorkers.remove(this);
+ return true;
+ }
+
+ bool _killed = false;
+}