// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
/// A load-balancing runner pool.
library isolate.load_balancer;
import 'dart:async' show Completer, Future, FutureOr;
import 'runner.dart';
import 'src/errors.dart';
import 'src/util.dart';
/// A pool of runners, ordered by load.
///
/// Keeps a pool of runners,
/// and allows running functions through the runner with the lowest current load.
///
/// The number of pool runner entries is fixed when the pool is created.
/// When the pool is [close]d, all runners are closed as well.
///
/// The load balancer is not reentrant.
/// A function executed through [run] should not *synchronously*
/// call methods on the load balancer.
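///
/// A minimal usage sketch (assuming `IsolateRunner.spawn` from this
/// package and a hypothetical caller-defined top-level `computeValue`
/// function):
/// ```dart
/// final balancer = await LoadBalancer.create(4, IsolateRunner.spawn);
/// final result = await balancer.run(computeValue, 42);
/// await balancer.close();
/// ```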
class LoadBalancer implements Runner {
/// A stand-in future which can be used as a default value.
///
/// The future never completes, so it should not be exposed to users.
static final _defaultFuture = Completer<Never>().future;
/// Reusable empty fixed-length list.
static final _emptyQueue = List<_LoadBalancerEntry>.empty(growable: false);
/// A heap-based priority queue of entries, prioritized by `load`.
///
/// The entries of the list never change, only their positions.
/// Those with positions below `_length`
/// are considered to currently be in the queue.
/// All operations except [close] should end up with all entries
/// still in the pool. Some entries may be removed temporarily in order
/// to change their load, and are then added back.
///
/// Each [_LoadBalancerEntry] has its current position in the queue
/// as [_LoadBalancerEntry.queueIndex].
///
/// Is set to an empty list when the pool is closed.
List<_LoadBalancerEntry> _queue;
/// Current number of elements in [_queue].
///
/// Always a number between zero and `_queue.length`.
/// Elements with indices below this value are
/// in the queue, and maintain the heap invariant.
/// Elements with indices at or above this value are temporarily
/// removed from the queue and are ordered by when they
/// were removed.
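///
/// For example, with four entries whose loads are `[10, 30, 20, 50]`
/// and `_length == 3`, the first three entries form the heap and the
/// entry with load 50 has been temporarily removed.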
int _length;
/// The future returned by [close].
///
/// Is `null` until [close] is first called.
Future<void>? _stopFuture;
/// Create a load balancer backed by the [Runner]s of [runners].
LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));
LoadBalancer._(List<_LoadBalancerEntry> entries)
: _queue = entries,
_length = entries.length;
/// The number of runners currently in the pool.
int get length => _length;
/// Asynchronously creates [size] runners and a `LoadBalancer` backed by them.
///
/// This is a helper function that makes it easy to create a `LoadBalancer`
/// with asynchronously created runners, for example:
/// ```dart
/// var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
/// ```
static Future<LoadBalancer> create(
int size, Future<Runner> Function() createRunner) {
return Future.wait(Iterable.generate(size, (_) => createRunner()),
cleanUp: (Runner runner) {
runner.close();
}).then((runners) => LoadBalancer(runners));
}
static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) {
var index = 0;
return runners
.map((runner) => _LoadBalancerEntry(runner, index++))
.toList(growable: false);
}
/// Execute the command in the currently least loaded isolate.
///
/// The optional [load] parameter represents the load that the command
/// is causing on the isolate where it runs.
/// The number has no fixed meaning, but should be seen as relative to
/// other commands run in the same load balancer.
/// The `load` must not be negative.
///
/// If [timeout] and [onTimeout] are provided, they are forwarded to
/// the runner running the function, which will handle a timeout
/// as normal. If the runners are running in other isolates, then
/// the [onTimeout] function must be a constant function.
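///
/// A minimal usage sketch (`hashBytes` and `bytes` are hypothetical
/// caller-defined names):
/// ```dart
/// var digest = await balancer.run(hashBytes, bytes, load: 50);
/// ```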
@override
Future<R> run<R, P>(FutureOr<R> Function(P argument) function, P argument,
{Duration? timeout, FutureOr<R> Function()? onTimeout, int load = 100}) {
RangeError.checkNotNegative(load, 'load');
if (_length == 0) {
// Can happen if created with zero runners,
// or after being closed.
if (_stopFuture != null) {
throw StateError("Load balancer has been closed");
}
throw StateError("No runners in pool");
}
var entry = _queue.first;
entry.load += load;
_bubbleDown(entry, 0);
return entry.run(this, load, function, argument, timeout, onTimeout);
}
/// Execute the same function in the least loaded [count] isolates.
///
/// This guarantees that the function isn't run twice in the same isolate,
/// so `count` is not allowed to exceed [length].
///
/// The optional [load] parameter represents the load that the command
/// is causing on the isolate where it runs.
/// The number has no fixed meaning, but should be seen as relative to
/// other commands run in the same load balancer.
/// The `load` must not be negative.
///
/// If [timeout] and [onTimeout] are provided, they are forwarded to
/// the runners running the function, which will handle any timeouts
/// as normal.
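///
/// A minimal usage sketch (`warmUpCache` and `config` are hypothetical
/// caller-defined names), running the same command on the three least
/// loaded runners:
/// ```dart
/// var futures = balancer.runMultiple(3, warmUpCache, config);
/// await Future.wait(futures);
/// ```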
List<Future<R>> runMultiple<R, P>(
int count, FutureOr<R> Function(P argument) function, P argument,
{Duration? timeout, FutureOr<R> Function()? onTimeout, int load = 100}) {
RangeError.checkValueInInterval(count, 1, _length, 'count');
RangeError.checkNotNegative(load, 'load');
if (count == 1) {
return List<Future<R>>.filled(
1,
run(function, argument,
load: load, timeout: timeout, onTimeout: onTimeout));
}
var result = List<Future<R>>.filled(count, _defaultFuture);
if (count == _length) {
// No need to change the order of entries in the queue.
for (var i = 0; i < _length; i++) {
var entry = _queue[i];
entry.load += load;
result[i] =
entry.run(this, load, function, argument, timeout, onTimeout);
}
} else {
// Remove the [count] least loaded services and run the
// command on each, then add them back to the queue.
for (var i = 0; i < count; i++) {
_removeFirst();
}
// The removed entries are stored in `_queue` in positions from
// `_length` to `_length + count - 1`.
for (var i = 0; i < count; i++) {
var entry = _queue[_length];
entry.load += load;
_addNext();
result[i] =
entry.run(this, load, function, argument, timeout, onTimeout);
}
}
return result;
}
@override
Future<void> close() {
var stopFuture = _stopFuture;
if (stopFuture != null) return stopFuture;
var queue = _queue;
var length = _length;
_queue = _emptyQueue;
_length = 0;
return _stopFuture = MultiError.waitUnordered(
[for (var i = 0; i < length; i++) queue[i].close()],
).then(ignore);
}
/// Place [element] in heap at [index] or above.
///
/// Put element into the empty cell at `index`.
/// While the `element` has higher priority than the
/// parent, swap it with the parent.
///
/// Ignores [element]'s initial [_LoadBalancerEntry.queueIndex],
/// but sets it to the final position when the element has
/// been placed.
void _bubbleUp(_LoadBalancerEntry element, int index) {
while (index > 0) {
var parentIndex = (index - 1) ~/ 2;
var parent = _queue[parentIndex];
if (element.compareTo(parent) > 0) break;
_queue[index] = parent..queueIndex = index;
index = parentIndex;
}
_queue[index] = element..queueIndex = index;
}
/// Place [element] in heap at [index] or below.
///
/// Put element into the empty cell at `index`.
/// While the `element` has lower priority than either child,
/// swap it with the highest priority child.
///
/// Ignores [element]'s initial [_LoadBalancerEntry.queueIndex],
/// but sets it to the final position when the element has
/// been placed.
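///
/// For example, sifting an element with load 9 down from index 0 of a
/// heap whose remaining loads are `[_, 8, 6]` swaps it with the right
/// child (load 6, the higher priority), giving final loads `[6, 8, 9]`.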
void _bubbleDown(_LoadBalancerEntry element, int index) {
while (true) {
var childIndex = index * 2 + 1; // Left child index.
if (childIndex >= _length) break;
var child = _queue[childIndex];
var rightChildIndex = childIndex + 1;
if (rightChildIndex < _length) {
var rightChild = _queue[rightChildIndex];
if (rightChild.compareTo(child) < 0) {
childIndex = rightChildIndex;
child = rightChild;
}
}
if (element.compareTo(child) <= 0) break;
_queue[index] = child..queueIndex = index;
index = childIndex;
}
_queue[index] = element..queueIndex = index;
}
/// Removes the first entry from the queue, but doesn't stop its service.
///
/// The entry is expected to be either added back to the queue
/// immediately or have its `close` method called.
///
/// After the removal, the entry is stored at `_queue[_length]`.
_LoadBalancerEntry _removeFirst() {
assert(_length > 0);
_LoadBalancerEntry entry = _queue.first;
_length--;
if (_length > 0) {
var replacement = _queue[_length];
_queue[_length] = entry..queueIndex = _length;
_bubbleDown(replacement, 0);
}
return entry;
}
/// Adds next unused entry to the queue.
///
/// Adds the entry at [_length] to the queue.
void _addNext() {
assert(_length < _queue.length);
var index = _length;
var entry = _queue[index];
_length = index + 1;
_bubbleUp(entry, index);
}
/// Decreases the load of an element which is currently in the queue.
///
/// Elements outside the queue can just have their `load` modified directly.
void _decreaseLoad(_LoadBalancerEntry entry, int load) {
assert(load >= 0);
entry.load -= load;
var index = entry.queueIndex;
// Should always be the case unless the load balancer
// has been closed, or events are happening out of their
// proper order.
if (index < _length) {
_bubbleUp(entry, index);
}
}
}
class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> {
/// The position in the heap queue.
///
/// Maintained when entries move around the queue.
/// Only needed for [LoadBalancer._decreaseLoad].
int queueIndex;
/// The current load on the isolate.
int load = 0;
/// The service used to execute commands.
Runner runner;
_LoadBalancerEntry(this.runner, this.queueIndex);
Future<R> run<R, P>(
LoadBalancer balancer,
int load,
FutureOr<R> Function(P argument) function,
P argument,
Duration? timeout,
FutureOr<R> Function()? onTimeout) {
return runner
.run<R, P>(function, argument, timeout: timeout, onTimeout: onTimeout)
.whenComplete(() {
balancer._decreaseLoad(this, load);
});
}
Future close() => runner.close();
@override
int compareTo(_LoadBalancerEntry other) => load - other.load;
}