Merge branch 'backport-exit-code-fix'
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2599680..5aeb331 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,21 @@
+## 0.1.23+1
+
+* Don't rely on `exitCode` to know when a worker terminates; instead, wait for
+ the input stream to close.
+ * The SDK may also start throwing instead of returning `null` here, so this
+ pre-emptively guards against that.
+
+## 0.1.23
+
+* Support protobuf `1.x`.
+* Added a tool for updating generated proto files and updated them
+ using the latest version of the protoc_plugin package.
+ * This required a lower bound bump of the `protobuf` package to `0.14.4`.
+
+## 0.1.22
+
+* Require protobuf 0.14.0.
+
## 0.1.21+1
* Don't rely on `exitCode` to know when a worker terminates, instead wait for
diff --git a/analysis_options.yaml b/analysis_options.yaml
index e88b5f1..9781e73 100644
--- a/analysis_options.yaml
+++ b/analysis_options.yaml
@@ -1,3 +1,5 @@
+include: package:pedantic/analysis_options.yaml
+
analyzer:
linter:
@@ -19,8 +21,6 @@
- package_prefixed_library_names
- prefer_is_not_empty
- slash_for_doc_comments
- - sort_unnamed_constructors_first
- - super_goes_last
# - type_annotate_public_apis
- type_init_formals
- unnecessary_brace_in_string_interps
diff --git a/codereview.settings b/codereview.settings
deleted file mode 100644
index 2ef41df..0000000
--- a/codereview.settings
+++ /dev/null
@@ -1,3 +0,0 @@
-CODE_REVIEW_SERVER: http://codereview.chromium.org/
-VIEW_VC: https://github.com/dart-lang/blaze_worker/commit/
-CC_LIST: reviews@dartlang.org
diff --git a/e2e_test/bin/async_worker.dart b/e2e_test/bin/async_worker.dart
index 4573473..ddd530c 100644
--- a/e2e_test/bin/async_worker.dart
+++ b/e2e_test/bin/async_worker.dart
@@ -10,5 +10,5 @@
/// This worker can run in one of two ways: normally, using stdin/stdout, or
/// in an isolate, communicating over a [SendPort].
Future main(List<String> args, [SendPort sendPort]) async {
- await new ExampleAsyncWorker(sendPort).run();
+ await ExampleAsyncWorker(sendPort).run();
}
diff --git a/e2e_test/bin/async_worker_in_isolate.dart b/e2e_test/bin/async_worker_in_isolate.dart
index 7b245f6..fce1eff 100644
--- a/e2e_test/bin/async_worker_in_isolate.dart
+++ b/e2e_test/bin/async_worker_in_isolate.dart
@@ -15,9 +15,9 @@
/// to use this code to do additional work, for example post processing one of
/// the output files.
Future main(List<String> args, SendPort message) async {
- var receivePort = new ReceivePort();
+ var receivePort = ReceivePort();
await Isolate.spawnUri(
- new Uri.file('async_worker.dart'), [], receivePort.sendPort);
+ Uri.file('async_worker.dart'), [], receivePort.sendPort);
var worker = await ForwardsToIsolateAsyncWorker.create(receivePort);
await worker.run();
diff --git a/e2e_test/bin/sync_worker.dart b/e2e_test/bin/sync_worker.dart
index e87b1c8..9bdcc77 100644
--- a/e2e_test/bin/sync_worker.dart
+++ b/e2e_test/bin/sync_worker.dart
@@ -5,5 +5,5 @@
import 'package:e2e_test/sync_worker.dart';
void main() {
- new ExampleSyncWorker().run();
+ ExampleSyncWorker().run();
}
diff --git a/e2e_test/lib/async_worker.dart b/e2e_test/lib/async_worker.dart
index ee080a9..075a5e6 100644
--- a/e2e_test/lib/async_worker.dart
+++ b/e2e_test/lib/async_worker.dart
@@ -12,10 +12,11 @@
class ExampleAsyncWorker extends AsyncWorkerLoop {
/// Set [sendPort] to run in an isolate.
ExampleAsyncWorker([SendPort sendPort])
- : super(connection: new AsyncWorkerConnection(sendPort: sendPort));
+ : super(connection: AsyncWorkerConnection(sendPort: sendPort));
+ @override
Future<WorkResponse> performRequest(WorkRequest request) async {
- return new WorkResponse()
+ return WorkResponse()
..exitCode = 0
..output = request.arguments.join('\n');
}
diff --git a/e2e_test/lib/forwards_to_isolate_async_worker.dart b/e2e_test/lib/forwards_to_isolate_async_worker.dart
index 47cf101..bb937b2 100644
--- a/e2e_test/lib/forwards_to_isolate_async_worker.dart
+++ b/e2e_test/lib/forwards_to_isolate_async_worker.dart
@@ -14,12 +14,13 @@
static Future<ForwardsToIsolateAsyncWorker> create(
ReceivePort receivePort) async {
- return new ForwardsToIsolateAsyncWorker(
+ return ForwardsToIsolateAsyncWorker(
await IsolateDriverConnection.create(receivePort));
}
ForwardsToIsolateAsyncWorker(this._isolateDriverConnection);
+ @override
Future<WorkResponse> performRequest(WorkRequest request) {
_isolateDriverConnection.writeRequest(request);
return _isolateDriverConnection.readResponse();
diff --git a/e2e_test/lib/sync_worker.dart b/e2e_test/lib/sync_worker.dart
index 7c98dbe..789f780 100644
--- a/e2e_test/lib/sync_worker.dart
+++ b/e2e_test/lib/sync_worker.dart
@@ -7,8 +7,9 @@
/// Example worker that just returns in its response all the arguments passed
/// separated by newlines.
class ExampleSyncWorker extends SyncWorkerLoop {
+ @override
WorkResponse performRequest(WorkRequest request) {
- return new WorkResponse()
+ return WorkResponse()
..exitCode = 0
..output = request.arguments.join('\n');
}
diff --git a/e2e_test/test/e2e_test.dart b/e2e_test/test/e2e_test.dart
index 1757139..faae0d3 100644
--- a/e2e_test/test/e2e_test.dart
+++ b/e2e_test/test/e2e_test.dart
@@ -27,7 +27,7 @@
BazelWorkerDriver driver;
group(groupName, () {
setUp(() {
- driver = new BazelWorkerDriver(spawnWorker);
+ driver = BazelWorkerDriver(spawnWorker);
});
tearDown(() async {
@@ -48,14 +48,13 @@
/// completed with the correct response.
Future _doRequests(BazelWorkerDriver driver, {int count}) async {
count ??= 100;
- var requests = new List.generate(count, (requestNum) {
- var request = new WorkRequest();
- request.arguments
- .addAll(new List.generate(requestNum, (argNum) => '$argNum'));
+ var requests = List.generate(count, (requestNum) {
+ var request = WorkRequest();
+ request.arguments.addAll(List.generate(requestNum, (argNum) => '$argNum'));
return request;
});
var responses = await Future.wait(requests.map(driver.doWork));
- for (int i = 0; i < responses.length; i++) {
+ for (var i = 0; i < responses.length; i++) {
var request = requests[i];
var response = responses[i];
expect(response.exitCode, EXIT_CODE_OK);
diff --git a/lib/src/async_message_grouper.dart b/lib/src/async_message_grouper.dart
index 7fca060..310a7f9 100644
--- a/lib/src/async_message_grouper.dart
+++ b/lib/src/async_message_grouper.dart
@@ -6,6 +6,7 @@
import 'dart:collection';
import 'package:async/async.dart';
+import 'package:pedantic/pedantic.dart';
import 'message_grouper.dart';
import 'message_grouper_state.dart';
@@ -14,22 +15,23 @@
/// base-128 encoded lengths interleaved with raw data.
class AsyncMessageGrouper implements MessageGrouper {
/// Current state for reading in messages;
- final _state = new MessageGrouperState();
+ final _state = MessageGrouperState();
/// The input stream.
final StreamQueue<List<int>> _inputQueue;
/// The current buffer.
- final Queue<int> _buffer = new Queue<int>();
+ final Queue<int> _buffer = Queue<int>();
/// Completes after [cancel] is called or [inputStream] is closed.
Future<void> get done => _done.future;
final _done = Completer<void>();
AsyncMessageGrouper(Stream<List<int>> inputStream)
- : _inputQueue = new StreamQueue(inputStream);
+ : _inputQueue = StreamQueue(inputStream);
/// Returns the next full message that is received, or null if none are left.
+ @override
Future<List<int>> get next async {
try {
List<int> message;
@@ -42,7 +44,7 @@
}
// If there is nothing left in the queue then cancel the subscription.
- if (message == null) cancel();
+ if (message == null) unawaited(cancel());
return message;
} catch (e) {
diff --git a/lib/src/driver/driver.dart b/lib/src/driver/driver.dart
index ba3eec8..16cbc96 100644
--- a/lib/src/driver/driver.dart
+++ b/lib/src/driver/driver.dart
@@ -10,7 +10,7 @@
import '../worker_protocol.pb.dart';
import 'driver_connection.dart';
-typedef Future<Process> SpawnWorker();
+typedef SpawnWorker = Future<Process> Function();
/// A driver for talking to a bazel worker.
///
@@ -39,16 +39,16 @@
final _spawningWorkers = <Future<Process>>[];
/// Work requests that haven't been started yet.
- final _workQueue = new Queue<_WorkAttempt>();
+ final _workQueue = Queue<_WorkAttempt>();
/// Factory method that spawns a worker process.
final SpawnWorker _spawnWorker;
BazelWorkerDriver(this._spawnWorker,
{int maxIdleWorkers, int maxWorkers, int maxRetries})
- : this._maxIdleWorkers = maxIdleWorkers ?? 4,
- this._maxWorkers = maxWorkers ?? 4,
- this._maxRetries = maxRetries ?? 4;
+ : _maxIdleWorkers = maxIdleWorkers ?? 4,
+ _maxWorkers = maxWorkers ?? 4,
+ _maxRetries = maxRetries ?? 4;
/// Waits for an available worker, and then sends [WorkRequest] to it.
///
@@ -58,7 +58,7 @@
/// available worker.
Future<WorkResponse> doWork(WorkRequest request,
{Function(Future<WorkResponse>) trackWork}) {
- var attempt = new _WorkAttempt(request, trackWork: trackWork);
+ var attempt = _WorkAttempt(request, trackWork: trackWork);
_workQueue.add(attempt);
_runWorkQueue();
return attempt.response;
@@ -88,7 +88,7 @@
if (_workQueue.isEmpty) return;
if (_numWorkers == _maxWorkers && _idleWorkers.isEmpty) return;
if (_numWorkers > _maxWorkers) {
- throw new StateError('Internal error, created to many workers. Please '
+ throw StateError('Internal error, created to many workers. Please '
'file a bug at https://github.com/dart-lang/bazel_worker/issues/new');
}
@@ -132,7 +132,7 @@
/// Once the worker responds then it will be added back to the pool of idle
/// workers.
void _runWorker(Process worker, _WorkAttempt attempt) {
- bool rescheduled = false;
+ var rescheduled = false;
runZoned(() async {
var connection = _workerConnections[worker];
@@ -151,11 +151,11 @@
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
stderr.writeln('Failed to run request ${attempt.request}');
- response = new WorkResponse()
+ response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output =
'Invalid response from worker, this probably means it wrote '
- 'invalid output or died.';
+ 'invalid output or died.';
}
attempt.responseCompleter.complete(response);
_cleanUp(worker);
@@ -167,7 +167,7 @@
if (!attempt.responseCompleter.isCompleted) {
rescheduled = _tryReschedule(attempt);
if (rescheduled) return;
- var response = new WorkResponse()
+ var response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = 'Error running worker:\n$e\n$s';
attempt.responseCompleter.complete(response);
@@ -221,7 +221,7 @@
/// [WorkResponse], and the number of times it has been retried.
class _WorkAttempt {
final WorkRequest request;
- final responseCompleter = new Completer<WorkResponse>();
+ final responseCompleter = Completer<WorkResponse>();
final Function(Future<WorkResponse>) trackWork;
Future<WorkResponse> get response => responseCompleter.future;
@@ -231,4 +231,4 @@
_WorkAttempt(this.request, {this.trackWork});
}
-final _workerConnections = new Expando<DriverConnection>('connection');
+final _workerConnections = Expando<DriverConnection>('connection');
diff --git a/lib/src/driver/driver_connection.dart b/lib/src/driver/driver_connection.dart
index 360897b..b282aec 100644
--- a/lib/src/driver/driver_connection.dart
+++ b/lib/src/driver/driver_connection.dart
@@ -35,12 +35,11 @@
StdDriverConnection(
{Stream<List<int>> inputStream, StreamSink<List<int>> outputStream})
- : _messageGrouper = new AsyncMessageGrouper(inputStream ?? stdin),
+ : _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
_outputStream = outputStream ?? stdout;
- factory StdDriverConnection.forWorker(Process worker) =>
- new StdDriverConnection(
- inputStream: worker.stdout, outputStream: worker.stdin);
+ factory StdDriverConnection.forWorker(Process worker) => StdDriverConnection(
+ inputStream: worker.stdout, outputStream: worker.stdin);
/// Note: This will attempts to recover from invalid proto messages by parsing
/// them as strings. This is a common error case for workers (they print a
@@ -55,12 +54,12 @@
WorkResponse response;
try {
- response = new WorkResponse.fromBuffer(buffer);
+ response = WorkResponse.fromBuffer(buffer);
} catch (_) {
try {
// Try parsing the message as a string and set that as the output.
var output = utf8.decode(buffer);
- var response = new WorkResponse()
+ var response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = 'Worker sent an invalid response:\n$output';
return response;
@@ -96,10 +95,10 @@
/// [receivePort] attached to the [SendPort] that the isolate was created
/// with.
static Future<IsolateDriverConnection> create(ReceivePort receivePort) async {
- var receivePortIterator = new StreamIterator(receivePort);
+ var receivePortIterator = StreamIterator(receivePort);
await receivePortIterator.moveNext();
var sendPort = receivePortIterator.current as SendPort;
- return new IsolateDriverConnection._(receivePortIterator, sendPort);
+ return IsolateDriverConnection._(receivePortIterator, sendPort);
}
@override
diff --git a/lib/src/message_grouper_state.dart b/lib/src/message_grouper_state.dart
index 6ee524b..5cbf933 100644
--- a/lib/src/message_grouper_state.dart
+++ b/lib/src/message_grouper_state.dart
@@ -27,7 +27,7 @@
if (!_lengthReader.done) {
_lengthReader.readByte(byte);
if (_lengthReader.done) {
- _messageReader = new _MessageReader(_lengthReader.length);
+ _messageReader = _MessageReader(_lengthReader.length);
}
} else {
assert(_messageReader != null);
@@ -45,7 +45,7 @@
/// Reset the state so that we are ready to receive the next message.
void reset() {
- _lengthReader = new _LengthReader();
+ _lengthReader = _LengthReader();
_messageReader = null;
}
}
@@ -83,7 +83,7 @@
// Check for the last byte in the length, and then read it.
if ((byte & 0x80) == 0) {
_done = true;
- var reader = new CodedBufferReader(_buffer);
+ var reader = CodedBufferReader(_buffer);
_length = reader.readInt32();
}
}
@@ -105,14 +105,14 @@
return _message;
}
- Uint8List _message;
+ final Uint8List _message;
/// If [_done] is `false`, the number of message bytes that have been received
/// so far. Otherwise zero.
int _numMessageBytesReceived = 0;
_MessageReader(int length)
- : _message = new Uint8List(length),
+ : _message = Uint8List(length),
_length = length,
_done = length == 0;
diff --git a/lib/src/sync_message_grouper.dart b/lib/src/sync_message_grouper.dart
index 20a1139..c0d11f0 100644
--- a/lib/src/sync_message_grouper.dart
+++ b/lib/src/sync_message_grouper.dart
@@ -9,7 +9,7 @@
/// Groups bytes in delimited proto format into the bytes for each message.
class SyncMessageGrouper implements MessageGrouper {
- final _state = new MessageGrouperState();
+ final _state = MessageGrouperState();
final Stdin _stdin;
SyncMessageGrouper(this._stdin);
@@ -17,6 +17,7 @@
/// Blocks until the next full message is received, and then returns it.
///
/// Returns null at end of file.
+ @override
List<int> get next {
try {
List<int> message;
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index e34112d..609b435 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -7,14 +7,14 @@
import 'package:protobuf/protobuf.dart';
List<int> protoToDelimitedBuffer(GeneratedMessage message) {
- var messageBuffer = new CodedBufferWriter();
+ var messageBuffer = CodedBufferWriter();
message.writeToCodedBufferWriter(messageBuffer);
- var delimiterBuffer = new CodedBufferWriter();
+ var delimiterBuffer = CodedBufferWriter();
delimiterBuffer.writeInt32NoTag(messageBuffer.lengthInBytes);
- var result = new Uint8List(
- messageBuffer.lengthInBytes + delimiterBuffer.lengthInBytes);
+ var result =
+ Uint8List(messageBuffer.lengthInBytes + delimiterBuffer.lengthInBytes);
delimiterBuffer.writeTo(result);
messageBuffer.writeTo(result, delimiterBuffer.lengthInBytes);
diff --git a/lib/src/worker/async_worker_loop.dart b/lib/src/worker/async_worker_loop.dart
index 29f6ff8..9405c4f 100644
--- a/lib/src/worker/async_worker_loop.dart
+++ b/lib/src/worker/async_worker_loop.dart
@@ -16,23 +16,25 @@
final AsyncWorkerConnection connection;
AsyncWorkerLoop({AsyncWorkerConnection connection})
- : this.connection = connection ?? new StdAsyncWorkerConnection();
+ : connection = connection ?? StdAsyncWorkerConnection();
/// Perform a single [WorkRequest], and return a [WorkResponse].
+ @override
Future<WorkResponse> performRequest(WorkRequest request);
/// Run the worker loop. The returned [Future] doesn't complete until
/// [connection#readRequest] returns `null`.
+ @override
Future run() async {
while (true) {
WorkResponse response;
try {
var request = await connection.readRequest();
if (request == null) break;
- var printMessages = new StringBuffer();
+ var printMessages = StringBuffer();
response = await runZoned(() => performRequest(request),
zoneSpecification:
- new ZoneSpecification(print: (self, parent, zone, message) {
+ ZoneSpecification(print: (self, parent, zone, message) {
printMessages.writeln();
printMessages.write(message);
}));
@@ -42,7 +44,7 @@
// In case they forget to set this.
response.exitCode ??= EXIT_CODE_OK;
} catch (e, s) {
- response = new WorkResponse()
+ response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = '$e\n$s';
}
diff --git a/lib/src/worker/sync_worker_loop.dart b/lib/src/worker/sync_worker_loop.dart
index e1b9aaf..2d287b0 100644
--- a/lib/src/worker/sync_worker_loop.dart
+++ b/lib/src/worker/sync_worker_loop.dart
@@ -15,21 +15,23 @@
final SyncWorkerConnection connection;
SyncWorkerLoop({SyncWorkerConnection connection})
- : this.connection = connection ?? new StdSyncWorkerConnection();
+ : connection = connection ?? StdSyncWorkerConnection();
/// Perform a single [WorkRequest], and return a [WorkResponse].
+ @override
WorkResponse performRequest(WorkRequest request);
/// Run the worker loop. Blocks until [connection#readRequest] returns `null`.
+ @override
void run() {
while (true) {
WorkResponse response;
try {
var request = connection.readRequest();
if (request == null) break;
- var printMessages = new StringBuffer();
+ var printMessages = StringBuffer();
response = runZoned(() => performRequest(request), zoneSpecification:
- new ZoneSpecification(print: (self, parent, zone, message) {
+ ZoneSpecification(print: (self, parent, zone, message) {
printMessages.writeln();
printMessages.write(message);
}));
@@ -39,7 +41,7 @@
// In case they forget to set this.
response.exitCode ??= EXIT_CODE_OK;
} catch (e, s) {
- response = new WorkResponse()
+ response = WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = '$e\n$s';
}
diff --git a/lib/src/worker/worker_connection.dart b/lib/src/worker/worker_connection.dart
index c55f52d..a429cfe 100644
--- a/lib/src/worker/worker_connection.dart
+++ b/lib/src/worker/worker_connection.dart
@@ -34,15 +34,16 @@
StreamSink<List<int>> outputStream,
SendPort sendPort}) =>
sendPort == null
- ? new StdAsyncWorkerConnection(
+ ? StdAsyncWorkerConnection(
inputStream: inputStream, outputStream: outputStream)
- : new SendPortAsyncWorkerConnection(sendPort);
+ : SendPortAsyncWorkerConnection(sendPort);
@override
Future<WorkRequest> readRequest();
}
abstract class SyncWorkerConnection implements WorkerConnection {
+ @override
WorkRequest readRequest();
}
@@ -54,7 +55,7 @@
StdAsyncWorkerConnection(
{Stream<List<int>> inputStream, StreamSink<List<int>> outputStream})
- : _messageGrouper = new AsyncMessageGrouper(inputStream ?? stdin),
+ : _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
_outputStream = outputStream ?? stdout;
@override
@@ -62,7 +63,7 @@
var buffer = await _messageGrouper.next;
if (buffer == null) return null;
- return new WorkRequest.fromBuffer(buffer);
+ return WorkRequest.fromBuffer(buffer);
}
@override
@@ -78,18 +79,18 @@
final SendPort sendPort;
factory SendPortAsyncWorkerConnection(SendPort sendPort) {
- var receivePort = new ReceivePort();
+ var receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
return SendPortAsyncWorkerConnection._(receivePort, sendPort);
}
SendPortAsyncWorkerConnection._(this.receivePort, this.sendPort)
- : receivePortIterator = new StreamIterator(receivePort.cast());
+ : receivePortIterator = StreamIterator(receivePort.cast());
@override
Future<WorkRequest> readRequest() async {
if (!await receivePortIterator.moveNext()) return null;
- return new WorkRequest.fromBuffer(receivePortIterator.current);
+ return WorkRequest.fromBuffer(receivePortIterator.current);
}
@override
@@ -105,7 +106,7 @@
final Stdout _stdoutStream;
StdSyncWorkerConnection({Stdin stdinStream, Stdout stdoutStream})
- : _messageGrouper = new SyncMessageGrouper(stdinStream ?? stdin),
+ : _messageGrouper = SyncMessageGrouper(stdinStream ?? stdin),
_stdoutStream = stdoutStream ?? stdout;
@override
@@ -113,7 +114,7 @@
var buffer = _messageGrouper.next;
if (buffer == null) return null;
- return new WorkRequest.fromBuffer(buffer);
+ return WorkRequest.fromBuffer(buffer);
}
@override
diff --git a/lib/src/worker_protocol.pb.dart b/lib/src/worker_protocol.pb.dart
index 7e50966..90e4381 100644
--- a/lib/src/worker_protocol.pb.dart
+++ b/lib/src/worker_protocol.pb.dart
@@ -1,102 +1,182 @@
///
// Generated code. Do not modify.
-// source: third_party/bazel/src/main/protobuf/worker_protocol.proto
+// source: worker_protocol.proto
+//
+// @dart = 2.3
+// ignore_for_file: camel_case_types,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type
+// ignore_for_file: annotate_overrides
-// ignore: UNUSED_SHOWN_NAME
-import 'dart:core' show int, bool, double, String, List, Map, override;
+import 'dart:core' as $core;
import 'package:protobuf/protobuf.dart' as $pb;
class Input extends $pb.GeneratedMessage {
- static final $pb.BuilderInfo _i = new $pb.BuilderInfo('Input', package: const $pb.PackageName('blaze.worker'))
+ static final $pb.BuilderInfo _i = $pb.BuilderInfo('Input',
+ package: const $pb.PackageName('blaze.worker'),
+ createEmptyInstance: create)
..aOS(1, 'path')
- ..a<List<int>>(2, 'digest', $pb.PbFieldType.OY)
- ..hasRequiredFields = false
- ;
+ ..a<$core.List<$core.int>>(2, 'digest', $pb.PbFieldType.OY)
+ ..hasRequiredFields = false;
- Input() : super();
- Input.fromBuffer(List<int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
- Input.fromJson(String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
- Input clone() => new Input()..mergeFromMessage(this);
- Input copyWith(void Function(Input) updates) => super.copyWith((message) => updates(message as Input));
+ Input._() : super();
+ factory Input() => create();
+ factory Input.fromBuffer($core.List<$core.int> i,
+ [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
+ create()..mergeFromBuffer(i, r);
+ factory Input.fromJson($core.String i,
+ [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
+ create()..mergeFromJson(i, r);
+ Input clone() => Input()..mergeFromMessage(this);
+ Input copyWith(void Function(Input) updates) =>
+ super.copyWith((message) => updates(message as Input));
$pb.BuilderInfo get info_ => _i;
- static Input create() => new Input();
+ @$core.pragma('dart2js:noInline')
+ static Input create() => Input._();
Input createEmptyInstance() => create();
- static $pb.PbList<Input> createRepeated() => new $pb.PbList<Input>();
- static Input getDefault() => _defaultInstance ??= create()..freeze();
+ static $pb.PbList<Input> createRepeated() => $pb.PbList<Input>();
+ @$core.pragma('dart2js:noInline')
+ static Input getDefault() =>
+ _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<Input>(create);
static Input _defaultInstance;
- static void $checkItem(Input v) {
- if (v is! Input) $pb.checkItemFailed(v, _i.qualifiedMessageName);
+
+ @$pb.TagNumber(1)
+ $core.String get path => $_getSZ(0);
+ @$pb.TagNumber(1)
+ set path($core.String v) {
+ $_setString(0, v);
}
- String get path => $_getS(0, '');
- set path(String v) { $_setString(0, v); }
- bool hasPath() => $_has(0);
+ @$pb.TagNumber(1)
+ $core.bool hasPath() => $_has(0);
+ @$pb.TagNumber(1)
void clearPath() => clearField(1);
- List<int> get digest => $_getN(1);
- set digest(List<int> v) { $_setBytes(1, v); }
- bool hasDigest() => $_has(1);
+ @$pb.TagNumber(2)
+ $core.List<$core.int> get digest => $_getN(1);
+ @$pb.TagNumber(2)
+ set digest($core.List<$core.int> v) {
+ $_setBytes(1, v);
+ }
+
+ @$pb.TagNumber(2)
+ $core.bool hasDigest() => $_has(1);
+ @$pb.TagNumber(2)
void clearDigest() => clearField(2);
}
class WorkRequest extends $pb.GeneratedMessage {
- static final $pb.BuilderInfo _i = new $pb.BuilderInfo('WorkRequest', package: const $pb.PackageName('blaze.worker'))
+ static final $pb.BuilderInfo _i = $pb.BuilderInfo('WorkRequest',
+ package: const $pb.PackageName('blaze.worker'),
+ createEmptyInstance: create)
..pPS(1, 'arguments')
- ..pp<Input>(2, 'inputs', $pb.PbFieldType.PM, Input.$checkItem, Input.create)
- ..hasRequiredFields = false
- ;
+ ..pc<Input>(2, 'inputs', $pb.PbFieldType.PM, subBuilder: Input.create)
+ ..a<$core.int>(3, 'requestId', $pb.PbFieldType.O3)
+ ..hasRequiredFields = false;
- WorkRequest() : super();
- WorkRequest.fromBuffer(List<int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
- WorkRequest.fromJson(String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
- WorkRequest clone() => new WorkRequest()..mergeFromMessage(this);
- WorkRequest copyWith(void Function(WorkRequest) updates) => super.copyWith((message) => updates(message as WorkRequest));
+ WorkRequest._() : super();
+ factory WorkRequest() => create();
+ factory WorkRequest.fromBuffer($core.List<$core.int> i,
+ [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
+ create()..mergeFromBuffer(i, r);
+ factory WorkRequest.fromJson($core.String i,
+ [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
+ create()..mergeFromJson(i, r);
+ WorkRequest clone() => WorkRequest()..mergeFromMessage(this);
+ WorkRequest copyWith(void Function(WorkRequest) updates) =>
+ super.copyWith((message) => updates(message as WorkRequest));
$pb.BuilderInfo get info_ => _i;
- static WorkRequest create() => new WorkRequest();
+ @$core.pragma('dart2js:noInline')
+ static WorkRequest create() => WorkRequest._();
WorkRequest createEmptyInstance() => create();
- static $pb.PbList<WorkRequest> createRepeated() => new $pb.PbList<WorkRequest>();
- static WorkRequest getDefault() => _defaultInstance ??= create()..freeze();
+ static $pb.PbList<WorkRequest> createRepeated() => $pb.PbList<WorkRequest>();
+ @$core.pragma('dart2js:noInline')
+ static WorkRequest getDefault() => _defaultInstance ??=
+ $pb.GeneratedMessage.$_defaultFor<WorkRequest>(create);
static WorkRequest _defaultInstance;
- static void $checkItem(WorkRequest v) {
- if (v is! WorkRequest) $pb.checkItemFailed(v, _i.qualifiedMessageName);
+
+ @$pb.TagNumber(1)
+ $core.List<$core.String> get arguments => $_getList(0);
+
+ @$pb.TagNumber(2)
+ $core.List<Input> get inputs => $_getList(1);
+
+ @$pb.TagNumber(3)
+ $core.int get requestId => $_getIZ(2);
+ @$pb.TagNumber(3)
+ set requestId($core.int v) {
+ $_setSignedInt32(2, v);
}
- List<String> get arguments => $_getList(0);
-
- List<Input> get inputs => $_getList(1);
+ @$pb.TagNumber(3)
+ $core.bool hasRequestId() => $_has(2);
+ @$pb.TagNumber(3)
+ void clearRequestId() => clearField(3);
}
class WorkResponse extends $pb.GeneratedMessage {
- static final $pb.BuilderInfo _i = new $pb.BuilderInfo('WorkResponse', package: const $pb.PackageName('blaze.worker'))
- ..a<int>(1, 'exitCode', $pb.PbFieldType.O3)
+ static final $pb.BuilderInfo _i = $pb.BuilderInfo('WorkResponse',
+ package: const $pb.PackageName('blaze.worker'),
+ createEmptyInstance: create)
+ ..a<$core.int>(1, 'exitCode', $pb.PbFieldType.O3)
..aOS(2, 'output')
- ..hasRequiredFields = false
- ;
+ ..a<$core.int>(3, 'requestId', $pb.PbFieldType.O3)
+ ..hasRequiredFields = false;
- WorkResponse() : super();
- WorkResponse.fromBuffer(List<int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) : super.fromBuffer(i, r);
- WorkResponse.fromJson(String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) : super.fromJson(i, r);
- WorkResponse clone() => new WorkResponse()..mergeFromMessage(this);
- WorkResponse copyWith(void Function(WorkResponse) updates) => super.copyWith((message) => updates(message as WorkResponse));
+ WorkResponse._() : super();
+ factory WorkResponse() => create();
+ factory WorkResponse.fromBuffer($core.List<$core.int> i,
+ [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
+ create()..mergeFromBuffer(i, r);
+ factory WorkResponse.fromJson($core.String i,
+ [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) =>
+ create()..mergeFromJson(i, r);
+ WorkResponse clone() => WorkResponse()..mergeFromMessage(this);
+ WorkResponse copyWith(void Function(WorkResponse) updates) =>
+ super.copyWith((message) => updates(message as WorkResponse));
$pb.BuilderInfo get info_ => _i;
- static WorkResponse create() => new WorkResponse();
+ @$core.pragma('dart2js:noInline')
+ static WorkResponse create() => WorkResponse._();
WorkResponse createEmptyInstance() => create();
- static $pb.PbList<WorkResponse> createRepeated() => new $pb.PbList<WorkResponse>();
- static WorkResponse getDefault() => _defaultInstance ??= create()..freeze();
+ static $pb.PbList<WorkResponse> createRepeated() =>
+ $pb.PbList<WorkResponse>();
+ @$core.pragma('dart2js:noInline')
+ static WorkResponse getDefault() => _defaultInstance ??=
+ $pb.GeneratedMessage.$_defaultFor<WorkResponse>(create);
static WorkResponse _defaultInstance;
- static void $checkItem(WorkResponse v) {
- if (v is! WorkResponse) $pb.checkItemFailed(v, _i.qualifiedMessageName);
+
+ @$pb.TagNumber(1)
+ $core.int get exitCode => $_getIZ(0);
+ @$pb.TagNumber(1)
+ set exitCode($core.int v) {
+ $_setSignedInt32(0, v);
}
- int get exitCode => $_get(0, 0);
- set exitCode(int v) { $_setSignedInt32(0, v); }
- bool hasExitCode() => $_has(0);
+ @$pb.TagNumber(1)
+ $core.bool hasExitCode() => $_has(0);
+ @$pb.TagNumber(1)
void clearExitCode() => clearField(1);
- String get output => $_getS(1, '');
- set output(String v) { $_setString(1, v); }
- bool hasOutput() => $_has(1);
- void clearOutput() => clearField(2);
-}
+ @$pb.TagNumber(2)
+ $core.String get output => $_getSZ(1);
+ @$pb.TagNumber(2)
+ set output($core.String v) {
+ $_setString(1, v);
+ }
+ @$pb.TagNumber(2)
+ $core.bool hasOutput() => $_has(1);
+ @$pb.TagNumber(2)
+ void clearOutput() => clearField(2);
+
+ @$pb.TagNumber(3)
+ $core.int get requestId => $_getIZ(2);
+ @$pb.TagNumber(3)
+ set requestId($core.int v) {
+ $_setSignedInt32(2, v);
+ }
+
+ @$pb.TagNumber(3)
+ $core.bool hasRequestId() => $_has(2);
+ @$pb.TagNumber(3)
+ void clearRequestId() => clearField(3);
+}
diff --git a/lib/testing.dart b/lib/testing.dart
index 1e7ccc3..037aa1e 100644
--- a/lib/testing.dart
+++ b/lib/testing.dart
@@ -23,14 +23,16 @@
/// A [Stdin] mock object which only implements `readByteSync`.
class TestStdinSync implements TestStdin {
/// Pending bytes to be delivered synchronously.
- final Queue<int> pendingBytes = new Queue<int>();
+ final Queue<int> pendingBytes = Queue<int>();
/// Adds all the [bytes] to this stream.
+ @override
void addInputBytes(List<int> bytes) {
pendingBytes.addAll(bytes);
}
/// Add a -1 to signal EOF.
+ @override
void close() {
pendingBytes.add(-1);
}
@@ -42,7 +44,7 @@
@override
void noSuchMethod(Invocation invocation) {
- throw new StateError('Unexpected invocation ${invocation.memberName}.');
+ throw StateError('Unexpected invocation ${invocation.memberName}.');
}
}
@@ -51,29 +53,31 @@
/// Note: You must call [close] in order for the loop to exit properly.
class TestStdinAsync implements TestStdin {
/// Controls the stream for async delivery of bytes.
- final StreamController<Uint8List> _controller = new StreamController();
+ final StreamController<Uint8List> _controller = StreamController();
StreamController<Uint8List> get controller => _controller;
/// Adds all the [bytes] to this stream.
+ @override
void addInputBytes(List<int> bytes) {
_controller.add(Uint8List.fromList(bytes));
}
/// Closes this stream. This is necessary for the [AsyncWorkerLoop] to exit.
+ @override
void close() {
_controller.close();
}
@override
- StreamSubscription<Uint8List> listen(onData(Uint8List bytes),
- {Function onError, void onDone(), bool cancelOnError}) {
+ StreamSubscription<Uint8List> listen(void Function(Uint8List bytes) onData,
+ {Function onError, void Function() onDone, bool cancelOnError}) {
return _controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
@override
void noSuchMethod(Invocation invocation) {
- throw new StateError('Unexpected invocation ${invocation.memberName}.');
+ throw StateError('Unexpected invocation ${invocation.memberName}.');
}
}
@@ -88,7 +92,7 @@
@override
void noSuchMethod(Invocation invocation) {
- throw new StateError('Unexpected invocation ${invocation.memberName}.');
+ throw StateError('Unexpected invocation ${invocation.memberName}.');
}
}
@@ -108,6 +112,7 @@
/// A [StdSyncWorkerConnection] which records its responses.
class TestSyncWorkerConnection extends StdSyncWorkerConnection
implements TestWorkerConnection {
+ @override
final List<WorkResponse> responses = <WorkResponse>[];
TestSyncWorkerConnection(Stdin stdinStream, Stdout stdoutStream)
@@ -123,7 +128,7 @@
/// A [SyncWorkerLoop] for testing.
class TestSyncWorkerLoop extends SyncWorkerLoop implements TestWorkerLoop {
final List<WorkRequest> requests = <WorkRequest>[];
- final Queue<WorkResponse> _responses = new Queue<WorkResponse>();
+ final Queue<WorkResponse> _responses = Queue<WorkResponse>();
@override
final String printMessage;
@@ -141,6 +146,7 @@
/// Adds [response] to the queue. These will be returned from
/// [performResponse] in the order they are added, otherwise it will throw
/// if the queue is empty.
+ @override
void enqueueResponse(WorkResponse response) {
_responses.addLast(response);
}
@@ -149,6 +155,7 @@
/// A [StdAsyncWorkerConnection] which records its responses.
class TestAsyncWorkerConnection extends StdAsyncWorkerConnection
implements TestWorkerConnection {
+ @override
final List<WorkResponse> responses = <WorkResponse>[];
TestAsyncWorkerConnection(
@@ -165,7 +172,7 @@
/// A [AsyncWorkerLoop] for testing.
class TestAsyncWorkerLoop extends AsyncWorkerLoop implements TestWorkerLoop {
final List<WorkRequest> requests = <WorkRequest>[];
- final Queue<WorkResponse> _responses = new Queue<WorkResponse>();
+ final Queue<WorkResponse> _responses = Queue<WorkResponse>();
@override
final String printMessage;
@@ -183,6 +190,7 @@
/// Adds [response] to the queue. These will be returned from
/// [performResponse] in the order they are added, otherwise it will throw
/// if the queue is empty.
+ @override
void enqueueResponse(WorkResponse response) {
_responses.addLast(response);
}
diff --git a/pubspec.yaml b/pubspec.yaml
index f8ffa33..dbae4cd 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,16 +1,16 @@
name: bazel_worker
-version: 0.1.21+1
+version: 0.1.23+1
description: Tools for creating a bazel persistent worker.
-author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/bazel_worker
environment:
- sdk: '>=2.0.0-dev.17.0 <3.0.0'
+ sdk: '>=2.3.0 <3.0.0'
dependencies:
async: '>1.9.0 <3.0.0'
- protobuf: '>=0.10.4 <0.14.0'
+ pedantic: ^1.8.0
+ protobuf: '>=0.14.4 <2.0.0'
dev_dependencies:
test: ^1.2.0
diff --git a/test/driver_test.dart b/test/driver_test.dart
index 5b1f57a..56c9086 100644
--- a/test/driver_test.dart
+++ b/test/driver_test.dart
@@ -22,11 +22,11 @@
test('can run multiple batches of requests through multiple workers',
() async {
- int maxWorkers = 4;
- int maxIdleWorkers = 2;
- driver = new BazelWorkerDriver(MockWorker.spawn,
+ var maxWorkers = 4;
+ var maxIdleWorkers = 2;
+ driver = BazelWorkerDriver(MockWorker.spawn,
maxWorkers: maxWorkers, maxIdleWorkers: maxIdleWorkers);
- for (int i = 0; i < 10; i++) {
+ for (var i = 0; i < 10; i++) {
await _doRequests(driver: driver);
expect(MockWorker.liveWorkers.length, maxIdleWorkers);
// No workers should be killed while there is ongoing work, but they
@@ -37,11 +37,11 @@
});
test('can run multiple requests through one worker', () async {
- int maxWorkers = 1;
- int maxIdleWorkers = 1;
- driver = new BazelWorkerDriver(MockWorker.spawn,
+ var maxWorkers = 1;
+ var maxIdleWorkers = 1;
+ driver = BazelWorkerDriver(MockWorker.spawn,
maxWorkers: maxWorkers, maxIdleWorkers: maxIdleWorkers);
- for (int i = 0; i < 10; i++) {
+ for (var i = 0; i < 10; i++) {
await _doRequests(driver: driver);
expect(MockWorker.liveWorkers.length, 1);
expect(MockWorker.deadWorkers.length, 0);
@@ -49,9 +49,9 @@
});
test('can run one request through multiple workers', () async {
- driver = new BazelWorkerDriver(MockWorker.spawn,
- maxWorkers: 4, maxIdleWorkers: 4);
- for (int i = 0; i < 10; i++) {
+ driver =
+ BazelWorkerDriver(MockWorker.spawn, maxWorkers: 4, maxIdleWorkers: 4);
+ for (var i = 0; i < 10; i++) {
await _doRequests(driver: driver, count: 1);
expect(MockWorker.liveWorkers.length, 1);
expect(MockWorker.deadWorkers.length, 0);
@@ -59,10 +59,10 @@
});
test('can run with maxIdleWorkers == 0', () async {
- int maxWorkers = 4;
- driver = new BazelWorkerDriver(MockWorker.spawn,
+ var maxWorkers = 4;
+ driver = BazelWorkerDriver(MockWorker.spawn,
maxWorkers: maxWorkers, maxIdleWorkers: 0);
- for (int i = 0; i < 10; i++) {
+ for (var i = 0; i < 10; i++) {
await _doRequests(driver: driver);
expect(MockWorker.liveWorkers.length, 0);
expect(MockWorker.deadWorkers.length, maxWorkers * (i + 1));
@@ -71,7 +71,7 @@
test('trackWork gets invoked when a worker is actually ready', () async {
var maxWorkers = 2;
- driver = new BazelWorkerDriver(MockWorker.spawn, maxWorkers: maxWorkers);
+ driver = BazelWorkerDriver(MockWorker.spawn, maxWorkers: maxWorkers);
var tracking = <Future>[];
await _doRequests(
driver: driver,
@@ -88,18 +88,18 @@
/// A driver which spawns [numBadWorkers] failing workers and then good
/// ones after that, and which will retry [maxRetries] times.
void createDriver({int maxRetries = 2, int numBadWorkers = 2}) {
- int numSpawned = 0;
- driver = new BazelWorkerDriver(
- () async => new MockWorker(workerLoopFactory: (MockWorker worker) {
- var connection = new StdAsyncWorkerConnection(
+ var numSpawned = 0;
+ driver = BazelWorkerDriver(
+ () async => MockWorker(workerLoopFactory: (MockWorker worker) {
+ var connection = StdAsyncWorkerConnection(
inputStream: worker._stdinController.stream,
outputStream: worker._stdoutController.sink);
if (numSpawned < numBadWorkers) {
numSpawned++;
- return new ThrowingMockWorkerLoop(
+ return ThrowingMockWorkerLoop(
worker, MockWorker.responseQueue, connection);
} else {
- return new MockWorkerLoop(MockWorker.responseQueue,
+ return MockWorkerLoop(MockWorker.responseQueue,
connection: connection);
}
}),
@@ -108,9 +108,9 @@
test('should retry up to maxRetries times', () async {
createDriver();
- var expectedResponse = new WorkResponse();
+ var expectedResponse = WorkResponse();
MockWorker.responseQueue.addAll([null, null, expectedResponse]);
- var actualResponse = await driver.doWork(new WorkRequest());
+ var actualResponse = await driver.doWork(WorkRequest());
// The first 2 null responses are thrown away, and we should get the
// third one.
expect(actualResponse, expectedResponse);
@@ -121,8 +121,8 @@
test('should fail if it exceeds maxRetries failures', () async {
createDriver(maxRetries: 2, numBadWorkers: 3);
- MockWorker.responseQueue.addAll([null, null, new WorkResponse()]);
- var actualResponse = await driver.doWork(new WorkRequest());
+ MockWorker.responseQueue.addAll([null, null, WorkResponse()]);
+ var actualResponse = await driver.doWork(WorkRequest());
// Should actually get a bad response.
expect(actualResponse.exitCode, 15);
expect(
@@ -151,11 +151,11 @@
Function(Future<WorkResponse>) trackWork}) async {
// If we create a driver, we need to make sure and terminate it.
var terminateDriver = driver == null;
- driver ??= new BazelWorkerDriver(MockWorker.spawn);
+ driver ??= BazelWorkerDriver(MockWorker.spawn);
count ??= 100;
terminateDriver ??= true;
- var requests = new List.generate(count, (_) => new WorkRequest());
- var responses = new List.generate(count, (_) => new WorkResponse());
+ var requests = List.generate(count, (_) => WorkRequest());
+ var responses = List.generate(count, (_) => WorkResponse());
MockWorker.responseQueue.addAll(responses);
var actualResponses = await Future.wait(
requests.map((request) => driver.doWork(request, trackWork: trackWork)));
@@ -172,6 +172,7 @@
MockWorkerLoop(this._responseQueue, {AsyncWorkerConnection connection})
: super(connection: connection);
+ @override
Future<WorkResponse> performRequest(WorkRequest request) async {
print('Performing request $request');
return _responseQueue.removeFirst();
@@ -206,12 +207,12 @@
/// If there are no items left in [responseQueue] then it will throw.
class MockWorker implements Process {
/// Spawns a new [MockWorker].
- static Future<MockWorker> spawn() async => new MockWorker();
+ static Future<MockWorker> spawn() async => MockWorker();
/// Static queue of pending responses, these are shared by all workers.
///
/// If this is empty and a request is received then it will throw.
- static final responseQueue = new Queue<WorkResponse>();
+ static final responseQueue = Queue<WorkResponse>();
/// Static list of all live workers.
static final liveWorkers = <MockWorker>[];
@@ -237,25 +238,26 @@
@override
Stream<List<int>> get stdout => _stdoutController.stream;
- final _stdoutController = new StreamController<List<int>>();
+ final _stdoutController = StreamController<List<int>>();
@override
Stream<List<int>> get stderr => _stderrController.stream;
- final _stderrController = new StreamController<List<int>>();
+ final _stderrController = StreamController<List<int>>();
@override
IOSink get stdin {
- _stdin ??= new IOSink(_stdinController.sink);
+ _stdin ??= IOSink(_stdinController.sink);
return _stdin;
}
IOSink _stdin;
- final _stdinController = new StreamController<List<int>>();
-
- int get pid => throw new UnsupportedError('Not needed.');
+ final _stdinController = StreamController<List<int>>();
@override
- bool kill([ProcessSignal = ProcessSignal.sigterm, int exitCode = 0]) {
+ int get pid => throw UnsupportedError('Not needed.');
+
+ @override
+ bool kill([processSignal = ProcessSignal.sigterm, int exitCode = 0]) {
if (_killed) return false;
() async {
await _stdoutController.close();
@@ -267,5 +269,5 @@
return true;
}
- bool _killed = false;
+ final _killed = false;
}
diff --git a/test/message_grouper_test.dart b/test/message_grouper_test.dart
index 39dcb9e..8843b69 100644
--- a/test/message_grouper_test.dart
+++ b/test/message_grouper_test.dart
@@ -14,18 +14,18 @@
void main() {
group('AsyncMessageGrouper', () {
- runTests(() => new TestStdinAsync(),
- (Stdin stdinStream) => new AsyncMessageGrouper(stdinStream));
+ runTests(() => TestStdinAsync(),
+ (Stdin stdinStream) => AsyncMessageGrouper(stdinStream));
});
group('SyncMessageGrouper', () {
- runTests(() => new TestStdinSync(),
- (Stdin stdinStream) => new SyncMessageGrouper(stdinStream));
+ runTests(() => TestStdinSync(),
+ (Stdin stdinStream) => SyncMessageGrouper(stdinStream));
});
}
-void runTests(TestStdin stdinFactory(),
- MessageGrouper messageGrouperFactory(Stdin stdinStream)) {
+void runTests(TestStdin Function() stdinFactory,
+ MessageGrouper Function(Stdin) messageGrouperFactory) {
MessageGrouper messageGrouper;
TestStdin stdinStream;
@@ -47,7 +47,7 @@
/// Make a simple message having the given [length]
List<int> makeMessage(int length) {
var result = <int>[];
- for (int i = 0; i < length; i++) {
+ for (var i = 0; i < length; i++) {
result.add(i & 0xff);
}
return result;
@@ -74,14 +74,14 @@
var len = 0x155;
var msg = makeMessage(len);
var encodedLen = [0xd5, 0x02];
- await check([]..addAll(encodedLen)..addAll(msg), [msg]);
+ await check([...encodedLen, ...msg], [msg]);
});
test('Message with 3-byte length', () async {
var len = 0x4103;
var msg = makeMessage(len);
var encodedLen = [0x83, 0x82, 0x01];
- await check([]..addAll(encodedLen)..addAll(msg), [msg]);
+ await check([...encodedLen, ...msg], [msg]);
});
test('Multiple messages', () async {
diff --git a/test/worker_loop_test.dart b/test/worker_loop_test.dart
index 2356060..ec7f83b 100644
--- a/test/worker_loop_test.dart
+++ b/test/worker_loop_test.dart
@@ -13,45 +13,45 @@
void main() {
group('SyncWorkerLoop', () {
runTests(
- () => new TestStdinSync(),
+ () => TestStdinSync(),
(Stdin stdinStream, Stdout stdoutStream) =>
- new TestSyncWorkerConnection(stdinStream, stdoutStream),
+ TestSyncWorkerConnection(stdinStream, stdoutStream),
(TestSyncWorkerConnection connection) =>
- new TestSyncWorkerLoop(connection));
+ TestSyncWorkerLoop(connection));
});
group('AsyncWorkerLoop', () {
runTests(
- () => new TestStdinAsync(),
+ () => TestStdinAsync(),
(Stdin stdinStream, Stdout stdoutStream) =>
- new TestAsyncWorkerConnection(stdinStream, stdoutStream),
+ TestAsyncWorkerConnection(stdinStream, stdoutStream),
(TestAsyncWorkerConnection connection) =>
- new TestAsyncWorkerLoop(connection));
+ TestAsyncWorkerLoop(connection));
});
group('SyncWorkerLoopWithPrint', () {
runTests(
- () => new TestStdinSync(),
+ () => TestStdinSync(),
(Stdin stdinStream, Stdout stdoutStream) =>
- new TestSyncWorkerConnection(stdinStream, stdoutStream),
+ TestSyncWorkerConnection(stdinStream, stdoutStream),
(TestSyncWorkerConnection connection) =>
- new TestSyncWorkerLoop(connection, printMessage: 'Goodbye!'));
+ TestSyncWorkerLoop(connection, printMessage: 'Goodbye!'));
});
group('AsyncWorkerLoopWithPrint', () {
runTests(
- () => new TestStdinAsync(),
+ () => TestStdinAsync(),
(Stdin stdinStream, Stdout stdoutStream) =>
- new TestAsyncWorkerConnection(stdinStream, stdoutStream),
+ TestAsyncWorkerConnection(stdinStream, stdoutStream),
(TestAsyncWorkerConnection connection) =>
- new TestAsyncWorkerLoop(connection, printMessage: 'Goodbye!'));
+ TestAsyncWorkerLoop(connection, printMessage: 'Goodbye!'));
});
}
void runTests<T extends TestWorkerConnection>(
- TestStdin stdinFactory(),
- T workerConnectionFactory(Stdin stdin, Stdout stdout),
- TestWorkerLoop workerLoopFactory(T connection)) {
+ TestStdin Function() stdinFactory,
+ T Function(Stdin, Stdout) workerConnectionFactory,
+ TestWorkerLoop Function(T) workerLoopFactory) {
TestStdin stdinStream;
TestStdoutStream stdoutStream;
T connection;
@@ -59,24 +59,24 @@
setUp(() {
stdinStream = stdinFactory();
- stdoutStream = new TestStdoutStream();
+ stdoutStream = TestStdoutStream();
connection = workerConnectionFactory(stdinStream, stdoutStream);
workerLoop = workerLoopFactory(connection);
});
test('basic', () async {
- var request = new WorkRequest();
+ var request = WorkRequest();
request.arguments.addAll(['--foo=bar']);
stdinStream.addInputBytes(protoToDelimitedBuffer(request));
stdinStream.close();
- var response = new WorkResponse()..output = 'Hello World';
+ var response = WorkResponse()..output = 'Hello World';
workerLoop.enqueueResponse(response);
// Make sure `print` never gets called in the parent zone.
var printMessages = <String>[];
await runZoned(() => workerLoop.run(), zoneSpecification:
- new ZoneSpecification(print: (self, parent, zone, message) {
+ ZoneSpecification(print: (self, parent, zone, message) {
printMessages.add(message);
}));
expect(printMessages, isEmpty,
@@ -96,7 +96,7 @@
});
test('Exception in the worker.', () async {
- var request = new WorkRequest();
+ var request = WorkRequest();
request.arguments.addAll(['--foo=bar']);
stdinStream.addInputBytes(protoToDelimitedBuffer(request));
stdinStream.close();
@@ -125,7 +125,7 @@
(stdinStream as TestStdinSync).pendingBytes.clear();
await workerLoop.run();
} else if (stdinStream is TestStdinAsync) {
- var done = new Completer();
+ var done = Completer();
workerLoop.run().then((_) => done.complete(null));
(stdinStream as TestStdinAsync).controller.addError('Error!!');
await done.future;
diff --git a/tool/travis.sh b/tool/travis.sh
index a011957..bccd424 100755
--- a/tool/travis.sh
+++ b/tool/travis.sh
@@ -8,7 +8,7 @@
set -e
# Verify that the libraries are error free.
-dartanalyzer --fatal-warnings \
+dartanalyzer --fatal-infos --fatal-warnings \
lib/bazel_worker.dart \
lib/driver.dart \
lib/testing.dart \
@@ -19,6 +19,6 @@
pushd e2e_test
pub get
-dartanalyzer --fatal-warnings test/e2e_test.dart
+dartanalyzer --fatal-infos --fatal-warnings test/e2e_test.dart
pub run test
popd
diff --git a/tool/update_proto.sh b/tool/update_proto.sh
new file mode 100755
index 0000000..b224694
--- /dev/null
+++ b/tool/update_proto.sh
@@ -0,0 +1,36 @@
+#!/bin/bash
+#
+# Regenerates lib/src/worker_protocol.pb.dart from bazel's
+# worker_protocol.proto.
+#
+# Usage: tool/update_proto.sh <protoc_plugin version>
+
+set -e
+
+if [ -z "$1" ]; then
+  # Without a version we cannot know which protoc_plugin to activate, so stop
+  # here instead of silently generating code with whatever is installed.
+  echo "Expected exactly one argument which is the protoc_plugin version to use" >&2
+  exit 1
+fi
+
+echo "Using protoc_plugin version $1"
+pub global activate protoc_plugin "$1"
+
+BAZEL_REPO=.dart_tool/bazel_worker/bazel.git/
+# Remove the temporary checkout on exit, including when a step below fails.
+trap 'rm -rf "$BAZEL_REPO"' EXIT
+
+# Bash away old versions if they exist
+rm -rf "$BAZEL_REPO"
+# Only the current worker_protocol.proto is needed, so skip the history.
+git clone --depth 1 https://github.com/bazelbuild/bazel.git "$BAZEL_REPO"
+
+protoc --proto_path="${BAZEL_REPO}/src/main/protobuf" --dart_out=lib/src worker_protocol.proto
+dartfmt -w lib/src/worker_protocol.pb.dart
+
+# We only care about the *.pb.dart file, not the extra files. Use -f so a
+# plugin version that does not emit one of them does not abort the script.
+rm -f lib/src/worker_protocol.pbenum.dart
+rm -f lib/src/worker_protocol.pbjson.dart
+rm -f lib/src/worker_protocol.pbserver.dart