Add SyncWorkerLoop and testing helpers
diff --git a/lib/bazel_worker.dart b/lib/bazel_worker.dart
new file mode 100644
index 0000000..9aa790f
--- /dev/null
+++ b/lib/bazel_worker.dart
@@ -0,0 +1,7 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+export 'src/constants.dart';
+export 'src/sync_worker_loop.dart';
+export 'src/worker_protocol.pb.dart';
diff --git a/lib/src/constants.dart b/lib/src/constants.dart
new file mode 100644
index 0000000..30113d3
--- /dev/null
+++ b/lib/src/constants.dart
@@ -0,0 +1,6 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+const int EXIT_CODE_OK = 0;
+const int EXIT_CODE_ERROR = 15;
diff --git a/lib/src/message_grouper_state.dart b/lib/src/message_grouper_state.dart
new file mode 100644
index 0000000..995ce5c
--- /dev/null
+++ b/lib/src/message_grouper_state.dart
@@ -0,0 +1,79 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:typed_data';
+
+/// State held by the [MessageGrouper] while waiting for additional data to
+/// arrive.
+class MessageGrouperState {
+  /// `true` means we are waiting to receive bytes of base-128 encoded length.
+  /// Some bytes of length may have been received already.
+  ///
+  /// `false` means we are waiting to receive more bytes of message data.  Some
+  /// bytes of message data may have been received already.
+  bool waitingForLength = true;
+
+  /// If [waitingForLength] is `true`, the decoded value of the length bytes
+  /// received so far (if any).  If [waitingForLength] is `false`, the decoded
+  /// length that was most recently received.
+  int length = 0;
+
+  /// If [waitingForLength] is `true`, the amount by which the next received
+  /// length byte must be left-shifted; otherwise undefined.
+  int lengthShift = 0;
+
+  /// If [waitingForLength] is `false`, a [Uint8List] which is ready to receive
+  /// message data.  Otherwise null.
+  Uint8List message;
+
+  /// If [waitingForLength] is `false`, the number of message bytes that have
+  /// been received so far.  Otherwise zero.
+  int numMessageBytesReceived;
+
+  MessageGrouperState() {
+    reset();
+  }
+
+  /// Handle one byte at a time.
+  ///
+  /// Returns a [List<int>] of message bytes if [byte] was the last byte in a
+  /// message, otherwise returns [null].
+  List<int> handleInput(int byte) {
+    if (waitingForLength) {
+      length |= (byte & 0x7f) << lengthShift;
+      if ((byte & 0x80) == 0) {
+        waitingForLength = false;
+        message = new Uint8List(length);
+        if (length == 0) {
+          // There is no message data to wait for, so just go ahead and deliver the
+          // empty message.
+          var messageToReturn = message;
+          reset();
+          return messageToReturn;
+        }
+      } else {
+        lengthShift += 7;
+      }
+    } else {
+      message[numMessageBytesReceived] = byte;
+      numMessageBytesReceived++;
+      if (numMessageBytesReceived == length) {
+        var messageToReturn = message;
+        reset();
+        return messageToReturn;
+      }
+    }
+    return null;
+  }
+
+  /// Reset the state so that we are ready to receive the next base-128 encoded
+  /// length.
+  void reset() {
+    waitingForLength = true;
+    length = 0;
+    lengthShift = 0;
+    message = null;
+    numMessageBytesReceived = 0;
+  }
+}
diff --git a/lib/src/sync_message_grouper.dart b/lib/src/sync_message_grouper.dart
new file mode 100644
index 0000000..eca810d
--- /dev/null
+++ b/lib/src/sync_message_grouper.dart
@@ -0,0 +1,39 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:io';
+
+import 'message_grouper_state.dart';
+
+/// Groups stdin input into messages by interpreting it as
+/// base-128 encoded lengths interleaved with raw data.
+///
+/// The base-128 encoding is in little-endian order, with the high bit set on
+/// all bytes but the last.  This was chosen since it's the same as the
+/// base-128 encoding used by protobufs, so it allows a modest amount of code
+/// reuse at the other end of the protocol.
+///
+/// Possible future improvements to consider (should a debugging need arise):
+/// - Put a magic number at the beginning of the stream.
+/// - Use a guard byte between messages to sanity check that the encoder and
+///   decoder agree on the encoding of lengths.
+class SyncMessageGrouper {
+  final _state = new MessageGrouperState();
+  final Stdin _stdin;
+
+  SyncMessageGrouper(this._stdin);
+
+  /// Blocks until the next full message is received, and then returns it.
+  ///
+  /// Returns null at end of file.
+  List<int> get next {
+    List<int> message;
+    while (message == null) {
+      var nextByte = _stdin.readByteSync();
+      if (nextByte == -1) return null;
+      message = _state.handleInput(nextByte);
+    }
+    return message;
+  }
+}
diff --git a/lib/src/sync_worker_loop.dart b/lib/src/sync_worker_loop.dart
new file mode 100644
index 0000000..b1e8363
--- /dev/null
+++ b/lib/src/sync_worker_loop.dart
@@ -0,0 +1,75 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:io';
+
+import 'constants.dart';
+import 'sync_message_grouper.dart';
+import 'utils.dart';
+import 'worker_protocol.pb.dart';
+
+/// Connection between a worker and input / output.
+abstract class SyncWorkerConnection {
+  /// Read a new [WorkRequest]. Returns [null] when there are no more requests.
+  WorkRequest readRequest();
+
+  /// Write the given [response] as bytes to the output.
+  void writeResponse(WorkResponse response);
+}
+
+/// Persistent Bazel worker loop.
+abstract class SyncWorkerLoop {
+
+  final SyncWorkerConnection connection;
+
+  SyncWorkerLoop({SyncWorkerConnection connection})
+      : this.connection = connection ?? new StdSyncWorkerConnection();
+
+  /// Perform a single [WorkRequest], and return a [WorkResponse].
+  WorkResponse performRequest(WorkRequest request);
+
+  /// Run the worker loop. Blocks until [connection#readRequest] returns `null`.
+  void run() {
+    while (true) {
+      WorkResponse response;
+      try {
+        var request = connection.readRequest();
+        if (request == null) break;
+        response = performRequest(request);
+        // In case they forget to set this.
+        response.exitCode ??= EXIT_CODE_OK;
+      } catch (e, s) {
+        response = new WorkResponse()
+          ..exitCode = EXIT_CODE_ERROR
+          ..output = '$e\n$s';
+      }
+
+      connection.writeResponse(response);
+    }
+  }
+}
+
+/// Default implementation of [SyncWorkerConnection] that works with [Stdin] and
+/// [Stdout].
+class StdSyncWorkerConnection implements SyncWorkerConnection {
+  final SyncMessageGrouper _messageGrouper;
+  final Stdout _stdoutStream;
+
+  StdSyncWorkerConnection({Stdin stdinStream, Stdout stdoutStream})
+      : _messageGrouper = new SyncMessageGrouper(stdinStream ?? stdin),
+        _stdoutStream = stdoutStream ?? stdout;
+
+  @override
+  WorkRequest readRequest() {
+    var buffer = _messageGrouper.next;
+    if (buffer == null) return null;
+
+    return new WorkRequest.fromBuffer(buffer);
+  }
+
+  @override
+  void writeResponse(WorkResponse response) {
+    _stdoutStream.add(protoToDelimitedBuffer(response));
+  }
+}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
new file mode 100644
index 0000000..534f086
--- /dev/null
+++ b/lib/src/utils.dart
@@ -0,0 +1,15 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:protobuf/protobuf.dart';
+
+List<int> protoToDelimitedBuffer(GeneratedMessage message) {
+  var buffer = message.writeToBuffer();
+
+  var writer = new CodedBufferWriter();
+  writer.writeInt32NoTag(buffer.length);
+  writer.writeRawBytes(buffer);
+
+  return writer.toBuffer();
+}
diff --git a/lib/src/worker_protocol.pb.dart b/lib/src/worker_protocol.pb.dart
new file mode 100644
index 0000000..e86fd2b
--- /dev/null
+++ b/lib/src/worker_protocol.pb.dart
@@ -0,0 +1,136 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+///
+//  Generated code. Do not modify.
+///
+library blaze.worker_worker_protocol;
+
+import 'package:protobuf/protobuf.dart';
+
+class Input extends GeneratedMessage {
+  static final BuilderInfo _i = new BuilderInfo('Input')
+    ..a(1, 'path', PbFieldType.OS)
+    ..a(2, 'digest', PbFieldType.OY)
+    ..hasRequiredFields = false
+  ;
+
+  Input() : super();
+  Input.fromBuffer(List<int> i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
+  Input.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
+  Input clone() => new Input()..mergeFromMessage(this);
+  BuilderInfo get info_ => _i;
+  static Input create() => new Input();
+  static PbList<Input> createRepeated() => new PbList<Input>();
+  static Input getDefault() {
+    if (_defaultInstance == null) _defaultInstance = new _ReadonlyInput();
+    return _defaultInstance;
+  }
+  static Input _defaultInstance;
+  static void $checkItem(Input v) {
+    if (v is !Input) checkItemFailed(v, 'Input');
+  }
+
+  String get path => $_get(0, 1, '');
+  void set path(String v) { $_setString(0, 1, v); }
+  bool hasPath() => $_has(0, 1);
+  void clearPath() => clearField(1);
+
+  List<int> get digest => $_get(1, 2, null);
+  void set digest(List<int> v) { $_setBytes(1, 2, v); }
+  bool hasDigest() => $_has(1, 2);
+  void clearDigest() => clearField(2);
+}
+
+class _ReadonlyInput extends Input with ReadonlyMessageMixin {}
+
+class WorkRequest extends GeneratedMessage {
+  static final BuilderInfo _i = new BuilderInfo('WorkRequest')
+    ..p(1, 'arguments', PbFieldType.PS)
+    ..pp(2, 'inputs', PbFieldType.PM, Input.$checkItem, Input.create)
+    ..hasRequiredFields = false
+  ;
+
+  WorkRequest() : super();
+  WorkRequest.fromBuffer(List<int> i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
+  WorkRequest.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
+  WorkRequest clone() => new WorkRequest()..mergeFromMessage(this);
+  BuilderInfo get info_ => _i;
+  static WorkRequest create() => new WorkRequest();
+  static PbList<WorkRequest> createRepeated() => new PbList<WorkRequest>();
+  static WorkRequest getDefault() {
+    if (_defaultInstance == null) _defaultInstance = new _ReadonlyWorkRequest();
+    return _defaultInstance;
+  }
+  static WorkRequest _defaultInstance;
+  static void $checkItem(WorkRequest v) {
+    if (v is !WorkRequest) checkItemFailed(v, 'WorkRequest');
+  }
+
+  List<String> get arguments => $_get(0, 1, null);
+
+  List<Input> get inputs => $_get(1, 2, null);
+}
+
+class _ReadonlyWorkRequest extends WorkRequest with ReadonlyMessageMixin {}
+
+class WorkResponse extends GeneratedMessage {
+  static final BuilderInfo _i = new BuilderInfo('WorkResponse')
+    ..a(1, 'exitCode', PbFieldType.O3)
+    ..a(2, 'output', PbFieldType.OS)
+    ..hasRequiredFields = false
+  ;
+
+  WorkResponse() : super();
+  WorkResponse.fromBuffer(List<int> i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
+  WorkResponse.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
+  WorkResponse clone() => new WorkResponse()..mergeFromMessage(this);
+  BuilderInfo get info_ => _i;
+  static WorkResponse create() => new WorkResponse();
+  static PbList<WorkResponse> createRepeated() => new PbList<WorkResponse>();
+  static WorkResponse getDefault() {
+    if (_defaultInstance == null) _defaultInstance = new _ReadonlyWorkResponse();
+    return _defaultInstance;
+  }
+  static WorkResponse _defaultInstance;
+  static void $checkItem(WorkResponse v) {
+    if (v is !WorkResponse) checkItemFailed(v, 'WorkResponse');
+  }
+
+  int get exitCode => $_get(0, 1, 0);
+  void set exitCode(int v) { $_setUnsignedInt32(0, 1, v); }
+  bool hasExitCode() => $_has(0, 1);
+  void clearExitCode() => clearField(1);
+
+  String get output => $_get(1, 2, '');
+  void set output(String v) { $_setString(1, 2, v); }
+  bool hasOutput() => $_has(1, 2);
+  void clearOutput() => clearField(2);
+}
+
+class _ReadonlyWorkResponse extends WorkResponse with ReadonlyMessageMixin {}
+
+const Input$json = const {
+  '1': 'Input',
+  '2': const [
+    const {'1': 'path', '3': 1, '4': 1, '5': 9},
+    const {'1': 'digest', '3': 2, '4': 1, '5': 12},
+  ],
+};
+
+const WorkRequest$json = const {
+  '1': 'WorkRequest',
+  '2': const [
+    const {'1': 'arguments', '3': 1, '4': 3, '5': 9},
+    const {'1': 'inputs', '3': 2, '4': 3, '5': 11, '6': '.blaze.worker.Input'},
+  ],
+};
+
+const WorkResponse$json = const {
+  '1': 'WorkResponse',
+  '2': const [
+    const {'1': 'exit_code', '3': 1, '4': 1, '5': 5},
+    const {'1': 'output', '3': 2, '4': 1, '5': 9},
+  ],
+};
diff --git a/lib/testing.dart b/lib/testing.dart
new file mode 100644
index 0000000..a201947
--- /dev/null
+++ b/lib/testing.dart
@@ -0,0 +1,85 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:collection';
+import 'dart:io';
+
+import 'package:bazel_worker/bazel_worker.dart';
+
+export 'src/utils.dart' show protoToDelimitedBuffer;
+
+/// A [Stdin] mock object.
+class TestStdinStream implements Stdin {
+  final Queue<int> pendingBytes = new Queue<int>();
+
+  // Adds all the input bytes to this stream.
+  void addInputBytes(List<int> bytes) {
+    pendingBytes.addAll(bytes);
+  }
+
+  @override
+  int readByteSync() {
+    if (pendingBytes.isEmpty) {
+      return -1;
+    } else {
+      return pendingBytes.removeFirst();
+    }
+  }
+
+  @override
+  void noSuchMethod(Invocation invocation) {
+    throw new StateError('Unexpected invocation $invocation');
+  }
+}
+
+/// A [Stdout] mock object.
+class TestStdoutStream implements Stdout {
+  final List<List<int>> writes = <List<int>>[];
+
+  @override
+  void add(List<int> bytes) {
+    writes.add(bytes);
+  }
+
+  @override
+  void noSuchMethod(Invocation invocation) {
+    throw new StateError('Unexpected invocation $invocation');
+  }
+}
+
+/// A [StdWorkerConnection] which records its responses.
+class TestSyncWorkerConnection extends StdSyncWorkerConnection {
+  final List<WorkResponse> responses = <WorkResponse>[];
+
+  TestSyncWorkerConnection(Stdin stdinStream, Stdout stdoutStream)
+      : super(stdinStream: stdinStream, stdoutStream: stdoutStream);
+
+  @override
+  void writeResponse(WorkResponse response) {
+    super.writeResponse(response);
+    responses.add(response);
+  }
+}
+
+/// A [SyncWorkerLoop] for testing.
+class TestSyncWorkerLoop extends SyncWorkerLoop  {
+  final List<WorkRequest> requests = <WorkRequest>[];
+  final Queue<WorkResponse> _responses = new Queue<WorkResponse>();
+
+  TestSyncWorkerLoop(SyncWorkerConnection connection)
+      : super(connection: connection);
+
+  @override
+  WorkResponse performRequest(WorkRequest request) {
+    requests.add(request);
+    return _responses.removeFirst();
+  }
+
+  /// Adds [response] to the queue. These will be returned from
+  /// [performResponse] in the order they are added, otherwise it will throw
+  /// if the queue is empty.
+  void enqueueResponse(WorkResponse response) {
+    _responses.addLast(response);
+  }
+}
diff --git a/test/sync_message_grouper_test.dart b/test/sync_message_grouper_test.dart
new file mode 100644
index 0000000..19c8fd5
--- /dev/null
+++ b/test/sync_message_grouper_test.dart
@@ -0,0 +1,123 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:test/test.dart';
+
+import 'package:bazel_worker/src/sync_message_grouper.dart';
+import 'package:bazel_worker/testing.dart';
+
+void main() {
+  SyncMessageGrouper messageGrouper;
+  TestStdinStream stdinStream;
+
+  setUp(() {
+    stdinStream = new TestStdinStream();
+    messageGrouper = new SyncMessageGrouper(stdinStream);
+  });
+
+  group('message_grouper', () {
+    /// Check that if the message grouper produces the [expectedOutput] in
+    /// response to the corresponding [input].
+    void check(List<int> input, List<List<int>> expectedOutput) {
+      stdinStream.addInputBytes(input);
+      for (var chunk in expectedOutput) {
+        expect(messageGrouper.next, equals(chunk));
+      }
+    }
+
+    /// Make a simple message having the given [length]
+    List<int> makeMessage(int length) {
+      var result = <int>[];
+      for (int i = 0; i < length; i++) {
+        result.add(i & 0xff);
+      }
+      return result;
+    }
+
+    test('Empty message', () {
+      check([0], [[]]);
+    });
+
+    test('Short message', () {
+      check([
+        5,
+        10,
+        20,
+        30,
+        40,
+        50
+      ], [
+        [10, 20, 30, 40, 50]
+      ]);
+    });
+
+    test('Message with 2-byte length', () {
+      var len = 0x155;
+      var msg = makeMessage(len);
+      var encodedLen = [0xd5, 0x02];
+      check([]..addAll(encodedLen)..addAll(msg), [msg]);
+    });
+
+    test('Message with 3-byte length', () {
+      var len = 0x4103;
+      var msg = makeMessage(len);
+      var encodedLen = [0x83, 0x82, 0x01];
+      check([]..addAll(encodedLen)..addAll(msg), [msg]);
+    });
+
+    test('Multiple messages', () {
+      check([
+        2,
+        10,
+        20,
+        2,
+        30,
+        40
+      ], [
+        [10, 20],
+        [30, 40]
+      ]);
+    });
+
+    test('Empty message at start', () {
+      check([
+        0,
+        2,
+        10,
+        20
+      ], [
+        [],
+        [10, 20]
+      ]);
+    });
+
+    test('Empty message at end', () {
+      check([
+        2,
+        10,
+        20,
+        0
+      ], [
+        [10, 20],
+        []
+      ]);
+    });
+
+    test('Empty message in the middle', () {
+      check([
+        2,
+        10,
+        20,
+        0,
+        2,
+        30,
+        40
+      ], [
+        [10, 20],
+        [],
+        [30, 40]
+      ]);
+    });
+  });
+}
diff --git a/test/sync_worker_loop_test.dart b/test/sync_worker_loop_test.dart
new file mode 100644
index 0000000..ce323c8
--- /dev/null
+++ b/test/sync_worker_loop_test.dart
@@ -0,0 +1,61 @@
+// Copyright (c) 2016, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:test/test.dart';
+
+import 'package:bazel_worker/bazel_worker.dart';
+import 'package:bazel_worker/testing.dart';
+
+void main() {
+  TestStdinStream stdinStream;
+  TestStdoutStream stdoutStream;
+  TestSyncWorkerConnection connection;
+  TestSyncWorkerLoop workerLoop;
+
+  setUp(() {
+    stdinStream = new TestStdinStream();
+    stdoutStream = new TestStdoutStream();
+    connection = new TestSyncWorkerConnection(stdinStream, stdoutStream);
+    workerLoop = new TestSyncWorkerLoop(connection);
+  });
+
+  test('basic', () {
+    var request = new WorkRequest();
+    request.arguments.addAll(['--foo=bar']);
+    stdinStream.addInputBytes(protoToDelimitedBuffer(request));
+
+    var response = new WorkResponse()..output = 'Hello World';
+    workerLoop.enqueueResponse(response);
+    workerLoop.run();
+
+    expect(connection.responses, hasLength(1));
+    expect(connection.responses[0], response);
+
+    // Check that a serialized version was written to std out.
+    expect(stdoutStream.writes, hasLength(1));
+    expect(stdoutStream.writes[0], protoToDelimitedBuffer(response));
+  });
+
+  test('Exception in the worker.', () {
+    var request = new WorkRequest();
+    request.arguments.addAll(['--foo=bar']);
+    stdinStream.addInputBytes(protoToDelimitedBuffer(request));
+
+    // Didn't enqueue a response, so this will throw inside of `performRequest`.
+    workerLoop.run();
+
+    expect(connection.responses, hasLength(1));
+    var response = connection.responses[0];
+    expect(response.exitCode, EXIT_CODE_ERROR);
+
+    // Check that a serialized version was written to std out.
+    expect(stdoutStream.writes, hasLength(1));
+    expect(stdoutStream.writes[0], protoToDelimitedBuffer(response));
+  });
+
+  test('Stops at EOF', () {
+    stdinStream.addInputBytes([-1]);
+    workerLoop.run();
+  });
+}