Initial version of BazelWorkerDriver
diff --git a/lib/driver.dart b/lib/driver.dart
new file mode 100644
index 0000000..7453491
--- /dev/null
+++ b/lib/driver.dart
@@ -0,0 +1,9 @@
+// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+export 'src/driver/driver.dart';
+export 'src/driver/driver_connection.dart';
+export 'src/constants.dart';
+export 'src/message_grouper.dart';
+export 'src/worker_protocol.pb.dart';
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
new file mode 100644
index 0000000..c2465e0
--- /dev/null
+++ b/lib/src/driver/driver.dart
@@ -0,0 +1,151 @@
+// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+import 'dart:io';
+
+import '../constants.dart';
+import '../worker_protocol.pb.dart';
+import 'driver_connection.dart';
+
+typedef Future<Process> SpawnWorker();
+
+/// A driver for talking to a bazel worker.
+///
+/// This allows you to use any binary that supports the bazel worker protocol in
+/// the same way that bazel would, but from another dart process instead.
+class BazelWorkerDriver {
+  /// The maximum number of idle workers at any given time.
+  final int _maxIdleWorkers;
+
+  /// The maximum number of concurrent workers to run at any given time.
+  final int _maxWorkers;
+
+  /// The number of currently active workers.
+  int get _numWorkers => _allWorkers.length;
+
+  /// Idle worker processes.
+  final _idleWorkers = <Process>[];
+
+  /// All workers, even the ones that are in the process of being spawned.
+  final _allWorkers = <FutureOr<Process>>[];
+
+  /// Work requests that haven't been started yet.
+  final _workQueue = new Queue<WorkRequest>();
+
+  /// Factory method that spawns a worker process.
+  final SpawnWorker _spawnWorker;
+
+  BazelWorkerDriver(this._spawnWorker, {int maxIdleWorkers, int maxWorkers})
+      : this._maxIdleWorkers = maxIdleWorkers ?? 4,
+        this._maxWorkers = maxWorkers ?? 4;
+
+  Future<WorkResponse> doWork(WorkRequest request) {
+    var responseCompleter = new Completer<WorkResponse>();
+    _responseCompleters[request] = responseCompleter;
+    _workQueue.add(request);
+    _runWorkQueue();
+    return responseCompleter.future;
+  }
+
+  /// Calls `kill` on all worker processes.
+  Future terminateWorkers() async {
+    var existing = new List.from(_allWorkers);
+    _allWorkers.clear();
+    await Future.wait(existing.map((futureWorker) async {
+      (await futureWorker).kill();
+    }));
+  }
+
+  /// Runs as many items in [_workQueue] as possible given the number of
+  /// available workers.
+  ///
+  /// Will spawn additional workers until [_maxWorkers] has been reached.
+  ///
+  /// This method synchronously drains the [_workQueue] and [_idleWorkers], but
+  /// some tasks may not actually start right away if they need to wait for a
+  /// worker to spin up.
+  void _runWorkQueue() {
+    // Bail out conditions, we will continue to call ourselves indefinitely
+    // until one of these is met.
+    if (_workQueue.isEmpty) return;
+    if (_numWorkers == _maxWorkers && _idleWorkers.isEmpty) return;
+    if (_numWorkers > _maxWorkers) {
+      throw new StateError('Internal error, created to many workers. Please '
+          'file a bug at https://github.com/dart-lang/bazel_worker/issues/new');
+    }
+
+    // At this point we definitely want to run a task, we just need to decide
+    // whether or not we need to start up a new worker.
+    var request = _workQueue.removeFirst();
+    if (_idleWorkers.isNotEmpty) {
+      _runWorker(_idleWorkers.removeLast(), request);
+    } else {
+      // No need to block here, we want to continue to synchronously drain the
+      // work queue.
+      var futureWorker = _spawnWorker();
+      _allWorkers.add(futureWorker);
+      futureWorker.then((worker) {
+        // Somewhat ugly cleanup, we want to replace the `Future<Process>` in
+        // `_allWorkers` with the real `Process` once we have it.
+        _allWorkers.remove(futureWorker);
+        _allWorkers.add(worker);
+
+        // Set up the connection and run the worker.
+        _workerConnections[worker] = new StdDriverConnection.forWorker(worker);
+        _runWorker(worker, request);
+
+        // Clean up things when the worker exits, and retry running the work
+        // queue in case there is more work to be done.
+        worker.exitCode.then((_) {
+          _allWorkers.remove(worker);
+          _runWorkQueue();
+        });
+      });
+    }
+    // Recursively calls itself until one of the bail out conditions are met.
+    _runWorkQueue();
+  }
+
+  /// Sends [request] to [worker].
+  ///
+  /// Once the worker responds then it will be added back to the pool of idle
+  /// workers.
+  Future _runWorker(Process worker, WorkRequest request) async {
+    try {
+      var connection = _workerConnections[worker];
+      connection.writeRequest(request);
+      var response = await connection.readResponse();
+      _responseCompleters[request].complete(response);
+
+      // Do additional work if available.
+      _idleWorkers.add(worker);
+      _runWorkQueue();
+
+      // If the worker wasn't immediately used we might have to many idle
+      // workers now, kill one if necessary.
+      if (_idleWorkers.length > _maxIdleWorkers) {
+        // Note that whenever we spawn a worker we listen for its exit code
+        // and clean it up so we don't need to do that here.
+        var worker = _idleWorkers.removeLast();
+        _allWorkers.remove(worker);
+        worker.kill();
+      }
+    } catch (e, s) {
+      // Note that we don't need to do additional cleanup here on failures. If
+      // the worker dies that is already handled in a generic fashion, we just
+      // need to make sure we complete with a valid response.
+      if (!_responseCompleters[request].isCompleted) {
+        var response = new WorkResponse()
+          ..exitCode = EXIT_CODE_ERROR
+          ..output = 'Error running worker:\n$e\n$s';
+        _responseCompleters[request].complete(response);
+      }
+    }
+  }
+}
+
+final _responseCompleters = new Expando<Completer<WorkResponse>>('response');
+final _workerConnections = new Expando<DriverConnection>('connectin');
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
new file mode 100644
index 0000000..1624b28
--- /dev/null
+++ b/lib/src/driver/driver_connection.dart
@@ -0,0 +1,48 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:io';
+
+import '../async_message_grouper.dart';
+import '../worker_protocol.pb.dart';
+import '../utils.dart';
+
+/// Interface for a [DriverConnection].
+abstract class DriverConnection {
+  /// Reads a [WorkResponse] asynchronously.
+  Future<WorkResponse> readResponse();
+
+  /// Writes a [WorkRequest].
+  void writeRequest(WorkRequest request);
+}
+
+/// Default implementation of [DriverConnection] that works with [Stdin]
+/// and [Stdout].
+class StdDriverConnection implements DriverConnection {
+  final AsyncMessageGrouper _messageGrouper;
+  final StreamSink<List<int>> _outputStream;
+
+  StdDriverConnection(
+      {Stream<List<int>> inputStream, StreamSink<List<int>> outputStream})
+      : _messageGrouper = new AsyncMessageGrouper(inputStream ?? stdin),
+        _outputStream = outputStream ?? stdout;
+
+  factory StdDriverConnection.forWorker(Process worker) =>
+      new StdDriverConnection(
+          inputStream: worker.stdout, outputStream: worker.stdin);
+
+  @override
+  Future<WorkResponse> readResponse() async {
+    var buffer = await _messageGrouper.next;
+    if (buffer == null) return null;
+
+    return new WorkResponse.fromBuffer(buffer);
+  }
+
+  @override
+  void writeRequest(WorkRequest request) {
+    _outputStream.add(protoToDelimitedBuffer(request));
+  }
+}
diff --git a/test/driver_test.dart b/test/driver_test.dart
new file mode 100644
index 0000000..ac4b791
--- /dev/null
+++ b/test/driver_test.dart
@@ -0,0 +1,181 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+import 'dart:io';
+
+import 'package:test/test.dart';
+
+import 'package:bazel_worker/bazel_worker.dart';
+import 'package:bazel_worker/driver.dart';
+
+void main() {
+  BazelWorkerDriver driver;
+
+  group('basic driver', () {
+    test('can run a single request', () async {
+      await _doRequests(count: 1);
+      await _doRequests(count: 1);
+    });
+
+    test('can run multiple batches of requests through multiple workers',
+        () async {
+      int maxWorkers = 4;
+      int maxIdleWorkers = 2;
+      driver = new BazelWorkerDriver(MockWorker.spawn,
+          maxWorkers: maxWorkers, maxIdleWorkers: maxIdleWorkers);
+      for (int i = 0; i < 10; i++) {
+        await _doRequests(driver: driver);
+        expect(MockWorker.liveWorkers.length, maxIdleWorkers);
+        // No workers should be killed while there is ongoing work, but they
+        // should be cleaned up once there isn't any more work to do.
+        expect(MockWorker.deadWorkers.length,
+            (maxWorkers - maxIdleWorkers) * (i + 1));
+      }
+    });
+
+    test('can run multiple requests through one worker', () async {
+      int maxWorkers = 1;
+      int maxIdleWorkers = 1;
+      driver = new BazelWorkerDriver(MockWorker.spawn,
+          maxWorkers: maxWorkers, maxIdleWorkers: maxIdleWorkers);
+      for (int i = 0; i < 10; i++) {
+        await _doRequests(driver: driver);
+        expect(MockWorker.liveWorkers.length, 1);
+        expect(MockWorker.deadWorkers.length, 0);
+      }
+    });
+
+    test('can run one request through multiple workers', () async {
+      driver = new BazelWorkerDriver(MockWorker.spawn,
+          maxWorkers: 4, maxIdleWorkers: 4);
+      for (int i = 0; i < 10; i++) {
+        await _doRequests(driver: driver, count: 1);
+        expect(MockWorker.liveWorkers.length, 1);
+        expect(MockWorker.deadWorkers.length, 0);
+      }
+    });
+
+    test('can run with maxIdleWorkers == 0', () async {
+      int maxWorkers = 4;
+      driver = new BazelWorkerDriver(MockWorker.spawn,
+          maxWorkers: maxWorkers, maxIdleWorkers: 0);
+      for (int i = 0; i < 10; i++) {
+        await _doRequests(driver: driver);
+        expect(MockWorker.liveWorkers.length, 0);
+        expect(MockWorker.deadWorkers.length, maxWorkers * (i + 1));
+      }
+    });
+
+    tearDown(() async {
+      await driver?.terminateWorkers();
+      expect(MockWorker.liveWorkers, isEmpty);
+      MockWorker.deadWorkers.clear();
+    });
+  });
+}
+
+/// Runs [count] of fake work requests through [driver], and asserts that they
+/// all completed.
+Future _doRequests({BazelWorkerDriver driver, int count}) async {
+  // If we create a driver, we need to make sure and terminate it.
+  var terminateDriver = driver == null;
+  driver ??= new BazelWorkerDriver(MockWorker.spawn);
+  count ??= 100;
+  terminateDriver ??= true;
+  var requests = new List.generate(count, (_) => new WorkRequest());
+  var responses = new List.generate(count, (_) => new WorkResponse());
+  MockWorker.responseQueue.addAll(responses);
+  var actualResponses = await Future.wait(requests.map(driver.doWork));
+  expect(actualResponses, unorderedEquals(responses));
+  if (terminateDriver) await driver.terminateWorkers();
+}
+
+/// A mock worker loop that returns work responses from the provided list.
+///
+/// Throws if it runs out of responses.
+class MockWorkerLoop extends AsyncWorkerLoop {
+  final Queue<WorkResponse> _responseQueue;
+
+  MockWorkerLoop(this._responseQueue, {AsyncWorkerConnection connection})
+      : super(connection: connection);
+
+  Future<WorkResponse> performRequest(WorkRequest request) async {
+    print('Performing request $request');
+    return _responseQueue.removeFirst();
+  }
+}
+
+/// A mock worker process.
+///
+/// Items in [responseQueue] will be returned in order based on requests.
+///
+/// If there are no items left in [responseQueue] then it will throw.
+class MockWorker implements Process {
+  /// Spawns a new [MockWorker].
+  static Future<MockWorker> spawn() async => new MockWorker();
+
+  /// Worker loop that handles reading requests and responding.
+  AsyncWorkerLoop _workerLoop;
+
+  /// Static queue of pending responses, these are shared by all workers.
+  ///
+  /// If this is empty and a request is received then it will throw.
+  static final responseQueue = new Queue<WorkResponse>();
+
+  /// Static list of all live workers.
+  static final liveWorkers = <MockWorker>[];
+
+  /// Static list of all the dead workers.
+  static final deadWorkers = <MockWorker>[];
+
+  /// Standard constructor, creates the [_workerLoop].
+  MockWorker() {
+    liveWorkers.add(this);
+    _workerLoop = new MockWorkerLoop(responseQueue,
+        connection: new StdAsyncWorkerConnection(
+            inputStream: this._stdinController.stream,
+            outputStream: this._stdoutController.sink))
+      ..run();
+  }
+
+  Future<int> get exitCode => _exitCodeCompleter.future;
+  final _exitCodeCompleter = new Completer<int>();
+
+  @override
+  Stream<List<int>> get stdout => _stdoutController.stream;
+  final _stdoutController = new StreamController<List<int>>();
+
+  @override
+  Stream<List<int>> get stderr => _stderrController.stream;
+  final _stderrController = new StreamController<List<int>>();
+
+  @override
+  IOSink get stdin {
+    _stdin ??= new IOSink(_stdinController.sink);
+    return _stdin;
+  }
+
+  IOSink _stdin;
+  final _stdinController = new StreamController<List<int>>();
+
+  int get pid => throw new UnsupportedError('Not needed.');
+
+  @override
+  bool kill([ProcessSignal = ProcessSignal.SIGTERM, int exitCode = 0]) {
+    if (_killed) return false;
+    () async {
+      await _stdoutController.close();
+      await _stderrController.close();
+      await _stdinController.close();
+      _exitCodeCompleter.complete(exitCode);
+    }();
+    deadWorkers.add(this);
+    liveWorkers.remove(this);
+    return true;
+  }
+
+  bool _killed = false;
+}