match analysis server logs when replaying scenarios

Change-Id: I68fa02bbcbfa667d65fe07abe69a9f04a47daa13
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/466240
Reviewed-by: Brian Wilkerson <brianwilkerson@google.com>
Auto-Submit: Jake Macdonald <jakemac@google.com>
Commit-Queue: Jake Macdonald <jakemac@google.com>
diff --git a/pkg/analysis_server/tool/log_player/log_player.dart b/pkg/analysis_server/tool/log_player/log_player.dart
index b0251f2..ba1dec9 100644
--- a/pkg/analysis_server/tool/log_player/log_player.dart
+++ b/pkg/analysis_server/tool/log_player/log_player.dart
@@ -2,10 +2,15 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
+import 'dart:async';
+import 'dart:convert';
+import 'dart:io';
+
 import 'package:analysis_server/src/server/driver.dart';
 import 'package:analysis_server/src/session_logger/entry_kind.dart';
 import 'package:analysis_server/src/session_logger/log_entry.dart';
 import 'package:analysis_server/src/session_logger/process_id.dart';
+import 'package:collection/collection.dart';
 
 import 'log.dart';
 import 'server_driver.dart';
@@ -18,9 +23,6 @@
   /// The log to be played.
   Log log;
 
-  /// The object used to communicate with the running server.
-  ServerDriver? server;
-
   /// Whether the `shutdown` method has been seen.
   bool _hasSeenShutdown = false;
 
@@ -37,84 +39,76 @@
   Future<void> play() async {
     var entries = log.entries;
     var nextIndex = 0;
-    while (nextIndex < entries.length) {
-      // TODO(brianwilkerson): This doesn't currently attempt to retain the same
-      //  timing of messages as was recorded in the log.
-      var entry = entries[nextIndex];
-      switch (entry.kind) {
-        case EntryKind.commandLine:
-          if (this.server != null) {
-            throw StateError(
-              'Analysis server already started, only one instance is allowed.',
-            );
-          }
-          var server = this.server = ServerDriver(arguments: entry.argList);
-          await server.start();
-        case EntryKind.message:
-          if (entry.receiver == ProcessId.server) {
-            await _sendMessageToServer(entry);
-          } else if (entry.sender == ProcessId.server) {
-            _handleMessageFromServer(entry);
-          } else {
-            throw StateError('''
+    ServerDriver? server;
+    var pendingServerMessageExpectations = <Message>[];
+    try {
+      while (nextIndex < entries.length) {
+        // TODO(brianwilkerson): This doesn't currently attempt to retain the same
+        //  timing of messages as was recorded in the log.
+        var entry = entries[nextIndex];
+        switch (entry.kind) {
+          case EntryKind.commandLine:
+            if (server != null) {
+              throw StateError(
+                'Analysis server already started, only one instance is allowed.',
+              );
+            }
+            server = ServerDriver(arguments: entry.argList);
+            await server.start();
+            server.serverMessages.listen((message) {
+              var entryToRemove = pendingServerMessageExpectations
+                  .firstWhereOrNull(
+                    (expectation) => const MapEquality().equals(
+                      expectation.map,
+                      message.map,
+                    ),
+                  );
+              if (entryToRemove != null) {
+                pendingServerMessageExpectations.remove(entryToRemove);
+              } else {
+                stderr.writeln(
+                  'Unexpected message from analysis server:\n'
+                  '${jsonEncode(message)}',
+                );
+              }
+            });
+          case EntryKind.message:
+            if (entry.receiver == ProcessId.server) {
+              await _sendMessageToServer(entry, server);
+            } else if (entry.sender == ProcessId.server) {
+              pendingServerMessageExpectations.add(entry.message);
+              // TODO(jakemac): Remove this once are not reliant on consistent
+              // ordering.
+              await _waitForMessagesFromServer(
+                pendingServerMessageExpectations,
+              );
+            } else {
+              throw StateError('''
 Unexpected sender/receiver for message:
 
 sender: ${entry.sender}
 receiver: ${entry.receiver}
 ''');
-          }
-      }
-      nextIndex++;
-    }
-    await _readMessagesFromServer();
-    if (!_hasSeenShutdown) {
-      server?.shutdown();
-    }
-    if (!_hasSeenExit) {
-      server?.exit();
-    }
-    await _readMessagesFromServer();
-    server = null;
-  }
-
-  /// Responds to a message sent from the server to some other process.
-  void _handleMessageFromServer(LogEntry entry) {
-    var message = entry.message;
-    switch (entry.receiver) {
-      case ProcessId.dtd:
-        throw UnimplementedError();
-      case ProcessId.ide:
-        if (message.isLogMessage ||
-            message.isShowDocument ||
-            message.isShowMessage ||
-            message.isShowMessageRequest) {
-          // The response from the client should be recorded in the log, so it
-          // will eventually be sent to the server.
-          return;
+            }
         }
-      // throw UnimplementedError();
-      case ProcessId.plugin:
-        throw UnimplementedError();
-      case ProcessId.server:
-        throw StateError(
-          'Cannot send a message from the server to the server.',
-        );
-      case ProcessId.watcher:
-        throw StateError(
-          'Cannot send a message from the server to the file watcher.',
-        );
+        nextIndex++;
+      }
+      await _waitForMessagesFromServer(pendingServerMessageExpectations);
+    } finally {
+      if (!_hasSeenShutdown) {
+        server?.shutdown();
+      }
+      if (!_hasSeenExit) {
+        server?.exit();
+      }
     }
   }
 
-  /// Wait for the server to process any messages that it may have received and
-  /// for any responses to be sent back.
-  Future<void> _readMessagesFromServer() async {
-    await Future.delayed(const Duration(seconds: 1));
-  }
-
   /// Sends the message in the [entry] to the server.
-  Future<void> _sendMessageToServer(LogEntry entry) async {
-    var server = this.server;
+  Future<void> _sendMessageToServer(
+    LogEntry entry,
+    ServerDriver? server,
+  ) async {
     if (server == null) {
       throw StateError('Analysis server not started.');
     }
@@ -136,7 +130,6 @@
           _hasSeenShutdown = true;
         } else if (message.isExit) {
           _hasSeenExit = true;
-          this.server = null;
         }
         server.sendMessageFromIde(message);
       case ProcessId.plugin:
@@ -149,4 +142,23 @@
         server.sendMessageFromFileWatcher(message);
     }
   }
+
+  /// Waits up to 5 seconds for [pendingServerMessageExpectations] to be
+  /// emptied out.
+  Future<void> _waitForMessagesFromServer(
+    List<Message> pendingServerMessageExpectations,
+  ) async {
+    if (pendingServerMessageExpectations.isEmpty) return;
+    var watch = Stopwatch()..start();
+    while (watch.elapsed < const Duration(seconds: 5)) {
+      if (pendingServerMessageExpectations.isEmpty) {
+        return;
+      }
+      await Future.delayed(const Duration(milliseconds: 50));
+    }
+    throw TimeoutException(
+      'Timed out waiting for analysis server messages:\n\n'
+      '${pendingServerMessageExpectations.join('\n\n')}',
+    );
+  }
 }
diff --git a/pkg/analysis_server/tool/log_player/server_driver.dart b/pkg/analysis_server/tool/log_player/server_driver.dart
index 205b7c0..c9564dd 100644
--- a/pkg/analysis_server/tool/log_player/server_driver.dart
+++ b/pkg/analysis_server/tool/log_player/server_driver.dart
@@ -2,6 +2,7 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
+import 'dart:async';
 import 'dart:convert';
 import 'dart:io';
 
@@ -26,8 +27,9 @@
   /// server has not been connected to DTD using [connectToDtd].
   WebSocket? _dtdSocket;
 
-  /// The messages read from the analysis server's stdout.
-  final List<String> _messagesFromServer = [];
+  /// Stream controller for analysis server output messages.
+  final StreamController<Message> _serverMessagesController =
+      StreamController();
 
   /// Creates a new driver that can be used to communicate with a server.
   ///
@@ -68,6 +70,9 @@
   ServerDriver._({required this.arguments, required ServerProtocol protocol})
     : _protocol = protocol;
 
+  /// The messages read from the analysis server's stdout.
+  Stream<Message> get serverMessages => _serverMessagesController.stream;
+
   /// Returns the path to the `dart` executable.
   String get _dartExecutable {
     return Platform.resolvedExecutable;
@@ -100,6 +105,7 @@
     _stdinSink = null;
     _dtdSocket?.close();
     _dtdSocket = null;
+    _serverMessagesController.close();
   }
 
   void sendMessageFromDTD(Message message) {
@@ -203,7 +209,11 @@
   }
 
   void _receiveMessageFromServer(String message) {
-    _messagesFromServer.add(message);
+    if (_serverMessagesController.isClosed) {
+      stderr.writeln('Got analysis server message after shutdown:\n$message');
+    } else {
+      _serverMessagesController.add(jsonDecode(message) as Message);
+    }
   }
 }