Replace ServerCommunicationChannel.listen() with Stream of requests.

So, we could add a StreamTransformer to debounce.

Change-Id: I04da366222cc30524e303c6a652de966c247e98f
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/219380
Commit-Queue: Konstantin Shcheglov <scheglov@google.com>
Reviewed-by: Brian Wilkerson <brianwilkerson@google.com>
diff --git a/pkg/analysis_server/lib/src/analysis_server.dart b/pkg/analysis_server/lib/src/analysis_server.dart
index b7a951b..2b98916 100644
--- a/pkg/analysis_server/lib/src/analysis_server.dart
+++ b/pkg/analysis_server/lib/src/analysis_server.dart
@@ -171,7 +171,7 @@
         io.pid,
       ).toNotification(),
     );
-    channel.listen(handleRequest, onDone: done, onError: error);
+    channel.requests.listen(handleRequest, onDone: done, onError: error);
     handlers = <server.RequestHandler>[
       ServerDomainHandler(this),
       AnalysisDomainHandler(this),
diff --git a/pkg/analysis_server/lib/src/channel/byte_stream_channel.dart b/pkg/analysis_server/lib/src/channel/byte_stream_channel.dart
index bfbc6ba..955e547 100644
--- a/pkg/analysis_server/lib/src/channel/byte_stream_channel.dart
+++ b/pkg/analysis_server/lib/src/channel/byte_stream_channel.dart
@@ -73,7 +73,6 @@
 /// standard input and standard output) to communicate with clients.
 class ByteStreamServerChannel implements ServerCommunicationChannel {
   final Stream _input;
-
   final IOSink _output;
 
   /// The instrumentation service that is to be used by this analysis server.
@@ -88,6 +87,20 @@
   /// True if [close] has been called.
   bool _closeRequested = false;
 
+  @override
+  late final Stream<Request> requests = _input
+      .transform(const Utf8Decoder())
+      .transform(const LineSplitter())
+      .transform(
+        StreamTransformer.fromHandlers(
+          handleData: _readRequest,
+          handleDone: (sink) {
+            close();
+            sink.close();
+          },
+        ),
+      );
+
   ByteStreamServerChannel(
       this._input, this._output, this._instrumentationService,
       {RequestStatisticsHelper? requestStatistics})
@@ -110,17 +123,6 @@
   }
 
   @override
-  void listen(void Function(Request request) onRequest,
-      {Function? onError, void Function()? onDone}) {
-    _input.transform(const Utf8Decoder()).transform(LineSplitter()).listen(
-        (String data) => _readRequest(data, onRequest),
-        onError: onError, onDone: () {
-      close();
-      onDone?.call();
-    });
-  }
-
-  @override
   void sendNotification(Notification notification) {
     // Don't send any further notifications after the communication channel is
     // closed.
@@ -159,7 +161,7 @@
 
   /// Read a request from the given [data] and use the given function to handle
   /// the request.
-  void _readRequest(String data, void Function(Request request) onRequest) {
+  void _readRequest(String data, Sink<Request> sink) {
     // Ignore any further requests after the communication channel is closed.
     if (_closed.isCompleted) {
       return;
@@ -173,6 +175,6 @@
       return;
     }
     _requestStatistics?.addRequest(request);
-    onRequest(request);
+    sink.add(request);
   }
 }
diff --git a/pkg/analysis_server/lib/src/channel/channel.dart b/pkg/analysis_server/lib/src/channel/channel.dart
index 6b5e95f..37a6e86 100644
--- a/pkg/analysis_server/lib/src/channel/channel.dart
+++ b/pkg/analysis_server/lib/src/channel/channel.dart
@@ -98,17 +98,12 @@
 /// objects that allow an [AnalysisServer] to receive [Request]s and to return
 /// both [Response]s and [Notification]s.
 abstract class ServerCommunicationChannel {
+  /// The single-subscription stream of requests.
+  Stream<Request> get requests;
+
   /// Close the communication channel.
   void close();
 
-  /// Listen to the channel for requests. If a request is received, invoke the
-  /// [onRequest] function. If an error is encountered while trying to read from
-  /// the socket, invoke the [onError] function. If the socket is closed by the
-  /// client, invoke the [onDone] function.
-  /// Only one listener is allowed per channel.
-  void listen(void Function(Request request) onRequest,
-      {Function onError, void Function() onDone});
-
   /// Send the given [notification] to the client.
   void sendNotification(Notification notification);
 
diff --git a/pkg/analysis_server/lib/src/server/dev_server.dart b/pkg/analysis_server/lib/src/server/dev_server.dart
index 62886da6..b11c418 100644
--- a/pkg/analysis_server/lib/src/server/dev_server.dart
+++ b/pkg/analysis_server/lib/src/server/dev_server.dart
@@ -165,21 +165,11 @@
   Stream<Notification> get onNotification => _notificationController.stream;
 
   @override
-  void close() {
-    _notificationController.close();
-  }
+  Stream<Request> get requests => _requestController.stream;
 
   @override
-  void listen(
-    void Function(Request request) onRequest, {
-    Function? onError,
-    void Function()? onDone,
-  }) {
-    _requestController.stream.listen(
-      onRequest,
-      onError: onError,
-      onDone: onDone,
-    );
+  void close() {
+    _notificationController.close();
   }
 
   @override
diff --git a/pkg/analysis_server/lib/src/socket_server.dart b/pkg/analysis_server/lib/src/socket_server.dart
index 59cafe6..8e938ef 100644
--- a/pkg/analysis_server/lib/src/socket_server.dart
+++ b/pkg/analysis_server/lib/src/socket_server.dart
@@ -62,7 +62,7 @@
       var error = RequestError(
           RequestErrorCode.SERVER_ALREADY_STARTED, 'Server already started');
       serverChannel.sendResponse(Response('', error: error));
-      serverChannel.listen((Request request) {
+      serverChannel.requests.listen((Request request) {
         serverChannel.sendResponse(Response(request.id, error: error));
       });
       return;
diff --git a/pkg/analysis_server/lib/src/utilities/mocks.dart b/pkg/analysis_server/lib/src/utilities/mocks.dart
index 9c581cd..bef5102 100644
--- a/pkg/analysis_server/lib/src/utilities/mocks.dart
+++ b/pkg/analysis_server/lib/src/utilities/mocks.dart
@@ -36,6 +36,9 @@
   MockServerChannel();
 
   @override
+  Stream<Request> get requests => requestController.stream;
+
+  @override
   void close() {
     _closed = true;
   }
@@ -46,13 +49,6 @@
   }
 
   @override
-  void listen(void Function(Request request) onRequest,
-      {Function? onError, void Function()? onDone}) {
-    requestController.stream
-        .listen(onRequest, onError: onError, onDone: onDone);
-  }
-
-  @override
   void sendNotification(Notification notification) {
     // Don't deliver notifications after the connection is closed.
     if (_closed) {
diff --git a/pkg/analysis_server/test/channel/byte_stream_channel_test.dart b/pkg/analysis_server/test/channel/byte_stream_channel_test.dart
index 0724bfe..d89741d 100644
--- a/pkg/analysis_server/test/channel/byte_stream_channel_test.dart
+++ b/pkg/analysis_server/test/channel/byte_stream_channel_test.dart
@@ -135,7 +135,7 @@
     errorStream = errorStreamController.stream;
     var doneCompleter = Completer();
     doneFuture = doneCompleter.future;
-    channel.listen((Request request) {
+    channel.requests.listen((Request request) {
       requestStreamController.add(request);
     }, onError: (error) {
       errorStreamController.add(error);
diff --git a/pkg/analysis_server/test/src/plugin/notification_manager_test.dart b/pkg/analysis_server/test/src/plugin/notification_manager_test.dart
index 0f7483d..5504afb 100644
--- a/pkg/analysis_server/test/src/plugin/notification_manager_test.dart
+++ b/pkg/analysis_server/test/src/plugin/notification_manager_test.dart
@@ -468,23 +468,10 @@
   server.Notification? sentNotification;
 
   @override
-  void close() {
-    fail('Unexpected invocation of close');
-  }
-
-  @override
-  void listen(void Function(server.Request) onRequest,
-      {Function? onError, void Function()? onDone}) {
-    fail('Unexpected invocation of listen');
-  }
+  dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation);
 
   @override
   void sendNotification(server.Notification notification) {
     sentNotification = notification;
   }
-
-  @override
-  void sendResponse(server.Response response) {
-    fail('Unexpected invocation of sendResponse');
-  }
 }