[ DDS ] Add extension methods and getters for streams including event history
Also fixes issue where `StreamHistory` events were not being parsed
correctly.
Fixes https://github.com/dart-lang/sdk/issues/44505
Change-Id: I36c3bccf229a02ea78168041a112a30046773d59
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/177720
Reviewed-by: Kenzie Schmoll <kenzieschmoll@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
diff --git a/pkg/dds/CHANGELOG.md b/pkg/dds/CHANGELOG.md
index aebaf89..c657089 100644
--- a/pkg/dds/CHANGELOG.md
+++ b/pkg/dds/CHANGELOG.md
@@ -1,6 +1,9 @@
# 1.7.0
- Added `package:dds/vm_service_extensions.dart`, which adds DDS functionality to
`package:vm_service` when imported.
+ - Added `onEventWithHistory` method and `onLoggingEventWithHistory`,
+ `onStdoutEventWithHistory`, `onStderrEventWithHistory`, and
+ `onExtensionEventWithHistory` getters.
- Added `getStreamHistory` RPC.
# 1.6.1
diff --git a/pkg/dds/lib/src/stream_manager.dart b/pkg/dds/lib/src/stream_manager.dart
index e7b0f22..28c7f28 100644
--- a/pkg/dds/lib/src/stream_manager.dart
+++ b/pkg/dds/lib/src/stream_manager.dart
@@ -181,7 +181,7 @@
return null;
}
return [
- for (final event in loggingRepositories[stream]()) event,
+ for (final event in loggingRepositories[stream]()) event['event'],
];
}
diff --git a/pkg/dds/lib/vm_service_extensions.dart b/pkg/dds/lib/vm_service_extensions.dart
index 069c737..5b540da 100644
--- a/pkg/dds/lib/vm_service_extensions.dart
+++ b/pkg/dds/lib/vm_service_extensions.dart
@@ -2,9 +2,12 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
import 'dart:collection';
+import 'package:async/async.dart';
import 'package:meta/meta.dart';
+import 'package:pedantic/pedantic.dart';
import 'package:vm_service/src/vm_service.dart';
extension DdsExtension on VmService {
@@ -36,6 +39,70 @@
});
}
+ /// Returns the stream for a given stream id which includes historical
+ /// events.
+ ///
+ /// If `stream` does not have event history collected, a parameter error is
+ /// sent over the returned [Stream].
+ Stream<Event> onEventWithHistory(String stream) {
+ StreamController<Event> controller;
+ StreamQueue<Event> streamEvents;
+
+ controller = StreamController<Event>(onListen: () async {
+ streamEvents = StreamQueue<Event>(onEvent(stream));
+ final history = (await getStreamHistory(stream)).history;
+ Event firstStreamEvent;
+ unawaited(streamEvents.peek.then((e) {
+ firstStreamEvent = e;
+ }));
+ for (final event in history) {
+ if (firstStreamEvent != null &&
+ event.timestamp > firstStreamEvent.timestamp) {
+ break;
+ }
+ controller.sink.add(event);
+ }
+ unawaited(controller.sink.addStream(streamEvents.rest));
+ }, onCancel: () {
+ streamEvents.cancel();
+ });
+
+ return controller.stream;
+ }
+
+ /// Returns a new [Stream<Event>] of `Logging` events which outputs
+ /// historical events before streaming real-time events.
+ ///
+ /// Note: unlike [onLoggingEvent], the returned stream is a single
+ /// subscription stream and a new stream is created for each invocation of
+ /// this getter.
+ Stream<Event> get onLoggingEventWithHistory => onEventWithHistory('Logging');
+
+ /// Returns a new [Stream<Event>] of `Stdout` events which outputs
+ /// historical events before streaming real-time events.
+ ///
+ /// Note: unlike [onStdoutEvent], the returned stream is a single
+ /// subscription stream and a new stream is created for each invocation of
+ /// this getter.
+ Stream<Event> get onStdoutEventWithHistory => onEventWithHistory('Stdout');
+
+ /// Returns a new [Stream<Event>] of `Stderr` events which outputs
+ /// historical events before streaming real-time events.
+ ///
+ /// Note: unlike [onStderrEvent], the returned stream is a single
+ /// subscription stream and a new stream is created for each invocation of
+ /// this getter.
+ Stream<Event> get onStderrEventWithHistory => onEventWithHistory('Stderr');
+
+ /// Returns a new [Stream<Event>] of `Extension` events which outputs
+ /// historical events before streaming real-time events.
+ ///
+ /// Note: unlike [onExtensionEvent], the returned stream is a single
+ /// subscription stream and a new stream is created for each invocation of
+ /// this getter.
+ Stream<Event> get onExtensionEventWithHistory =>
+ onEventWithHistory('Extension');
+
Future<bool> _versionCheck(int major, int minor) async {
if (_ddsVersion == null) {
_ddsVersion = await getDartDevelopmentServiceVersion();
@@ -72,10 +139,14 @@
StreamHistory({@required List<Event> history}) : _history = history;
StreamHistory._fromJson(Map<String, dynamic> json)
- : _history = List<Event>.from(
- createServiceObject(json['history'], const ['Event']) as List ??
- []) {
+ : _history = json['history']
+ .map(
+ (e) => Event.parse(e),
+ )
+ .toList()
+ .cast<Event>() {
type = json['type'];
+ this.json = json;
}
/// Historical [Event]s for a stream.
diff --git a/pkg/dds/test/on_event_with_history_script.dart b/pkg/dds/test/on_event_with_history_script.dart
new file mode 100644
index 0000000..3f27df0
--- /dev/null
+++ b/pkg/dds/test/on_event_with_history_script.dart
@@ -0,0 +1,12 @@
+import 'dart:developer';
+
+void main() {
+ for (int i = 1; i <= 10; ++i) {
+ log(i.toString());
+ }
+ debugger();
+ for (int i = 11; i <= 20; ++i) {
+ log(i.toString());
+ }
+ debugger();
+}
diff --git a/pkg/dds/test/on_event_with_history_test.dart b/pkg/dds/test/on_event_with_history_test.dart
new file mode 100644
index 0000000..55e04b9
--- /dev/null
+++ b/pkg/dds/test/on_event_with_history_test.dart
@@ -0,0 +1,59 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:io';
+
+import 'package:dds/dds.dart';
+import 'package:dds/vm_service_extensions.dart';
+import 'package:test/test.dart';
+import 'package:vm_service/vm_service_io.dart';
+import 'common/test_helper.dart';
+
+void main() {
+ Process process;
+ DartDevelopmentService dds;
+
+ setUp(() async {
+ process = await spawnDartProcess('on_event_with_history_script.dart',
+ pauseOnStart: false);
+ });
+
+ tearDown(() async {
+ await dds?.shutdown();
+ process?.kill();
+ dds = null;
+ process = null;
+ });
+
+ test('onEventWithHistory returns stream including log history', () async {
+ dds = await DartDevelopmentService.startDartDevelopmentService(
+ remoteVmServiceUri,
+ );
+ expect(dds.isRunning, true);
+ final service = await vmServiceConnectUri(dds.wsUri.toString());
+
+ await service.streamListen('Logging');
+ final stream = service.onLoggingEventWithHistory;
+
+ var completer = Completer<void>();
+ int count = 0;
+ stream.listen((event) {
+ count++;
+ expect(event.logRecord.message.valueAsString, count.toString());
+ if (count % 10 == 0) {
+ completer.complete();
+ }
+ });
+
+ await completer.future;
+
+ completer = Completer<void>();
+ final isolateId = (await service.getVM()).isolates.first.id;
+ await service.resume(isolateId);
+
+ await completer.future;
+ expect(count, 20);
+ });
+}