blob: cfc6a7b78c1565e82bad64a062fba839402d9010 [file] [log] [blame]
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// Library for spawning cquery process and communicating with it.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:math' as math;
/// Helper class encapsulating communication with the cquery process.
///
/// cquery communicates using jsonrpc via stdin/stdout streams.
class CqueryDriver {
final Process _cquery;
final _pendingRequests = <int, Completer<dynamic>>{};
var _requestId = 1;
final _progressStreamController = StreamController<int>.broadcast();
CqueryDriver._(this._cquery) {
_cquery.stdout
.transform(utf8.decoder)
.transform(_JsonRpcParser.transformer)
.listen(_handleMessage);
_cquery.stderr
.transform(utf8.decoder)
.listen((data) => print('stderr: ${data}'));
}
static Future<CqueryDriver> start(String cqueryBinary) async {
final cquery = await Process.start(cqueryBinary, ['--language-server']);
return CqueryDriver._(cquery);
}
/// Stream of progress notifications from cquery. Indicate how many files
/// are currently pending indexing.
Stream<int> get progress => _progressStreamController.stream;
Future<int> get exitCode => _cquery.exitCode;
/// Send the given message to the cquery process.
void _sendMessage(Map<String, dynamic> data) {
data['jsonrpc'] = '2.0';
final rq = jsonEncode(data);
_cquery.stdin.write('Content-Length: ${rq.length}\r\n\r\n${rq}');
}
/// Send a notification message to the cquery process.
void notify(String method, {Map<String, dynamic> params}) {
final rq = <String, dynamic>{'method': method};
if (params != null) rq['params'] = params;
_sendMessage(rq);
}
/// Send a request message to the cquery process.
Future<dynamic> request(String method, {Map<String, dynamic> params}) {
final rq = <String, dynamic>{'method': method, 'id': _requestId++};
_pendingRequests[rq['id']] = Completer();
if (params != null) rq['params'] = params;
_sendMessage(rq);
return _pendingRequests[rq['id']].future;
}
/// Handle message received from cquery process.
void _handleMessage(Map<String, dynamic> data) {
final method = data['method'];
// If it is a progress notification issue progress event.
if (method == r'$cquery/progress') {
_progressStreamController.add(data['params']['indexRequestCount']);
}
// Otherwise check if it is a response to one of our requests and complete
// corresponding future if it is.
if (data.containsKey('id') && data.containsKey('result')) {
final id = data['id'];
final result = data['result'];
_pendingRequests[id].complete(result);
_pendingRequests[id] = null;
return;
}
}
}
/// Simple parser for jsonrpc protocol over arbitrary chunked stream.
class _JsonRpcParser {
/// Accumulator for the message content.
final StringBuffer content = StringBuffer();
/// Number of bytes left to read in the current message.
int pendingContentLength = 0;
/// Auxiliary variable to store various state between invocations of
/// [state] callback.
int index = 0;
/// Position inside incomming chunk of data.
int pos = 0;
/// Current parser state.
Function state = _JsonRpcParser.readHeader;
/// Callback to invoke when we finish parsing complete message.
void Function(Map<String, dynamic>) onMessage;
_JsonRpcParser({this.onMessage});
/// StreamTransformer wrapping _JsonRpcParser.
static StreamTransformer<String, Map<String, dynamic>> get transformer =>
StreamTransformer.fromBind((Stream<String> s) {
final output = StreamController<Map<String, dynamic>>();
final p = _JsonRpcParser(onMessage: output.add);
s.listen(p.addChunk);
return output.stream;
});
/// Parse the chunk of data.
void addChunk(String data) {
pos = 0;
while (pos < data.length) {
state = state(this, data);
}
}
/// Parsing state: waiting for 'Content-Length' header.
static Function readHeader(_JsonRpcParser p, String data) {
final codeUnit = data.codeUnitAt(p.pos++);
if (HEADER.codeUnitAt(p.index) != codeUnit) {
throw 'Unexpected codeUnit: ${String.fromCharCode(codeUnit)} expected ${HEADER[p.index]}';
}
p.index++;
if (p.index == HEADER.length) {
p.index = 0;
return _JsonRpcParser.readLength;
}
return _JsonRpcParser.readHeader;
}
/// Parsing state: parsing content length value.
static Function readLength(_JsonRpcParser p, String data) {
final codeUnit = data.codeUnitAt(p.pos++);
if (codeUnit == CR) {
p.pendingContentLength = p.index;
p.index = 0;
return _JsonRpcParser.readHeaderEnd;
}
if (codeUnit < CH0 || codeUnit > CH9) {
throw 'Unexpected codeUnit: ${String.fromCharCode(codeUnit)} expected 0 to 9';
}
p.index = p.index * 10 + (codeUnit - CH0);
return _JsonRpcParser.readLength;
}
/// Parsing state: content length was read, skipping line breaks before
/// content start.
static Function readHeaderEnd(_JsonRpcParser p, String data) {
final codeUnit = data.codeUnitAt(p.pos++);
if (HEADER_END.codeUnitAt(p.index) != codeUnit) {
throw 'Unexpected codeUnit: ${String.fromCharCode(codeUnit)} expected ${HEADER_END[p.index]}';
}
p.index++;
if (p.index == HEADER_END.length) {
return _JsonRpcParser.readContent;
}
return _JsonRpcParser.readHeaderEnd;
}
/// Parsing state: reading message content.
static Function readContent(_JsonRpcParser p, String data) {
final availableBytes = data.length - p.pos;
final bytesToRead = math.min(availableBytes, p.pendingContentLength);
p.content.write(data.substring(p.pos, p.pos + bytesToRead));
p.pendingContentLength -= bytesToRead;
p.pos += bytesToRead;
if (p.pendingContentLength == 0) {
p.onMessage(jsonDecode(p.content.toString()));
p.content.clear();
p.index = 0;
return _JsonRpcParser.readHeader;
} else {
return _JsonRpcParser.readContent;
}
}
static const HEADER = 'Content-Length: ';
static const HEADER_END = '\n\r\n';
static const CH0 = 48;
static const CH9 = 57;
static const CR = 13;
static const LF = 10;
}