[beta][perf_witness] Handle abrupt disconnects better

Issue description: perf_witness client disconnecting abruptly can crash
the server

What is the fix: Add necessary exception handling to prevent network
exceptions leaking outside of JSON RPC layer.

Why cherry-pick: Avoid crashing DAS when info record-performance is
used.

Risk: Low

Issue link(s): https://github.com/dart-lang/sdk/issues/62534

Cherry-pick: https://dart-review.googlesource.com/c/sdk/+/495381
Change-Id: I5681dd9ba249346a79efd6d63ae93f8886bdb79d
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/495920
Reviewed-by: Martin Kustermann <kustermann@google.com>
Commit-Queue: Slava Egorov <vegorov@google.com>
diff --git a/pkg/perf_witness/lib/recorder.dart b/pkg/perf_witness/lib/recorder.dart
index 3c38849..c965a5a 100644
--- a/pkg/perf_witness/lib/recorder.dart
+++ b/pkg/perf_witness/lib/recorder.dart
@@ -7,6 +7,7 @@
 import 'dart:io' as io;
 
 import 'package:args/args.dart';
+import 'package:meta/meta.dart';
 import 'package:path/path.dart' as p;
 
 import 'src/common.dart';
@@ -318,9 +319,13 @@
 class Connection {
   final Stopwatch _recordingTime = Stopwatch();
   final ProcessInfo info;
+
+  @visibleForTesting
+  final io.Socket? socket;
+
   final JsonRpcPeer _endpoint;
 
-  Connection._(this.info, this._endpoint);
+  Connection._(this.info, this._endpoint, {this.socket});
 
   Future<void> startRecording(
     String outputDir, {
@@ -352,13 +357,12 @@
   }
 
   static Future<Connection> connectTo(String controlSocketPath) async {
-    final client = jsonRpcPeerFromSocket(
-      await UnixDomainSocket.connect(controlSocketPath),
-    );
+    final socket = await UnixDomainSocket.connect(controlSocketPath);
+    final client = jsonRpcPeerFromSocket(socket);
     final info = ProcessInfo.fromJson(
       await client.sendRequest('process.getInfo') as Map<String, Object?>,
     );
-    return Connection._(info, client);
+    return Connection._(info, client, socket: socket);
   }
 
   static Future<Connection?> _tryConnectTo(String controlSocket) async {
diff --git a/pkg/perf_witness/lib/server.dart b/pkg/perf_witness/lib/server.dart
index 37cd2fa..d34284b 100644
--- a/pkg/perf_witness/lib/server.dart
+++ b/pkg/perf_witness/lib/server.dart
@@ -20,7 +20,7 @@
   final String _recorderSocketPath;
   final ffi.Pointer<ffi.Bool> _isRecordingTimelineWithAsyncSpans;
 
-  bool _isRecordingTimeline = false;
+  json_rpc.Peer? _timelineRequestor;
 
   JsonRpcServer? _server;
   json_rpc.Peer? _recorderConnection;
@@ -141,7 +141,7 @@
     json_rpc.Peer requestor,
     Map<String, Object?>? params,
   ) async {
-    if (_isRecordingTimeline) {
+    if (_timelineRequestor != null) {
       throw StateError('Timeline is already being recorded');
     }
 
@@ -167,7 +167,14 @@
       enableProfiler: paramsObj.enableProfiler ?? false,
       samplingInterval: samplingInterval,
     );
-    _isRecordingTimeline = true;
+    _timelineRequestor = requestor;
+    requestor.done.whenComplete(() {
+      if (_timelineRequestor == requestor) {
+        developer.NativeRuntime.stopStreamingTimeline();
+        _isRecordingTimelineWithAsyncSpans.value = false;
+        _timelineRequestor = null;
+      }
+    }).ignore();
     _isRecordingTimelineWithAsyncSpans.value = enableAsyncSpans;
   }
 
@@ -175,12 +182,16 @@
     json_rpc.Peer requestor,
     Map<String, Object?>? params,
   ) async {
-    if (!_isRecordingTimeline) {
+    if (_timelineRequestor == null) {
       throw StateError('Timeline is not being recorded');
     }
 
+    if (_timelineRequestor != requestor) {
+      throw StateError('This peer did not start recording timeline');
+    }
+
+    _timelineRequestor = null;
     developer.NativeRuntime.stopStreamingTimeline();
-    _isRecordingTimeline = false;
     _isRecordingTimelineWithAsyncSpans.value = false;
   }
 
@@ -220,6 +231,9 @@
         await UnixDomainSocket.connect(recorderPath),
         _methods,
       );
+      _recorderConnection!.done.whenComplete(() {
+        _recorderConnection = null;
+      }).ignore();
       await _recorderConnection!.sendRequest(
         'process.announce',
         ProcessInfo.current(tag: tag).toJson(),
@@ -231,15 +245,15 @@
 
   Future<void> _shutdown() async {
     _recorderConnection?.close();
+    if (_timelineRequestor != null) {
+      developer.NativeRuntime.stopStreamingTimeline();
+      _timelineRequestor = null;
+    }
     await _server?.close();
     if (io.FileSystemEntity.typeSync(_controlSocketPath) != .notFound) {
       io.File(_controlSocketPath).deleteSync();
     }
     calloc.free(_isRecordingTimelineWithAsyncSpans);
-    if (_isRecordingTimeline) {
-      developer.NativeRuntime.stopStreamingTimeline();
-      _isRecordingTimeline = false;
-    }
   }
 }
 
diff --git a/pkg/perf_witness/lib/src/json_rpc.dart b/pkg/perf_witness/lib/src/json_rpc.dart
index 182a6b5..91c25ef 100644
--- a/pkg/perf_witness/lib/src/json_rpc.dart
+++ b/pkg/perf_witness/lib/src/json_rpc.dart
@@ -25,12 +25,22 @@
 ]) {
   final lineChannel = StreamChannel<String>(
     const LineSplitter().bind(utf8.decoder.bind(socket)),
-    StreamController<String>(sync: true, onCancel: socket.close)
+    StreamController<String>(
+        sync: true,
+        onCancel: () async {
+          try {
+            await socket.close();
+          } catch (_) {}
+        },
+      )
       ..stream.listen((line) {
         socket.write(line);
         socket.write('\n');
       }),
   );
+
+  socket.done.ignore();
+
   final peer = json_rpc.Peer(lineChannel);
   if (methods != null) {
     for (final MapEntry(:key, :value) in methods.entries) {
@@ -57,7 +67,7 @@
       _endpoints.add(endpoint);
       endpoint.done.whenComplete(() {
         _endpoints.remove(endpoint);
-      });
+      }).ignore();
     });
   }
 
@@ -65,7 +75,9 @@
   List<JsonRpcPeer> get endpoints => _endpoints.toList();
 
   Future<void> close() async {
-    await Future.wait(_endpoints.toList().map((e) => e.close()));
+    try {
+      await Future.wait(_endpoints.toList().map((e) => e.close()));
+    } catch (_) {}
     await _serverSocket.close();
   }
 }
diff --git a/pkg/perf_witness/pubspec.yaml b/pkg/perf_witness/pubspec.yaml
index f343df6..21e20e7 100644
--- a/pkg/perf_witness/pubspec.yaml
+++ b/pkg/perf_witness/pubspec.yaml
@@ -13,6 +13,7 @@
   dart_data_home: any
   ffi: any
   json_rpc_2: any
+  meta: any
   path: any
   stream_channel: any
 
diff --git a/pkg/perf_witness/test/recorder_server_test.dart b/pkg/perf_witness/test/recorder_server_test.dart
index c4d51221..b5679bc 100644
--- a/pkg/perf_witness/test/recorder_server_test.dart
+++ b/pkg/perf_witness/test/recorder_server_test.dart
@@ -862,6 +862,71 @@
       await clientSocket.close();
       await PerfWitnessServer.shutdown();
     });
+
+    test('recording stops when client disconnects', () async {
+      final busyLoopProcess = await BusyLoopProcess.start(
+        'busy-loop-tag',
+        tempDir,
+      );
+
+      // Request recording, but then disconnect. The server should handle this
+      // gracefully and stop recording.
+      {
+        final conn = await Connection.connectTo(
+          controlSocketPathForPid(
+            busyLoopProcess.process.pid,
+            controlSocketDirectory: p.join(tempDir.path, 'perf'),
+          )!,
+        );
+        await conn.startRecording(
+          tempDir.path,
+          config: PerfWitnessRecorderConfig(),
+        );
+        conn.disconnect();
+        await conn.socket?.done;
+      }
+
+      // Request recording again and check that this does not error.
+      {
+        final conn = await Connection.connectTo(
+          controlSocketPathForPid(
+            busyLoopProcess.process.pid,
+            controlSocketDirectory: p.join(tempDir.path, 'perf'),
+          )!,
+        );
+        await conn.startRecording(
+          tempDir.path,
+          config: PerfWitnessRecorderConfig(),
+        );
+        await conn.stopRecording();
+        conn.disconnect();
+        await conn.socket?.done;
+      }
+
+      await busyLoopProcess.process.askToExit();
+      expect(await busyLoopProcess.process.exitCode, 0);
+    });
+
+    test('server does not crash on abrupt client disconnect', () async {
+      final busyLoopProcess = await BusyLoopProcess.start(
+        'busy-loop-tag',
+        tempDir,
+      );
+      final conn = await Connection.connectTo(
+        controlSocketPathForPid(
+          busyLoopProcess.process.pid,
+          controlSocketDirectory: p.join(tempDir.path, 'perf'),
+        )!,
+      );
+      await conn.startRecording(
+        tempDir.path,
+        config: PerfWitnessRecorderConfig(),
+      );
+      conn.stopRecording().ignore();
+      conn.socket?.destroy();
+      await busyLoopProcess.process.askToExit();
+      expect(await busyLoopProcess.process.exitCode, 0);
+    });
   });
 
   group('AOT specific', () {