Replace ServerCommunicationChannel.listen() with Stream of requests.
So, we could add a StreamTransformer to debounce.
Change-Id: I04da366222cc30524e303c6a652de966c247e98f
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/219380
Commit-Queue: Konstantin Shcheglov <scheglov@google.com>
Reviewed-by: Brian Wilkerson <brianwilkerson@google.com>
diff --git a/pkg/analysis_server/lib/src/analysis_server.dart b/pkg/analysis_server/lib/src/analysis_server.dart
index b7a951b..2b98916 100644
--- a/pkg/analysis_server/lib/src/analysis_server.dart
+++ b/pkg/analysis_server/lib/src/analysis_server.dart
@@ -171,7 +171,7 @@
io.pid,
).toNotification(),
);
- channel.listen(handleRequest, onDone: done, onError: error);
+ channel.requests.listen(handleRequest, onDone: done, onError: error);
handlers = <server.RequestHandler>[
ServerDomainHandler(this),
AnalysisDomainHandler(this),
diff --git a/pkg/analysis_server/lib/src/channel/byte_stream_channel.dart b/pkg/analysis_server/lib/src/channel/byte_stream_channel.dart
index bfbc6ba..955e547 100644
--- a/pkg/analysis_server/lib/src/channel/byte_stream_channel.dart
+++ b/pkg/analysis_server/lib/src/channel/byte_stream_channel.dart
@@ -73,7 +73,6 @@
/// standard input and standard output) to communicate with clients.
class ByteStreamServerChannel implements ServerCommunicationChannel {
final Stream _input;
-
final IOSink _output;
/// The instrumentation service that is to be used by this analysis server.
@@ -88,6 +87,20 @@
/// True if [close] has been called.
bool _closeRequested = false;
+ @override
+ late final Stream<Request> requests = _input
+ .transform(const Utf8Decoder())
+ .transform(const LineSplitter())
+ .transform(
+ StreamTransformer.fromHandlers(
+ handleData: _readRequest,
+ handleDone: (sink) {
+ close();
+ sink.close();
+ },
+ ),
+ );
+
ByteStreamServerChannel(
this._input, this._output, this._instrumentationService,
{RequestStatisticsHelper? requestStatistics})
@@ -110,17 +123,6 @@
}
@override
- void listen(void Function(Request request) onRequest,
- {Function? onError, void Function()? onDone}) {
- _input.transform(const Utf8Decoder()).transform(LineSplitter()).listen(
- (String data) => _readRequest(data, onRequest),
- onError: onError, onDone: () {
- close();
- onDone?.call();
- });
- }
-
- @override
void sendNotification(Notification notification) {
// Don't send any further notifications after the communication channel is
// closed.
@@ -159,7 +161,7 @@
/// Read a request from the given [data] and use the given function to handle
/// the request.
- void _readRequest(String data, void Function(Request request) onRequest) {
+ void _readRequest(String data, Sink<Request> sink) {
// Ignore any further requests after the communication channel is closed.
if (_closed.isCompleted) {
return;
@@ -173,6 +175,6 @@
return;
}
_requestStatistics?.addRequest(request);
- onRequest(request);
+ sink.add(request);
}
}
diff --git a/pkg/analysis_server/lib/src/channel/channel.dart b/pkg/analysis_server/lib/src/channel/channel.dart
index 6b5e95f..37a6e86 100644
--- a/pkg/analysis_server/lib/src/channel/channel.dart
+++ b/pkg/analysis_server/lib/src/channel/channel.dart
@@ -98,17 +98,12 @@
/// objects that allow an [AnalysisServer] to receive [Request]s and to return
/// both [Response]s and [Notification]s.
abstract class ServerCommunicationChannel {
+ /// The single-subscription stream of requests.
+ Stream<Request> get requests;
+
/// Close the communication channel.
void close();
- /// Listen to the channel for requests. If a request is received, invoke the
- /// [onRequest] function. If an error is encountered while trying to read from
- /// the socket, invoke the [onError] function. If the socket is closed by the
- /// client, invoke the [onDone] function.
- /// Only one listener is allowed per channel.
- void listen(void Function(Request request) onRequest,
- {Function onError, void Function() onDone});
-
/// Send the given [notification] to the client.
void sendNotification(Notification notification);
diff --git a/pkg/analysis_server/lib/src/server/dev_server.dart b/pkg/analysis_server/lib/src/server/dev_server.dart
index 62886da6..b11c418 100644
--- a/pkg/analysis_server/lib/src/server/dev_server.dart
+++ b/pkg/analysis_server/lib/src/server/dev_server.dart
@@ -165,21 +165,11 @@
Stream<Notification> get onNotification => _notificationController.stream;
@override
- void close() {
- _notificationController.close();
- }
+ Stream<Request> get requests => _requestController.stream;
@override
- void listen(
- void Function(Request request) onRequest, {
- Function? onError,
- void Function()? onDone,
- }) {
- _requestController.stream.listen(
- onRequest,
- onError: onError,
- onDone: onDone,
- );
+ void close() {
+ _notificationController.close();
}
@override
diff --git a/pkg/analysis_server/lib/src/socket_server.dart b/pkg/analysis_server/lib/src/socket_server.dart
index 59cafe6..8e938ef 100644
--- a/pkg/analysis_server/lib/src/socket_server.dart
+++ b/pkg/analysis_server/lib/src/socket_server.dart
@@ -62,7 +62,7 @@
var error = RequestError(
RequestErrorCode.SERVER_ALREADY_STARTED, 'Server already started');
serverChannel.sendResponse(Response('', error: error));
- serverChannel.listen((Request request) {
+ serverChannel.requests.listen((Request request) {
serverChannel.sendResponse(Response(request.id, error: error));
});
return;
diff --git a/pkg/analysis_server/lib/src/utilities/mocks.dart b/pkg/analysis_server/lib/src/utilities/mocks.dart
index 9c581cd..bef5102 100644
--- a/pkg/analysis_server/lib/src/utilities/mocks.dart
+++ b/pkg/analysis_server/lib/src/utilities/mocks.dart
@@ -36,6 +36,9 @@
MockServerChannel();
@override
+ Stream<Request> get requests => requestController.stream;
+
+ @override
void close() {
_closed = true;
}
@@ -46,13 +49,6 @@
}
@override
- void listen(void Function(Request request) onRequest,
- {Function? onError, void Function()? onDone}) {
- requestController.stream
- .listen(onRequest, onError: onError, onDone: onDone);
- }
-
- @override
void sendNotification(Notification notification) {
// Don't deliver notifications after the connection is closed.
if (_closed) {
diff --git a/pkg/analysis_server/test/channel/byte_stream_channel_test.dart b/pkg/analysis_server/test/channel/byte_stream_channel_test.dart
index 0724bfe..d89741d 100644
--- a/pkg/analysis_server/test/channel/byte_stream_channel_test.dart
+++ b/pkg/analysis_server/test/channel/byte_stream_channel_test.dart
@@ -135,7 +135,7 @@
errorStream = errorStreamController.stream;
var doneCompleter = Completer();
doneFuture = doneCompleter.future;
- channel.listen((Request request) {
+ channel.requests.listen((Request request) {
requestStreamController.add(request);
}, onError: (error) {
errorStreamController.add(error);
diff --git a/pkg/analysis_server/test/src/plugin/notification_manager_test.dart b/pkg/analysis_server/test/src/plugin/notification_manager_test.dart
index 0f7483d..5504afb 100644
--- a/pkg/analysis_server/test/src/plugin/notification_manager_test.dart
+++ b/pkg/analysis_server/test/src/plugin/notification_manager_test.dart
@@ -468,23 +468,10 @@
server.Notification? sentNotification;
@override
- void close() {
- fail('Unexpected invocation of close');
- }
-
- @override
- void listen(void Function(server.Request) onRequest,
- {Function? onError, void Function()? onDone}) {
- fail('Unexpected invocation of listen');
- }
+ dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation);
@override
void sendNotification(server.Notification notification) {
sentNotification = notification;
}
-
- @override
- void sendResponse(server.Response response) {
- fail('Unexpected invocation of sendResponse');
- }
}