Fix bug in `IsolateRunner.kill`. (#33)
Fix bug in `IsolateRunner.kill`.
Update `LoadBalancer.runMultiple` to be properly generic.
Cleanups and tweaks.
Add example for registry.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9fadba4..cd8b7e3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,9 @@
## 2.0.3
* Update SDK requirements.
+* Fix bug in `IsolateRunner.kill` with a zero duration.
+* Update some type from `Future` to `Future<void>`.
+* Make `LoadBalancer.runMultiple` properly generic.
## 2.0.1
@@ -40,5 +43,5 @@
## 0.1.0
* Initial version
-* Adds IsolateRunner as a helper around Isolate.
+* Adds `IsolateRunner` as a helper around Isolate.
* Adds single-message port helpers and a load balancer.
diff --git a/README.md b/README.md
index c992c06..c4d55d9 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,8 @@
[![Build Status](https://travis-ci.org/dart-lang/isolate.svg?branch=master)](https://travis-ci.org/dart-lang/isolate)
Helps with isolates and isolate communication in Dart.
+Requires the `dart:isolate` library being available.
+Isolates are not available for Dart on the web.
The package contains individual libraries with different purposes.
diff --git a/lib/isolate_runner.dart b/lib/isolate_runner.dart
index c1040a1..1317fd4 100644
--- a/lib/isolate_runner.dart
+++ b/lib/isolate_runner.dart
@@ -30,7 +30,7 @@
final SendPort _commandPort;
/// Future returned by [onExit]. Set when [onExit] is first read.
- Future _onExitFuture;
+ Future<void> _onExitFuture;
/// Create an [IsolateRunner] wrapper for [isolate]
///
@@ -99,14 +99,14 @@
/// f.then((_) => print("Dead")
/// .timeout(new Duration(...), onTimeout: () => print("No response"));
/// ```
- Future kill({Duration timeout = const Duration(seconds: 1)}) {
- Future onExit = singleResponseFuture(isolate.addOnExitListener);
+ Future<void> kill({Duration timeout = const Duration(seconds: 1)}) {
+ Future<void> onExit = singleResponseFuture(isolate.addOnExitListener);
if (Duration.zero == timeout) {
isolate.kill(priority: Isolate.immediate);
return onExit;
} else {
// Try a more gentle shutdown sequence.
- _commandPort.send(list1(_shutdown));
+ _commandPort.send(list2(_shutdown, null));
return onExit.timeout(timeout, onTimeout: () {
isolate.kill(priority: Isolate.immediate);
return onExit;
@@ -114,10 +114,10 @@
}
}
- /// Queries the isolate on whether it's alive.
- ///
- /// If the isolate is alive and responding to commands, the
- /// returned future completes with `true`.
+ /// Queries the isolate on whethreturner it's alive.
+ ///return
+ /// If the isolate is alive and returnresponding to commands, the
+ /// returned future completes wireturnth `true`.
///
/// If the other isolate is not alive (like after calling [kill]),
/// or doesn't answer within [timeout] for any other reason,
@@ -244,7 +244,7 @@
// so we can close the receive port for this future?
// Using [ping] for now.
if (_onExitFuture == null) {
- var channel = SingleResponseChannel();
+ var channel = SingleResponseChannel<void>();
isolate.addOnExitListener(channel.port);
_onExitFuture = channel.result.then(ignore);
ping().then((bool alive) {
@@ -286,16 +286,15 @@
void _handleCommand(List<Object> command) {
switch (command[0]) {
case _shutdown:
- SendPort responsePort = command[1];
_commandPort.close();
- responsePort.send(null);
- return;
+ (command[1] as SendPort)?.send(null);
+ break;
case _run:
- Function function = command[1];
+ var function = command[1] as Function;
var argument = command[2];
- SendPort responsePort = command[3];
+ var responsePort = command[3] as SendPort;
sendFutureResult(Future.sync(() => function(argument)), responsePort);
- return;
+ break;
}
}
}
diff --git a/lib/load_balancer.dart b/lib/load_balancer.dart
index bd41219..dcfa10c 100644
--- a/lib/load_balancer.dart
+++ b/lib/load_balancer.dart
@@ -24,9 +24,9 @@
int _length;
// Whether [stop] has been called.
- Future _stopFuture;
+ Future<void> _stopFuture;
- /// Create a load balancer for [service] with [size] isolates.
+ /// Create a load balancer backed by the [Runner]s of [runners].
LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));
LoadBalancer._(List<_LoadBalancerEntry> entries)
@@ -69,7 +69,8 @@
///
/// If [timeout] and [onTimeout] are provided, they are forwarded to
/// the runner running the function, which will handle a timeout
- /// as normal.
+ /// as normal. If the runners are running in other isolates, then
+ /// the [onTimeout] function must be a constant function.
Future<R> run<R, P>(FutureOr<R> function(P argument), argument,
{Duration timeout, FutureOr<R> onTimeout(), int load = 100}) {
RangeError.checkNotNegative(load, "load");
@@ -92,15 +93,17 @@
/// If [timeout] and [onTimeout] are provided, they are forwarded to
/// the runners running the function, which will handle any timeouts
/// as normal.
- List<Future> runMultiple(int count, function(argument), argument,
- {Duration timeout, onTimeout(), int load = 100}) {
+ List<Future<R>> runMultiple<R, P>(
+ int count, FutureOr<R> function(P argument), P argument,
+ {Duration timeout, FutureOr<R> onTimeout(), int load = 100}) {
RangeError.checkValueInInterval(count, 1, _length, "count");
RangeError.checkNotNegative(load, "load");
if (count == 1) {
- return list1(run(function, argument,
- load: load, timeout: timeout, onTimeout: onTimeout));
+ return List<Future<R>>(1)
+ ..[0] = run(function, argument,
+ load: load, timeout: timeout, onTimeout: onTimeout);
}
- List result = List<Future>(count);
+ var result = List<Future<R>>(count);
if (count == _length) {
// No need to change the order of entries in the queue.
for (int i = 0; i < count; i++) {
@@ -114,7 +117,9 @@
// command on each, then add them back to the queue.
// This avoids running the same command twice in the same
// isolate.
- List entries = List(count);
+ // We can't assume that the first [count] entries in the
+ // heap list are the least loaded.
+ var entries = List<_LoadBalancerEntry>(count);
for (int i = 0; i < count; i++) {
entries[i] = _removeFirst();
}
diff --git a/lib/registry.dart b/lib/registry.dart
index f8ed657..5b681dd 100644
--- a/lib/registry.dart
+++ b/lib/registry.dart
@@ -9,6 +9,7 @@
import "dart:collection" show HashMap, HashSet;
import "dart:isolate" show RawReceivePort, SendPort, Capability;
+import "isolate_runner.dart"; // For documentation.
import "ports.dart";
import "src/util.dart";
@@ -25,21 +26,70 @@
/// Objects can be stored as elements of a registry,
/// have "tags" assigned to them, and be looked up by tag.
///
-/// A `Registry` object caches objects found using the [lookup]
+/// Since the registry is identity based, the objects must not be numbers,
+/// strings, booleans or null. See [Expando] for description of which objects
+/// are not treated as having a clear identity.
+///
+/// A [Registry] object caches objects found using the [lookup]
/// method, or added using [add], and returns the same object every time
/// they are requested.
-/// A different `Registry` object that works on the same registry will not
-/// preserve the identity of elements
+/// A different [Registry] object that works on the same underlying registry,
+/// will not preserve the identity of elements
///
/// It is recommended to only have one `Registry` object working on the
/// same registry in each isolate.
///
/// When the registry is shared across isolates, both elements and tags must
/// be sendable between the isolates.
-/// Between isolates spawned using [Isolate.spawn] from the same initial
-/// isolate, most objects can be sent.
-/// Only simple objects can be sent between isolates originating from different
-/// [Isolate.spawnUri] calls.
+/// See [SendPort] for details on the restrictions on objects which can be sent
+/// between isolates.
+///
+/// A registry can be sued to make a number of object available to separate
+/// workers in different isolates, for example ones created using
+/// [IsolateRunner], without sending all the objects to all the isolates.
+/// A worker can then request the data it needs, and it can add new data
+/// to the registry that will also be shared with all other workers.
+/// Example:
+/// ```dart
+/// main() {
+/// Registry<List<String>> dictionaryByFirstLetter = Registry();
+/// for (var letter in alphabet) {
+/// registry.add(
+/// allWords.where((w) => w.startsWith(letter).toList,
+/// tags: [letter]);
+/// }
+/// var loadBalancer = LoadBalancer(10);
+/// for (var task in tasks) {
+/// loadBalancer.run(_runTask, [task, dictionaryByFirstLetter]);
+/// }
+/// }
+/// _runTask(task, Registry<List<String>> dictionaryByFirstLetter) async {
+/// ...
+/// // Fetch just the words starting with the needed letter.
+/// var aWords = await dictionaryByFirstLetter.lookup(tags: [task.letter]);
+/// ...
+/// }
+/// ```
+///
+/// A registry can be treated like a distributed multimap from tags to
+/// objects, if each tag is only used once. Example:
+/// ```dart
+/// Registry<Capability> capabilities = Registry();
+/// // local code:
+/// ...
+/// capabilities.add(Capability(), ["create"]);
+/// capabilities.add(Capability(), ["read"]);
+/// capabilities.add(Capability(), ["update"]);
+/// capabilities.add(Capability(), ["delete"]);
+/// ...
+/// sendPort.send(capabilities);
+///
+/// // other isolate code:
+/// Registry<Capability> capabilities = await receiveFromPort();
+///
+/// Future<Capability> get createCapability => (await
+/// capabilities.lookup(tags: const ["create"])).first;
+/// ```
class Registry<T> {
// Most operations fail if they haven't received a response for this duration.
final Duration _timeout;
@@ -153,7 +203,7 @@
return completer.future;
}
- /// Add tags to an object in the registry.
+ /// Add tags to objects in the registry.
///
/// Each element of the registry has a number of tags associated with
/// it. A tag is either associated with an element or not, adding it more
@@ -162,12 +212,12 @@
/// Tags are compared using [Object.==] equality.
///
/// Fails if any of the elements are not in the registry.
- Future addTags(Iterable<T> elements, Iterable tags) {
- List ids = elements.map(_getId).toList(growable: false);
+ Future addTags(Iterable<T> elements, Iterable<Object> tags) {
+ List<Object> ids = elements.map(_getId).toList(growable: false);
return _addTags(ids, tags);
}
- /// Remove tags from an object in the registry.
+ /// Remove tags from objects in the registry.
///
/// After this operation, the [elements] will not be associated to the [tags].
/// It doesn't matter whether the elements were associated with the tags
@@ -200,7 +250,7 @@
/// In that case, at most the first `max` results are returned,
/// in whatever order the registry finds its results.
/// Otherwise all matching elements are returned.
- Future<List<T>> lookup({Iterable tags, int max}) {
+ Future<List<T>> lookup({Iterable<Object> tags, int max}) {
if (max != null && max < 1) {
throw RangeError.range(max, 1, null, "max");
}
@@ -210,10 +260,10 @@
// Response is even-length list of (id, element) pairs.
_RegistryCache cache = _cache;
int count = response.length ~/ 2;
- List result = List(count);
+ List<T> result = List(count);
for (int i = 0; i < count; i++) {
- int id = response[i * 2];
- var element = response[i * 2 + 1];
+ var id = response[i * 2] as int;
+ var element = response[i * 2 + 1] as T;
element = cache.register(id, element);
result[i] = element;
}
diff --git a/lib/runner.dart b/lib/runner.dart
index 0f6cc98..e19cb03 100644
--- a/lib/runner.dart
+++ b/lib/runner.dart
@@ -49,5 +49,5 @@
/// If the runner has allocated resources, e.g., an isolate, it should
/// be released. No further calls to [run] should be made after calling
/// stop.
- Future close() => Future.value();
+ Future<void> close() => Future.value();
}
diff --git a/pubspec.yaml b/pubspec.yaml
index d1f1594..c5c4e08 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,12 +1,12 @@
name: isolate
-version: 2.0.3-dev
+version: 2.0.3
author: Dart Team <misc@dartlang.org>
description: >-
Utility functions and classes related to the 'dart:isolate' library.
homepage: https://github.com/dart-lang/isolate
environment:
- sdk: '>=2.0.0 <3.0.0'
+ sdk: '>=2.3.0 <3.0.0'
dev_dependencies:
pedantic: ^1.0.0