blob: 602348e065fff4f2306f81a4a47f06b30fb66dcc [file] [log] [blame]
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:dart_service_protocol_shared/dart_service_protocol_shared.dart';
import 'package:dtd/dtd.dart' show CoreDtdServiceConstants, DtdParameters;
import 'package:sse/server/sse_handler.dart';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:dtd/dtd.dart' show RpcErrorCodes, RegisteredServicesResponse;
import 'constants.dart';
import 'dart_tooling_daemon.dart';
/// Represents a client that is connected to a DTD service.
///
/// [DTDClient] is used only by the server to handle the remote DTD client and
/// by the client itself/on the client side.
class DTDClient extends Client {
final StreamChannel<String> connection;
late json_rpc.Peer _clientPeer;
final DartToolingDaemon dtd;
late final Future<void> _done;
Future<void> get done => _done;
DTDClient.fromWebSocket(
DartToolingDaemon dtd,
WebSocketChannel ws,
) : this._(
dtd,
ws.cast<String>(),
);
DTDClient.fromSSEConnection(
DartToolingDaemon dtd,
SseConnection sse,
) : this._(
dtd,
sse,
);
DTDClient._(
this.dtd,
this.connection,
) {
_clientPeer = json_rpc.Peer(
connection,
strictProtocolChecks: false,
);
_registerJsonRpcMethods();
_done = listen();
}
@override
Future<void> close() => _clientPeer.close();
@override
Future<dynamic> sendRequest({
required String method,
dynamic parameters,
}) async {
if (_clientPeer.isClosed) {
return;
}
return await _clientPeer.sendRequest(method, parameters.value);
}
@override
void streamNotify(String stream, Object data) {
_clientPeer.sendNotification(
CoreDtdServiceConstants.streamNotify,
data,
);
}
/// Start receiving JSON RPC requests from the client.
///
/// Returned future completes when the peer is closed.
Future<void> listen() => _clientPeer.listen().then(
(_) => dtd.streamManager.onClientDisconnect(this),
);
/// The set of RPC methods registered by DTD itself.
///
/// This will be a combination of first party DTD methods and methods
/// registered by internal services.
final _dtdRpcMethods = <String>{};
/// Registers handlers for the Dart Tooling Daemon JSON RPC method endpoints.
void _registerJsonRpcMethods() {
_registerDtdMethod(CoreDtdServiceConstants.streamListen, _streamListen);
_registerDtdMethod(CoreDtdServiceConstants.streamCancel, _streamCancel);
_registerDtdMethod(CoreDtdServiceConstants.postEvent, _postEvent);
_registerDtdMethod(
CoreDtdServiceConstants.registerService,
_registerService,
);
_registerDtdMethod(
CoreDtdServiceConstants.getRegisteredServices,
_getRegisteredServices,
);
// Handle service extension invocations.
_clientPeer.registerFallback(_fallback);
}
/// jrpc endpoint for listening to a stream.
///
/// Parameters:
/// 'streamId': the stream to be cancelled.
Future<Map<String, Object?>> _streamListen(
json_rpc.Parameters parameters,
) async {
final streamId = parameters[DtdParameters.streamId].asString;
try {
await dtd.streamManager.streamListen(
this,
streamId,
);
} on StreamAlreadyListeningException catch (_) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kStreamAlreadySubscribed,
data: {
'details': "The stream '$streamId' is already subscribed",
},
);
}
// If the remote client was subscribing to the services stream, send all
// of the existing streams.
if (streamId == CoreDtdServiceConstants.servicesStreamId) {
for (final client in dtd.clientManager.clients) {
for (final service in client.services.values) {
for (final method in service.methods.values) {
_streamNotifyHelper(
CoreDtdServiceConstants.servicesStreamId,
CoreDtdServiceConstants.serviceRegisteredKind,
_buildServiceRegisteredData(
service: service.name,
method: method.name,
capabilities: method.capabilities,
),
);
}
}
}
for (final method in _dtdRpcMethods) {
// If the DTD service method has the form 'service.method', split up the
// two values. Otherwise, leave the service null and use the entire name
// as the method.
String? serviceName;
String methodName;
final parts = method.split('.');
if (parts.length == 2) {
serviceName = parts[0];
}
methodName = parts.last;
_streamNotifyHelper(
CoreDtdServiceConstants.servicesStreamId,
CoreDtdServiceConstants.serviceRegisteredKind,
_buildServiceRegisteredData(
service: serviceName,
method: methodName,
),
);
}
}
return RPCResponses.success;
}
/// A helper to emit an event ([eventKind] to [stream]).
void _streamNotifyHelper(
String stream,
String eventKind,
Map<String, Object?>? eventData,
) {
streamNotify(stream, {
DtdParameters.streamId: stream,
DtdParameters.eventKind: eventKind,
DtdParameters.eventData: eventData,
DtdParameters.timestamp: DateTime.now().millisecondsSinceEpoch,
});
}
/// jrpc endpoint for stopping listening to a stream.
///
/// Parameters:
/// 'streamId': the stream that the client would like to stop listening to.
Future<Map<String, Object?>> _streamCancel(
json_rpc.Parameters parameters,
) async {
final streamId = parameters[DtdParameters.streamId].asString;
if (!dtd.streamManager.isSubscribed(this, streamId)) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kStreamNotSubscribed,
data: {
'details': "Client is not listening to '$streamId'",
},
);
}
await dtd.streamManager.streamCancel(this, streamId);
return RPCResponses.success;
}
/// jrpc endpoint for posting an event to a stream.
///
/// Parameters:
/// 'eventKind': the kind of event being sent.
/// 'eventData': the data being sent over the stream.
/// 'streamId: the stream that is being posted to.
Future<Map<String, Object?>> _postEvent(
json_rpc.Parameters parameters,
) async {
final eventKind = parameters[DtdParameters.eventKind].asString;
final eventData =
parameters[DtdParameters.eventData].asMap.cast<String, Object?>();
final stream = parameters[DtdParameters.streamId].asString;
dtd.streamManager.postEventHelper(stream, eventKind, eventData);
return RPCResponses.success;
}
bool _isValidServiceName(String serviceName) {
return !serviceName.contains('.');
}
/// jrpc endpoint for registering a service to the tooling daemon.
///
/// Parameters:
/// 'service': the name of the service that is being registered to.
/// 'method': the name of the method that is being registered on the service.
Map<String, Object?> _registerService(json_rpc.Parameters parameters) {
final serviceName = parameters[DtdParameters.service].asString;
final methodName = parameters[DtdParameters.method].asString;
final capabilities = parameters[DtdParameters.capabilities].exists
? parameters[DtdParameters.capabilities].asMap.cast<String, Object?>()
: null;
if (!_isValidServiceName(serviceName)) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceNameInvalid,
data: {
'details': "'$serviceName' is not a valid service name. "
"Services may not include dots in their names.",
},
);
}
if (dtd.internalServices.containsKey(serviceName)) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceAlreadyRegistered,
data: {
'details':
"Service '$serviceName' is already registered as a DTD internal service.",
},
);
}
final existingServiceOwnerClient =
dtd.clientManager.findClientThatOwnsService(serviceName);
if (existingServiceOwnerClient != null &&
existingServiceOwnerClient != this) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceAlreadyRegistered,
data: {
'details':
"Service '$serviceName' is already registered by another client. "
"Only 1 client at a time may register methods to a service.",
},
);
}
if (services[serviceName]?.methods.containsKey(methodName) ?? false) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceMethodAlreadyRegistered,
data: {
'details':
"$methodName has already been registered for the $serviceName service by this client.",
},
);
}
final methodInfo = ClientServiceMethodInfo(methodName, capabilities);
services
.putIfAbsent(serviceName, () => ClientServiceInfo(serviceName))
.methods[methodName] = methodInfo;
// Send an event to inform other clients that this service method is
// available.
dtd.streamManager.postEventHelper(
CoreDtdServiceConstants.servicesStreamId,
CoreDtdServiceConstants.serviceRegisteredKind,
_buildServiceRegisteredData(
service: serviceName,
method: methodName,
capabilities: capabilities,
),
);
return RPCResponses.success;
}
/// Returns a structured response with all the currently registered services
/// available on this DTD instance.
Map<String, Object?> _getRegisteredServices() {
final clientServices =
dtd.clientManager.clients.map((client) => client.services);
final combinedClientServices = {
// This will not create collisions because [_registerService] ensures
// the uniqueness of service methods across clients.
for (final servicesForClient in clientServices) ...servicesForClient,
};
return RegisteredServicesResponse(
dtdServices: _dtdRpcMethods.toList(),
clientServices: combinedClientServices.values.toList(),
).toJson();
}
/// Cleans up when this client is disconnecting, before it is removed from the
/// client manager.
void onClientDisconnect() {
for (final service in services.values) {
for (final method in service.methods.values) {
// Notify other clients about this service going away.
dtd.streamManager.postEventHelper(
CoreDtdServiceConstants.servicesStreamId,
CoreDtdServiceConstants.serviceUnregisteredKind,
_buildServiceUnregisteredData(service.name, method.name),
);
}
}
}
/// Builds a structured object describing a service method that is being
/// registered on DTD.
///
/// [service] may be null if this service method is a first party service
/// method registered by DTD or by an internal service.
Map<String, Object?> _buildServiceRegisteredData({
required String? service,
required String method,
Map<String, Object?>? capabilities,
}) {
return {
if (service != null) DtdParameters.service: service,
DtdParameters.method: method,
if (capabilities != null) DtdParameters.capabilities: capabilities,
};
}
Map<String, Object?> _buildServiceUnregisteredData(
String service,
String method,
) {
return {
DtdParameters.service: service,
DtdParameters.method: method,
};
}
/// jrpc fallback handler.
///
/// Handles all service method calls that will be forwarded to the respective
/// client which registered that service method.
Future<Object?> _fallback(json_rpc.Parameters parameters) async {
// Lookup the client associated with the service extension's namespace.
// If the client exists and that client has registered the specified
// method, forward the request to that client.
final combinedName = parameters.method;
final dotIndex = combinedName.indexOf('.');
if (dotIndex == -1) {
// All service methods must have a dot in the name.
throw json_rpc.RpcException(
RpcErrorCodes.kMethodNotFound,
'Unknown service method: $combinedName',
);
}
final serviceName = combinedName.substring(0, dotIndex);
final methodName = combinedName.substring(dotIndex + 1);
final client = dtd.clientManager.findClientThatHandlesServiceMethod(
serviceName,
methodName,
);
if (client == null) {
throw json_rpc.RpcException(
RpcErrorCodes.kMethodNotFound,
'Unknown service method: $combinedName',
);
}
return await client.sendRequest(
method: combinedName,
parameters: parameters,
);
}
/// Registers a service method to the Dart Tooling Daemon using the name
/// "[service].[method]".
///
/// This method is a helper for registering service methods that are available
/// when the Dart Tooling Daemon is initialized. To trigger a service method
/// registered with this helper see the
/// [dtd_protocol.md#servicemethod](https://github.com/dart-lang/sdk/blob/main/pkg/dtd_impl/dtd_protocol.md#servicemethod)
/// documentation.
///
/// When a client of the Dart Tooling Daemon calls [service].[method], then
/// [callback] will be run with the parameters of that call.
void registerServiceMethod(
String service,
String method,
void Function(json_rpc.Parameters parameters) callback,
) {
final combinedName = '$service.$method';
_registerDtdMethod(combinedName, callback);
}
/// A helper method for registering a service method on DTD and adding the
/// service method to the [_dtdRpcMethods] set.
void _registerDtdMethod(String method, Function callback) {
_dtdRpcMethods.add(method);
_clientPeer.registerMethod(method, callback);
}
}