blob: 5aa0a9d55cb30e4222837c14d26b1efd980e07d6 [file] [log] [blame]
// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';
import 'package:_macros/src/executor/message_grouper.dart';
import 'package:_macros/src/executor/serialization.dart';
void main() async {
for (var serializationMode in [
SerializationMode.json,
SerializationMode.byteData
]) {
await withSerializationMode(serializationMode, () async {
await _isolateSpawnBenchmarks();
await _isolateSpawnUriBenchmarks();
await _separateProcessStdioBenchmarks();
await _separateProcessSocketBenchmarks();
});
}
}
Future<void> _isolateSpawnBenchmarks() async {
void Function(SendPort) childIsolateFn(SerializationMode mode) =>
(SendPort sendPort) => withSerializationMode(mode, () {
var isolateReceivePort = ReceivePort();
isolateReceivePort.listen((data) {
deserialize(data);
var result = serialize();
result = result is Uint8List
? TransferableTypedData.fromList([result])
: result;
sendPort.send(result);
});
sendPort.send(isolateReceivePort.sendPort);
});
Completer? responseCompleter;
late SendPort sendPort;
var receivePort = ReceivePort();
var isolate = await Isolate.spawn(
childIsolateFn(serializationMode), receivePort.sendPort);
final sendPortCompleter = Completer<SendPort>();
receivePort.listen((data) {
if (!sendPortCompleter.isCompleted) {
sendPortCompleter.complete(data as SendPort);
} else {
responseCompleter!.complete(data);
}
});
sendPort = await sendPortCompleter.future;
// warmup
for (var i = 0; i < 100; i++) {
responseCompleter = Completer();
var result = serialize();
result =
result is Uint8List ? TransferableTypedData.fromList([result]) : result;
sendPort.send(result);
deserialize(await responseCompleter.future);
}
// measure
var watch = Stopwatch()..start();
for (var i = 0; i < 100; i++) {
responseCompleter = Completer();
var result = serialize();
result =
result is Uint8List ? TransferableTypedData.fromList([result]) : result;
sendPort.send(result);
deserialize(await responseCompleter.future);
}
print('Isolate.spawn + $serializationMode: ${watch.elapsed}');
receivePort.close();
isolate.kill();
}
Future<void> _isolateSpawnUriBenchmarks() async {
Completer? responseCompleter;
late SendPort sendPort;
var receivePort = ReceivePort();
var isolate = await Isolate.spawnUri(
Uri.dataFromString(childProgram(serializationMode)),
[],
receivePort.sendPort);
final sendPortCompleter = Completer<SendPort>();
receivePort.listen((data) {
if (!sendPortCompleter.isCompleted) {
sendPortCompleter.complete(data as SendPort);
} else {
responseCompleter!.complete(data);
}
});
sendPort = await sendPortCompleter.future;
// warmup
for (var i = 0; i < 100; i++) {
responseCompleter = Completer();
var result = serialize();
result =
result is Uint8List ? TransferableTypedData.fromList([result]) : result;
sendPort.send(result);
deserialize(await responseCompleter.future);
}
// measure
var watch = Stopwatch()..start();
for (var i = 0; i < 100; i++) {
responseCompleter = Completer();
var result = serialize();
result =
result is Uint8List ? TransferableTypedData.fromList([result]) : result;
sendPort.send(result);
deserialize(await responseCompleter.future);
}
print('Isolate.spawnUri + $serializationMode: ${watch.elapsed}');
receivePort.close();
isolate.kill();
}
Future<void> _separateProcessStdioBenchmarks() async {
Completer? responseCompleter;
var tmpDir = Directory.systemTemp.createTempSync('serialize_bench');
try {
var file = File(tmpDir.uri.resolve('main.dart').toFilePath());
file.writeAsStringSync(childProgram(serializationMode));
var process = await Process.start(Platform.resolvedExecutable, [
'--packages=${(await Isolate.packageConfig)!.toFilePath()}',
file.path,
]);
var listeners = <StreamSubscription>[
process.stderr.listen((event) {
print('stderr: ${utf8.decode(event)}');
}),
(serializationMode == SerializationMode.json
? process.stdout
: MessageGrouper(process.stdout).messageStream)
.listen((data) {
responseCompleter!.complete(data);
}),
];
// warmup
for (var i = 0; i < 100; i++) {
responseCompleter = Completer();
var result = serialize();
if (result is List<int>) {
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
process.stdin.add(bytesBuilder.takeBytes());
} else {
process.stdin.writeln(jsonEncode(result));
}
deserialize(await responseCompleter.future);
}
// measure
var watch = Stopwatch()..start();
for (var i = 0; i < 100; i++) {
responseCompleter = Completer();
var result = serialize();
if (result is List<int>) {
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
process.stdin.add(bytesBuilder.takeBytes());
} else {
process.stdin.writeln(jsonEncode(result));
}
deserialize(await responseCompleter.future);
}
print('Separate process + Stdio + $serializationMode: ${watch.elapsed}');
for (var listener in listeners) {
listener.cancel();
}
process.kill();
} catch (e, s) {
print('Error running benchmark \n$e\n\n$s');
} finally {
tmpDir.deleteSync(recursive: true);
}
}
Future<void> _separateProcessSocketBenchmarks() async {
Completer? responseCompleter;
var tmpDir = Directory.systemTemp.createTempSync('serialize_bench');
try {
var file = File(tmpDir.uri.resolve('main.dart').toFilePath());
file.writeAsStringSync(childProgram(serializationMode));
ServerSocket serverSocket;
// Try an ipv6 address loopback first, and fall back on ipv4.
try {
serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv6, 0);
} on SocketException catch (_) {
serverSocket = await ServerSocket.bind(InternetAddress.loopbackIPv4, 0);
}
Completer<Socket> clientCompleter = Completer();
serverSocket.listen((client) {
clientCompleter.complete(client);
});
var process = await Process.start(Platform.resolvedExecutable, [
'--packages=${(await Isolate.packageConfig)!.toFilePath()}',
file.path,
serverSocket.address.address,
serverSocket.port.toString(),
]);
var client = await clientCompleter.future;
// Nagle's algorithm slows us down >100x, disable it.
client.setOption(SocketOption.tcpNoDelay, true);
var listeners = <StreamSubscription>[
(serializationMode == SerializationMode.json
? client
: MessageGrouper(client).messageStream)
.listen((event) {
responseCompleter!.complete(event);
}),
process.stderr.listen((event) {
print('stderr: ${utf8.decode(event)}');
}),
process.stdout.listen((event) {
print('stdout: ${utf8.decode(event)}');
}),
];
// warmup
for (var i = 0; i < 100; i++) {
responseCompleter = Completer();
var result = serialize();
if (result is List<int>) {
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
client.add(bytesBuilder.takeBytes());
} else {
client.write(jsonEncode(result));
}
deserialize(await responseCompleter.future);
}
// measure
var watch = Stopwatch()..start();
for (var i = 0; i < 100; i++) {
responseCompleter = Completer();
var result = serialize();
if (result is List<int>) {
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
client.add(bytesBuilder.takeBytes());
} else {
client.write(jsonEncode(result));
}
deserialize(await responseCompleter.future);
}
print('Separate process + Socket + $serializationMode: ${watch.elapsed}');
for (var listener in listeners) {
listener.cancel();
}
process.kill();
await serverSocket.close();
client.destroy();
} catch (e, s) {
print('Error running benchmark \n$e\n\n$s');
} finally {
tmpDir.deleteSync(recursive: true);
}
}
void _writeLength(List<int> result, BytesBuilder bytesBuilder) {
int length = (result as Uint8List).lengthInBytes;
if (length > 0xffffffff) {
throw StateError('Message was larger than the allowed size!');
}
bytesBuilder.add([
length >> 24 & 0xff,
length >> 16 & 0xff,
length >> 8 & 0xff,
length & 0xff
]);
}
String childProgram(SerializationMode mode) => '''
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';
import 'package:_fe_analyzer_shared/src/macros/executor/message_grouper.dart';
import 'package:_fe_analyzer_shared/src/macros/executor/serialization.dart';
void main(List<String> args, [SendPort? sendPort]) async {
var mode = $mode;
await withSerializationMode(mode, () async {
if (sendPort != null) {
var isolateReceivePort = ReceivePort();
isolateReceivePort.listen((data) {
deserialize(data);
var result = serialize();
result = result is Uint8List
? TransferableTypedData.fromList([result])
: result;
sendPort.send(result);
});
sendPort.send(isolateReceivePort.sendPort);
} else if (args.isNotEmpty) {
var address = args[0];
var port = int.parse(args[1]);
var socket = await Socket.connect(address, port);
if (mode == SerializationMode.json) {
socket.listen((data) {
var json = utf8.decode(data).trimRight();
deserialize(jsonDecode(json));
socket.write(jsonEncode(serialize()));
});
} else {
MessageGrouper(socket).messageStream.listen((data) {
deserialize(data);
var result = serialize() as Uint8List;
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
socket.add(bytesBuilder.takeBytes());
});
}
} else {
// We allow one empty line to work around some weird data.
var allowEmpty = true;
if (mode == SerializationMode.json) {
stdin.listen((data) {
var json = utf8.decode(data).trimRight();
// On exit we tend to get extra empty lines sometimes?
if (json.isEmpty && allowEmpty) {
allowEmpty = false;
return;
}
deserialize(jsonDecode(json));
stdout.write(jsonEncode(serialize()));
});
} else {
MessageGrouper(stdin).messageStream.listen((data) {
deserialize(data);
var result = serialize() as Uint8List;
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
stdout.add(bytesBuilder.takeBytes());
});
}
}
});
}
Object? serialize() {
var serializer = serializerFactory();
for (var i = 0; i < 100; i++) {
serializer.addInt(i * 100);
serializer.addString('foo' * i);
serializer.addBool(i % 2 == 0);
serializer.startList();
for (var j = 0; j < 10; j++) {
serializer.addDouble(i * 5);
}
serializer.endList();
serializer.addNull();
}
return serializer.result;
}
void deserialize(Object? result) {
result = result is TransferableTypedData
? result.materialize().asUint8List()
: result;
var deserializer = deserializerFactory(result);
while (deserializer.moveNext()) {
deserializer
..expectInt()
..moveNext()
..expectString()
..moveNext()
..expectBool()
..moveNext()
..expectList();
while (deserializer.moveNext()) {
deserializer.expectDouble();
}
deserializer
..moveNext()
..checkNull();
}
}
void _writeLength(Uint8List result, BytesBuilder bytesBuilder) {
int length = result.lengthInBytes;
if (length > 0xffffffff) {
throw new StateError('Message was larger than the allowed size!');
}
bytesBuilder.add([
length >> 24 & 0xff,
length >> 16 & 0xff,
length >> 8 & 0xff,
length & 0xff
]);
}''';
Object? serialize() {
var serializer = serializerFactory();
for (var i = -50; i < 50; i++) {
serializer.addInt(i % 2 * 100);
serializer.addString('foo' * i);
serializer.addBool(i < 0);
serializer.startList();
for (var j = 0.0; j < 10; j++) {
serializer.addDouble(i * j);
}
serializer.endList();
serializer.addNull();
}
return serializer.result;
}
void deserialize(Object? result) {
result = result is TransferableTypedData
? result.materialize().asUint8List()
: result;
if (serializationMode == SerializationMode.json) {
if (result is List<int>) {
result = jsonDecode(utf8.decode(result));
}
}
var deserializer = deserializerFactory(result);
while (deserializer.moveNext()) {
deserializer
..expectInt()
..moveNext()
..expectString()
..moveNext()
..expectBool()
..moveNext()
..expectList();
while (deserializer.moveNext()) {
deserializer.expectDouble();
}
deserializer
..moveNext()
..checkNull();
}
}