[ DDS ] Ignore method no found responses for `streamCpuSamplesWithUserTag`

DWDS does not implement streamCpuSamplesWithUserTag and DDS can invoke
it without being prompted to by a client. This change gracefully handles
the absence of this RPC.

Related DWDS change: https://github.com/dart-lang/webdev/pull/1889

Change-Id: Ic821f39caefd25044c2c4a8b34a8271879c7bd5f
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/279020
Reviewed-by: Elliott Brooks <elliottbrooks@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
diff --git a/pkg/dds/CHANGELOG.md b/pkg/dds/CHANGELOG.md
index 4d59198..e68dd7c 100644
--- a/pkg/dds/CHANGELOG.md
+++ b/pkg/dds/CHANGELOG.md
@@ -5,6 +5,7 @@
 # 2.7.1
 - Updated `vm_service` version to >=9.0.0 <11.0.0.
 - Simplified the DevTools URI composed by DDS.
+- Fix issue where DDS was invoking an unimplemented RPC against a non-VM target.
 
 # 2.7.0
 - Added `DartDevelopmentService.setExternalDevToolsUri(Uri uri)`, adding support for registering an external DevTools server with DDS.
diff --git a/pkg/dds/lib/src/dds_impl.dart b/pkg/dds/lib/src/dds_impl.dart
index acd5d43..9d582c6 100644
--- a/pkg/dds/lib/src/dds_impl.dart
+++ b/pkg/dds/lib/src/dds_impl.dart
@@ -27,6 +27,7 @@
 import 'expression_evaluator.dart';
 import 'isolate_manager.dart';
 import 'package_uri_converter.dart';
+import 'rpc_error_codes.dart';
 import 'stream_manager.dart';
 
 @visibleForTesting
@@ -483,3 +484,18 @@
   late WebSocketChannel _vmServiceSocket;
   late HttpServer _server;
 }
+
+extension PeerExtension on json_rpc.Peer {
+  Future<dynamic> sendRequestAndIgnoreMethodNotFound(
+    String method, [
+    dynamic parameters,
+  ]) async {
+    try {
+      await sendRequest(method, parameters);
+    } on json_rpc.RpcException catch (e) {
+      if (e.code != RpcErrorCodes.kMethodNotFound) {
+        rethrow;
+      }
+    }
+  }
+}
diff --git a/pkg/dds/lib/src/isolate_manager.dart b/pkg/dds/lib/src/isolate_manager.dart
index ba725ce..837fd39 100644
--- a/pkg/dds/lib/src/isolate_manager.dart
+++ b/pkg/dds/lib/src/isolate_manager.dart
@@ -242,7 +242,7 @@
           }
         }
         if (dds.cachedUserTags.isNotEmpty) {
-          await dds.vmServiceClient.sendRequest(
+          await dds.vmServiceClient.sendRequestAndIgnoreMethodNotFound(
             'streamCpuSamplesWithUserTag',
             {
               'userTags': dds.cachedUserTags,
diff --git a/pkg/dds/lib/src/stream_manager.dart b/pkg/dds/lib/src/stream_manager.dart
index b6b4080..15dcea3 100644
--- a/pkg/dds/lib/src/stream_manager.dart
+++ b/pkg/dds/lib/src/stream_manager.dart
@@ -238,19 +238,13 @@
         if (streamListeners[stream]!.contains(client)) {
           throw kStreamAlreadySubscribedException;
         } else if (!streamNewlySubscribed && includePrivates != null) {
-          try {
-            await dds.vmServiceClient.sendRequest(
-                '_setStreamIncludePrivateMembers',
-                {'streamId': stream, 'includePrivateMembers': includePrivates});
-          } on json_rpc.RpcException catch (e) {
-            // This private RPC might not be present. If it's not, we're communicating with an older
-            // VM that doesn't support filtering private members, so they will always be included in
-            // responses. Handle the method not found exception so the streamListen call doesn't
-            // fail for older VMs.
-            if (e.code != RpcErrorCodes.kMethodNotFound) {
-              rethrow;
-            }
-          }
+          // This private RPC might not be present. If it's not, we're communicating with an older
+          // VM that doesn't support filtering private members, so they will always be included in
+          // responses. Handle the method not found exception so the streamListen call doesn't
+          // fail for older VMs.
+          await dds.vmServiceClient.sendRequestAndIgnoreMethodNotFound(
+              '_setStreamIncludePrivateMembers',
+              {'streamId': stream, 'includePrivateMembers': includePrivates});
         }
         if (client != null) {
           streamListeners[stream]!.add(client);
@@ -352,9 +346,12 @@
           _profilerUserTagSubscriptions.remove(subscribedTag);
         }
       }
-      await dds.vmServiceClient.sendRequest('streamCpuSamplesWithUserTag', {
-        'userTags': _profilerUserTagSubscriptions.toList(),
-      });
+      await dds.vmServiceClient.sendRequestAndIgnoreMethodNotFound(
+        'streamCpuSamplesWithUserTag',
+        {
+          'userTags': _profilerUserTagSubscriptions.toList(),
+        },
+      );
     });
   }