blob: 4583ba2182ba34ce43cf3da482233cc0c369c610 [file] [log] [blame]
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'constants.dart';
import 'async_message_grouper.dart';
import 'utils.dart';
import 'worker_connection.dart';
import 'worker_loop.dart';
import 'worker_protocol.pb.dart';
/// Connection between a worker and input / output.
abstract class AsyncWorkerConnection implements WorkerConnection {
/// Read a new [WorkRequest]. Returns [null] when there are no more requests.
Future<WorkRequest> readRequest();
/// Write the given [response] as bytes to the output.
void writeResponse(WorkResponse response);
}
/// Persistent Bazel worker loop.
///
/// Extend this class and implement the `performRequest` method.
abstract class AsyncWorkerLoop implements WorkerLoop {
final AsyncWorkerConnection connection;
AsyncWorkerLoop({AsyncWorkerConnection connection})
: this.connection = connection ?? new StdAsyncWorkerConnection();
/// Perform a single [WorkRequest], and return a [WorkResponse].
Future<WorkResponse> performRequest(WorkRequest request);
/// Run the worker loop. The returned [Future] doesn't complete until
/// [connection#readRequest] returns `null`.
Future run() async {
while (true) {
WorkResponse response;
try {
var request = await connection.readRequest();
if (request == null) break;
response = await performRequest(request);
// In case they forget to set this.
response.exitCode ??= EXIT_CODE_OK;
} catch (e, s) {
response = new WorkResponse()
..exitCode = EXIT_CODE_ERROR
..output = '$e\n$s';
}
connection.writeResponse(response);
}
}
}
/// Default implementation of [AsyncWorkerConnection] that works with [Stdin]
/// and [Stdout].
class StdAsyncWorkerConnection implements AsyncWorkerConnection {
final AsyncMessageGrouper _messageGrouper;
final StreamSink<List<int>> _outputStream;
StdAsyncWorkerConnection(
{Stream<List<int>> inputStream, StreamSink<List<int>> outputStream})
: _messageGrouper = new AsyncMessageGrouper(inputStream ?? stdin),
_outputStream = outputStream ?? stdout;
@override
Future<WorkRequest> readRequest() async {
var buffer = await _messageGrouper.next;
if (buffer == null) return null;
return new WorkRequest.fromBuffer(buffer);
}
@override
void writeResponse(WorkResponse response) {
_outputStream.add(protoToDelimitedBuffer(response));
}
}