[analysis_server] Move support for locking requests to the scheduler

This removes the original `lockRequestsWhile` functionality of the LSP server which worked by pausing reading from stdin to instead pause the processing of messages in the scheduler.

Change-Id: I737e977adee11fe6fc70f44185cfaa22c22805da
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/432342
Reviewed-by: Brian Wilkerson <brianwilkerson@google.com>
Reviewed-by: Keerti Parthasarathy <keertip@google.com>
Commit-Queue: Brian Wilkerson <brianwilkerson@google.com>
diff --git a/pkg/analysis_server/lib/src/analysis_server.dart b/pkg/analysis_server/lib/src/analysis_server.dart
index 7e5cc4e..88ea29a 100644
--- a/pkg/analysis_server/lib/src/analysis_server.dart
+++ b/pkg/analysis_server/lib/src/analysis_server.dart
@@ -273,6 +273,13 @@
   /// A [TimingByteStore] that records timings for reads from the byte store.
   TimingByteStore? _timingByteStore;
 
+  /// Whether notifications caused by analysis should be suppressed.
+  ///
+  /// This is used when an operation is temporarily modifying overlays and does
+  /// not want the client to be notified of any analysis happening on the
+  /// temporary content.
+  bool suppressAnalysisResults = false;
+
   AnalysisServer(
     this.options,
     this.sdkManager,
@@ -884,6 +891,39 @@
   /// given [path] was changed - added, updated, or removed.
   void notifyFlutterWidgetDescriptions(String path) {}
 
+  /// Prevents the scheduler from processing new messages until [operation]
+  /// completes.
+  ///
+  /// This can be used to obtain analysis results/resolved units consistent with
+  /// the state of a file at the time this method was called, preventing
+  /// changes by incoming file modifications.
+  ///
+  /// The contents of [operation] should be kept as short as possible and since
+  /// cancellation requests will also be blocked for the duration of this
+  /// operation, handlers should generally check the cancellation flag
+  /// immediately after this function returns.
+  Future<T> pauseSchedulerWhile<T>(FutureOr<T> Function() operation) async {
+    // TODO(dantup): Prevent this method from locking responses from the client
+    //  because this can lead to deadlocks if called during initialization where
+    //  the server may wait for something (configuration) from the client. This
+    //  might fit in with potential upcoming scheduler changes.
+    //
+    // This is currently used by Completion+FixAll (which are less likely, but
+    // possible to be called during init).
+    //
+    // https://github.com/dart-lang/sdk/issues/56311#issuecomment-2250089185
+
+    messageScheduler.pause();
+    try {
+      // `await` here is important to ensure `finally` doesn't execute until
+      // `operation()` completes (`whenComplete` is not available on
+      // `FutureOr`).
+      return await operation();
+    } finally {
+      messageScheduler.resume();
+    }
+  }
+
   /// Read all files, resolve all URIs, and perform required analysis in
   /// all current analysis drivers.
   Future<void> reanalyze() async {
@@ -1087,6 +1127,10 @@
   @override
   @mustCallSuper
   void handleFileResult(FileResult result) {
+    if (analysisServer.suppressAnalysisResults) {
+      return;
+    }
+
     var path = result.path;
     filesToFlush.add(path);
 
diff --git a/pkg/analysis_server/lib/src/lsp/handlers/commands/fix_all.dart b/pkg/analysis_server/lib/src/lsp/handlers/commands/fix_all.dart
index 2483a69..0c49abd 100644
--- a/pkg/analysis_server/lib/src/lsp/handlers/commands/fix_all.dart
+++ b/pkg/analysis_server/lib/src/lsp/handlers/commands/fix_all.dart
@@ -5,6 +5,7 @@
 import 'dart:async';
 
 import 'package:analysis_server/lsp_protocol/protocol.dart';
+import 'package:analysis_server/src/analysis_server.dart';
 import 'package:analysis_server/src/lsp/constants.dart';
 import 'package:analysis_server/src/lsp/error_or.dart';
 import 'package:analysis_server/src/lsp/handlers/commands/simple_edit_handler.dart';
@@ -89,14 +90,14 @@
 
 /// Computes edits for iterative fix-all using temporary overlays.
 class _FixAllOperation extends TemporaryOverlayOperation
-    with HandlerHelperMixin<LspAnalysisServer> {
+    with HandlerHelperMixin<AnalysisServer> {
   final MessageInfo message;
   final CancellationToken cancellationToken;
   final String path;
   final bool autoTriggered;
 
   _FixAllOperation({
-    required LspAnalysisServer server,
+    required AnalysisServer server,
     required this.message,
     required this.path,
     required this.cancellationToken,
@@ -104,7 +105,7 @@
   }) : super(server);
 
   Future<ErrorOr<WorkspaceEdit?>> computeEdits() async {
-    return await lockRequestsWithTemporaryOverlays(_computeEditsImpl);
+    return await pauseSchedulerWithTemporaryOverlays(_computeEditsImpl);
   }
 
   Future<ErrorOr<WorkspaceEdit?>> _computeEditsImpl() async {
diff --git a/pkg/analysis_server/lib/src/lsp/handlers/handler_completion.dart b/pkg/analysis_server/lib/src/lsp/handlers/handler_completion.dart
index 85d05f5..c568e51 100644
--- a/pkg/analysis_server/lib/src/lsp/handlers/handler_completion.dart
+++ b/pkg/analysis_server/lib/src/lsp/handlers/handler_completion.dart
@@ -119,7 +119,7 @@
     // unit and LineInfo.
     late ErrorOr<LineInfo> lineInfo;
     late ErrorOr<ResolvedUnitResult> unit;
-    await server.lockRequestsWhile(() async {
+    await server.pauseSchedulerWhile(() async {
       unit = await path.mapResult(requireResolvedUnit);
       lineInfo = await unit.map(
         // If we don't have a unit, we can still try to obtain the line info from
diff --git a/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart b/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
index baef2a9..17231cc 100644
--- a/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
+++ b/pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart
@@ -106,9 +106,6 @@
   @visibleForTesting
   int contextBuilds = 0;
 
-  /// The subscription to the stream of incoming messages from the client.
-  late final StreamSubscription<void> _channelSubscription;
-
   /// An optional manager to handle file systems which may not always be
   /// available.
   final DetachableFileSystemManager? detachableFileSystemManager;
@@ -117,13 +114,6 @@
   /// `sendStatusNotification` was invoked.
   bool wasAnalyzing = false;
 
-  /// Whether notifications caused by analysis should be suppressed.
-  ///
-  /// This is used when an operation is temporarily modifying overlays and does
-  /// not want the client to be notified of any analysis happening on the
-  /// temporary content.
-  bool suppressAnalysisResults = false;
-
   /// Tracks files that have non-empty diagnostics on the client.
   ///
   /// This is an optimization to avoid sending empty diagnostics when they are
@@ -181,11 +171,7 @@
         .listen(handleAnalysisEvent);
     analysisDriverScheduler.start();
 
-    _channelSubscription = channel.listen(
-      scheduleMessage,
-      onDone: done,
-      onError: socketError,
-    );
+    channel.listen(scheduleMessage, onDone: done, onError: socketError);
 
     if (AnalysisServer.supportsPlugins) {
       _pluginChangeSubscription = pluginManager.pluginsChanged.listen(
@@ -572,45 +558,6 @@
     }, socketError);
   }
 
-  /// Locks the server from processing incoming messages until [operation]
-  /// completes.
-  ///
-  /// This can be used to obtain analysis results/resolved units consistent with
-  /// the state of a file at the time this method was called, preventing
-  /// changes by incoming file modifications.
-  ///
-  /// The contents of [operation] should be kept as short as possible and since
-  /// cancellation requests will also be blocked for the duration of this
-  /// operation, handles should generally check the cancellation flag
-  /// immediately after this function returns.
-  Future<T> lockRequestsWhile<T>(FutureOr<T> Function() operation) async {
-    // TODO(dantup): Prevent this method from locking responses from the client
-    //  because this can lead to deadlocks if called during initialization where
-    //  the server may wait for something (configuration) from the client. This
-    //  might fit in with potential upcoming scheduler changes.
-    //
-    // This is currently used by Completion+FixAll (which are less likely, but
-    // possible to be called during init).
-    //
-    // https://github.com/dart-lang/sdk/issues/56311#issuecomment-2250089185
-    var completer = Completer<void>();
-
-    // Pause handling incoming messages until `operation` completes.
-    //
-    // If this method is called multiple times, the pauses will stack, meaning
-    // the subscription will not resume until all operations complete.
-    _channelSubscription.pause(completer.future);
-
-    try {
-      // `await` here is important to ensure `finally` doesn't execute until
-      // `operation()` completes (`whenComplete` is not available on
-      // `FutureOr`).
-      return await operation();
-    } finally {
-      completer.complete();
-    }
-  }
-
   /// Logs the error on the client using window/logMessage.
   void logErrorToClient(String message) {
     channel.sendNotification(
@@ -1305,15 +1252,6 @@
   }
 
   @override
-  void handleFileResult(FileResult result) {
-    if (analysisServer.suppressAnalysisResults) {
-      return;
-    }
-
-    super.handleFileResult(result);
-  }
-
-  @override
   void handleResolvedUnitResult(ResolvedUnitResult result) {
     var path = result.path;
 
diff --git a/pkg/analysis_server/lib/src/lsp/temporary_overlay_operation.dart b/pkg/analysis_server/lib/src/lsp/temporary_overlay_operation.dart
index 811ee93..2e00f4a 100644
--- a/pkg/analysis_server/lib/src/lsp/temporary_overlay_operation.dart
+++ b/pkg/analysis_server/lib/src/lsp/temporary_overlay_operation.dart
@@ -4,8 +4,8 @@
 
 import 'dart:async';
 
+import 'package:analysis_server/src/analysis_server.dart';
 import 'package:analysis_server/src/context_manager.dart';
-import 'package:analysis_server/src/lsp/lsp_analysis_server.dart';
 import 'package:analysis_server/src/protocol_server.dart';
 import 'package:analyzer/dart/analysis/analysis_context.dart';
 import 'package:analyzer/file_system/overlay_file_system.dart';
@@ -18,7 +18,7 @@
 /// need to be merged together (to be mappable to LSP document changes) and then
 /// reverted to allow the client to apply the change.
 abstract class TemporaryOverlayOperation {
-  final LspAnalysisServer server;
+  final AnalysisServer server;
   final ContextManager contextManager;
   final OverlayResourceProvider resourceProvider;
 
@@ -78,16 +78,16 @@
   }
 
   /// Locks the server from processing incoming messages until [operation]
-  /// completes just like [LspAnalysisServer.lockRequestsWhile] but
+  /// completes just like [AnalysisServer.pauseSchedulerWhile] but
   /// additionally provides a function for writing temporary overlays that will
   /// be reverted when the operation completes.
   ///
   /// Additionally, sending diagnostics, outlines, etc. are suppressed by the
   /// temporary overlays and re-enabled after the overlays are restored.
-  Future<T> lockRequestsWithTemporaryOverlays<T>(
+  Future<T> pauseSchedulerWithTemporaryOverlays<T>(
     Future<T> Function() operation,
   ) {
-    return server.lockRequestsWhile(() async {
+    return server.pauseSchedulerWhile(() async {
       // Wait for any in-progress analysis to complete before we start
       // suppressing analysis results.
       server.contextManager.pauseWatchers();
diff --git a/pkg/analysis_server/lib/src/scheduler/message_scheduler.dart b/pkg/analysis_server/lib/src/scheduler/message_scheduler.dart
index 7024f24..8d3220e 100644
--- a/pkg/analysis_server/lib/src/scheduler/message_scheduler.dart
+++ b/pkg/analysis_server/lib/src/scheduler/message_scheduler.dart
@@ -45,6 +45,11 @@
   /// Whether the [MessageScheduler] is currently processing messages.
   bool _isProcessing = false;
 
+  /// The number of times [pause] has been called without matching [resume]s.
+  ///
+  /// If zero, the queue is not paused.
+  int _pauseCount = 0;
+
   /// The completer used to indicate that message handling has been completed.
   Completer<void> _completer = Completer();
 
@@ -58,6 +63,9 @@
   /// around.
   MessageScheduler({required this.listener});
 
+  /// Whether the queue is currently paused.
+  bool get isPaused => _pauseCount > 0;
+
   /// Add the [message] to the end of the pending messages queue.
   ///
   /// Some incoming messages are handled immediately rather than being added to
@@ -177,12 +185,28 @@
     }
   }
 
+  /// Pauses processing messages.
+  ///
+  /// Any messages that are already being processed will continue until they
+  /// complete, but no new messages will be processed.
+  ///
+  /// If this method is called multiple times, [resume] will need to be called
+  /// an equal number of times for processing to continue.
+  void pause() {
+    _pauseCount++;
+    listener?.pauseProcessingMessages(_pauseCount);
+  }
+
   /// Dispatch the first message in the queue to be executed.
   void processMessages() async {
+    if (isPaused) {
+      return;
+    }
+
     _isProcessing = true;
     listener?.startProcessingMessages();
     try {
-      while (_pendingMessages.isNotEmpty) {
+      while (_pendingMessages.isNotEmpty && !isPaused) {
         var currentMessage = _pendingMessages.removeFirst();
         _activeMessages.addLast(currentMessage);
         listener?.addActiveMessage(currentMessage);
@@ -249,6 +273,21 @@
     listener?.endProcessingMessages();
   }
 
+  /// Resumes processing messages.
+  void resume() {
+    if (!isPaused) {
+      throw StateError('Cannot resume if not paused');
+    }
+    _pauseCount--;
+    listener?.resumeProcessingMessages(_pauseCount);
+    if (!isPaused && !_isProcessing) {
+      // Process on the next tick so that the caller to resume() doesn't get
+      // messages in the queue attributed to their time (or run before they
+      // complete).
+      Future.delayed(Duration.zero, processMessages);
+    }
+  }
+
   /// Returns the parameters of a cancellation [message].
   lsp.CancelParams? _getCancelParams(lsp.NotificationMessage message) {
     try {
@@ -477,6 +516,14 @@
   /// This implies that the message was active and wasn't cancelled.
   void messageCompleted(ScheduledMessage message);
 
+  /// Report that the pause counter was increased to [newPauseCount], and that
+  /// processing will be paused.
+  void pauseProcessingMessages(int newPauseCount);
+
+  /// Report that the pause counter was decreased to [newPauseCount] which, if
+  /// zero, indicates processing will resume.
+  void resumeProcessingMessages(int newPauseCount);
+
   /// Report that the loop that processes messages has started to run.
   void startProcessingMessages();
 }
diff --git a/pkg/analysis_server/lib/src/scheduler/scheduler_tracking_listener.dart b/pkg/analysis_server/lib/src/scheduler/scheduler_tracking_listener.dart
index bfcfc0b..d467bf2 100644
--- a/pkg/analysis_server/lib/src/scheduler/scheduler_tracking_listener.dart
+++ b/pkg/analysis_server/lib/src/scheduler/scheduler_tracking_listener.dart
@@ -113,23 +113,26 @@
 
   @override
   void cancelActiveMessage(ScheduledMessage message) {
-    var messageData = _messageDataMap.remove(message);
+    var messageData = _messageDataMap[message];
     if (messageData == null) {
       return;
     }
     messageData.completeTime = _now;
     messageData.wasCancelled = true;
-    _activeMessageCount--;
-    _reportMessageData(messageData);
+    // Don't decrement counts or report message data yet because
+    // cancelled messages still complete and call messageCompleted().
   }
 
   @override
   void cancelPendingMessage(ScheduledMessage message) {
-    var messageData = _messageDataMap.remove(message)!;
+    var messageData = _messageDataMap[message];
+    if (messageData == null) {
+      return;
+    }
     messageData.completeTime = _now;
     messageData.wasCancelled = true;
-    _pendingMessageCount--;
-    _reportMessageData(messageData);
+    // Don't decrement counts or report message data yet because
+    // cancelled messages still complete and call messageCompleted().
   }
 
   /// Report that the loop that processes messages has stopped running.
@@ -149,6 +152,16 @@
   }
 
   @override
+  void pauseProcessingMessages(int newPauseCount) {
+    // TODO(dantup): Consider tracking the pause start time if newPauseCount=1.
+  }
+
+  @override
+  void resumeProcessingMessages(int newPauseCount) {
+    // TODO(dantup): Consider recording the pause duration if newPauseCount=0.
+  }
+
+  @override
   void startProcessingMessages() {
     processingStartTime = _now;
     // var idleDuration = processingStartTime - processingEndTime;
diff --git a/pkg/analysis_server/test/integration/server/message_scheduler_test.dart b/pkg/analysis_server/test/integration/server/message_scheduler_test.dart
index d4d9966..b30c4f8 100644
--- a/pkg/analysis_server/test/integration/server/message_scheduler_test.dart
+++ b/pkg/analysis_server/test/integration/server/message_scheduler_test.dart
@@ -295,6 +295,88 @@
 ''');
   }
 
+  Future<void> test_pauseResume() async {
+    // Content isn't important, we just need a valid file to send requests for.
+    const content = '';
+    newFile(mainFilePath, content);
+
+    await initialize();
+    var futures = <Future<void>>[];
+
+    /// Helper to send two hover requests and pump the event queue, but not wait
+    /// for the requests to complete.
+    Future<void> sendHovers() async {
+      futures.add(getHover(mainFileUri, Position(line: 0, character: 0)));
+      futures.add(getHover(mainFileUri, Position(line: 0, character: 0)));
+      await pumpEventQueue(times: 5000);
+    }
+
+    /// Helper to resume the scheduler and pump the event queue to allow time
+    /// for processing to ensure the logs are in a consistent order.
+    Future<void> resume() async {
+      messageScheduler.resume();
+      await pumpEventQueue(times: 5000);
+    }
+
+    await sendHovers();
+    messageScheduler.pause(); // Pause 1
+    await sendHovers();
+    messageScheduler.pause(); // Pause 2
+    await sendHovers();
+    await resume(); // Resume 1
+    await sendHovers();
+    await resume(); // Resume 2
+
+    await Future.wait(futures);
+
+    _assertLogContents(testView!, r'''
+Incoming RequestMessage: lsp:initialize
+Entering process messages loop
+  Start LspMessage: lsp:initialize
+  Complete LspMessage: lsp:initialize
+Exit process messages loop
+Incoming NotificationMessage: lsp:initialized
+Entering process messages loop
+  Start LspMessage: lsp:initialized
+  Complete LspMessage: lsp:initialized
+Exit process messages loop
+Incoming RequestMessage: lsp:textDocument/hover
+Entering process messages loop
+  Start LspMessage: lsp:textDocument/hover
+  Complete LspMessage: lsp:textDocument/hover
+Exit process messages loop
+Incoming RequestMessage: lsp:textDocument/hover
+Entering process messages loop
+  Start LspMessage: lsp:textDocument/hover
+  Complete LspMessage: lsp:textDocument/hover
+Exit process messages loop
+Pause requested - there are now 1 pauses
+Incoming RequestMessage: lsp:textDocument/hover
+Incoming RequestMessage: lsp:textDocument/hover
+Pause requested - there are now 2 pauses
+Incoming RequestMessage: lsp:textDocument/hover
+Incoming RequestMessage: lsp:textDocument/hover
+Resume requested - there are now 1 pauses
+Incoming RequestMessage: lsp:textDocument/hover
+Incoming RequestMessage: lsp:textDocument/hover
+Resume requested - there are now 0 pauses
+Entering process messages loop
+  Start LspMessage: lsp:textDocument/hover
+  Complete LspMessage: lsp:textDocument/hover
+  Start LspMessage: lsp:textDocument/hover
+  Complete LspMessage: lsp:textDocument/hover
+  Start LspMessage: lsp:textDocument/hover
+  Complete LspMessage: lsp:textDocument/hover
+  Start LspMessage: lsp:textDocument/hover
+  Complete LspMessage: lsp:textDocument/hover
+  Start LspMessage: lsp:textDocument/hover
+  Complete LspMessage: lsp:textDocument/hover
+  Start LspMessage: lsp:textDocument/hover
+  Complete LspMessage: lsp:textDocument/hover
+Exit process messages loop
+''');
+  }
+
   Future<void> test_response() async {
     if (MessageScheduler.allowOverlappingHandlers) return;
 
diff --git a/pkg/analysis_server/test/lsp/code_actions_source_test.dart b/pkg/analysis_server/test/lsp/code_actions_source_test.dart
index f92ff04..ca1dbc0 100644
--- a/pkg/analysis_server/test/lsp/code_actions_source_test.dart
+++ b/pkg/analysis_server/test/lsp/code_actions_source_test.dart
@@ -162,10 +162,11 @@
       Method.workspace_applyEdit,
       ApplyWorkspaceEditParams.fromJson,
       () async {
-        // Apply the command and immediately modify a file afterwards.
+        // Apply the command and immediately modify a file afterwards without
+        // awaiting.
         var commandFuture = executeCommand(command);
-        await commandFuture;
         await replaceFile(12345, testFileUri, 'client-modified-content');
+
         return commandFuture;
       },
       handler: (edit) {
diff --git a/pkg/analysis_server/test/lsp/completion_dart_test.dart b/pkg/analysis_server/test/lsp/completion_dart_test.dart
index 566a16a..d432b89 100644
--- a/pkg/analysis_server/test/lsp/completion_dart_test.dart
+++ b/pkg/analysis_server/test/lsp/completion_dart_test.dart
@@ -1840,30 +1840,35 @@
         getCompletion(mainFileUri, position),
       ];
 
-      // Ensure all requests started, then let them continue.
-      await pumpEventQueue(times: 5000);
-      completer.complete();
+      var expectationFutures = [
+        expectLater(
+          responseFutures[0],
+          throwsA(
+            isResponseError(
+              ErrorCodes.RequestCancelled,
+              message: 'Another textDocument/completion request was started',
+            ),
+          ),
+        ),
+        expectLater(
+          responseFutures[1],
+          throwsA(
+            isResponseError(
+              ErrorCodes.RequestCancelled,
+              message: 'Another textDocument/completion request was started',
+            ),
+          ),
+        ),
+        expectLater(responseFutures[2], completion(isNotEmpty)),
+      ];
 
-      expect(
-        responseFutures[0],
-        throwsA(
-          isResponseError(
-            ErrorCodes.RequestCancelled,
-            message: 'Another textDocument/completion request was started',
-          ),
-        ),
-      );
-      expect(
-        responseFutures[1],
-        throwsA(
-          isResponseError(
-            ErrorCodes.RequestCancelled,
-            message: 'Another textDocument/completion request was started',
-          ),
-        ),
-      );
-      var results = await responseFutures[2];
-      expect(results, isNotEmpty);
+      // Ensure all requests started, then let them continue. This must be done
+      // after the expectations are set up above, because otherwise if the
+      // exceptions occur too quickly, they will be unhandled (whereas the
+      // expectations attach error handlers to them).
+      await pumpEventQueue(times: 50000);
+      completer.complete();
+      await Future.wait(expectationFutures);
     } finally {
       CompletionHandler.delayAfterResolveForTests = null;
     }
diff --git a/pkg/analysis_server/test/lsp/temporary_overlay_operation_test.dart b/pkg/analysis_server/test/lsp/temporary_overlay_operation_test.dart
index d427408..97386bb 100644
--- a/pkg/analysis_server/test/lsp/temporary_overlay_operation_test.dart
+++ b/pkg/analysis_server/test/lsp/temporary_overlay_operation_test.dart
@@ -127,5 +127,5 @@
 
   _TestTemporaryOverlayOperation(super.server, this.operation);
 
-  Future<void> doWork() => lockRequestsWithTemporaryOverlays(operation);
+  Future<void> doWork() => pauseSchedulerWithTemporaryOverlays(operation);
 }
diff --git a/pkg/analysis_server/test/utils/message_scheduler_test_view.dart b/pkg/analysis_server/test/utils/message_scheduler_test_view.dart
index 4bc8b01..0b92aab 100644
--- a/pkg/analysis_server/test/utils/message_scheduler_test_view.dart
+++ b/pkg/analysis_server/test/utils/message_scheduler_test_view.dart
@@ -44,6 +44,16 @@
   }
 
   @override
+  void pauseProcessingMessages(int newPauseCount) {
+    messageLog.add('Pause requested - there are now $newPauseCount pauses');
+  }
+
+  @override
+  void resumeProcessingMessages(int newPauseCount) {
+    messageLog.add('Resume requested - there are now $newPauseCount pauses');
+  }
+
+  @override
   void startProcessingMessages() {
     messageLog.add('Entering process messages loop');
   }