blob: 318cc39a78bca46518ab37454964354b6f557e7c [file] [log] [blame]
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
part of dds;
class _StreamManager {
_StreamManager(this.dds);
/// Send `streamNotify` notifications to clients subscribed to `streamId`.
///
/// If `data` is of type `Uint8List`, the notification is assumed to be a
/// binary event and is forwarded directly over the subscriber's websocket.
/// Otherwise, the event is sent via the JSON RPC client.
void streamNotify(String streamId, data) {
if (streamListeners.containsKey(streamId)) {
final listeners = streamListeners[streamId];
final isBinaryData = data is Uint8List;
for (final listener in listeners) {
if (isBinaryData) {
listener.ws.sink.add(data);
} else {
listener.sendNotification('streamNotify', data);
}
}
}
}
/// Start listening for `streamNotify` events from the VM service and forward
/// them to the clients which have subscribed to the stream.
void listen() => dds._vmServiceClient.registerMethod(
'streamNotify',
(parameters) {
final streamId = parameters['streamId'].asString;
streamNotify(streamId, parameters.value);
},
);
/// Subscribes `client` to a stream.
///
/// If `client` is the first client to listen to `stream`, DDS will send a
/// `streamListen` request for `stream` to the VM service.
Future<void> streamListen(
_DartDevelopmentServiceClient client,
String stream,
) async {
assert(stream != null && stream.isNotEmpty);
if (!streamListeners.containsKey(stream)) {
// This will return an RPC exception if the stream doesn't exist. This
// will throw and the exception will be forwarded to the client.
final result = await dds._vmServiceClient.sendRequest('streamListen', {
'streamId': stream,
});
assert(result['type'] == 'Success');
streamListeners[stream] = <_DartDevelopmentServiceClient>[];
}
if (streamListeners[stream].contains(client)) {
throw kStreamAlreadySubscribedException;
}
streamListeners[stream].add(client);
}
/// Unsubscribes `client` from a stream.
///
/// If `client` is the last client to unsubscribe from `stream`, DDS will
/// send a `streamCancel` request for `stream` to the VM service.
Future<void> streamCancel(
_DartDevelopmentServiceClient client,
String stream,
) async {
assert(stream != null && stream.isNotEmpty);
final listeners = streamListeners[stream];
if (listeners == null || !listeners.contains(client)) {
throw kStreamNotSubscribedException;
}
listeners.remove(client);
if (listeners.isEmpty) {
streamListeners.remove(stream);
final result = await dds._vmServiceClient.sendRequest('streamCancel', {
'streamId': stream,
});
assert(result['type'] == 'Success');
} else {
streamListeners[stream] = listeners;
}
}
/// Cleanup stream subscriptions for `client` when it has disconnected.
void clientDisconnect(_DartDevelopmentServiceClient client) {
for (final streamId in streamListeners.keys.toList()) {
streamCancel(client, streamId);
}
}
// These error codes must be kept in sync with those in vm/json_stream.h and
// vmservice.dart.
static const kStreamAlreadySubscribed = 103;
static const kStreamNotSubscribed = 104;
// Keep these messages in sync with the VM service.
static final kStreamAlreadySubscribedException = json_rpc.RpcException(
kStreamAlreadySubscribed,
'Stream already subscribed',
);
static final kStreamNotSubscribedException = json_rpc.RpcException(
kStreamNotSubscribed,
'Stream not subscribed',
);
final _DartDevelopmentService dds;
final streamListeners = <String, List<_DartDevelopmentServiceClient>>{};
}