blob: 903c14acc77bb12438be2a491bb6e628dea1f1d2 [file] [log] [blame]
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
import 'package:async/async.dart';
import 'package:pedantic/pedantic.dart';
import 'package:vm_service/src/vm_service.dart';
extension DdsExtension on VmService {
static bool _factoriesRegistered = false;
static Version? _ddsVersion;
/// The _getDartDevelopmentServiceVersion_ RPC is used to determine what version of
/// the Dart Development Service Protocol is served by a DDS instance.
///
/// The result of this call is cached for subsequent invocations.
Future<Version> getDartDevelopmentServiceVersion() async {
if (_ddsVersion == null) {
_ddsVersion =
await _callHelper<Version>('getDartDevelopmentServiceVersion');
}
return _ddsVersion!;
}
/// Retrieve the event history for `stream`.
///
/// If `stream` does not have event history collected, a parameter error is
/// returned.
Future<StreamHistory> getStreamHistory(String stream) async {
if (!(await _versionCheck(1, 2))) {
throw UnimplementedError('getStreamHistory requires DDS version 1.2');
}
return _callHelper<StreamHistory>('getStreamHistory', args: {
'stream': stream,
});
}
/// Returns the stream for a given stream id which includes historical
/// events.
///
/// If `stream` does not have event history collected, a parameter error is
/// sent over the returned [Stream].
Stream<Event> onEventWithHistory(String stream) {
late StreamController<Event> controller;
late StreamQueue<Event> streamEvents;
controller = StreamController<Event>(onListen: () async {
streamEvents = StreamQueue<Event>(onEvent(stream));
final history = (await getStreamHistory(stream)).history;
Event? firstStreamEvent;
unawaited(streamEvents.peek.then((e) {
firstStreamEvent = e;
}));
for (final event in history) {
if (firstStreamEvent != null &&
event.timestamp! > firstStreamEvent!.timestamp!) {
break;
}
controller.sink.add(event);
}
unawaited(controller.sink.addStream(streamEvents.rest));
}, onCancel: () {
streamEvents.cancel();
});
return controller.stream;
}
/// Returns a new [Stream<Event>] of `Logging` events which outputs
/// historical events before streaming real-time events.
///
/// Note: unlike [onLoggingEvent], the returned stream is a single
/// subscription stream and a new stream is created for each invocation of
/// this getter.
Stream<Event> get onLoggingEventWithHistory => onEventWithHistory('Logging');
/// Returns a new [Stream<Event>] of `Stdout` events which outputs
/// historical events before streaming real-time events.
///
/// Note: unlike [onStdoutEvent], the returned stream is a single
/// subscription stream and a new stream is created for each invocation of
/// this getter.
Stream<Event> get onStdoutEventWithHistory => onEventWithHistory('Stdout');
/// Returns a new [Stream<Event>] of `Stderr` events which outputs
/// historical events before streaming real-time events.
///
/// Note: unlike [onStderrEvent], the returned stream is a single
/// subscription stream and a new stream is created for each invocation of
/// this getter.
Stream<Event> get onStderrEventWithHistory => onEventWithHistory('Stderr');
/// Returns a new [Stream<Event>] of `Extension` events which outputs
/// historical events before streaming real-time events.
///
/// Note: unlike [onExtensionEvent], the returned stream is a single
/// subscription stream and a new stream is created for each invocation of
/// this getter.
Stream<Event> get onExtensionEventWithHistory =>
onEventWithHistory('Extension');
Future<bool> _versionCheck(int major, int minor) async {
if (_ddsVersion == null) {
_ddsVersion = await getDartDevelopmentServiceVersion();
}
return ((_ddsVersion!.major == major && _ddsVersion!.minor! >= minor) ||
(_ddsVersion!.major! > major));
}
Future<T> _callHelper<T>(String method,
{String? isolateId, Map args = const {}}) {
if (!_factoriesRegistered) {
_registerFactories();
}
return callMethod(
method,
args: {
if (isolateId != null) 'isolateId': isolateId,
...args,
},
).then((e) => e as T);
}
static void _registerFactories() {
addTypeFactory('StreamHistory', StreamHistory.parse);
_factoriesRegistered = true;
}
}
/// A collection of historical [Event]s from some stream.
class StreamHistory extends Response {
static StreamHistory? parse(Map<String, dynamic>? json) =>
json == null ? null : StreamHistory._fromJson(json);
StreamHistory({required List<Event> history}) : _history = history;
StreamHistory._fromJson(Map<String, dynamic> json)
: _history = json['history']
.map(
(e) => Event.parse(e),
)
.toList()
.cast<Event>() {
this.json = json;
}
@override
String get type => 'StreamHistory';
/// Historical [Event]s for a stream.
List<Event> get history => UnmodifiableListView(_history);
final List<Event> _history;
}