[dds] Handle "Service Disappeared" errors from DDS/VM Service during shutdown

Change-Id: I1ddc59c56778461d5f42b210422eb24ba2f6da7b
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/220005
Reviewed-by: Ben Konyi <bkonyi@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
diff --git a/pkg/dds/lib/src/dap/adapters/dart.dart b/pkg/dds/lib/src/dap/adapters/dart.dart
index 67be4b5..655d646 100644
--- a/pkg/dds/lib/src/dap/adapters/dart.dart
+++ b/pkg/dds/lib/src/dap/adapters/dart.dart
@@ -13,6 +13,7 @@
 import 'package:vm_service/vm_service.dart' as vm;
 
 import '../../../dds.dart';
+import '../../rpc_error_codes.dart';
 import '../base_debug_adapter.dart';
 import '../exceptions.dart';
 import '../isolate_manager.dart';
@@ -49,6 +50,9 @@
 /// will work.
 const threadExceptionExpression = r'$_threadException';
 
+/// Typedef for handlers of VM Service stream events.
+typedef _StreamEventHandler<T> = FutureOr<void> Function(T data);
+
 /// Pattern for extracting useful error messages from an evaluation exception.
 final _evalErrorMessagePattern = RegExp('Error: (.*)');
 
@@ -354,6 +358,17 @@
 
   late final sendLogsToClient = args.sendLogsToClient ?? false;
 
+  /// Whether or not the DAP is terminating.
+  ///
+  /// When set to `true`, some requests that return "Service Disappeared" errors
+  /// will be caught and dropped as these are expected if the process is
+  /// terminating.
+  ///
+  /// This flag may be set by incoming requests from the client
+  /// (terminateRequest/disconnectRequest) or when a process terminates, or the
+  /// VM Service disconnects.
+  bool isTerminating = false;
+
   DartDebugAdapter(
     ByteStreamServerChannel channel, {
     this.ipv6 = false,
@@ -531,16 +546,20 @@
     this.vmService = vmService;
 
     unawaited(vmService.onDone.then((_) => _handleVmServiceClosed()));
+
+    // Handlers must be wrapped to handle Service Disappeared errors if async
+    // code tries to call the VM Service after termination begins.
+    final wrap = _wrapHandlerWithErrorHandling;
     _subscriptions.addAll([
-      vmService.onIsolateEvent.listen(handleIsolateEvent),
-      vmService.onDebugEvent.listen(handleDebugEvent),
-      vmService.onLoggingEvent.listen(handleLoggingEvent),
-      vmService.onExtensionEvent.listen(handleExtensionEvent),
-      vmService.onServiceEvent.listen(handleServiceEvent),
+      vmService.onIsolateEvent.listen(wrap(handleIsolateEvent)),
+      vmService.onDebugEvent.listen(wrap(handleDebugEvent)),
+      vmService.onLoggingEvent.listen(wrap(handleLoggingEvent)),
+      vmService.onExtensionEvent.listen(wrap(handleExtensionEvent)),
+      vmService.onServiceEvent.listen(wrap(handleServiceEvent)),
       if (_subscribeToOutputStreams)
-        vmService.onStdoutEvent.listen(_handleStdoutEvent),
+        vmService.onStdoutEvent.listen(wrap(_handleStdoutEvent)),
       if (_subscribeToOutputStreams)
-        vmService.onStderrEvent.listen(_handleStderrEvent),
+        vmService.onStderrEvent.listen(wrap(_handleStderrEvent)),
     ]);
     await Future.wait([
       vmService.streamListen(vm.EventStreams.kIsolate),
@@ -558,8 +577,20 @@
     // Let the subclass do any existing setup once we have a connection.
     await debuggerConnected(vmInfo);
 
-    // Process any existing isolates that may have been created before the
-    // streams above were set up.
+    await _withErrorHandling(
+      () => _configureExistingIsolates(vmService, vmInfo, resumeIfStarting),
+    );
+
+    _debuggerInitializedCompleter.complete();
+  }
+
+  /// Process any existing isolates that may have been created before the
+  /// streams above were set up.
+  Future<void> _configureExistingIsolates(
+    vm.VmService vmService,
+    vm.VM vmInfo,
+    bool resumeIfStarting,
+  ) async {
     final existingIsolateRefs = vmInfo.isolates;
     final existingIsolates = existingIsolateRefs != null
         ? await Future.wait(existingIsolateRefs
@@ -596,8 +627,6 @@
         }
       }
     }));
-
-    _debuggerInitializedCompleter.complete();
   }
 
   /// Handles the clients "continue" ("resume") request for the thread in
@@ -695,6 +724,8 @@
     DisconnectArguments? args,
     void Function() sendResponse,
   ) async {
+    isTerminating = true;
+
     await disconnectImpl();
     await shutdown();
     sendResponse();
@@ -839,6 +870,7 @@
       return;
     }
 
+    isTerminating = true;
     _hasSentTerminatedEvent = true;
     // Always add a leading newline since the last written text might not have
     // had one.
@@ -1318,6 +1350,8 @@
     TerminateArguments? args,
     void Function() sendResponse,
   ) async {
+    isTerminating = true;
+
     await terminateImpl();
     await shutdown();
     sendResponse();
@@ -1661,6 +1695,7 @@
   }
 
   Future<void> _handleVmServiceClosed() async {
+    isTerminating = true;
     if (terminateOnVmServiceClose) {
       handleSessionTerminate();
     }
@@ -1767,6 +1802,40 @@
       streamClosed: streamClosedCompleter.future,
     );
   }
+
+  /// Wraps a function with an error handler that handles errors that occur when
+  /// the VM Service/DDS shuts down.
+  ///
+  /// When the debug adapter is terminating, it's possible in-flight requests
+  /// triggered by handlers will fail with "Service Disappeared". This is
+  /// normal and such errors can be ignored, rather than allowed to pass
+  /// uncaught.
+  _StreamEventHandler<T> _wrapHandlerWithErrorHandling<T>(
+    _StreamEventHandler<T> handler,
+  ) {
+    return (data) => _withErrorHandling(() => handler(data));
+  }
+
+  /// Calls a function with an error handler that handles errors that occur when
+  /// the VM Service/DDS shuts down.
+  ///
+  /// When the debug adapter is terminating, it's possible in-flight requests
+  /// will fail with "Service Disappeared". This is normal and such errors can
+  /// be ignored, rather than allowed to pass uncaught.
+  FutureOr<T?> _withErrorHandling<T>(FutureOr<T> Function() func) async {
+    try {
+      return await func();
+    } on vm.RPCError catch (e) {
+      // If we're been asked to shut down while this request was occurring,
+      // it's normal to get kServiceDisappeared so we should handle this
+      // silently.
+      if (isTerminating && e.code == RpcErrorCodes.kServiceDisappeared) {
+        return null;
+      }
+
+      rethrow;
+    }
+  }
 }
 
 /// An implementation of [LaunchRequestArguments] that includes all fields used
diff --git a/pkg/dds/test/dap/integration/debug_test.dart b/pkg/dds/test/dap/integration/debug_test.dart
index 6e7565b..12f17b4 100644
--- a/pkg/dds/test/dap/integration/debug_test.dart
+++ b/pkg/dds/test/dap/integration/debug_test.dart
@@ -2,6 +2,7 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
+import 'dart:async';
 import 'dart:io';
 
 import 'package:dds/src/dap/protocol_generated.dart';
@@ -159,6 +160,19 @@
       final source = await client.getValidSource(topFrame.source!);
       expect(source.content, contains('void print(Object? object) {'));
     });
+
+    test('can shutdown during startup', () async {
+      final testFile = dap.createTestFile(simpleArgPrintingProgram);
+
+      // Terminate the app immediately upon recieving the first Thread event.
+      // The DAP is also responding to this event to configure the isolate (eg.
+      // set breakpoints and exception pause behaviour) and will cause it to
+      // receive "Service has disappeared" responses if these are in-flight as
+      // the process terminates. These should not go unhandled since they are
+      // normal during shutdown.
+      unawaited(dap.client.event('thread').then((_) => dap.client.terminate()));
+      await dap.client.start(file: testFile);
+    });
     // These tests can be slow due to starting up the external server process.
   }, timeout: Timeout.none);
 
diff --git a/pkg/dds/test/dap/integration/test_client.dart b/pkg/dds/test/dap/integration/test_client.dart
index 66f44da..af40d1c 100644
--- a/pkg/dds/test/dap/integration/test_client.dart
+++ b/pkg/dds/test/dap/integration/test_client.dart
@@ -399,7 +399,7 @@
       } else {
         completer.completeError(message);
       }
-    } else if (message is Event) {
+    } else if (message is Event && !_eventController.isClosed) {
       _eventController.add(message);
 
       // When we see a terminated event, close the event stream so if any