[ DDS ] Add extension methods and getters for streams including event history

Also fixes issue where `StreamHistory` events were not being parsed
correctly.

Fixes https://github.com/dart-lang/sdk/issues/44505

Change-Id: I36c3bccf229a02ea78168041a112a30046773d59
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/177720
Reviewed-by: Kenzie Schmoll <kenzieschmoll@google.com>
Commit-Queue: Ben Konyi <bkonyi@google.com>
diff --git a/pkg/dds/CHANGELOG.md b/pkg/dds/CHANGELOG.md
index aebaf89..c657089 100644
--- a/pkg/dds/CHANGELOG.md
+++ b/pkg/dds/CHANGELOG.md
@@ -1,6 +1,9 @@
 # 1.7.0
 - Added `package:dds/vm_service_extensions.dart`, which adds DDS functionality to
   `package:vm_service` when imported.
+  - Added `onEventWithHistory` method and `onLoggingEventWithHistory`, 
+    `onStdoutEventWithHistory`, `onStderrEventWithHistory`, and 
+    `onExtensionEventWithHistory` getters.
 - Added `getStreamHistory` RPC.
 
 # 1.6.1
diff --git a/pkg/dds/lib/src/stream_manager.dart b/pkg/dds/lib/src/stream_manager.dart
index e7b0f22..28c7f28 100644
--- a/pkg/dds/lib/src/stream_manager.dart
+++ b/pkg/dds/lib/src/stream_manager.dart
@@ -181,7 +181,7 @@
       return null;
     }
     return [
-      for (final event in loggingRepositories[stream]()) event,
+      for (final event in loggingRepositories[stream]()) event['event'],
     ];
   }
 
diff --git a/pkg/dds/lib/vm_service_extensions.dart b/pkg/dds/lib/vm_service_extensions.dart
index 069c737..5b540da 100644
--- a/pkg/dds/lib/vm_service_extensions.dart
+++ b/pkg/dds/lib/vm_service_extensions.dart
@@ -2,9 +2,12 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
+import 'dart:async';
 import 'dart:collection';
 
+import 'package:async/async.dart';
 import 'package:meta/meta.dart';
+import 'package:pedantic/pedantic.dart';
 import 'package:vm_service/src/vm_service.dart';
 
 extension DdsExtension on VmService {
@@ -36,6 +39,70 @@
     });
   }
 
+  /// Returns the stream for a given stream id which includes historical
+  /// events.
+  ///
+  /// If `stream` does not have event history collected, a parameter error is
+  /// sent over the returned [Stream].
+  Stream<Event> onEventWithHistory(String stream) {
+    StreamController<Event> controller;
+    StreamQueue<Event> streamEvents;
+
+    controller = StreamController<Event>(onListen: () async {
+      streamEvents = StreamQueue<Event>(onEvent(stream));
+      final history = (await getStreamHistory(stream)).history;
+      Event firstStreamEvent;
+      unawaited(streamEvents.peek.then((e) {
+        firstStreamEvent = e;
+      }));
+      for (final event in history) {
+        if (firstStreamEvent != null &&
+            event.timestamp > firstStreamEvent.timestamp) {
+          break;
+        }
+        controller.sink.add(event);
+      }
+      unawaited(controller.sink.addStream(streamEvents.rest));
+    }, onCancel: () {
+      streamEvents.cancel();
+    });
+
+    return controller.stream;
+  }
+
+  /// Returns a new [Stream<Event>] of `Logging` events which outputs
+  /// historical events before streaming real-time events.
+  ///
+  /// Note: unlike [onLoggingEvent], the returned stream is a single
+  /// subscription stream and a new stream is created for each invocation of
+  /// this getter.
+  Stream<Event> get onLoggingEventWithHistory => onEventWithHistory('Logging');
+
+  /// Returns a new [Stream<Event>] of `Stdout` events which outputs
+  /// historical events before streaming real-time events.
+  ///
+  /// Note: unlike [onStdoutEvent], the returned stream is a single
+  /// subscription stream and a new stream is created for each invocation of
+  /// this getter.
+  Stream<Event> get onStdoutEventWithHistory => onEventWithHistory('Stdout');
+
+  /// Returns a new [Stream<Event>] of `Stderr` events which outputs
+  /// historical events before streaming real-time events.
+  ///
+  /// Note: unlike [onStderrEvent], the returned stream is a single
+  /// subscription stream and a new stream is created for each invocation of
+  /// this getter.
+  Stream<Event> get onStderrEventWithHistory => onEventWithHistory('Stderr');
+
+  /// Returns a new [Stream<Event>] of `Extension` events which outputs
+  /// historical events before streaming real-time events.
+  ///
+  /// Note: unlike [onExtensionEvent], the returned stream is a single
+  /// subscription stream and a new stream is created for each invocation of
+  /// this getter.
+  Stream<Event> get onExtensionEventWithHistory =>
+      onEventWithHistory('Extension');
+
   Future<bool> _versionCheck(int major, int minor) async {
     if (_ddsVersion == null) {
       _ddsVersion = await getDartDevelopmentServiceVersion();
@@ -72,10 +139,14 @@
   StreamHistory({@required List<Event> history}) : _history = history;
 
   StreamHistory._fromJson(Map<String, dynamic> json)
-      : _history = List<Event>.from(
-            createServiceObject(json['history'], const ['Event']) as List ??
-                []) {
+      : _history = json['history']
+            .map(
+              (e) => Event.parse(e),
+            )
+            .toList()
+            .cast<Event>() {
     type = json['type'];
+    this.json = json;
   }
 
   /// Historical [Event]s for a stream.
diff --git a/pkg/dds/test/on_event_with_history_script.dart b/pkg/dds/test/on_event_with_history_script.dart
new file mode 100644
index 0000000..3f27df0
--- /dev/null
+++ b/pkg/dds/test/on_event_with_history_script.dart
@@ -0,0 +1,12 @@
+import 'dart:developer';
+
+void main() {
+  for (int i = 1; i <= 10; ++i) {
+    log(i.toString());
+  }
+  debugger();
+  for (int i = 11; i <= 20; ++i) {
+    log(i.toString());
+  }
+  debugger();
+}
diff --git a/pkg/dds/test/on_event_with_history_test.dart b/pkg/dds/test/on_event_with_history_test.dart
new file mode 100644
index 0000000..55e04b9
--- /dev/null
+++ b/pkg/dds/test/on_event_with_history_test.dart
@@ -0,0 +1,59 @@
+// Copyright (c) 2021, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:io';
+
+import 'package:dds/dds.dart';
+import 'package:dds/vm_service_extensions.dart';
+import 'package:test/test.dart';
+import 'package:vm_service/vm_service_io.dart';
+import 'common/test_helper.dart';
+
+void main() {
+  Process process;
+  DartDevelopmentService dds;
+
+  setUp(() async {
+    process = await spawnDartProcess('on_event_with_history_script.dart',
+        pauseOnStart: false);
+  });
+
+  tearDown(() async {
+    await dds?.shutdown();
+    process?.kill();
+    dds = null;
+    process = null;
+  });
+
+  test('onEventWithHistory returns stream including log history', () async {
+    dds = await DartDevelopmentService.startDartDevelopmentService(
+      remoteVmServiceUri,
+    );
+    expect(dds.isRunning, true);
+    final service = await vmServiceConnectUri(dds.wsUri.toString());
+
+    await service.streamListen('Logging');
+    final stream = service.onLoggingEventWithHistory;
+
+    var completer = Completer<void>();
+    int count = 0;
+    stream.listen((event) {
+      count++;
+      expect(event.logRecord.message.valueAsString, count.toString());
+      if (count % 10 == 0) {
+        completer.complete();
+      }
+    });
+
+    await completer.future;
+
+    completer = Completer<void>();
+    final isolateId = (await service.getVM()).isolates.first.id;
+    await service.resume(isolateId);
+
+    await completer.future;
+    expect(count, 20);
+  });
+}