blob: e9da09d55c73a579eeba454f38cf502c0e40595d [file] [log] [blame]
// Copyright (c) 2026, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:dart_runtime_service/src/dart_runtime_service_options.dart';
import 'package:dart_runtime_service/src/dart_runtime_service_rpcs.dart';
import 'package:dart_runtime_service/src/event_streams.dart';
import 'package:test/test.dart';
import 'package:vm_service/vm_service.dart';
import 'package:vm_service/vm_service_io.dart';
import 'utils/matchers.dart';
import 'utils/utilities.dart';
final class HelloWorldEvent extends StreamEvent {
HelloWorldEvent() : super(streamId: kStreamId, kind: kKind);
static const kStreamId = 'CustomStream';
static const kKind = 'hello_world';
@override
Map<String, Object?> toJson() {
return {
StreamEvent.kStreamId: streamId,
StreamEvent.kEvent: Event(kind: kind, timestamp: timestamp).toJson(),
};
}
}
void main() {
group('$DartRuntimeServiceRpcs:', () {
test('streamListen + streamCancel', () async {
final service = await createDartRuntimeServiceForTest(
config: const DartRuntimeServiceOptions(enableLogging: true),
);
final client = await vmServiceConnectUri(service.uri.toString());
final completer = Completer<void>();
// Register a listener for events on kStreamId.
client.onEvent(HelloWorldEvent.kStreamId).listen((event) {
expect(event.kind, HelloWorldEvent.kKind);
completer.complete();
});
// Verify the stream has been subscribed to.
await client.streamListen(HelloWorldEvent.kStreamId);
expect(
service.eventStreamManager.streamListeners[HelloWorldEvent.kStreamId],
isNotEmpty,
);
// Post an event to the stream and wait for the client to receive it.
HelloWorldEvent().send(eventStreamMethods: service.eventStreamManager);
await completer.future;
// Verify the stream has no listeners after the client cancels its
// subscription.
await client.streamCancel(HelloWorldEvent.kStreamId);
expect(
service.eventStreamManager.streamListeners[HelloWorldEvent.kStreamId],
isEmpty,
);
});
test('streamListen already subscribed', () async {
final service = await createDartRuntimeServiceForTest(
config: const DartRuntimeServiceOptions(enableLogging: true),
);
final client = await vmServiceConnectUri(service.uri.toString());
// Verify the stream has been subscribed to.
await client.streamListen(HelloWorldEvent.kStreamId);
expect(
service.eventStreamManager.streamListeners[HelloWorldEvent.kStreamId],
isNotEmpty,
);
// Listening to a stream that's already subscribed to results in an RPC
// error being returned by the service.
expect(
() async => await client.streamListen(HelloWorldEvent.kStreamId),
throwsStreamAlreadySubscribedRPCError,
);
});
test('streamCancel stream with no subscription', () async {
final service = await createDartRuntimeServiceForTest(
config: const DartRuntimeServiceOptions(enableLogging: true),
);
final client = await vmServiceConnectUri(service.uri.toString());
// Cancelling a stream that's not subscribed to results in an RPC error
// being returned by the service.
expect(
() async => await client.streamCancel(HelloWorldEvent.kStreamId),
throwsStreamNotSubscribedRPCError,
);
});
});
}