// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:collection';
import 'dart:io';

import '../constants.dart';
import '../worker_protocol.pb.dart';
import 'driver_connection.dart';

typedef Future<Process> SpawnWorker();

/// A driver for talking to a bazel worker.
///
/// This allows you to use any binary that supports the bazel worker protocol in
/// the same way that bazel would, but from another dart process instead.
class BazelWorkerDriver {
  /// Idle worker processes.
  final _idleWorkers = <Process>[];

  /// The maximum number of idle workers at any given time.
  final int _maxIdleWorkers;

  /// The maximum number of times to retry a [WorkAttempt] if there is an error.
  final int _maxRetries;

  /// The maximum number of concurrent workers to run at any given time.
  final int _maxWorkers;

  /// The number of currently active workers.
  int get _numWorkers => _readyWorkers.length + _spawningWorkers.length;

  /// All workers that are fully spawned and ready to handle work.
  final _readyWorkers = <Process>[];

  /// All workers that are in the process of being spawned.
  final _spawningWorkers = <Future<Process>>[];

  /// Work requests that haven't been started yet.
  final _workQueue = new Queue<_WorkAttempt>();

  /// Factory method that spawns a worker process.
  final SpawnWorker _spawnWorker;

  BazelWorkerDriver(this._spawnWorker,
      {int maxIdleWorkers, int maxWorkers, int maxRetries})
      : this._maxIdleWorkers = maxIdleWorkers ?? 4,
        this._maxWorkers = maxWorkers ?? 4,
        this._maxRetries = maxRetries ?? 4;

  /// Waits for an available worker, and then sends [WorkRequest] to it.
  ///
  /// If [trackWork] is provided it will be invoked with a [Future] once the
  /// [request] has been actually sent to the worker. This allows the caller
  /// to determine when actual work is being done versus just waiting for an
  /// available worker.
  Future<WorkResponse> doWork(WorkRequest request,
      {Function(Future<WorkResponse>) trackWork}) {
    var attempt = new _WorkAttempt(request, trackWork: trackWork);
    _workQueue.add(attempt);
    _runWorkQueue();
    return attempt.response;
  }

  /// Calls `kill` on all worker processes.
  Future terminateWorkers() async {
    for (var worker in _readyWorkers.toList()) {
      _killWorker(worker);
    }
    await Future.wait(_spawningWorkers.map((worker) async {
      _killWorker(await worker);
    }));
  }

  /// Runs as many items in [_workQueue] as possible given the number of
  /// available workers.
  ///
  /// Will spawn additional workers until [_maxWorkers] has been reached.
  ///
  /// This method synchronously drains the [_workQueue] and [_idleWorkers], but
  /// some tasks may not actually start right away if they need to wait for a
  /// worker to spin up.
  void _runWorkQueue() {
    // Bail out conditions, we will continue to call ourselves indefinitely
    // until one of these is met.
    if (_workQueue.isEmpty) return;
    if (_numWorkers == _maxWorkers && _idleWorkers.isEmpty) return;
    if (_numWorkers > _maxWorkers) {
      throw new StateError('Internal error, created to many workers. Please '
          'file a bug at https://github.com/dart-lang/bazel_worker/issues/new');
    }

    // At this point we definitely want to run a task, we just need to decide
    // whether or not we need to start up a new worker.
    var attempt = _workQueue.removeFirst();
    if (_idleWorkers.isNotEmpty) {
      _runWorker(_idleWorkers.removeLast(), attempt);
    } else {
      // No need to block here, we want to continue to synchronously drain the
      // work queue.
      var futureWorker = _spawnWorker();
      _spawningWorkers.add(futureWorker);
      futureWorker.then((worker) {
        _spawningWorkers.remove(futureWorker);
        _readyWorkers.add(worker);

        _workerConnections[worker] = new StdDriverConnection.forWorker(worker);
        _runWorker(worker, attempt);

        // When the worker exits we should retry running the work queue in case
        // there is more work to be done. This is primarily just a defensive
        // thing but is cheap to do.
        worker.exitCode.then((exitCode) {
          _idleWorkers.remove(worker);
          _readyWorkers.remove(worker);
          _runWorkQueue();
        });
      });
    }
    // Recursively calls itself until one of the bail out conditions are met.
    _runWorkQueue();
  }

  /// Sends [request] to [worker].
  ///
  /// Once the worker responds then it will be added back to the pool of idle
  /// workers.
  void _runWorker(Process worker, _WorkAttempt attempt) {
    bool rescheduled = false;

    runZoned(() async {
      var connection = _workerConnections[worker];

      connection.writeRequest(attempt.request);
      var responseFuture = connection.readResponse();
      if (attempt.trackWork != null) {
        attempt.trackWork(responseFuture);
      }
      var response = await responseFuture;

      // It is possible for us to complete with an error response due to an
      // unhandled async error before we get here.
      if (!attempt.responseCompleter.isCompleted) {
        if (response == null) {
          rescheduled = _tryReschedule(attempt);
          if (rescheduled) return;
          stderr.writeln('Failed to run request ${attempt.request}');
          response = new WorkResponse()
            ..exitCode = EXIT_CODE_ERROR
            ..output =
                'Invalid response from worker, this probably means it wrote '
                'invalid output or died.';
        }
        attempt.responseCompleter.complete(response);
        _cleanUp(worker);
      }
    }, onError: (e, s) {
      // Note that we don't need to do additional cleanup here on failures. If
      // the worker dies that is already handled in a generic fashion, we just
      // need to make sure we complete with a valid response.
      if (!attempt.responseCompleter.isCompleted) {
        rescheduled = _tryReschedule(attempt);
        if (rescheduled) return;
        var response = new WorkResponse()
          ..exitCode = EXIT_CODE_ERROR
          ..output = 'Error running worker:\n$e\n$s';
        attempt.responseCompleter.complete(response);
        _cleanUp(worker);
      }
    });
  }

  /// Performs post-work cleanup for [worker].
  void _cleanUp(Process worker) {
    // If the worker crashes, it won't be in `_readyWorkers` any more, and
    // we don't want to add it to _idleWorkers.
    if (_readyWorkers.contains(worker)) {
      _idleWorkers.add(worker);
    }

    // Do additional work if available.
    _runWorkQueue();

    // If the worker wasn't immediately used we might have to many idle
    // workers now, kill one if necessary.
    if (_idleWorkers.length > _maxIdleWorkers) {
      // Note that whenever we spawn a worker we listen for its exit code
      // and clean it up so we don't need to do that here.
      var worker = _idleWorkers.removeLast();
      _killWorker(worker);
    }
  }

  /// Attempts to reschedule a failed [attempt].
  ///
  /// Returns whether or not the job was successfully rescheduled.
  bool _tryReschedule(_WorkAttempt attempt) {
    if (attempt.timesRetried >= _maxRetries) return false;
    stderr.writeln('Rescheduling failed request...');
    attempt.timesRetried++;
    _workQueue.add(attempt);
    _runWorkQueue();
    return true;
  }

  void _killWorker(Process worker) {
    _workerConnections[worker].cancel();
    _readyWorkers.remove(worker);
    _idleWorkers.remove(worker);
    worker.kill();
  }
}

/// Encapsulates an attempt to fulfill a [WorkRequest], a completer for the
/// [WorkResponse], and the number of times it has been retried.
class _WorkAttempt {
  final WorkRequest request;
  final responseCompleter = new Completer<WorkResponse>();
  final Function(Future<WorkResponse>) trackWork;

  Future<WorkResponse> get response => responseCompleter.future;

  int timesRetried = 0;

  _WorkAttempt(this.request, {this.trackWork});
}

final _workerConnections = new Expando<DriverConnection>('connection');
