| // Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| import 'dart:collection'; |
| |
| import 'package:async/async.dart'; |
| import 'package:pedantic/pedantic.dart'; |
| import 'package:vm_service/src/vm_service.dart'; |
| |
| extension DdsExtension on VmService { |
| static bool _factoriesRegistered = false; |
| static Version? _ddsVersion; |
| |
| /// The _getDartDevelopmentServiceVersion_ RPC is used to determine what version of |
| /// the Dart Development Service Protocol is served by a DDS instance. |
| /// |
| /// The result of this call is cached for subsequent invocations. |
| Future<Version> getDartDevelopmentServiceVersion() async { |
| if (_ddsVersion == null) { |
| _ddsVersion = |
| await _callHelper<Version>('getDartDevelopmentServiceVersion'); |
| } |
| return _ddsVersion!; |
| } |
| |
| /// Retrieve the event history for `stream`. |
| /// |
| /// If `stream` does not have event history collected, a parameter error is |
| /// returned. |
| Future<StreamHistory> getStreamHistory(String stream) async { |
| if (!(await _versionCheck(1, 2))) { |
| throw UnimplementedError('getStreamHistory requires DDS version 1.2'); |
| } |
| return _callHelper<StreamHistory>('getStreamHistory', args: { |
| 'stream': stream, |
| }); |
| } |
| |
| /// Returns the stream for a given stream id which includes historical |
| /// events. |
| /// |
| /// If `stream` does not have event history collected, a parameter error is |
| /// sent over the returned [Stream]. |
| Stream<Event> onEventWithHistory(String stream) { |
| late StreamController<Event> controller; |
| late StreamQueue<Event> streamEvents; |
| |
| controller = StreamController<Event>(onListen: () async { |
| streamEvents = StreamQueue<Event>(onEvent(stream)); |
| final history = (await getStreamHistory(stream)).history; |
| Event? firstStreamEvent; |
| unawaited(streamEvents.peek.then((e) { |
| firstStreamEvent = e; |
| })); |
| for (final event in history) { |
| if (firstStreamEvent != null && |
| event.timestamp! > firstStreamEvent!.timestamp!) { |
| break; |
| } |
| controller.sink.add(event); |
| } |
| unawaited(controller.sink.addStream(streamEvents.rest)); |
| }, onCancel: () { |
| streamEvents.cancel(); |
| }); |
| |
| return controller.stream; |
| } |
| |
| /// Returns a new [Stream<Event>] of `Logging` events which outputs |
| /// historical events before streaming real-time events. |
| /// |
| /// Note: unlike [onLoggingEvent], the returned stream is a single |
| /// subscription stream and a new stream is created for each invocation of |
| /// this getter. |
| Stream<Event> get onLoggingEventWithHistory => onEventWithHistory('Logging'); |
| |
| /// Returns a new [Stream<Event>] of `Stdout` events which outputs |
| /// historical events before streaming real-time events. |
| /// |
| /// Note: unlike [onStdoutEvent], the returned stream is a single |
| /// subscription stream and a new stream is created for each invocation of |
| /// this getter. |
| Stream<Event> get onStdoutEventWithHistory => onEventWithHistory('Stdout'); |
| |
| /// Returns a new [Stream<Event>] of `Stderr` events which outputs |
| /// historical events before streaming real-time events. |
| /// |
| /// Note: unlike [onStderrEvent], the returned stream is a single |
| /// subscription stream and a new stream is created for each invocation of |
| /// this getter. |
| Stream<Event> get onStderrEventWithHistory => onEventWithHistory('Stderr'); |
| |
| /// Returns a new [Stream<Event>] of `Extension` events which outputs |
| /// historical events before streaming real-time events. |
| /// |
| /// Note: unlike [onExtensionEvent], the returned stream is a single |
| /// subscription stream and a new stream is created for each invocation of |
| /// this getter. |
| Stream<Event> get onExtensionEventWithHistory => |
| onEventWithHistory('Extension'); |
| |
| Future<bool> _versionCheck(int major, int minor) async { |
| if (_ddsVersion == null) { |
| _ddsVersion = await getDartDevelopmentServiceVersion(); |
| } |
| return ((_ddsVersion!.major == major && _ddsVersion!.minor! >= minor) || |
| (_ddsVersion!.major! > major)); |
| } |
| |
| Future<T> _callHelper<T>(String method, |
| {String? isolateId, Map args = const {}}) { |
| if (!_factoriesRegistered) { |
| _registerFactories(); |
| } |
| return callMethod( |
| method, |
| args: { |
| if (isolateId != null) 'isolateId': isolateId, |
| ...args, |
| }, |
| ).then((e) => e as T); |
| } |
| |
| static void _registerFactories() { |
| addTypeFactory('StreamHistory', StreamHistory.parse); |
| _factoriesRegistered = true; |
| } |
| } |
| |
| /// A collection of historical [Event]s from some stream. |
| class StreamHistory extends Response { |
| static StreamHistory? parse(Map<String, dynamic>? json) => |
| json == null ? null : StreamHistory._fromJson(json); |
| |
| StreamHistory({required List<Event> history}) : _history = history; |
| |
| StreamHistory._fromJson(Map<String, dynamic> json) |
| : _history = json['history'] |
| .map( |
| (e) => Event.parse(e), |
| ) |
| .toList() |
| .cast<Event>() { |
| this.json = json; |
| } |
| |
| @override |
| String get type => 'StreamHistory'; |
| |
| /// Historical [Event]s for a stream. |
| List<Event> get history => UnmodifiableListView(_history); |
| final List<Event> _history; |
| } |