// Copyright (c) 2020, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:json_rpc_2/json_rpc_2.dart' as json_rpc;
import 'package:meta/meta.dart';
import 'package:sse/server/sse_handler.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import '../dds.dart';
import 'constants.dart';
import 'dds_impl.dart';
import 'rpc_error_codes.dart';
import 'stream_manager.dart';

/// Representation of a single DDS client which manages the connection and
/// DDS request intercepting / forwarding.
class DartDevelopmentServiceClient {
  DartDevelopmentServiceClient.fromWebSocket(
    DartDevelopmentService dds,
    WebSocketChannel ws,
    json_rpc.Peer vmServicePeer,
  ) : this._(
          dds as DartDevelopmentServiceImpl,
          ws,
          vmServicePeer,
        );

  DartDevelopmentServiceClient.fromSSEConnection(
    DartDevelopmentService dds,
    SseConnection sse,
    json_rpc.Peer vmServicePeer,
  ) : this._(
          dds as DartDevelopmentServiceImpl,
          sse,
          vmServicePeer,
        );

  DartDevelopmentServiceClient._(
    this.dds,
    this.connection,
    json_rpc.Peer vmServicePeer,
  ) : _vmServicePeer = vmServicePeer {
    _clientPeer = json_rpc.Peer(
      // Manually create a StreamChannel<String> instead of calling
      // .cast<String>() as cast() results in addStream() being called,
      // binding the underlying sink. This results in a StateError being thrown
      // if we try and add directly to the sink, which we do for binary events
      // in StreamManager's streamNotify().
      StreamChannel<String>(
        connection.stream.cast(),
        StreamController(sync: true)
          ..stream
              .cast()
              .listen((event) => connection.sink.add(event))
              .onDone(() => connection.sink.close()),
      ),
      strictProtocolChecks: false,
    );
    _registerJsonRpcMethods();
  }

  /// Start receiving JSON RPC requests from the client.
  ///
  /// Returned future completes when the peer is closed.
  Future<void> listen() => _clientPeer.listen().then(
        (_) => dds.streamManager.clientDisconnect(this),
      );

  /// Close the connection to the client.
  Future<void> close() async {
    // Cleanup the JSON RPC server for this connection if DDS has shutdown.
    await _clientPeer.close();
  }

  /// Send a JSON RPC notification to the client.
  void sendNotification(String method, [dynamic parameters]) {
    if (_clientPeer.isClosed) {
      return;
    }
    _clientPeer.sendNotification(method, parameters);
  }

  /// Send a JSON RPC request to the client.
  Future<dynamic> sendRequest(String method, [dynamic parameters]) async {
    if (_clientPeer.isClosed) {
      return null;
    }
    return await _clientPeer.sendRequest(method, parameters);
  }

  /// Registers handlers for JSON RPC methods which need to be intercepted by
  /// DDS as well as fallback request forwarder.
  void _registerJsonRpcMethods() {
    _clientPeer.registerMethod('streamListen', (parameters) async {
      final streamId = parameters['streamId'].asString;
      await dds.streamManager.streamListen(this, streamId);
      return RPCResponses.success;
    });

    _clientPeer.registerMethod('streamCancel', (parameters) async {
      final streamId = parameters['streamId'].asString;
      await dds.streamManager.streamCancel(this, streamId);
      return RPCResponses.success;
    });

    _clientPeer.registerMethod('registerService', (parameters) async {
      final serviceId = parameters['service'].asString;
      final alias = parameters['alias'].asString;
      if (services.containsKey(serviceId)) {
        throw RpcErrorCodes.buildRpcException(
          RpcErrorCodes.kServiceAlreadyRegistered,
        );
      }
      services[serviceId] = alias;
      // Notify other clients that a new service extension is available.
      dds.streamManager.sendServiceRegisteredEvent(
        this,
        serviceId,
        alias,
      );
      return RPCResponses.success;
    });

    _clientPeer.registerMethod(
      'getClientName',
      (parameters) => {'type': 'ClientName', 'name': name},
    );

    _clientPeer.registerMethod(
      'setClientName',
      (parameters) => dds.clientManager.setClientName(this, parameters),
    );

    _clientPeer.registerMethod(
      'requirePermissionToResume',
      (parameters) =>
          dds.clientManager.requirePermissionToResume(this, parameters),
    );

    _clientPeer.registerMethod(
      'resume',
      (parameters) => dds.isolateManager.resumeIsolate(this, parameters),
    );

    _clientPeer.registerMethod('getStreamHistory', (parameters) {
      final stream = parameters['stream'].asString;
      final events = dds.streamManager.getStreamHistory(stream);
      if (events == null) {
        throw json_rpc.RpcException.invalidParams(
          "Event history is not collected for stream '$stream'",
        );
      }
      return <String, dynamic>{
        'type': 'StreamHistory',
        'history': events,
      };
    });

    _clientPeer.registerMethod(
        'getLogHistorySize',
        (parameters) => {
              'type': 'Size',
              'size': StreamManager
                  .loggingRepositories[StreamManager.kLoggingStream]!
                  .bufferSize,
            });

    _clientPeer.registerMethod('setLogHistorySize', (parameters) {
      final size = parameters['size'].asInt;
      if (size < 0) {
        throw json_rpc.RpcException.invalidParams(
          "'size' must be greater or equal to zero",
        );
      }
      StreamManager.loggingRepositories[StreamManager.kLoggingStream]!
          .resize(size);
      return RPCResponses.success;
    });

    _clientPeer.registerMethod('getDartDevelopmentServiceVersion',
        (parameters) async {
      final ddsVersion = DartDevelopmentService.protocolVersion.split('.');
      return <String, dynamic>{
        'type': 'Version',
        'major': int.parse(ddsVersion[0]),
        'minor': int.parse(ddsVersion[1]),
      };
    });

    _clientPeer.registerMethod('getSupportedProtocols', (parameters) async {
      final Map<String, dynamic> supportedProtocols = (await _vmServicePeer
          .sendRequest('getSupportedProtocols')) as Map<String, dynamic>;
      final ddsVersion = DartDevelopmentService.protocolVersion.split('.');
      final ddsProtocol = {
        'protocolName': 'DDS',
        'major': int.parse(ddsVersion[0]),
        'minor': int.parse(ddsVersion[1]),
      };
      supportedProtocols['protocols']
          .cast<Map<String, dynamic>>()
          .add(ddsProtocol);
      return supportedProtocols;
    });

    _clientPeer.registerMethod(
      'getAvailableCachedCpuSamples',
      (_) => {
        'type': 'AvailableCachedCpuSamples',
        'cacheNames': dds.cachedUserTags,
      },
    );

    _clientPeer.registerMethod(
      'getCachedCpuSamples',
      dds.isolateManager.getCachedCpuSamples,
    );

    // `evaluate` and `evaluateInFrame` actually consist of multiple RPC
    // invocations, including a call to `compileExpression` which can be
    // overridden by clients which provide their own implementation (e.g.,
    // Flutter Tools). We handle all of this in [_ExpressionEvaluator].
    _clientPeer.registerMethod(
      'evaluate',
      dds.expressionEvaluator.execute,
    );
    _clientPeer.registerMethod(
      'evaluateInFrame',
      dds.expressionEvaluator.execute,
    );

    // When invoked within a fallback, the next fallback will start executing.
    // The final fallback forwards the request to the VM service directly.
    @alwaysThrows
    nextFallback() => throw json_rpc.RpcException.methodNotFound('');

    // Handle service extension invocations.
    _clientPeer.registerFallback((parameters) async {
      hasNamespace(String method) => method.contains('.');
      getMethod(String method) => method.split('.').last;
      getNamespace(String method) => method.split('.').first;
      if (!hasNamespace(parameters.method)) {
        nextFallback();
      }
      // Lookup the client associated with the service extension's namespace.
      // If the client exists and that client has registered the specified
      // method, forward the request to that client.
      final method = getMethod(parameters.method);
      final namespace = getNamespace(parameters.method);
      final serviceClient = dds.clientManager.clients[namespace];
      if (serviceClient != null && serviceClient.services.containsKey(method)) {
        return await Future.any(
          [
            // Forward the request to the service client or...
            serviceClient.sendRequest(method, parameters.asMap).catchError((_) {
              throw RpcErrorCodes.buildRpcException(
                RpcErrorCodes.kServiceDisappeared,
              );
            }, test: (error) => error is StateError),
            // if the service client closes, return an error response.
            serviceClient._clientPeer.done.then(
              (_) => throw RpcErrorCodes.buildRpcException(
                RpcErrorCodes.kServiceDisappeared,
              ),
            ),
          ],
        );
      }
      throw json_rpc.RpcException(
        RpcErrorCodes.kMethodNotFound,
        'Unknown service: ${parameters.method}',
      );
    });

    // Unless otherwise specified, the request is forwarded to the VM service.
    // NOTE: This must be the last fallback registered.
    _clientPeer.registerFallback((parameters) async {
      // If _vmServicePeer closes in the middle of a request, this will throw
      // a StateError. Listeners in dds_impl.dart will handle shutting down the
      // DDS instance, so we don't try and handle the error here.
      try {
        return await _vmServicePeer.sendRequest(
          parameters.method,
          parameters.value,
        );
      } on StateError {
        throw RpcErrorCodes.buildRpcException(
          RpcErrorCodes.kServiceDisappeared,
        );
      }
    });
  }

  static int _idCounter = 0;
  final int _id = ++_idCounter;

  /// The name given to the client upon its creation.
  String get defaultClientName => 'client$_id';

  /// The current name associated with this client.
  String? get name => _name;

  // NOTE: this should not be called directly except from:
  //   - `ClientManager._clearClientName`
  //   - `ClientManager._setClientNameHelper`
  set name(String? n) => _name = n ?? defaultClientName;
  String? _name;

  final DartDevelopmentServiceImpl dds;
  final StreamChannel connection;
  final Map<String, String> services = {};
  final json_rpc.Peer _vmServicePeer;
  late json_rpc.Peer _clientPeer;
}
