Batch evaluation requests when possible (#1746)
* Batch expression evaluations from the same library
* Update changelog
* Fix tests failures
* Destroy isolate on app connection exit
* Fix failure to compile debug extension and client with dart2js
diff --git a/dwds/CHANGELOG.md b/dwds/CHANGELOG.md
index 1ce76f1..612b520 100644
--- a/dwds/CHANGELOG.md
+++ b/dwds/CHANGELOG.md
@@ -1,7 +1,11 @@
## 16.0.1-dev
-- Allow `LoadStrategy.serverPathForModule` and `LoadStrategy.sourceMapPathForModule`
- to return `null` and add error handling.
+- Allow the following API to return `null` and add error handling:
+ - `LoadStrategy.serverPathForModule`
+ - `LoadStrategy.sourceMapPathForModule`
+- Expression evaluation performance improvement:
+ - Batch `ChromeProxyService.evaluate()` requests that are close in time
+ and are executed in the same library and scope.
## 16.0.0
diff --git a/dwds/debug_extension/web/background.dart b/dwds/debug_extension/web/background.dart
index a7a25e5..350291c 100644
--- a/dwds/debug_extension/web/background.dart
+++ b/dwds/debug_extension/web/background.dart
@@ -20,7 +20,10 @@
import 'package:dwds/data/extension_request.dart';
import 'package:dwds/data/serializers.dart';
import 'package:dwds/src/sockets.dart';
-import 'package:dwds/src/utilities/batched_stream.dart';
+// NOTE(annagrin): using 'package:dwds/src/utilities/batched_stream.dart'
+// makes dart2js skip creating background.js, so we use a copy instead.
+// import 'package:dwds/src/utilities/batched_stream.dart';
+import 'package:dwds/src/web_utilities/batched_stream.dart';
import 'package:js/js.dart';
import 'package:js/js_util.dart' as js_util;
import 'package:pub_semver/pub_semver.dart';
diff --git a/dwds/lib/src/connections/app_connection.dart b/dwds/lib/src/connections/app_connection.dart
index 1dd7ed1..fc19384 100644
--- a/dwds/lib/src/connections/app_connection.dart
+++ b/dwds/lib/src/connections/app_connection.dart
@@ -15,14 +15,19 @@
/// The initial connection request sent from the application in the browser.
final ConnectRequest request;
final _startedCompleter = Completer<void>();
+ final _doneCompleter = Completer<void>();
final SocketConnection _connection;
- AppConnection(this.request, this._connection);
+ AppConnection(this.request, this._connection) {
+ unawaited(_connection.sink.done.then((v) => _doneCompleter.complete()));
+ }
bool get isInKeepAlivePeriod => _connection.isInKeepAlivePeriod;
void shutDown() => _connection.shutdown();
bool get isStarted => _startedCompleter.isCompleted;
Future<void> get onStart => _startedCompleter.future;
+ bool get isDone => _doneCompleter.isCompleted;
+ Future<void> get onDone => _doneCompleter.future;
void runMain() {
if (_startedCompleter.isCompleted) {
diff --git a/dwds/lib/src/injected/client.js b/dwds/lib/src/injected/client.js
index 0390ffc..66f32e9 100644
--- a/dwds/lib/src/injected/client.js
+++ b/dwds/lib/src/injected/client.js
@@ -1,4 +1,4 @@
-// Generated by dart2js (NullSafetyMode.sound, csp, deferred-serialization, intern-composite-values), the Dart to JavaScript compiler version: 2.19.0-edge.6682ac145d5b99fa05a034c7838b94c5d1143f78.
+// Generated by dart2js (NullSafetyMode.sound, csp, deferred-serialization, intern-composite-values), the Dart to JavaScript compiler version: 2.19.0-177.0.dev.
// The code supports the following hooks:
// dartPrint(message):
// if this function is defined it is called instead of the Dart [print]
diff --git a/dwds/lib/src/services/batched_expression_evaluator.dart b/dwds/lib/src/services/batched_expression_evaluator.dart
new file mode 100644
index 0000000..64338fd
--- /dev/null
+++ b/dwds/lib/src/services/batched_expression_evaluator.dart
@@ -0,0 +1,135 @@
+// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:collection/collection.dart';
+import 'package:dwds/src/utilities/domain.dart';
+import 'package:logging/logging.dart';
+import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart';
+
+import '../debugging/debugger.dart';
+import '../debugging/location.dart';
+import '../debugging/modules.dart';
+import '../utilities/batched_stream.dart';
+import 'expression_compiler.dart';
+import 'expression_evaluator.dart';
+
+class EvaluateRequest {
+ final String isolateId;
+ final String? libraryUri;
+ final String expression;
+ final Map<String, String>? scope;
+ final completer = Completer<RemoteObject>();
+
+ EvaluateRequest(this.isolateId, this.libraryUri, this.expression, this.scope);
+}
+
+class BatchedExpressionEvaluator extends ExpressionEvaluator {
+ final _logger = Logger('BatchedExpressionEvaluator');
+ final Debugger _debugger;
+ final _requestController =
+ BatchedStreamController<EvaluateRequest>(delay: 200);
+
+ BatchedExpressionEvaluator(
+ String entrypoint,
+ AppInspectorInterface inspector,
+ this._debugger,
+ Locations locations,
+ Modules modules,
+ ExpressionCompiler compiler,
+ ) : super(entrypoint, inspector, _debugger, locations, modules, compiler) {
+ _requestController.stream.listen(_processRequest);
+ }
+
+ @override
+ void close() {
+ _logger.fine('Closed');
+ _requestController.close();
+ }
+
+ @override
+ Future<RemoteObject> evaluateExpression(
+ String isolateId,
+ String? libraryUri,
+ String expression,
+ Map<String, String>? scope,
+ ) {
+ final request = EvaluateRequest(isolateId, libraryUri, expression, scope);
+ _requestController.sink.add(request);
+ return request.completer.future;
+ }
+
+ void _processRequest(List<EvaluateRequest> requests) async {
+ String? libraryUri;
+ String? isolateId;
+ Map<String, String>? scope;
+ List<EvaluateRequest> currentRequests = [];
+
+ for (var request in requests) {
+ libraryUri ??= request.libraryUri;
+ isolateId ??= request.isolateId;
+ scope ??= request.scope;
+
+ if (libraryUri != request.libraryUri ||
+ isolateId != request.isolateId ||
+ !MapEquality().equals(scope, request.scope)) {
+ _logger.fine('New batch due to');
+ if (libraryUri != request.libraryUri) {
+ _logger.fine(' - library uri: $libraryUri != ${request.libraryUri}');
+ }
+ if (isolateId != request.isolateId) {
+ _logger.fine(' - isolateId: $isolateId != ${request.isolateId}');
+ }
+ if (!MapEquality().equals(scope, request.scope)) {
+ _logger.fine(' - scope: $scope != ${request.scope}');
+ }
+
+ unawaited(_evaluateBatch(currentRequests));
+ currentRequests = [];
+ libraryUri = request.libraryUri;
+ isolateId = request.isolateId;
+ scope = request.scope;
+ }
+ currentRequests.add(request);
+ }
+ unawaited(_evaluateBatch(currentRequests));
+ }
+
+ Future<void> _evaluateBatch(List<EvaluateRequest> requests) async {
+ if (requests.isEmpty) return;
+
+ final first = requests.first;
+ if (requests.length == 1) {
+ if (first.completer.isCompleted) return;
+ return super
+ .evaluateExpression(
+ first.isolateId, first.libraryUri, first.expression, first.scope)
+ .then(requests.first.completer.complete);
+ }
+
+ final expressions = requests.map((r) => r.expression).join(', ');
+ final batchedExpression = '[ $expressions ]';
+
+ _logger.fine('Evaluating batch of expressions $batchedExpression');
+
+ final RemoteObject list = await super.evaluateExpression(
+ first.isolateId, first.libraryUri, batchedExpression, first.scope);
+
+ for (var i = 0; i < requests.length; i++) {
+ final request = requests[i];
+ if (request.completer.isCompleted) continue;
+ _logger.fine('Getting result out of a batch for ${request.expression}');
+ _debugger
+ .getProperties(list.objectId!,
+ offset: i, count: 1, length: requests.length)
+ .then((v) {
+ final result = v.first.value;
+ _logger.fine(
+ 'Got result out of a batch for ${request.expression}: $result');
+ request.completer.complete(result);
+ });
+ }
+ }
+}
diff --git a/dwds/lib/src/services/chrome_proxy_service.dart b/dwds/lib/src/services/chrome_proxy_service.dart
index 2d3a89e..daf4237 100644
--- a/dwds/lib/src/services/chrome_proxy_service.dart
+++ b/dwds/lib/src/services/chrome_proxy_service.dart
@@ -30,6 +30,7 @@
import '../utilities/sdk_configuration.dart';
import '../utilities/shared.dart';
import 'expression_evaluator.dart';
+import 'batched_expression_evaluator.dart';
/// A proxy from the chrome debug protocol to the dart vm service protocol.
class ChromeProxyService implements VmServiceInterface {
@@ -241,7 +242,7 @@
final compiler = _compiler;
_expressionEvaluator = compiler == null
? null
- : ExpressionEvaluator(
+ : BatchedExpressionEvaluator(
entrypoint,
inspector,
debugger,
@@ -259,6 +260,8 @@
_startedCompleter.complete();
}));
+ unawaited(appConnection.onDone.then((_) => destroyIsolate()));
+
final isolateRef = inspector.isolateRef;
final timestamp = DateTime.now().millisecondsSinceEpoch;
@@ -301,6 +304,7 @@
///
/// Clears out the [_inspector] and all related cached information.
void destroyIsolate() {
+ _logger.fine('Destroying isolate');
if (!_isIsolateRunning) return;
final isolate = inspector.isolate;
final isolateRef = inspector.isolateRef;
@@ -318,6 +322,7 @@
_inspector = null;
_previousBreakpoints.clear();
_previousBreakpoints.addAll(isolate.breakpoints ?? []);
+ _expressionEvaluator?.close();
_consoleSubscription?.cancel();
_consoleSubscription = null;
}
diff --git a/dwds/lib/src/services/expression_evaluator.dart b/dwds/lib/src/services/expression_evaluator.dart
index 6ae50ff..35aef1c 100644
--- a/dwds/lib/src/services/expression_evaluator.dart
+++ b/dwds/lib/src/services/expression_evaluator.dart
@@ -2,6 +2,8 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
import 'package:dwds/src/utilities/domain.dart';
import 'package:logging/logging.dart';
import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart';
@@ -61,6 +63,8 @@
<String, String>{'type': '$severity', 'value': message});
}
+ void close() {}
+
/// Evaluate dart expression inside a given library.
///
/// Uses ExpressionCompiler interface to compile the expression to
diff --git a/dwds/lib/src/web_utilities/batched_stream.dart b/dwds/lib/src/web_utilities/batched_stream.dart
new file mode 100644
index 0000000..6da6465
--- /dev/null
+++ b/dwds/lib/src/web_utilities/batched_stream.dart
@@ -0,0 +1,85 @@
+// Copyright (c) 2022, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'package:async/async.dart';
+
+/// Stream controller allowing to batch events.
+class BatchedStreamController<T> {
+ static const _defaultBatchDelayMilliseconds = 1000;
+ static const _checkDelayMilliseconds = 100;
+
+ final int _batchDelayMilliseconds;
+
+ final StreamController<T> _inputController;
+ late StreamQueue<T> _inputQueue;
+
+ final StreamController<List<T>> _outputController;
+ final Completer<bool> _completer = Completer<bool>();
+
+ /// Create batched stream controller.
+ ///
+ /// Collects events from input [sink] and emits them in batches to the
+ /// output [stream] every [delay] milliseconds. Keeps the original order.
+ BatchedStreamController({
+ int delay = _defaultBatchDelayMilliseconds,
+ }) : _batchDelayMilliseconds = delay,
+ _inputController = StreamController<T>(),
+ _outputController = StreamController<List<T>>() {
+ _inputQueue = StreamQueue<T>(_inputController.stream);
+ unawaited(_batchAndSendEvents());
+ }
+
+ /// Sink collecting events.
+ StreamSink<T> get sink => _inputController.sink;
+
+ /// Output stream of batch events.
+ Stream<List<T>> get stream => _outputController.stream;
+
+ /// Close the controller.
+ Future<dynamic> close() async {
+ unawaited(_inputController.close());
+ return _completer.future.then((value) => _outputController.close());
+ }
+
+ /// Send events to the output in a batch every [_batchDelayMilliseconds].
+ Future<void> _batchAndSendEvents() async {
+ const duration = Duration(milliseconds: _checkDelayMilliseconds);
+ final buffer = <T>[];
+
+ // Batch events every `_batchDelayMilliseconds`.
+ //
+ // Note that events might arrive at random intervals, so collecting
+ // a predetermined number of events to send in a batch might delay
+ // the batch indefinitely. Instead, check for new events every
+ // `_checkDelayMilliseconds` to make sure batches are sent in regular
+ // intervals.
+ var lastSendTime = DateTime.now().millisecondsSinceEpoch;
+ while (await _hasEventOrTimeOut(duration)) {
+ if (await _hasEventDuring(duration)) {
+ buffer.add(await _inputQueue.next);
+ }
+
+ final now = DateTime.now().millisecondsSinceEpoch;
+ if (now > lastSendTime + _batchDelayMilliseconds) {
+ lastSendTime = now;
+ if (buffer.isNotEmpty) {
+ _outputController.sink.add(List.from(buffer));
+ buffer.clear();
+ }
+ }
+ }
+
+ if (buffer.isNotEmpty) {
+ _outputController.sink.add(List.from(buffer));
+ }
+ _completer.complete(true);
+ }
+
+ Future<bool> _hasEventOrTimeOut(Duration duration) =>
+ _inputQueue.hasNext.timeout(duration, onTimeout: () => true);
+
+ Future<bool> _hasEventDuring(Duration duration) =>
+ _inputQueue.hasNext.timeout(duration, onTimeout: () => false);
+}
diff --git a/dwds/test/evaluate_common.dart b/dwds/test/evaluate_common.dart
index 3ca60cc..b03ceff 100644
--- a/dwds/test/evaluate_common.dart
+++ b/dwds/test/evaluate_common.dart
@@ -539,6 +539,25 @@
tearDown(() async {});
+ test('in parallel (in a batch)', () async {
+ final library = isolate.rootLib!;
+ final evaluation1 = setup.service
+ .evaluate(isolateId, library.id!, 'MainClass(0).toString()');
+ final evaluation2 = setup.service
+ .evaluate(isolateId, library.id!, 'MainClass(1).toString()');
+
+ final results = await Future.wait([evaluation1, evaluation2]);
+ expect(
+ results[0],
+ const TypeMatcher<InstanceRef>().having(
+ (instance) => instance.valueAsString, 'valueAsString', '0'));
+
+ expect(
+ results[1],
+ const TypeMatcher<InstanceRef>().having(
+ (instance) => instance.valueAsString, 'valueAsString', '1'));
+ });
+
test('with scope override', () async {
final library = isolate.rootLib!;
final object = await setup.service
diff --git a/dwds/test/expression_compiler_service_test.dart b/dwds/test/expression_compiler_service_test.dart
index ab873b8..dc4f267 100644
--- a/dwds/test/expression_compiler_service_test.dart
+++ b/dwds/test/expression_compiler_service_test.dart
@@ -165,6 +165,83 @@
await stop();
});
+
+ test('can evaluate multiple expressions', () async {
+ expect(output.stream, neverEmits(contains('[SEVERE]')));
+ expect(
+ output.stream,
+ emitsThrough(contains(
+ '[INFO] ExpressionCompilerService: Updating dependencies...')));
+ expect(
+ output.stream,
+ emitsThrough(contains(
+ '[INFO] ExpressionCompilerService: Updated dependencies.')));
+
+ expect(output.stream,
+ emitsThrough(contains('[INFO] ExpressionCompilerService: Stopped.')));
+ final result = await service
+ .updateDependencies({'try': ModuleInfo('try.full.dill', 'try.dill')});
+ expect(result, true, reason: 'failed to update dependencies');
+
+ final compilationResult1 = await service.compileExpressionToJs(
+ '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'true');
+ final compilationResult2 = await service.compileExpressionToJs(
+ '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'false');
+
+ expect(
+ compilationResult1,
+ isA<ExpressionCompilationResult>()
+ .having((r) => r.result, 'result', contains('return true;'))
+ .having((r) => r.isError, 'isError', false));
+
+ expect(
+ compilationResult2,
+ isA<ExpressionCompilationResult>()
+ .having((r) => r.result, 'result', contains('return false;'))
+ .having((r) => r.isError, 'isError', false));
+
+ await stop();
+ });
+
+ test('can compile multiple expressions in parallel', () async {
+ expect(output.stream, neverEmits(contains('[SEVERE]')));
+ expect(
+ output.stream,
+ emitsThrough(contains(
+ '[INFO] ExpressionCompilerService: Updating dependencies...')));
+ expect(
+ output.stream,
+ emitsThrough(contains(
+ '[INFO] ExpressionCompilerService: Updated dependencies.')));
+
+ expect(output.stream,
+ emitsThrough(contains('[INFO] ExpressionCompilerService: Stopped.')));
+ final result = await service
+ .updateDependencies({'try': ModuleInfo('try.full.dill', 'try.dill')});
+ expect(result, true, reason: 'failed to update dependencies');
+
+ final compilationResult1 = service.compileExpressionToJs(
+ '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'true');
+ final compilationResult2 = service.compileExpressionToJs(
+ '0', 'org-dartlang-app:/try.dart', 2, 1, {}, {}, 'try', 'false');
+
+ final results =
+ await Future.wait([compilationResult1, compilationResult2]);
+
+ expect(
+ results[0],
+ isA<ExpressionCompilationResult>()
+ .having((r) => r.result, 'result', contains('return true;'))
+ .having((r) => r.isError, 'isError', false));
+
+ expect(
+ results[1],
+ isA<ExpressionCompilationResult>()
+ .having((r) => r.result, 'result', contains('return false;'))
+ .having((r) => r.isError, 'isError', false));
+
+ await stop();
+ });
});
}
diff --git a/dwds/web/client.dart b/dwds/web/client.dart
index ac7eb7f..8fc88ad 100644
--- a/dwds/web/client.dart
+++ b/dwds/web/client.dart
@@ -19,7 +19,10 @@
import 'package:dwds/data/run_request.dart';
import 'package:dwds/data/serializers.dart';
import 'package:dwds/src/sockets.dart';
-import 'package:dwds/src/utilities/batched_stream.dart';
+// NOTE(annagrin): using 'package:dwds/src/utilities/batched_stream.dart'
+// makes dart2js skip creating background.js, so we use a copy instead.
+// import 'package:dwds/src/utilities/batched_stream.dart';
+import 'package:dwds/src/web_utilities/batched_stream.dart';
import 'package:js/js.dart';
import 'package:sse/client/sse_client.dart';
import 'package:uuid/uuid.dart';
diff --git a/webdev/test/e2e_test.dart b/webdev/test/e2e_test.dart
index 29a4b6b..ecdb353 100644
--- a/webdev/test/e2e_test.dart
+++ b/webdev/test/e2e_test.dart
@@ -321,6 +321,7 @@
wsUri = getDebugServiceUri(message as String);
return wsUri != null;
}));
+ Logger.root.fine('vm service uri: $wsUri');
expect(wsUri, isNotNull);
vmService = await vmServiceConnectUri(wsUri);