blob: a7180999bb6b02d762a9b77f43d16a3206daf450 [file] [log] [blame]
// Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';
import '../async_message_grouper.dart';
import '../sync_message_grouper.dart';
import '../utils.dart';
import '../worker_protocol.pb.dart';
/// A connection from a worker to a driver (driver could be bazel, a dart
/// program using `BazelWorkerDriver`, or any other process that speaks the
/// protocol).
abstract class WorkerConnection {
/// Reads a [WorkRequest] or returns [null] if there are none left.
/// See [AsyncWorkerConnection] and [SyncWorkerConnection] for more narrow
/// interfaces.
FutureOr<WorkRequest?> readRequest();
void writeResponse(WorkResponse response);
abstract class AsyncWorkerConnection implements WorkerConnection {
/// Creates a [StdAsyncWorkerConnection] with the specified [inputStream]
/// and [outputStream], unless [sendPort] is specified, in which case
/// creates a [SendPortAsyncWorkerConnection].
factory AsyncWorkerConnection(
{Stream<List<int>>? inputStream,
StreamSink<List<int>>? outputStream,
SendPort? sendPort}) =>
sendPort == null
? StdAsyncWorkerConnection(
inputStream: inputStream, outputStream: outputStream)
: SendPortAsyncWorkerConnection(sendPort);
Future<WorkRequest?> readRequest();
abstract class SyncWorkerConnection implements WorkerConnection {
WorkRequest? readRequest();
/// Default implementation of [AsyncWorkerConnection] that works with [Stdin]
/// and [Stdout].
class StdAsyncWorkerConnection implements AsyncWorkerConnection {
final AsyncMessageGrouper _messageGrouper;
final StreamSink<List<int>> _outputStream;
{Stream<List<int>>? inputStream, StreamSink<List<int>>? outputStream})
: _messageGrouper = AsyncMessageGrouper(inputStream ?? stdin),
_outputStream = outputStream ?? stdout;
Future<WorkRequest?> readRequest() async {
var buffer = await;
if (buffer == null) return null;
return WorkRequest.fromBuffer(buffer);
void writeResponse(WorkResponse response) {
/// Implementation of [AsyncWorkerConnection] for running in an isolate.
class SendPortAsyncWorkerConnection implements AsyncWorkerConnection {
final ReceivePort receivePort;
final StreamIterator<Uint8List> receivePortIterator;
final SendPort sendPort;
factory SendPortAsyncWorkerConnection(SendPort sendPort) {
var receivePort = ReceivePort();
return SendPortAsyncWorkerConnection._(receivePort, sendPort);
SendPortAsyncWorkerConnection._(this.receivePort, this.sendPort)
: receivePortIterator = StreamIterator(receivePort.cast());
Future<WorkRequest?> readRequest() async {
if (!await receivePortIterator.moveNext()) return null;
return WorkRequest.fromBuffer(receivePortIterator.current);
void writeResponse(WorkResponse response) {
/// Default implementation of [SyncWorkerConnection] that works with [Stdin] and
/// [Stdout].
class StdSyncWorkerConnection implements SyncWorkerConnection {
final SyncMessageGrouper _messageGrouper;
final Stdout _stdoutStream;
StdSyncWorkerConnection({Stdin? stdinStream, Stdout? stdoutStream})
: _messageGrouper = SyncMessageGrouper(stdinStream ?? stdin),
_stdoutStream = stdoutStream ?? stdout;
WorkRequest? readRequest() {
var buffer =;
if (buffer == null) return null;
return WorkRequest.fromBuffer(buffer);
void writeResponse(WorkResponse response) {