Add a MessageScheduler queue that all incoming messages to the analysis server go through.

Change-Id: I3770b5a1a8faa391fb35a5a1e3d2678dbc4fb3f2
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/383703
Commit-Queue: Keerti Parthasarathy <keertip@google.com>
Reviewed-by: Brian Wilkerson <brianwilkerson@google.com>
diff --git a/pkg/analysis_server/lib/src/analysis_server.dart b/pkg/analysis_server/lib/src/analysis_server.dart
index d0178e3..067f441 100644
--- a/pkg/analysis_server/lib/src/analysis_server.dart
+++ b/pkg/analysis_server/lib/src/analysis_server.dart
@@ -26,6 +26,7 @@
 import 'package:analysis_server/src/protocol_server.dart' as server;
 import 'package:analysis_server/src/server/crash_reporting_attachments.dart';
 import 'package:analysis_server/src/server/diagnostic_server.dart';
+import 'package:analysis_server/src/server/message_scheduler.dart';
 import 'package:analysis_server/src/server/performance.dart';
 import 'package:analysis_server/src/services/completion/completion_performance.dart';
 import 'package:analysis_server/src/services/correction/assist_internal.dart';
@@ -250,6 +251,10 @@
   /// A completer for [lspUninitialized].
   final Completer<void> _lspUninitializedCompleter = Completer<void>();
 
+  /// A scheduler that keeps track of all incoming messages and schedules them
+  /// for processing.
+  final MessageScheduler messageScheduler;
+
   AnalysisServer(
     this.options,
     this.sdkManager,
@@ -269,7 +274,9 @@
   })  : resourceProvider = OverlayResourceProvider(baseResourceProvider),
         pubApi = PubApi(instrumentationService, httpClient,
             Platform.environment['PUB_HOSTED_URL']),
-        producerGeneratorsForLintRules = AssistProcessor.computeLintRuleMap() {
+        producerGeneratorsForLintRules = AssistProcessor.computeLintRuleMap(),
+        messageScheduler = MessageScheduler() {
+    messageScheduler.setServer(this);
     // Set the default URI converter. This uses the resource providers path
     // context (unlike the initialized value) which allows tests to override it.
     uriConverter = ClientUriConverter.noop(baseResourceProvider.pathContext);
diff --git a/pkg/analysis_server/lib/src/legacy_analysis_server.dart b/pkg/analysis_server/lib/src/legacy_analysis_server.dart
index 5dd6d05..1b7daf5 100644
--- a/pkg/analysis_server/lib/src/legacy_analysis_server.dart
+++ b/pkg/analysis_server/lib/src/legacy_analysis_server.dart
@@ -86,6 +86,7 @@
 import 'package:analysis_server/src/server/diagnostic_server.dart';
 import 'package:analysis_server/src/server/error_notifier.dart';
 import 'package:analysis_server/src/server/features.dart';
+import 'package:analysis_server/src/server/message_scheduler.dart';
 import 'package:analysis_server/src/server/performance.dart';
 import 'package:analysis_server/src/server/sdk_configuration.dart';
 import 'package:analysis_server/src/services/completion/completion_state.dart';
@@ -606,7 +607,8 @@
   /// Handle a [request] that was read from the communication channel.
   void handleRequestOrResponse(RequestOrResponse requestOrResponse) {
     if (requestOrResponse is Request) {
-      handleRequest(requestOrResponse);
+      messageScheduler.add(LegacyMessage(request: requestOrResponse));
+      messageScheduler.notify();
     } else if (requestOrResponse is Response) {
       handleResponse(requestOrResponse);
     }
diff --git a/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart b/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
index 9935437..52d392b 100644
--- a/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
+++ b/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
@@ -27,6 +27,7 @@
 import 'package:analysis_server/src/server/detachable_filesystem_manager.dart';
 import 'package:analysis_server/src/server/diagnostic_server.dart';
 import 'package:analysis_server/src/server/error_notifier.dart';
+import 'package:analysis_server/src/server/message_scheduler.dart';
 import 'package:analysis_server/src/server/performance.dart';
 import 'package:analysis_server/src/services/user_prompts/dart_fix_prompt_manager.dart';
 import 'package:analysis_server/src/utilities/extensions/flutter.dart';
@@ -191,7 +192,8 @@
     analysisDriverScheduler.start();
 
     _channelSubscription =
-        channel.listen(handleMessage, onDone: done, onError: socketError);
+        channel.listen(scheduleMessage, onDone: done, onError: socketError);
+
     if (AnalysisServer.supportsPlugins) {
       _pluginChangeSubscription =
           pluginManager.pluginsChanged.listen((_) => _onPluginsChanged());
@@ -757,6 +759,11 @@
     }
   }
 
+  void scheduleMessage(Message message) {
+    messageScheduler.add(LspMessage(message: message));
+    messageScheduler.notify();
+  }
+
   void sendErrorResponse(Message message, ResponseError error) {
     if (message is RequestMessage) {
       sendResponse(ResponseMessage(
diff --git a/pkg/analysis_server/lib/src/server/message_scheduler.dart b/pkg/analysis_server/lib/src/server/message_scheduler.dart
new file mode 100644
index 0000000..7d9948b
--- /dev/null
+++ b/pkg/analysis_server/lib/src/server/message_scheduler.dart
@@ -0,0 +1,92 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+
+import 'package:analysis_server/lsp_protocol/protocol.dart' as lsp;
+import 'package:analysis_server/protocol/protocol.dart' as legacy;
+import 'package:analysis_server/src/analysis_server.dart';
+import 'package:analysis_server/src/legacy_analysis_server.dart';
+import 'package:analysis_server/src/lsp/lsp_analysis_server.dart';
+import 'package:analyzer/src/util/performance/operation_performance.dart';
+import 'package:meta/meta.dart';
+
+/// Represents a message from DTD (Dart Tooling Daemon).
+final class DtdMessage extends MessageObject {
+  final lsp.IncomingMessage message;
+  final Completer<Map<String, Object?>> completer;
+  final OperationPerformanceImpl performance;
+
+  DtdMessage(
+      {required this.message,
+      required this.completer,
+      required this.performance});
+}
+
+/// Represents a message in the Legacy protocol format.
+final class LegacyMessage extends MessageObject {
+  final legacy.Request request;
+
+  LegacyMessage({required this.request});
+}
+
+/// Represents a message in the LSP protocol format.
+final class LspMessage extends MessageObject {
+  final lsp.Message message;
+
+  LspMessage({required this.message});
+}
+
+/// Represents a message from a client, can be an IDE, DTD etc.
+sealed class MessageObject {}
+
+/// The [MessageScheduler] receives messages from all clients of the
+/// [AnalysisServer]. Clients can include IDE's (LSP and Legacy protocol), DTD,
+/// and the Diagnostic server. The [MessageSchedular] acts as a hub for all
+/// incoming messages and forwards the messages to the appropriate handlers.
+final class MessageScheduler {
+  /// The [AnalaysisServer] associated with the schedular.
+  late final AnalysisServer server;
+
+  /// A queue of incoming messages from all the clients of the [AnalysisServer].
+  final ListQueue<MessageObject> _incomingMessages = ListQueue<MessageObject>();
+
+  @visibleForTesting
+  ListQueue<MessageObject> get incomingMessages => _incomingMessages;
+
+  /// Add a message to the end of the incoming messages queue.
+  void add(MessageObject message) {
+    _incomingMessages.addLast(message);
+  }
+
+  /// Notify the [MessageSchedular] to process the messages queue.
+  void notify() async {
+    processMessages();
+  }
+
+  /// Dispatch the first message in the queue to be executed.
+  void processMessages() {
+    if (_incomingMessages.isEmpty) {
+      return;
+    }
+    var message = _incomingMessages.removeFirst();
+    switch (message) {
+      case LspMessage():
+        var lspMessage = message.message;
+        (server as LspAnalysisServer).handleMessage(lspMessage);
+      case LegacyMessage():
+        var request = message.request;
+        (server as LegacyAnalysisServer).handleRequest(request);
+      case DtdMessage():
+        server.dtd!.processMessage(
+            message.message, message.performance, message.completer);
+    }
+  }
+
+  /// Set the [AnalysisServer].
+  void setServer(AnalysisServer analysisServer) {
+    server = analysisServer;
+  }
+}
diff --git a/pkg/analysis_server/lib/src/services/dart_tooling_daemon/dtd_services.dart b/pkg/analysis_server/lib/src/services/dart_tooling_daemon/dtd_services.dart
index 425d832..b58af9b 100644
--- a/pkg/analysis_server/lib/src/services/dart_tooling_daemon/dtd_services.dart
+++ b/pkg/analysis_server/lib/src/services/dart_tooling_daemon/dtd_services.dart
@@ -6,10 +6,10 @@
 
 import 'package:analysis_server/lsp_protocol/protocol.dart';
 import 'package:analysis_server/src/analysis_server.dart';
-import 'package:analysis_server/src/lsp/client_capabilities.dart';
 import 'package:analysis_server/src/lsp/error_or.dart';
 import 'package:analysis_server/src/lsp/handlers/handler_states.dart';
 import 'package:analysis_server/src/lsp/handlers/handlers.dart';
+import 'package:analysis_server/src/server/message_scheduler.dart';
 import 'package:analysis_server/src/server/performance.dart';
 import 'package:analyzer/src/util/performance/operation_performance.dart';
 import 'package:dtd/dtd.dart';
@@ -63,6 +63,54 @@
 
   DtdConnectionState get state => _state;
 
+  /// Executes the LSP handler [messageHandler] with [params] and returns the
+  /// results as a map to provide back to DTD.
+  ///
+  /// If the handler fails, throws an [RpcException] to be propagated to the
+  /// client.
+  void processMessage(
+      IncomingMessage message,
+      OperationPerformanceImpl performance,
+      Completer<Map<String, Object?>> completer) async {
+    // (TODO:keertip) Lookup the right handler, execute and return results.
+    // For now, complete with exception.
+    completer.completeError(RpcException(
+      ErrorCodes.InvalidRequest.toJson(),
+      'DTD requests are not yet supported',
+    ));
+
+    // (TODO:keertip) Uncomment when lookup has been implemented
+    // var info = MessageInfo(
+    //   performance: performance,
+    //   // DTD clients requests are always executed with a fixed set of
+    //   // capabilities so that the responses don't change in format based on the
+    //   // owning editor.
+    //   clientCapabilities: fixedBasicLspClientCapabilities,
+    // );
+    // var token = NotCancelableToken(); // We don't currently support cancel.
+
+    // // Execute the handler.
+    // var result = await messageHandler.handleMessage(message, info, token);
+
+    // // Map the result (or error) on to what a DTD handler needs to return.
+    // return result.map(
+    //   // Map LSP errors on to equiv JSON-RPC errors for DTD.
+    //   (error) => throw RpcException(
+    //     error.code.toJson(),
+    //     error.message,
+    //     data: error.data,
+    //   ),
+    //   // DTD requires that all results are a Map and that they contain a
+    //   // 'type' field. This differs slightly from LSP where we could return a
+    //   // boolean (for example). This means we need to put the result in a
+    //   // field, which we're calling 'result'.
+    //   (result) => {
+    //     'type': result?.runtimeType.toString(),
+    //     'result': result,
+    //   },
+    // );
+  }
+
   /// Closes the connection to DTD and cleans up.
   void _close([DtdConnectionState state = DtdConnectionState.Disconnected]) {
     _state = state;
@@ -122,54 +170,26 @@
     }
   }
 
-  /// Executes the LSP handler [messageHandler] with [params] and returns the
-  /// results as a map to provide back to DTD.
-  ///
-  /// If the handler fails, throws an [RpcException] to be propagated to the
-  /// client.
+  /// The incoming request is sent to the [MessageScheduler] for execution.
+  /// A completer is returned which will be completed with the result of the
+  /// execution of the request by the corresponding [MessageHandler].
   Future<Map<String, Object?>> _executeLspHandler(
     MessageHandler<Object?, Object?, AnalysisServer> messageHandler,
     Parameters params,
     OperationPerformanceImpl performance,
   ) async {
-    // TODO(dantup): Currently the handler just runs immediately, but this
-    //  should interact with the scheduler in future.
-
     // Map the incoming request into types we use for LSP request handling.
     var message = IncomingMessage(
       jsonrpc: jsonRpcVersion,
       method: messageHandler.handlesMessage,
       params: params.asMap,
     );
-    var info = MessageInfo(
-      performance: performance,
-      // DTD clients requests are always executed with a fixed set of
-      // capabilities so that the responses don't change in format based on the
-      // owning editor.
-      clientCapabilities: fixedBasicLspClientCapabilities,
-    );
-    var token = NotCancelableToken(); // We don't currently support cancel.
-
-    // Execute the handler.
-    var result = await messageHandler.handleMessage(message, info, token);
-
-    // Map the result (or error) on to what a DTD handler needs to return.
-    return result.map(
-      // Map LSP errors on to equiv JSON-RPC errors for DTD.
-      (error) => throw RpcException(
-        error.code.toJson(),
-        error.message,
-        data: error.data,
-      ),
-      // DTD requires that all results are a Map and that they contain a
-      // 'type' field. This differs slightly from LSP where we could return a
-      // boolean (for example). This means we need to put the result in a
-      // field, which we're calling 'result'.
-      (result) => {
-        'type': result?.runtimeType.toString(),
-        'result': result,
-      },
-    );
+    var scheduler = _server.messageScheduler;
+    var completer = Completer<Map<String, Object?>>();
+    scheduler.add(DtdMessage(
+        message: message, performance: performance, completer: completer));
+    scheduler.notify();
+    return completer.future;
   }
 
   /// Handles an unexpected error occurring on the DTD connection by logging and
diff --git a/pkg/analysis_server/test/integration/server/message_scheduler_test.dart b/pkg/analysis_server/test/integration/server/message_scheduler_test.dart
new file mode 100644
index 0000000..1df1d6d
--- /dev/null
+++ b/pkg/analysis_server/test/integration/server/message_scheduler_test.dart
@@ -0,0 +1,119 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+
+import 'package:analysis_server/lsp_protocol/protocol.dart' as lsp;
+import 'package:analysis_server/protocol/protocol.dart' as legacy;
+import 'package:analysis_server/protocol/protocol_constants.dart';
+import 'package:analysis_server/protocol/protocol_generated.dart';
+import 'package:analysis_server/src/server/message_scheduler.dart';
+import 'package:analyzer/src/util/performance/operation_performance.dart';
+import 'package:analyzer_utilities/testing/tree_string_sink.dart';
+import 'package:test/test.dart';
+import 'package:test_reflective_loader/test_reflective_loader.dart';
+
+void main() {
+  defineReflectiveSuite(() {
+    defineReflectiveTests(MessageSchedulerTest);
+  });
+}
+
+@reflectiveTest
+class MessageSchedulerTest {
+  late MessageScheduler messageScheduler;
+
+  DtdMessage get dtdMessage {
+    return DtdMessage(
+        message: lspRquest,
+        performance: OperationPerformanceImpl('<root>'),
+        completer: Completer());
+  }
+
+  legacy.Request get legacyRequest {
+    var params = AnalysisSetAnalysisRootsParams(['a', 'b', 'c'], ['d', 'e']);
+    return legacy.Request('1', ANALYSIS_REQUEST_SET_ANALYSIS_ROOTS,
+        params.toJson(clientUriConverter: null));
+  }
+
+  lsp.RequestMessage get lspRquest {
+    var params = {'processId': 'invalid'};
+    return lsp.RequestMessage(
+      id: lsp.Either2<int, String>.t1(1),
+      method: lsp.Method.initialize,
+      params: params,
+      jsonrpc: lsp.jsonRpcVersion,
+    );
+  }
+
+  void setUp() {
+    messageScheduler = MessageScheduler();
+  }
+
+  void test_addMultipleToQueue() {
+    messageScheduler.add(LegacyMessage(request: legacyRequest));
+    messageScheduler.add(LspMessage(message: lspRquest));
+    messageScheduler.add(dtdMessage);
+    _assertQueueContents(r'''
+incomingMessages
+  LegacyMessage
+    method: analysis.setAnalysisRoots
+  LspMessage
+    method: initialize
+  DtdMessage
+    method: initialize
+''');
+  }
+
+  void test_addSingleToQueue() {
+    messageScheduler.add(LspMessage(message: lspRquest));
+    _assertQueueContents(r'''
+incomingMessages
+  LspMessage
+    method: initialize
+''');
+  }
+
+  void _assertQueueContents(String expected) {
+    var actual = _getQueueContents(messageScheduler.incomingMessages);
+    if (actual != expected) {
+      print('-------- Actual --------');
+      print('$actual------------------------');
+    }
+    expect(actual, expected);
+  }
+
+  String _getQueueContents(ListQueue<MessageObject> queue) {
+    var buffer = StringBuffer();
+    var sink = TreeStringSink(sink: buffer, indent: '  ');
+    sink.writeln('incomingMessages');
+    while (queue.isNotEmpty) {
+      var message = queue.removeFirst();
+      switch (message) {
+        case DtdMessage():
+          sink.writelnWithIndent('DtdMessage');
+          sink.withIndent(() {
+            sink.writelnWithIndent(
+                'method: ${message.message.method.toString()}');
+          });
+        case LegacyMessage():
+          sink.writelnWithIndent('LegacyMessage');
+          sink.withIndent(() {
+            sink.writelnWithIndent(
+                'method: ${message.request.method.toString()}');
+          });
+        case LspMessage():
+          sink.writelnWithIndent('LspMessage');
+          var msg = message.message;
+          if (msg case lsp.RequestMessage()) {
+            sink.withIndent(() {
+              sink.writelnWithIndent('method: ${msg.method.toString()}');
+            });
+          }
+      }
+    }
+    return buffer.toString();
+  }
+}
diff --git a/pkg/analysis_server/test/integration/server/test_all.dart b/pkg/analysis_server/test/integration/server/test_all.dart
index d96283a..ea0a87e 100644
--- a/pkg/analysis_server/test/integration/server/test_all.dart
+++ b/pkg/analysis_server/test/integration/server/test_all.dart
@@ -7,6 +7,7 @@
 import 'blaze_changes_test.dart' as blaze_changes_test;
 import 'command_line_options_test.dart' as command_line_options_test;
 import 'get_version_test.dart' as get_version_test;
+import 'message_scheduler_test.dart' as scheduler_test;
 import 'set_subscriptions_invalid_service_test.dart'
     as set_subscriptions_invalid_service_test;
 import 'set_subscriptions_test.dart' as set_subscriptions_test;
@@ -18,6 +19,7 @@
     blaze_changes_test.main();
     command_line_options_test.main();
     get_version_test.main();
+    scheduler_test.main();
     set_subscriptions_test.main();
     set_subscriptions_invalid_service_test.main();
     shutdown_test.main();