Add IsolateRunner as a helper around Isolate.
Add single-message port helpers and a load balancer.
R=sethladd@google.com
Review URL: https://codereview.chromium.org//928663003
diff --git a/.status b/.status
new file mode 100644
index 0000000..a69ed9e
--- /dev/null
+++ b/.status
@@ -0,0 +1,11 @@
+# Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+# for details. All rights reserved. Use of this source code is governed by a
+# BSD-style license that can be found in the LICENSE file.
+
+[ $runtime == vm ]
+test/isolaterunner_test: RuntimeError # addOnExitListener not implemented.
+
+[ $compiler == dart2js ]
+test/registry_test: CompileTimeError # Unimplemented: private symbol literals.
+# FunctionRef will be removed when the VM supports sending functions.
+test/isolaterunner_test: RuntimeError # Mirrors not working - FunctionRef broken.
\ No newline at end of file
diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 0000000..e8063a8
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,6 @@
+# Below is a list of people and organizations that have contributed
+# to the project. Names should be added to the list like so:
+#
+# Name/Organization <email address>
+
+Google Inc.
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..372014e
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,8 @@
+# Changelog
+
+## 0.1.0
+
+- Initial version
+ Adds IsolateRunner as a helper around Isolate.
+ Adds single-message port helpers and a load balancer.
+
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..6f5e0ea
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,33 @@
+Want to contribute? Great! First, read this page (including the small print at
+the end).
+
+### Before you contribute
+Before we can use your code, you must sign the
+[Google Individual Contributor License Agreement](https://cla.developers.google.com/about/google-individual)
+(CLA), which you can do online. The CLA is necessary mainly because you own the
+copyright to your changes, even after your contribution becomes part of our
+codebase, so we need your permission to use and distribute your code. We also
+need to be sure of various other things—for instance that you'll tell us if you
+know that your code infringes on other people's patents. You don't have to sign
+the CLA until after you've submitted your code for review and a member has
+approved it, but you must do it before we can put your code into our codebase.
+
+Before you start working on a larger contribution, you should get in touch with
+us first through the issue tracker with your idea so that we can help out and
+possibly guide you. Coordinating up front makes it much easier to avoid
+frustration later on.
+
+### Code reviews
+All submissions, including submissions by project members, require review.
+
+### File headers
+All files in the project must start with the following header.
+
+ // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+ // for details. All rights reserved. Use of this source code is governed by a
+ // BSD-style license that can be found in the LICENSE file.
+
+### The small print
+Contributions made by corporations are covered by a different agreement than the
+one above, the
+[Software Grant and Corporate Contributor License Agreement](https://developers.google.com/open-source/cla/corporate).
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..de31e1a
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,26 @@
+Copyright 2015, the Dart project authors. All rights reserved.
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+ copyright notice, this list of conditions and the following
+ disclaimer in the documentation and/or other materials provided
+ with the distribution.
+ * Neither the name of Google Inc. nor the names of its
+ contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e02d3b7
--- /dev/null
+++ b/README.md
@@ -0,0 +1,34 @@
+# isolate
+
+The `isolate` package helps with isolates and isolate communication.
+
+The package contains individual libraries with different purposes.
+
+### Creating send ports and responding to messages.
+
+The "ports.dart" sub-library contains functionality
+for creating `SendPort`s and reacting to values sent to those ports.
+
+### Working with isolates and running functions in other isolates.
+
+The "isolaterunner.dart" sub-library introduces an `IsolateRunner` class
+that gives easy access to the `Isolate` functionality, and also
+gives a way to run new functions in the isolate repeatedly, instead of
+just on the initial `spawn` call.
+
+### A central registry for values that can be used accross isolates.
+
+The "registry.dart" sub-library provides a way to create an
+object registry, and give access to it accross different isolates.
+
+### Balancing load accross several isolates.
+
+The "loadbalancer.dart" sub-library can manage multiple `Runner` objects,
+including `IsolateRunner`, and run functions on the currently least loaded
+runner.
+
+## Features and bugs
+
+Please file feature requests and bugs at the [issue tracker][tracker].
+
+[tracker]: https://github.com/dart-lang/isolate/issues
diff --git a/example/http-server.dart b/example/http-server.dart
new file mode 100644
index 0000000..fc3b442
--- /dev/null
+++ b/example/http-server.dart
@@ -0,0 +1,119 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.sample.httpserver;
+
+import "dart:io";
+import "dart:async";
+import "dart:isolate";
+import "package:isolate/isolaterunner.dart";
+import "package:isolate/runner.dart";
+import "package:isolate/ports.dart";
+
+typedef Future RemoteStop();
+
+Future<RemoteStop> runHttpServer(
+ Runner runner, ServerSocket socket, HttpListener listener) {
+ return runner.run(_startHttpServer, new List(2)..[0] = socket.reference
+ ..[1] = listener)
+ .then((SendPort stopPort) => () => _sendStop(stopPort));
+}
+
+Future _sendStop(SendPort stopPort) {
+ return singleResponseFuture(stopPort.send);
+}
+
+Future<SendPort> _startHttpServer(List args) {
+ ServerSocketReference ref = args[0];
+ HttpListener listener = args[1];
+ return ref.create().then((socket) {
+ return listener.start(new HttpServer.listenOn(socket));
+ }).then((_) {
+ return singleCallbackPort((SendPort resultPort) {
+ sendFutureResult(new Future.sync(listener.stop), resultPort);
+ });
+ });
+}
+
+/**
+ * An [HttpRequest] handler setup. Gets called when with the server, and
+ * is told when to stop listening.
+ *
+ * These callbacks allow the listener to set up handlers for HTTP requests.
+ * The object should be sendable to an equivalent isolate.
+ */
+abstract class HttpListener {
+ Future start(HttpServer server);
+ Future stop();
+}
+
+/**
+ * An [HttpListener] that sets itself up as an echo server.
+ *
+ * Returns the message content plus an ID describing the isolate that
+ * handled the request.
+ */
+class EchoHttpListener implements HttpListener {
+ StreamSubscription _subscription;
+ static int _id = new Object().hashCode;
+ SendPort _counter;
+
+ EchoHttpListener(this._counter);
+
+ start(HttpServer server) {
+ print("Starting isolate $_id");
+ _subscription = server.listen((HttpRequest request) {
+ request.response.addStream(request).then((_) {
+ _counter.send(null);
+ print("Request to $_id");
+ request.response.write("#$_id\n");
+ var t0 = new DateTime.now().add(new Duration(seconds:2));
+ while (new DateTime.now().isBefore(t0));
+ print("Response from $_id");
+ request.response.close();
+ });
+ });
+ }
+
+ stop() {
+ print("Stopping isolate $_id");
+ _subscription.cancel();
+ _subscription = null;
+ }
+}
+
+main(args) {
+ int port = 0;
+ if (args.length > 0) {
+ port = int.parse(args[0]);
+ }
+ RawReceivePort counter = new RawReceivePort();
+ HttpListener listener = new EchoHttpListener(counter.sendPort);
+ ServerSocket
+ .bind(InternetAddress.ANY_IP_V6, port)
+ .then((ServerSocket socket) {
+ port = socket.port;
+ return Future.wait(new Iterable.generate(5, (_) => IsolateRunner.spawn()),
+ cleanUp: (isolate) { isolate.close(); })
+ .then((List<IsolateRunner> isolates) {
+ return Future.wait(isolates.map((IsolateRunner isolate) {
+ return runHttpServer(isolate, socket, listener);
+ }), cleanUp: (server) { server.stop(); });
+ })
+ .then((stoppers) {
+ socket.close();
+ int count = 25;
+ counter.handler = (_) {
+ count--;
+ if (count == 0) {
+ stoppers.forEach((f) => f());
+ counter.close();
+ }
+ };
+ print("Server listening on port $port for 25 requests");
+ print("Test with:");
+ print(" ab -c10 -n 25 http://localhost:$port/");
+ });
+ });
+}
diff --git a/example/runner-pool.dart b/example/runner-pool.dart
new file mode 100644
index 0000000..e81e496
--- /dev/null
+++ b/example/runner-pool.dart
@@ -0,0 +1,58 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.sample.runners;
+
+import "package:isolate/loadbalancer.dart";
+import "package:isolate/isolaterunner.dart";
+import "dart:async" show Future, Completer;
+
+
+void main() {
+ int N = 44;
+ var sw = new Stopwatch()..start();
+ // Compute fib up to 42 with 4 isolates.
+ parfib(N, 4).then((v1) {
+ var t1 = sw.elapsedMilliseconds;
+ sw.stop();
+ sw.reset();
+ print("fib#4(${N}) = ${v1[N]}, ms: $t1");
+ sw.start();
+ // Then compute fib up to 42 with 2 isolates.
+ parfib(N, 2).then((v2) {
+ var t2 = sw.elapsedMilliseconds;
+ sw.stop();
+ print("fib#2(${N}) = ${v2[N]}, ms: $t2");
+ });
+ });
+}
+
+// Compute fibonnacci 1..limit
+Future<List<int>> parfib(int limit, int parallelity) {
+ return LoadBalancer.create(parallelity, IsolateRunner.spawn).then(
+ (LoadBalancer pool) {
+ List<Future> fibs = new List(limit + 1);
+ // Schedule all calls with exact load value and the heaviest task
+ // assigned first.
+ schedule(a, b, i) {
+ if (i < limit) {
+ schedule(a + b, a, i + 1);
+ }
+ fibs[i] = pool.run(fib, i, load: a);
+ }
+ schedule(0, 1, 0);
+ // And wait for them all to complete.
+ return Future.wait(fibs).whenComplete(pool.close);
+ });
+}
+
+int computeFib(n) {
+ int result = fib(n);
+ return result;
+}
+
+int fib(n) {
+ if (n < 2) return n;
+ return fib(n - 1) + fib(n - 2);
+}
diff --git a/lib/isolate.dart b/lib/isolate.dart
new file mode 100644
index 0000000..9e838b8
--- /dev/null
+++ b/lib/isolate.dart
@@ -0,0 +1,14 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * Utilities for working with isolates and isolate communication.
+ */
+library dart.pkg.isolate;
+
+export "isolaterunner.dart";
+export "loadbalancer.dart";
+export "ports.dart";
+export "registry.dart";
+export "runner.dart";
diff --git a/lib/isolaterunner.dart b/lib/isolaterunner.dart
new file mode 100644
index 0000000..7e15a40
--- /dev/null
+++ b/lib/isolaterunner.dart
@@ -0,0 +1,320 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.isolaterunner;
+
+import "dart:isolate";
+import "dart:async";
+import "runner.dart";
+import "ports.dart";
+import "src/functionref.dart";
+import "src/lists.dart";
+
+// Command tags. Shared between IsolateRunner and IsolateRunnerRemote.
+const int _SHUTDOWN = 0;
+const int _RUN = 1;
+
+/**
+ * An easier to use interface on top of an [Isolate].
+ *
+ * Wraps an `Isolate` and allows pausing, killing and inspecting
+ * the isolate more conveniently than the raw `Isolate` methods.
+ *
+ * Also allows running simple functions in the other isolate, and get back
+ * the result.
+ */
+class IsolateRunner implements Runner {
+ /** The underlying [Isolate] object of the isolate being controlled. */
+ final Isolate isolate;
+
+ /** Command port for the [IsolateRunnerRemote]. */
+ final SendPort _commandPort;
+
+ /** Future returned by [onExit]. Set when [onExit] is first read. */
+ Future _onExitFuture;
+
+ /**
+ * Create an [IsolateRunner] wrapper for [isolate]
+ *
+ * The preferred way to create an `IsolateRunner` is to use [spawn]
+ * to create a new isolate and a runner for it.
+ *
+ * This constructor allows creating a runner for an already existing
+ * isolate.
+ * The [commandPort] must be the [IsolateRunnerRemote.commandPort] of
+ * a remote running in that isolate.
+ */
+ IsolateRunner(this.isolate, SendPort commandPort)
+ : _commandPort = commandPort;
+
+ /**
+ * Create a new [Isolate], as by [Isolate.spawn] and wrap that.
+ *
+ * The returned [IsolateRunner] forwards operations to the new isolate,
+ * and keeps a port open in the new isolate that receives commands
+ * from the `IsolateRunner`. Remember to [close] the `IsolateRunner` when
+ * it's no longer needed.
+ */
+ static Future<IsolateRunner> spawn() {
+ Completer portCompleter = new Completer.sync();
+ SendPort initPort = singleCompletePort(portCompleter);
+ return Isolate.spawn(IsolateRunnerRemote._create, initPort)
+ .then((Isolate isolate) {
+ // TODO: Add when VM supports it.
+ // isolate.setErrorsFatal(false);
+ return portCompleter.future.then((SendPort commandPort) {
+ var result = new IsolateRunner(isolate, commandPort);
+ // Guarantees that setErrorsFatal has completed.
+ return result.ping().then((_) => result);
+ });
+ });
+ }
+
+ /**
+ * Closes the `IsolateRunner` communication down.
+ *
+ * If the isolate isn't running something else to keep it alive,
+ * this will also make the isolate shut down.
+ *
+ * Can be used to create an isolate, use [run] to start a service, and
+ * then drop the connection and let the service control the isolate's
+ * life cycle.
+ */
+ Future close() {
+ Completer portCompleter = new Completer.sync();
+ SendPort closePort = singleCallbackPort(portCompleter.complete);
+ _commandPort.send(list2(_SHUTDOWN, closePort));
+ return portCompleter.future;
+ }
+
+ /**
+ * Kills the isolate.
+ *
+ * Starts by calling [close], but if that doesn't cause the isolate to
+ * shut down in a timely manner, as given by [timeout], it follows up
+ * with [Isolate.kill], with increasing urgency if necessary.
+ *
+ * If [timeout] is a zero duration, it goes directly to the most urgent
+ * kill.
+ *
+ * If the isolate is already dead, the returned future will not complete.
+ * If that may be the case, use [Future.timeout] on the returned future
+ * to take extra action after a while. Example:
+ *
+ * var f = isolate.kill();
+ * f.then((_) => print('Dead')
+ * .timeout(new Duration(...), onTimeout: () => print('No response'));
+ */
+ Future kill({Duration timeout: const Duration(seconds: 1)}) {
+ Future onExit = singleResponseFuture(isolate.addOnExitListener);
+ if (Duration.ZERO == timeout) {
+ isolate.kill(Isolate.IMMEDIATE);
+ return onExit;
+ } else {
+ // Try a more gentle shutdown sequence.
+ _commandPort.send(list1(_SHUTDOWN));
+ return onExit.timeout(timeout, onTimeout: () {
+ isolate.kill(Isolate.IMMEDIATE);
+ return onExit;
+ });
+ }
+ }
+
+ /**
+ * Queries the isolate on whether it's alive.
+ *
+ * If the isolate is alive and responding to commands, the
+ * returned future completes with `true`.
+ *
+ * If the other isolate is not alive (like after calling [kill]),
+ * or doesn't answer within [timeout] for any other reason,
+ * the returned future completes with `false`.
+ *
+ * Guaranteed to only complete after all previous sent isolate commands
+ * (like pause and resume) have been handled.
+ * Paused isolates do respond to ping requests.
+ */
+ Future<bool> ping({Duration timeout: const Duration(seconds: 1)}) {
+ Completer completer = new Completer<bool>();
+ SendPort port = singleCompletePort(completer,
+ callback: _kTrue,
+ timeout: timeout,
+ onTimeout: _kFalse);
+ isolate.ping(port);
+ return completer.future;
+ }
+
+ static bool _kTrue(_) => true;
+ static bool _kFalse() => false;
+
+ /**
+ * Pauses the isolate.
+ *
+ * While paused, no normal messages are processed, and calls to [run] will
+ * be delayed until the isolate is resumed.
+ *
+ * Commands like [kill] and [ping] are still executed while the isolate is
+ * paused.
+ *
+ * If [resumeCapability] is omitted, it defaults to the [isolate]'s
+ * [Isolate.pauseCapability].
+ *
+ * Calling pause more than once with the same `resumeCapability`
+ * has no further effect. Only a single call to [resume] is needed
+ * to resume the isolate.
+ */
+ void pause([Capability resumeCapability]) {
+ if (resumeCapability == null) resumeCapability = isolate.pauseCapability;
+ isolate.pause(resumeCapability);
+ }
+
+ /**
+ * Resumes after a [pause].
+ *
+ * If [resumeCapability] is omitted, it defaults to the isolate's
+ * [Isolate.pauseCapability].
+ *
+ * Even if `pause` has been called more than once with the same
+ * `resumeCapability`, a single resume call with stop the pause.
+ */
+ void resume([Capability resumeCapability]) {
+ if (resumeCapability == null) resumeCapability = isolate.pauseCapability;
+ isolate.resume(resumeCapability);
+ }
+
+ /**
+ * Execute `function(argument)` in the isolate and return the result.
+ *
+ * Sends [function] and [argument] to the isolate, runs the call, and
+ * returns the result, whether it returned a value or threw.
+ * If the call returns a [Future], the final result of that future
+ * will be returned.
+ *
+ * This works similar to the arguments to [Isolate.spawn], except that
+ * it runs in the existing isolate and the return value is returned to
+ * the caller.
+ *
+ * Example:
+ *
+ * IsolateRunner iso = await IsolateRunner.spawn();
+ * try {
+ * return await iso.run(heavyComputation, argument);
+ * } finally {
+ * await iso.close();
+ * }
+ */
+ Future run(function(argument), argument,
+ {Duration timeout, onTimeout()}) {
+ return singleResultFuture((SendPort port) {
+ _commandPort.send(
+ list4(_RUN, FunctionRef.from(function), argument, port));
+ }, timeout: timeout, onTimeout: onTimeout);
+ }
+
+ /**
+ * A broadcast stream of uncaught errors from the isolate.
+ *
+ * When listening on the stream, errors from the isolate will be reported
+ * as errors in the stream. Be ready to handle the errors.
+ *
+ * The stream closes when the isolate shuts down.
+ */
+ Stream get errors {
+ StreamController controller;
+ RawReceivePort port;
+ void handleError(message) {
+ if (message == null) {
+ // Isolate shutdown.
+ port.close();
+ controller.close();
+ } else {
+ // Uncaught error.
+ String errorDescription = message[0];
+ String stackDescription = message[1];
+ var error = new RemoteError(errorDescription, stackDescription);
+ controller.addError(error, error.stackTrace);
+ }
+ }
+ controller = new StreamController.broadcast(
+ sync: true,
+ onListen: () {
+ port = new RawReceivePort(handleError);
+ // TODO: When supported, uncomment this.
+ // isolate.addErrorListener(port.sendPort);
+ // isolate.addOnExitListener(port.sendPort);
+ // And remove the send below, which acts as an immediate close.
+ port.sendPort.send(null);
+ },
+ onCancel: () {
+ port.close();
+ // this.removeErrorListener(port.sendPort);
+ // this.removeOnExitListener(port.sendPort);
+ port = null;
+ });
+ return controller.stream;
+ }
+
+ /**
+ * Waits for the [isolate] to terminate.
+ *
+ * Completes the returned future when the isolate terminates.
+ *
+ * If the isolate has already stopped responding to commands,
+ * the returned future will never terminate.
+ */
+ Future get onExit {
+ // TODO(lrn): Is there a way to see if an isolate is dead
+ // so we can close the receive port for this future?
+ if (_onExitFuture == null) {
+ _onExitFuture = singleResponseFuture(isolate.addOnExitListener);
+ }
+ return _onExitFuture;
+ }
+}
+
+/**
+ * The remote part of an [IsolateRunner].
+ *
+ * The `IsolateRunner` sends commands to the controlled isolate through
+ * the `IsolateRunnerRemote` [commandPort].
+ *
+ * Only use this class if you need to set up the isolate manually
+ * instead of relying on [IsolateRunner.spawn].
+ */
+class IsolateRunnerRemote {
+ final RawReceivePort _commandPort = new RawReceivePort();
+ IsolateRunnerRemote() {
+ _commandPort.handler = _handleCommand;
+ }
+
+ /**
+ * The command port that can be used to send commands to this remote.
+ *
+ * Use this as argument to [new IsolateRunner] if creating the link
+ * manually, otherwise it's handled by [IsolateRunner.spawn].
+ */
+ SendPort get commandPort => _commandPort.sendPort;
+
+ static void _create(SendPort initPort) {
+ var remote = new IsolateRunnerRemote();
+ initPort.send(remote.commandPort);
+ }
+
+ void _handleCommand(List command) {
+ switch (command[0]) {
+ case _SHUTDOWN:
+ SendPort responsePort = command[1];
+ _commandPort.close();
+ responsePort.send(null);
+ return;
+ case _RUN:
+ Function function = command[1].function;
+ var argument = command[2];
+ SendPort responsePort = command[3];
+ sendFutureResult(new Future.sync(() => function(argument)),
+ responsePort);
+ return;
+ }
+ }
+}
diff --git a/lib/loadbalancer.dart b/lib/loadbalancer.dart
new file mode 100644
index 0000000..fdb5d0b
--- /dev/null
+++ b/lib/loadbalancer.dart
@@ -0,0 +1,305 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * A load-balancing runner pool.
+ */
+library dart.pkg.isolate.loadbalancer;
+
+import "runner.dart";
+import "src/errors.dart";
+import "src/lists.dart";
+import "dart:async" show Future;
+
+/**
+ * A pool of runners, ordered by load.
+ *
+ * Keeps a pool of runners,
+ * and allows running function through the runner with the lowest current load.
+ */
+class LoadBalancer implements Runner {
+ // A heap-based priority queue of entries, prioritized by `load`.
+ // Each entry has its own entry in the queue, for faster update.
+ List<_LoadBalancerEntry> _queue;
+
+ // The number of entries currently in the queue.
+ int _length;
+
+ // Whether [stop] has been called.
+ Future _stopFuture = null;
+
+ /**
+ * Create a load balancer for [service] with [size] isolates.
+ */
+ LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));
+
+ LoadBalancer._(List<_LoadBalancerEntry> entries)
+ : _queue = entries,
+ _length = entries.length {
+ for (int i = 0; i < _length; i++) {
+ _queue[i].queueIndex = i;
+ }
+ }
+
+ /**
+ * The number of runners currently in the pool.
+ */
+ int get length => _length;
+
+ /**
+ * Asynchronously create [size] runners and create a `LoadBalancer` of those.
+ *
+ * This is a helper function that makes it easy to create a `LoadBalancer`
+ * with asynchronously created runners, for example:
+ *
+ * var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
+ */
+ static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) {
+ return Future.wait(new Iterable.generate(size, (_) => createRunner()),
+ cleanUp: (Runner runner) { runner.close(); })
+ .then((runners) => new LoadBalancer(runners));
+ }
+
+ static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) {
+ var entries = runners.map((runner) => new _LoadBalancerEntry(runner));
+ return new List<_LoadBalancerEntry>.from(entries, growable: false);
+ }
+
+ /**
+ * Execute the command in the currently least loaded isolate.
+ *
+ * The optional [load] parameter represents the load that the command
+ * is causing on the isolate where it runs.
+ * The number has no fixed meaning, but should be seen as relative to
+ * other commands run in the same load balancer.
+ * The `load` must not be negative.
+ *
+ * If [timeout] and [onTimeout] are provided, they are forwarded to
+ * the runner running the function, which will handle a timeout
+ * as normal.
+ */
+ Future run(function(argument), argument, {Duration timeout,
+ onTimeout(),
+ int load: 100}) {
+ RangeError.checkNotNegative(load, "load");
+ _LoadBalancerEntry entry = _first;
+ _increaseLoad(entry, load);
+ return entry.run(this, load, function, argument, timeout, onTimeout);
+ }
+
+ /**
+ * Execute the same function in the least loaded [count] isolates.
+ *
+ * This guarantees that the function isn't run twice in the same isolate,
+ * so `count` is not allowed to exceed [length].
+ *
+ * The optional [load] parameter represents the load that the command
+ * is causing on the isolate where it runs.
+ * The number has no fixed meaning, but should be seen as relative to
+ * other commands run in the same load balancer.
+ * The `load` must not be negative.
+ *
+ * If [timeout] and [onTimeout] are provided, they are forwarded to
+ * the runners running the function, which will handle any timeouts
+ * as normal.
+ */
+ List<Future> runMultiple(int count, function(argument), argument,
+ {Duration timeout,
+ onTimeout(),
+ int load: 100}) {
+ RangeError.checkValueInInterval(count, 1, _length, "count");
+ RangeError.checkNotNegative(load, "load");
+ if (count == 1) {
+ return list1(run(function, argument, load: load,
+ timeout: timeout, onTimeout: onTimeout));
+ }
+ List result = new List<Future>(count);
+ if (count == _length) {
+ // No need to change the order of entries in the queue.
+ for (int i = 0; i < count; i++) {
+ _LoadBalancerEntry entry = _queue[i];
+ entry.load += load;
+ result[i] = entry.run(this, load, function, argument,
+ timeout, onTimeout);
+ }
+ } else {
+ // Remove the [count] least loaded services and run the
+ // command on each, then add them back to the queue.
+ // This avoids running the same command twice in the same
+ // isolate.
+ List entries = new List(count);
+ for (int i = 0; i < count; i++) {
+ entries[i] = _removeFirst();
+ }
+ for (int i = 0; i < count; i++) {
+ _LoadBalancerEntry entry = entries[i];
+ entry.load += load;
+ _add(entry);
+ result[i] = entry.run(this, load, function, argument,
+ timeout, onTimeout);
+ }
+ }
+ return result;
+ }
+
+ Future close() {
+ if (_stopFuture != null) return _stopFuture;
+ _stopFuture = MultiError.waitUnordered(_queue.take(_length)
+ .map((e) => e.close()));
+ // Remove all entries.
+ for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1;
+ _queue = null;
+ _length = 0;
+ return _stopFuture;
+ }
+
+ /**
+ * Place [element] in heap at [index] or above.
+ *
+ * Put element into the empty cell at `index`.
+ * While the `element` has higher priority than the
+ * parent, swap it with the parent.
+ */
+ void _bubbleUp(_LoadBalancerEntry element, int index) {
+ while (index > 0) {
+ int parentIndex = (index - 1) ~/ 2;
+ _LoadBalancerEntry parent = _queue[parentIndex];
+ if (element.compareTo(parent) > 0) break;
+ _queue[index] = parent;
+ parent.queueIndex = index;
+ index = parentIndex;
+ }
+ _queue[index] = element;
+ element.queueIndex = index;
+ }
+
+ /**
+ * Place [element] in heap at [index] or above.
+ *
+ * Put element into the empty cell at `index`.
+ * While the `element` has lower priority than either child,
+ * swap it with the highest priority child.
+ */
+ void _bubbleDown(_LoadBalancerEntry element, int index) {
+ while (true) {
+ int childIndex = index * 2 + 1; // Left child index.
+ if (childIndex >= _length) break;
+ _LoadBalancerEntry child = _queue[childIndex];
+ int rightChildIndex = childIndex + 1;
+ if (rightChildIndex < _length) {
+ _LoadBalancerEntry rightChild = _queue[rightChildIndex];
+ if (rightChild.compareTo(child) < 0) {
+ childIndex = rightChildIndex;
+ child = rightChild;
+ }
+ }
+ if (element.compareTo(child) <= 0) break;
+ _queue[index] = child;
+ child.queueIndex = index;
+ index = childIndex;
+ }
+ _queue[index] = element;
+ element.queueIndex = index;
+ }
+
+ /**
+ * Removes the entry from the queue, but doesn't stop its service.
+ *
+ * The entry is expected to be either added back to the queue
+ * immediately or have its stop method called.
+ */
+ void _remove(_LoadBalancerEntry entry) {
+ int index = entry.queueIndex;
+ if (index < 0) return;
+ entry.queueIndex = -1;
+ _length--;
+ _LoadBalancerEntry replacement = _queue[_length];
+ _queue[_length] = null;
+ if (index < _length) {
+ if (entry.compareTo(replacement) < 0) {
+ _bubbleDown(replacement, index);
+ } else {
+ _bubbleUp(replacement, index);
+ }
+ }
+ }
+
+ /**
+ * Adds entry to the queue.
+ */
+ void _add(_LoadBalancerEntry entry) {
+ if (_stopFuture != null) throw new StateError("LoadBalancer is stopped");
+ assert(entry.queueIndex < 0);
+ if (_queue.length == _length) {
+ _grow();
+ }
+ int index = _length;
+ _length = index + 1;
+ _bubbleUp(entry, index);
+ }
+
+ void _increaseLoad(_LoadBalancerEntry entry, int load) {
+ assert(load >= 0);
+ entry.load += load;
+ if (entry.inQueue) {
+ _bubbleDown(entry, entry.queueIndex);
+ }
+ }
+
+ void _decreaseLoad(_LoadBalancerEntry entry, int load) {
+ assert(load >= 0);
+ entry.load -= load;
+ if (entry.inQueue) {
+ _bubbleUp(entry, entry.queueIndex);
+ }
+ }
+
+ void _grow() {
+ List newQueue = new List(_length * 2);
+ newQueue.setRange(0, _length, _queue);
+ _queue = newQueue;
+ }
+
+ _LoadBalancerEntry get _first {
+ assert(_length > 0);
+ return _queue[0];
+ }
+
+ _LoadBalancerEntry _removeFirst() {
+ _LoadBalancerEntry result = _first;
+ _remove(result);
+ return result;
+ }
+}
+
+class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> {
+ // The current load on the isolate.
+ int load = 0;
+ // The current index in the heap-queue.
+ // Negative when the entry is not part of the queue.
+ int queueIndex = -1;
+
+ // The service used to send commands to the other isolate.
+ Runner runner;
+
+ _LoadBalancerEntry(Runner runner)
+ : runner = runner;
+
+ /** Whether the entry is still in the queue. */
+ bool get inQueue => queueIndex >= 0;
+
+ Future run(LoadBalancer balancer, int load, function(argumen), argument,
+ Duration timeout, onTimeout()) {
+ return runner.run(function, argument,
+ timeout: timeout, onTimeout: onTimeout).whenComplete(() {
+ balancer._decreaseLoad(this, load);
+ });
+ }
+
+ Future close() => runner.close();
+
+ int compareTo(_LoadBalancerEntry other) => load - other.load;
+}
+
+
diff --git a/lib/ports.dart b/lib/ports.dart
new file mode 100644
index 0000000..c8de655
--- /dev/null
+++ b/lib/ports.dart
@@ -0,0 +1,269 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * Utility functions for setting up ports and sending data.
+ *
+ * This library contains a number of functions that handle the
+ * boiler-plate of setting up a receive port and receiving a
+ * single message on the port.
+ *
+ * There are different functions that offer different ways to
+ * handle the incoming message.
+ *
+ * The simplest function, [singleCallbackPort], takes a callback
+ * and returns a port, and then calls the callback for the first
+ * message sent on the port.
+ *
+ * Other functions intercept the returned value and either
+ * does something with it, or puts it into a [Future] or [Completer].
+ */
+library dart.pkg.isolate.ports;
+
+import "dart:isolate";
+import "dart:async";
+import "src/lists.dart";
+
+/**
+ * Create a [SendPort] that accepts only one message.
+ *
+ * The [callback] function is called once, with the first message
+ * received by the receive port.
+ * All futher messages are ignored.
+ *
+ * If [timeout] is supplied, it is used as a limit on how
+ * long it can take before the message is received. If a
+ * message isn't received in time, the `callback` function
+ * is called once with the [timeoutValue] instead.
+ *
+ * Returns the `SendPort` expecting the single message.
+ *
+ * Equivalent to:
+ *
+ * (new ReceivePort()
+ * ..first.timeout(duration, () => timeoutValue).then(callback))
+ * .sendPort
+ */
+SendPort singleCallbackPort(void callback(response),
+ {Duration timeout,
+ var timeoutValue}) {
+ RawReceivePort responsePort = new RawReceivePort();
+ Zone zone = Zone.current;
+ callback = zone.registerUnaryCallback(callback);
+ var timer;
+ responsePort.handler = (response) {
+ responsePort.close();
+ if (timer != null) timer.cancel();
+ zone.runUnary(callback, response);
+ };
+ if (timeout != null) {
+ timer = new Timer(timeout, () {
+ responsePort.close();
+ callback(timeoutValue);
+ });
+ }
+ return responsePort.sendPort;
+}
+
+/**
+ * Create a [SendPort] that accepts only one message.
+ *
+ * When the first message is received, the [callback] function is
+ * called with the message as argument,
+ * and the [completer] is completed with the result of that call.
+ * All further messages are ignored.
+ *
+ * If `callback` is omitted, it defaults to an identity function.
+ * The `callback` call may return a future, and the completer will
+ * wait for that future to complete.
+ *
+ * If [timeout] is supplied, it is used as a limit on how
+ * long it can take before the message is received. If a
+ * message isn't received in time, the [onTimeout] is called,
+ * and `completer` is completed with the result of that call
+ * instead.
+ * The [callback] function will not be interrupted by the time-out,
+ * as long as the initial message is received in time.
+ * If `onTimeout` is omitted, it defaults to completing the `completer` with
+ * a [TimeoutException].
+ *
+ * The [completer] may be a synchronous completer. It is only
+ * completed in response to another event, either a port message or a timer.
+ *
+ * Returns the `SendPort` expecting the single message.
+ */
+SendPort singleCompletePort(Completer completer,
+ {callback(message),
+ Duration timeout,
+ onTimeout()}) {
+ if (callback == null && timeout == null) {
+ return singleCallbackPort(completer.complete);
+ }
+ RawReceivePort responsePort = new RawReceivePort();
+ var timer;
+ if (callback == null) {
+ responsePort.handler = (response) {
+ responsePort.close();
+ if (timer != null) timer.cancel();
+ completer.complete(response);
+ };
+ } else {
+ Zone zone = Zone.current;
+ Function action = zone.registerUnaryCallback((response) {
+ completer.complete(new Future.sync(() => callback(response)));
+ });
+ responsePort.handler = (response) {
+ responsePort.close();
+ if (timer != null) timer.cancel();
+ zone.runUnary(action, response);
+ };
+ }
+ if (timeout != null) {
+ timer = new Timer(timeout, () {
+ responsePort.close();
+ if (onTimeout != null) {
+ completer.complete(new Future.sync(onTimeout));
+ } else {
+ completer.completeError(new TimeoutException("Future not completed",
+ timeout));
+ }
+ });
+ }
+ return responsePort.sendPort;
+}
+
+/**
+ * Creates a [Future], and a [SendPort] that can be used to complete that
+ * future.
+ *
+ * Calls [action] with the response `SendPort`, then waits for someone
+ * to send a value on that port
+ * The returned `Future` is completed with the value sent on the port.
+ *
+ * If [action] throws, which it shouldn't,
+ * the returned future is completed with that error.
+ * Any return value of `action` is ignored, and if it is asynchronous,
+ * it should handle its own errors.
+ *
+ * If [timeout] is supplied, it is used as a limit on how
+ * long it can take before the message is received. If a
+ * message isn't received in time, the [timeoutValue] used
+ * as the returned future's value instead.
+ *
+ * If you want a timeout on the returned future, it's recommended to
+ * use the [timeout] parameter, and not [Future.timeout] on the result.
+ * The `Future` method won't be able to close the underlying [ReceivePort].
+ */
+Future singleResponseFuture(void action(SendPort responsePort),
+ {Duration timeout,
+ var timeoutValue}) {
+ Completer completer = new Completer.sync();
+ RawReceivePort responsePort = new RawReceivePort();
+ Timer timer;
+ Zone zone = Zone.current;
+ responsePort.handler = (v) {
+ responsePort.close();
+ if (timer != null) timer.cancel();
+ zone.run(() {
+ completer.complete(v);
+ });
+ };
+ if (timeout != null) {
+ timer = new Timer(timeout, () {
+ responsePort.close();
+ completer.complete(timeoutValue);
+ });
+ }
+ try {
+ action(responsePort.sendPort);
+ } catch (e, s) {
+ responsePort.close();
+ if (timer != null) timer.cancel();
+ // Delay completion because completer is sync.
+ scheduleMicrotask(() { completer.completeError(e, s); });
+ }
+ return completer.future;
+}
+
+
+/**
+ * Send the result of a future, either value or error, as a message.
+ *
+ * The result of [future] is sent on [resultPort] in a form expected by
+ * either [receiveFutureResult], [completeFutureResult], or
+ * by the port of [singleResultFuture].
+ */
+void sendFutureResult(Future future, SendPort resultPort) {
+ future.then((v) { resultPort.send(list1(v)); },
+ onError: (e, s) { resultPort.send(list2("$e", "$s")); });
+}
+
+
+/**
+ * Creates a [Future], and a [SendPort] that can be used to complete that
+ * future.
+ *
+ * Calls [action] with the response `SendPort`, then waits for someone
+ * to send a future result on that port using [sendFutureResult].
+ * The returned `Future` is completed with the future result sent on the port.
+ *
+ * If [action] throws, which it shouldn't,
+ * the returned future is completed with that error,
+ * unless someone manages to send a message on the port before `action` throws.
+ *
+ * If [timeout] is supplied, it is used as a limit on how
+ * long it can take before the message is received. If a
+ * message isn't received in time, the [onTimeout] is called,
+ * and the future is completed with the result of that call
+ * instead.
+ * If `onTimeout` is omitted, it defaults to throwing
+ * a [TimeoutException].
+ */
+Future singleResultFuture(void action(SendPort responsePort),
+ {Duration timeout,
+ onTimeout()}) {
+ Completer completer = new Completer.sync();
+ SendPort port = singleCompletePort(completer,
+ callback: receiveFutureResult,
+ timeout: timeout,
+ onTimeout: onTimeout);
+ try {
+ action(port);
+ } catch (e, s) {
+ // This should not happen.
+ sendFutureResult(new Future.error(e, s), port);
+ }
+ return completer.future;
+}
+
+/**
+ * Completes a completer with a message created by [sendFutureResult]
+ *
+ * The [response] must be a message on the format sent by [sendFutureResult].
+ */
+void completeFutureResult(var response, Completer completer) {
+ if (response.length == 2) {
+ var error = new RemoteError(response[0], response[1]);
+ completer.completeError(error, error.stackTrace);
+ } else {
+ var result = response[0];
+ completer.complete(result);
+ }
+}
+
+
+/**
+ * Convertes a received message created by [sendFutureResult] to a future
+ * result.
+ *
+ * The [response] must be a message on the format sent by [sendFutureResult].
+ */
+Future receiveFutureResult(var response) {
+ if (response.length == 2) {
+ var error = new RemoteError(response[0], response[1]);
+ return new Future.error(error, error.stackTrace);
+ }
+ var result = response[0];
+ return new Future.value(result);
+}
diff --git a/lib/registry.dart b/lib/registry.dart
new file mode 100644
index 0000000..9e87c11
--- /dev/null
+++ b/lib/registry.dart
@@ -0,0 +1,497 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * An isolate-compatible object registry and lookup service.
+ */
+library dart.pkg.isolate.registry;
+
+import "dart:async" show Future, Completer, TimeoutException;
+import "dart:isolate" show RawReceivePort, SendPort, Capability;
+import "dart:collection" show HashMap, HashSet;
+import "ports.dart";
+import "src/lists.dart";
+
+// Command tags.
+const int _ADD = 0;
+const int _REMOVE = 1;
+const int _ADD_TAGS = 2;
+const int _REMOVE_TAGS = 3;
+const int _GET_TAGS = 4;
+const int _FIND = 5;
+
+/**
+ * An isolate-compatible object registry.
+ *
+ * Objects can be stored as elements of a registry,
+ * have "tags" assigned to them, and be looked up by tag.
+ *
+ * A `Registry` object caches objects found using the [lookup]
+ * method, or added using [add], and returns the same object every time
+ * they are requested.
+ * A different `Registry` object that works on the same registry will not
+ * preserve the identity of elements
+ *
+ * It is recommended to only have one `Registry` object working on the
+ * same registry in each isolate.
+ *
+ * When the registry is shared accross isolates, both elements and tags must
+ * be sendable between the isolates.
+ * Between isolates spawned using [Isolate.spawn] from the same initial
+ * isolate, most objectes can be sent.
+ * Only simple objects can be sent between isolates originating from different
+ * [Isolate.spawnUri] calls.
+ */
+class Registry<T> {
+ // Most operations fail if they haven't received a response for this duration.
+ final Duration _timeout;
+
+ // Each `Registry` object has a cache of objects being controlled by it.
+ // The cache is stored in an [Expando], not on the object.
+ // This allows sending the `Registry` object through a `SendPort` without
+ // also copying the cache.
+ static Expando _caches = new Expando();
+
+ /**
+ * Port for sending command to the central registry mananger.
+ */
+ SendPort _commandPort;
+
+ /**
+ * Create a registry linked to a [RegistryManager] through [commandPort].
+ *
+ * In most cases, a registry is created by using the
+ * [RegistryManager.registry] getter.
+ *
+ * If a registry is used between isolates created using [Isolate.spawnUri],
+ * the `Registry` object can't be sent between the isolates directly.
+ * Instead the [RegistryManager.commandPort] port can be sent and a
+ * `Registry` created from the command port using this constructor.
+ *
+ * The optional [timeout] parameter can be set to the duration
+ * this registry should wait before assuming that an operation
+ * has failed.
+ */
+ Registry.fromPort(SendPort commandPort,
+ {Duration timeout: const Duration(seconds: 5)})
+ : _commandPort = commandPort,
+ _timeout = timeout;
+
+ _RegistryCache get _cache {
+ _RegistryCache cache = _caches[this];
+ if (cache != null) return cache;
+ cache = new _RegistryCache();
+ _caches[this] = cache;
+ return cache;
+ }
+
+ /**
+ * Check and get the identity of an element.
+ *
+ * Throws if [element] is not an element in the registry.
+ */
+ int _getId(T element) {
+ int id = _cache.id(element);
+ if (id == null) {
+ throw new StateError("Not an element: ${Error.safeToString(element)}");
+ }
+ return id;
+ }
+
+ /**
+ * Adds [element] to the registry with the provided tags.
+ *
+ * Fails if [element] is already in this registry.
+ * An object is already in the registry if it has been added using [add],
+ * or if it was returned by a [lookup] call on this registry object.
+ *
+ * Returns a capability that can be used with [remove] to remove
+ * the element from the registry again.
+ *
+ * The [tags] can be used to distinguish some of the elements
+ * from other elements. Any object can be used as a tag, as long as
+ * it preserves equality when sent through a [SendPort].
+ * This makes [Capability] objects a good choice for tags.
+ */
+ Future<Capability> add(T element, {Iterable tags}) {
+ _RegistryCache cache = _cache;
+ if (cache.contains(element)) {
+ return new Future<Capability>.sync(() {
+ throw new StateError(
+ "Object already in registry: ${Error.safeToString(element)}");
+ });
+ }
+ Completer completer = new Completer<Capability>();
+ SendPort port = singleCompletePort(completer, callback: (List response) {
+ assert(cache.isAdding(element));
+ int id = response[0];
+ Capability removeCapability = response[1];
+ cache.register(id, element);
+ return removeCapability;
+ }, timeout: _timeout, onTimeout: () {
+ cache.stopAdding(element);
+ throw new TimeoutException("Future not completed", _timeout);
+ });
+ if (tags != null) tags = tags.toList(growable: false);
+ cache.setAdding(element);
+ _commandPort.send(list4(_ADD, element, tags, port));
+ return completer.future;
+ }
+
+ /**
+ * Remove the element from the registry.
+ *
+ * Returns `true` if removing the element succeeded, or `false` if the
+ * elements wasn't in the registry, or if it couldn't be removed.
+ *
+ * The [removeCapability] must be the same capability returned by [add]
+ * when the object was added. If the capability is wrong, the
+ * object is not removed, and this function returns false.
+ */
+ Future<bool> remove(T element, Capability removeCapability) {
+ int id = _cache.id(element);
+ if (id == null) {
+ return new Future<bool>.value(false);
+ }
+ Completer completer = new Completer<bool>();
+ SendPort port = singleCompletePort(completer, callback: (bool result) {
+ _cache.remove(id);
+ return result;
+ }, timeout: _timeout);
+ _commandPort.send(list4(_REMOVE, id, removeCapability, port));
+ return completer.future;
+ }
+
+ /**
+ * Add tags to an object in the registry.
+ *
+ * Each element of the registry has a number of tags associated with
+ * it. A tag is either associated with an element or not, adding it more
+ * than once does not make any difference.
+ *
+ * Tags are compared using [Object.==] equality.
+ *
+ * Fails if any of the elements are not in the registry.
+ */
+ Future addTags(Iterable<T> elements, Iterable tags) {
+ List ids = elements.map(_getId).toList(growable: false);
+ return _addTags(ids, tags);
+ }
+
+ /**
+ * Remove tags from an object in the registry.
+ *
+ * After this operation, the [elements] will not be associated to the [tags].
+ * It doesn't matter whether the elements were associated with the tags
+ * before or not.
+ *
+ * Fails if any of the elements are not in the registry.
+ */
+ Future removeTags(Iterable<T> elements, Iterable tags) {
+ List ids = elements.map(_getId).toList(growable: false);
+ tags = tags.toList(growable: false);
+ Completer completer = new Completer();
+ SendPort port = singleCompletePort(completer, timeout: _timeout);
+ _commandPort.send(list4(_REMOVE_TAGS, ids, tags, port));
+ return completer.future;
+ }
+
+ Future _addTags(List<int> ids, Iterable tags) {
+ tags = tags.toList(growable: false);
+ Completer completer = new Completer();
+ SendPort port = singleCompletePort(completer, timeout: _timeout);
+ _commandPort.send(list4(_ADD_TAGS, ids, tags, port));
+ return completer.future;
+ }
+
+ /**
+ * Finds a number of elements that have all the desired [tags].
+ *
+ * If [tags] is omitted or empty, any element of the registry can be
+ * returned.
+ *
+ * If [max] is specified, it must be greater than zero.
+ * In that case, at most the first `max` results are returned,
+ * in whatever order the registry finds its results.
+ * Otherwise all matching elements are returned.
+ */
+ Future<List<T>> lookup({Iterable tags, int max}) {
+ if (max != null && max < 1) {
+ throw new RangeError.range(max, 1, null, "max");
+ }
+ if (tags != null) tags = tags.toList(growable: false);
+ Completer completer = new Completer<List<T>>();
+ SendPort port = singleCompletePort(completer, callback: (List response) {
+ // Response is even-length list of (id, element) pairs.
+ _RegistryCache cache = _cache;
+ int count = response.length ~/ 2;
+ List result = new List(count);
+ for (int i = 0; i < count; i++) {
+ int id = response[i * 2];
+ var element = response[i * 2 + 1];
+ element = cache.register(id, element);
+ result[i] = element;
+ }
+ return result;
+ }, timeout: _timeout);
+ _commandPort.send(list4(_FIND, tags, max, port));
+ return completer.future;
+ }
+}
+
+/**
+ * Isolate-local cache used by a [Registry].
+ *
+ * Maps between id-numbers and elements.
+ * An object is considered an element of the registry if it
+ */
+class _RegistryCache {
+ // Temporary marker until an object gets an id.
+ static const int _BEING_ADDED = -1;
+
+ final Map<int, Object> id2object = new HashMap();
+ final Map<Object, int> object2id = new HashMap.identity();
+
+ int id(Object object) {
+ int result = object2id[object];
+ if (result == _BEING_ADDED) return null;
+ return result;
+ }
+
+ Object operator[](int id) => id2object[id];
+
+ // Register a pair of id/object in the cache.
+ // if the id is already in the cache, just return the existing
+ // object.
+ Object register(int id, Object object) {
+ object = id2object.putIfAbsent(id, () {
+ object2id[object] = id;
+ return object;
+ });
+ return object;
+ }
+
+ bool isAdding(element) => object2id[element] == _BEING_ADDED;
+
+ void setAdding(element) {
+ assert(!contains(element));
+ object2id[element] = _BEING_ADDED;
+ }
+
+ void stopAdding(element) {
+ assert(object2id[element] == _BEING_ADDED);
+ object2id.remove(element);
+ }
+
+ void remove(int id) {
+ var element = id2object.remove(id);
+ if (element != null) {
+ object2id.remove(element);
+ }
+ }
+
+ bool contains(element) => object2id.containsKey(element);
+}
+
+/**
+ * The central repository used by distributed [Registry] instances.
+ */
+class RegistryManager {
+ final Duration _timeout;
+ int _nextId = 0;
+ RawReceivePort _commandPort;
+
+ /**
+ * Maps id to entry. Each entry contains the id, the element, its tags,
+ * and a capability required to remove it again.
+ */
+ Map<int, _RegistryEntry> _entries = new HashMap();
+ Map<Object, Set<int>> _tag2id = new HashMap();
+
+ /**
+ * Create a new registry managed by the created [RegistryManager].
+ *
+ * The optional [timeout] parameter can be set to the duration
+ * registry objects should wait before assuming that an operation
+ * has failed.
+ */
+ RegistryManager({timeout: const Duration(seconds: 5)})
+ : _timeout = timeout,
+ _commandPort = new RawReceivePort() {
+ _commandPort.handler = _handleCommand;
+ }
+
+ /**
+ * The command port receiving commands for the registry manager.
+ *
+ * Use this port with [Registry.fromPort] to link a registry to the
+ * manager in isolates where you can't send a [Registry] object directly.
+ */
+ SendPort get commandPort => _commandPort.sendPort;
+
+ /**
+ * Get a registry backed by this manager.
+ *
+ * This registry can be sent to other isolates created using
+ * [Isolate.spawn].
+ */
+ Registry get registry => new Registry.fromPort(_commandPort.sendPort,
+ timeout: _timeout);
+
+ // Used as argument to putIfAbsent.
+ static Set _createSet() => new HashSet();
+
+ void _handleCommand(List command) {
+ switch(command[0]) {
+ case _ADD:
+ _add(command[1], command[2], command[3]);
+ return;
+ case _REMOVE:
+ _remove(command[1], command[2], command[3]);
+ return;
+ case _ADD_TAGS:
+ _addTags(command[1], command[2], command[3]);
+ return;
+ case _REMOVE_TAGS:
+ _removeTags(command[1], command[2], command[3]);
+ return;
+ case _GET_TAGS:
+ _getTags(command[1], command[2]);
+ return;
+ case _FIND:
+ _find(command[1], command[2], command[3]);
+ return;
+ default:
+ throw new UnsupportedError("Unknown command: ${command[0]}");
+ }
+ }
+
+ void _add(Object object, List tags, SendPort replyPort) {
+ int id = ++_nextId;
+ var entry = new _RegistryEntry(id, object);
+ _entries[id] = entry;
+ if (tags != null) {
+ for (var tag in tags) {
+ entry.tags.add(tag);
+ _tag2id.putIfAbsent(tag, _createSet).add(id);
+ }
+ }
+ replyPort.send(list2(id, entry.removeCapability));
+ }
+
+ void _remove(int id, Capability removeCapability, SendPort replyPort) {
+ _RegistryEntry entry = _entries[id];
+ if (entry == null || entry.removeCapability != removeCapability) {
+ replyPort.send(false);
+ return;
+ }
+ _entries.remove(id);
+ for (var tag in entry.tags) {
+ _tag2id[tag].remove(id);
+ }
+ replyPort.send(true);
+ }
+
+ void _addTags(List<int> ids, List tags, SendPort replyPort) {
+ assert(tags != null);
+ assert(tags.isNotEmpty);
+ for (int id in ids) {
+ _RegistryEntry entry = _entries[id];
+ if (entry == null) continue; // Entry was removed.
+ entry.tags.addAll(tags);
+ for (var tag in tags) {
+ Set ids = _tag2id.putIfAbsent(tag, _createSet);
+ ids.add(id);
+ }
+ }
+ replyPort.send(null);
+ }
+
+ void _removeTags(List<int> ids, List tags, SendPort replyPort) {
+ assert(tags != null);
+ assert(tags.isNotEmpty);
+ for (int id in ids) {
+ _RegistryEntry entry = _entries[id];
+ if (entry == null) continue; // Object was removed.
+ entry.tags.removeAll(tags);
+ }
+ for (var tag in tags) {
+ Set tagIds = _tag2id[tag];
+ if (tagIds == null) continue;
+ tagIds.removeAll(ids);
+ }
+ replyPort.send(null);
+ }
+
+ void _getTags(int id, SendPort replyPort) {
+ _RegistryEntry entry = _entries[id];
+ if (entry != null) {
+ replyPort.send(entry.tags.toList(growable: false));
+ } else {
+ replyPort.send(const []);
+ }
+ }
+
+ Iterable<int> _findTaggedIds(List tags) {
+ var matchingFirstTagIds = _tag2id[tags[0]];
+ if (matchingFirstTagIds == null) {
+ return const [];
+ }
+ if (matchingFirstTagIds.isEmpty || tags.length == 1) {
+ return matchingFirstTagIds;
+ }
+ // Create new set, then start removing ids not also matched
+ // by other tags.
+ Set<int> matchingIds = matchingFirstTagIds.toSet();
+ for (int i = 1; i < tags.length; i++) {
+ var tagIds = _tag2id[tags[i]];
+ if (tagIds == null) return const [];
+ matchingIds.retainAll(tagIds);
+ if (matchingIds.isEmpty) break;
+ }
+ return matchingIds;
+ }
+
+ void _find(List tags, int max, SendPort replyPort) {
+ assert(max == null || max > 0);
+ List result = [];
+ if (tags == null || tags.isEmpty) {
+ var entries = _entries.values;
+ if (max != null) entries = entries.take(max);
+ for (_RegistryEntry entry in entries) {
+ result.add(entry.id);
+ result.add(entry.element);
+ }
+ replyPort.send(result);
+ return;
+ }
+ var matchingIds = _findTaggedIds(tags);
+ if (max == null) max = matchingIds.length; // All results.
+ for (var id in matchingIds) {
+ result.add(id);
+ result.add(_entries[id].element);
+ max--;
+ if (max == 0) break;
+ }
+ replyPort.send(result);
+ }
+
+ /**
+ * Shut down the registry service.
+ *
+ * After this, all [Registry] operations will time out.
+ */
+ void close() {
+ _commandPort.close();
+ }
+}
+
+/** Entry in [RegistryManager]. */
+class _RegistryEntry {
+ final int id;
+ final Object element;
+ final Set tags = new HashSet();
+ final Capability removeCapability = new Capability();
+ _RegistryEntry(this.id, this.element);
+}
diff --git a/lib/runner.dart b/lib/runner.dart
new file mode 100644
index 0000000..cd26d5a
--- /dev/null
+++ b/lib/runner.dart
@@ -0,0 +1,61 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * A [Runner] runs a function, potentially in a different scope
+ * or even isolate.
+ */
+library dart.pkg.isolate.runner;
+
+import "dart:async" show Future;
+
+/**
+ * Calls a function with an argument.
+ *
+ * The function can be run in a different place from where the `Runner`
+ * resides, e.g., in a different isolate.
+ */
+class Runner {
+ /**
+ * Request that [function] be called with the provided arguments.
+ *
+ * The arguments will be applied to the function in the same way as by
+ * [Function.apply], but it may happen in a diffent isolate or setting.
+ *
+ * It's necessary that the function can be sent through a [SendPort]
+ * if the call is performed in another isolate.
+ * That means the other isolate should be created using [Isolate.spawn]
+ * so that it is running the same code as the sending isoalte,
+ * and the function must be a static or top-level function.
+ *
+ * Waits for the result of the call, and completes the returned future
+ * with the result, whether it's a value or an error.
+ *
+ * If the returned future does not complete before `timeLimit` has passed,
+ * the [onTimeout] action is executed instead, and its result (whether it
+ * returns or throws) is used as the result of the returned future.
+ *
+ * If `onTimeout` is omitted, a timeout will cause the returned future to
+ * complete with a [TimeoutException].
+ *
+ * The default implementation runs the function in the current isolate.
+ */
+ Future run(function(argument), Object argument,
+ {Duration timeout, onTimeout()}) {
+ Future result = new Future.sync(() => function(argument));
+ if (timeout != null) {
+ result = result.timeout(timeout, onTimeout: onTimeout);
+ }
+ return result;
+ }
+
+ /**
+ * Stop the runner.
+ *
+ * If the runner has allocated resources, e.g., an isolate, it should
+ * be released. No further calls to [run] should be made after calling
+ * stop.
+ */
+ Future close() => new Future.value();
+}
diff --git a/lib/src/errors.dart b/lib/src/errors.dart
new file mode 100644
index 0000000..91abe5b
--- /dev/null
+++ b/lib/src/errors.dart
@@ -0,0 +1,188 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// TODO(lrn): This should be in package:async?
+/**
+ * Helper functions for working with errors.
+ *
+ * The [MultiError] class combines multiple errors into one object,
+ * and the [MultiError.wait] function works like [Future.wait] except
+ * that it returns all the errors.
+ */
+library pkg.isolate.errors;
+
+import "dart:async";
+
+class MultiError extends Error {
+ // Limits the number of lines included from each error's error message.
+ // A best-effort attempt is made at keeping below this number of lines
+ // in the output.
+ // If there are too many errors, they will all get at least one line.
+ static const int _MAX_LINES = 55;
+ // Minimum number of lines in the toString for each error.
+ static const int _MIN_LINES_PER_ERROR = 1;
+
+ /** The actual errors. */
+ final List errors;
+
+ /**
+ * Create a `MultiError` based on a list of errors.
+ *
+ * The errors represent errors of a number of individual operations.
+ *
+ * The list may contain `null` values, if the index of the error in the
+ * list is useful.
+ */
+ MultiError(this.errors);
+
+ /**
+ * Waits for all [futures] to complete, like [Future.wait].
+ *
+ * Where `Future.wait` only reports one error, even if multiple
+ * futures complete with errors, this function will complete
+ * with a [MultiError] if more than one future completes with an error.
+ *
+ * The order of values is not preserved (if that is needed, use
+ * [wait]).
+ */
+ static Future<List> waitUnordered(Iterable<Future> futures,
+ {cleanUp(successResult)}) {
+ Completer completer;
+ int count = 0;
+ int errors = 0;
+ int values = 0;
+ // Initilized to `new List(count)` when count is known.
+ // Filled up with values on the left, errors on the right.
+ // Order is not preserved.
+ List results;
+ void checkDone() {
+ if (errors + values < count) return;
+ if (errors == 0) {
+ completer.complete(results);
+ return;
+ }
+ var errorList = results.sublist(results.length - errors);
+ completer.completeError(new MultiError(errorList));
+ };
+ var handleValue = (v) {
+ // If this fails because [results] is null, there is a future
+ // which breaks the Future API by completing immediately when
+ // calling Future.then, probably by misusing a synchronous completer.
+ results[values++] = v;
+ if (errors > 0 && cleanUp != null) {
+ new Future.sync(() => cleanUp(v));
+ }
+ checkDone();
+ };
+ var handleError = (e, s) {
+ if (errors == 0 && cleanUp != null) {
+ for (int i = 0; i < values; i++) {
+ var value = results[i];
+ if (value != null) new Future.sync(() => cleanUp(value));
+ }
+ }
+ results[results.length - ++errors] = e;
+ checkDone();
+ };
+ for (Future future in futures) {
+ count++;
+ future.then(handleValue, onError: handleError);
+ }
+ if (count == 0) return new Future.value(new List(0));
+ results = new List(count);
+ completer = new Completer();
+ return completer.future;
+ }
+
+ /**
+ * Waits for all [futures] to complete, like [Future.wait].
+ *
+ * Where `Future.wait` only reports one error, even if multiple
+ * futures complete with errors, this function will complete
+ * with a [MultiError] if more than one future completes with an error.
+ *
+ * The order of values is preserved, and if any error occurs, the
+ * [MultiError.errors] list will have errors in the corresponding slots,
+ * and `null` for non-errors.
+ */
+ Future<List> wait(Iterable<Future> futures,
+ {cleanUp(successResult)}) {
+ Completer completer;
+ int count = 0;
+ bool hasError = false;
+ int completed = 0;
+ // Initalized to `new List(count)` when count is known.
+ // Filled with values until the first error, then cleared
+ // and filled with errors.
+ List results;
+ void checkDone() {
+ completed++;
+ if (completed < count) return;
+ if (!hasError) {
+ completer.complete(results);
+ return;
+ }
+ completer.completeError(new MultiError(results));
+ };
+ for (Future future in futures) {
+ int i = count;
+ count++;
+ future.then((v) {
+ if (!hasError) {
+ results[i] = v;
+ } else if (cleanUp != null) {
+ new Future.sync(() => cleanUp(v));
+ }
+ checkDone();
+ }, onError: (e, s) {
+ if (!hasError) {
+ if (cleanUp != null) {
+ for (int i = 0; i < results.length; i++) {
+ var result = results[i];
+ if (result != null) new Future.sync(() => cleanUp(result));
+ }
+ }
+ results.fillRange(0, results.length, null);
+ hasError = true;
+ }
+ results[i] = e;
+ checkDone();
+ });
+ }
+ if (count == 0) return new Future.value(new List(0));
+ results = new List(count);
+ completer = new Completer();
+ return completer.future;
+ }
+
+
+ String toString() {
+ StringBuffer buffer = new StringBuffer();
+ buffer.write("Multiple Errors:\n");
+ int linesPerError = _MAX_LINES ~/ errors.length;
+ if (linesPerError < _MIN_LINES_PER_ERROR) {
+ linesPerError = _MIN_LINES_PER_ERROR;
+ }
+
+ for (int index = 0; index < errors.length; index++) {
+ var error = errors[index];
+ if (error == null) continue;
+ String errorString = error.toString();
+ int end = 0;
+ for (int i = 0; i < linesPerError; i++) {
+ end = errorString.indexOf('\n', end) + 1;
+ if (end == 0) {
+ end = errorString.length;
+ break;
+ }
+ }
+ buffer.write("#$index: ");
+ buffer.write(errorString.substring(0, end));
+ if (end < errorString.length) {
+ buffer.write("...\n");
+ }
+ }
+ return buffer.toString();
+ }
+}
diff --git a/lib/src/functionref.dart b/lib/src/functionref.dart
new file mode 100644
index 0000000..d4c0e8e
--- /dev/null
+++ b/lib/src/functionref.dart
@@ -0,0 +1,80 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * Convert top-level or static functions to objects that can be sent to
+ * other isolates.
+ *
+ * This package is only needed until such functions can be sent directly
+ * through send-ports.
+ */
+library pkg.isolate.functionref;
+
+import "dart:mirrors";
+
+abstract class FunctionRef {
+ static FunctionRef from(Function function) {
+ var cm = reflect(function);
+ if (cm is ClosureMirror) {
+ MethodMirror fm = cm.function;
+ if (fm.isRegularMethod && fm.isStatic) {
+ Symbol functionName = fm.simpleName;
+ Symbol className;
+ DeclarationMirror owner = fm.owner;
+ if (owner is ClassMirror) {
+ className = owner.simpleName;
+ owner = owner.owner;
+ }
+ if (owner is LibraryMirror) {
+ LibraryMirror ownerLibrary = owner;
+ Uri libraryUri = ownerLibrary.uri;
+ return new _FunctionRef(libraryUri, className, functionName);
+ }
+ }
+ throw new ArgumentError.value(function, "function",
+ "Not a static or top-level function");
+ }
+ // It's a Function but not a closure, so it's a callable object.
+ return new _CallableObjectRef(function);
+ }
+
+ Function get function;
+}
+
+class _FunctionRef implements FunctionRef {
+ final Uri libraryUri;
+ final Symbol className;
+ final Symbol functionName;
+
+ _FunctionRef(this.libraryUri, this.className, this.functionName);
+
+ Function get function {
+ LibraryMirror lm = currentMirrorSystem().libraries[libraryUri];
+ if (lm != null) {
+ ObjectMirror owner = lm;
+ if (className != null) {
+ ClassMirror cm = lm.declarations[className];
+ owner = cm;
+ }
+ if (owner != null) {
+ ClosureMirror function = owner.getField(this.functionName);
+ if (function != null) return function.reflectee;
+ }
+ }
+ String functionName = MirrorSystem.getName(this.functionName);
+ String classQualifier = "";
+ if (this.className != null) {
+ classQualifier = " in class ${MirrorSystem.getName(this.className)}";
+ }
+ throw new UnsupportedError(
+ "Function $functionName${classQualifier} not found in library $libraryUri"
+ );
+ }
+}
+
+class _CallableObjectRef implements FunctionRef {
+ final Function object;
+ _CallableObjectRef(this.object);
+ Function get function => object;
+}
diff --git a/lib/src/lists.dart b/lib/src/lists.dart
new file mode 100644
index 0000000..011ecf2
--- /dev/null
+++ b/lib/src/lists.dart
@@ -0,0 +1,31 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/** Utility functions to create fixed-length lists. */
+library pkg.isolate.util.lists;
+
+/** Create a single-element fixed-length list. */
+List list1(v1) => new List(1)..[0] = v1;
+
+/** Create a two-element fixed-length list. */
+List list2(v1, v2) => new List(2)..[0] = v1
+ ..[1] = v2;
+
+/** Create a three-element fixed-length list. */
+List list3(v1, v2, v3) => new List(3)..[0] = v1
+ ..[1] = v2
+ ..[2] = v3;
+
+/** Create a four-element fixed-length list. */
+List list4(v1, v2, v3, v4) => new List(4)..[0] = v1
+ ..[1] = v2
+ ..[2] = v3
+ ..[3] = v4;
+
+/** Create a five-element fixed-length list. */
+List list5(v1, v2, v3, v4, v5) => new List(5)..[0] = v1
+ ..[1] = v2
+ ..[2] = v3
+ ..[3] = v4
+ ..[4] = v5;
diff --git a/lib/src/multiplexport.dart b/lib/src/multiplexport.dart
new file mode 100644
index 0000000..d0c2f67
--- /dev/null
+++ b/lib/src/multiplexport.dart
@@ -0,0 +1,103 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * A multiplexing [RawReceivePort].
+ *
+ * Allows creating a number of [RawReceivePort] implementations that all send
+ * messages through the same real `RawReceivePort`.
+ *
+ * This allows reducing the number of receive ports created, but adds an
+ * overhead to each message.
+ * If a library creates many short-lived receive ports, multiplexing might be
+ * faster.
+ *
+ * To use multiplexing receive ports, create and store a
+ * [RawReceivePortMultiplexer], and create receive ports by calling
+ * `multiplexer.createRawReceivePort(handler)` where you would otherwise
+ * write `new RawReceivePort(handler)`.
+ *
+ * Remember to [close] the multiplexer when it is no longer needed.
+ * `
+ * (TODO: Check if it really is faster - creating a receive port requires a
+ * global mutex, so it may be a bottleneck, but it's not clear how slow it is).
+ */
+library pkg.isolate.multiplexreceiveport;
+
+import "dart:isolate";
+import "dart:collection";
+import "lists.dart";
+
+class _MultiplexRawReceivePort implements RawReceivePort {
+ final RawReceivePortMultiplexer _multiplexer;
+ final int _id;
+ Function _handler;
+
+ _MultiplexRawReceivePort(this._multiplexer, this._id, this._handler);
+
+ void set handler(void handler(response)) {
+ this._handler = handler;
+ }
+
+ void close() {
+ _multiplexer._closePort(_id);
+ }
+
+ SendPort get sendPort => _multiplexer._createSendPort(_id);
+
+ void _invokeHandler(message) { _handler(message); }
+}
+
+class _MultiplexSendPort implements SendPort {
+ final SendPort _sendPort;
+ final int _id;
+ _MultiplexSendPort(this._id, this._sendPort);
+
+ void send(message) {
+ _sendPort.send(list2(_id, message));
+ }
+}
+
+/**
+ * A shared [RawReceivePort] that distributes messages to
+ * [RawReceivePort] instances that it manages.
+ */
+class RawReceivePortMultiplexer {
+ final RawReceivePort _port = new RawReceivePort();
+ final Map<int, _MultiplexRawReceivePort> _map = new HashMap();
+ int _nextId = 0;
+
+ RawReceivePortMultiplexer() {
+ _port.handler = _multiplexResponse;
+ }
+
+ RawReceivePort createRawReceivePort([void handler(value)]) {
+ int id = _nextId++;
+ var result = new _MultiplexRawReceivePort(this, id, handler);
+ _map[id] = result;
+ return result;
+ }
+
+ void close() {
+ _port.close();
+ }
+
+ void _multiplexResponse(list) {
+ int id = list[0];
+ var message = list[1];
+ _MultiplexRawReceivePort receivePort = _map[id];
+ // If the receive port is closed, messages are dropped, just as for
+ // the normal ReceivePort.
+ if (receivePort == null) return; // Port closed.
+ receivePort._invokeHandler(message);
+ }
+
+ SendPort _createSendPort(int id) {
+ return new _MultiplexSendPort(id, _port.sendPort);
+ }
+
+ void _closePort(int id) {
+ _map.remove(id);
+ }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
new file mode 100644
index 0000000..8e2923e
--- /dev/null
+++ b/pubspec.yaml
@@ -0,0 +1,11 @@
+name: isolate
+version: 0.1.0
+author: Dart Team <misc@dartlang.org>
+description: Utility functions and classes related to the 'dart:isolate' library.
+homepage: https://github.com/dart-lang/isolate
+
+environment:
+ sdk: ">=1.8.0 <2.0.0"
+
+dev_dependencies:
+ unittest: ">=0.10.0 <0.12.0"
diff --git a/test/isolaterunner_test.dart b/test/isolaterunner_test.dart
new file mode 100644
index 0000000..4b98b58
--- /dev/null
+++ b/test/isolaterunner_test.dart
@@ -0,0 +1,114 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.isolaterunner_test;
+
+import "package:isolate/isolaterunner.dart";
+import "package:unittest/unittest.dart";
+import "dart:async" show Future;
+import "dart:isolate" show Capability;
+
+const MS = const Duration(milliseconds: 1);
+
+void main() {
+ test("create-close", testCreateClose);
+ test("create-run-close", testCreateRunClose);
+ test("separate-isolates", testSeparateIsolates);
+ testIsolateFunctions();
+}
+
+Future testCreateClose() {
+ return IsolateRunner.spawn().then((IsolateRunner runner) {
+ return runner.close();
+ });
+}
+
+Future testCreateRunClose() {
+ return IsolateRunner.spawn().then((IsolateRunner runner) {
+ return runner.run(id, "testCreateRunClose").then((v) {
+ expect(v, "testCreateRunClose");
+ return runner.close().then((_) => runner.onExit);
+ });
+ });
+}
+
+Future testSeparateIsolates() {
+ // Check that each isolate has its own _global variable.
+ return Future.wait(new Iterable.generate(2, (_) => IsolateRunner.spawn()))
+ .then((runners) {
+ Future runAll(action(IsolateRunner runner, int index)) {
+ var indices = new Iterable.generate(runners.length);
+ return Future.wait(indices.map((i) => action(runners[i], i)));
+ }
+
+ return runAll((runner, i) => runner.run(setGlobal, i + 1))
+ .then((values) {
+ expect(values, [1, 2]);
+ expect(_global, null);
+ return runAll((runner, _) => runner.run(getGlobal, null));
+ })
+ .then((values) {
+ expect(values, [1, 2]);
+ expect(_global, null);
+ return runAll((runner, _) => runner.close());
+ });
+ });
+}
+
+void testIsolateFunctions() {
+ test("pause", () {
+ bool mayComplete = false;
+ return IsolateRunner.spawn().then((isolate) {
+ isolate.pause();
+ new Future.delayed(MS * 500, () {
+ mayComplete = true;
+ isolate.resume();
+ });
+ isolate.run(id, 42).then((v) {
+ expect(v, 42);
+ expect(mayComplete, isTrue);
+ }).whenComplete(isolate.close);
+ });
+ });
+ test("pause2", () {
+ Capability c1 = new Capability();
+ Capability c2 = new Capability();
+ int mayCompleteCount = 2;
+ return IsolateRunner.spawn().then((isolate) {
+ isolate.pause(c1);
+ isolate.pause(c2);
+ new Future.delayed(MS * 500, () {
+ mayCompleteCount--;
+ isolate.resume(c1);
+ });
+ new Future.delayed(MS * 500, () {
+ mayCompleteCount--;
+ isolate.resume(c2);
+ });
+ isolate.run(id, 42).then((v) {
+ expect(v, 42);
+ expect(mayCompleteCount, 0);
+ }).whenComplete(isolate.close);
+ });
+ });
+ test("ping", () {
+ return IsolateRunner.spawn().then((isolate) {
+ return isolate.ping().then((v) {
+ expect(v, isTrue);
+ return isolate.close();
+ });
+ });
+ });
+ test("kill", () {
+ return IsolateRunner.spawn().then((isolate) {
+ return isolate.kill();
+ });
+ });
+}
+
+id(x) => x;
+
+var _global;
+getGlobal(_) => _global;
+void setGlobal(v) => _global = v;
diff --git a/test/ports_test.dart b/test/ports_test.dart
new file mode 100644
index 0000000..9586b96
--- /dev/null
+++ b/test/ports_test.dart
@@ -0,0 +1,394 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.isolaterunner_test;
+
+import "package:isolate/ports.dart";
+import "package:unittest/unittest.dart";
+import "dart:async";
+import "dart:isolate";
+
+const Duration MS = const Duration(milliseconds: 1);
+
+main() {
+ testSingleCallbackPort();
+ testSingleCompletePort();
+ testSingleResponseFuture();
+ testSingleResultFuture();
+}
+
+void testSingleCallbackPort() {
+ test("singleCallbackValue", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCallbackPort(completer.complete);
+ p.send(42);
+ return completer.future.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleCallbackFirstValue", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCallbackPort(completer.complete);
+ p.send(42);
+ p.send(37);
+ return completer.future.then((v) {
+ expect(v, 42);
+ });
+ });
+ test("singleCallbackValue", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCallbackPort(completer.complete);
+ p.send(42);
+ return completer.future.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleCallbackValueBeforeTimeout", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCallbackPort(completer.complete,
+ timeout: MS * 500);
+ p.send(42);
+ return completer.future.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleCallbackTimeout", () {
+ Completer completer = new Completer.sync();
+ singleCallbackPort(completer.complete,
+ timeout: MS * 100,
+ timeoutValue: 37);
+ return completer.future.then((v) {
+ expect(v, 37);
+ });
+ });
+
+ test("singleCallbackTimeoutFirst", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCallbackPort(completer.complete,
+ timeout: MS * 100,
+ timeoutValue: 37);
+ new Timer(MS * 500, () => p.send(42));
+ return completer.future.then((v) {
+ expect(v, 37);
+ });
+ });
+}
+
+
+void testSingleCompletePort() {
+ test("singleCompleteValue", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCompletePort(completer);
+ p.send(42);
+ return completer.future.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleCompleteValueCallback", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCompletePort(completer,
+ callback: (v) {
+ expect(42, v);
+ return 87;
+ });
+ p.send(42);
+ return completer.future.then((v) {
+ expect(v, 87);
+ });
+ });
+
+ test("singleCompleteValueCallbackFuture", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCompletePort(completer,
+ callback: (v) {
+ expect(42, v);
+ return new Future.delayed(MS * 500,
+ () => 88);
+ });
+ p.send(42);
+ return completer.future.then((v) {
+ expect(v, 88);
+ });
+ });
+
+ test("singleCompleteValueCallbackThrows", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCompletePort(completer,
+ callback: (v) {
+ expect(42, v);
+ throw 89;
+ });
+ p.send(42);
+ return completer.future.then((v) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e, 89);
+ });
+ });
+
+ test("singleCompleteValueCallbackThrowsFuture", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCompletePort(completer,
+ callback: (v) {
+ expect(42, v);
+ return new Future.error(90);
+ });
+ p.send(42);
+ return completer.future.then((v) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e, 90);
+ });
+ });
+
+ test("singleCompleteFirstValue", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCompletePort(completer);
+ p.send(42);
+ p.send(37);
+ return completer.future.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleCompleteFirstValueCallback", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCompletePort(completer, callback: (v) {
+ expect(v, 42);
+ return 87;
+ });
+ p.send(42);
+ p.send(37);
+ return completer.future.then((v) {
+ expect(v, 87);
+ });
+ });
+
+ test("singleCompleteValueBeforeTimeout", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCompletePort(completer,
+ timeout: MS * 500);
+ p.send(42);
+ return completer.future.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleCompleteTimeout", () {
+ Completer completer = new Completer.sync();
+ singleCompletePort(completer,
+ timeout: MS * 100);
+ return completer.future.then((v) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e is TimeoutException, isTrue);
+ });
+ });
+
+ test("singleCompleteTimeoutCallback", () {
+ Completer completer = new Completer.sync();
+ singleCompletePort(completer,
+ timeout: MS * 100,
+ onTimeout: () => 87);
+ return completer.future.then((v) {
+ expect(v, 87);
+ });
+ });
+
+ test("singleCompleteTimeoutCallbackThrows", () {
+ Completer completer = new Completer.sync();
+ singleCompletePort(completer,
+ timeout: MS * 100,
+ onTimeout: () => throw 91);
+ return completer.future.then((v) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e, 91);
+ });
+ });
+
+ test("singleCompleteTimeoutCallbackFuture", () {
+ Completer completer = new Completer.sync();
+ singleCompletePort(completer,
+ timeout: MS * 100,
+ onTimeout: () => new Future.value(87));
+ return completer.future.then((v) {
+ expect(v, 87);
+ });
+ });
+
+ test("singleCompleteTimeoutCallbackThrowsFuture", () {
+ Completer completer = new Completer.sync();
+ singleCompletePort(completer,
+ timeout: MS * 100,
+ onTimeout: () => new Future.error(92));
+ return completer.future.then((v) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e, 92);
+ });
+ });
+
+ test("singleCompleteTimeoutCallbackSLow", () {
+ Completer completer = new Completer.sync();
+ singleCompletePort(
+ completer,
+ timeout: MS * 100,
+ onTimeout: () => new Future.delayed(MS * 500, () => 87));
+ return completer.future.then((v) {
+ expect(v, 87);
+ });
+ });
+
+ test("singleCompleteTimeoutCallbackThrowsSlow", () {
+ Completer completer = new Completer.sync();
+ singleCompletePort(
+ completer,
+ timeout: MS * 100,
+ onTimeout: () => new Future.delayed(MS * 500, () => throw 87));
+ return completer.future.then((v) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e, 87);
+ });
+ });
+
+ test("singleCompleteTimeoutFirst", () {
+ Completer completer = new Completer.sync();
+ SendPort p = singleCompletePort(completer,
+ timeout: MS * 100,
+ onTimeout: () => 37);
+ new Timer(MS * 500, () => p.send(42));
+ return completer.future.then((v) {
+ expect(v, 37);
+ });
+ });
+}
+
+void testSingleResponseFuture() {
+ test("singleResponseFutureValue", () {
+ return singleResponseFuture((SendPort p) {
+ p.send(42);
+ }).then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResponseFutureValueFirst", () {
+ return singleResponseFuture((SendPort p) {
+ p.send(42);
+ p.send(37);
+ }).then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResponseFutureError", () {
+ return singleResponseFuture((SendPort p) {
+ throw 93;
+ }).then((v) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e, 93);
+ });
+ });
+
+ test("singleResponseFutureTimeout", () {
+ return singleResponseFuture((SendPort p) {
+ // no-op.
+ }, timeout: MS * 100).then((v) {
+ expect(v, null);
+ });
+ });
+
+ test("singleResponseFutureTimeoutValue", () {
+ return singleResponseFuture((SendPort p) {
+ // no-op.
+ }, timeout: MS * 100, timeoutValue: 42).then((v) {
+ expect(v, 42);
+ });
+ });
+}
+
+void testSingleResultFuture() {
+ test("singleResultFutureValue", () {
+ return singleResultFuture((SendPort p) {
+ sendFutureResult(new Future.value(42), p);
+ }).then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResultFutureValueFirst", () {
+ return singleResultFuture((SendPort p) {
+ sendFutureResult(new Future.value(42), p);
+ sendFutureResult(new Future.value(37), p);
+ }).then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResultFutureError", () {
+ return singleResultFuture((SendPort p) {
+ sendFutureResult(new Future.error(94), p);
+ }).then((v) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e is RemoteError, isTrue);
+ });
+ });
+
+ test("singleResultFutureErrorFirst", () {
+ return singleResultFuture((SendPort p) {
+ sendFutureResult(new Future.error(95), p);
+ sendFutureResult(new Future.error(96), p);
+ }).then((v) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e is RemoteError, isTrue);
+ });
+ });
+
+ test("singleResultFutureError", () {
+ return singleResultFuture((SendPort p) {
+ throw 93;
+ }).then((v) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e is RemoteError, isTrue);
+ });
+ });
+
+ test("singleResultFutureTimeout", () {
+ return singleResultFuture((SendPort p) {
+ // no-op.
+ }, timeout: MS * 100).then((v) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e is TimeoutException, isTrue);
+ });
+ });
+
+ test("singleResultFutureTimeoutValue", () {
+ return singleResultFuture((SendPort p) {
+ // no-op.
+ }, timeout: MS * 100, onTimeout: () => 42).then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResultFutureTimeoutError", () {
+ return singleResultFuture((SendPort p) {
+ // no-op.
+ }, timeout: MS * 100, onTimeout: () => throw 97).then((v) {
+ expect(v, 42);
+ }, onError: (e, s) {
+ expect(e, 97);
+ });
+ });
+}
diff --git a/test/registry_test.dart b/test/registry_test.dart
new file mode 100644
index 0000000..5e95522
--- /dev/null
+++ b/test/registry_test.dart
@@ -0,0 +1,539 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.test.registry;
+
+import "package:isolate/isolaterunner.dart";
+import "package:isolate/registry.dart";
+import "dart:async";
+import "dart:isolate";
+
+import "package:unittest/unittest.dart";
+
+const MS = const Duration(milliseconds: 1);
+
+void main() {
+ testLookup();
+ testAddLookup();
+ testAddRemoveTags();
+ testRemove();
+ testCrossIsolate();
+ testTimeout();
+ testMultiRegistry();
+ testObjectsAndTags();
+}
+
+class Oddity {
+ static const int EVEN = 0;
+ static const int ODD = 1;
+}
+
+Future<List> waitAll(int n, Future action(int n)) {
+ return Future.wait(new Iterable.generate(n, action));
+}
+
+void testLookup() {
+
+ test("lookupAll", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ return waitAll(10, (i) {
+ var element = new Element(i);
+ var tag = i.isEven ? Oddity.EVEN : Oddity.ODD;
+ return registry.add(element, tags: [tag]);
+ })
+ .then((_) => registry.lookup())
+ .then((all) {
+ expect(all.length, 10);
+ expect(all.map((v) => v.id).toList()..sort(),
+ [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
+ })
+ .then((_) { regman.close(); });
+ });
+
+ test("lookupOdd", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ return waitAll(10, (i) {
+ var element = new Element(i);
+ var tag = i.isEven ? Oddity.EVEN : Oddity.ODD;
+ return registry.add(element, tags: [tag]);
+ })
+ .then((_) =>registry.lookup(tags:[Oddity.ODD]))
+ .then((all) {
+ expect(all.length, 5);
+ expect(all.map((v) => v.id).toList()..sort(),
+ [1, 3, 5, 7, 9]);
+ })
+ .then((_) { regman.close(); });
+ });
+
+ test("lookupMax", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ return waitAll(10, (i) {
+ var element = new Element(i);
+ var tag = i.isEven ? Oddity.EVEN : Oddity.ODD;
+ return registry.add(element, tags: [tag]);
+ })
+ .then((_) => registry.lookup(max: 5))
+ .then((all) {
+ expect(all.length, 5);
+ })
+ .then((_) { regman.close(); });
+ });
+
+ test("lookupMultiTag", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ return waitAll(25, (i) {
+ var element = new Element(i);
+ // Collect all numbers dividing i.
+ var tags = [i];
+ for (int j = 2; j < 25; j++) {
+ if (i % j == 0) tags.add(j);
+ }
+ return registry.add(element, tags: tags);
+ })
+ .then((_) => registry.lookup(tags: [2, 3]))
+ .then((all) {
+ expect(all.length, 5);
+ expect(all.map((v) => v.id).toList()..sort(),
+ [0, 6, 12, 18, 24]);
+ })
+ .then((_) { regman.close(); });
+ });
+
+ test("lookupMultiTagMax", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ return waitAll(25, (i) {
+ var element = new Element(i);
+ // Collect all numbers dividing i.
+ var tags = [i];
+ for (int j = 2; j < 25; j++) {
+ if (i % j == 0) tags.add(j);
+ }
+ return registry.add(element, tags: tags);
+ })
+ .then((_) => registry.lookup(tags: [2, 3], max: 3))
+ .then((all) {
+ expect(all.length, 3);
+ expect(all.every((v) => (v.id % 6) == 0), isTrue);
+ })
+ .then((_) { regman.close(); });
+ });
+}
+
+void testAddLookup() {
+ test("Add-lookup-identical", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ var object = new Object();
+ return registry.add(object).then((_) {
+ return registry.lookup().then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object));
+ });
+ }).whenComplete(regman.close);
+ });
+
+ test("Add-multiple-identical", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ var object1 = new Object();
+ var object2 = new Object();
+ var object3 = new Object();
+ var objects = [object1, object2, object3];
+ return Future.wait(objects.map(registry.add))
+ .then((_) {
+ return registry.lookup().then((entries) {
+ expect(entries, hasLength(3));
+ for (var entry in entries) {
+ expect(entry, isIn(objects));
+ }
+ });
+ }).whenComplete(regman.close);
+ });
+
+ test("Add-twice", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ var object = new Object();
+ return registry.add(object).then((_) {
+ return registry.add(object).then((_) {
+ fail("Unreachable");
+ }, onError: (e, s) {
+ expect(e, isStateError);
+ });
+ }).whenComplete(regman.close);
+ });
+
+ test("Add-lookup-add-lookup", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ var object = new Object();
+ var object2 = new Object();
+ return registry.add(object).then((_) {
+ return registry.lookup();
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object));
+ return registry.add(object2);
+ }).then((_) {
+ return registry.lookup().then((entries) {
+ expect(entries, hasLength(2));
+ var entry1 = entries.first;
+ var entry2 = entries.last;
+ if (object == entry1) {
+ expect(entry2, same(object2));
+ } else {
+ expect(entry1, same(object));
+ }
+ });
+ }).whenComplete(regman.close);
+ });
+
+ test("lookup-add-lookup", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ var object = new Object();
+ return registry.lookup().then((entries) {
+ expect(entries, isEmpty);
+ return registry.add(object);
+ }).then((_) {
+ return registry.lookup();
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object));
+ }).whenComplete(regman.close);
+ });
+
+ test("Add-multiple-tags", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ var object1 = new Object();
+ var object2 = new Object();
+ var object3 = new Object();
+ return registry.add(object1, tags: [1, 3, 5, 7]).then((_){
+ return registry.add(object2, tags: [2, 3, 6, 7]);
+ }).then((_) {
+ return registry.add(object3, tags: [4, 5, 6, 7]);
+ }).then((_) {
+ return registry.lookup(tags: [3]).then((entries) {
+ expect(entries, hasLength(2));
+ expect(entries.first == object1 || entries.last == object1, isTrue);
+ expect(entries.first == object2 || entries.last == object2, isTrue);
+ });
+ }).then((_) {
+ return registry.lookup(tags: [2]).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object2));
+ });
+ }).then((_) {
+ return registry.lookup(tags: [3, 6]).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object2));
+ });
+ }).whenComplete(regman.close);
+ });
+}
+
+void testRemove() {
+ test("Add-remove", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ var object = new Object();
+ return registry.add(object).then((removeCapability) {
+ return registry.lookup().then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object));
+ return registry.remove(object, removeCapability);
+ }).then((removeSuccess) {
+ expect(removeSuccess, isTrue);
+ return registry.lookup();
+ }).then((entries) {
+ expect(entries, isEmpty);
+ });
+ }).whenComplete(regman.close);
+ });
+
+ test("Add-remove-fail", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ var object = new Object();
+ return registry.add(object).then((removeCapability) {
+ return registry.lookup().then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object));
+ return registry.remove(object, new Capability());
+ }).then((removeSuccess) {
+ expect(removeSuccess, isFalse);
+ });
+ }).whenComplete(regman.close);
+ });
+}
+
+void testAddRemoveTags() {
+ test("Add-remove-tag", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ var object = new Object();
+ return registry.add(object).then((removeCapability) {
+ return registry.lookup(tags: ["x"]);
+ }).then((entries) {
+ expect(entries, isEmpty);
+ return registry.addTags([object], ["x"]);
+ }).then((_) {
+ return registry.lookup(tags: ["x"]);
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object));
+ return registry.removeTags([object], ["x"]);
+ }).then((_) {
+ return registry.lookup(tags: ["x"]);
+ }).then((entries) {
+ expect(entries, isEmpty);
+ }).whenComplete(regman.close);
+ });
+
+ test("Tag-twice", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ var object = new Object();
+ return registry.add(object, tags: ["x"]).then((removeCapability) {
+ return registry.lookup(tags: ["x"]);
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object));
+ // Adding the same tag twice is allowed, but does nothing.
+ return registry.addTags([object], ["x"]);
+ }).then((_) {
+ return registry.lookup(tags: ["x"]);
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object));
+ // Removing the tag once is enough to remove it.
+ return registry.removeTags([object], ["x"]);
+ }).then((_) {
+ return registry.lookup(tags: ["x"]);
+ }).then((entries) {
+ expect(entries, isEmpty);
+ }).whenComplete(regman.close);
+ });
+
+ test("Add-remove-multiple", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ var object1 = new Object();
+ var object2 = new Object();
+ var object3 = new Object();
+ var objects = [object1, object2, object3];
+ return Future.wait(objects.map(registry.add)).then((_){
+ return registry.addTags([object1, object2], ["x", "y"]);
+ }).then((_) {
+ return registry.addTags([object1, object3], ["z", "w"]);
+ }).then((_) {
+ return registry.lookup(tags: ["x", "z"]);
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object1));
+ return registry.removeTags([object1, object2], ["x", "z"]);
+ }).then((_) {
+ return registry.lookup(tags: ["z"]);
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object3));
+ }).whenComplete(regman.close);
+ });
+
+ test("Remove-wrong-object", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry = regman.registry;
+ expect(() => registry.removeTags([new Object()], ["x"]),
+ throws);
+ regman.close();
+ });
+}
+
+var _regmen = {};
+Registry createRegMan(id) {
+ var regman = new RegistryManager();
+ _regmen[id] = regman;
+ return regman.registry;
+}
+void closeRegMan(id) {
+ _regmen.remove(id).close();
+}
+
+void testCrossIsolate() {
+ var object = new Object();
+ test("regman-other-isolate", () {
+ // Add, lookup and remove object in other isolate.
+ return IsolateRunner.spawn().then((isolate) {
+ isolate.run(createRegMan, 1).then((registry) {
+ return registry.add(object, tags: ["a", "b"])
+ .then((removeCapability) {
+ return registry.lookup(tags: ["a"]).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object));
+ return registry.remove(entries.first, removeCapability);
+ }).then((removeSuccess) {
+ expect(removeSuccess, isTrue);
+ });
+ });
+ }).whenComplete(() {
+ return isolate.run(closeRegMan, 1);
+ }).whenComplete(() {
+ return isolate.close();
+ });
+ });
+ });
+}
+
+void testTimeout() {
+ test("Timeout-add", () {
+ RegistryManager regman = new RegistryManager(timeout: MS * 500);
+ Registry registry = regman.registry;
+ regman.close();
+ return registry.add(new Object()).then((_) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e is TimeoutException, isTrue);
+ });
+ });
+
+ test("Timeout-remove", () {
+ RegistryManager regman = new RegistryManager(timeout: MS * 500);
+ Registry registry = regman.registry;
+ var object = new Object();
+ return registry.add(object).then((rc) {
+ regman.close();
+ return registry.remove(object, rc).then((_) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e is TimeoutException, isTrue);
+ });
+ });
+ });
+
+ test("Timeout-addTags", () {
+ RegistryManager regman = new RegistryManager(timeout: MS * 500);
+ Registry registry = regman.registry;
+ var object = new Object();
+ return registry.add(object).then((rc) {
+ regman.close();
+ return registry.addTags([object], ["x"]).then((_) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e is TimeoutException, isTrue);
+ });
+ });
+ });
+
+ test("Timeout-removeTags", () {
+ RegistryManager regman = new RegistryManager(timeout: MS * 500);
+ Registry registry = regman.registry;
+ var object = new Object();
+ return registry.add(object).then((rc) {
+ regman.close();
+ return registry.removeTags([object], ["x"]).then((_) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e is TimeoutException, isTrue);
+ });
+ });
+ });
+
+ test("Timeout-lookup", () {
+ RegistryManager regman = new RegistryManager(timeout: MS * 500);
+ Registry registry = regman.registry;
+ regman.close();
+ registry.lookup().then((_) {
+ fail("unreachable");
+ }, onError: (e, s) {
+ expect(e is TimeoutException, isTrue);
+ });
+ });
+}
+
+void testMultiRegistry() {
+ test("dual-registyr", () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry1 = regman.registry;
+ Registry registry2 = regman.registry;
+ var l1 = ["x"];
+ var l2;
+ return registry1.add(l1, tags: ["y"]).then((removeCapability) {
+ return registry2.lookup().then((entries) {
+ expect(entries, hasLength(1));
+ l2 = entries.first;
+ expect(l2, equals(l1));
+ // The object for registry2 is not idential the one for registry1.
+ expect(!identical(l1, l2), isTrue);
+ // Removeing the registry1 object through registry2 doesn't work.
+ return registry2.remove(l1, removeCapability);
+ }).then((removeSuccess) {
+ expect(removeSuccess, isFalse);
+ return registry2.remove(l2, removeCapability);
+ }).then((removeSuccess) {
+ expect(removeSuccess, isTrue);
+ return registry1.lookup();
+ }).then((entries) {
+ expect(entries, isEmpty);
+ });
+ }).whenComplete(regman.close);
+ });
+}
+
+void testObjectsAndTags() {
+ testObject(object) {
+ String name = "Transfer-${object.runtimeType}";
+ test(name, () {
+ RegistryManager regman = new RegistryManager();
+ Registry registry1 = regman.registry;
+ Registry registry2 = regman.registry;
+ return registry1.add(object, tags: [object]).then((removeCapability) {
+ return registry2.lookup().then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, equals(object));
+ return registry2.lookup(tags: [object]);
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, equals(object));
+ return registry2.removeTags([entries.first], [object]);
+ }).then((_) {
+ return registry2.lookup();
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, equals(object));
+ return registry2.remove(entries.first, removeCapability);
+ }).then((removeSuccess) {
+ expect(removeSuccess, isTrue);
+ return registry2.lookup();
+ }).then((entries) {
+ expect(entries, isEmpty);
+ });
+ }).whenComplete(regman.close);
+ });
+ }
+ // Test objects that are sendable between equivalent isolates and
+ // that has an operator== that works after cloning (for use as tags).
+ testObject(42);
+ testObject(3.14);
+ testObject("string");
+ testObject(true);
+ testObject(null);
+ testObject(new Element(42));
+ testObject(#symbol);
+ testObject(#_privateSymbol);
+ testObject(new Capability());
+}
+
+class Element {
+ final int id;
+ Element(this.id);
+ int get hashCode => id;
+ bool operator==(Object other) => other is Element && id == other.id;
+}