Add SyncWorkerLoop and testing helpers
diff --git a/lib/bazel_worker.dart b/lib/bazel_worker.dart
new file mode 100644
index 0000000..9aa790f
--- /dev/null
+++ b/lib/bazel_worker.dart
@@ -0,0 +1,7 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+export 'src/constants.dart';
+export 'src/sync_worker_loop.dart';
+export 'src/worker_protocol.pb.dart';
diff --git a/lib/src/constants.dart b/lib/src/constants.dart
new file mode 100644
index 0000000..30113d3
--- /dev/null
+++ b/lib/src/constants.dart
@@ -0,0 +1,6 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+const int EXIT_CODE_OK = 0;
+const int EXIT_CODE_ERROR = 15;
diff --git a/lib/src/message_grouper_state.dart b/lib/src/message_grouper_state.dart
new file mode 100644
index 0000000..995ce5c
--- /dev/null
+++ b/lib/src/message_grouper_state.dart
@@ -0,0 +1,79 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:typed_data';
+
+/// State held by the [MessageGrouper] while waiting for additional data to
+/// arrive.
+class MessageGrouperState {
+ /// `true` means we are waiting to receive bytes of base-128 encoded length.
+ /// Some bytes of length may have been received already.
+ ///
+ /// `false` means we are waiting to receive more bytes of message data. Some
+ /// bytes of message data may have been received already.
+ bool waitingForLength = true;
+
+ /// If [waitingForLength] is `true`, the decoded value of the length bytes
+ /// received so far (if any). If [waitingForLength] is `false`, the decoded
+ /// length that was most recently received.
+ int length = 0;
+
+ /// If [waitingForLength] is `true`, the amount by which the next received
+ /// length byte must be left-shifted; otherwise undefined.
+ int lengthShift = 0;
+
+ /// If [waitingForLength] is `false`, a [Uint8List] which is ready to receive
+ /// message data. Otherwise null.
+ Uint8List message;
+
+ /// If [waitingForLength] is `false`, the number of message bytes that have
+ /// been received so far. Otherwise zero.
+ int numMessageBytesReceived;
+
+ MessageGrouperState() {
+ reset();
+ }
+
+ /// Handle one byte at a time.
+ ///
+ /// Returns a [List<int>] of message bytes if [byte] was the last byte in a
+ /// message, otherwise returns [null].
+ List<int> handleInput(int byte) {
+ if (waitingForLength) {
+ length |= (byte & 0x7f) << lengthShift;
+ if ((byte & 0x80) == 0) {
+ waitingForLength = false;
+ message = new Uint8List(length);
+ if (length == 0) {
+ // There is no message data to wait for, so just go ahead and deliver the
+ // empty message.
+ var messageToReturn = message;
+ reset();
+ return messageToReturn;
+ }
+ } else {
+ lengthShift += 7;
+ }
+ } else {
+ message[numMessageBytesReceived] = byte;
+ numMessageBytesReceived++;
+ if (numMessageBytesReceived == length) {
+ var messageToReturn = message;
+ reset();
+ return messageToReturn;
+ }
+ }
+ return null;
+ }
+
+ /// Reset the state so that we are ready to receive the next base-128 encoded
+ /// length.
+ void reset() {
+ waitingForLength = true;
+ length = 0;
+ lengthShift = 0;
+ message = null;
+ numMessageBytesReceived = 0;
+ }
+}
diff --git a/lib/src/sync_message_grouper.dart b/lib/src/sync_message_grouper.dart
new file mode 100644
index 0000000..eca810d
--- /dev/null
+++ b/lib/src/sync_message_grouper.dart
@@ -0,0 +1,39 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:io';
+
+import 'message_grouper_state.dart';
+
+/// Groups stdin input into messages by interpreting it as
+/// base-128 encoded lengths interleaved with raw data.
+///
+/// The base-128 encoding is in little-endian order, with the high bit set on
+/// all bytes but the last. This was chosen since it's the same as the
+/// base-128 encoding used by protobufs, so it allows a modest amount of code
+/// reuse at the other end of the protocol.
+///
+/// Possible future improvements to consider (should a debugging need arise):
+/// - Put a magic number at the beginning of the stream.
+/// - Use a guard byte between messages to sanity check that the encoder and
+/// decoder agree on the encoding of lengths.
+class SyncMessageGrouper {
+ final _state = new MessageGrouperState();
+ final Stdin _stdin;
+
+ SyncMessageGrouper(this._stdin);
+
+ /// Blocks until the next full message is received, and then returns it.
+ ///
+ /// Returns null at end of file.
+ List<int> get next {
+ List<int> message;
+ while (message == null) {
+ var nextByte = _stdin.readByteSync();
+ if (nextByte == -1) return null;
+ message = _state.handleInput(nextByte);
+ }
+ return message;
+ }
+}
diff --git a/lib/src/sync_worker_loop.dart b/lib/src/sync_worker_loop.dart
new file mode 100644
index 0000000..b1e8363
--- /dev/null
+++ b/lib/src/sync_worker_loop.dart
@@ -0,0 +1,75 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:io';
+
+import 'constants.dart';
+import 'sync_message_grouper.dart';
+import 'utils.dart';
+import 'worker_protocol.pb.dart';
+
+/// Connection between a worker and input / output.
+abstract class SyncWorkerConnection {
+ /// Read a new [WorkRequest]. Returns [null] when there are no more requests.
+ WorkRequest readRequest();
+
+ /// Write the given [response] as bytes to the output.
+ void writeResponse(WorkResponse response);
+}
+
+/// Persistent Bazel worker loop.
+abstract class SyncWorkerLoop {
+
+ final SyncWorkerConnection connection;
+
+ SyncWorkerLoop({SyncWorkerConnection connection})
+ : this.connection = connection ?? new StdSyncWorkerConnection();
+
+ /// Perform a single [WorkRequest], and return a [WorkResponse].
+ WorkResponse performRequest(WorkRequest request);
+
+ /// Run the worker loop. Blocks until [connection#readRequest] returns `null`.
+ void run() {
+ while (true) {
+ WorkResponse response;
+ try {
+ var request = connection.readRequest();
+ if (request == null) break;
+ response = performRequest(request);
+ // In case they forget to set this.
+ response.exitCode ??= EXIT_CODE_OK;
+ } catch (e, s) {
+ response = new WorkResponse()
+ ..exitCode = EXIT_CODE_ERROR
+ ..output = '$e\n$s';
+ }
+
+ connection.writeResponse(response);
+ }
+ }
+}
+
+/// Default implementation of [SyncWorkerConnection] that works with [Stdin] and
+/// [Stdout].
+class StdSyncWorkerConnection implements SyncWorkerConnection {
+ final SyncMessageGrouper _messageGrouper;
+ final Stdout _stdoutStream;
+
+ StdSyncWorkerConnection({Stdin stdinStream, Stdout stdoutStream})
+ : _messageGrouper = new SyncMessageGrouper(stdinStream ?? stdin),
+ _stdoutStream = stdoutStream ?? stdout;
+
+ @override
+ WorkRequest readRequest() {
+ var buffer = _messageGrouper.next;
+ if (buffer == null) return null;
+
+ return new WorkRequest.fromBuffer(buffer);
+ }
+
+ @override
+ void writeResponse(WorkResponse response) {
+ _stdoutStream.add(protoToDelimitedBuffer(response));
+ }
+}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
new file mode 100644
index 0000000..534f086
--- /dev/null
+++ b/lib/src/utils.dart
@@ -0,0 +1,15 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:protobuf/protobuf.dart';
+
+List<int> protoToDelimitedBuffer(GeneratedMessage message) {
+ var buffer = message.writeToBuffer();
+
+ var writer = new CodedBufferWriter();
+ writer.writeInt32NoTag(buffer.length);
+ writer.writeRawBytes(buffer);
+
+ return writer.toBuffer();
+}
diff --git a/lib/src/worker_protocol.pb.dart b/lib/src/worker_protocol.pb.dart
new file mode 100644
index 0000000..e86fd2b
--- /dev/null
+++ b/lib/src/worker_protocol.pb.dart
@@ -0,0 +1,136 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+///
+// Generated code. Do not modify.
+///
+library blaze.worker_worker_protocol;
+
+import 'package:protobuf/protobuf.dart';
+
+class Input extends GeneratedMessage {
+ static final BuilderInfo _i = new BuilderInfo('Input')
+ ..a(1, 'path', PbFieldType.OS)
+ ..a(2, 'digest', PbFieldType.OY)
+ ..hasRequiredFields = false
+ ;
+
+ Input() : super();
+ Input.fromBuffer(List<int> i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
+ Input.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
+ Input clone() => new Input()..mergeFromMessage(this);
+ BuilderInfo get info_ => _i;
+ static Input create() => new Input();
+ static PbList<Input> createRepeated() => new PbList<Input>();
+ static Input getDefault() {
+ if (_defaultInstance == null) _defaultInstance = new _ReadonlyInput();
+ return _defaultInstance;
+ }
+ static Input _defaultInstance;
+ static void $checkItem(Input v) {
+ if (v is !Input) checkItemFailed(v, 'Input');
+ }
+
+ String get path => $_get(0, 1, '');
+ void set path(String v) { $_setString(0, 1, v); }
+ bool hasPath() => $_has(0, 1);
+ void clearPath() => clearField(1);
+
+ List<int> get digest => $_get(1, 2, null);
+ void set digest(List<int> v) { $_setBytes(1, 2, v); }
+ bool hasDigest() => $_has(1, 2);
+ void clearDigest() => clearField(2);
+}
+
+class _ReadonlyInput extends Input with ReadonlyMessageMixin {}
+
+class WorkRequest extends GeneratedMessage {
+ static final BuilderInfo _i = new BuilderInfo('WorkRequest')
+ ..p(1, 'arguments', PbFieldType.PS)
+ ..pp(2, 'inputs', PbFieldType.PM, Input.$checkItem, Input.create)
+ ..hasRequiredFields = false
+ ;
+
+ WorkRequest() : super();
+ WorkRequest.fromBuffer(List<int> i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
+ WorkRequest.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
+ WorkRequest clone() => new WorkRequest()..mergeFromMessage(this);
+ BuilderInfo get info_ => _i;
+ static WorkRequest create() => new WorkRequest();
+ static PbList<WorkRequest> createRepeated() => new PbList<WorkRequest>();
+ static WorkRequest getDefault() {
+ if (_defaultInstance == null) _defaultInstance = new _ReadonlyWorkRequest();
+ return _defaultInstance;
+ }
+ static WorkRequest _defaultInstance;
+ static void $checkItem(WorkRequest v) {
+ if (v is !WorkRequest) checkItemFailed(v, 'WorkRequest');
+ }
+
+ List<String> get arguments => $_get(0, 1, null);
+
+ List<Input> get inputs => $_get(1, 2, null);
+}
+
+class _ReadonlyWorkRequest extends WorkRequest with ReadonlyMessageMixin {}
+
+class WorkResponse extends GeneratedMessage {
+ static final BuilderInfo _i = new BuilderInfo('WorkResponse')
+ ..a(1, 'exitCode', PbFieldType.O3)
+ ..a(2, 'output', PbFieldType.OS)
+ ..hasRequiredFields = false
+ ;
+
+ WorkResponse() : super();
+ WorkResponse.fromBuffer(List<int> i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
+ WorkResponse.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
+ WorkResponse clone() => new WorkResponse()..mergeFromMessage(this);
+ BuilderInfo get info_ => _i;
+ static WorkResponse create() => new WorkResponse();
+ static PbList<WorkResponse> createRepeated() => new PbList<WorkResponse>();
+ static WorkResponse getDefault() {
+ if (_defaultInstance == null) _defaultInstance = new _ReadonlyWorkResponse();
+ return _defaultInstance;
+ }
+ static WorkResponse _defaultInstance;
+ static void $checkItem(WorkResponse v) {
+ if (v is !WorkResponse) checkItemFailed(v, 'WorkResponse');
+ }
+
+ int get exitCode => $_get(0, 1, 0);
+ void set exitCode(int v) { $_setUnsignedInt32(0, 1, v); }
+ bool hasExitCode() => $_has(0, 1);
+ void clearExitCode() => clearField(1);
+
+ String get output => $_get(1, 2, '');
+ void set output(String v) { $_setString(1, 2, v); }
+ bool hasOutput() => $_has(1, 2);
+ void clearOutput() => clearField(2);
+}
+
+class _ReadonlyWorkResponse extends WorkResponse with ReadonlyMessageMixin {}
+
+const Input$json = const {
+ '1': 'Input',
+ '2': const [
+ const {'1': 'path', '3': 1, '4': 1, '5': 9},
+ const {'1': 'digest', '3': 2, '4': 1, '5': 12},
+ ],
+};
+
+const WorkRequest$json = const {
+ '1': 'WorkRequest',
+ '2': const [
+ const {'1': 'arguments', '3': 1, '4': 3, '5': 9},
+ const {'1': 'inputs', '3': 2, '4': 3, '5': 11, '6': '.blaze.worker.Input'},
+ ],
+};
+
+const WorkResponse$json = const {
+ '1': 'WorkResponse',
+ '2': const [
+ const {'1': 'exit_code', '3': 1, '4': 1, '5': 5},
+ const {'1': 'output', '3': 2, '4': 1, '5': 9},
+ ],
+};
diff --git a/lib/testing.dart b/lib/testing.dart
new file mode 100644
index 0000000..a201947
--- /dev/null
+++ b/lib/testing.dart
@@ -0,0 +1,85 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:collection';
+import 'dart:io';
+
+import 'package:bazel_worker/bazel_worker.dart';
+
+export 'src/utils.dart' show protoToDelimitedBuffer;
+
+/// A [Stdin] mock object.
+class TestStdinStream implements Stdin {
+ final Queue<int> pendingBytes = new Queue<int>();
+
+ // Adds all the input bytes to this stream.
+ void addInputBytes(List<int> bytes) {
+ pendingBytes.addAll(bytes);
+ }
+
+ @override
+ int readByteSync() {
+ if (pendingBytes.isEmpty) {
+ return -1;
+ } else {
+ return pendingBytes.removeFirst();
+ }
+ }
+
+ @override
+ void noSuchMethod(Invocation invocation) {
+ throw new StateError('Unexpected invocation $invocation');
+ }
+}
+
+/// A [Stdout] mock object.
+class TestStdoutStream implements Stdout {
+ final List<List<int>> writes = <List<int>>[];
+
+ @override
+ void add(List<int> bytes) {
+ writes.add(bytes);
+ }
+
+ @override
+ void noSuchMethod(Invocation invocation) {
+ throw new StateError('Unexpected invocation $invocation');
+ }
+}
+
+/// A [StdWorkerConnection] which records its responses.
+class TestSyncWorkerConnection extends StdSyncWorkerConnection {
+ final List<WorkResponse> responses = <WorkResponse>[];
+
+ TestSyncWorkerConnection(Stdin stdinStream, Stdout stdoutStream)
+ : super(stdinStream: stdinStream, stdoutStream: stdoutStream);
+
+ @override
+ void writeResponse(WorkResponse response) {
+ super.writeResponse(response);
+ responses.add(response);
+ }
+}
+
+/// A [SyncWorkerLoop] for testing.
+class TestSyncWorkerLoop extends SyncWorkerLoop {
+ final List<WorkRequest> requests = <WorkRequest>[];
+ final Queue<WorkResponse> _responses = new Queue<WorkResponse>();
+
+ TestSyncWorkerLoop(SyncWorkerConnection connection)
+ : super(connection: connection);
+
+ @override
+ WorkResponse performRequest(WorkRequest request) {
+ requests.add(request);
+ return _responses.removeFirst();
+ }
+
+ /// Adds [response] to the queue. These will be returned from
+ /// [performResponse] in the order they are added, otherwise it will throw
+ /// if the queue is empty.
+ void enqueueResponse(WorkResponse response) {
+ _responses.addLast(response);
+ }
+}
diff --git a/test/sync_message_grouper_test.dart b/test/sync_message_grouper_test.dart
new file mode 100644
index 0000000..19c8fd5
--- /dev/null
+++ b/test/sync_message_grouper_test.dart
@@ -0,0 +1,123 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:test/test.dart';
+
+import 'package:bazel_worker/src/sync_message_grouper.dart';
+import 'package:bazel_worker/testing.dart';
+
+void main() {
+ SyncMessageGrouper messageGrouper;
+ TestStdinStream stdinStream;
+
+ setUp(() {
+ stdinStream = new TestStdinStream();
+ messageGrouper = new SyncMessageGrouper(stdinStream);
+ });
+
+ group('message_grouper', () {
+ /// Check that if the message grouper produces the [expectedOutput] in
+ /// response to the corresponding [input].
+ void check(List<int> input, List<List<int>> expectedOutput) {
+ stdinStream.addInputBytes(input);
+ for (var chunk in expectedOutput) {
+ expect(messageGrouper.next, equals(chunk));
+ }
+ }
+
+ /// Make a simple message having the given [length]
+ List<int> makeMessage(int length) {
+ var result = <int>[];
+ for (int i = 0; i < length; i++) {
+ result.add(i & 0xff);
+ }
+ return result;
+ }
+
+ test('Empty message', () {
+ check([0], [[]]);
+ });
+
+ test('Short message', () {
+ check([
+ 5,
+ 10,
+ 20,
+ 30,
+ 40,
+ 50
+ ], [
+ [10, 20, 30, 40, 50]
+ ]);
+ });
+
+ test('Message with 2-byte length', () {
+ var len = 0x155;
+ var msg = makeMessage(len);
+ var encodedLen = [0xd5, 0x02];
+ check([]..addAll(encodedLen)..addAll(msg), [msg]);
+ });
+
+ test('Message with 3-byte length', () {
+ var len = 0x4103;
+ var msg = makeMessage(len);
+ var encodedLen = [0x83, 0x82, 0x01];
+ check([]..addAll(encodedLen)..addAll(msg), [msg]);
+ });
+
+ test('Multiple messages', () {
+ check([
+ 2,
+ 10,
+ 20,
+ 2,
+ 30,
+ 40
+ ], [
+ [10, 20],
+ [30, 40]
+ ]);
+ });
+
+ test('Empty message at start', () {
+ check([
+ 0,
+ 2,
+ 10,
+ 20
+ ], [
+ [],
+ [10, 20]
+ ]);
+ });
+
+ test('Empty message at end', () {
+ check([
+ 2,
+ 10,
+ 20,
+ 0
+ ], [
+ [10, 20],
+ []
+ ]);
+ });
+
+ test('Empty message in the middle', () {
+ check([
+ 2,
+ 10,
+ 20,
+ 0,
+ 2,
+ 30,
+ 40
+ ], [
+ [10, 20],
+ [],
+ [30, 40]
+ ]);
+ });
+ });
+}
diff --git a/test/sync_worker_loop_test.dart b/test/sync_worker_loop_test.dart
new file mode 100644
index 0000000..ce323c8
--- /dev/null
+++ b/test/sync_worker_loop_test.dart
@@ -0,0 +1,61 @@
+// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:test/test.dart';
+
+import 'package:bazel_worker/bazel_worker.dart';
+import 'package:bazel_worker/testing.dart';
+
+void main() {
+ TestStdinStream stdinStream;
+ TestStdoutStream stdoutStream;
+ TestSyncWorkerConnection connection;
+ TestSyncWorkerLoop workerLoop;
+
+ setUp(() {
+ stdinStream = new TestStdinStream();
+ stdoutStream = new TestStdoutStream();
+ connection = new TestSyncWorkerConnection(stdinStream, stdoutStream);
+ workerLoop = new TestSyncWorkerLoop(connection);
+ });
+
+ test('basic', () {
+ var request = new WorkRequest();
+ request.arguments.addAll(['--foo=bar']);
+ stdinStream.addInputBytes(protoToDelimitedBuffer(request));
+
+ var response = new WorkResponse()..output = 'Hello World';
+ workerLoop.enqueueResponse(response);
+ workerLoop.run();
+
+ expect(connection.responses, hasLength(1));
+ expect(connection.responses[0], response);
+
+ // Check that a serialized version was written to std out.
+ expect(stdoutStream.writes, hasLength(1));
+ expect(stdoutStream.writes[0], protoToDelimitedBuffer(response));
+ });
+
+ test('Exception in the worker.', () {
+ var request = new WorkRequest();
+ request.arguments.addAll(['--foo=bar']);
+ stdinStream.addInputBytes(protoToDelimitedBuffer(request));
+
+ // Didn't enqueue a response, so this will throw inside of `performRequest`.
+ workerLoop.run();
+
+ expect(connection.responses, hasLength(1));
+ var response = connection.responses[0];
+ expect(response.exitCode, EXIT_CODE_ERROR);
+
+ // Check that a serialized version was written to std out.
+ expect(stdoutStream.writes, hasLength(1));
+ expect(stdoutStream.writes[0], protoToDelimitedBuffer(response));
+ });
+
+ test('Stops at EOF', () {
+ stdinStream.addInputBytes([-1]);
+ workerLoop.run();
+ });
+}