Add a MessageScheduler queue that all incoming messages to the analysis server go through.
Change-Id: I3770b5a1a8faa391fb35a5a1e3d2678dbc4fb3f2
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/383703
Commit-Queue: Keerti Parthasarathy <keertip@google.com>
Reviewed-by: Brian Wilkerson <brianwilkerson@google.com>
diff --git a/pkg/analysis_server/lib/src/analysis_server.dart b/pkg/analysis_server/lib/src/analysis_server.dart
index d0178e3..067f441 100644
--- a/pkg/analysis_server/lib/src/analysis_server.dart
+++ b/pkg/analysis_server/lib/src/analysis_server.dart
@@ -26,6 +26,7 @@
import 'package:analysis_server/src/protocol_server.dart' as server;
import 'package:analysis_server/src/server/crash_reporting_attachments.dart';
import 'package:analysis_server/src/server/diagnostic_server.dart';
+import 'package:analysis_server/src/server/message_scheduler.dart';
import 'package:analysis_server/src/server/performance.dart';
import 'package:analysis_server/src/services/completion/completion_performance.dart';
import 'package:analysis_server/src/services/correction/assist_internal.dart';
@@ -250,6 +251,10 @@
/// A completer for [lspUninitialized].
final Completer<void> _lspUninitializedCompleter = Completer<void>();
+ /// A scheduler that keeps track of all incoming messages and schedules them
+ /// for processing.
+ final MessageScheduler messageScheduler;
+
AnalysisServer(
this.options,
this.sdkManager,
@@ -269,7 +274,9 @@
}) : resourceProvider = OverlayResourceProvider(baseResourceProvider),
pubApi = PubApi(instrumentationService, httpClient,
Platform.environment['PUB_HOSTED_URL']),
- producerGeneratorsForLintRules = AssistProcessor.computeLintRuleMap() {
+ producerGeneratorsForLintRules = AssistProcessor.computeLintRuleMap(),
+ messageScheduler = MessageScheduler() {
+ messageScheduler.setServer(this);
// Set the default URI converter. This uses the resource providers path
// context (unlike the initialized value) which allows tests to override it.
uriConverter = ClientUriConverter.noop(baseResourceProvider.pathContext);
diff --git a/pkg/analysis_server/lib/src/legacy_analysis_server.dart b/pkg/analysis_server/lib/src/legacy_analysis_server.dart
index 5dd6d05..1b7daf5 100644
--- a/pkg/analysis_server/lib/src/legacy_analysis_server.dart
+++ b/pkg/analysis_server/lib/src/legacy_analysis_server.dart
@@ -86,6 +86,7 @@
import 'package:analysis_server/src/server/diagnostic_server.dart';
import 'package:analysis_server/src/server/error_notifier.dart';
import 'package:analysis_server/src/server/features.dart';
+import 'package:analysis_server/src/server/message_scheduler.dart';
import 'package:analysis_server/src/server/performance.dart';
import 'package:analysis_server/src/server/sdk_configuration.dart';
import 'package:analysis_server/src/services/completion/completion_state.dart';
@@ -606,7 +607,8 @@
/// Handle a [request] that was read from the communication channel.
void handleRequestOrResponse(RequestOrResponse requestOrResponse) {
if (requestOrResponse is Request) {
- handleRequest(requestOrResponse);
+ messageScheduler.add(LegacyMessage(request: requestOrResponse));
+ messageScheduler.notify();
} else if (requestOrResponse is Response) {
handleResponse(requestOrResponse);
}
diff --git a/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart b/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
index 9935437..52d392b 100644
--- a/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
+++ b/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
@@ -27,6 +27,7 @@
import 'package:analysis_server/src/server/detachable_filesystem_manager.dart';
import 'package:analysis_server/src/server/diagnostic_server.dart';
import 'package:analysis_server/src/server/error_notifier.dart';
+import 'package:analysis_server/src/server/message_scheduler.dart';
import 'package:analysis_server/src/server/performance.dart';
import 'package:analysis_server/src/services/user_prompts/dart_fix_prompt_manager.dart';
import 'package:analysis_server/src/utilities/extensions/flutter.dart';
@@ -191,7 +192,8 @@
analysisDriverScheduler.start();
_channelSubscription =
- channel.listen(handleMessage, onDone: done, onError: socketError);
+ channel.listen(scheduleMessage, onDone: done, onError: socketError);
+
if (AnalysisServer.supportsPlugins) {
_pluginChangeSubscription =
pluginManager.pluginsChanged.listen((_) => _onPluginsChanged());
@@ -757,6 +759,11 @@
}
}
+ void scheduleMessage(Message message) {
+ messageScheduler.add(LspMessage(message: message));
+ messageScheduler.notify();
+ }
+
void sendErrorResponse(Message message, ResponseError error) {
if (message is RequestMessage) {
sendResponse(ResponseMessage(
diff --git a/pkg/analysis_server/lib/src/server/message_scheduler.dart b/pkg/analysis_server/lib/src/server/message_scheduler.dart
new file mode 100644
index 0000000..7d9948b
--- /dev/null
+++ b/pkg/analysis_server/lib/src/server/message_scheduler.dart
@@ -0,0 +1,92 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+
+import 'package:analysis_server/lsp_protocol/protocol.dart' as lsp;
+import 'package:analysis_server/protocol/protocol.dart' as legacy;
+import 'package:analysis_server/src/analysis_server.dart';
+import 'package:analysis_server/src/legacy_analysis_server.dart';
+import 'package:analysis_server/src/lsp/lsp_analysis_server.dart';
+import 'package:analyzer/src/util/performance/operation_performance.dart';
+import 'package:meta/meta.dart';
+
+/// Represents a message from DTD (Dart Tooling Daemon).
+final class DtdMessage extends MessageObject {
+ final lsp.IncomingMessage message;
+ final Completer<Map<String, Object?>> completer;
+ final OperationPerformanceImpl performance;
+
+ DtdMessage(
+ {required this.message,
+ required this.completer,
+ required this.performance});
+}
+
+/// Represents a message in the Legacy protocol format.
+final class LegacyMessage extends MessageObject {
+ final legacy.Request request;
+
+ LegacyMessage({required this.request});
+}
+
+/// Represents a message in the LSP protocol format.
+final class LspMessage extends MessageObject {
+ final lsp.Message message;
+
+ LspMessage({required this.message});
+}
+
+/// Represents a message from a client, can be an IDE, DTD etc.
+sealed class MessageObject {}
+
+/// The [MessageScheduler] receives messages from all clients of the
+/// [AnalysisServer]. Clients can include IDE's (LSP and Legacy protocol), DTD,
+/// and the Diagnostic server. The [MessageSchedular] acts as a hub for all
+/// incoming messages and forwards the messages to the appropriate handlers.
+final class MessageScheduler {
+ /// The [AnalaysisServer] associated with the schedular.
+ late final AnalysisServer server;
+
+ /// A queue of incoming messages from all the clients of the [AnalysisServer].
+ final ListQueue<MessageObject> _incomingMessages = ListQueue<MessageObject>();
+
+ @visibleForTesting
+ ListQueue<MessageObject> get incomingMessages => _incomingMessages;
+
+ /// Add a message to the end of the incoming messages queue.
+ void add(MessageObject message) {
+ _incomingMessages.addLast(message);
+ }
+
+ /// Notify the [MessageSchedular] to process the messages queue.
+ void notify() async {
+ processMessages();
+ }
+
+ /// Dispatch the first message in the queue to be executed.
+ void processMessages() {
+ if (_incomingMessages.isEmpty) {
+ return;
+ }
+ var message = _incomingMessages.removeFirst();
+ switch (message) {
+ case LspMessage():
+ var lspMessage = message.message;
+ (server as LspAnalysisServer).handleMessage(lspMessage);
+ case LegacyMessage():
+ var request = message.request;
+ (server as LegacyAnalysisServer).handleRequest(request);
+ case DtdMessage():
+ server.dtd!.processMessage(
+ message.message, message.performance, message.completer);
+ }
+ }
+
+ /// Set the [AnalysisServer].
+ void setServer(AnalysisServer analysisServer) {
+ server = analysisServer;
+ }
+}
diff --git a/pkg/analysis_server/lib/src/services/dart_tooling_daemon/dtd_services.dart b/pkg/analysis_server/lib/src/services/dart_tooling_daemon/dtd_services.dart
index 425d832..b58af9b 100644
--- a/pkg/analysis_server/lib/src/services/dart_tooling_daemon/dtd_services.dart
+++ b/pkg/analysis_server/lib/src/services/dart_tooling_daemon/dtd_services.dart
@@ -6,10 +6,10 @@
import 'package:analysis_server/lsp_protocol/protocol.dart';
import 'package:analysis_server/src/analysis_server.dart';
-import 'package:analysis_server/src/lsp/client_capabilities.dart';
import 'package:analysis_server/src/lsp/error_or.dart';
import 'package:analysis_server/src/lsp/handlers/handler_states.dart';
import 'package:analysis_server/src/lsp/handlers/handlers.dart';
+import 'package:analysis_server/src/server/message_scheduler.dart';
import 'package:analysis_server/src/server/performance.dart';
import 'package:analyzer/src/util/performance/operation_performance.dart';
import 'package:dtd/dtd.dart';
@@ -63,6 +63,54 @@
DtdConnectionState get state => _state;
+ /// Executes the LSP handler [messageHandler] with [params] and returns the
+ /// results as a map to provide back to DTD.
+ ///
+ /// If the handler fails, throws an [RpcException] to be propagated to the
+ /// client.
+ void processMessage(
+ IncomingMessage message,
+ OperationPerformanceImpl performance,
+ Completer<Map<String, Object?>> completer) async {
+ // (TODO:keertip) Lookup the right handler, execute and return results.
+ // For now, complete with exception.
+ completer.completeError(RpcException(
+ ErrorCodes.InvalidRequest.toJson(),
+ 'DTD requests are not yet supported',
+ ));
+
+ // (TODO:keertip) Uncomment when lookup has been implemented
+ // var info = MessageInfo(
+ // performance: performance,
+ // // DTD clients requests are always executed with a fixed set of
+ // // capabilities so that the responses don't change in format based on the
+ // // owning editor.
+ // clientCapabilities: fixedBasicLspClientCapabilities,
+ // );
+ // var token = NotCancelableToken(); // We don't currently support cancel.
+
+ // // Execute the handler.
+ // var result = await messageHandler.handleMessage(message, info, token);
+
+ // // Map the result (or error) on to what a DTD handler needs to return.
+ // return result.map(
+ // // Map LSP errors on to equiv JSON-RPC errors for DTD.
+ // (error) => throw RpcException(
+ // error.code.toJson(),
+ // error.message,
+ // data: error.data,
+ // ),
+ // // DTD requires that all results are a Map and that they contain a
+ // // 'type' field. This differs slightly from LSP where we could return a
+ // // boolean (for example). This means we need to put the result in a
+ // // field, which we're calling 'result'.
+ // (result) => {
+ // 'type': result?.runtimeType.toString(),
+ // 'result': result,
+ // },
+ // );
+ }
+
/// Closes the connection to DTD and cleans up.
void _close([DtdConnectionState state = DtdConnectionState.Disconnected]) {
_state = state;
@@ -122,54 +170,26 @@
}
}
- /// Executes the LSP handler [messageHandler] with [params] and returns the
- /// results as a map to provide back to DTD.
- ///
- /// If the handler fails, throws an [RpcException] to be propagated to the
- /// client.
+ /// The incoming request is sent to the [MessageScheduler] for execution.
+ /// A completer is returned which will be completed with the result of the
+ /// execution of the request by the corresponding [MessageHandler].
Future<Map<String, Object?>> _executeLspHandler(
MessageHandler<Object?, Object?, AnalysisServer> messageHandler,
Parameters params,
OperationPerformanceImpl performance,
) async {
- // TODO(dantup): Currently the handler just runs immediately, but this
- // should interact with the scheduler in future.
-
// Map the incoming request into types we use for LSP request handling.
var message = IncomingMessage(
jsonrpc: jsonRpcVersion,
method: messageHandler.handlesMessage,
params: params.asMap,
);
- var info = MessageInfo(
- performance: performance,
- // DTD clients requests are always executed with a fixed set of
- // capabilities so that the responses don't change in format based on the
- // owning editor.
- clientCapabilities: fixedBasicLspClientCapabilities,
- );
- var token = NotCancelableToken(); // We don't currently support cancel.
-
- // Execute the handler.
- var result = await messageHandler.handleMessage(message, info, token);
-
- // Map the result (or error) on to what a DTD handler needs to return.
- return result.map(
- // Map LSP errors on to equiv JSON-RPC errors for DTD.
- (error) => throw RpcException(
- error.code.toJson(),
- error.message,
- data: error.data,
- ),
- // DTD requires that all results are a Map and that they contain a
- // 'type' field. This differs slightly from LSP where we could return a
- // boolean (for example). This means we need to put the result in a
- // field, which we're calling 'result'.
- (result) => {
- 'type': result?.runtimeType.toString(),
- 'result': result,
- },
- );
+ var scheduler = _server.messageScheduler;
+ var completer = Completer<Map<String, Object?>>();
+ scheduler.add(DtdMessage(
+ message: message, performance: performance, completer: completer));
+ scheduler.notify();
+ return completer.future;
}
/// Handles an unexpected error occurring on the DTD connection by logging and
diff --git a/pkg/analysis_server/test/integration/server/message_scheduler_test.dart b/pkg/analysis_server/test/integration/server/message_scheduler_test.dart
new file mode 100644
index 0000000..1df1d6d
--- /dev/null
+++ b/pkg/analysis_server/test/integration/server/message_scheduler_test.dart
@@ -0,0 +1,119 @@
+// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+
+import 'package:analysis_server/lsp_protocol/protocol.dart' as lsp;
+import 'package:analysis_server/protocol/protocol.dart' as legacy;
+import 'package:analysis_server/protocol/protocol_constants.dart';
+import 'package:analysis_server/protocol/protocol_generated.dart';
+import 'package:analysis_server/src/server/message_scheduler.dart';
+import 'package:analyzer/src/util/performance/operation_performance.dart';
+import 'package:analyzer_utilities/testing/tree_string_sink.dart';
+import 'package:test/test.dart';
+import 'package:test_reflective_loader/test_reflective_loader.dart';
+
+void main() {
+ defineReflectiveSuite(() {
+ defineReflectiveTests(MessageSchedulerTest);
+ });
+}
+
+@reflectiveTest
+class MessageSchedulerTest {
+ late MessageScheduler messageScheduler;
+
+ DtdMessage get dtdMessage {
+ return DtdMessage(
+ message: lspRquest,
+ performance: OperationPerformanceImpl('<root>'),
+ completer: Completer());
+ }
+
+ legacy.Request get legacyRequest {
+ var params = AnalysisSetAnalysisRootsParams(['a', 'b', 'c'], ['d', 'e']);
+ return legacy.Request('1', ANALYSIS_REQUEST_SET_ANALYSIS_ROOTS,
+ params.toJson(clientUriConverter: null));
+ }
+
+ lsp.RequestMessage get lspRquest {
+ var params = {'processId': 'invalid'};
+ return lsp.RequestMessage(
+ id: lsp.Either2<int, String>.t1(1),
+ method: lsp.Method.initialize,
+ params: params,
+ jsonrpc: lsp.jsonRpcVersion,
+ );
+ }
+
+ void setUp() {
+ messageScheduler = MessageScheduler();
+ }
+
+ void test_addMultipleToQueue() {
+ messageScheduler.add(LegacyMessage(request: legacyRequest));
+ messageScheduler.add(LspMessage(message: lspRquest));
+ messageScheduler.add(dtdMessage);
+ _assertQueueContents(r'''
+incomingMessages
+ LegacyMessage
+ method: analysis.setAnalysisRoots
+ LspMessage
+ method: initialize
+ DtdMessage
+ method: initialize
+''');
+ }
+
+ void test_addSingleToQueue() {
+ messageScheduler.add(LspMessage(message: lspRquest));
+ _assertQueueContents(r'''
+incomingMessages
+ LspMessage
+ method: initialize
+''');
+ }
+
+ void _assertQueueContents(String expected) {
+ var actual = _getQueueContents(messageScheduler.incomingMessages);
+ if (actual != expected) {
+ print('-------- Actual --------');
+ print('$actual------------------------');
+ }
+ expect(actual, expected);
+ }
+
+ String _getQueueContents(ListQueue<MessageObject> queue) {
+ var buffer = StringBuffer();
+ var sink = TreeStringSink(sink: buffer, indent: ' ');
+ sink.writeln('incomingMessages');
+ while (queue.isNotEmpty) {
+ var message = queue.removeFirst();
+ switch (message) {
+ case DtdMessage():
+ sink.writelnWithIndent('DtdMessage');
+ sink.withIndent(() {
+ sink.writelnWithIndent(
+ 'method: ${message.message.method.toString()}');
+ });
+ case LegacyMessage():
+ sink.writelnWithIndent('LegacyMessage');
+ sink.withIndent(() {
+ sink.writelnWithIndent(
+ 'method: ${message.request.method.toString()}');
+ });
+ case LspMessage():
+ sink.writelnWithIndent('LspMessage');
+ var msg = message.message;
+ if (msg case lsp.RequestMessage()) {
+ sink.withIndent(() {
+ sink.writelnWithIndent('method: ${msg.method.toString()}');
+ });
+ }
+ }
+ }
+ return buffer.toString();
+ }
+}
diff --git a/pkg/analysis_server/test/integration/server/test_all.dart b/pkg/analysis_server/test/integration/server/test_all.dart
index d96283a..ea0a87e 100644
--- a/pkg/analysis_server/test/integration/server/test_all.dart
+++ b/pkg/analysis_server/test/integration/server/test_all.dart
@@ -7,6 +7,7 @@
import 'blaze_changes_test.dart' as blaze_changes_test;
import 'command_line_options_test.dart' as command_line_options_test;
import 'get_version_test.dart' as get_version_test;
+import 'message_scheduler_test.dart' as scheduler_test;
import 'set_subscriptions_invalid_service_test.dart'
as set_subscriptions_invalid_service_test;
import 'set_subscriptions_test.dart' as set_subscriptions_test;
@@ -18,6 +19,7 @@
blaze_changes_test.main();
command_line_options_test.main();
get_version_test.main();
+ scheduler_test.main();
set_subscriptions_test.main();
set_subscriptions_invalid_service_test.main();
shutdown_test.main();