match analysis server logs when replaying scenarios
Change-Id: I68fa02bbcbfa667d65fe07abe69a9f04a47daa13
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/466240
Reviewed-by: Brian Wilkerson <brianwilkerson@google.com>
Auto-Submit: Jake Macdonald <jakemac@google.com>
Commit-Queue: Jake Macdonald <jakemac@google.com>
diff --git a/pkg/analysis_server/tool/log_player/log_player.dart b/pkg/analysis_server/tool/log_player/log_player.dart
index b0251f2..ba1dec9 100644
--- a/pkg/analysis_server/tool/log_player/log_player.dart
+++ b/pkg/analysis_server/tool/log_player/log_player.dart
@@ -2,10 +2,15 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+import 'dart:convert';
+import 'dart:io';
+
import 'package:analysis_server/src/server/driver.dart';
import 'package:analysis_server/src/session_logger/entry_kind.dart';
import 'package:analysis_server/src/session_logger/log_entry.dart';
import 'package:analysis_server/src/session_logger/process_id.dart';
+import 'package:collection/collection.dart';
import 'log.dart';
import 'server_driver.dart';
@@ -18,9 +23,6 @@
/// The log to be played.
Log log;
- /// The object used to communicate with the running server.
- ServerDriver? server;
-
/// Whether the `shutdown` method has been seen.
bool _hasSeenShutdown = false;
@@ -37,84 +39,76 @@
Future<void> play() async {
var entries = log.entries;
var nextIndex = 0;
- while (nextIndex < entries.length) {
- // TODO(brianwilkerson): This doesn't currently attempt to retain the same
- // timing of messages as was recorded in the log.
- var entry = entries[nextIndex];
- switch (entry.kind) {
- case EntryKind.commandLine:
- if (this.server != null) {
- throw StateError(
- 'Analysis server already started, only one instance is allowed.',
- );
- }
- var server = this.server = ServerDriver(arguments: entry.argList);
- await server.start();
- case EntryKind.message:
- if (entry.receiver == ProcessId.server) {
- await _sendMessageToServer(entry);
- } else if (entry.sender == ProcessId.server) {
- _handleMessageFromServer(entry);
- } else {
- throw StateError('''
+ ServerDriver? server;
+ var pendingServerMessageExpectations = <Message>[];
+ try {
+ while (nextIndex < entries.length) {
+ // TODO(brianwilkerson): This doesn't currently attempt to retain the same
+ // timing of messages as was recorded in the log.
+ var entry = entries[nextIndex];
+ switch (entry.kind) {
+ case EntryKind.commandLine:
+ if (server != null) {
+ throw StateError(
+ 'Analysis server already started, only one instance is allowed.',
+ );
+ }
+ server = ServerDriver(arguments: entry.argList);
+ await server.start();
+ server.serverMessages.listen((message) {
+ var entryToRemove = pendingServerMessageExpectations
+ .firstWhereOrNull(
+ (expectation) => const MapEquality().equals(
+ expectation.map,
+ message.map,
+ ),
+ );
+ if (entryToRemove != null) {
+ pendingServerMessageExpectations.remove(entryToRemove);
+ } else {
+ stderr.writeln(
+ 'Unexpected message from analysis server:\n'
+ '${jsonEncode(message)}',
+ );
+ }
+ });
+ case EntryKind.message:
+ if (entry.receiver == ProcessId.server) {
+ await _sendMessageToServer(entry, server);
+ } else if (entry.sender == ProcessId.server) {
+ pendingServerMessageExpectations.add(entry.message);
+ // TODO(jakemac): Remove this once are not reliant on consistent
+ // ordering.
+ await _waitForMessagesFromServer(
+ pendingServerMessageExpectations,
+ );
+ } else {
+ throw StateError('''
Unexpected sender/receiver for message:
sender: ${entry.sender}
receiver: ${entry.receiver}
''');
- }
- }
- nextIndex++;
- }
- await _readMessagesFromServer();
- if (!_hasSeenShutdown) {
- server?.shutdown();
- }
- if (!_hasSeenExit) {
- server?.exit();
- }
- await _readMessagesFromServer();
- server = null;
- }
-
- /// Responds to a message sent from the server to some other process.
- void _handleMessageFromServer(LogEntry entry) {
- var message = entry.message;
- switch (entry.receiver) {
- case ProcessId.dtd:
- throw UnimplementedError();
- case ProcessId.ide:
- if (message.isLogMessage ||
- message.isShowDocument ||
- message.isShowMessage ||
- message.isShowMessageRequest) {
- // The response from the client should be recorded in the log, so it
- // will eventually be sent to the server.
- return;
+ }
}
- // throw UnimplementedError();
- case ProcessId.plugin:
- throw UnimplementedError();
- case ProcessId.server:
- throw StateError(
- 'Cannot send a message from the server to the server.',
- );
- case ProcessId.watcher:
- throw StateError(
- 'Cannot send a message from the server to the file watcher.',
- );
+ nextIndex++;
+ }
+ await _waitForMessagesFromServer(pendingServerMessageExpectations);
+ } finally {
+ if (!_hasSeenShutdown) {
+ server?.shutdown();
+ }
+ if (!_hasSeenExit) {
+ server?.exit();
+ }
}
}
- /// Wait for the server to process any messages that it may have received and
- /// for any responses to be sent back.
- Future<void> _readMessagesFromServer() async {
- await Future.delayed(const Duration(seconds: 1));
- }
-
/// Sends the message in the [entry] to the server.
- Future<void> _sendMessageToServer(LogEntry entry) async {
- var server = this.server;
+ Future<void> _sendMessageToServer(
+ LogEntry entry,
+ ServerDriver? server,
+ ) async {
if (server == null) {
throw StateError('Analysis server not started.');
}
@@ -136,7 +130,6 @@
_hasSeenShutdown = true;
} else if (message.isExit) {
_hasSeenExit = true;
- this.server = null;
}
server.sendMessageFromIde(message);
case ProcessId.plugin:
@@ -149,4 +142,23 @@
server.sendMessageFromFileWatcher(message);
}
}
+
+ /// Waits up to 5 seconds for [pendingServerMessageExpectations] to be
+ /// emptied out.
+ Future<void> _waitForMessagesFromServer(
+ List<Message> pendingServerMessageExpectations,
+ ) async {
+ if (pendingServerMessageExpectations.isEmpty) return;
+ var watch = Stopwatch()..start();
+ while (watch.elapsed < const Duration(seconds: 5)) {
+ if (pendingServerMessageExpectations.isEmpty) {
+ return;
+ }
+ await Future.delayed(const Duration(milliseconds: 50));
+ }
+ throw TimeoutException(
+ 'Timed out waiting for analysis server messages:\n\n'
+ '${pendingServerMessageExpectations.join('\n\n')}',
+ );
+ }
}
diff --git a/pkg/analysis_server/tool/log_player/server_driver.dart b/pkg/analysis_server/tool/log_player/server_driver.dart
index 205b7c0..c9564dd 100644
--- a/pkg/analysis_server/tool/log_player/server_driver.dart
+++ b/pkg/analysis_server/tool/log_player/server_driver.dart
@@ -2,6 +2,7 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
import 'dart:convert';
import 'dart:io';
@@ -26,8 +27,9 @@
/// server has not been connected to DTD using [connectToDtd].
WebSocket? _dtdSocket;
- /// The messages read from the analysis server's stdout.
- final List<String> _messagesFromServer = [];
+ /// Stream controller for analysis server output messages.
+ final StreamController<Message> _serverMessagesController =
+ StreamController();
/// Creates a new driver that can be used to communicate with a server.
///
@@ -68,6 +70,9 @@
ServerDriver._({required this.arguments, required ServerProtocol protocol})
: _protocol = protocol;
+ /// The messages read from the analysis server's stdout.
+ Stream<Message> get serverMessages => _serverMessagesController.stream;
+
/// Returns the path to the `dart` executable.
String get _dartExecutable {
return Platform.resolvedExecutable;
@@ -100,6 +105,7 @@
_stdinSink = null;
_dtdSocket?.close();
_dtdSocket = null;
+ _serverMessagesController.close();
}
void sendMessageFromDTD(Message message) {
@@ -203,7 +209,11 @@
}
void _receiveMessageFromServer(String message) {
- _messagesFromServer.add(message);
+ if (_serverMessagesController.isClosed) {
+ stderr.writeln('Got analysis server message after shutdown:\n$message');
+ } else {
+ _serverMessagesController.add(jsonDecode(message) as Message);
+ }
}
}