blob: 771ccee9289d93f642d589b438a88f9022ae846f [file] [log] [blame]
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:meta/meta.dart';
import 'package:sse/server/sse_handler.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import '../dds.dart';
import 'constants.dart';
import 'dds_impl.dart';
import 'rpc_error_codes.dart';
import 'stream_manager.dart';
/// Representation of a single DDS client which manages the connection and
/// DDS request intercepting / forwarding.
class DartDevelopmentServiceClient {
DartDevelopmentServiceClient.fromWebSocket(
DartDevelopmentService dds,
WebSocketChannel ws,
json_rpc.Peer vmServicePeer,
) : this._(
dds as DartDevelopmentServiceImpl,
ws,
vmServicePeer,
);
DartDevelopmentServiceClient.fromSSEConnection(
DartDevelopmentService dds,
SseConnection sse,
json_rpc.Peer vmServicePeer,
) : this._(
dds as DartDevelopmentServiceImpl,
sse,
vmServicePeer,
);
DartDevelopmentServiceClient._(
this.dds,
this.connection,
json_rpc.Peer vmServicePeer,
) : _vmServicePeer = vmServicePeer {
_clientPeer = json_rpc.Peer(
// Manually create a StreamChannel<String> instead of calling
// .cast<String>() as cast() results in addStream() being called,
// binding the underlying sink. This results in a StateError being thrown
// if we try and add directly to the sink, which we do for binary events
// in StreamManager's streamNotify().
StreamChannel<String>(
connection.stream.cast(),
StreamController(sync: true)
..stream
.cast()
.listen((event) => connection.sink.add(event))
.onDone(() => connection.sink.close()),
),
strictProtocolChecks: false,
);
_registerJsonRpcMethods();
}
/// Start receiving JSON RPC requests from the client.
///
/// Returned future completes when the peer is closed.
Future<void> listen() => _clientPeer.listen().then(
(_) => dds.streamManager.clientDisconnect(this),
);
/// Close the connection to the client.
Future<void> close() async {
// Cleanup the JSON RPC server for this connection if DDS has shutdown.
await _clientPeer.close();
}
/// Send a JSON RPC notification to the client.
void sendNotification(String method, [dynamic parameters]) {
if (_clientPeer.isClosed) {
return;
}
_clientPeer.sendNotification(method, parameters);
}
/// Send a JSON RPC request to the client.
Future<dynamic> sendRequest(String method, [dynamic parameters]) async {
if (_clientPeer.isClosed) {
return null;
}
return await _clientPeer.sendRequest(method, parameters);
}
/// Registers handlers for JSON RPC methods which need to be intercepted by
/// DDS as well as fallback request forwarder.
void _registerJsonRpcMethods() {
_clientPeer.registerMethod('streamListen', (parameters) async {
final streamId = parameters['streamId'].asString;
await dds.streamManager.streamListen(this, streamId);
return RPCResponses.success;
});
_clientPeer.registerMethod('streamCancel', (parameters) async {
final streamId = parameters['streamId'].asString;
await dds.streamManager.streamCancel(this, streamId);
return RPCResponses.success;
});
_clientPeer.registerMethod('registerService', (parameters) async {
final serviceId = parameters['service'].asString;
final alias = parameters['alias'].asString;
if (services.containsKey(serviceId)) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceAlreadyRegistered,
);
}
services[serviceId] = alias;
// Notify other clients that a new service extension is available.
dds.streamManager.sendServiceRegisteredEvent(
this,
serviceId,
alias,
);
return RPCResponses.success;
});
_clientPeer.registerMethod(
'getClientName',
(parameters) => {'type': 'ClientName', 'name': name},
);
_clientPeer.registerMethod(
'setClientName',
(parameters) => dds.clientManager.setClientName(this, parameters),
);
_clientPeer.registerMethod(
'requirePermissionToResume',
(parameters) =>
dds.clientManager.requirePermissionToResume(this, parameters),
);
_clientPeer.registerMethod(
'resume',
(parameters) => dds.isolateManager.resumeIsolate(this, parameters),
);
_clientPeer.registerMethod('getStreamHistory', (parameters) {
final stream = parameters['stream'].asString;
final events = dds.streamManager.getStreamHistory(stream);
if (events == null) {
throw json_rpc.RpcException.invalidParams(
"Event history is not collected for stream '$stream'",
);
}
return <String, dynamic>{
'type': 'StreamHistory',
'history': events,
};
});
_clientPeer.registerMethod(
'getLogHistorySize',
(parameters) => {
'type': 'Size',
'size': StreamManager
.loggingRepositories[StreamManager.kLoggingStream]!
.bufferSize,
});
_clientPeer.registerMethod('setLogHistorySize', (parameters) {
final size = parameters['size'].asInt;
if (size < 0) {
throw json_rpc.RpcException.invalidParams(
"'size' must be greater or equal to zero",
);
}
StreamManager.loggingRepositories[StreamManager.kLoggingStream]!
.resize(size);
return RPCResponses.success;
});
_clientPeer.registerMethod('getDartDevelopmentServiceVersion',
(parameters) async {
final ddsVersion = DartDevelopmentService.protocolVersion.split('.');
return <String, dynamic>{
'type': 'Version',
'major': int.parse(ddsVersion[0]),
'minor': int.parse(ddsVersion[1]),
};
});
_clientPeer.registerMethod('getSupportedProtocols', (parameters) async {
final Map<String, dynamic> supportedProtocols = (await _vmServicePeer
.sendRequest('getSupportedProtocols')) as Map<String, dynamic>;
final ddsVersion = DartDevelopmentService.protocolVersion.split('.');
final ddsProtocol = {
'protocolName': 'DDS',
'major': int.parse(ddsVersion[0]),
'minor': int.parse(ddsVersion[1]),
};
supportedProtocols['protocols']
.cast<Map<String, dynamic>>()
.add(ddsProtocol);
return supportedProtocols;
});
_clientPeer.registerMethod(
'getAvailableCachedCpuSamples',
(_) => {
'type': 'AvailableCachedCpuSamples',
'cacheNames': dds.cachedUserTags,
},
);
_clientPeer.registerMethod(
'getCachedCpuSamples',
dds.isolateManager.getCachedCpuSamples,
);
// `evaluate` and `evaluateInFrame` actually consist of multiple RPC
// invocations, including a call to `compileExpression` which can be
// overridden by clients which provide their own implementation (e.g.,
// Flutter Tools). We handle all of this in [_ExpressionEvaluator].
_clientPeer.registerMethod(
'evaluate',
dds.expressionEvaluator.execute,
);
_clientPeer.registerMethod(
'evaluateInFrame',
dds.expressionEvaluator.execute,
);
// When invoked within a fallback, the next fallback will start executing.
// The final fallback forwards the request to the VM service directly.
@alwaysThrows
nextFallback() => throw json_rpc.RpcException.methodNotFound('');
// Handle service extension invocations.
_clientPeer.registerFallback((parameters) async {
hasNamespace(String method) => method.contains('.');
getMethod(String method) => method.split('.').last;
getNamespace(String method) => method.split('.').first;
if (!hasNamespace(parameters.method)) {
nextFallback();
}
// Lookup the client associated with the service extension's namespace.
// If the client exists and that client has registered the specified
// method, forward the request to that client.
final method = getMethod(parameters.method);
final namespace = getNamespace(parameters.method);
final serviceClient = dds.clientManager.clients[namespace];
if (serviceClient != null && serviceClient.services.containsKey(method)) {
return await Future.any(
[
// Forward the request to the service client or...
serviceClient.sendRequest(method, parameters.asMap).catchError((_) {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceDisappeared,
);
}, test: (error) => error is StateError),
// if the service client closes, return an error response.
serviceClient._clientPeer.done.then(
(_) => throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceDisappeared,
),
),
],
);
}
throw json_rpc.RpcException(
RpcErrorCodes.kMethodNotFound,
'Unknown service: ${parameters.method}',
);
});
// Unless otherwise specified, the request is forwarded to the VM service.
// NOTE: This must be the last fallback registered.
_clientPeer.registerFallback((parameters) async {
// If _vmServicePeer closes in the middle of a request, this will throw
// a StateError. Listeners in dds_impl.dart will handle shutting down the
// DDS instance, so we don't try and handle the error here.
try {
return await _vmServicePeer.sendRequest(
parameters.method,
parameters.value,
);
} on StateError {
throw RpcErrorCodes.buildRpcException(
RpcErrorCodes.kServiceDisappeared,
);
}
});
}
static int _idCounter = 0;
final int _id = ++_idCounter;
/// The name given to the client upon its creation.
String get defaultClientName => 'client$_id';
/// The current name associated with this client.
String? get name => _name;
// NOTE: this should not be called directly except from:
// - `ClientManager._clearClientName`
// - `ClientManager._setClientNameHelper`
set name(String? n) => _name = n ?? defaultClientName;
String? _name;
final DartDevelopmentServiceImpl dds;
final StreamChannel connection;
final Map<String, String> services = {};
final json_rpc.Peer _vmServicePeer;
late json_rpc.Peer _clientPeer;
}