// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:collection';
import 'dart:io';

import '../constants.dart';
import '../worker_protocol.pb.dart';
import 'driver_connection.dart';

/// Signature of the factory that [BazelWorkerDriver] calls whenever it needs
/// to start a new worker process.
///
/// The returned future should complete with the spawned [Process].
typedef SpawnWorker = Future<Process> Function();

/// A driver for talking to a bazel worker.
///
/// This allows you to use any binary that supports the bazel worker protocol in
/// the same way that bazel would, but from another dart process instead.
///
/// Requests passed to [doWork] are queued and handed to worker processes that
/// are spawned on demand (up to a configurable limit) and reused once they
/// become idle.
class BazelWorkerDriver {
  /// Idle worker processes.
  final _idleWorkers = <Process>[];

  /// The maximum number of idle workers at any given time.
  final int _maxIdleWorkers;

  /// The maximum number of times to retry a [_WorkAttempt] if there is an error.
  final int _maxRetries;

  /// The maximum number of concurrent workers to run at any given time.
  final int _maxWorkers;

  /// The number of workers that are either ready or still being spawned.
  int get _numWorkers => _readyWorkers.length + _spawningWorkers.length;

  /// All workers that are fully spawned and ready to handle work.
  final _readyWorkers = <Process>[];

  /// All workers that are in the process of being spawned.
  final _spawningWorkers = <Future<Process>>[];

  /// Work requests that haven't been started yet.
  final _workQueue = Queue<_WorkAttempt>();

  /// Factory method that spawns a worker process.
  final SpawnWorker _spawnWorker;

  /// Creates a driver that obtains worker processes from [_spawnWorker].
  ///
  /// [maxIdleWorkers], [maxWorkers] and [maxRetries] each default to `4` when
  /// omitted or `null`.
  BazelWorkerDriver(
    this._spawnWorker, {
    int? maxIdleWorkers,
    int? maxWorkers,
    int? maxRetries,
  })  : _maxIdleWorkers = maxIdleWorkers ?? 4,
        _maxWorkers = maxWorkers ?? 4,
        _maxRetries = maxRetries ?? 4;

  /// Waits for an available worker, and then sends [request] to it.
  ///
  /// If [trackWork] is provided it will be invoked with a [Future] once the
  /// [request] has been actually sent to the worker. This allows the caller
  /// to determine when actual work is being done versus just waiting for an
  /// available worker.
  ///
  /// The returned future completes with the worker's [WorkResponse]. If the
  /// request keeps failing after the allowed number of retries it completes
  /// with a response whose exit code is [EXIT_CODE_ERROR]. If the worker
  /// process could not be spawned at all it completes with that error.
  Future<WorkResponse> doWork(
    WorkRequest request, {
    void Function(Future<WorkResponse?>)? trackWork,
  }) {
    var attempt = _WorkAttempt(request, trackWork: trackWork);
    _workQueue.add(attempt);
    _runWorkQueue();
    return attempt.response;
  }

  /// Calls `kill` on all worker processes.
  ///
  /// Workers that are still being spawned are killed as soon as their spawn
  /// completes. A spawn that fails is ignored here: there is nothing to kill,
  /// and the failure has already been reported to the request that triggered
  /// the spawn.
  Future<void> terminateWorkers() async {
    // Iterate over a copy, [_killWorker] removes entries from [_readyWorkers].
    for (var worker in _readyWorkers.toList()) {
      _killWorker(worker);
    }
    await Future.wait(
      _spawningWorkers.map((pending) async {
        Process? worker;
        try {
          worker = await pending;
        } on Object {
          // The spawn failed, so no process exists. The error was already
          // delivered through the pending attempt's completer.
        }
        if (worker != null) _killWorker(worker);
      }),
    );
  }

  /// Runs as many items in [_workQueue] as possible given the number of
  /// available workers.
  ///
  /// Will spawn additional workers until [_maxWorkers] has been reached.
  ///
  /// This method synchronously drains the [_workQueue] and [_idleWorkers], but
  /// some tasks may not actually start right away if they need to wait for a
  /// worker to spin up.
  ///
  /// Throws a [StateError] if more than [_maxWorkers] workers exist.
  void _runWorkQueue() {
    // Bail out conditions, we will continue to call ourselves indefinitely
    // until one of these is met.
    if (_workQueue.isEmpty) return;
    if (_numWorkers == _maxWorkers && _idleWorkers.isEmpty) return;
    if (_numWorkers > _maxWorkers) {
      throw StateError(
        'Internal error, created to many workers. Please '
        'file a bug at https://github.com/dart-lang/bazel_worker/issues/new',
      );
    }

    // At this point we definitely want to run a task, we just need to decide
    // whether or not we need to start up a new worker.
    var attempt = _workQueue.removeFirst();
    if (_idleWorkers.isNotEmpty) {
      _runWorker(_idleWorkers.removeLast(), attempt);
    } else {
      // No need to block here, we want to continue to synchronously drain the
      // work queue.
      var futureWorker = _spawnWorker();
      _spawningWorkers.add(futureWorker);
      futureWorker.then((worker) {
        _spawningWorkers.remove(futureWorker);
        _readyWorkers.add(worker);
        var connection = StdDriverConnection.forWorker(worker);
        _workerConnections[worker] = connection;
        _runWorker(worker, attempt);

        // When the worker exits we should retry running the work queue in case
        // there is more work to be done. This is primarily just a defensive
        // thing but is cheap to do.
        //
        // We don't use `exitCode` because it is null for detached processes (
        // which is common for workers).
        connection.done.then((_) {
          _idleWorkers.remove(worker);
          _readyWorkers.remove(worker);
          _runWorkQueue();
        });
      }).onError<Object>((e, s) {
        _spawningWorkers.remove(futureWorker);
        if (!attempt.responseCompleter.isCompleted) {
          attempt.responseCompleter.completeError(e, s);
        }
        // The failed spawn freed a worker slot. Requests that were queued
        // while the pool was full would otherwise never be picked up, since
        // no worker exists that could trigger another run of the queue.
        _runWorkQueue();
      });
    }
    // Recursively calls itself until one of the bail out conditions are met.
    _runWorkQueue();
  }

  /// Sends [attempt] to [worker].
  ///
  /// Once the worker responds then it will be added back to the pool of idle
  /// workers.
  ///
  /// A response with [EXIT_CODE_BROKEN_PIPE], or an uncaught error while
  /// talking to the worker, causes the attempt to be rescheduled. When no
  /// retries are left the attempt is completed with an [EXIT_CODE_ERROR]
  /// response instead.
  void _runWorker(Process worker, _WorkAttempt attempt) {
    runZonedGuarded(
      () async {
        var connection = _workerConnections[worker]!;

        connection.writeRequest(attempt.request);
        var responseFuture = connection.readResponse();
        attempt.trackWork?.call(responseFuture);
        var response = await responseFuture;

        // It is possible for us to complete with an error response due to an
        // unhandled async error before we get here.
        if (attempt.responseCompleter.isCompleted) return;

        if (response.exitCode == EXIT_CODE_BROKEN_PIPE) {
          if (_tryReschedule(attempt)) return;
          stderr.writeln('Failed to run request ${attempt.request}');
          response = WorkResponse(
            exitCode: EXIT_CODE_ERROR,
            output:
                'Invalid response from worker, this probably means it wrote '
                'invalid output or died.',
          );
        }
        attempt.responseCompleter.complete(response);
        _cleanUp(worker);
      },
      (e, s) {
        // Note that we don't need to do additional cleanup here on failures. If
        // the worker dies that is already handled in a generic fashion, we just
        // need to make sure we complete with a valid response.
        if (attempt.responseCompleter.isCompleted) return;
        if (_tryReschedule(attempt)) return;
        attempt.responseCompleter.complete(
          WorkResponse(
            exitCode: EXIT_CODE_ERROR,
            output: 'Error running worker:\n$e\n$s',
          ),
        );
        _cleanUp(worker);
      },
    );
  }

  /// Performs post-work cleanup for [worker].
  ///
  /// Returns [worker] to the idle pool if it is still alive, starts any queued
  /// work, and kills one idle worker if the pool then exceeds
  /// [_maxIdleWorkers].
  void _cleanUp(Process worker) {
    // If the worker crashes, it won't be in `_readyWorkers` any more, and
    // we don't want to add it to _idleWorkers.
    if (_readyWorkers.contains(worker)) {
      _idleWorkers.add(worker);
    }

    // Do additional work if available.
    _runWorkQueue();

    // If the worker wasn't immediately used we might have too many idle
    // workers now, kill one if necessary.
    if (_idleWorkers.length > _maxIdleWorkers) {
      // Note that whenever we spawn a worker we listen for its exit code
      // and clean it up so we don't need to do that here.
      var excess = _idleWorkers.removeLast();
      _killWorker(excess);
    }
  }

  /// Attempts to reschedule a failed [attempt].
  ///
  /// Returns whether or not the job was successfully rescheduled. An attempt
  /// is not rescheduled once it has been retried [_maxRetries] times.
  bool _tryReschedule(_WorkAttempt attempt) {
    if (attempt.timesRetried >= _maxRetries) return false;
    stderr.writeln('Rescheduling failed request...');
    attempt.timesRetried++;
    _workQueue.add(attempt);
    _runWorkQueue();
    return true;
  }

  /// Cancels the connection to [worker], removes it from the pools and kills
  /// the process.
  void _killWorker(Process worker) {
    // Use `?.` so that a worker without a registered connection is still
    // removed and killed instead of failing on a null assertion.
    _workerConnections[worker]?.cancel();
    _readyWorkers.remove(worker);
    _idleWorkers.remove(worker);
    worker.kill();
  }
}

/// One pending unit of work: the [WorkRequest] to send, the completer that
/// delivers its [WorkResponse], and a count of how often it has been retried.
class _WorkAttempt {
  _WorkAttempt(this.request, {this.trackWork});

  /// The request that should be sent to a worker.
  final WorkRequest request;

  /// Optional callback that receives the future for the worker's response.
  final void Function(Future<WorkResponse?>)? trackWork;

  /// Completed with the final outcome of this attempt.
  final Completer<WorkResponse> responseCompleter = Completer<WorkResponse>();

  /// Number of times this attempt has been retried so far.
  int timesRetried = 0;

  /// The future that completes when [responseCompleter] does.
  Future<WorkResponse> get response {
    return responseCompleter.future;
  }
}

/// Associates each worker [Process] with the [DriverConnection] used to talk
/// to it.
///
/// An entry is written when a spawned worker becomes ready, and read when a
/// request is sent to the worker or when the worker is killed.
final _workerConnections = Expando<DriverConnection>('connection');
