[beta][perf_witness] Handle abrupt disconnects better Issue description: perf_witness client disconnecting abruptly can crash the server What is the fix: Add necessary exception handling to prevent network exceptions leaking outside of JSON RPC layer. Why cherry-pick: Avoid crashing DAS when info record-performance is used. Risk: Low Issue link(s): https://github.com/dart-lang/sdk/issues/62534 Cherry-pick: https://dart-review.googlesource.com/c/sdk/+/495381 Change-Id: I5681dd9ba249346a79efd6d63ae93f8886bdb79d Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/495920 Reviewed-by: Martin Kustermann <kustermann@google.com> Commit-Queue: Slava Egorov <vegorov@google.com>
diff --git a/pkg/perf_witness/lib/recorder.dart b/pkg/perf_witness/lib/recorder.dart index 3c38849..c965a5a 100644 --- a/pkg/perf_witness/lib/recorder.dart +++ b/pkg/perf_witness/lib/recorder.dart
@@ -7,6 +7,7 @@ import 'dart:io' as io; import 'package:args/args.dart'; +import 'package:meta/meta.dart'; import 'package:path/path.dart' as p; import 'src/common.dart'; @@ -318,9 +319,13 @@ class Connection { final Stopwatch _recordingTime = Stopwatch(); final ProcessInfo info; + + @visibleForTesting + final io.Socket? socket; + final JsonRpcPeer _endpoint; - Connection._(this.info, this._endpoint); + Connection._(this.info, this._endpoint, {this.socket}); Future<void> startRecording( String outputDir, { @@ -352,13 +357,12 @@ } static Future<Connection> connectTo(String controlSocketPath) async { - final client = jsonRpcPeerFromSocket( - await UnixDomainSocket.connect(controlSocketPath), - ); + final socket = await UnixDomainSocket.connect(controlSocketPath); + final client = jsonRpcPeerFromSocket(socket); final info = ProcessInfo.fromJson( await client.sendRequest('process.getInfo') as Map<String, Object?>, ); - return Connection._(info, client); + return Connection._(info, client, socket: socket); } static Future<Connection?> _tryConnectTo(String controlSocket) async {
diff --git a/pkg/perf_witness/lib/server.dart b/pkg/perf_witness/lib/server.dart index 37cd2fa..d34284b 100644 --- a/pkg/perf_witness/lib/server.dart +++ b/pkg/perf_witness/lib/server.dart
@@ -20,7 +20,7 @@ final String _recorderSocketPath; final ffi.Pointer<ffi.Bool> _isRecordingTimelineWithAsyncSpans; - bool _isRecordingTimeline = false; + json_rpc.Peer? _timelineRequestor; JsonRpcServer? _server; json_rpc.Peer? _recorderConnection; @@ -141,7 +141,7 @@ json_rpc.Peer requestor, Map<String, Object?>? params, ) async { - if (_isRecordingTimeline) { + if (_timelineRequestor != null) { throw StateError('Timeline is already being recorded'); } @@ -167,7 +167,14 @@ enableProfiler: paramsObj.enableProfiler ?? false, samplingInterval: samplingInterval, ); - _isRecordingTimeline = true; + _timelineRequestor = requestor; + requestor.done.whenComplete(() { + if (_timelineRequestor == requestor) { + developer.NativeRuntime.stopStreamingTimeline(); + _isRecordingTimelineWithAsyncSpans.value = false; + _timelineRequestor = null; + } + }).ignore(); _isRecordingTimelineWithAsyncSpans.value = enableAsyncSpans; } @@ -175,12 +182,16 @@ json_rpc.Peer requestor, Map<String, Object?>? params, ) async { - if (!_isRecordingTimeline) { + if (_timelineRequestor == null) { throw StateError('Timeline is not being recorded'); } + if (_timelineRequestor != requestor) { + throw StateError('This peer did not start recording timeline'); + } + + _timelineRequestor = null; developer.NativeRuntime.stopStreamingTimeline(); - _isRecordingTimeline = false; _isRecordingTimelineWithAsyncSpans.value = false; } @@ -220,6 +231,9 @@ await UnixDomainSocket.connect(recorderPath), _methods, ); + _recorderConnection!.done.whenComplete(() { + _recorderConnection = null; + }).ignore(); await _recorderConnection!.sendRequest( 'process.announce', ProcessInfo.current(tag: tag).toJson(), @@ -231,15 +245,15 @@ Future<void> _shutdown() async { _recorderConnection?.close(); + if (_timelineRequestor != null) { + developer.NativeRuntime.stopStreamingTimeline(); + _timelineRequestor = null; + } await _server?.close(); if (io.FileSystemEntity.typeSync(_controlSocketPath) != .notFound) { io.File(_controlSocketPath).deleteSync(); } calloc.free(_isRecordingTimelineWithAsyncSpans); - if (_isRecordingTimeline) { - developer.NativeRuntime.stopStreamingTimeline(); - _isRecordingTimeline = false; - } } }
diff --git a/pkg/perf_witness/lib/src/json_rpc.dart b/pkg/perf_witness/lib/src/json_rpc.dart index 182a6b5..91c25ef 100644 --- a/pkg/perf_witness/lib/src/json_rpc.dart +++ b/pkg/perf_witness/lib/src/json_rpc.dart
@@ -25,12 +25,22 @@ ]) { final lineChannel = StreamChannel<String>( const LineSplitter().bind(utf8.decoder.bind(socket)), - StreamController<String>(sync: true, onCancel: socket.close) + StreamController<String>( + sync: true, + onCancel: () async { + try { + await socket.close(); + } catch (_) {} + }, + ) ..stream.listen((line) { socket.write(line); socket.write('\n'); }), ); + + socket.done.ignore(); + final peer = json_rpc.Peer(lineChannel); if (methods != null) { for (final MapEntry(:key, :value) in methods.entries) { @@ -57,7 +67,7 @@ _endpoints.add(endpoint); endpoint.done.whenComplete(() { _endpoints.remove(endpoint); - }); + }).ignore(); }); } @@ -65,7 +75,9 @@ List<JsonRpcPeer> get endpoints => _endpoints.toList(); Future<void> close() async { - await Future.wait(_endpoints.toList().map((e) => e.close())); + try { + await Future.wait(_endpoints.toList().map((e) => e.close())); + } catch (_) {} await _serverSocket.close(); } }
diff --git a/pkg/perf_witness/pubspec.yaml b/pkg/perf_witness/pubspec.yaml index f343df6..21e20e7 100644 --- a/pkg/perf_witness/pubspec.yaml +++ b/pkg/perf_witness/pubspec.yaml
@@ -13,6 +13,7 @@ dart_data_home: any ffi: any json_rpc_2: any + meta: any path: any stream_channel: any
diff --git a/pkg/perf_witness/test/recorder_server_test.dart b/pkg/perf_witness/test/recorder_server_test.dart index c4d51221..b5679bc 100644 --- a/pkg/perf_witness/test/recorder_server_test.dart +++ b/pkg/perf_witness/test/recorder_server_test.dart
@@ -862,6 +862,71 @@ await clientSocket.close(); await PerfWitnessServer.shutdown(); }); + + test('recording stops when client disconnects', () async { + final busyLoopProcess = await BusyLoopProcess.start( + 'busy-loop-tag', + tempDir, + ); + + // Request recording, but then disconnect. The server should handle this + // gracefully and stop recording. + { + final conn = await Connection.connectTo( + controlSocketPathForPid( + busyLoopProcess.process.pid, + controlSocketDirectory: p.join(tempDir.path, 'perf'), + )!, + ); + await conn.startRecording( + tempDir.path, + config: PerfWitnessRecorderConfig(), + ); + conn.disconnect(); + await conn.socket?.done; + } + + // Request recording again and check that this does not error. + { + final conn = await Connection.connectTo( + controlSocketPathForPid( + busyLoopProcess.process.pid, + controlSocketDirectory: p.join(tempDir.path, 'perf'), + )!, + ); + await conn.startRecording( + tempDir.path, + config: PerfWitnessRecorderConfig(), + ); + await conn.stopRecording(); + conn.disconnect(); + await conn.socket?.done; + } + + await busyLoopProcess.process.askToExit(); + expect(await busyLoopProcess.process.exitCode, 0); + }); + + test('server does not crash on abrupt client disconnect', () async { + final busyLoopProcess = await BusyLoopProcess.start( + 'busy-loop-tag', + tempDir, + ); + final conn = await Connection.connectTo( + controlSocketPathForPid( + busyLoopProcess.process.pid, + controlSocketDirectory: p.join(tempDir.path, 'perf'), + )!, + ); + await conn.startRecording( + tempDir.path, + config: PerfWitnessRecorderConfig(), + ); + conn.stopRecording().ignore(); + conn.socket?.destroy(); + await busyLoopProcess.process.askToExit(); + expect(await busyLoopProcess.process.exitCode, 0); + }); }); group('AOT specific', () {