blob: 6e2d8dd683fdf7f16b9ecc891f8ffe2a27fbabf5 [file] [log] [blame]
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dds;
class _StreamManager {
_StreamManager(this.dds);
/// Send `streamNotify` notifications to clients subscribed to `streamId`.
///
/// If `data` is of type `Uint8List`, the notification is assumed to be a
/// binary event and is forwarded directly over the subscriber's websocket.
/// Otherwise, the event is sent via the JSON RPC client.
///
/// If `excludedClient` is provided, the notification will be sent to all
/// clients subscribed to `streamId` except for `excludedClient`.
void streamNotify(
String streamId,
data, {
_DartDevelopmentServiceClient excludedClient,
}) {
if (streamListeners.containsKey(streamId)) {
final listeners = streamListeners[streamId];
final isBinaryData = data is Uint8List;
for (final listener in listeners) {
if (listener == excludedClient) {
continue;
}
if (isBinaryData) {
listener.ws.sink.add(data);
} else {
listener.sendNotification('streamNotify', data);
}
}
}
}
void sendServiceRegisteredEvent(
_DartDevelopmentServiceClient client,
String service,
String alias,
) {
final namespace = dds._getNamespace(client);
streamNotify(
kServiceStream,
{
'streamId': kServiceStream,
'event': {
'type': 'Event',
'kind': 'ServiceRegistered',
'timestamp': DateTime.now().millisecondsSinceEpoch,
'service': service,
'method': namespace + '.' + service,
'alias': alias,
},
},
excludedClient: client,
);
}
void _sendServiceUnregisteredEvents(
_DartDevelopmentServiceClient client,
) {
final namespace = dds._getNamespace(client);
for (final service in client.services.keys) {
streamNotify(
kServiceStream,
{
'streamId': kServiceStream,
'event': {
'type': 'Event',
'kind': 'ServiceUnregistered',
'timestamp': DateTime.now().millisecondsSinceEpoch,
'service': service,
'method': namespace + '.' + service,
},
},
excludedClient: client,
);
}
}
/// Start listening for `streamNotify` events from the VM service and forward
/// them to the clients which have subscribed to the stream.
void listen() => dds._vmServiceClient.registerMethod(
'streamNotify',
(parameters) {
final streamId = parameters['streamId'].asString;
streamNotify(streamId, parameters.value);
},
);
/// Subscribes `client` to a stream.
///
/// If `client` is the first client to listen to `stream`, DDS will send a
/// `streamListen` request for `stream` to the VM service.
Future<void> streamListen(
_DartDevelopmentServiceClient client,
String stream,
) async {
assert(stream != null && stream.isNotEmpty);
if (!streamListeners.containsKey(stream)) {
// This will return an RPC exception if the stream doesn't exist. This
// will throw and the exception will be forwarded to the client.
final result = await dds._vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
});
assert(result['type'] == 'Success');
streamListeners[stream] = <_DartDevelopmentServiceClient>[];
}
if (streamListeners[stream].contains(client)) {
throw kStreamAlreadySubscribedException;
}
streamListeners[stream].add(client);
}
/// Unsubscribes `client` from a stream.
///
/// If `client` is the last client to unsubscribe from `stream`, DDS will
/// send a `streamCancel` request for `stream` to the VM service.
Future<void> streamCancel(
_DartDevelopmentServiceClient client,
String stream,
) async {
assert(stream != null && stream.isNotEmpty);
final listeners = streamListeners[stream];
if (listeners == null || !listeners.contains(client)) {
throw kStreamNotSubscribedException;
}
listeners.remove(client);
if (listeners.isEmpty) {
streamListeners.remove(stream);
final result = await dds._vmServiceClient.sendRequest('streamCancel', {
'streamId': stream,
});
assert(result['type'] == 'Success');
} else {
streamListeners[stream] = listeners;
}
}
/// Cleanup stream subscriptions for `client` when it has disconnected.
void clientDisconnect(_DartDevelopmentServiceClient client) {
for (final streamId in streamListeners.keys.toList()) {
streamCancel(client, streamId).catchError(
(_) => null,
// Ignore 'stream not subscribed' errors.
test: (e) => e is json_rpc.RpcException,
);
}
// Notify other service clients of service extensions that are being
// unregistered.
_sendServiceUnregisteredEvents(client);
}
static const kServiceStream = 'Service';
static final kStreamAlreadySubscribedException =
_RpcErrorCodes.buildRpcException(
_RpcErrorCodes.kStreamAlreadySubscribed,
);
static final kStreamNotSubscribedException = _RpcErrorCodes.buildRpcException(
_RpcErrorCodes.kStreamNotSubscribed,
);
final _DartDevelopmentService dds;
final streamListeners = <String, List<_DartDevelopmentServiceClient>>{};
}