// Copyright (c) 2019, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:js_interop';

import 'package:logging/logging.dart';
import 'package:pool/pool.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:web/web.dart';

import '../src/util/uuid.dart';

/// Limit for the number of concurrent outgoing requests.
///
/// Chrome drops outgoing requests on the floor after some threshold. To prevent
/// these errors we buffer outgoing requests with a pool.
///
/// Note Chrome's limit is 6000. So this gives us plenty of headroom.
final _requestPool = Pool(1000);

/// A client for bi-directional sse communication.
///
/// The client can send any JSON-encodable messages to the server by adding
/// them to the [sink] and listen to messages from the server on the [stream].
class SseClient extends StreamChannelMixin<String?> {
  final String _clientId;

  final _incomingController = StreamController<String>();

  final _outgoingController = StreamController<String>();

  final _logger = Logger('SseClient');

  final _onConnected = Completer<void>();

  int _lastMessageId = -1;

  late EventSource _eventSource;

  late String _serverUrl;

  Timer? _errorTimer;

  /// [serverUrl] is the URL under which the server is listening for
  /// incoming bi-directional SSE connections. [debugKey] is an optional key
  /// that can be used to identify the SSE connection.
  SseClient(String serverUrl, {String? debugKey})
      : _clientId = debugKey == null
            ? generateUuidV4()
            : '$debugKey-${generateUuidV4()}' {
    _serverUrl = '$serverUrl?sseClientId=$_clientId';
    _eventSource =
        EventSource(_serverUrl, EventSourceInit(withCredentials: true));
    _eventSource.onOpen.first.whenComplete(() {
      _onConnected.complete();
      _outgoingController.stream
          .listen(_onOutgoingMessage, onDone: _onOutgoingDone);
    });
    _eventSource.addEventListener('message', _onIncomingMessage.toJS);
    _eventSource.addEventListener('control', _onIncomingControlMessage.toJS);

    _eventSource.onOpen.listen((_) {
      _errorTimer?.cancel();
    });
    _eventSource.onError.listen((error) {
      if (!(_errorTimer?.isActive ?? false)) {
        // By default the SSE client uses keep-alive.
        // Allow for a retry to connect before giving up.
        _errorTimer = Timer(const Duration(seconds: 5), () {
          _closeWithError(error);
        });
      }
    });
  }

  @Deprecated('Use onConnected instead.')
  Stream<Event> get onOpen => _eventSource.onOpen;

  Future<void> get onConnected => _onConnected.future;

  /// Add messages to this [StreamSink] to send them to the server.
  ///
  /// The message added to the sink has to be JSON encodable. Messages that fail
  /// to encode will be logged through a [Logger].
  @override
  StreamSink<String> get sink => _outgoingController.sink;

  /// [Stream] of messages sent from the server to this client.
  ///
  /// A message is a decoded JSON object.
  @override
  Stream<String> get stream => _incomingController.stream;

  void close() {
    _eventSource.close();
    // If the initial connection was never established. Add a listener so close
    // adds a done event to [sink].
    if (!_onConnected.isCompleted) _outgoingController.stream.drain<void>();
    _incomingController.close();
    _outgoingController.close();
  }

  void _closeWithError(Object error) {
    _incomingController.addError(error);
    close();
    if (!_onConnected.isCompleted) {
      // This call must happen after the call to close() which checks
      // whether the completer was completed earlier.
      _onConnected.completeError(error);
    }
  }

  void _onIncomingControlMessage(Event message) {
    var data = (message as MessageEvent).data;
    if (data.dartify() == 'close') {
      close();
    } else {
      throw UnsupportedError('[$_clientId] Illegal Control Message "$data"');
    }
  }

  void _onIncomingMessage(Event message) {
    var decoded =
        jsonDecode(((message as MessageEvent).data as JSString).toDart);
    _incomingController.add(decoded as String);
  }

  void _onOutgoingDone() {
    close();
  }

  void _onOutgoingMessage(String? message) async {
    String? encodedMessage;
    await _requestPool.withResource(() async {
      try {
        encodedMessage = jsonEncode(message);
        // ignore: avoid_catching_errors
      } on JsonUnsupportedObjectError catch (e) {
        _logger.warning('[$_clientId] Unable to encode outgoing message: $e');
        // ignore: avoid_catching_errors
      } on ArgumentError catch (e) {
        _logger.warning('[$_clientId] Invalid argument: $e');
      }
      try {
        final url = '$_serverUrl&messageId=${++_lastMessageId}';
        await _fetch(
            url,
            RequestInit(
                method: 'POST',
                body: encodedMessage?.toJS,
                credentials: 'include'));
      } catch (error) {
        final augmentedError =
            '[$_clientId] SSE client failed to send $message:\n $error';
        _logger.severe(augmentedError);
        _closeWithError(augmentedError);
      }
    });
  }
}

Future<void> _fetch(String resourceUrl, RequestInit options) =>
    window.fetch(resourceUrl.toJS, options).toDart;
