// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:analysis_server/protocol/protocol.dart';
import 'package:analysis_server/src/channel/channel.dart';
import 'package:analysis_server/src/utilities/request_statistics.dart';
import 'package:analyzer/instrumentation/instrumentation.dart';

/// A [ClientCommunicationChannel] that talks to a server over a byte stream
/// and an [IOSink] (typically standard input and standard output).
///
/// Every request is written to [output] as a single line of JSON text.
class ByteStreamClientChannel implements ClientCommunicationChannel {
  /// The sink that encoded requests are written to.
  final IOSink output;

  @override
  Stream<Response> responseStream;

  @override
  Stream<Notification> notificationStream;

  /// Creates a channel that reads server messages from [input] and writes
  /// requests to [output].
  ///
  /// The bytes of [input] are decoded as UTF-8, split into lines, and passed
  /// through a [JsonStreamDecoder]. Anything that is not a
  /// `Map<String, Object?>` is dropped. Maps with a non-null
  /// [Notification.EVENT] entry feed [notificationStream]; all other maps feed
  /// [responseStream].
  factory ByteStreamClientChannel(Stream<List<int>> input, IOSink output) {
    // A message is treated as a notification exactly when it carries an event.
    bool hasEvent(Map<String, Object?> message) =>
        message[Notification.EVENT] != null;

    var lines = input.transform(const Utf8Decoder()).transform(LineSplitter());

    // Broadcast, because both pipelines below subscribe to the same messages.
    var messages = lines
        .transform(JsonStreamDecoder())
        .where((decoded) => decoded is Map<String, Object?>)
        .cast<Map<String, Object?>>()
        .asBroadcastStream();

    var responses = messages
        .where((message) => !hasEvent(message))
        .transform(ResponseConverter())
        .where((converted) => converted != null)
        .cast<Response>()
        .asBroadcastStream();

    var notifications = messages
        .where(hasEvent)
        .transform(NotificationConverter())
        .asBroadcastStream();

    return ByteStreamClientChannel._(output, responses, notifications);
  }

  /// Stores the already-built streams and the output sink.
  ByteStreamClientChannel._(
    this.output,
    this.responseStream,
    this.notificationStream,
  );

  /// Closes [output] and returns the future produced by doing so.
  @override
  Future close() => output.close();

  /// Writes [request] to [output] as one line of JSON and completes with the
  /// first response on [responseStream] whose id matches the request's id.
  @override
  Future<Response> sendRequest(Request request) async {
    var requestId = request.id;
    var encoded = jsonEncode(request.toJson());
    output.write('$encoded\n');
    var matching = await responseStream
        .firstWhere((Response candidate) => candidate.id == requestId);
    return matching;
  }
}

/// A [ServerCommunicationChannel] that uses a stream and a sink (typically,
/// standard input and standard output) to communicate with clients.
///
/// Incoming data is decoded as UTF-8 and split into lines; each line is handed
/// to [Request.fromString]. Outgoing responses and notifications are written
/// as one line of JSON text each.
class ByteStreamServerChannel implements ServerCommunicationChannel {
  /// The stream from which requests are read.
  final Stream _input;

  /// The sink to which responses and notifications are written.
  final IOSink _output;

  /// The instrumentation service that is to be used by this analysis server.
  final InstrumentationService _instrumentationService;

  /// The helper for recording request / response statistics.
  final RequestStatisticsHelper? _requestStatistics;

  /// Completer that is completed the first time [close] is called.
  final Completer _closed = Completer();

  /// True if [close] has been called.
  bool _closeRequested = false;

  /// Creates a channel that reads from [_input] and writes to [_output].
  ///
  /// If [requestStatistics] is provided, this channel registers itself as the
  /// helper's `serverChannel`.
  ByteStreamServerChannel(
      this._input, this._output, this._instrumentationService,
      {RequestStatisticsHelper? requestStatistics})
      : _requestStatistics = requestStatistics {
    _requestStatistics?.serverChannel = this;
  }

  /// Future that completes once [close] has been called.
  ///
  /// Within this class that happens when the input stream is done (see
  /// [listen]) or when writing to the output fails; callers may also invoke
  /// [close] directly.
  Future get closed {
    return _closed.future;
  }

  /// Marks the channel as closed and completes [closed].
  ///
  /// Only the first call has any effect. After it, requests, responses and
  /// notifications are silently dropped.
  @override
  void close() {
    if (!_closeRequested) {
      _closeRequested = true;
      assert(!_closed.isCompleted);
      _closed.complete();
    }
  }

  /// Starts listening to the input stream, invoking [onRequest] for every line
  /// that parses as a request.
  ///
  /// [onError] is forwarded to the underlying stream subscription. When the
  /// input is done, the channel is closed first and then [onDone], if given,
  /// is invoked.
  @override
  void listen(void Function(Request request) onRequest,
      {Function? onError, void Function()? onDone}) {
    _input.transform(const Utf8Decoder()).transform(LineSplitter()).listen(
        (String data) => _readRequest(data, onRequest),
        onError: onError, onDone: () {
      close();
      onDone?.call();
    });
  }

  /// Writes [notification] to the output as one line of JSON.
  ///
  /// Every notification except `server.log` is also reported to the
  /// instrumentation service and the request statistics helper.
  @override
  void sendNotification(Notification notification) {
    // Don't send any further notifications after the communication channel is
    // closed.
    if (_closeRequested) {
      return;
    }
    var jsonEncoding = json.encode(notification.toJson());
    _outputLine(jsonEncoding);
    // Compare by value, not identity: an event name built at runtime is equal
    // to, but not necessarily identical to, the literal.
    // NOTE(review): presumably 'server.log' is skipped so that log traffic is
    // not itself logged - confirm.
    if (notification.event != 'server.log') {
      _instrumentationService.logNotification(jsonEncoding);
      _requestStatistics?.logNotification(notification);
    }
  }

  /// Writes [response] to the output as one line of JSON and reports it to the
  /// request statistics helper and the instrumentation service.
  @override
  void sendResponse(Response response) {
    // Don't send any further responses after the communication channel is
    // closed.
    if (_closeRequested) {
      return;
    }
    _requestStatistics?.addResponse(response);
    var jsonEncoding = json.encode(response.toJson());
    _outputLine(jsonEncoding);
    _instrumentationService.logResponse(jsonEncoding);
  }

  /// Send the string [s] to [_output] followed by a newline.
  ///
  /// The write runs in its own error zone; any error reported there closes
  /// the channel instead of propagating.
  void _outputLine(String s) {
    runZonedGuarded(() {
      _output.writeln(s);
    }, (error, stackTrace) {
      close();
    });
  }

  /// Read a request from the given [data] and use the given function to handle
  /// the request.
  ///
  /// If [data] cannot be parsed as a request, an "invalid request format"
  /// response is sent and [onRequest] is not called.
  void _readRequest(String data, void Function(Request request) onRequest) {
    // Ignore any further requests after the communication channel is closed.
    if (_closeRequested) {
      return;
    }
    _instrumentationService.logRequest(data);
    // Parse the string as a JSON descriptor and process the resulting
    // structure as a request.
    var request = Request.fromString(data);
    if (request == null) {
      sendResponse(Response.invalidRequestFormat());
      return;
    }
    _requestStatistics?.addRequest(request);
    onRequest(request);
  }
}
