[analysis_server] Move support for locking requests to the scheduler
This removes the original `lockRequestsWhile` functionality of the LSP server which worked by pausing reading from stdin to instead pause the processing of messages in the scheduler.
Change-Id: I737e977adee11fe6fc70f44185cfaa22c22805da
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/432342
Reviewed-by: Brian Wilkerson <brianwilkerson@google.com>
Reviewed-by: Keerti Parthasarathy <keertip@google.com>
Commit-Queue: Brian Wilkerson <brianwilkerson@google.com>
diff --git a/pkg/analysis_server/lib/src/analysis_server.dart b/pkg/analysis_server/lib/src/analysis_server.dart
index 7e5cc4e..88ea29a 100644
--- a/pkg/analysis_server/lib/src/analysis_server.dart
+++ b/pkg/analysis_server/lib/src/analysis_server.dart
@@ -273,6 +273,13 @@
/// A [TimingByteStore] that records timings for reads from the byte store.
TimingByteStore? _timingByteStore;
+ /// Whether notifications caused by analysis should be suppressed.
+ ///
+ /// This is used when an operation is temporarily modifying overlays and does
+ /// not want the client to be notified of any analysis happening on the
+ /// temporary content.
+ bool suppressAnalysisResults = false;
+
AnalysisServer(
this.options,
this.sdkManager,
@@ -884,6 +891,39 @@
/// given [path] was changed - added, updated, or removed.
void notifyFlutterWidgetDescriptions(String path) {}
+ /// Prevents the scheduler from processing new messages until [operation]
+ /// completes.
+ ///
+ /// This can be used to obtain analysis results/resolved units consistent with
+ /// the state of a file at the time this method was called, preventing
+ /// changes by incoming file modifications.
+ ///
+ /// The contents of [operation] should be kept as short as possible and since
+ /// cancellation requests will also be blocked for the duration of this
+ /// operation, handlers should generally check the cancellation flag
+ /// immediately after this function returns.
+ Future<T> pauseSchedulerWhile<T>(FutureOr<T> Function() operation) async {
+ // TODO(dantup): Prevent this method from locking responses from the client
+ // because this can lead to deadlocks if called during initialization where
+ // the server may wait for something (configuration) from the client. This
+ // might fit in with potential upcoming scheduler changes.
+ //
+ // This is currently used by Completion+FixAll (which are less likely, but
+ // possible to be called during init).
+ //
+ // https://github.com/dart-lang/sdk/issues/56311#issuecomment-2250089185
+
+ messageScheduler.pause();
+ try {
+ // `await` here is important to ensure `finally` doesn't execute until
+ // `operation()` completes (`whenComplete` is not available on
+ // `FutureOr`).
+ return await operation();
+ } finally {
+ messageScheduler.resume();
+ }
+ }
+
/// Read all files, resolve all URIs, and perform required analysis in
/// all current analysis drivers.
Future<void> reanalyze() async {
@@ -1087,6 +1127,10 @@
@override
@mustCallSuper
void handleFileResult(FileResult result) {
+ if (analysisServer.suppressAnalysisResults) {
+ return;
+ }
+
var path = result.path;
filesToFlush.add(path);
diff --git a/pkg/analysis_server/lib/src/lsp/handlers/commands/fix_all.dart b/pkg/analysis_server/lib/src/lsp/handlers/commands/fix_all.dart
index 2483a69..0c49abd 100644
--- a/pkg/analysis_server/lib/src/lsp/handlers/commands/fix_all.dart
+++ b/pkg/analysis_server/lib/src/lsp/handlers/commands/fix_all.dart
@@ -5,6 +5,7 @@
import 'dart:async';
import 'package:analysis_server/lsp_protocol/protocol.dart';
+import 'package:analysis_server/src/analysis_server.dart';
import 'package:analysis_server/src/lsp/constants.dart';
import 'package:analysis_server/src/lsp/error_or.dart';
import 'package:analysis_server/src/lsp/handlers/commands/simple_edit_handler.dart';
@@ -89,14 +90,14 @@
/// Computes edits for iterative fix-all using temporary overlays.
class _FixAllOperation extends TemporaryOverlayOperation
- with HandlerHelperMixin<LspAnalysisServer> {
+ with HandlerHelperMixin<AnalysisServer> {
final MessageInfo message;
final CancellationToken cancellationToken;
final String path;
final bool autoTriggered;
_FixAllOperation({
- required LspAnalysisServer server,
+ required AnalysisServer server,
required this.message,
required this.path,
required this.cancellationToken,
@@ -104,7 +105,7 @@
}) : super(server);
Future<ErrorOr<WorkspaceEdit?>> computeEdits() async {
- return await lockRequestsWithTemporaryOverlays(_computeEditsImpl);
+ return await pauseSchedulerWithTemporaryOverlays(_computeEditsImpl);
}
Future<ErrorOr<WorkspaceEdit?>> _computeEditsImpl() async {
diff --git a/pkg/analysis_server/lib/src/lsp/handlers/handler_completion.dart b/pkg/analysis_server/lib/src/lsp/handlers/handler_completion.dart
index 85d05f5..c568e51 100644
--- a/pkg/analysis_server/lib/src/lsp/handlers/handler_completion.dart
+++ b/pkg/analysis_server/lib/src/lsp/handlers/handler_completion.dart
@@ -119,7 +119,7 @@
// unit and LineInfo.
late ErrorOr<LineInfo> lineInfo;
late ErrorOr<ResolvedUnitResult> unit;
- await server.lockRequestsWhile(() async {
+ await server.pauseSchedulerWhile(() async {
unit = await path.mapResult(requireResolvedUnit);
lineInfo = await unit.map(
// If we don't have a unit, we can still try to obtain the line info from
diff --git a/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart b/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
index baef2a9..17231cc 100644
--- a/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
+++ b/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
@@ -106,9 +106,6 @@
@visibleForTesting
int contextBuilds = 0;
- /// The subscription to the stream of incoming messages from the client.
- late final StreamSubscription<void> _channelSubscription;
-
/// An optional manager to handle file systems which may not always be
/// available.
final DetachableFileSystemManager? detachableFileSystemManager;
@@ -117,13 +114,6 @@
/// `sendStatusNotification` was invoked.
bool wasAnalyzing = false;
- /// Whether notifications caused by analysis should be suppressed.
- ///
- /// This is used when an operation is temporarily modifying overlays and does
- /// not want the client to be notified of any analysis happening on the
- /// temporary content.
- bool suppressAnalysisResults = false;
-
/// Tracks files that have non-empty diagnostics on the client.
///
/// This is an optimization to avoid sending empty diagnostics when they are
@@ -181,11 +171,7 @@
.listen(handleAnalysisEvent);
analysisDriverScheduler.start();
- _channelSubscription = channel.listen(
- scheduleMessage,
- onDone: done,
- onError: socketError,
- );
+ channel.listen(scheduleMessage, onDone: done, onError: socketError);
if (AnalysisServer.supportsPlugins) {
_pluginChangeSubscription = pluginManager.pluginsChanged.listen(
@@ -572,45 +558,6 @@
}, socketError);
}
- /// Locks the server from processing incoming messages until [operation]
- /// completes.
- ///
- /// This can be used to obtain analysis results/resolved units consistent with
- /// the state of a file at the time this method was called, preventing
- /// changes by incoming file modifications.
- ///
- /// The contents of [operation] should be kept as short as possible and since
- /// cancellation requests will also be blocked for the duration of this
- /// operation, handles should generally check the cancellation flag
- /// immediately after this function returns.
- Future<T> lockRequestsWhile<T>(FutureOr<T> Function() operation) async {
- // TODO(dantup): Prevent this method from locking responses from the client
- // because this can lead to deadlocks if called during initialization where
- // the server may wait for something (configuration) from the client. This
- // might fit in with potential upcoming scheduler changes.
- //
- // This is currently used by Completion+FixAll (which are less likely, but
- // possible to be called during init).
- //
- // https://github.com/dart-lang/sdk/issues/56311#issuecomment-2250089185
- var completer = Completer<void>();
-
- // Pause handling incoming messages until `operation` completes.
- //
- // If this method is called multiple times, the pauses will stack, meaning
- // the subscription will not resume until all operations complete.
- _channelSubscription.pause(completer.future);
-
- try {
- // `await` here is important to ensure `finally` doesn't execute until
- // `operation()` completes (`whenComplete` is not available on
- // `FutureOr`).
- return await operation();
- } finally {
- completer.complete();
- }
- }
-
/// Logs the error on the client using window/logMessage.
void logErrorToClient(String message) {
channel.sendNotification(
@@ -1305,15 +1252,6 @@
}
@override
- void handleFileResult(FileResult result) {
- if (analysisServer.suppressAnalysisResults) {
- return;
- }
-
- super.handleFileResult(result);
- }
-
- @override
void handleResolvedUnitResult(ResolvedUnitResult result) {
var path = result.path;
diff --git a/pkg/analysis_server/lib/src/lsp/temporary_overlay_operation.dart b/pkg/analysis_server/lib/src/lsp/temporary_overlay_operation.dart
index 811ee93..2e00f4a 100644
--- a/pkg/analysis_server/lib/src/lsp/temporary_overlay_operation.dart
+++ b/pkg/analysis_server/lib/src/lsp/temporary_overlay_operation.dart
@@ -4,8 +4,8 @@
import 'dart:async';
+import 'package:analysis_server/src/analysis_server.dart';
import 'package:analysis_server/src/context_manager.dart';
-import 'package:analysis_server/src/lsp/lsp_analysis_server.dart';
import 'package:analysis_server/src/protocol_server.dart';
import 'package:analyzer/dart/analysis/analysis_context.dart';
import 'package:analyzer/file_system/overlay_file_system.dart';
@@ -18,7 +18,7 @@
/// need to be merged together (to be mappable to LSP document changes) and then
/// reverted to allow the client to apply the change.
abstract class TemporaryOverlayOperation {
- final LspAnalysisServer server;
+ final AnalysisServer server;
final ContextManager contextManager;
final OverlayResourceProvider resourceProvider;
@@ -78,16 +78,16 @@
}
/// Locks the server from processing incoming messages until [operation]
- /// completes just like [LspAnalysisServer.lockRequestsWhile] but
+ /// completes just like [AnalysisServer.pauseSchedulerWhile] but
/// additionally provides a function for writing temporary overlays that will
/// be reverted when the operation completes.
///
/// Additionally, sending diagnostics, outlines, etc. are suppressed by the
/// temporary overlays and re-enabled after the overlays are restored.
- Future<T> lockRequestsWithTemporaryOverlays<T>(
+ Future<T> pauseSchedulerWithTemporaryOverlays<T>(
Future<T> Function() operation,
) {
- return server.lockRequestsWhile(() async {
+ return server.pauseSchedulerWhile(() async {
// Wait for any in-progress analysis to complete before we start
// suppressing analysis results.
server.contextManager.pauseWatchers();
diff --git a/pkg/analysis_server/lib/src/scheduler/message_scheduler.dart b/pkg/analysis_server/lib/src/scheduler/message_scheduler.dart
index 7024f24..8d3220e 100644
--- a/pkg/analysis_server/lib/src/scheduler/message_scheduler.dart
+++ b/pkg/analysis_server/lib/src/scheduler/message_scheduler.dart
@@ -45,6 +45,11 @@
/// Whether the [MessageScheduler] is currently processing messages.
bool _isProcessing = false;
+ /// The number of times [pause] has been called without matching [resume]s.
+ ///
+ /// If zero, the queue is not paused.
+ int _pauseCount = 0;
+
/// The completer used to indicate that message handling has been completed.
Completer<void> _completer = Completer();
@@ -58,6 +63,9 @@
/// around.
MessageScheduler({required this.listener});
+ /// Whether the queue is currently paused.
+ bool get isPaused => _pauseCount > 0;
+
/// Add the [message] to the end of the pending messages queue.
///
/// Some incoming messages are handled immediately rather than being added to
@@ -177,12 +185,28 @@
}
}
+ /// Pauses processing messages.
+ ///
+ /// Any messages that are already being processed will continue until they
+ /// complete, but no new messages will be processed.
+ ///
+ /// If this method is called multiple times, [resume] will need to be called
+ /// an equal number of times for processing to continue.
+ void pause() {
+ _pauseCount++;
+ listener?.pauseProcessingMessages(_pauseCount);
+ }
+
/// Dispatch the first message in the queue to be executed.
void processMessages() async {
+ if (isPaused) {
+ return;
+ }
+
_isProcessing = true;
listener?.startProcessingMessages();
try {
- while (_pendingMessages.isNotEmpty) {
+ while (_pendingMessages.isNotEmpty && !isPaused) {
var currentMessage = _pendingMessages.removeFirst();
_activeMessages.addLast(currentMessage);
listener?.addActiveMessage(currentMessage);
@@ -249,6 +273,21 @@
listener?.endProcessingMessages();
}
+ /// Resumes processing messages.
+ void resume() {
+ if (!isPaused) {
+ throw StateError('Cannot resume if not paused');
+ }
+ _pauseCount--;
+ listener?.resumeProcessingMessages(_pauseCount);
+ if (!isPaused && !_isProcessing) {
+ // Process on the next tick so that the caller to resume() doesn't get
+ // messages in the queue attributed to their time (or run before they
+ // complete).
+ Future.delayed(Duration.zero, processMessages);
+ }
+ }
+
/// Returns the parameters of a cancellation [message].
lsp.CancelParams? _getCancelParams(lsp.NotificationMessage message) {
try {
@@ -477,6 +516,14 @@
/// This implies that the message was active and wasn't cancelled.
void messageCompleted(ScheduledMessage message);
+ /// Report that the pause counter was increased to [newPauseCount], and that
+ /// processing will be paused.
+ void pauseProcessingMessages(int newPauseCount);
+
+ /// Report that the pause counter was decreased to [newPauseCount] which, if
+ /// zero, indicates processing will resume.
+ void resumeProcessingMessages(int newPauseCount);
+
/// Report that the loop that processes messages has started to run.
void startProcessingMessages();
}
diff --git a/pkg/analysis_server/lib/src/scheduler/scheduler_tracking_listener.dart b/pkg/analysis_server/lib/src/scheduler/scheduler_tracking_listener.dart
index bfcfc0b..d467bf2 100644
--- a/pkg/analysis_server/lib/src/scheduler/scheduler_tracking_listener.dart
+++ b/pkg/analysis_server/lib/src/scheduler/scheduler_tracking_listener.dart
@@ -113,23 +113,26 @@
@override
void cancelActiveMessage(ScheduledMessage message) {
- var messageData = _messageDataMap.remove(message);
+ var messageData = _messageDataMap[message];
if (messageData == null) {
return;
}
messageData.completeTime = _now;
messageData.wasCancelled = true;
- _activeMessageCount--;
- _reportMessageData(messageData);
+ // Don't decrement counts or report message data yet because
+ // cancelled messages still complete and call messageCompleted().
}
@override
void cancelPendingMessage(ScheduledMessage message) {
- var messageData = _messageDataMap.remove(message)!;
+ var messageData = _messageDataMap[message];
+ if (messageData == null) {
+ return;
+ }
messageData.completeTime = _now;
messageData.wasCancelled = true;
- _pendingMessageCount--;
- _reportMessageData(messageData);
+ // Don't decrement counts or report message data yet because
+ // cancelled messages still complete and call messageCompleted().
}
/// Report that the loop that processes messages has stopped running.
@@ -149,6 +152,16 @@
}
@override
+ void pauseProcessingMessages(int newPauseCount) {
+ // TODO(dantup): Consider tracking the pause start time if newPauseCount=1.
+ }
+
+ @override
+ void resumeProcessingMessages(int newPauseCount) {
+ // TODO(dantup): Consider recording the pause duration if newPauseCount=0.
+ }
+
+ @override
void startProcessingMessages() {
processingStartTime = _now;
// var idleDuration = processingStartTime - processingEndTime;
diff --git a/pkg/analysis_server/test/integration/server/message_scheduler_test.dart b/pkg/analysis_server/test/integration/server/message_scheduler_test.dart
index d4d9966..b30c4f8 100644
--- a/pkg/analysis_server/test/integration/server/message_scheduler_test.dart
+++ b/pkg/analysis_server/test/integration/server/message_scheduler_test.dart
@@ -295,6 +295,88 @@
''');
}
+ Future<void> test_pauseResume() async {
+ // Content isn't important, we just need a valid file to send requests for.
+ const content = '';
+ newFile(mainFilePath, content);
+
+ await initialize();
+ var futures = <Future<void>>[];
+
+ /// Helper to send two hover requests and pump the event queue, but not wait
+ /// for the requests to complete.
+ Future<void> sendHovers() async {
+ futures.add(getHover(mainFileUri, Position(line: 0, character: 0)));
+ futures.add(getHover(mainFileUri, Position(line: 0, character: 0)));
+ await pumpEventQueue(times: 5000);
+ }
+
+ /// Helper to resume the scheduler and pump the event queue to allow time
+ /// for processing to ensure the logs are in a consistent order.
+ Future<void> resume() async {
+ messageScheduler.resume();
+ await pumpEventQueue(times: 5000);
+ }
+
+ await sendHovers();
+ messageScheduler.pause(); // Pause 1
+ await sendHovers();
+ messageScheduler.pause(); // Pause 2
+ await sendHovers();
+ await resume(); // Resume 1
+ await sendHovers();
+ await resume(); // Resume 2
+
+ await Future.wait(futures);
+
+ _assertLogContents(testView!, r'''
+Incoming RequestMessage: lsp:initialize
+Entering process messages loop
+ Start LspMessage: lsp:initialize
+ Complete LspMessage: lsp:initialize
+Exit process messages loop
+Incoming NotificationMessage: lsp:initialized
+Entering process messages loop
+ Start LspMessage: lsp:initialized
+ Complete LspMessage: lsp:initialized
+Exit process messages loop
+Incoming RequestMessage: lsp:textDocument/hover
+Entering process messages loop
+ Start LspMessage: lsp:textDocument/hover
+ Complete LspMessage: lsp:textDocument/hover
+Exit process messages loop
+Incoming RequestMessage: lsp:textDocument/hover
+Entering process messages loop
+ Start LspMessage: lsp:textDocument/hover
+ Complete LspMessage: lsp:textDocument/hover
+Exit process messages loop
+Pause requested - there are now 1 pauses
+Incoming RequestMessage: lsp:textDocument/hover
+Incoming RequestMessage: lsp:textDocument/hover
+Pause requested - there are now 2 pauses
+Incoming RequestMessage: lsp:textDocument/hover
+Incoming RequestMessage: lsp:textDocument/hover
+Resume requested - there are now 1 pauses
+Incoming RequestMessage: lsp:textDocument/hover
+Incoming RequestMessage: lsp:textDocument/hover
+Resume requested - there are now 0 pauses
+Entering process messages loop
+ Start LspMessage: lsp:textDocument/hover
+ Complete LspMessage: lsp:textDocument/hover
+ Start LspMessage: lsp:textDocument/hover
+ Complete LspMessage: lsp:textDocument/hover
+ Start LspMessage: lsp:textDocument/hover
+ Complete LspMessage: lsp:textDocument/hover
+ Start LspMessage: lsp:textDocument/hover
+ Complete LspMessage: lsp:textDocument/hover
+ Start LspMessage: lsp:textDocument/hover
+ Complete LspMessage: lsp:textDocument/hover
+ Start LspMessage: lsp:textDocument/hover
+ Complete LspMessage: lsp:textDocument/hover
+Exit process messages loop
+''');
+ }
+
Future<void> test_response() async {
if (MessageScheduler.allowOverlappingHandlers) return;
diff --git a/pkg/analysis_server/test/lsp/code_actions_source_test.dart b/pkg/analysis_server/test/lsp/code_actions_source_test.dart
index f92ff04..ca1dbc0 100644
--- a/pkg/analysis_server/test/lsp/code_actions_source_test.dart
+++ b/pkg/analysis_server/test/lsp/code_actions_source_test.dart
@@ -162,10 +162,11 @@
Method.workspace_applyEdit,
ApplyWorkspaceEditParams.fromJson,
() async {
- // Apply the command and immediately modify a file afterwards.
+ // Apply the command and immediately modify a file afterwards without
+ // awaiting.
var commandFuture = executeCommand(command);
- await commandFuture;
await replaceFile(12345, testFileUri, 'client-modified-content');
+
return commandFuture;
},
handler: (edit) {
diff --git a/pkg/analysis_server/test/lsp/completion_dart_test.dart b/pkg/analysis_server/test/lsp/completion_dart_test.dart
index 566a16a..d432b89 100644
--- a/pkg/analysis_server/test/lsp/completion_dart_test.dart
+++ b/pkg/analysis_server/test/lsp/completion_dart_test.dart
@@ -1840,30 +1840,35 @@
getCompletion(mainFileUri, position),
];
- // Ensure all requests started, then let them continue.
- await pumpEventQueue(times: 5000);
- completer.complete();
+ var expectationFutures = [
+ expectLater(
+ responseFutures[0],
+ throwsA(
+ isResponseError(
+ ErrorCodes.RequestCancelled,
+ message: 'Another textDocument/completion request was started',
+ ),
+ ),
+ ),
+ expectLater(
+ responseFutures[1],
+ throwsA(
+ isResponseError(
+ ErrorCodes.RequestCancelled,
+ message: 'Another textDocument/completion request was started',
+ ),
+ ),
+ ),
+ expectLater(responseFutures[2], completion(isNotEmpty)),
+ ];
- expect(
- responseFutures[0],
- throwsA(
- isResponseError(
- ErrorCodes.RequestCancelled,
- message: 'Another textDocument/completion request was started',
- ),
- ),
- );
- expect(
- responseFutures[1],
- throwsA(
- isResponseError(
- ErrorCodes.RequestCancelled,
- message: 'Another textDocument/completion request was started',
- ),
- ),
- );
- var results = await responseFutures[2];
- expect(results, isNotEmpty);
+ // Ensure all requests started, then let them continue. This must be done
+ // after the expectations are set up above, because otherwise if the
+ // exceptions occur too quickly, they will be unhandled (whereas the
+ // expectations attach error handlers to them).
+ await pumpEventQueue(times: 50000);
+ completer.complete();
+ await Future.wait(expectationFutures);
} finally {
CompletionHandler.delayAfterResolveForTests = null;
}
diff --git a/pkg/analysis_server/test/lsp/temporary_overlay_operation_test.dart b/pkg/analysis_server/test/lsp/temporary_overlay_operation_test.dart
index d427408..97386bb 100644
--- a/pkg/analysis_server/test/lsp/temporary_overlay_operation_test.dart
+++ b/pkg/analysis_server/test/lsp/temporary_overlay_operation_test.dart
@@ -127,5 +127,5 @@
_TestTemporaryOverlayOperation(super.server, this.operation);
- Future<void> doWork() => lockRequestsWithTemporaryOverlays(operation);
+ Future<void> doWork() => pauseSchedulerWithTemporaryOverlays(operation);
}
diff --git a/pkg/analysis_server/test/utils/message_scheduler_test_view.dart b/pkg/analysis_server/test/utils/message_scheduler_test_view.dart
index 4bc8b01..0b92aab 100644
--- a/pkg/analysis_server/test/utils/message_scheduler_test_view.dart
+++ b/pkg/analysis_server/test/utils/message_scheduler_test_view.dart
@@ -44,6 +44,16 @@
}
@override
+ void pauseProcessingMessages(int newPauseCount) {
+ messageLog.add('Pause requested - there are now $newPauseCount pauses');
+ }
+
+ @override
+ void resumeProcessingMessages(int newPauseCount) {
+ messageLog.add('Resume requested - there are now $newPauseCount pauses');
+ }
+
+ @override
void startProcessingMessages() {
messageLog.add('Entering process messages loop');
}