blob: 0191c3ae416a7a6d7eb7d87b2e33b31f26eab5c1 [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
import 'dart:io';
import 'package:test/test.dart';
import 'package:bazel_worker/bazel_worker.dart';
import 'package:bazel_worker/driver.dart';
/// Test entry point: drives [BazelWorkerDriver] with in-process [MockWorker]s
/// and checks how many mock workers are live or dead after each batch of
/// requests.
void main() {
  // Declared outside the group so the tearDown below can reach whichever
  // driver the most recent test assigned.
  BazelWorkerDriver driver;

  group('basic driver', () {
    // Runs after every test in this group: shuts down the shared driver (if
    // one was ever assigned) and resets the dead-worker bookkeeping so that
    // the counts asserted by each test start from zero.
    tearDown(() async {
      await driver?.terminateWorkers();
      expect(MockWorker.liveWorkers, isEmpty);
      MockWorker.deadWorkers.clear();
    });

    test('can run a single request', () async {
      // No driver is passed, so each run creates and terminates its own.
      for (var run = 0; run < 2; run++) {
        await _doRequests(count: 1);
      }
    });

    test('can run multiple batches of requests through multiple workers',
        () async {
      var workerLimit = 4;
      var idleLimit = 2;
      var killedPerRound = workerLimit - idleLimit;
      driver = new BazelWorkerDriver(MockWorker.spawn,
          maxWorkers: workerLimit, maxIdleWorkers: idleLimit);

      for (var round = 1; round <= 10; round++) {
        await _doRequests(driver: driver);

        expect(MockWorker.liveWorkers.length, equals(idleLimit));
        // No workers should be killed while there is ongoing work, but they
        // should be cleaned up once there isn't any more work to do.
        expect(MockWorker.deadWorkers.length, equals(killedPerRound * round));
      }
    });

    test('can run multiple requests through one worker', () async {
      var workerLimit = 1;
      var idleLimit = 1;
      driver = new BazelWorkerDriver(MockWorker.spawn,
          maxWorkers: workerLimit, maxIdleWorkers: idleLimit);

      for (var round = 1; round <= 10; round++) {
        await _doRequests(driver: driver);

        expect(MockWorker.liveWorkers.length, equals(1));
        expect(MockWorker.deadWorkers.length, equals(0));
      }
    });

    test('can run one request through multiple workers', () async {
      driver = new BazelWorkerDriver(MockWorker.spawn,
          maxWorkers: 4, maxIdleWorkers: 4);

      for (var round = 1; round <= 10; round++) {
        // Only a single request is issued per round.
        await _doRequests(driver: driver, count: 1);

        expect(MockWorker.liveWorkers.length, equals(1));
        expect(MockWorker.deadWorkers.length, equals(0));
      }
    });

    test('can run with maxIdleWorkers == 0', () async {
      var workerLimit = 4;
      driver = new BazelWorkerDriver(MockWorker.spawn,
          maxWorkers: workerLimit, maxIdleWorkers: 0);

      for (var round = 1; round <= 10; round++) {
        await _doRequests(driver: driver);

        expect(MockWorker.liveWorkers.length, equals(0));
        expect(MockWorker.deadWorkers.length, equals(workerLimit * round));
      }
    });
  });
}
/// Runs [count] fake work requests through [driver], and asserts that they
/// all completed.
///
/// When [driver] is omitted a new [BazelWorkerDriver] backed by
/// [MockWorker.spawn] is created for this call and terminated before the
/// returned future completes, even if the requests or the assertion fail.
/// A caller-supplied [driver] is never terminated here.
///
/// [count] defaults to 100 when omitted.
Future _doRequests({BazelWorkerDriver driver, int count}) async {
  // If we create a driver, we need to make sure and terminate it.
  var terminateDriver = driver == null;
  driver ??= new BazelWorkerDriver(MockWorker.spawn);
  count ??= 100;

  try {
    var requests = new List.generate(count, (_) => new WorkRequest());
    var responses = new List.generate(count, (_) => new WorkResponse());

    // The mock workers answer requests by draining this shared queue, so it
    // has to hold one response per request before any work is submitted.
    MockWorker.responseQueue.addAll(responses);

    var actualResponses = await Future.wait(requests.map(driver.doWork));
    expect(actualResponses, unorderedEquals(responses));
  } finally {
    // Runs on failure too, so a failed assertion cannot leave workers of a
    // locally created driver alive.
    if (terminateDriver) await driver.terminateWorkers();
  }
}
/// A mock worker loop that returns work responses from the provided list.
///
/// Throws if it runs out of responses.
class MockWorkerLoop extends AsyncWorkerLoop {
  /// Responses handed out by [performRequest], one per request, front first.
  final Queue<WorkResponse> _responseQueue;

  /// Creates a loop that answers requests from [_responseQueue], forwarding
  /// [connection] to [AsyncWorkerLoop].
  MockWorkerLoop(this._responseQueue, {AsyncWorkerConnection connection})
      : super(connection: connection);

  /// Logs [request] and answers it with the next queued response.
  ///
  /// The content of [request] is not inspected. `removeFirst` throws when the
  /// queue is empty.
  @override
  Future<WorkResponse> performRequest(WorkRequest request) async {
    print('Performing request $request');
    return _responseQueue.removeFirst();
  }
}
/// A mock worker process.
///
/// Items in [responseQueue] will be returned in order based on requests.
///
/// If there are no items left in [responseQueue] then it will throw.
class MockWorker implements Process {
  /// Spawns a new [MockWorker].
  static Future<MockWorker> spawn() async => new MockWorker();

  /// Worker loop that handles reading requests and responding.
  AsyncWorkerLoop _workerLoop;

  /// Static queue of pending responses, these are shared by all workers.
  ///
  /// If this is empty and a request is received then it will throw.
  static final responseQueue = new Queue<WorkResponse>();

  /// Static list of all live workers.
  static final liveWorkers = <MockWorker>[];

  /// Static list of all the dead workers.
  static final deadWorkers = <MockWorker>[];

  /// Standard constructor, creates the [_workerLoop].
  ///
  /// Registers this worker in [liveWorkers] and starts a [MockWorkerLoop]
  /// that reads from this worker's stdin stream and writes to its stdout
  /// sink.
  MockWorker() {
    liveWorkers.add(this);
    _workerLoop = new MockWorkerLoop(responseQueue,
        connection: new StdAsyncWorkerConnection(
            inputStream: this._stdinController.stream,
            outputStream: this._stdoutController.sink))
      ..run();
  }

  /// Completes with the exit code passed to [kill], once the streams that
  /// [kill] closes have finished closing.
  @override
  Future<int> get exitCode => _exitCodeCompleter.future;
  final _exitCodeCompleter = new Completer<int>();

  @override
  Stream<List<int>> get stdout => _stdoutController.stream;
  final _stdoutController = new StreamController<List<int>>();

  @override
  Stream<List<int>> get stderr => _stderrController.stream;
  final _stderrController = new StreamController<List<int>>();

  /// Sink feeding the worker loop's input; created lazily on first access.
  @override
  IOSink get stdin {
    _stdin ??= new IOSink(_stdinController.sink);
    return _stdin;
  }

  IOSink _stdin;
  final _stdinController = new StreamController<List<int>>();

  /// Always throws [UnsupportedError].
  @override
  int get pid => throw new UnsupportedError('Not needed.');

  /// Marks this worker as dead and shuts down its streams.
  ///
  /// Returns `false` and does nothing if the worker was already killed.
  /// Otherwise moves the worker from [liveWorkers] to [deadWorkers], starts
  /// closing the stdout, stderr and stdin controllers in that order, and
  /// returns `true` without waiting for the closing to finish. [exitCode]
  /// becomes the value of the [exitCode] future once the controllers have
  /// closed. [signal] is accepted for interface compatibility and ignored.
  @override
  bool kill([ProcessSignal signal = ProcessSignal.SIGTERM, int exitCode = 0]) {
    if (_killed) return false;
    // Record the kill up front so that a repeated call can neither register
    // the worker in [deadWorkers] twice nor complete the exit code twice.
    _killed = true;

    // Closing is asynchronous; it is started here but deliberately not
    // awaited because [Process.kill] is synchronous.
    () async {
      await _stdoutController.close();
      await _stderrController.close();
      await _stdinController.close();
      _exitCodeCompleter.complete(exitCode);
    }();

    deadWorkers.add(this);
    liveWorkers.remove(this);
    return true;
  }

  /// Whether [kill] has already been called on this worker.
  bool _killed = false;
}