DTDManager handles the subscription to the service stream (#9635)

diff --git a/packages/devtools_app/lib/src/shared/editor/editor_client.dart b/packages/devtools_app/lib/src/shared/editor/editor_client.dart
index 2c2136a..ef03c44 100644
--- a/packages/devtools_app/lib/src/shared/editor/editor_client.dart
+++ b/packages/devtools_app/lib/src/shared/editor/editor_client.dart
@@ -4,85 +4,54 @@
 
 import 'dart:async';
 
+import 'package:devtools_app_shared/service.dart';
 import 'package:devtools_app_shared/utils.dart';
 import 'package:devtools_shared/devtools_shared.dart';
 import 'package:dtd/dtd.dart';
 import 'package:flutter/foundation.dart';
 import 'package:json_rpc_2/json_rpc_2.dart';
+import 'package:logging/logging.dart';
 
 import '../analytics/constants.dart';
 import '../framework/app_error_handling.dart';
 import 'api_classes.dart';
 
+final _log = Logger('editor_client');
+
 /// A client wrapper that connects to an editor over DTD.
 ///
 /// Changes made to the editor services/events should be considered carefully to
 /// ensure they are not breaking changes to already-shipped editors.
 class EditorClient extends DisposableController
     with AutoDisposeControllerMixin {
-  EditorClient(this._dtd) {
+  EditorClient(this._dtdManager) {
     unawaited(initialized); // Trigger async initialization.
   }
 
-  final DartToolingDaemon _dtd;
+  final DTDManager _dtdManager;
   late final initialized = _initialize();
 
+  DartToolingDaemon get _dtd => _dtdManager.connection.value!;
+
   String get gaId => EditorSidebar.id;
 
   Future<void> _initialize() async {
     autoDisposeStreamSubscription(
-      _dtd.onEvent(CoreDtdServiceConstants.servicesStreamId).listen((data) {
-        final kind = data.kind;
-        if (kind != CoreDtdServiceConstants.serviceRegisteredKind &&
-            kind != CoreDtdServiceConstants.serviceUnregisteredKind) {
-          return;
-        }
-
+      _dtdManager.serviceRegistrationBroadcastStream.listen((data) {
         final service = data.data[DtdParameters.service] as String?;
-        if (service == null ||
-            (service != editorServiceName && service != lspServiceName)) {
-          return;
-        }
-
+        if (service == null) return;
         final isRegistered =
-            kind == CoreDtdServiceConstants.serviceRegisteredKind;
+            data.kind == CoreDtdServiceConstants.serviceRegisteredKind;
         final method = data.data[DtdParameters.method] as String;
         final capabilities =
             data.data[DtdParameters.capabilities] as Map<String, Object?>?;
-        final lspMethod = LspMethod.fromMethodName(method);
-        if (lspMethod != null) {
-          lspMethod.isRegistered = isRegistered;
-          if (lspMethod == LspMethod.editableArguments) {
-            // Update the notifier so that the Property Editor is aware that the
-            // editableArguments API is registered.
-            _editableArgumentsApiIsRegistered.value = isRegistered;
-          }
-        } else if (method == EditorMethod.getDevices.name) {
-          _supportsGetDevices = isRegistered;
-        } else if (method == EditorMethod.getDebugSessions.name) {
-          _supportsGetDebugSessions = isRegistered;
-        } else if (method == EditorMethod.selectDevice.name) {
-          _supportsSelectDevice = isRegistered;
-        } else if (method == EditorMethod.hotReload.name) {
-          _supportsHotReload = isRegistered;
-        } else if (method == EditorMethod.hotRestart.name) {
-          _supportsHotRestart = isRegistered;
-        } else if (method == EditorMethod.openDevToolsPage.name) {
-          _supportsOpenDevToolsPage = isRegistered;
-          _supportsOpenDevToolsForceExternal =
-              capabilities?[Field.supportsForceExternal] == true;
-        } else {
-          return;
-        }
 
-        final info = isRegistered
-            ? ServiceRegistered(
-                service: service,
-                method: method,
-                capabilities: capabilities,
-              )
-            : ServiceUnregistered(service: service, method: method);
-        _editorServiceChangedController.add(info);
+        _handleServiceRegistration(
+          service: service,
+          method: method,
+          capabilities: capabilities,
+          isRegistered: isRegistered,
+        );
       }),
     );
 
@@ -126,15 +95,75 @@
         }
       }),
     );
-    await [
-      _dtd.streamListen(CoreDtdServiceConstants.servicesStreamId),
-      _dtd.streamListen(editorStreamName).catchError((_) {
-        // Because we currently call streamListen in two places (here and
-        // ThemeManager) this can fail. It doesn't matter if this happens,
-        // however we should refactor this code to better support using the DTD
-        // connection in multiple places without them having to coordinate.
-      }),
-    ].wait;
+
+    await _dtd.streamListen(editorStreamName).catchError((_) {
+      // Because we currently call streamListen in two places (here and
+      // ThemeManager) this can fail. It doesn't matter if this happens,
+      // however we should refactor this code to better support using the DTD
+      // connection in multiple places without them having to coordinate.
+    });
+
+    // Check if any client services have already been registered against DTD.
+    try {
+      final response = await _dtd.getRegisteredServices();
+      for (final service in response.clientServices) {
+        for (final method in service.methods.values) {
+          _handleServiceRegistration(
+            service: service.name,
+            method: method.name,
+            capabilities: method.capabilities,
+          );
+        }
+      }
+    } catch (e) {
+      _log.warning('Failed to fetch registered services: $e');
+    }
+  }
+
+  void _handleServiceRegistration({
+    required String service,
+    required String method,
+    Map<String, Object?>? capabilities,
+    bool isRegistered = true,
+  }) {
+    if (service != editorServiceName && service != lspServiceName) {
+      return;
+    }
+
+    final lspMethod = LspMethod.fromMethodName(method);
+    if (lspMethod != null) {
+      lspMethod.isRegistered = isRegistered;
+      if (lspMethod == LspMethod.editableArguments) {
+        // Update the notifier so that the Property Editor is aware that the
+        // editableArguments API is registered.
+        _editableArgumentsApiIsRegistered.value = isRegistered;
+      }
+    } else if (method == EditorMethod.getDevices.name) {
+      _supportsGetDevices = isRegistered;
+    } else if (method == EditorMethod.getDebugSessions.name) {
+      _supportsGetDebugSessions = isRegistered;
+    } else if (method == EditorMethod.selectDevice.name) {
+      _supportsSelectDevice = isRegistered;
+    } else if (method == EditorMethod.hotReload.name) {
+      _supportsHotReload = isRegistered;
+    } else if (method == EditorMethod.hotRestart.name) {
+      _supportsHotRestart = isRegistered;
+    } else if (method == EditorMethod.openDevToolsPage.name) {
+      _supportsOpenDevToolsPage = isRegistered;
+      _supportsOpenDevToolsForceExternal =
+          capabilities?[Field.supportsForceExternal] == true;
+    } else {
+      return;
+    }
+
+    final info = isRegistered
+        ? ServiceRegistered(
+            service: service,
+            method: method,
+            capabilities: capabilities,
+          )
+        : ServiceUnregistered(service: service, method: method);
+    _editorServiceChangedController.add(info);
   }
 
   /// Close the connection to DTD.
diff --git a/packages/devtools_app/lib/src/standalone_ui/ide_shared/property_editor/property_editor_panel.dart b/packages/devtools_app/lib/src/standalone_ui/ide_shared/property_editor/property_editor_panel.dart
index bf6e889..6558652 100644
--- a/packages/devtools_app/lib/src/standalone_ui/ide_shared/property_editor/property_editor_panel.dart
+++ b/packages/devtools_app/lib/src/standalone_ui/ide_shared/property_editor/property_editor_panel.dart
@@ -4,9 +4,9 @@
 
 import 'dart:async';
 
+import 'package:devtools_app_shared/service.dart';
 import 'package:devtools_app_shared/ui.dart';
 import 'package:devtools_app_shared/utils.dart';
-import 'package:dtd/dtd.dart';
 import 'package:flutter/material.dart';
 
 import '../../../framework/scaffold/report_feedback_button.dart';
@@ -20,9 +20,9 @@
 
 /// The side panel for the Property Editor.
 class PropertyEditorPanel extends StatefulWidget {
-  const PropertyEditorPanel(this.dtd, {super.key});
+  const PropertyEditorPanel(this.dtdManager, {super.key});
 
-  final DartToolingDaemon dtd;
+  final DTDManager dtdManager;
 
   @override
   State<PropertyEditorPanel> createState() => _PropertyEditorPanelState();
@@ -38,7 +38,7 @@
   void initState() {
     super.initState();
 
-    final editor = EditorClient(widget.dtd);
+    final editor = EditorClient(widget.dtdManager);
     ga.screen(gac.PropertyEditorSidebar.id);
     unawaited(
       _editor = editor.initialized.then((_) {
diff --git a/packages/devtools_app/lib/src/standalone_ui/standalone_screen.dart b/packages/devtools_app/lib/src/standalone_ui/standalone_screen.dart
index 0072eed..7cb0792 100644
--- a/packages/devtools_app/lib/src/standalone_ui/standalone_screen.dart
+++ b/packages/devtools_app/lib/src/standalone_ui/standalone_screen.dart
@@ -37,11 +37,11 @@
       ),
       StandaloneScreenType.editorSidebar => _DtdConnectedScreen(
         dtdManager: dtdManager,
-        builder: (dtd) => EditorSidebarPanel(dtd),
+        builder: EditorSidebarPanel.new,
       ),
       StandaloneScreenType.propertyEditor => _DtdConnectedScreen(
         dtdManager: dtdManager,
-        builder: (dtd) => PropertyEditorPanel(dtd),
+        builder: PropertyEditorPanel.new,
       ),
     };
   }
@@ -58,7 +58,7 @@
   const _DtdConnectedScreen({required this.dtdManager, required this.builder});
 
   final DTDManager dtdManager;
-  final Widget Function(DartToolingDaemon) builder;
+  final Widget Function(DTDManager) builder;
 
   @override
   Widget build(BuildContext context) {
@@ -80,7 +80,7 @@
                   // reconnect occurs.
                   KeyedSubtree(
                     key: ValueKey(connection),
-                    child: builder(connection),
+                    child: builder(dtdManager),
                   ),
                 if (connectionState is! ConnectedDTDState)
                   NotConnectedOverlay(connectionState),
diff --git a/packages/devtools_app/lib/src/standalone_ui/vs_code/flutter_panel.dart b/packages/devtools_app/lib/src/standalone_ui/vs_code/flutter_panel.dart
index abfba5e..a540ca7 100644
--- a/packages/devtools_app/lib/src/standalone_ui/vs_code/flutter_panel.dart
+++ b/packages/devtools_app/lib/src/standalone_ui/vs_code/flutter_panel.dart
@@ -4,9 +4,9 @@
 
 import 'dart:async';
 
+import 'package:devtools_app_shared/service.dart';
 import 'package:devtools_app_shared/ui.dart';
 import 'package:devtools_app_shared/utils.dart';
-import 'package:dtd/dtd.dart';
 import 'package:flutter/material.dart';
 
 import '../../shared/analytics/analytics.dart' as ga;
@@ -22,9 +22,9 @@
 /// Provides some basic functionality to improve discoverability of features
 /// such as creation of new projects, device selection and DevTools features.
 class EditorSidebarPanel extends StatefulWidget {
-  const EditorSidebarPanel(this.dtd, {super.key});
+  const EditorSidebarPanel(this.dtdManager, {super.key});
 
-  final DartToolingDaemon dtd;
+  final DTDManager dtdManager;
 
   @override
   State<EditorSidebarPanel> createState() => _EditorSidebarPanelState();
@@ -39,7 +39,7 @@
   void initState() {
     super.initState();
 
-    final editor = EditorClient(widget.dtd);
+    final editor = EditorClient(widget.dtdManager);
     ga.screen(editor.gaId);
     unawaited(_editor = editor.initialized.then((_) => editor));
   }
diff --git a/packages/devtools_app/test/service/dtd_manager_test.dart b/packages/devtools_app/test/service/dtd_manager_test.dart
new file mode 100644
index 0000000..7db638e
--- /dev/null
+++ b/packages/devtools_app/test/service/dtd_manager_test.dart
@@ -0,0 +1,181 @@
+// Copyright 2026 The Flutter Authors
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file or at https://developers.google.com/open-source/licenses/bsd.
+
+import 'dart:async';
+
+import 'package:async/async.dart';
+import 'package:devtools_app_shared/service.dart';
+import 'package:devtools_test/devtools_test.dart';
+import 'package:dtd/dtd.dart';
+import 'package:flutter_test/flutter_test.dart';
+import 'package:mockito/mockito.dart';
+
+void main() {
+  final fakeDtdUri = Uri.parse('ws://127.0.0.1:65314/KKXNgPdXnFk=');
+
+  group('serviceRegistrationBroadcastStream', () {
+    final fooBarRegisteredEvent = DTDEvent('Service', 'ServiceRegistered', {
+      'service': 'foo',
+      'method': 'bar',
+    }, 1);
+    final bazQuxRegisteredEvent = DTDEvent('Service', 'ServiceRegistered', {
+      'service': 'baz',
+      'method': 'qux',
+    }, 2);
+    final fooBarUnregisteredEvent = DTDEvent('Service', 'ServiceUnregistered', {
+      'service': 'foo',
+      'method': 'bar',
+    }, 4);
+    final invalidEvent = DTDEvent('Service', 'Invalid', {}, 3);
+
+    late TestDTDManager manager;
+    late MockDartToolingDaemon mockDtd1;
+    late MockDartToolingDaemon mockDtd2;
+
+    /// Sets up the [mockDTD] to return a [StreamController] so that events can
+    /// be added to the stream during the test.
+    StreamController<DTDEvent> setUpEventStream(MockDartToolingDaemon mockDTD) {
+      final streamController = StreamController<DTDEvent>();
+      when(mockDTD.streamListen(any)).thenAnswer((_) async => const Success());
+      when(mockDTD.onEvent(any)).thenAnswer((_) => streamController.stream);
+      return streamController;
+    }
+
+    setUp(() {
+      mockDtd1 = MockDartToolingDaemon();
+      mockDtd2 = MockDartToolingDaemon();
+      manager = TestDTDManager();
+    });
+
+    tearDown(() async {
+      await manager.dispose();
+    });
+
+    test('supports multiple subscribers', () async {
+      // Connect to DTD.
+      final streamController = setUpEventStream(mockDtd1);
+      manager.mockDtd = mockDtd1;
+      await manager.connect(fakeDtdUri);
+
+      // Create two subscribers.
+      final eventQueue1 = StreamQueue(
+        manager.serviceRegistrationBroadcastStream,
+      );
+      final eventQueue2 = StreamQueue(
+        manager.serviceRegistrationBroadcastStream,
+      );
+
+      try {
+        // Add an event.
+        streamController.add(fooBarRegisteredEvent);
+
+        // Verify both subscribers received the event.
+        expect(await eventQueue1.next, equals(fooBarRegisteredEvent));
+        expect(await eventQueue2.next, equals(fooBarRegisteredEvent));
+      } finally {
+        await eventQueue1.cancel();
+        await eventQueue2.cancel();
+      }
+    });
+
+    test(
+      'only forwards ServiceRegistered and ServiceUnregistered events',
+      () async {
+        // Connect to DTD.
+        final streamController = setUpEventStream(mockDtd1);
+        manager.mockDtd = mockDtd1;
+        await manager.connect(fakeDtdUri);
+
+        // Subscribe to the service registration stream.
+        final eventQueue = StreamQueue(
+          manager.serviceRegistrationBroadcastStream,
+        );
+
+        try {
+          // The manager only forwards registered and unregistered events.
+          streamController.add(fooBarRegisteredEvent);
+          streamController.add(invalidEvent);
+          streamController.add(fooBarUnregisteredEvent);
+          expect(
+            manager.serviceRegistrationBroadcastStream,
+            emitsInOrder([fooBarRegisteredEvent, fooBarUnregisteredEvent]),
+          );
+        } finally {
+          await eventQueue.cancel();
+        }
+      },
+    );
+
+    test('forwards events across multiple DTD connections', () async {
+      // Connect to the first DTD instance.
+      final streamController1 = setUpEventStream(mockDtd1);
+      manager.mockDtd = mockDtd1;
+      await manager.connect(fakeDtdUri);
+
+      final eventQueue = StreamQueue(
+        manager.serviceRegistrationBroadcastStream,
+      );
+
+      try {
+        // The manager forwards events from the first DTD instance.
+        streamController1.add(fooBarRegisteredEvent);
+        expect(await eventQueue.next, equals(fooBarRegisteredEvent));
+
+        // Connect to the second DTD instance:
+        final streamController2 = setUpEventStream(mockDtd2);
+        manager.mockDtd = mockDtd2;
+        await manager.connect(fakeDtdUri);
+
+        // The manager forwards events from the second DTD instance.
+        streamController2.add(bazQuxRegisteredEvent);
+        expect(await eventQueue.next, equals(bazQuxRegisteredEvent));
+      } finally {
+        await eventQueue.cancel();
+      }
+    });
+
+    test('continues to forward events while DTD is reconnecting', () async {
+      // Connect to DTD.
+      final streamController = setUpEventStream(mockDtd1);
+      final dtdDoneCompleter = Completer<void>();
+      when(mockDtd1.done).thenAnswer((_) => dtdDoneCompleter.future);
+      manager.mockDtd = mockDtd1;
+      await manager.connect(fakeDtdUri);
+
+      // Subscribe to the service registration stream.
+      final eventQueue = StreamQueue(
+        manager.serviceRegistrationBroadcastStream,
+      );
+      try {
+        // Send events while DTD is reconnecting.
+        manager.connectionState.addListener(() {
+          if (manager.connectionState.value is NotConnectedDTDState) {
+            streamController.add(fooBarRegisteredEvent);
+          }
+          if (manager.connectionState.value is ConnectingDTDState) {
+            streamController.add(bazQuxRegisteredEvent);
+          }
+        });
+
+        // Trigger a done event to force DTD to reconnect.
+        dtdDoneCompleter.complete();
+
+        // Verify the events sent during reconnection were received.
+        expect(await eventQueue.next, equals(fooBarRegisteredEvent));
+        expect(await eventQueue.next, equals(bazQuxRegisteredEvent));
+      } finally {
+        await eventQueue.cancel();
+      }
+    });
+  });
+}
+
+class TestDTDManager extends DTDManager {
+  DartToolingDaemon? mockDtd;
+
+  @override
+  Future<DartToolingDaemon> connectDtdImpl(Uri uri) async {
+    return mockDtd!;
+  }
+}
diff --git a/packages/devtools_app/test/shared/editor/editor_client_test.dart b/packages/devtools_app/test/shared/editor/editor_client_test.dart
index 69526c3..765c0ba 100644
--- a/packages/devtools_app/test/shared/editor/editor_client_test.dart
+++ b/packages/devtools_app/test/shared/editor/editor_client_test.dart
@@ -4,14 +4,17 @@
 
 import 'dart:convert';
 
+import 'package:dart_service_protocol_shared/dart_service_protocol_shared.dart';
 import 'package:devtools_app/src/shared/editor/api_classes.dart';
 import 'package:devtools_app/src/shared/editor/editor_client.dart';
 import 'package:devtools_test/devtools_test.dart';
 import 'package:dtd/dtd.dart';
+import 'package:flutter/foundation.dart';
 import 'package:flutter_test/flutter_test.dart';
 import 'package:mockito/mockito.dart';
 
 void main() {
+  late MockDTDManager mockDTDManager;
   late MockDartToolingDaemon mockDtd;
   late EditorClient editorClient;
 
@@ -23,7 +26,9 @@
   };
 
   setUp(() {
+    mockDTDManager = MockDTDManager();
     mockDtd = MockDartToolingDaemon();
+    when(mockDTDManager.connection).thenReturn(ValueNotifier(mockDtd));
 
     for (final MapEntry(key: method, value: responseJson)
         in methodToResponseJson.entries) {
@@ -40,7 +45,7 @@
       method.isRegistered = true;
     }
 
-    editorClient = EditorClient(mockDtd);
+    editorClient = EditorClient(mockDTDManager);
   });
 
   group('getRefactors', () {
@@ -234,6 +239,29 @@
       expect(result.errorMessage, 'API is unavailable.');
     });
   });
+
+  group('initialization', () {
+    test('checks for existing services registered on DTD', () async {
+      // Add getDevices service to registered services.
+      final getDevicesService = ClientServiceInfo(editorServiceName, {
+        EditorMethod.getDevices.name: ClientServiceMethodInfo(
+          EditorMethod.getDevices.name,
+        ),
+      });
+      final response = RegisteredServicesResponse(
+        dtdServices: [],
+        clientServices: [getDevicesService],
+      );
+      when(mockDtd.getRegisteredServices()).thenAnswer((_) async => response);
+
+      // Initialize the client.
+      final client = EditorClient(mockDTDManager);
+      await client.initialized;
+
+      // Check that it supports getDevices.
+      expect(client.supportsGetDevices, isTrue);
+    });
+  });
 }
 
 const _fakeScreenId = 'DevToolsScreen';
diff --git a/packages/devtools_app_shared/lib/src/service/dtd_manager.dart b/packages/devtools_app_shared/lib/src/service/dtd_manager.dart
index 183ca53..0035f7a 100644
--- a/packages/devtools_app_shared/lib/src/service/dtd_manager.dart
+++ b/packages/devtools_app_shared/lib/src/service/dtd_manager.dart
@@ -32,6 +32,21 @@
   Uri? get uri => _uri;
   Uri? _uri;
 
+  /// A stream of [CoreDtdServiceConstants.serviceRegisteredKind] and
+  /// [CoreDtdServiceConstants.serviceUnregisteredKind] events.
+  ///
+  /// Since this is a broadcast stream, it supports multiple subscribers.
+  /// Subscribers should also call [DartToolingDaemon.getRegisteredServices] to
+  /// detect any services that were already registered.
+  Stream<DTDEvent> get serviceRegistrationBroadcastStream =>
+      _serviceRegistrationController.stream;
+  final _serviceRegistrationController = StreamController<DTDEvent>.broadcast();
+
+  /// The subscription to the current service registration stream.
+  ///
+  /// This is canceled and reset with the DTD connection changes.
+  StreamSubscription<DTDEvent>? _currentServiceRegistrationSubscription;
+
   /// Whether or not to automatically reconnect if disconnected.
   ///
   /// This will happen by default as long as the disconnect wasn't
@@ -74,7 +89,7 @@
   }
 
   /// Triggers a reconnect to the last connected URI if the current state is
-  /// [ConnectionFailedDTDState] (and there was a pervious connection).
+  /// [ConnectionFailedDTDState] (and there was a previous connection).
   Future<void> reconnect() {
     final reconnectFunc = _lastConnectFunc;
     if (_connectionState.value is! ConnectionFailedDTDState ||
@@ -149,11 +164,18 @@
 
     try {
       final connection = await _connectWithRetries(uri, maxRetries: maxRetries);
+      await _listenForServiceRegistrationEvents(connection);
 
+      // Save the previous connection so that we can close it after the new
+      // connection is reestablished.
+      final previousConnection = _connection.value;
       _uri = uri;
       // Set this after setting the value of [_uri] so that [_uri] can be used
       // by any listeners of the [_connection] notifier.
       _connection.value = connection;
+      // Close the previous connection.
+      await previousConnection?.close();
+
       _connectionState.value = ConnectedDTDState();
       _log.info('Successfully connected to DTD at: $uri');
 
@@ -230,16 +252,19 @@
       // an explicit disconnect.
       _automaticallyReconnect = false;
 
-      // We only clear the connection if we are explicitly disconnecting. In the
-      // case where the connection just dropped, we leave it so that we can
-      // continue to render a page (usually with an overlay).
+      // We only close and clear the connection if we are explicitly
+      // disconnecting.
+      //
+      // In the case where the connection just dropped, we leave it so
+      // that we can continue to render a page (usually with an overlay), then
+      // only close it once the new connection is established.
+      if (_connection.value case final connection?) {
+        await connection.close();
+      }
       _connection.value = null;
     }
 
     _periodicConnectionCheck?.cancel();
-    if (_connection.value case final connection?) {
-      await connection.close();
-    }
 
     _connectionState.value = NotConnectedDTDState();
     _uri = null;
@@ -249,9 +274,47 @@
 
   Future<void> dispose() async {
     await disconnect();
+    await _currentServiceRegistrationSubscription?.cancel();
+    await _serviceRegistrationController.close();
     _connection.dispose();
   }
 
+  /// Listens for service registration events on the [dtd] connection.
+  Future<void> _listenForServiceRegistrationEvents(
+      DartToolingDaemon dtd) async {
+    // We immediately begin listening for service registration events on the new
+    // DTD connection before canceling the previous subscription. This
+    // guarantees that we don't miss any events across reconnects.
+    // ignore: cancel_subscriptions, false positive, it is canceled below.
+    final nextServiceRegistrationSubscription = dtd
+        .onEvent(CoreDtdServiceConstants.servicesStreamId)
+        .listen(_forwardServiceRegistrationEvents,
+            onError: _logServiceStreamError);
+    await dtd.streamListen(CoreDtdServiceConstants.servicesStreamId);
+
+    // Cancel the previous subscription.
+    await _currentServiceRegistrationSubscription?.cancel();
+    _currentServiceRegistrationSubscription =
+        nextServiceRegistrationSubscription;
+  }
+
+  /// Forwards service registration events to the
+  /// [_serviceRegistrationController].
+  void _forwardServiceRegistrationEvents(DTDEvent event) {
+    final kind = event.kind;
+    final isRegistrationEvent =
+        kind == CoreDtdServiceConstants.serviceRegisteredKind ||
+            kind == CoreDtdServiceConstants.serviceUnregisteredKind;
+
+    if (isRegistrationEvent) {
+      _serviceRegistrationController.add(event);
+    }
+  }
+
+  void _logServiceStreamError(Object error) {
+    _log.warning('Error in DTD service stream', error);
+  }
+
   /// Returns the workspace roots for the Dart Tooling Daemon connection.
   ///
   /// These roots are set by the tool that started DTD, which may be the IDE,