blob: 8dfc78144c80513cf7571806813d0131b2a6b991 [file] [log] [blame]
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';
import '../api.dart';
import '../executor.dart';
import 'exception_impls.dart';
import 'execute_macro.dart';
import 'message_grouper.dart';
import 'protocol.dart';
import 'remote_instance.dart';
import 'response_impls.dart';
import 'serialization.dart';
/// Implements the client side of the macro instantiation/expansion protocol.
final class MacroExpansionClient {
/// A map of the instantiable macro constructors.
///
/// The outer map is keyed by the URI of the library defining macros, whose
/// values are Maps keyed
final Map<Uri, Map<String, Map<String, Function>>> _macroConstructors;
/// Maps macro instance identifiers to instances.
final Map<MacroInstanceIdentifierImpl, Macro> _macroInstances = {};
/// Holds on to response completers by request id.
final Map<int, Completer<Response>> _responseCompleters = {};
MacroExpansionClient._(void Function(Serializer) sendResult,
Stream<Object?> messageStream, this._macroConstructors) {
messageStream.listen((message) => _handleMessage(message, sendResult));
}
/// Spawns a client connecting either to [sendPort] or a socket address and
/// port given in [arguments].
static Future<MacroExpansionClient> start(
SerializationMode serializationMode,
Map<Uri, Map<String, Map<String, Function>>> macroConstructors,
List<String> arguments,
SendPort? sendPort) {
return withSerializationMode(serializationMode, () async {
// Function that sends the result of a `Serializer` using either
// `sendPort` or `stdout`.
void Function(Serializer) sendResult;
// The stream for incoming messages, could be either a ReceivePort, stdin,
// or a socket.
Stream<Object?> messageStream;
String? socketAddress;
int? socketPort;
if (arguments.isNotEmpty) {
if (arguments.length != 2) {
throw ArgumentError(
'Expected exactly two or zero arguments, got $arguments.');
}
socketAddress = arguments.first;
socketPort = int.parse(arguments[1]);
}
if (sendPort != null) {
ReceivePort receivePort = ReceivePort();
messageStream = receivePort;
sendResult =
(Serializer serializer) => _sendIsolateResult(serializer, sendPort);
// If using isolate communication, first send a sendPort to the parent
// isolate.
sendPort.send(receivePort.sendPort);
} else {
late Stream<List<int>> inputStream;
if (socketAddress != null && socketPort != null) {
Socket socket = await Socket.connect(socketAddress, socketPort);
// Nagle's algorithm slows us down >100x, disable it.
socket.setOption(SocketOption.tcpNoDelay, true);
sendResult = _sendIOSinkResultFactory(socket);
inputStream = socket;
} else {
sendResult = _sendIOSinkResultFactory(stdout);
inputStream = stdin;
}
if (serializationMode == SerializationMode.byteData) {
messageStream = MessageGrouper(inputStream).messageStream;
} else if (serializationMode == SerializationMode.json) {
messageStream = const Utf8Decoder()
.bind(inputStream)
.transform(const LineSplitter())
.map((line) => jsonDecode(line)!);
} else {
throw UnsupportedError(
'Unsupported serialization mode $serializationMode for '
'ProcessExecutor');
}
}
return MacroExpansionClient._(
sendResult, messageStream, macroConstructors);
});
}
void _handleMessage(
Object? message, void Function(Serializer) sendResult) async {
// Serializes `request` and sends it using `sendResult`.
Future<Response> sendRequest(Request request) =>
_sendRequest(request, sendResult);
if (serializationMode == SerializationMode.byteData &&
message is TransferableTypedData) {
message = message.materialize().asUint8List();
}
Deserializer deserializer = deserializerFactory(message)..moveNext();
int zoneId = deserializer.expectInt();
await withRemoteInstanceZone(zoneId, () async {
deserializer.moveNext();
MessageType type = MessageType.values[deserializer.expectInt()];
Serializer serializer = serializerFactory();
switch (type) {
case MessageType.instantiateMacroRequest:
InstantiateMacroRequest request =
InstantiateMacroRequest.deserialize(deserializer, zoneId);
(await _instantiateMacro(request)).serialize(serializer);
case MessageType.disposeMacroRequest:
DisposeMacroRequest request =
DisposeMacroRequest.deserialize(deserializer, zoneId);
_macroInstances.remove(request.identifier);
return;
case MessageType.executeDeclarationsPhaseRequest:
ExecuteDeclarationsPhaseRequest request =
ExecuteDeclarationsPhaseRequest.deserialize(deserializer, zoneId);
(await _executeDeclarationsPhase(request, sendRequest))
.serialize(serializer);
case MessageType.executeDefinitionsPhaseRequest:
ExecuteDefinitionsPhaseRequest request =
ExecuteDefinitionsPhaseRequest.deserialize(deserializer, zoneId);
(await _executeDefinitionsPhase(request, sendRequest))
.serialize(serializer);
case MessageType.executeTypesPhaseRequest:
ExecuteTypesPhaseRequest request =
ExecuteTypesPhaseRequest.deserialize(deserializer, zoneId);
(await _executeTypesPhase(request, sendRequest))
.serialize(serializer);
case MessageType.response:
SerializableResponse response =
SerializableResponse.deserialize(deserializer, zoneId);
_responseCompleters.remove(response.requestId)!.complete(response);
return;
case MessageType.destroyRemoteInstanceZoneRequest:
DestroyRemoteInstanceZoneRequest request =
DestroyRemoteInstanceZoneRequest.deserialize(
deserializer, zoneId);
destroyRemoteInstanceZone(request.serializationZoneId);
return;
default:
throw StateError('Unhandled event type $type');
}
sendResult(serializer);
}, createIfMissing: true);
}
/// Handles [InstantiateMacroRequest]s.
Future<SerializableResponse> _instantiateMacro(
InstantiateMacroRequest request) async {
try {
Map<String, Map<String, Function>> classes =
_macroConstructors[request.library] ??
(throw ArgumentError(
'Unrecognized macro library ${request.library}'));
Map<String, Function> constructors = classes[request.name] ??
(throw ArgumentError(
'Unrecognized macro class ${request.name} for library '
'${request.library}'));
Function constructor = constructors[request.constructor] ??
(throw ArgumentError(
'Unrecognized constructor name "${request.constructor}" for '
'macro class "${request.name}".'));
Macro instance = Function.apply(constructor, [
for (Argument argument in request.arguments.positional) argument.value,
], {
for (MapEntry<String, Argument> entry
in request.arguments.named.entries)
Symbol(entry.key): entry.value.value,
}) as Macro;
MacroInstanceIdentifierImpl identifier =
MacroInstanceIdentifierImpl(instance, request.instanceId);
_macroInstances[identifier] = instance;
return SerializableResponse(
responseType: MessageType.macroInstanceIdentifier,
response: identifier,
requestId: request.id,
serializationZoneId: request.serializationZoneId);
} catch (e, s) {
return SerializableResponse(
responseType: MessageType.exception,
exception: MacroExceptionImpl.from(e, s),
requestId: request.id,
serializationZoneId: request.serializationZoneId);
}
}
Future<SerializableResponse> _executeTypesPhase(
ExecuteTypesPhaseRequest request,
Future<Response> Function(Request request) sendRequest) async {
try {
Macro instance = _macroInstances[request.macro] ??
(throw StateError('Unrecognized macro instance ${request.macro}\n'
'Known instances: $_macroInstances)'));
TypePhaseIntrospector introspector = ClientTypePhaseIntrospector(
sendRequest,
remoteInstance: request.introspector,
serializationZoneId: request.serializationZoneId);
MacroExecutionResult result = await runPhase(
() => executeTypesMacro(instance, request.target, introspector));
return SerializableResponse(
responseType: MessageType.macroExecutionResult,
response: result,
requestId: request.id,
serializationZoneId: request.serializationZoneId);
} catch (e, s) {
return SerializableResponse(
responseType: MessageType.exception,
exception: MacroExceptionImpl.from(e, s),
requestId: request.id,
serializationZoneId: request.serializationZoneId);
}
}
Future<SerializableResponse> _executeDeclarationsPhase(
ExecuteDeclarationsPhaseRequest request,
Future<Response> Function(Request request) sendRequest) async {
try {
Macro instance = _macroInstances[request.macro] ??
(throw StateError('Unrecognized macro instance ${request.macro}\n'
'Known instances: $_macroInstances)'));
DeclarationPhaseIntrospector introspector =
ClientDeclarationPhaseIntrospector(sendRequest,
remoteInstance: request.introspector,
serializationZoneId: request.serializationZoneId);
MacroExecutionResult result = await runPhase(() =>
executeDeclarationsMacro(instance, request.target, introspector));
return SerializableResponse(
responseType: MessageType.macroExecutionResult,
response: result,
requestId: request.id,
serializationZoneId: request.serializationZoneId);
} catch (e, s) {
return SerializableResponse(
responseType: MessageType.exception,
exception: MacroExceptionImpl.from(e, s),
requestId: request.id,
serializationZoneId: request.serializationZoneId);
}
}
Future<SerializableResponse> _executeDefinitionsPhase(
ExecuteDefinitionsPhaseRequest request,
Future<Response> Function(Request request) sendRequest) async {
try {
Macro instance = _macroInstances[request.macro] ??
(throw StateError('Unrecognized macro instance ${request.macro}\n'
'Known instances: $_macroInstances)'));
DefinitionPhaseIntrospector introspector =
ClientDefinitionPhaseIntrospector(sendRequest,
remoteInstance: request.introspector,
serializationZoneId: request.serializationZoneId);
MacroExecutionResult result = await runPhase(
() => executeDefinitionMacro(instance, request.target, introspector));
return SerializableResponse(
responseType: MessageType.macroExecutionResult,
response: result,
requestId: request.id,
serializationZoneId: request.serializationZoneId);
} catch (e, s) {
return SerializableResponse(
responseType: MessageType.exception,
exception: MacroExceptionImpl.from(e, s),
requestId: request.id,
serializationZoneId: request.serializationZoneId);
}
}
/// Serializes [request], passes it to [sendResult], and sets up a [Completer]
/// in [_responseCompleters] to handle the response.
Future<Response> _sendRequest(
Request request, void Function(Serializer serializer) sendResult) {
Completer<Response> completer = Completer();
_responseCompleters[request.id] = completer;
Serializer serializer = serializerFactory();
serializer.addInt(request.serializationZoneId);
request.serialize(serializer);
sendResult(serializer);
return completer.future;
}
}
/// Sends [serializer.result] to [sendPort], possibly wrapping it in a
/// [TransferableTypedData] object.
void _sendIsolateResult(Serializer serializer, SendPort sendPort) {
if (serializationMode == SerializationMode.byteData) {
sendPort
.send(TransferableTypedData.fromList([serializer.result as Uint8List]));
} else {
sendPort.send(serializer.result);
}
}
/// Returns a function which takes a [Serializer] and sends its result to
/// [sink].
///
/// Serializes the result to a string if using JSON.
void Function(Serializer) _sendIOSinkResultFactory(IOSink sink) =>
(Serializer serializer) {
if (serializationMode == SerializationMode.json) {
sink.writeln(jsonEncode(serializer.result));
} else if (serializationMode == SerializationMode.byteData) {
Uint8List result = (serializer as ByteDataSerializer).result;
int length = result.lengthInBytes;
BytesBuilder bytesBuilder = BytesBuilder(copy: false);
bytesBuilder.add([
length >> 24 & 0xff,
length >> 16 & 0xff,
length >> 8 & 0xff,
length & 0xff,
]);
bytesBuilder.add(result);
sink.add(bytesBuilder.takeBytes());
} else {
throw UnsupportedError(
'Unsupported serialization mode $serializationMode for '
'ProcessExecutor');
}
};
/// Runs [phase] in a [Zone] which tracks scheduled tasks, completing with a
/// [StateError] if [phase] returns a value while additional tasks or timers
/// are still scheduled.
Future<MacroExecutionResult> runPhase(
Future<MacroExecutionResult> Function() phase) {
final completer = Completer<MacroExecutionResult>();
var pendingMicrotasks = 0;
var activeTimers = 0;
Zone.current
.fork(
specification: ZoneSpecification(
handleUncaughtError: (self, parent, zone, error, stackTrace) {
if (completer.isCompleted) return;
completer.completeError(error, stackTrace);
},
createTimer: (self, parent, zone, duration, f) {
activeTimers++;
return _WrappedTimer(
parent.createTimer(zone, duration, () {
activeTimers--;
f();
}),
onCancel: () => activeTimers--);
},
createPeriodicTimer: (self, parent, zone, duration, f) {
activeTimers++;
return _WrappedTimer(parent.createPeriodicTimer(zone, duration, f),
onCancel: () => activeTimers--);
},
scheduleMicrotask: (self, parent, zone, f) {
pendingMicrotasks++;
parent.scheduleMicrotask(zone, () {
pendingMicrotasks--;
assert(pendingMicrotasks >= 0);
// This should only happen if we have previously competed with an
// error. Just skip this scheduled task in that case.
if (completer.isCompleted) return;
f();
});
},
))
.runGuarded(() => phase().then((value) {
if (completer.isCompleted) return;
if (pendingMicrotasks != 0) {
throw StateError(
'Macro completed but has $pendingMicrotasks async tasks still '
'pending. Macros must complete all async work prior to '
'returning.');
}
if (activeTimers != 0) {
throw StateError(
'Macro completed but has $activeTimers active timers. '
'Macros must cancel all timers prior to returning.');
}
completer.complete(value);
}));
return completer.future;
}
/// Wraps a [Timer] to track when it is cancelled and calls [onCancel], if the
/// timer is still active.
class _WrappedTimer implements Timer {
final Timer timer;
final void Function() onCancel;
_WrappedTimer(this.timer, {required this.onCancel});
@override
void cancel() {
if (isActive) onCancel();
timer.cancel();
}
@override
bool get isActive => timer.isActive;
@override
int get tick => timer.tick;
}