Batch evaluation requests when possible (#1746)

* Batch expression evaluations from the same library

* Update changelog

* Fix tests failures

* Destroy isolate on app connection exit

* Fix failure to compile debug extension and client with dart2js
diff --git a/dwds/CHANGELOG.md b/dwds/CHANGELOG.md
index 1ce76f1..612b520 100644
--- a/dwds/CHANGELOG.md
+++ b/dwds/CHANGELOG.md
@@ -1,7 +1,11 @@
 ## 16.0.1-dev
 
-- Allow `LoadStrategy.serverPathForModule` and `LoadStrategy.sourceMapPathForModule`
-  to return `null` and add error handling.
+- Allow the following API to return `null` and add error handling:
+  - `LoadStrategy.serverPathForModule`
+  - `LoadStrategy.sourceMapPathForModule`
+- Expression evaluation performance improvement:
+  - Batch `ChromeProxyService.evaluate()` requests that are close in time
+    and are executed in the same library and scope.
 
 ## 16.0.0
 
diff --git a/dwds/debug_extension/web/background.dart b/dwds/debug_extension/web/background.dart
index a7a25e5..350291c 100644
--- a/dwds/debug_extension/web/background.dart
+++ b/dwds/debug_extension/web/background.dart
@@ -20,7 +20,10 @@
 import 'package:dwds/data/extension_request.dart';
 import 'package:dwds/data/serializers.dart';
 import 'package:dwds/src/sockets.dart';
-import 'package:dwds/src/utilities/batched_stream.dart';
+// NOTE(annagrin): using 'package:dwds/src/utilities/batched_stream.dart'
+// makes dart2js skip creating background.js, so we use a copy instead.
+// import 'package:dwds/src/utilities/batched_stream.dart';
+import 'package:dwds/src/web_utilities/batched_stream.dart';
 import 'package:js/js.dart';
 import 'package:js/js_util.dart' as js_util;
 import 'package:pub_semver/pub_semver.dart';
diff --git a/dwds/lib/src/connections/app_connection.dart b/dwds/lib/src/connections/app_connection.dart
index 1dd7ed1..fc19384 100644
--- a/dwds/lib/src/connections/app_connection.dart
+++ b/dwds/lib/src/connections/app_connection.dart
@@ -15,14 +15,19 @@
   /// The initial connection request sent from the application in the browser.
   final ConnectRequest request;
   final _startedCompleter = Completer<void>();
+  final _doneCompleter = Completer<void>();
   final SocketConnection _connection;
 
-  AppConnection(this.request, this._connection);
+  AppConnection(this.request, this._connection) {
+    unawaited(_connection.sink.done.then((v) => _doneCompleter.complete()));
+  }
 
   bool get isInKeepAlivePeriod => _connection.isInKeepAlivePeriod;
   void shutDown() => _connection.shutdown();
   bool get isStarted => _startedCompleter.isCompleted;
   Future<void> get onStart => _startedCompleter.future;
+  bool get isDone => _doneCompleter.isCompleted;
+  Future<void> get onDone => _doneCompleter.future;
 
   void runMain() {
     if (_startedCompleter.isCompleted) {
diff --git a/dwds/lib/src/injected/client.js b/dwds/lib/src/injected/client.js
index 0390ffc..66f32e9 100644
--- a/dwds/lib/src/injected/client.js
+++ b/dwds/lib/src/injected/client.js
@@ -1,4 +1,4 @@
-// Generated by dart2js (NullSafetyMode.sound, csp, deferred-serialization, intern-composite-values), the Dart to JavaScript compiler version: 2.19.0-edge.6682ac145d5b99fa05a034c7838b94c5d1143f78.
+// Generated by dart2js (NullSafetyMode.sound, csp, deferred-serialization, intern-composite-values), the Dart to JavaScript compiler version: 2.19.0-177.0.dev.
 // The code supports the following hooks:
 // dartPrint(message):
 //    if this function is defined it is called instead of the Dart [print]
diff --git a/dwds/lib/src/services/batched_expression_evaluator.dart b/dwds/lib/src/services/batched_expression_evaluator.dart
new file mode 100644
index 0000000..64338fd
--- /dev/null
+++ b/dwds/lib/src/services/batched_expression_evaluator.dart
@@ -0,0 +1,135 @@
+// Copyright (c) 2022, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:collection/collection.dart';
+import 'package:dwds/src/utilities/domain.dart';
+import 'package:logging/logging.dart';
+import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart';
+
+import '../debugging/debugger.dart';
+import '../debugging/location.dart';
+import '../debugging/modules.dart';
+import '../utilities/batched_stream.dart';
+import 'expression_compiler.dart';
+import 'expression_evaluator.dart';
+
+class EvaluateRequest {
+  final String isolateId;
+  final String? libraryUri;
+  final String expression;
+  final Map<String, String>? scope;
+  final completer = Completer<RemoteObject>();
+
+  EvaluateRequest(this.isolateId, this.libraryUri, this.expression, this.scope);
+}
+
+class BatchedExpressionEvaluator extends ExpressionEvaluator {
+  final _logger = Logger('BatchedExpressionEvaluator');
+  final Debugger _debugger;
+  final _requestController =
+      BatchedStreamController<EvaluateRequest>(delay: 200);
+
+  BatchedExpressionEvaluator(
+    String entrypoint,
+    AppInspectorInterface inspector,
+    this._debugger,
+    Locations locations,
+    Modules modules,
+    ExpressionCompiler compiler,
+  ) : super(entrypoint, inspector, _debugger, locations, modules, compiler) {
+    _requestController.stream.listen(_processRequest);
+  }
+
+  @override
+  void close() {
+    _logger.fine('Closed');
+    _requestController.close();
+  }
+
+  @override
+  Future<RemoteObject> evaluateExpression(
+    String isolateId,
+    String? libraryUri,
+    String expression,
+    Map<String, String>? scope,
+  ) {
+    final request = EvaluateRequest(isolateId, libraryUri, expression, scope);
+    _requestController.sink.add(request);
+    return request.completer.future;
+  }
+
+  void _processRequest(List<EvaluateRequest> requests) async {
+    String? libraryUri;
+    String? isolateId;
+    Map<String, String>? scope;
+    List<EvaluateRequest> currentRequests = [];
+
+    for (var request in requests) {
+      libraryUri ??= request.libraryUri;
+      isolateId ??= request.isolateId;
+      scope ??= request.scope;
+
+      if (libraryUri != request.libraryUri ||
+          isolateId != request.isolateId ||
+          !MapEquality().equals(scope, request.scope)) {
+        _logger.fine('New batch due to');
+        if (libraryUri != request.libraryUri) {
+          _logger.fine(' - library uri: $libraryUri != ${request.libraryUri}');
+        }
+        if (isolateId != request.isolateId) {
+          _logger.fine(' - isolateId: $isolateId != ${request.isolateId}');
+        }
+        if (!MapEquality().equals(scope, request.scope)) {
+          _logger.fine(' - scope: $scope != ${request.scope}');
+        }
+
+        unawaited(_evaluateBatch(currentRequests));
+        currentRequests = [];
+        libraryUri = request.libraryUri;
+        isolateId = request.isolateId;
+        scope = request.scope;
+      }
+      currentRequests.add(request);
+    }
+    unawaited(_evaluateBatch(currentRequests));
+  }
+
+  Future<void> _evaluateBatch(List<EvaluateRequest> requests) async {
+    if (requests.isEmpty) return;
+
+    final first = requests.first;
+    if (requests.length == 1) {
+      if (first.completer.isCompleted) return;
+      return super
+          .evaluateExpression(
+              first.isolateId, first.libraryUri, first.expression, first.scope)
+          .then(requests.first.completer.complete);
+    }
+
+    final expressions = requests.map((r) => r.expression).join(', ');
+    final batchedExpression = '[ $expressions ]';
+
+    _logger.fine('Evaluating batch of expressions $batchedExpression');
+
+    final RemoteObject list = await super.evaluateExpression(
+        first.isolateId, first.libraryUri, batchedExpression, first.scope);
+
+    for (var i = 0; i < requests.length; i++) {
+      final request = requests[i];
+      if (request.completer.isCompleted) continue;
+      _logger.fine('Getting result out of a batch for ${request.expression}');
+      _debugger
+          .getProperties(list.objectId!,
+              offset: i, count: 1, length: requests.length)
+          .then((v) {
+        final result = v.first.value;
+        _logger.fine(
+            'Got result out of a batch for ${request.expression}: $result');
+        request.completer.complete(result);
+      });
+    }
+  }
+}
diff --git a/dwds/lib/src/services/chrome_proxy_service.dart b/dwds/lib/src/services/chrome_proxy_service.dart
index 2d3a89e..daf4237 100644
--- a/dwds/lib/src/services/chrome_proxy_service.dart
+++ b/dwds/lib/src/services/chrome_proxy_service.dart
@@ -30,6 +30,7 @@
 import '../utilities/sdk_configuration.dart';
 import '../utilities/shared.dart';
 import 'expression_evaluator.dart';
+import 'batched_expression_evaluator.dart';
 
 /// A proxy from the chrome debug protocol to the dart vm service protocol.
 class ChromeProxyService implements VmServiceInterface {
@@ -241,7 +242,7 @@
     final compiler = _compiler;
     _expressionEvaluator = compiler == null
         ? null
-        : ExpressionEvaluator(
+        : BatchedExpressionEvaluator(
             entrypoint,
             inspector,
             debugger,
@@ -259,6 +260,8 @@
       _startedCompleter.complete();
     }));
 
+    unawaited(appConnection.onDone.then((_) => destroyIsolate()));
+
     final isolateRef = inspector.isolateRef;
     final timestamp = DateTime.now().millisecondsSinceEpoch;
 
@@ -301,6 +304,7 @@
   ///
   /// Clears out the [_inspector] and all related cached information.
   void destroyIsolate() {
+    _logger.fine('Destroying isolate');
     if (!_isIsolateRunning) return;
     final isolate = inspector.isolate;
     final isolateRef = inspector.isolateRef;
@@ -318,6 +322,7 @@
     _inspector = null;
     _previousBreakpoints.clear();
     _previousBreakpoints.addAll(isolate.breakpoints ?? []);
+    _expressionEvaluator?.close();
     _consoleSubscription?.cancel();
     _consoleSubscription = null;
   }
diff --git a/dwds/lib/src/services/expression_evaluator.dart b/dwds/lib/src/services/expression_evaluator.dart
index 6ae50ff..35aef1c 100644
--- a/dwds/lib/src/services/expression_evaluator.dart
+++ b/dwds/lib/src/services/expression_evaluator.dart
@@ -2,6 +2,8 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
+import 'dart:async';
+
 import 'package:dwds/src/utilities/domain.dart';
 import 'package:logging/logging.dart';
 import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart';
@@ -61,6 +63,8 @@
         <String, String>{'type': '$severity', 'value': message});
   }
 
+  void close() {}
+
   /// Evaluate dart expression inside a given library.
   ///
   /// Uses ExpressionCompiler interface to compile the expression to
diff --git a/dwds/lib/src/web_utilities/batched_stream.dart b/dwds/lib/src/web_utilities/batched_stream.dart
new file mode 100644
index 0000000..6da6465
--- /dev/null
+++ b/dwds/lib/src/web_utilities/batched_stream.dart
@@ -0,0 +1,85 @@
+// Copyright (c) 2022, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'package:async/async.dart';
+
+/// Stream controller allowing to batch events.
+class BatchedStreamController<T> {
+  static const _defaultBatchDelayMilliseconds = 1000;
+  static const _checkDelayMilliseconds = 100;
+
+  final int _batchDelayMilliseconds;
+
+  final StreamController<T> _inputController;
+  late StreamQueue<T> _inputQueue;
+
+  final StreamController<List<T>> _outputController;
+  final Completer<bool> _completer = Completer<bool>();
+
+  /// Create batched stream controller.
+  ///
+  /// Collects events from input [sink] and emits them in batches to the
+  /// output [stream] every [delay] milliseconds. Keeps the original order.
+  BatchedStreamController({
+    int delay = _defaultBatchDelayMilliseconds,
+  })  : _batchDelayMilliseconds = delay,
+        _inputController = StreamController<T>(),
+        _outputController = StreamController<List<T>>() {
+    _inputQueue = StreamQueue<T>(_inputController.stream);
+    unawaited(_batchAndSendEvents());
+  }
+
+  /// Sink collecting events.
+  StreamSink<T> get sink => _inputController.sink;
+
+  /// Output stream of batch events.
+  Stream<List<T>> get stream => _outputController.stream;
+
+  /// Close the controller.
+  Future<dynamic> close() async {
+    unawaited(_inputController.close());
+    return _completer.future.then((value) => _outputController.close());
+  }
+
+  /// Send events to the output in a batch every [_batchDelayMilliseconds].
+  Future<void> _batchAndSendEvents() async {
+    const duration = Duration(milliseconds: _checkDelayMilliseconds);
+    final buffer = <T>[];
+
+    // Batch events every `_batchDelayMilliseconds`.
+    //
+    // Note that events might arrive at random intervals, so collecting
+    // a predetermined number of events to send in a batch might delay
+    // the batch indefinitely.  Instead, check for new events every
+    // `_checkDelayMilliseconds` to make sure batches are sent in regular
+    // intervals.
+    var lastSendTime = DateTime.now().millisecondsSinceEpoch;
+    while (await _hasEventOrTimeOut(duration)) {
+      if (await _hasEventDuring(duration)) {
+        buffer.add(await _inputQueue.next);
+      }
+
+      final now = DateTime.now().millisecondsSinceEpoch;
+      if (now > lastSendTime + _batchDelayMilliseconds) {
+        lastSendTime = now;
+        if (buffer.isNotEmpty) {
+          _outputController.sink.add(List.from(buffer));
+          buffer.clear();
+        }
+      }
+    }
+
+    if (buffer.isNotEmpty) {
+      _outputController.sink.add(List.from(buffer));
+    }
+    _completer.complete(true);
+  }
+
+  Future<bool> _hasEventOrTimeOut(Duration duration) =>
+      _inputQueue.hasNext.timeout(duration, onTimeout: () => true);
+
+  Future<bool> _hasEventDuring(Duration duration) =>
+      _inputQueue.hasNext.timeout(duration, onTimeout: () => false);
+}
diff --git a/dwds/test/evaluate_common.dart b/dwds/test/evaluate_common.dart
index 3ca60cc..b03ceff 100644
--- a/dwds/test/evaluate_common.dart
+++ b/dwds/test/evaluate_common.dart
@@ -539,6 +539,25 @@
 
       tearDown(() async {});
 
+      test('in parallel (in a batch)', () async {
+        final library = isolate.rootLib!;
+        final evaluation1 = setup.service
+            .evaluate(isolateId, library.id!, 'MainClass(0).toString()');
+        final evaluation2 = setup.service
+            .evaluate(isolateId, library.id!, 'MainClass(1).toString()');
+
+        final results = await Future.wait([evaluation1, evaluation2]);
+        expect(
+            results[0],
+            const TypeMatcher<InstanceRef>().having(
+                (instance) => instance.valueAsString, 'valueAsString', '0'));
+
+        expect(
+            results[1],
+            const TypeMatcher<InstanceRef>().having(
+                (instance) => instance.valueAsString, 'valueAsString', '1'));
+      });
+
       test('with scope override', () async {
         final library = isolate.rootLib!;
         final object = await setup.service
diff --git a/dwds/test/expression_compiler_service_test.dart b/dwds/test/expression_compiler_service_test.dart
index ab873b8..dc4f267 100644
--- a/dwds/test/expression_compiler_service_test.dart
+++ b/dwds/test/expression_compiler_service_test.dart
@@ -165,6 +165,83 @@
 
       await stop();
     });
+
+    test('can evaluate multiple expressions', () async {
+      expect(output.stream, neverEmits(contains('[SEVERE]')));
+      expect(
+          output.stream,
+          emitsThrough(contains(
+              '[INFO] ExpressionCompilerService: Updating dependencies...')));
+      expect(
+          output.stream,
+          emitsThrough(contains(
+              '[INFO] ExpressionCompilerService: Updated dependencies.')));
+
+      expect(output.stream,
+          emitsThrough(contains('[INFO] ExpressionCompilerService: Stopped.')));
+      final result = await service
+          .updateDependencies({'try': ModuleInfo('try.full.dill', 'try.dill')});
+      expect(result, true, reason: 'failed to update dependencies');
+
+      final compilationResult1 = await service.compileExpressionToJs(
+          '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'true');
+      final compilationResult2 = await service.compileExpressionToJs(
+          '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'false');
+
+      expect(
+          compilationResult1,
+          isA<ExpressionCompilationResult>()
+              .having((r) => r.result, 'result', contains('return true;'))
+              .having((r) => r.isError, 'isError', false));
+
+      expect(
+          compilationResult2,
+          isA<ExpressionCompilationResult>()
+              .having((r) => r.result, 'result', contains('return false;'))
+              .having((r) => r.isError, 'isError', false));
+
+      await stop();
+    });
+
+    test('can compile multiple expressions in parallel', () async {
+      expect(output.stream, neverEmits(contains('[SEVERE]')));
+      expect(
+          output.stream,
+          emitsThrough(contains(
+              '[INFO] ExpressionCompilerService: Updating dependencies...')));
+      expect(
+          output.stream,
+          emitsThrough(contains(
+              '[INFO] ExpressionCompilerService: Updated dependencies.')));
+
+      expect(output.stream,
+          emitsThrough(contains('[INFO] ExpressionCompilerService: Stopped.')));
+      final result = await service
+          .updateDependencies({'try': ModuleInfo('try.full.dill', 'try.dill')});
+      expect(result, true, reason: 'failed to update dependencies');
+
+      final compilationResult1 = service.compileExpressionToJs(
+          '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'true');
+      final compilationResult2 = service.compileExpressionToJs(
+          '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'false');
+
+      final results =
+          await Future.wait([compilationResult1, compilationResult2]);
+
+      expect(
+          results[0],
+          isA<ExpressionCompilationResult>()
+              .having((r) => r.result, 'result', contains('return true;'))
+              .having((r) => r.isError, 'isError', false));
+
+      expect(
+          results[1],
+          isA<ExpressionCompilationResult>()
+              .having((r) => r.result, 'result', contains('return false;'))
+              .having((r) => r.isError, 'isError', false));
+
+      await stop();
+    });
   });
 }
 
diff --git a/dwds/web/client.dart b/dwds/web/client.dart
index ac7eb7f..8fc88ad 100644
--- a/dwds/web/client.dart
+++ b/dwds/web/client.dart
@@ -19,7 +19,10 @@
 import 'package:dwds/data/run_request.dart';
 import 'package:dwds/data/serializers.dart';
 import 'package:dwds/src/sockets.dart';
-import 'package:dwds/src/utilities/batched_stream.dart';
+// NOTE(annagrin): using 'package:dwds/src/utilities/batched_stream.dart'
+// makes dart2js skip creating background.js, so we use a copy instead.
+// import 'package:dwds/src/utilities/batched_stream.dart';
+import 'package:dwds/src/web_utilities/batched_stream.dart';
 import 'package:js/js.dart';
 import 'package:sse/client/sse_client.dart';
 import 'package:uuid/uuid.dart';
diff --git a/webdev/test/e2e_test.dart b/webdev/test/e2e_test.dart
index 29a4b6b..ecdb353 100644
--- a/webdev/test/e2e_test.dart
+++ b/webdev/test/e2e_test.dart
@@ -321,6 +321,7 @@
                 wsUri = getDebugServiceUri(message as String);
                 return wsUri != null;
               }));
+              Logger.root.fine('vm service uri: $wsUri');
               expect(wsUri, isNotNull);
 
               vmService = await vmServiceConnectUri(wsUri);