blob: 621fb1bcb49bf2313f2fd52fe7f7bd6f1e055c2e [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
/// An isolate-compatible object registry and lookup service.
library isolate.registry;
import 'dart:async' show Future, Completer, TimeoutException;
import 'dart:collection' show HashMap, HashSet;
import 'dart:isolate' show RawReceivePort, SendPort, Capability;
import 'isolate_runner.dart'; // For documentation.
import 'ports.dart';
import 'src/util.dart';
// Command tags.
const int _addValue = 0;
const int _removeValue = 1;
const int _addTagsValue = 2;
const int _removeTagsValue = 3;
const int _getTagsValue = 4;
const int _findValue = 5;
/// An isolate-compatible object registry.
///
/// Objects can be stored as elements of a registry,
/// have "tags" assigned to them, and be looked up by tag.
///
/// Since the registry is identity based, the objects must not be numbers,
/// strings, booleans or null. See [Expando] for description of which objects
/// are not treated as having a clear identity.
///
/// A [Registry] object caches objects found using the [lookup]
/// method, or added using [add], and returns the same object every time
/// they are requested.
/// A different [Registry] object that works on the same underlying registry,
/// will not preserve the identity of elements
///
/// It is recommended to only have one `Registry` object working on the
/// same registry in each isolate.
///
/// When the registry is shared across isolates, both elements and tags must
/// be sendable between the isolates.
/// See [SendPort] for details on the restrictions on objects which can be sent
/// between isolates.
///
/// A registry can be sued to make a number of object available to separate
/// workers in different isolates, for example ones created using
/// [IsolateRunner], without sending all the objects to all the isolates.
/// A worker can then request the data it needs, and it can add new data
/// to the registry that will also be shared with all other workers.
/// Example:
/// ```dart
/// main() {
/// Registry<List<String>> dictionaryByFirstLetter = Registry();
/// for (var letter in alphabet) {
/// registry.add(
/// allWords.where((w) => w.startsWith(letter).toList,
/// tags: [letter]);
/// }
/// var loadBalancer = LoadBalancer(10);
/// for (var task in tasks) {
/// loadBalancer.run(_runTask, [task, dictionaryByFirstLetter]);
/// }
/// }
/// _runTask(task, Registry<List<String>> dictionaryByFirstLetter) async {
/// ...
/// // Fetch just the words starting with the needed letter.
/// var aWords = await dictionaryByFirstLetter.lookup(tags: [task.letter]);
/// ...
/// }
/// ```
///
/// A registry can be treated like a distributed multimap from tags to
/// objects, if each tag is only used once. Example:
/// ```dart
/// Registry<Capability> capabilities = Registry();
/// // local code:
/// ...
/// capabilities.add(Capability(), ["create"]);
/// capabilities.add(Capability(), ["read"]);
/// capabilities.add(Capability(), ["update"]);
/// capabilities.add(Capability(), ["delete"]);
/// ...
/// sendPort.send(capabilities);
///
/// // other isolate code:
/// Registry<Capability> capabilities = await receiveFromPort();
///
/// Future<Capability> get createCapability => (await
/// capabilities.lookup(tags: const ["create"])).first;
/// ```
class Registry<T> {
// Most operations fail if they haven't received a response for this duration.
final Duration _timeout;
// Each `Registry` object has a cache of objects being controlled by it.
// The cache is stored in an [Expando], not on the object.
// This allows sending the `Registry` object through a `SendPort` without
// also copying the cache.
static final Expando _caches = Expando();
/// Port for sending command to the central registry manager.
final SendPort _commandPort;
/// Create a registry linked to a [RegistryManager] through [commandPort].
///
/// In most cases, a registry is created by using the
/// [RegistryManager.registry] getter.
///
/// If a registry is used between isolates created using [Isolate.spawnUri],
/// the `Registry` object can't be sent between the isolates directly.
/// Instead the [RegistryManager.commandPort] port can be sent and a
/// `Registry` created from the command port using this constructor.
///
/// The optional [timeout] parameter can be set to the duration
/// this registry should wait before assuming that an operation
/// has failed.
Registry.fromPort(SendPort commandPort,
{Duration timeout = const Duration(seconds: 5)})
: _commandPort = commandPort,
_timeout = timeout;
_RegistryCache get _cache {
_RegistryCache cache = _caches[this];
if (cache != null) return cache;
cache = _RegistryCache();
_caches[this] = cache;
return cache;
}
/// Check and get the identity of an element.
///
/// Throws if [element] is not an element in the registry.
int _getId(T element) {
var id = _cache.id(element);
if (id == null) {
throw StateError('Not an element: ${Error.safeToString(element)}');
}
return id;
}
/// Adds [element] to the registry with the provided tags.
///
/// Fails if [element] is already in this registry.
/// An object is already in the registry if it has been added using [add],
/// or if it was returned by a [lookup] call on this registry object.
///
/// Returns a capability that can be used with [remove] to remove
/// the element from the registry again.
///
/// The [tags] can be used to distinguish some of the elements
/// from other elements. Any object can be used as a tag, as long as
/// it preserves equality when sent through a [SendPort].
/// This makes [Capability] objects a good choice for tags.
Future<Capability> add(T element, {Iterable tags}) {
var cache = _cache;
if (cache.contains(element)) {
return Future<Capability>.sync(() {
throw StateError(
'Object already in registry: ${Error.safeToString(element)}');
});
}
var completer = Completer<Capability>();
var port = singleCompletePort(completer,
callback: (List response) {
assert(cache.isAdding(element));
int id = response[0];
Capability removeCapability = response[1];
cache.register(id, element);
return removeCapability;
},
timeout: _timeout,
onTimeout: () {
cache.stopAdding(element);
throw TimeoutException('Future not completed', _timeout);
});
if (tags != null) tags = tags.toList(growable: false);
cache.setAdding(element);
_commandPort.send(list4(_addValue, element, tags, port));
return completer.future;
}
/// Remove the element from the registry.
///
/// Returns `true` if removing the element succeeded, or `false` if the
/// elements wasn't in the registry, or if it couldn't be removed.
///
/// The [removeCapability] must be the same capability returned by [add]
/// when the object was added. If the capability is wrong, the
/// object is not removed, and this function returns false.
Future<bool> remove(T element, Capability removeCapability) {
var id = _cache.id(element);
if (id == null) {
return Future<bool>.value(false);
}
var completer = Completer<bool>();
var port = singleCompletePort(completer, callback: (bool result) {
_cache.remove(id);
return result;
}, timeout: _timeout);
_commandPort.send(list4(_removeValue, id, removeCapability, port));
return completer.future;
}
/// Add tags to objects in the registry.
///
/// Each element of the registry has a number of tags associated with
/// it. A tag is either associated with an element or not, adding it more
/// than once does not make any difference.
///
/// Tags are compared using [Object.==] equality.
///
/// Fails if any of the elements are not in the registry.
Future addTags(Iterable<T> elements, Iterable<Object> tags) {
List<Object> ids = elements.map(_getId).toList(growable: false);
return _addTags(ids, tags);
}
/// Remove tags from objects in the registry.
///
/// After this operation, the [elements] will not be associated to the [tags].
/// It doesn't matter whether the elements were associated with the tags
/// before or not.
///
/// Fails if any of the elements are not in the registry.
Future<void> removeTags(Iterable<T> elements, Iterable tags) {
List ids = elements.map(_getId).toList(growable: false);
tags = tags.toList(growable: false);
var completer = Completer<void>();
var port = singleCompletePort(completer, timeout: _timeout);
_commandPort.send(list4(_removeTagsValue, ids, tags, port));
return completer.future;
}
Future<void> _addTags(List<int> ids, Iterable tags) {
tags = tags.toList(growable: false);
var completer = Completer<void>();
var port = singleCompletePort(completer, timeout: _timeout);
_commandPort.send(list4(_addTagsValue, ids, tags, port));
return completer.future;
}
/// Finds a number of elements that have all the desired [tags].
///
/// If [tags] is omitted or empty, any element of the registry can be
/// returned.
///
/// If [max] is specified, it must be greater than zero.
/// In that case, at most the first `max` results are returned,
/// in whatever order the registry finds its results.
/// Otherwise all matching elements are returned.
Future<List<T>> lookup({Iterable<Object> tags, int max}) {
if (max != null && max < 1) {
throw RangeError.range(max, 1, null, 'max');
}
if (tags != null) tags = tags.toList(growable: false);
var completer = Completer<List<T>>();
var port = singleCompletePort(completer, callback: (List response) {
// Response is even-length list of (id, element) pairs.
var cache = _cache;
var count = response.length ~/ 2;
var result = List<T>(count);
for (var i = 0; i < count; i++) {
var id = response[i * 2] as int;
var element = response[i * 2 + 1] as T;
element = cache.register(id, element);
result[i] = element;
}
return result;
}, timeout: _timeout);
_commandPort.send(list4(_findValue, tags, max, port));
return completer.future;
}
}
/// Isolate-local cache used by a [Registry].
///
/// Maps between id-numbers and elements.
/// An object is considered an element of the registry if it
class _RegistryCache {
// Temporary marker until an object gets an id.
static const int _beingAdded = -1;
final Map<int, Object> id2object = HashMap();
final Map<Object, int> object2id = HashMap.identity();
int id(Object object) {
var result = object2id[object];
if (result == _beingAdded) return null;
return result;
}
Object operator [](int id) => id2object[id];
// Register a pair of id/object in the cache.
// if the id is already in the cache, just return the existing
// object.
Object register(int id, Object object) {
object = id2object.putIfAbsent(id, () {
object2id[object] = id;
return object;
});
return object;
}
bool isAdding(element) => object2id[element] == _beingAdded;
void setAdding(element) {
assert(!contains(element));
object2id[element] = _beingAdded;
}
void stopAdding(element) {
assert(object2id[element] == _beingAdded);
object2id.remove(element);
}
void remove(int id) {
var element = id2object.remove(id);
if (element != null) {
object2id.remove(element);
}
}
bool contains(element) => object2id.containsKey(element);
}
/// The central repository used by distributed [Registry] instances.
class RegistryManager {
final Duration _timeout;
final RawReceivePort _commandPort;
int _nextId = 0;
/// Maps id to entry. Each entry contains the id, the element, its tags,
/// and a capability required to remove it again.
final _entries = HashMap<int, _RegistryEntry>();
final _tag2id = HashMap<Object, Set<int>>();
/// Create a new registry managed by the created [RegistryManager].
///
/// The optional [timeout] parameter can be set to the duration
/// registry objects should wait before assuming that an operation
/// has failed.
RegistryManager({Duration timeout = const Duration(seconds: 5)})
: _timeout = timeout,
_commandPort = RawReceivePort() {
_commandPort.handler = _handleCommand;
}
/// The command port receiving commands for the registry manager.
///
/// Use this port with [Registry.fromPort] to link a registry to the
/// manager in isolates where you can't send a [Registry] object directly.
SendPort get commandPort => _commandPort.sendPort;
/// Get a registry backed by this manager.
///
/// This registry can be sent to other isolates created using
/// [Isolate.spawn].
Registry get registry =>
Registry.fromPort(_commandPort.sendPort, timeout: _timeout);
// Used as argument to putIfAbsent.
static Set<int> _createSet() => HashSet<int>();
void _handleCommand(List command) {
switch (command[0]) {
case _addValue:
_add(command[1], command[2] as List, command[3] as SendPort);
return;
case _removeValue:
_remove(command[1], command[2] as Capability, command[3] as SendPort);
return;
case _addTagsValue:
_addTags(command[1], command[2] as List, command[3] as SendPort);
return;
case _removeTagsValue:
_removeTags(command[1], command[2] as List, command[3] as SendPort);
return;
case _getTagsValue:
_getTags(command[1], command[2] as SendPort);
return;
case _findValue:
_find(command[1] as List, command[2] as int, command[3] as SendPort);
return;
default:
throw UnsupportedError('Unknown command: ${command[0]}');
}
}
void _add(Object object, List tags, SendPort replyPort) {
var id = ++_nextId;
var entry = _RegistryEntry(id, object);
_entries[id] = entry;
if (tags != null) {
for (var tag in tags) {
entry.tags.add(tag);
_tag2id.putIfAbsent(tag, _createSet).add(id);
}
}
replyPort.send(list2(id, entry.removeCapability));
}
void _remove(int id, Capability removeCapability, SendPort replyPort) {
var entry = _entries[id];
if (entry == null || entry.removeCapability != removeCapability) {
replyPort.send(false);
return;
}
_entries.remove(id);
for (var tag in entry.tags) {
_tag2id[tag].remove(id);
}
replyPort.send(true);
}
void _addTags(List<int> ids, List tags, SendPort replyPort) {
assert(tags != null);
assert(tags.isNotEmpty);
for (var id in ids) {
var entry = _entries[id];
if (entry == null) continue; // Entry was removed.
entry.tags.addAll(tags);
for (var tag in tags) {
Set ids = _tag2id.putIfAbsent(tag, _createSet);
ids.add(id);
}
}
replyPort.send(null);
}
void _removeTags(List<int> ids, List tags, SendPort replyPort) {
assert(tags != null);
assert(tags.isNotEmpty);
for (var id in ids) {
var entry = _entries[id];
if (entry == null) continue; // Object was removed.
entry.tags.removeAll(tags);
}
for (var tag in tags) {
Set tagIds = _tag2id[tag];
if (tagIds == null) continue;
tagIds.removeAll(ids);
}
replyPort.send(null);
}
void _getTags(int id, SendPort replyPort) {
var entry = _entries[id];
if (entry != null) {
replyPort.send(entry.tags.toList(growable: false));
} else {
replyPort.send(const []);
}
}
Iterable<int> _findTaggedIds(List tags) {
var matchingFirstTagIds = _tag2id[tags[0]];
if (matchingFirstTagIds == null) {
return const [];
}
if (matchingFirstTagIds.isEmpty || tags.length == 1) {
return matchingFirstTagIds;
}
// Create new set, then start removing ids not also matched
// by other tags.
var matchingIds = matchingFirstTagIds.toSet();
for (var i = 1; i < tags.length; i++) {
var tagIds = _tag2id[tags[i]];
if (tagIds == null) return const [];
matchingIds.retainAll(tagIds);
if (matchingIds.isEmpty) break;
}
return matchingIds;
}
void _find(List tags, int max, SendPort replyPort) {
assert(max == null || max > 0);
var result = [];
if (tags == null || tags.isEmpty) {
var entries = _entries.values;
if (max != null) entries = entries.take(max);
for (var entry in entries) {
result.add(entry.id);
result.add(entry.element);
}
replyPort.send(result);
return;
}
var matchingIds = _findTaggedIds(tags);
max ??= matchingIds.length; // All results.
for (var id in matchingIds) {
result.add(id);
result.add(_entries[id].element);
max--;
if (max == 0) break;
}
replyPort.send(result);
}
/// Shut down the registry service.
///
/// After this, all [Registry] operations will time out.
void close() {
_commandPort.close();
}
}
/// Entry in [RegistryManager].
class _RegistryEntry {
final int id;
final Object element;
final Set tags = HashSet();
final Capability removeCapability = Capability();
_RegistryEntry(this.id, this.element);
}