blob: ddc801adba0313cdfe4fd3cbf777586a8794266e [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
/// An isolate-compatible object registry and lookup service.
library isolate.registry;
import 'dart:async' show Future, Completer, TimeoutException;
import 'dart:collection' show HashMap, HashSet;
import 'dart:isolate' show RawReceivePort, SendPort, Capability;
import 'ports.dart';
import 'src/lists.dart';
// Command tags.
const int _ADD = 0;
const int _REMOVE = 1;
const int _ADD_TAGS = 2;
const int _REMOVE_TAGS = 3;
const int _GET_TAGS = 4;
const int _FIND = 5;
/// An isolate-compatible object registry.
///
/// Objects can be stored as elements of a registry,
/// have "tags" assigned to them, and be looked up by tag.
///
/// A `Registry` object caches objects found using the [lookup]
/// method, or added using [add], and returns the same object every time
/// they are requested.
/// A different `Registry` object that works on the same registry will not
/// preserve the identity of elements
///
/// It is recommended to only have one `Registry` object working on the
/// same registry in each isolate.
///
/// When the registry is shared accross isolates, both elements and tags must
/// be sendable between the isolates.
/// Between isolates spawned using [Isolate.spawn] from the same initial
/// isolate, most objectes can be sent.
/// Only simple objects can be sent between isolates originating from different
/// [Isolate.spawnUri] calls.
class Registry<T> {
// Most operations fail if they haven't received a response for this duration.
final Duration _timeout;
// Each `Registry` object has a cache of objects being controlled by it.
// The cache is stored in an [Expando], not on the object.
// This allows sending the `Registry` object through a `SendPort` without
// also copying the cache.
static Expando _caches = new Expando();
/// Port for sending command to the central registry mananger.
SendPort _commandPort;
/// Create a registry linked to a [RegistryManager] through [commandPort].
///
/// In most cases, a registry is created by using the
/// [RegistryManager.registry] getter.
///
/// If a registry is used between isolates created using [Isolate.spawnUri],
/// the `Registry` object can't be sent between the isolates directly.
/// Instead the [RegistryManager.commandPort] port can be sent and a
/// `Registry` created from the command port using this constructor.
///
/// The optional [timeout] parameter can be set to the duration
/// this registry should wait before assuming that an operation
/// has failed.
Registry.fromPort(SendPort commandPort,
{Duration timeout: const Duration(seconds: 5)})
: _commandPort = commandPort,
_timeout = timeout;
_RegistryCache get _cache {
_RegistryCache cache = _caches[this];
if (cache != null) return cache;
cache = new _RegistryCache();
_caches[this] = cache;
return cache;
}
/// Check and get the identity of an element.
///
/// Throws if [element] is not an element in the registry.
int _getId(T element) {
int id = _cache.id(element);
if (id == null) {
throw new StateError("Not an element: ${Error.safeToString(element)}");
}
return id;
}
/// Adds [element] to the registry with the provided tags.
///
/// Fails if [element] is already in this registry.
/// An object is already in the registry if it has been added using [add],
/// or if it was returned by a [lookup] call on this registry object.
///
/// Returns a capability that can be used with [remove] to remove
/// the element from the registry again.
///
/// The [tags] can be used to distinguish some of the elements
/// from other elements. Any object can be used as a tag, as long as
/// it preserves equality when sent through a [SendPort].
/// This makes [Capability] objects a good choice for tags.
Future<Capability> add(T element, {Iterable tags}) {
_RegistryCache cache = _cache;
if (cache.contains(element)) {
return new Future<Capability>.sync(() {
throw new StateError(
"Object already in registry: ${Error.safeToString(element)}");
});
}
Completer completer = new Completer<Capability>();
SendPort port = singleCompletePort(completer, callback: (List response) {
assert(cache.isAdding(element));
int id = response[0];
Capability removeCapability = response[1];
cache.register(id, element);
return removeCapability;
}, timeout: _timeout, onTimeout: () {
cache.stopAdding(element);
throw new TimeoutException("Future not completed", _timeout);
});
if (tags != null) tags = tags.toList(growable: false);
cache.setAdding(element);
_commandPort.send(list4(_ADD, element, tags, port));
return completer.future;
}
/// Remove the element from the registry.
///
/// Returns `true` if removing the element succeeded, or `false` if the
/// elements wasn't in the registry, or if it couldn't be removed.
///
/// The [removeCapability] must be the same capability returned by [add]
/// when the object was added. If the capability is wrong, the
/// object is not removed, and this function returns false.
Future<bool> remove(T element, Capability removeCapability) {
int id = _cache.id(element);
if (id == null) {
return new Future<bool>.value(false);
}
Completer completer = new Completer<bool>();
SendPort port = singleCompletePort(completer, callback: (bool result) {
_cache.remove(id);
return result;
}, timeout: _timeout);
_commandPort.send(list4(_REMOVE, id, removeCapability, port));
return completer.future;
}
/// Add tags to an object in the registry.
///
/// Each element of the registry has a number of tags associated with
/// it. A tag is either associated with an element or not, adding it more
/// than once does not make any difference.
///
/// Tags are compared using [Object.==] equality.
///
/// Fails if any of the elements are not in the registry.
Future addTags(Iterable<T> elements, Iterable tags) {
List ids = elements.map(_getId).toList(growable: false);
return _addTags(ids, tags);
}
/// Remove tags from an object in the registry.
///
/// After this operation, the [elements] will not be associated to the [tags].
/// It doesn't matter whether the elements were associated with the tags
/// before or not.
///
/// Fails if any of the elements are not in the registry.
Future removeTags(Iterable<T> elements, Iterable tags) {
List ids = elements.map(_getId).toList(growable: false);
tags = tags.toList(growable: false);
Completer completer = new Completer();
SendPort port = singleCompletePort(completer, timeout: _timeout);
_commandPort.send(list4(_REMOVE_TAGS, ids, tags, port));
return completer.future;
}
Future _addTags(List<int> ids, Iterable tags) {
tags = tags.toList(growable: false);
Completer completer = new Completer();
SendPort port = singleCompletePort(completer, timeout: _timeout);
_commandPort.send(list4(_ADD_TAGS, ids, tags, port));
return completer.future;
}
/// Finds a number of elements that have all the desired [tags].
///
/// If [tags] is omitted or empty, any element of the registry can be
/// returned.
///
/// If [max] is specified, it must be greater than zero.
/// In that case, at most the first `max` results are returned,
/// in whatever order the registry finds its results.
/// Otherwise all matching elements are returned.
Future<List<T>> lookup({Iterable tags, int max}) {
if (max != null && max < 1) {
throw new RangeError.range(max, 1, null, "max");
}
if (tags != null) tags = tags.toList(growable: false);
Completer completer = new Completer<List<T>>();
SendPort port = singleCompletePort(completer, callback: (List response) {
// Response is even-length list of (id, element) pairs.
_RegistryCache cache = _cache;
int count = response.length ~/ 2;
List result = new List(count);
for (int i = 0; i < count; i++) {
int id = response[i * 2];
var element = response[i * 2 + 1];
element = cache.register(id, element);
result[i] = element;
}
return result;
}, timeout: _timeout);
_commandPort.send(list4(_FIND, tags, max, port));
return completer.future;
}
}
/// Isolate-local cache used by a [Registry].
///
/// Maps between id-numbers and elements.
/// An object is considered an element of the registry if it
class _RegistryCache {
// Temporary marker until an object gets an id.
static const int _BEING_ADDED = -1;
final Map<int, Object> id2object = new HashMap();
final Map<Object, int> object2id = new HashMap.identity();
int id(Object object) {
int result = object2id[object];
if (result == _BEING_ADDED) return null;
return result;
}
Object operator [](int id) => id2object[id];
// Register a pair of id/object in the cache.
// if the id is already in the cache, just return the existing
// object.
Object register(int id, Object object) {
object = id2object.putIfAbsent(id, () {
object2id[object] = id;
return object;
});
return object;
}
bool isAdding(element) => object2id[element] == _BEING_ADDED;
void setAdding(element) {
assert(!contains(element));
object2id[element] = _BEING_ADDED;
}
void stopAdding(element) {
assert(object2id[element] == _BEING_ADDED);
object2id.remove(element);
}
void remove(int id) {
var element = id2object.remove(id);
if (element != null) {
object2id.remove(element);
}
}
bool contains(element) => object2id.containsKey(element);
}
/// The central repository used by distributed [Registry] instances.
class RegistryManager {
final Duration _timeout;
int _nextId = 0;
RawReceivePort _commandPort;
/// Maps id to entry. Each entry contains the id, the element, its tags,
/// and a capability required to remove it again.
Map<int, _RegistryEntry> _entries = new HashMap();
Map<Object, Set<int>> _tag2id = new HashMap();
/// Create a new registry managed by the created [RegistryManager].
///
/// The optional [timeout] parameter can be set to the duration
/// registry objects should wait before assuming that an operation
/// has failed.
RegistryManager({timeout: const Duration(seconds: 5)})
: _timeout = timeout,
_commandPort = new RawReceivePort() {
_commandPort.handler = _handleCommand;
}
/// The command port receiving commands for the registry manager.
///
/// Use this port with [Registry.fromPort] to link a registry to the
/// manager in isolates where you can't send a [Registry] object directly.
SendPort get commandPort => _commandPort.sendPort;
/// Get a registry backed by this manager.
///
/// This registry can be sent to other isolates created using
/// [Isolate.spawn].
Registry get registry =>
new Registry.fromPort(_commandPort.sendPort, timeout: _timeout);
// Used as argument to putIfAbsent.
static Set _createSet() => new HashSet();
void _handleCommand(List command) {
switch (command[0]) {
case _ADD:
_add(command[1], command[2], command[3]);
return;
case _REMOVE:
_remove(command[1], command[2], command[3]);
return;
case _ADD_TAGS:
_addTags(command[1], command[2], command[3]);
return;
case _REMOVE_TAGS:
_removeTags(command[1], command[2], command[3]);
return;
case _GET_TAGS:
_getTags(command[1], command[2]);
return;
case _FIND:
_find(command[1], command[2], command[3]);
return;
default:
throw new UnsupportedError("Unknown command: ${command[0]}");
}
}
void _add(Object object, List tags, SendPort replyPort) {
int id = ++_nextId;
var entry = new _RegistryEntry(id, object);
_entries[id] = entry;
if (tags != null) {
for (var tag in tags) {
entry.tags.add(tag);
_tag2id.putIfAbsent(tag, _createSet).add(id);
}
}
replyPort.send(list2(id, entry.removeCapability));
}
void _remove(int id, Capability removeCapability, SendPort replyPort) {
_RegistryEntry entry = _entries[id];
if (entry == null || entry.removeCapability != removeCapability) {
replyPort.send(false);
return;
}
_entries.remove(id);
for (var tag in entry.tags) {
_tag2id[tag].remove(id);
}
replyPort.send(true);
}
void _addTags(List<int> ids, List tags, SendPort replyPort) {
assert(tags != null);
assert(tags.isNotEmpty);
for (int id in ids) {
_RegistryEntry entry = _entries[id];
if (entry == null) continue; // Entry was removed.
entry.tags.addAll(tags);
for (var tag in tags) {
Set ids = _tag2id.putIfAbsent(tag, _createSet);
ids.add(id);
}
}
replyPort.send(null);
}
void _removeTags(List<int> ids, List tags, SendPort replyPort) {
assert(tags != null);
assert(tags.isNotEmpty);
for (int id in ids) {
_RegistryEntry entry = _entries[id];
if (entry == null) continue; // Object was removed.
entry.tags.removeAll(tags);
}
for (var tag in tags) {
Set tagIds = _tag2id[tag];
if (tagIds == null) continue;
tagIds.removeAll(ids);
}
replyPort.send(null);
}
void _getTags(int id, SendPort replyPort) {
_RegistryEntry entry = _entries[id];
if (entry != null) {
replyPort.send(entry.tags.toList(growable: false));
} else {
replyPort.send(const []);
}
}
Iterable<int> _findTaggedIds(List tags) {
var matchingFirstTagIds = _tag2id[tags[0]];
if (matchingFirstTagIds == null) {
return const [];
}
if (matchingFirstTagIds.isEmpty || tags.length == 1) {
return matchingFirstTagIds;
}
// Create new set, then start removing ids not also matched
// by other tags.
Set<int> matchingIds = matchingFirstTagIds.toSet();
for (int i = 1; i < tags.length; i++) {
var tagIds = _tag2id[tags[i]];
if (tagIds == null) return const [];
matchingIds.retainAll(tagIds);
if (matchingIds.isEmpty) break;
}
return matchingIds;
}
void _find(List tags, int max, SendPort replyPort) {
assert(max == null || max > 0);
List result = [];
if (tags == null || tags.isEmpty) {
var entries = _entries.values;
if (max != null) entries = entries.take(max);
for (_RegistryEntry entry in entries) {
result.add(entry.id);
result.add(entry.element);
}
replyPort.send(result);
return;
}
var matchingIds = _findTaggedIds(tags);
if (max == null) max = matchingIds.length; // All results.
for (var id in matchingIds) {
result.add(id);
result.add(_entries[id].element);
max--;
if (max == 0) break;
}
replyPort.send(result);
}
/// Shut down the registry service.
///
/// After this, all [Registry] operations will time out.
void close() {
_commandPort.close();
}
}
/// Entry in [RegistryManager].
class _RegistryEntry {
final int id;
final Object element;
final Set tags = new HashSet();
final Capability removeCapability = new Capability();
_RegistryEntry(this.id, this.element);
}