// Copyright (c) 2017, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:collection';
import 'dart:io';

import 'package:bazel_worker/bazel_worker.dart';
import 'package:bazel_worker/driver.dart';
import 'package:test/test.dart';

/// Exercises [BazelWorkerDriver] against in-memory [MockWorker] processes.
///
/// All tests share the static bookkeeping on [MockWorker] (live workers, dead
/// workers and the response queue); the `tearDown` at the bottom of the group
/// resets it after every test.
void main() {
  // Driver under test. Tests that need a specific configuration assign it;
  // the shared `tearDown` terminates it and resets it to `null`.
  BazelWorkerDriver? driver;

  // Canned response carrying the broken-pipe exit code. The failing-worker
  // tests queue it for the workers that are killed before they reply.
  final disconnectedResponse = WorkResponse(
    exitCode: EXIT_CODE_BROKEN_PIPE,
    output: 'Connection closed',
  )..freeze();

  group('basic driver', () {
    test('can run a single request', () async {
      // Each call creates, uses and terminates its own default driver.
      await _doRequests(count: 1);
      await _doRequests(count: 1);
    });

    test('can run multiple batches of requests through multiple workers',
        () async {
      var maxWorkers = 4;
      var maxIdleWorkers = 2;
      driver = BazelWorkerDriver(MockWorker.spawn,
          maxWorkers: maxWorkers, maxIdleWorkers: maxIdleWorkers);
      for (var i = 0; i < 10; i++) {
        await _doRequests(driver: driver);
        expect(MockWorker.liveWorkers.length, maxIdleWorkers);
        // No workers should be killed while there is ongoing work, but they
        // should be cleaned up once there isn't any more work to do.
        expect(MockWorker.deadWorkers.length,
            (maxWorkers - maxIdleWorkers) * (i + 1));
      }
    });

    test('can run multiple requests through one worker', () async {
      var maxWorkers = 1;
      var maxIdleWorkers = 1;
      driver = BazelWorkerDriver(MockWorker.spawn,
          maxWorkers: maxWorkers, maxIdleWorkers: maxIdleWorkers);
      for (var i = 0; i < 10; i++) {
        await _doRequests(driver: driver);
        expect(MockWorker.liveWorkers.length, 1);
        expect(MockWorker.deadWorkers.length, 0);
      }
    });

    test('can run one request through multiple workers', () async {
      driver =
          BazelWorkerDriver(MockWorker.spawn, maxWorkers: 4, maxIdleWorkers: 4);
      for (var i = 0; i < 10; i++) {
        // Requests are issued one at a time, so a single worker is expected
        // to be enough even though up to four are allowed.
        await _doRequests(driver: driver, count: 1);
        expect(MockWorker.liveWorkers.length, 1);
        expect(MockWorker.deadWorkers.length, 0);
      }
    });

    test('can run with maxIdleWorkers == 0', () async {
      var maxWorkers = 4;
      driver = BazelWorkerDriver(MockWorker.spawn,
          maxWorkers: maxWorkers, maxIdleWorkers: 0);
      for (var i = 0; i < 10; i++) {
        await _doRequests(driver: driver);
        // With no idle workers allowed, every worker spawned for a batch is
        // expected to be dead again once the batch has finished.
        expect(MockWorker.liveWorkers.length, 0);
        expect(MockWorker.deadWorkers.length, maxWorkers * (i + 1));
      }
    });

    test('trackWork gets invoked when a worker is actually ready', () async {
      var maxWorkers = 2;
      driver = BazelWorkerDriver(MockWorker.spawn, maxWorkers: maxWorkers);
      // Response futures that have been handed to `trackWork` and have not
      // completed yet.
      var tracking = <Future>[];
      await _doRequests(
          driver: driver,
          count: 10,
          trackWork: (Future response) {
            // We should never be tracking more than `maxWorkers` jobs at a time.
            expect(tracking.length, lessThan(maxWorkers));
            tracking.add(response);
            response.then((_) => tracking.remove(response));
          });
    });

    group('failing workers', () {
      /// A driver which spawns [numBadWorkers] failing workers and then good
      /// ones after that, and which will retry [maxRetries] times.
      void createDriver({int maxRetries = 2, int numBadWorkers = 2}) {
        // Number of failing worker loops handed out so far.
        var numSpawned = 0;
        driver = BazelWorkerDriver(
            () async => MockWorker(workerLoopFactory: (MockWorker worker) {
                  var connection = StdAsyncWorkerConnection(
                      inputStream: worker._stdinController.stream,
                      outputStream: worker._stdoutController.sink);
                  if (numSpawned < numBadWorkers) {
                    numSpawned++;
                    return ThrowingMockWorkerLoop(
                        worker, MockWorker.responseQueue, connection);
                  } else {
                    return MockWorkerLoop(MockWorker.responseQueue,
                        connection: connection);
                  }
                }),
            maxRetries: maxRetries);
      }

      test('should retry up to maxRetries times', () async {
        createDriver();
        var expectedResponse = WorkResponse();
        MockWorker.responseQueue.addAll(
            [disconnectedResponse, disconnectedResponse, expectedResponse]);
        var actualResponse = await driver!.doWork(WorkRequest());
        // The first 2 queued responses are consumed and discarded by the
        // failing workers, so we should get the third one.
        expect(actualResponse, expectedResponse);

        expect(MockWorker.deadWorkers.length, 2);
        expect(MockWorker.liveWorkers.length, 1);
      });

      test('should fail if it exceeds maxRetries failures', () async {
        createDriver(maxRetries: 2, numBadWorkers: 3);
        MockWorker.responseQueue.addAll(
            [disconnectedResponse, disconnectedResponse, WorkResponse()]);
        var actualResponse = await driver!.doWork(WorkRequest());
        // Should actually get a bad response.
        // NOTE(review): 15 is presumably the package's generic error exit
        // code; confirm against the bazel_worker constants.
        expect(actualResponse.exitCode, 15);
        expect(
            actualResponse.output,
            'Invalid response from worker, this probably means it wrote '
            'invalid output or died.');

        expect(MockWorker.deadWorkers.length, 3);
      });
    });

    tearDown(() async {
      try {
        await driver?.terminateWorkers();
        expect(MockWorker.liveWorkers, isEmpty);
      } finally {
        // Always reset the shared state, even when termination or the check
        // above fails, so one failing test cannot poison the tests that run
        // after it.
        driver = null;
        MockWorker.liveWorkers.clear();
        MockWorker.deadWorkers.clear();
        MockWorker.responseQueue.clear();
      }
    });
  });
}

/// Runs [count] fake work requests through [driver], and asserts that they
/// all completed.
///
/// One empty [WorkResponse] per request is appended to
/// [MockWorker.responseQueue] before the requests are issued, and the
/// responses received are expected to match those in any order.
///
/// When [driver] is omitted, a default [BazelWorkerDriver] backed by
/// [MockWorker.spawn] is created for the call and terminated before
/// returning, including when a request or the expectation fails.
///
/// [trackWork] is forwarded unchanged to every `doWork` call.
Future<void> _doRequests(
    {BazelWorkerDriver? driver,
    int count = 100,
    Function(Future<WorkResponse?>)? trackWork}) async {
  // A driver created here is owned here, so it must also be terminated here.
  final ownsDriver = driver == null;
  final activeDriver = driver ?? BazelWorkerDriver(MockWorker.spawn);
  try {
    final requests = List.generate(count, (_) => WorkRequest());
    final responses = List.generate(count, (_) => WorkResponse());
    MockWorker.responseQueue.addAll(responses);
    final actualResponses = await Future.wait(requests.map(
        (request) => activeDriver.doWork(request, trackWork: trackWork)));
    expect(actualResponses, unorderedEquals(responses));
  } finally {
    // Terminate even on failure so a failing test does not leak the workers
    // of a driver nobody else holds a reference to.
    if (ownsDriver) await activeDriver.terminateWorkers();
  }
}

/// A fake worker loop that answers each request with the next response taken
/// from the front of a queue supplied at construction time.
///
/// Answering a request while the queue is empty fails with whatever error
/// [Queue.removeFirst] raises.
class MockWorkerLoop extends AsyncWorkerLoop {
  /// Canned responses, consumed front-first, one per request.
  final Queue<WorkResponse> _cannedResponses;

  MockWorkerLoop(Queue<WorkResponse> responseQueue,
      {AsyncWorkerConnection? connection})
      : _cannedResponses = responseQueue,
        super(connection: connection);

  /// Logs [request] and returns the next canned response.
  @override
  Future<WorkResponse> performRequest(WorkRequest request) async {
    print('Performing request $request');
    final nextResponse = _cannedResponses.removeFirst();
    return nextResponse;
  }
}

/// A mock worker loop that simulates a worker dying mid-request.
///
/// Its custom [run] still consumes a queued response for every request, but
/// never writes it back; instead it kills the owning [MockWorker].
class ThrowingMockWorkerLoop extends MockWorkerLoop {
  /// The mock process that gets killed after each request.
  final MockWorker _mockWorker;

  ThrowingMockWorkerLoop(this._mockWorker, Queue<WorkResponse> responseQueue,
      AsyncWorkerConnection connection)
      : super(responseQueue, connection: connection);

  /// Runs the worker loop until `connection.readRequest()` yields `null`.
  ///
  /// For every request read, the response produced by [performRequest] is
  /// discarded and the mock worker is killed.
  @override
  Future run() async {
    for (var request = await connection.readRequest();
        request != null;
        request = await connection.readRequest()) {
      await performRequest(request);
      _mockWorker.kill();
    }
  }
}

/// A mock worker process.
///
/// Items in [responseQueue] will be returned in order based on requests.
///
/// If there are no items left in [responseQueue] then it will throw.
///
/// Every instance registers itself in [liveWorkers] on construction and moves
/// to [deadWorkers] the first time [kill] is called.
class MockWorker implements Process {
  /// Spawns a new [MockWorker].
  static Future<MockWorker> spawn() async => MockWorker();

  /// Static queue of pending responses, these are shared by all workers.
  ///
  /// If this is empty and a request is received then it will throw.
  static final responseQueue = Queue<WorkResponse>();

  /// Static list of all live workers.
  static final liveWorkers = <MockWorker>[];

  /// Static list of all the dead workers.
  static final deadWorkers = <MockWorker>[];

  /// Standard constructor, creates a [WorkerLoop] from [workerLoopFactory] or
  /// a [MockWorkerLoop] if no factory is provided.
  ///
  /// The loop is started immediately and is not awaited.
  MockWorker({WorkerLoop Function(MockWorker)? workerLoopFactory}) {
    liveWorkers.add(this);
    var workerLoop = workerLoopFactory != null
        ? workerLoopFactory(this)
        : MockWorkerLoop(responseQueue,
            connection: StdAsyncWorkerConnection(
                inputStream: _stdinController.stream,
                outputStream: _stdoutController.sink));
    workerLoop.run();
  }

  @override
  Future<int> get exitCode => throw UnsupportedError('Not needed.');

  @override
  Stream<List<int>> get stdout => _stdoutController.stream;
  final _stdoutController = StreamController<List<int>>();

  @override
  Stream<List<int>> get stderr => _stderrController.stream;
  final _stderrController = StreamController<List<int>>();

  @override
  late final IOSink stdin = IOSink(_stdinController.sink);
  final _stdinController = StreamController<List<int>>();

  @override
  int get pid => throw UnsupportedError('Not needed.');

  /// Marks this worker as dead and starts closing its streams.
  ///
  /// Returns `true` on the first call and `false` on every later call, which
  /// has no effect. [processSignal] and [exitCode] are accepted for interface
  /// compatibility but are not used.
  @override
  bool kill(
      [ProcessSignal processSignal = ProcessSignal.sigterm, int exitCode = 0]) {
    if (_killed) return false;
    // Record the kill up front so repeated calls cannot register this worker
    // in `deadWorkers` more than once.
    _killed = true;
    // The streams are closed in the background; callers are not made to wait.
    () async {
      await _stdoutController.close();
      await _stderrController.close();
      await _stdinController.close();
    }();
    deadWorkers.add(this);
    liveWorkers.remove(this);
    return true;
  }

  /// Whether [kill] has already been called on this worker.
  bool _killed = false;
}
