// Source blob: 8d3220e17b64a0413a750632458ea405697fb148
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
import 'package:analysis_server/lsp_protocol/protocol.dart' as lsp;
import 'package:analysis_server/protocol/protocol.dart' as legacy;
import 'package:analysis_server/protocol/protocol_constants.dart' as legacy;
import 'package:analysis_server/protocol/protocol_generated.dart' as legacy;
import 'package:analysis_server/src/analysis_server.dart';
import 'package:analysis_server/src/legacy_analysis_server.dart';
import 'package:analysis_server/src/lsp/constants.dart';
import 'package:analysis_server/src/lsp/lsp_analysis_server.dart';
import 'package:analysis_server/src/scheduler/scheduled_message.dart';
import 'package:analysis_server/src/server/error_notifier.dart';
import 'package:analyzer/src/utilities/cancellation.dart';
import 'package:language_server_protocol/json_parsing.dart';
/// The [MessageScheduler] schedules messages received from the clients of the
/// [AnalysisServer].
///
/// Clients include IDEs (LSP and Legacy protocol), DTD, the Diagnostic server
/// and file watchers. The [MessageScheduler] acts as a hub for all incoming
/// messages and forwards the messages to the appropriate handlers.
final class MessageScheduler {
  /// A flag to allow disabling overlapping message handlers.
  ///
  /// When `false`, [processMessages] waits for each handler to complete before
  /// dispatching the next pending message.
  static bool allowOverlappingHandlers = true;

  /// A listener that can be used to watch the scheduler as it manages messages.
  final MessageSchedulerListener? listener;

  /// The [AnalysisServer] associated with the scheduler.
  late final AnalysisServer server;

  /// The messages that have been received and are waiting to be handled.
  final ListQueue<ScheduledMessage> _pendingMessages =
      ListQueue<ScheduledMessage>();

  /// The messages that are currently being handled.
  final ListQueue<ScheduledMessage> _activeMessages =
      ListQueue<ScheduledMessage>();

  /// Whether the [MessageScheduler] is currently processing messages.
  bool _isProcessing = false;

  /// The number of times [pause] has been called without matching [resume]s.
  ///
  /// If zero, the queue is not paused.
  int _pauseCount = 0;

  /// The completer handed to the handler of the most recently dispatched
  /// message, used to indicate that message handling has been completed.
  Completer<void> _completer = Completer();

  /// Initialize a newly created message scheduler.
  ///
  /// The caller is expected to set the [server] immediately after creating the
  /// instance. The only reason the server isn't initialized by the constructor
  /// is because the analysis server and the message scheduler can't be created
  /// atomically and it was decided that it was cleaner for the scheduler to
  /// have the late-initialized reference to the server rather than the other
  /// way around.
  MessageScheduler({required this.listener});

  /// Whether the queue is currently paused.
  bool get isPaused => _pauseCount > 0;

  /// Add the [message] to the end of the pending messages queue.
  ///
  /// Some incoming messages are handled immediately rather than being added to
  /// the queue. Specifically, those listed below.
  ///
  /// LSP Messages
  /// - Responses are forwarded to the [LspAnalysisServer] immediately and are
  ///   never queued.
  /// - Cancellation notifications are handled by the [MessageScheduler].
  ///   The current message and queued messages are looked through and the
  ///   [CancelableToken] for the request to be canceled is set.
  /// - Document change notifications are used to cancel refactors, renames
  ///   (if any) that are currently in progress or on the queue for the document
  ///   that was changed. The notification itself is still queued.
  /// - Incoming completion and refactor requests cancel out current and
  ///   queued requests of the same. The incoming request is still queued.
  ///
  /// Legacy Messages
  /// - Cancellation requests are sent immediately to the [LegacyAnalysisServer]
  ///   for processing (and are then also queued, see the comment in the body).
  ///
  /// LSP over Legacy
  /// - The incoming [legacy.ANALYSIS_REQUEST_UPDATE_CONTENT] message cancels
  ///   any rename files request that is in progress.
  void add(ScheduledMessage message) {
    listener?.addPendingMessage(message);
    if (message is LegacyMessage) {
      var request = message.request;
      var method = request.method;
      if (method == legacy.SERVER_REQUEST_CANCEL_REQUEST) {
        var id =
            legacy.ServerCancelRequestParams.fromRequest(
              request,
              clientUriConverter: server.uriConverter,
            ).id;
        listener?.addActiveMessage(message);
        (server as LegacyAnalysisServer).cancelRequest(id);
        // The message needs to be added to the queue of pending messages, but
        // it seems like it shouldn't be necessary and that we ought to return
        // at this point. However, doing so causes some tests to timeout.
      } else if (method == legacy.ANALYSIS_REQUEST_UPDATE_CONTENT) {
        _cancelRenameFilesForContentUpdate();
      }
    } else if (message is LspMessage) {
      var msg = message.message;
      if (msg is lsp.ResponseMessage) {
        // Responses don't go on the queue because there might be an active
        // message that can't complete until the response is received. If the
        // response was added to the queue then this process could deadlock.
        listener?.addActiveMessage(message);
        (server as LspAnalysisServer).handleMessage(msg, null);
        return;
      } else if (msg is lsp.NotificationMessage) {
        var method = msg.method;
        if (method == lsp.Method.cancelRequest) {
          // Cancellations are handled immediately in order to minimize the
          // amount of extra work that has to happen. Note that the request is
          // canceled by setting the token to the 'cancelled' state rather than
          // by removing the request from the queue. It's done this way to allow
          // a response to be sent back to the client saying that the results
          // aren't provided because the request was cancelled.
          listener?.addActiveMessage(message);
          _processCancellation(msg);
          return;
        } else if (method == lsp.Method.textDocument_didChange) {
          // Document change notifications are _not_ handled immediately, but
          // some active or pending requests can be cancelled before the normal
          // processing is done.
          _processDocumentChange(msg);
        }
      } else if (msg is lsp.RequestMessage) {
        // Cancel in progress completion and refactoring requests.
        if (_isCancelableRequest(msg)) {
          _cancelSupersededRequests(msg);
        }
      }
    }
    _pendingMessages.addLast(message);
    if (!_isProcessing) {
      processMessages();
    }
  }

  /// Pauses processing messages.
  ///
  /// Any messages that are already being processed will continue until they
  /// complete, but no new messages will be processed.
  ///
  /// If this method is called multiple times, [resume] will need to be called
  /// an equal number of times for processing to continue.
  void pause() {
    _pauseCount++;
    listener?.pauseProcessingMessages(_pauseCount);
  }

  /// Dispatches pending messages, in order, to their handlers.
  ///
  /// Does nothing if the queue [isPaused]. Otherwise messages are dispatched
  /// until the pending queue is empty or the queue becomes paused. If
  /// [allowOverlappingHandlers] is `false`, each handler is awaited before the
  /// next message is dispatched.
  ///
  /// An exception thrown while dispatching is logged through the server's
  /// instrumentation service and ends the current run of the loop; messages
  /// that are still pending are picked up by the next call to this method.
  void processMessages() async {
    if (isPaused) {
      return;
    }
    _isProcessing = true;
    listener?.startProcessingMessages();
    // The message whose dispatch has started but isn't known to have finished.
    // Used to clean up the active queue if the dispatch throws.
    ScheduledMessage? inFlight;
    try {
      while (_pendingMessages.isNotEmpty && !isPaused) {
        var currentMessage = _pendingMessages.removeFirst();
        _activeMessages.addLast(currentMessage);
        inFlight = currentMessage;
        listener?.addActiveMessage(currentMessage);
        var completer = _completer = Completer<void>();
        unawaited(
          completer.future.then((_) {
            _activeMessages.remove(currentMessage);
          }),
        );
        switch (currentMessage) {
          case LspMessage():
            var lspMessage = currentMessage.message;
            (server as LspAnalysisServer).handleMessage(
              lspMessage,
              cancellationToken: currentMessage.cancellationToken,
              completer,
            );
          case LegacyMessage():
            var request = currentMessage.request;
            (server as LegacyAnalysisServer).handleRequest(
              request,
              completer,
              currentMessage.cancellationToken,
            );
          case DtdMessage():
            server.dtd!.processMessage(
              currentMessage.message,
              currentMessage.performance,
              currentMessage.responseCompleter,
              completer,
            );
          case WatcherMessage():
            server.contextManager.handleWatchEvent(currentMessage.event);
            // Handling a watch event is a synchronous process, so there's
            // nothing to wait for.
            completer.complete();
        }

        // Blocking here with an await on the future was intended to prevent
        // unwanted interleaving but was found to cause a significant
        // performance regression. For more context see:
        // https://github.com/dart-lang/sdk/issues/60440. To re-disable
        // interleaving, set [allowOverlappingHandlers] to `false`.
        if (!allowOverlappingHandlers) {
          await completer.future;
        }
        inFlight = null;

        // This message is not accurate if [allowOverlappingHandlers] is `true`
        // because in that case we're not blocking anymore and the future might
        // not be complete.
        // TODO(pq): if not awaited, consider adding a `then` so we can track
        // when the future completes. But note that we may see some flakiness in
        // tests as message handling gets non-deterministically interleaved.
        listener?.messageCompleted(currentMessage);
      }
    } catch (error, stackTrace) {
      // The handler failed before its completer could remove the message from
      // the active queue, so remove it here to avoid leaking the entry.
      if (inFlight != null) {
        _activeMessages.remove(inFlight);
      }
      server.instrumentationService.logException(
        FatalException('Failed to process message', error, stackTrace),
        null,
        server.crashReportingAttachmentsBuilder.forException(error),
      );
    } finally {
      // Always reset the flag, even if logging the failure throws; otherwise
      // [add] would never restart the processing loop.
      _isProcessing = false;
      listener?.endProcessingMessages();
    }
  }

  /// Resumes processing messages.
  ///
  /// Throws a [StateError] if the queue is not currently paused. Processing
  /// only restarts once every call to [pause] has been matched.
  void resume() {
    if (!isPaused) {
      throw StateError('Cannot resume if not paused');
    }
    _pauseCount--;
    listener?.resumeProcessingMessages(_pauseCount);
    if (!isPaused && !_isProcessing) {
      // Process on the next tick so that the caller to resume() doesn't get
      // messages in the queue attributed to their time (or run before they
      // complete).
      Future.delayed(Duration.zero, processMessages);
    }
  }

  /// Cancels active LSP-over-Legacy `workspace/willRenameFiles` requests
  /// because file content was modified.
  void _cancelRenameFilesForContentUpdate() {
    for (var activeMessage in _activeMessages.whereType<LegacyMessage>()) {
      var activeRequest = activeMessage.request;
      if (activeRequest.method != legacy.LSP_REQUEST_HANDLE) {
        continue;
      }
      var lspMethod = _getLspOverLegacyParams(activeRequest)?['method'];
      if (lspMethod == lsp.Method.workspace_willRenameFiles.toString()) {
        activeMessage.cancellationToken?.cancel(
          code: lsp.ErrorCodes.ContentModified.toJson(),
          reason: 'File content was modified',
        );
        listener?.cancelActiveMessage(activeMessage);
      }
    }
    // TODO(brianwilkerson): Determine why it isn't appropriate to also
    //  cancel pending messages of the same kind.
  }

  /// Cancels the active and pending LSP requests that have the same method as
  /// the [incoming] request.
  ///
  /// The caller is expected to have checked [_isCancelableRequest].
  void _cancelSupersededRequests(lsp.RequestMessage incoming) {
    var incomingMethod = incoming.method;
    var reason =
        incomingMethod == lsp.Method.workspace_executeCommand
            ? 'Another workspace/executeCommand request for a refactor was started'
            : 'Another textDocument/completion request was started';

    // NOTE(review): requests are matched by method only, so an incoming
    // refactor cancels every `workspace/executeCommand` request, not only
    // refactors. Confirm whether that is intended.
    bool isSuperseded(LspMessage candidate) =>
        candidate.isRequest &&
        (candidate.message as lsp.RequestMessage).method == incomingMethod;

    for (var activeMessage in _activeMessages.whereType<LspMessage>()) {
      if (isSuperseded(activeMessage)) {
        activeMessage.cancellationToken?.cancel(reason: reason);
        listener?.cancelActiveMessage(activeMessage);
      }
    }
    for (var pendingMessage in _pendingMessages.whereType<LspMessage>()) {
      if (isSuperseded(pendingMessage)) {
        pendingMessage.cancellationToken?.cancel(reason: reason);
        listener?.cancelPendingMessage(pendingMessage);
      }
    }
  }

  /// Returns the parameters of a cancellation [message].
  ///
  /// Returns `null` if the parameters are missing or invalid. Any exception
  /// thrown while parsing is logged to the server and results in `null`.
  lsp.CancelParams? _getCancelParams(lsp.NotificationMessage message) {
    try {
      var cancelJsonHandler = lsp.CancelParams.jsonHandler;
      var reporter = LspJsonReporter('params');
      var paramsJson = message.params as Map<String, Object?>?;
      if (!cancelJsonHandler.validateParams(paramsJson, reporter)) {
        return null;
      }
      return paramsJson != null
          ? cancelJsonHandler.convertParams(paramsJson)
          : null;
    } catch (error, stackTrace) {
      (server as LspAnalysisServer).logException(
        'An error occured while parsing cancel parameters',
        error,
        stackTrace,
      );
    }
    return null;
  }

  /// Returns the parameters of a `textDocument/didChange` [message], or `null`
  /// if the parameters are missing, not a JSON object, or invalid.
  lsp.DidChangeTextDocumentParams? _getChangeTextParams(
    lsp.NotificationMessage message,
  ) {
    var changeTextJsonHandler = lsp.DidChangeTextDocumentParams.jsonHandler;
    var reporter = LspJsonReporter('params');
    // Params may legally be a non-object (e.g. an array); treat that the same
    // as missing params rather than failing a cast.
    var rawParams = message.params;
    var paramsJson = rawParams is Map<String, Object?> ? rawParams : null;
    if (!changeTextJsonHandler.validateParams(paramsJson, reporter)) {
      return null;
    }
    return paramsJson != null
        ? changeTextJsonHandler.convertParams(paramsJson)
        : null;
  }

  /// Returns the parameters of a `workspace/executeCommand` [message], or
  /// `null` if the parameters are missing, not a JSON object, or invalid.
  lsp.ExecuteCommandParams? _getCommandParams(lsp.RequestMessage message) {
    var commandJsonHandler = lsp.ExecuteCommandParams.jsonHandler;
    var reporter = LspJsonReporter('params');
    // See [_getChangeTextParams] for why this is a type test, not a cast.
    var rawParams = message.params;
    var paramsJson = rawParams is Map<String, Object?> ? rawParams : null;
    if (!commandJsonHandler.validateParams(paramsJson, reporter)) {
      return null;
    }
    return paramsJson != null
        ? commandJsonHandler.convertParams(paramsJson)
        : null;
  }

  /// Returns the LSP message embedded in an LSP-over-Legacy [request], or
  /// `null` if the payload is not a JSON object.
  Map<String, Object?>? _getLspOverLegacyParams(legacy.Request request) {
    var params = legacy.LspHandleParams.fromRequest(
      request,
      clientUriConverter: server.uriConverter,
    );
    var lspMessage = params.lspMessage;
    return lspMessage is Map<String, Object?> ? lspMessage : null;
  }

  /// Returns the parameters of a `textDocument/rename` [message], or `null` if
  /// the parameters are missing, not a JSON object, or invalid.
  lsp.RenameParams? _getRenameParams(lsp.RequestMessage message) {
    var jsonHandler = lsp.RenameParams.jsonHandler;
    var reporter = LspJsonReporter('params');
    // See [_getChangeTextParams] for why this is a type test, not a cast.
    var rawParams = message.params;
    var paramsJson = rawParams is Map<String, Object?> ? rawParams : null;
    if (!jsonHandler.validateParams(paramsJson, reporter)) {
      return null;
    }
    return paramsJson != null ? jsonHandler.convertParams(paramsJson) : null;
  }

  /// Whether an incoming [message] should cancel earlier requests of the same
  /// method.
  ///
  /// This is the case for completion requests and for execute-command requests
  /// whose command is [Commands.performRefactor].
  bool _isCancelableRequest(lsp.RequestMessage message) {
    if (message.method == lsp.Method.textDocument_completion) {
      return true;
    }
    if (message.method == lsp.Method.workspace_executeCommand) {
      return _getCommandParams(message)?.command == Commands.performRefactor;
    }
    return false;
  }

  /// Process a cancellation notification.
  ///
  /// The first active request with the id being cancelled has its token
  /// cancelled; if no active request matches, the first matching pending
  /// request is cancelled instead.
  ///
  /// The notification doesn't need to be placed on the pending messages queue
  /// after it has been processed.
  void _processCancellation(lsp.NotificationMessage msg) {
    var params = _getCancelParams(msg);
    if (params == null) {
      return;
    }
    var cancelledId = params.id;

    bool isTarget(LspMessage candidate) =>
        candidate.isRequest &&
        (candidate.message as lsp.RequestMessage).id == cancelledId;

    for (var activeMessage in _activeMessages.whereType<LspMessage>()) {
      if (isTarget(activeMessage)) {
        activeMessage.cancellationToken?.cancel();
        listener?.cancelActiveMessage(activeMessage);
        return;
      }
    }
    for (var pendingMessage in _pendingMessages.whereType<LspMessage>()) {
      if (isTarget(pendingMessage)) {
        pendingMessage.cancellationToken?.cancel();
        listener?.cancelPendingMessage(pendingMessage);
        return;
      }
    }
  }

  /// Cancel the current refactor or rename, if any, for the document changed.
  /// Also check for any refactors or renames in the queue.
  ///
  /// The notification needs to be placed on the pending messages queue so that
  /// the state of the document will be updated.
  void _processDocumentChange(lsp.NotificationMessage msg) {
    var params = _getChangeTextParams(msg);
    if (params == null) {
      return;
    }
    var documentChangeUri = params.textDocument.uri;

    /// Returns the URI of the file targeted by a refactor with the given
    /// command [args], or `null` if it can't be determined.
    Uri? getRefactorUri(List<lsp.LSPAny?> args) {
      // TODO(keertip): extract method in AbstractRefactorCommandHandler
      // and use that instead.
      Object? path;
      if (args.length == 6) {
        path = args[1];
      } else if (args case [Map<String, Object?> namedArgs]) {
        path = namedArgs['path'];
      }
      // A path of an unexpected JSON type is treated as unknown.
      return path is String ? Uri.file(path) : null;
    }

    /// Whether the [request] is a refactor or rename for the changed document.
    bool targetsChangedDocument(lsp.RequestMessage request) {
      if (request.method == lsp.Method.workspace_executeCommand) {
        var execParams = _getCommandParams(request);
        if (execParams == null ||
            execParams.command != Commands.performRefactor) {
          return false;
        }
        var refactorUri = getRefactorUri(execParams.arguments ?? []);
        return refactorUri == documentChangeUri;
      }
      if (request.method == lsp.Method.textDocument_rename) {
        var renameParams = _getRenameParams(request);
        return renameParams != null &&
            renameParams.textDocument.uri == documentChangeUri;
      }
      return false;
    }

    /// Cancels [lspMessage] with `ContentModified` if it targets the changed
    /// document, reporting the cancellation to the listener.
    void cancelIfStale(LspMessage lspMessage, {required bool isActive}) {
      if (!lspMessage.isRequest) {
        return;
      }
      var request = lspMessage.message as lsp.RequestMessage;
      if (!targetsChangedDocument(request)) {
        return;
      }
      lspMessage.cancellationToken?.cancel(
        code: lsp.ErrorCodes.ContentModified.toJson(),
      );
      if (isActive) {
        listener?.cancelActiveMessage(lspMessage);
      } else {
        listener?.cancelPendingMessage(lspMessage);
      }
    }

    for (var activeMessage in _activeMessages.whereType<LspMessage>()) {
      cancelIfStale(activeMessage, isActive: true);
    }
    for (var pendingMessage in _pendingMessages.whereType<LspMessage>()) {
      cancelIfStale(pendingMessage, isActive: false);
    }
  }
}
/// An observer of the scheduling decisions made by a [MessageScheduler].
///
/// The scheduler invokes these callbacks as it queues, dispatches, cancels,
/// pauses and resumes messages.
abstract class MessageSchedulerListener {
/// Report that the [message] was added to the active message queue.
///
/// This implies that the message is no longer on the pending message queue.
/// It is also reported for messages that [MessageScheduler.add] handles
/// immediately instead of queueing.
void addActiveMessage(ScheduledMessage message);
/// Report that the [message] was received by the scheduler.
///
/// This is always the first notification for the [message], and it is
/// reported for every message passed to [MessageScheduler.add], including
/// those that are handled immediately.
void addPendingMessage(ScheduledMessage message);
/// Report that the cancellation token of an active [message] was cancelled.
void cancelActiveMessage(ScheduledMessage message);
/// Report that the cancellation token of a pending [message] was cancelled.
void cancelPendingMessage(ScheduledMessage message);
/// Report that the loop that processes messages has stopped running.
void endProcessingMessages();
/// Report that the [message] has been dispatched to its handler.
///
/// The scheduler reports this after dispatching an active message, whether
/// or not the message's cancellation token was cancelled. When
/// [MessageScheduler.allowOverlappingHandlers] is `true` the handler is not
/// awaited first, so it might not have finished yet.
void messageCompleted(ScheduledMessage message);
/// Report that the pause counter was increased to [newPauseCount], and that
/// processing will be paused.
void pauseProcessingMessages(int newPauseCount);
/// Report that the pause counter was decreased to [newPauseCount] which, if
/// zero, indicates processing will resume.
void resumeProcessingMessages(int newPauseCount);
/// Report that the loop that processes messages has started to run.
void startProcessingMessages();
}