| // Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| import 'dart:convert'; |
| import 'dart:io'; |
| import 'dart:isolate'; |
| |
| import 'package:analysis_server/protocol/protocol.dart'; |
| import 'package:analysis_server/src/channel/channel.dart'; |
| import 'package:analysis_server/src/utilities/request_statistics.dart'; |
| import 'package:analyzer/instrumentation/instrumentation.dart'; |
| |
/// A [ClientCommunicationChannel] that talks to a server over a byte stream
/// and a sink (typically, standard input and standard output).
///
/// Incoming bytes are decoded as UTF-8, split into lines and passed through
/// [JsonStreamDecoder]. Decoded maps that have a non-null entry for
/// [Notification.EVENT] are delivered on [notificationStream]; all other maps
/// are delivered on [responseStream].
class ByteStreamClientChannel implements ClientCommunicationChannel {
  /// The sink to which encoded requests are written.
  final IOSink output;

  @override
  Stream<Response> responseStream;

  @override
  Stream<Notification> notificationStream;

  /// Creates a channel that reads server messages from [input] and writes
  /// requests to [output].
  factory ByteStreamClientChannel(Stream<List<int>> input, IOSink output) {
    // Each message occupies one line of UTF-8 text.
    var lines = input.transform(const Utf8Decoder()).transform(LineSplitter());

    // Anything that does not decode to a string-keyed map is dropped. The
    // result is a broadcast stream because it is listened to twice below.
    var messages =
        lines
            .transform(JsonStreamDecoder())
            .where((decoded) => decoded is Map<String, Object?>)
            .cast<Map<String, Object?>>()
            .asBroadcastStream();

    // A message is treated as a notification exactly when it names an event.
    bool isNotification(Map<String, Object?> message) =>
        message[Notification.EVENT] != null;

    var responses =
        messages
            .where((message) => !isNotification(message))
            .transform(ResponseConverter())
            .where((converted) => converted != null)
            .cast<Response>()
            .asBroadcastStream();
    var notifications =
        messages
            .where(isNotification)
            .transform(NotificationConverter())
            .asBroadcastStream();

    return ByteStreamClientChannel._(output, responses, notifications);
  }

  ByteStreamClientChannel._(
    this.output,
    this.responseStream,
    this.notificationStream,
  );

  /// Closes [output] and returns the future produced by doing so.
  @override
  Future<void> close() => output.close();

  /// Writes [request] to [output] as one line of JSON and completes with the
  /// first response on [responseStream] whose id matches the request's id.
  @override
  Future<Response> sendRequest(Request request) async {
    var requestId = request.id;
    var encoded = json.encode(request.toJson());
    output.write('$encoded\n');
    // The subscription is made in the same synchronous turn as the write, so
    // the matching response cannot be delivered before anyone is listening.
    var response = await responseStream.firstWhere(
      (candidate) => candidate.id == requestId,
    );
    return response;
  }
}
| |
/// Instances of the subclasses of [ByteStreamServerChannel] implement a
/// [ServerCommunicationChannel] that uses a stream and a sink to communicate
/// with clients.
///
/// Subclasses provide the incoming lines through [_lines] and the outgoing
/// sink through [_output]. Every outgoing message is JSON-encoded and written
/// to [_output] as a single line.
abstract class ByteStreamServerChannel implements ServerCommunicationChannel {
  /// The instrumentation service that is to be used by this analysis server.
  final InstrumentationService _instrumentationService;

  /// The helper for recording request / response statistics.
  final RequestStatisticsHelper? _requestStatistics;

  /// Completer that is completed the first time [close] is called, which
  /// includes the input stream reaching its end.
  final Completer<void> _closed = Completer();

  /// True if [close] has been called.
  bool _closeRequested = false;

  /// The requests and responses received from the client.
  ///
  /// Each line from [_lines] is handled by [_readRequest]. When [_lines] is
  /// done, the channel is closed before the done event is forwarded.
  @override
  late final Stream<RequestOrResponse> requests = _lines.transform(
    StreamTransformer.fromHandlers(
      handleData: _readRequest,
      handleDone: (sink) {
        close();
        sink.close();
      },
    ),
  );

  /// Creates a channel that logs traffic to [_instrumentationService] and, if
  /// [requestStatistics] is provided, registers this channel with it.
  ByteStreamServerChannel(
    this._instrumentationService, {
    RequestStatisticsHelper? requestStatistics,
  }) : _requestStatistics = requestStatistics {
    _requestStatistics?.serverChannel = this;
  }

  /// Future that will be completed when the channel is closed.
  Future<void> get closed {
    return _closed.future;
  }

  /// The lines of text received from the client.
  Stream<String> get _lines;

  /// The sink to which outgoing messages are written.
  IOSink get _output;

  /// Marks the channel as closed and completes [closed].
  ///
  /// Calling this more than once has no further effect.
  @override
  void close() {
    if (!_closeRequested) {
      _closeRequested = true;
      assert(!_closed.isCompleted);
      _closed.complete();
    }
  }

  /// Sends [notification] to the client, unless the channel is closed.
  @override
  void sendNotification(Notification notification) {
    // Don't send any further notifications after the communication channel is
    // closed.
    if (_closeRequested) {
      return;
    }
    var jsonEncoding = json.encode(notification.toJson());
    _outputLine(jsonEncoding);
    // 'server.log' notifications are sent but not logged. The event name is
    // compared by value: `identical` only matches canonicalized constants, so
    // an equal name built at runtime would not have been recognized.
    if (notification.event != 'server.log') {
      _instrumentationService.logNotification(jsonEncoding);
      _requestStatistics?.logNotification(notification);
    }
  }

  /// Sends [request] to the client, unless the channel is closed.
  @override
  void sendRequest(Request request) {
    // Don't send any further requests after the communication channel is
    // closed.
    if (_closeRequested) {
      return;
    }
    var jsonEncoding = json.encode(request.toJson());
    _outputLine(jsonEncoding);
    _instrumentationService.logRequest(jsonEncoding);
  }

  /// Sends [response] to the client, unless the channel is closed.
  @override
  void sendResponse(Response response) {
    // Don't send any further responses after the communication channel is
    // closed.
    if (_closeRequested) {
      return;
    }
    _requestStatistics?.addResponse(response);
    var jsonEncoding = json.encode(response.toJson());
    _outputLine(jsonEncoding);
    _instrumentationService.logResponse(jsonEncoding);
  }

  /// Send the string [s] to [_output] followed by a newline.
  ///
  /// The write runs in its own error zone; any error reported to that zone
  /// closes the channel instead of propagating to the caller.
  void _outputLine(String s) {
    runZonedGuarded(
      () {
        _output.writeln(s);
      },
      (error, stackTrace) {
        close();
      },
    );
  }

  /// Parses the line [data] as a request or a response and adds the result to
  /// [sink].
  ///
  /// A line that is neither is answered with an invalid-request-format
  /// response and is not added to [sink].
  void _readRequest(String data, Sink<RequestOrResponse> sink) {
    // Ignore any further requests after the communication channel is closed.
    if (_closed.isCompleted) {
      return;
    }
    _instrumentationService.logRequest(data);
    // Parse the string as a JSON descriptor and process the resulting
    // structure as either a request or a response.
    var requestOrResponse =
        Request.fromString(data) ?? Response.fromString(data);
    if (requestOrResponse == null) {
      // If the data isn't valid, then assume it was an invalid request so that
      // clients won't be left waiting for a response.
      sendResponse(Response.invalidRequestFormat());
      return;
    }
    if (requestOrResponse is Request) {
      _requestStatistics?.addRequest(requestOrResponse);
    }
    sink.add(requestOrResponse);
  }
}
| |
/// A [ServerCommunicationChannel] that communicates with clients over a byte
/// stream and a sink, both supplied by the creator of the channel.
class InputOutputByteStreamServerChannel extends ByteStreamServerChannel {
  /// The raw bytes received from the client.
  final Stream<List<int>> _input;

  @override
  final IOSink _output;

  /// The bytes of [_input], decoded as UTF-8 and split into lines.
  @override
  late final Stream<String> _lines = _input
      .transform(utf8.decoder)
      .transform(const LineSplitter());

  /// Creates a channel that reads from [_input] and writes to [_output].
  InputOutputByteStreamServerChannel(
    this._input,
    this._output,
    super._instrumentationService, {
    super.requestStatistics,
  });
}
| |
/// Communication channel that operates via stdin/stdout.
///
/// Stdin is read in a separate isolate so that data keeps being received while
/// the main isolate is busy performing other tasks. Since an isolate is being
/// used anyway, it also decodes the input and splits it into line strings
/// before sending them to the main isolate.
class StdinStdoutLineStreamServerChannel extends ByteStreamServerChannel {
  @override
  final IOSink _output = stdout;

  /// Receives the lines read by the reader isolate, followed by a `null`
  /// message once that isolate has exited.
  final ReceivePort _linesFromIsolate = ReceivePort();

  /// The lines read from stdin.
  ///
  /// The stream ends at the first message that is not a string, which is the
  /// exit notification of the reader isolate. Ending the stream lets the end
  /// of stdin reach listeners as a done event; without it the port would stay
  /// open forever and the channel would never observe that its input is gone.
  /// `takeWhile` cancels its subscription when it stops, which closes the
  /// port.
  @override
  late final Stream<String> _lines =
      _linesFromIsolate
          .takeWhile((message) => message is String)
          .cast<String>();

  /// Creates the channel and starts the isolate that reads stdin.
  StdinStdoutLineStreamServerChannel(
    super._instrumentationService, {
    super.requestStatistics,
  }) {
    // The spawn is deliberately not awaited: a constructor cannot wait for it,
    // and lines simply start arriving on the port once the isolate is running.
    unawaited(
      Isolate.spawn(
        _stdinInAnIsolate,
        _linesFromIsolate.sendPort,
        // Ask for a `null` message on the same port when the reader isolate
        // exits, so the end of stdin (or a failure of the reader) is seen.
        onExit: _linesFromIsolate.sendPort,
      ),
    );
  }

  /// Entry point of the reader isolate.
  ///
  /// Decodes stdin as UTF-8, splits it into lines and sends every line to the
  /// [SendPort] passed as [message].
  static void _stdinInAnIsolate(Object message) {
    var replyTo = message as SendPort;
    stdin
        .transform(const Utf8Decoder())
        .transform(const LineSplitter())
        .listen(replyTo.send);
  }
}