blob: 7205ecdbebc2cc9ddae7104eabe5cbf42f6fccb7 [file] [log] [blame]
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:_fe_analyzer_shared/src/macros/executor/protocol.dart';
import '../executor/message_grouper.dart';
import '../executor/executor_base.dart';
import '../executor/serialization.dart';
import '../executor.dart';
/// Launches [program] with [arguments] as a separate process and returns a
/// [MacroExecutor] that talks to it using [serializationMode].
///
/// The [communicationChannel] selects the transport between this process and
/// the spawned one.
///
/// The [serializationMode] must be a `server` variant, and [program] must use
/// the corresponding `client` variant.
///
/// This is the only public api exposed by this library.
Future<MacroExecutor> start(SerializationMode serializationMode,
    CommunicationChannel communicationChannel, String program,
    [List<String> arguments = const []]) {
  // Pick the launcher first, then invoke it once with the shared arguments.
  final Future<MacroExecutor> Function(SerializationMode, String, List<String>)
      launch;
  switch (communicationChannel) {
    case CommunicationChannel.socket:
      launch = _SingleProcessMacroExecutor.startWithSocket;
      break;
    case CommunicationChannel.stdio:
      launch = _SingleProcessMacroExecutor.startWithStdio;
      break;
  }
  return launch(serializationMode, program, arguments);
}
/// Actual implementation of the separate process based macro executor.
///
/// Incoming messages are read from the stream passed to the superclass, and
/// outgoing messages are written to [outSink].
class _SingleProcessMacroExecutor extends ExternalMacroExecutorBase {
  /// The sink that carries messages to the external process.
  ///
  /// This is the stdin of the process when created by [startWithStdio], and
  /// the connected client socket when created by [startWithSocket].
  final IOSink outSink;

  /// A function that should be invoked when shutting down this executor
  /// to perform any necessary cleanup.
  final void Function() onClose;

  _SingleProcessMacroExecutor(
      {required Stream<Object> messageStream,
      required this.onClose,
      required this.outSink,
      required SerializationMode serializationMode})
      : super(
            messageStream: messageStream, serializationMode: serializationMode);

  /// Builds the error reported for a [serializationMode] that this executor
  /// does not handle.
  static UnsupportedError _unsupportedMode(
          SerializationMode serializationMode) =>
      UnsupportedError('Unsupported serialization mode $serializationMode for '
          'ProcessExecutor');

  /// Throws an [UnsupportedError] unless [serializationMode] is one of the
  /// modes handled by [_decodeMessages] and [sendResult].
  static void _checkSupported(SerializationMode serializationMode) {
    if (serializationMode != SerializationMode.byteDataServer &&
        serializationMode != SerializationMode.jsonServer) {
      throw _unsupportedMode(serializationMode);
    }
  }

  /// Turns the raw bytes in [input] into a stream of decoded messages
  /// according to [serializationMode].
  ///
  /// Throws an [UnsupportedError] for any other serialization mode.
  static Stream<Object> _decodeMessages(
      Stream<List<int>> input, SerializationMode serializationMode) {
    if (serializationMode == SerializationMode.byteDataServer) {
      return MessageGrouper(input).messageStream;
    } else if (serializationMode == SerializationMode.jsonServer) {
      // `bind` rather than `input.transform(...)`: it also works when the
      // runtime type of [input] is a `Stream<Uint8List>` such as a `Socket`.
      return const Utf8Decoder()
          .bind(input)
          .transform(const LineSplitter())
          .map((line) => jsonDecode(line)!);
    }
    throw _unsupportedMode(serializationMode);
  }

  /// Starts [programPath] and communicates with it over a loopback socket.
  ///
  /// A server socket is bound on an ephemeral loopback port, and its address
  /// and port are appended to [arguments] when spawning the process. The
  /// returned future completes once the process has connected back.
  ///
  /// Anything the process writes to stdout is printed; anything it writes to
  /// stderr is thrown as a [RemoteException].
  ///
  /// Throws an [UnsupportedError] if [serializationMode] is not supported.
  static Future<_SingleProcessMacroExecutor> startWithSocket(
      SerializationMode serializationMode,
      String programPath,
      List<String> arguments) async {
    // Fail before binding a port or spawning a process, so that an unusable
    // mode does not leave either of them behind.
    _checkSupported(serializationMode);

    late ServerSocket serverSocket;
    // Try an ipv6 address loopback first, and fall back on ipv4.
    try {
      serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv6, 0);
    } on SocketException catch (_) {
      serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, 0);
    }

    Process process;
    try {
      process = await Process.start(programPath, [
        ...arguments,
        serverSocket.address.address,
        serverSocket.port.toString(),
      ]);
    } catch (_) {
      // Nobody will ever connect, so release the port before reporting.
      await serverSocket.close();
      rethrow;
    }
    process.stderr
        .transform(const Utf8Decoder())
        .listen((content) => throw RemoteException(content));
    process.stdout.transform(const Utf8Decoder()).listen(
        (event) => print('Stdout from MacroExecutor at $programPath:\n$event'));

    Completer<Socket> clientCompleter = Completer();
    serverSocket.listen((connection) {
      if (clientCompleter.isCompleted) {
        // Only a single client is expected; completing the completer a second
        // time would throw, so drop any additional connection instead.
        connection.destroy();
      } else {
        clientCompleter.complete(connection);
      }
    });
    Socket client = await clientCompleter.future;

    return _SingleProcessMacroExecutor(
        onClose: () {
          client.close();
          serverSocket.close();
          process.kill();
        },
        messageStream: _decodeMessages(client, serializationMode),
        outSink: client,
        serializationMode: serializationMode);
  }

  /// Starts [programPath] with [arguments] and communicates with it over its
  /// stdin and stdout.
  ///
  /// Anything the process writes to stderr is thrown as a [RemoteException].
  ///
  /// Throws an [UnsupportedError] if [serializationMode] is not supported.
  static Future<_SingleProcessMacroExecutor> startWithStdio(
      SerializationMode serializationMode,
      String programPath,
      List<String> arguments) async {
    // Fail before spawning, so that an unusable mode does not leave a running
    // process behind.
    _checkSupported(serializationMode);

    Process process = await Process.start(programPath, arguments);
    process.stderr
        .transform(const Utf8Decoder())
        .listen((content) => throw RemoteException(content));

    return _SingleProcessMacroExecutor(
        onClose: () {
          process.kill();
        },
        messageStream: _decodeMessages(process.stdout, serializationMode),
        outSink: process.stdin,
        serializationMode: serializationMode);
  }

  /// Shuts this executor down by running [onClose].
  @override
  Future<void> close() => Future.sync(onClose);

  /// Sends the [Serializer.result] to [outSink].
  ///
  /// Json results are serialized to a `String`, and separated by newlines.
  ///
  /// Byte data results are preceded by their length in bytes, written as four
  /// bytes with the most significant byte first.
  ///
  /// Throws a [StateError] if a byte data result is too large for its length
  /// to fit in those four bytes, and an [UnsupportedError] for any other
  /// serialization mode.
  void sendResult(Serializer serializer) {
    if (serializationMode == SerializationMode.jsonServer) {
      outSink.writeln(jsonEncode(serializer.result));
    } else if (serializationMode == SerializationMode.byteDataServer) {
      Uint8List result = (serializer as ByteDataSerializer).result;
      int length = result.lengthInBytes;
      // The length prefix below is only four bytes wide.
      if (length > 0xffffffff) {
        throw StateError('Message was larger than the allowed size!');
      }
      outSink.add([
        length >> 24 & 0xff,
        length >> 16 & 0xff,
        length >> 8 & 0xff,
        length & 0xff
      ]);
      outSink.add(result);
    } else {
      throw _unsupportedMode(serializationMode);
    }
  }
}
/// The transports that [start] can use to exchange messages with the spawned
/// macro executor process.
enum CommunicationChannel {
  /// Messages are exchanged over a socket that the spawned process connects
  /// to.
  socket,

  /// Messages are exchanged over the stdin and stdout of the spawned process.
  stdio,
}