blob: 97b0aed9f48838d4f370a48703f11e7529877fb5 [file] [log] [blame]
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.10
@TestOn('vm')
import 'dart:async';
import 'dart:convert';
import 'package:async/async.dart';
import 'package:mockito/mockito.dart';
import 'package:pedantic/pedantic.dart';
import 'package:test/test.dart';
import 'package:vm_service/vm_service.dart';
void main() {
MockVmService serviceMock;
StreamController<Map<String, Object>> requestsController;
StreamController<Map<String, Object>> responsesController;
ServiceExtensionRegistry serviceRegistry;
setUp(() {
serviceMock = MockVmService();
requestsController = StreamController<Map<String, Object>>();
responsesController = StreamController<Map<String, Object>>();
serviceRegistry = ServiceExtensionRegistry();
VmServerConnection(requestsController.stream, responsesController.sink,
serviceRegistry, serviceMock);
});
tearDown(() {
requestsController.close();
responsesController.close();
});
group('method delegation', () {
test('works for simple methods', () {
var request = rpcRequest("getVersion");
var version = Version(major: 1, minor: 0);
when(serviceMock.getVersion()).thenAnswer((_) => Future.value(version));
expect(responsesController.stream, emits(rpcResponse(version)));
requestsController.add(request);
});
test('works for methods with parameters', () {
var isolate = Isolate(
name: 'isolate',
exceptionPauseMode: ExceptionPauseMode.kNone,
id: '123',
number: '0',
startTime: 1,
runnable: true,
livePorts: 2,
isolateFlags: [],
pauseOnExit: false,
pauseEvent: Event(
kind: EventKind.kResume,
timestamp: 3,
),
libraries: [],
breakpoints: [],
isSystemIsolate: false,
);
var request = rpcRequest("getIsolate", params: {'isolateId': isolate.id});
when(serviceMock.getIsolate(isolate.id))
.thenAnswer((Invocation invocation) {
expect(invocation.positionalArguments, equals([isolate.id]));
return Future.value(isolate);
});
expect(responsesController.stream, emits(rpcResponse(isolate)));
requestsController.add(request);
});
test('works for methods with list parameters', () {
var isolate = Isolate(
name: 'isolate',
exceptionPauseMode: ExceptionPauseMode.kNone,
id: '123',
number: '0',
startTime: 1,
runnable: true,
livePorts: 2,
isolateFlags: [],
pauseOnExit: false,
pauseEvent: Event(
kind: EventKind.kResume,
timestamp: 3,
),
libraries: [],
breakpoints: [],
isSystemIsolate: false,
);
var request = rpcRequest("setVMTimelineFlags", params: {
'isolateId': isolate.id,
// Note: the dynamic list below is intentional in order to exercise the
// code under test.
'recordedStreams': <dynamic>['GC', 'Dart', 'Embedder'],
});
var response = Success();
when(serviceMock.getIsolate(isolate.id))
.thenAnswer((Invocation invocation) {
expect(invocation.namedArguments,
equals({Symbol('isolateId'): null, Symbol('args'): null}));
return Future.value(isolate);
});
expect(responsesController.stream, emits(rpcResponse(response)));
requestsController.add(request);
});
group('custom service extensions', () {
test('with no params or isolateId', () {
var extension = 'ext.cool';
var request = rpcRequest(extension, params: null);
var response = Response()..json = {"hello": "world"};
when(serviceMock.callServiceExtension(
extension,
isolateId: argThat(isNull, named: 'isolateId'),
args: argThat(isNull, named: 'args'),
)).thenAnswer((Invocation invocation) {
expect(invocation.namedArguments,
equals({Symbol('isolateId'): null, Symbol('args'): null}));
return Future.value(response);
});
expect(responsesController.stream, emits(rpcResponse(response)));
requestsController.add(request);
});
test('with isolateId and no other params', () {
var extension = 'ext.cool';
var request = rpcRequest(extension, params: {'isolateId': '1'});
var response = Response()..json = {"hello": "world"};
when(serviceMock.callServiceExtension(
extension,
isolateId: argThat(equals('1'), named: 'isolateId'),
args: argThat(equals({}), named: 'args'),
)).thenAnswer((Invocation invocation) {
expect(invocation.namedArguments,
equals({Symbol('isolateId'): '1', Symbol('args'): {}}));
return Future.value(response);
});
expect(responsesController.stream, emits(rpcResponse(response)));
requestsController.add(request);
});
test('with params and no isolateId', () {
var extension = 'ext.cool';
var params = {'cool': 'option'};
var request = rpcRequest(extension, params: params);
var response = Response()..json = {"hello": "world"};
when(serviceMock.callServiceExtension(
extension,
isolateId: argThat(isNull, named: 'isolateId'),
args: argThat(equals(params), named: 'args'),
)).thenAnswer((Invocation invocation) {
expect(invocation.namedArguments,
equals({Symbol('isolateId'): null, Symbol('args'): params}));
return Future.value(response);
});
expect(responsesController.stream, emits(rpcResponse(response)));
requestsController.add(request);
});
test('with params and isolateId', () {
var extension = 'ext.cool';
var params = {'cool': 'option'};
var request =
rpcRequest(extension, params: Map.of(params)..['isolateId'] = '1');
var response = Response()..json = {"hello": "world"};
when(serviceMock.callServiceExtension(
extension,
isolateId: argThat(equals("1"), named: 'isolateId'),
args: argThat(equals(params), named: 'args'),
)).thenAnswer((Invocation invocation) {
expect(invocation.namedArguments,
equals({Symbol('isolateId'): '1', Symbol('args'): params}));
return Future.value(response);
});
expect(responsesController.stream, emits(rpcResponse(response)));
requestsController.add(request);
});
});
});
group('error handling', () {
test('special cases RPCError instances', () {
var request = rpcRequest("getVersion");
var error =
RPCError('getVersion', 1234, 'custom message', {'custom': 'data'});
when(serviceMock.getVersion()).thenAnswer((_) => Future.error(error));
expect(responsesController.stream, emits(rpcErrorResponse(error)));
requestsController.add(request);
});
test('has a fallback for generic exceptions', () {
var request = rpcRequest("getVersion");
var error = UnimplementedError();
when(serviceMock.getVersion()).thenAnswer((_) => Future.error(error));
expect(
responsesController.stream.map((response) => '$response'),
emits(startsWith(
'{jsonrpc: 2.0, id: 1, error: {code: -32603, message: getVersion: UnimplementedError')));
requestsController.add(request);
});
});
group('streams', () {
test('can be listened to and canceled', () async {
var streamId = 'Isolate';
var responseQueue = StreamQueue(responsesController.stream);
StreamController<Event> eventController;
{
var request =
rpcRequest('streamListen', params: {'streamId': streamId});
var response = Success();
when(serviceMock.streamListen(streamId))
.thenAnswer((_) => Future.value(response));
requestsController.add(request);
await expectLater(responseQueue, emitsThrough(rpcResponse(response)));
eventController = serviceMock.streamControllers[streamId];
var events = [
Event(
kind: EventKind.kIsolateStart,
timestamp: 0,
),
Event(
kind: EventKind.kIsolateExit,
timestamp: 1,
)
];
events.forEach(eventController.add);
await expectLater(
responseQueue,
emitsInOrder(
events.map((event) => streamNotifyResponse(streamId, event))));
}
{
var request =
rpcRequest('streamCancel', params: {'streamId': streamId});
var response = Success();
when(serviceMock.streamListen(streamId))
.thenAnswer((_) => Future.value(response));
requestsController.add(request);
await expectLater(responseQueue, emitsThrough(rpcResponse(response)));
var nextEvent = Event(
kind: EventKind.kIsolateReload,
timestamp: 2,
);
eventController.add(nextEvent);
expect(responseQueue,
neverEmits(streamNotifyResponse(streamId, nextEvent)));
await pumpEventQueue();
await eventController.close();
await responsesController.close();
}
});
test("can't be listened to twice", () {
var streamId = 'Isolate';
var responseQueue = StreamQueue(responsesController.stream);
{
var request =
rpcRequest('streamListen', params: {'streamId': streamId});
var response = Success();
when(serviceMock.streamListen(streamId))
.thenAnswer((_) => Future.value(response));
requestsController.add(request);
expect(responseQueue, emitsThrough(rpcResponse(response)));
}
{
var request =
rpcRequest('streamListen', params: {'streamId': streamId});
var response = Success();
when(serviceMock.streamListen(streamId))
.thenAnswer((_) => Future.value(response));
requestsController.add(request);
expect(
responseQueue,
emitsThrough(rpcErrorResponse(
RPCError('streamSubcribe', 103, 'Stream already subscribed', {
'details': "The stream '$streamId' is already subscribed",
}))));
}
});
test("can't cancel a stream that isn't being listened to", () {
var streamId = 'Isolate';
var responseQueue = StreamQueue(responsesController.stream);
var request = rpcRequest('streamCancel', params: {'streamId': streamId});
var response = Success();
when(serviceMock.streamListen(streamId))
.thenAnswer((_) => Future.value(response));
requestsController.add(request);
expect(
responseQueue,
emitsThrough(rpcErrorResponse(
RPCError('streamCancel', 104, 'Stream not subscribed', {
'details': "The stream '$streamId' is not subscribed",
}))));
});
group('Service', () {
final serviceStream = 'Service';
test('gives register and unregister events', () async {
var serviceId = 'ext.test.service';
var serviceRegisteredEvent = streamNotifyResponse(
serviceStream,
Event(
kind: EventKind.kServiceRegistered,
timestamp: 0,
method: serviceId,
service: serviceId,
),
);
var serviceUnRegisteredEvent = streamNotifyResponse(
serviceStream,
Event(
kind: EventKind.kServiceUnregistered,
timestamp: 0,
method: serviceId,
service: serviceId,
),
);
requestsController.add(
rpcRequest('streamListen', params: {'streamId': serviceStream}));
requestsController
.add(rpcRequest('registerService', params: {'service': serviceId}));
await expectLater(
responsesController.stream
.map((Map response) => stripEventTimestamp(response)),
emitsThrough(serviceRegisteredEvent));
// Connect another client to get the previous register events and the
// unregister event.
var requestsController2 = StreamController<Map<String, Object>>();
var responsesController2 = StreamController<Map<String, Object>>();
addTearDown(() {
requestsController2.close();
responsesController2.close();
});
VmServerConnection(
requestsController2.stream,
responsesController2.sink,
serviceRegistry,
VmService(Stream.empty(), (String _) => null),
);
expect(
responsesController2.stream
.map((Map response) => stripEventTimestamp(response)),
emitsThrough(emitsInOrder(
[serviceRegisteredEvent, serviceUnRegisteredEvent])));
// Should get the previously registered extension event, as well as
// the unregister event when the client disconnects.
requestsController2.add(
rpcRequest('streamListen', params: {'streamId': serviceStream}));
// Need to give the client a chance to subscribe.
await pumpEventQueue();
unawaited(requestsController.close());
// Give the old client a chance to shut down
await pumpEventQueue();
// Connect yet another client, it should get zero registration or
// unregistration events.
var requestsController3 = StreamController<Map<String, Object>>();
var responsesController3 = StreamController<Map<String, Object>>();
VmServerConnection(
requestsController3.stream,
responsesController3.sink,
serviceRegistry,
VmService(Stream.empty(), (String _) => null),
);
expect(
responsesController3.stream,
neverEmits(
anyOf(serviceRegisteredEvent, serviceUnRegisteredEvent)));
// Give it a chance to deliver events.
await pumpEventQueue();
// Disconnect the client so the test can shut down.
unawaited(requestsController3.close());
unawaited(responsesController3.close());
});
});
});
group('registerService', () {
test('can delegate requests between clients', () async {
var serviceId = 'ext.test.service';
var responseQueue = StreamQueue(responsesController.stream);
var clientInputController =
StreamController<Map<String, Object>>.broadcast();
var clientOutputController =
StreamController<Map<String, Object>>.broadcast();
var client = VmService(clientInputController.stream.map(jsonEncode),
(String message) => clientOutputController.add(jsonDecode(message)),
disposeHandler: () async {
await clientInputController.close();
await clientOutputController.close();
});
var clientConnection = VmServerConnection(clientOutputController.stream,
clientInputController.sink, serviceRegistry, serviceMock);
var requestParams = {'foo': 'bar'};
var expectedResponse = Response()..json = {'zap': 'zip'};
await client.registerService(serviceId, 'service');
// Duplicate registrations should fail.
expect(client.registerService(serviceId, 'service'),
throwsA(const TypeMatcher<RPCError>()));
client.registerServiceCallback(serviceId, (request) async {
expect(request, equals(requestParams));
return {'result': expectedResponse.toJson()};
});
var serviceRequest = rpcRequest(serviceId, params: requestParams);
requestsController.add(serviceRequest);
expect(await responseQueue.next, rpcResponse(expectedResponse));
// Kill the client that registered the handler, it should now fall back
// on `callServiceExtension`.
await client.dispose();
// This should complete as well.
await clientConnection.done;
var mockResponse = Response()..json = {'mock': 'response'};
when(serviceMock.callServiceExtension(serviceId,
args: argThat(equals(requestParams), named: 'args'),
isolateId: argThat(isNull, named: 'isolateId')))
.thenAnswer((_) async => mockResponse);
requestsController.add(serviceRequest);
expect(await responseQueue.next, rpcResponse(mockResponse));
});
});
}
Map<String, Object> rpcRequest(String method,
{Map<String, Object> params = const {}, String id = "1"}) =>
{
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": id,
};
Map<String, Object> rpcResponse(Response response, {String id = "1"}) => {
'jsonrpc': '2.0',
'id': id,
'result': response.toJson(),
};
Map<String, Object> rpcErrorResponse(Object error, {String id = "1"}) {
Map<String, Object> errorJson;
if (error is RPCError) {
errorJson = {
'code': error.code,
'message': error.message,
};
if (error.data != null) {
errorJson['data'] = error.data;
}
} else {
errorJson = {
'code': -32603,
'message': error.toString(),
};
}
return {
'jsonrpc': '2.0',
'id': id,
'error': errorJson,
};
}
Map<String, Object> streamNotifyResponse(String streamId, Event event) {
return {
'jsonrpc': '2.0',
'method': 'streamNotify',
'params': {
'streamId': '$streamId',
'event': event.toJson(),
},
};
}
Map<String, Object> stripEventTimestamp(Map response) {
if (response.containsKey('params') &&
response['params'].containsKey('event')) {
response['params']['event']['timestamp'] = 0;
}
return response as Map<String, Object>;
}
class MockVmService extends Mock implements VmServiceInterface {
final streamControllers = <String, StreamController<Event>>{};
@override
Stream<Event> onEvent(String streamId) => streamControllers
.putIfAbsent(streamId, () => StreamController<Event>())
.stream;
@override
Future<Success> setVMTimelineFlags(List<String> recordedStreams) async {
return Success();
}
}