Add IsolateRunner as a helper around Isolate.

Add single-message port helpers and a load balancer.

R=sethladd@google.com

Review URL: https://codereview.chromium.org//928663003
diff --git a/.status b/.status
new file mode 100644
index 0000000..a69ed9e
--- /dev/null
+++ b/.status
@@ -0,0 +1,11 @@
+# Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+# for details. All rights reserved. Use of this source code is governed by a
+# BSD-style license that can be found in the LICENSE file.
+
+[ $runtime == vm ]
+test/isolaterunner_test: RuntimeError  # addOnExitListener not implemented.
+
+[ $compiler == dart2js ]
+test/registry_test: CompileTimeError  # Unimplemented: private symbol literals.
+# FunctionRef will be removed when the VM supports sending functions.
+test/isolaterunner_test: RuntimeError  # Mirrors not working - FunctionRef broken.
\ No newline at end of file
diff --git a/AUTHORS b/AUTHORS
new file mode 100644
index 0000000..e8063a8
--- /dev/null
+++ b/AUTHORS
@@ -0,0 +1,6 @@
+# Below is a list of people and organizations that have contributed
+# to the project. Names should be added to the list like so:
+#
+#   Name/Organization <email address>
+
+Google Inc.
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..372014e
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,8 @@
+# Changelog
+
+## 0.1.0
+
+- Initial version
+  Adds IsolateRunner as a helper around Isolate.
+  Adds single-message port helpers and a load balancer.
+
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
new file mode 100644
index 0000000..6f5e0ea
--- /dev/null
+++ b/CONTRIBUTING.md
@@ -0,0 +1,33 @@
+Want to contribute? Great! First, read this page (including the small print at
+the end).
+
+### Before you contribute
+Before we can use your code, you must sign the
+[Google Individual Contributor License Agreement](https://cla.developers.google.com/about/google-individual)
+(CLA), which you can do online. The CLA is necessary mainly because you own the
+copyright to your changes, even after your contribution becomes part of our
+codebase, so we need your permission to use and distribute your code. We also
+need to be sure of various other things—for instance that you'll tell us if you
+know that your code infringes on other people's patents. You don't have to sign
+the CLA until after you've submitted your code for review and a member has
+approved it, but you must do it before we can put your code into our codebase.
+
+Before you start working on a larger contribution, you should get in touch with
+us first through the issue tracker with your idea so that we can help out and
+possibly guide you. Coordinating up front makes it much easier to avoid
+frustration later on.
+
+### Code reviews
+All submissions, including submissions by project members, require review.
+
+### File headers
+All files in the project must start with the following header.
+
+    // Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+    // for details. All rights reserved. Use of this source code is governed by a
+    // BSD-style license that can be found in the LICENSE file.
+
+### The small print
+Contributions made by corporations are covered by a different agreement than the
+one above, the
+[Software Grant and Corporate Contributor License Agreement](https://developers.google.com/open-source/cla/corporate).
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..de31e1a
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,26 @@
+Copyright 2015, the Dart project authors. All rights reserved.
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+    * Redistributions of source code must retain the above copyright
+      notice, this list of conditions and the following disclaimer.
+    * Redistributions in binary form must reproduce the above
+      copyright notice, this list of conditions and the following
+      disclaimer in the documentation and/or other materials provided
+      with the distribution.
+    * Neither the name of Google Inc. nor the names of its
+      contributors may be used to endorse or promote products derived
+      from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e02d3b7
--- /dev/null
+++ b/README.md
@@ -0,0 +1,34 @@
+# isolate
+
+The `isolate` package helps with isolates and isolate communication.
+
+The package contains individual libraries with different purposes.
+
+### Creating send ports and responding to messages.
+
+The "ports.dart" sub-library contains functionality
+for creating `SendPort`s and reacting to values sent to those ports.
+
+### Working with isolates and running functions in other isolates.
+
+The "isolaterunner.dart" sub-library introduces an `IsolateRunner` class
+that gives easy access to the `Isolate` functionality, and also
+gives a way to run new functions in the isolate repeatedly, instead of
+just on the initial `spawn` call.
+
+### A central registry for values that can be used accross isolates.
+
+The "registry.dart" sub-library provides a way to create an
+object registry, and give access to it accross different isolates.
+
+### Balancing load accross several isolates.
+
+The "loadbalancer.dart" sub-library can manage multiple `Runner` objects,
+including `IsolateRunner`, and run functions on the currently least loaded
+runner.
+
+## Features and bugs
+
+Please file feature requests and bugs at the [issue tracker][tracker].
+
+[tracker]: https://github.com/dart-lang/isolate/issues
diff --git a/example/http-server.dart b/example/http-server.dart
new file mode 100644
index 0000000..fc3b442
--- /dev/null
+++ b/example/http-server.dart
@@ -0,0 +1,119 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.sample.httpserver;
+
+import "dart:io";
+import "dart:async";
+import "dart:isolate";
+import "package:isolate/isolaterunner.dart";
+import "package:isolate/runner.dart";
+import "package:isolate/ports.dart";
+
+typedef Future RemoteStop();
+
+Future<RemoteStop> runHttpServer(
+    Runner runner, ServerSocket socket, HttpListener listener) {
+  return runner.run(_startHttpServer, new List(2)..[0] = socket.reference
+                                                 ..[1] = listener)
+               .then((SendPort stopPort) => () => _sendStop(stopPort));
+}
+
+Future _sendStop(SendPort stopPort) {
+  return singleResponseFuture(stopPort.send);
+}
+
+Future<SendPort> _startHttpServer(List args) {
+  ServerSocketReference ref = args[0];
+  HttpListener listener = args[1];
+  return ref.create().then((socket) {
+    return listener.start(new HttpServer.listenOn(socket));
+  }).then((_) {
+    return singleCallbackPort((SendPort resultPort) {
+      sendFutureResult(new Future.sync(listener.stop), resultPort);
+    });
+  });
+}
+
+/**
+ * An [HttpRequest] handler setup. Gets called when with the server, and
+ * is told when to stop listening.
+ *
+ * These callbacks allow the listener to set up handlers for HTTP requests.
+ * The object should be sendable to an equivalent isolate.
+ */
+abstract class HttpListener {
+  Future start(HttpServer server);
+  Future stop();
+}
+
+/**
+ * An [HttpListener] that sets itself up as an echo server.
+ *
+ * Returns the message content plus an ID describing the isolate that
+ * handled the request.
+ */
+class EchoHttpListener implements HttpListener {
+  StreamSubscription _subscription;
+  static int _id = new Object().hashCode;
+  SendPort _counter;
+
+  EchoHttpListener(this._counter);
+
+  start(HttpServer server) {
+    print("Starting isolate $_id");
+    _subscription = server.listen((HttpRequest request) {
+      request.response.addStream(request).then((_) {
+        _counter.send(null);
+        print("Request to $_id");
+        request.response.write("#$_id\n");
+        var t0 = new DateTime.now().add(new Duration(seconds:2));
+        while (new DateTime.now().isBefore(t0));
+        print("Response from $_id");
+        request.response.close();
+      });
+    });
+  }
+
+  stop() {
+    print("Stopping isolate $_id");
+    _subscription.cancel();
+    _subscription = null;
+  }
+}
+
+main(args) {
+  int port = 0;
+  if (args.length > 0) {
+    port = int.parse(args[0]);
+  }
+  RawReceivePort counter = new RawReceivePort();
+  HttpListener listener = new EchoHttpListener(counter.sendPort);
+  ServerSocket
+    .bind(InternetAddress.ANY_IP_V6, port)
+    .then((ServerSocket socket) {
+      port = socket.port;
+      return Future.wait(new Iterable.generate(5, (_) => IsolateRunner.spawn()),
+                         cleanUp: (isolate) { isolate.close(); })
+                   .then((List<IsolateRunner> isolates) {
+                     return Future.wait(isolates.map((IsolateRunner isolate) {
+                       return runHttpServer(isolate, socket, listener);
+                     }), cleanUp: (server) { server.stop(); });
+                   })
+                   .then((stoppers) {
+                     socket.close();
+                     int count = 25;
+                     counter.handler = (_) {
+                       count--;
+                       if (count == 0) {
+                         stoppers.forEach((f) => f());
+                         counter.close();
+                       }
+                     };
+                     print("Server listening on port $port for 25 requests");
+                     print("Test with:");
+                     print("  ab -c10 -n 25  http://localhost:$port/");
+                   });
+  });
+}
diff --git a/example/runner-pool.dart b/example/runner-pool.dart
new file mode 100644
index 0000000..e81e496
--- /dev/null
+++ b/example/runner-pool.dart
@@ -0,0 +1,58 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.sample.runners;
+
+import "package:isolate/loadbalancer.dart";
+import "package:isolate/isolaterunner.dart";
+import "dart:async" show Future, Completer;
+
+
+void main() {
+  int N = 44;
+  var sw = new Stopwatch()..start();
+  // Compute fib up to 42 with 4 isolates.
+  parfib(N, 4).then((v1) {
+    var t1 = sw.elapsedMilliseconds;
+    sw.stop();
+    sw.reset();
+    print("fib#4(${N}) = ${v1[N]}, ms: $t1");
+    sw.start();
+    // Then compute fib up to 42 with 2 isolates.
+    parfib(N, 2).then((v2) {
+      var t2 = sw.elapsedMilliseconds;
+      sw.stop();
+      print("fib#2(${N}) = ${v2[N]}, ms: $t2");
+    });
+  });
+}
+
+// Compute fibonnacci 1..limit
+Future<List<int>> parfib(int limit, int parallelity) {
+  return LoadBalancer.create(parallelity, IsolateRunner.spawn).then(
+    (LoadBalancer pool) {
+      List<Future> fibs = new List(limit + 1);
+      // Schedule all calls with exact load value and the heaviest task
+      // assigned first.
+      schedule(a, b, i) {
+        if (i < limit) {
+          schedule(a + b, a, i + 1);
+        }
+        fibs[i] = pool.run(fib, i, load: a);
+      }
+      schedule(0, 1, 0);
+      // And wait for them all to complete.
+      return Future.wait(fibs).whenComplete(pool.close);
+    });
+}
+
+int computeFib(n) {
+  int result = fib(n);
+  return result;
+}
+
+int fib(n) {
+  if (n < 2) return n;
+  return fib(n - 1) + fib(n - 2);
+}
diff --git a/lib/isolate.dart b/lib/isolate.dart
new file mode 100644
index 0000000..9e838b8
--- /dev/null
+++ b/lib/isolate.dart
@@ -0,0 +1,14 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * Utilities for working with isolates and isolate communication.
+ */
+library dart.pkg.isolate;
+
+export "isolaterunner.dart";
+export "loadbalancer.dart";
+export "ports.dart";
+export "registry.dart";
+export "runner.dart";
diff --git a/lib/isolaterunner.dart b/lib/isolaterunner.dart
new file mode 100644
index 0000000..7e15a40
--- /dev/null
+++ b/lib/isolaterunner.dart
@@ -0,0 +1,320 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.isolaterunner;
+
+import "dart:isolate";
+import "dart:async";
+import "runner.dart";
+import "ports.dart";
+import "src/functionref.dart";
+import "src/lists.dart";
+
+// Command tags. Shared between IsolateRunner and IsolateRunnerRemote.
+const int _SHUTDOWN = 0;
+const int _RUN = 1;
+
+/**
+ * An easier to use interface on top of an [Isolate].
+ *
+ * Wraps an `Isolate` and allows pausing, killing and inspecting
+ * the isolate more conveniently than the raw `Isolate` methods.
+ *
+ * Also allows running simple functions in the other isolate, and get back
+ * the result.
+ */
+class IsolateRunner implements Runner {
+  /** The underlying [Isolate] object of the isolate being controlled. */
+  final Isolate isolate;
+
+  /** Command port for the [IsolateRunnerRemote]. */
+  final SendPort _commandPort;
+
+  /** Future returned by [onExit]. Set when [onExit] is first read. */
+  Future _onExitFuture;
+
+  /**
+   * Create an [IsolateRunner] wrapper for [isolate]
+   *
+   * The preferred way to create an `IsolateRunner` is to use [spawn]
+   * to create a new isolate and a runner for it.
+   *
+   * This constructor allows creating a runner for an already existing
+   * isolate.
+   * The [commandPort] must be the [IsolateRunnerRemote.commandPort] of
+   * a remote running in that isolate.
+   */
+  IsolateRunner(this.isolate, SendPort commandPort)
+      : _commandPort = commandPort;
+
+  /**
+   * Create a new [Isolate], as by [Isolate.spawn] and wrap that.
+   *
+   * The returned [IsolateRunner] forwards operations to the new isolate,
+   * and keeps a port open in the new isolate that receives commands
+   * from the `IsolateRunner`. Remember to [close] the `IsolateRunner` when
+   * it's no longer needed.
+   */
+  static Future<IsolateRunner> spawn() {
+    Completer portCompleter = new Completer.sync();
+    SendPort initPort = singleCompletePort(portCompleter);
+    return Isolate.spawn(IsolateRunnerRemote._create, initPort)
+                  .then((Isolate isolate) {
+                    // TODO: Add when VM supports it.
+                    // isolate.setErrorsFatal(false);
+                    return portCompleter.future.then((SendPort commandPort) {
+                      var result = new IsolateRunner(isolate, commandPort);
+                      // Guarantees that setErrorsFatal has completed.
+                      return result.ping().then((_) => result);
+                    });
+                  });
+  }
+
+  /**
+   * Closes the `IsolateRunner` communication down.
+   *
+   * If the isolate isn't running something else to keep it alive,
+   * this will also make the isolate shut down.
+   *
+   * Can be used to create an isolate, use [run] to start a service, and
+   * then drop the connection and let the service control the isolate's
+   * life cycle.
+   */
+  Future close() {
+    Completer portCompleter = new Completer.sync();
+    SendPort closePort = singleCallbackPort(portCompleter.complete);
+    _commandPort.send(list2(_SHUTDOWN, closePort));
+    return portCompleter.future;
+  }
+
+  /**
+   * Kills the isolate.
+   *
+   * Starts by calling [close], but if that doesn't cause the isolate to
+   * shut down in a timely manner, as given by [timeout], it follows up
+   * with [Isolate.kill], with increasing urgency if necessary.
+   *
+   * If [timeout] is a zero duration, it goes directly to the most urgent
+   * kill.
+   *
+   * If the isolate is already dead, the returned future will not complete.
+   * If that may be the case, use [Future.timeout] on the returned future
+   * to take extra action after a while. Example:
+   *
+   *     var f = isolate.kill();
+   *     f.then((_) => print('Dead')
+   *      .timeout(new Duration(...), onTimeout: () => print('No response'));
+   */
+  Future kill({Duration timeout: const Duration(seconds: 1)}) {
+    Future onExit = singleResponseFuture(isolate.addOnExitListener);
+    if (Duration.ZERO == timeout) {
+      isolate.kill(Isolate.IMMEDIATE);
+      return onExit;
+    } else {
+      // Try a more gentle shutdown sequence.
+      _commandPort.send(list1(_SHUTDOWN));
+      return onExit.timeout(timeout, onTimeout: () {
+        isolate.kill(Isolate.IMMEDIATE);
+        return onExit;
+      });
+    }
+  }
+
+  /**
+   * Queries the isolate on whether it's alive.
+   *
+   * If the isolate is alive and responding to commands, the
+   * returned future completes with `true`.
+   *
+   * If the other isolate is not alive (like after calling [kill]),
+   * or doesn't answer within [timeout] for any other reason,
+   * the returned future completes with `false`.
+   *
+   * Guaranteed to only complete after all previous sent isolate commands
+   * (like pause and resume) have been handled.
+   * Paused isolates do respond to ping requests.
+   */
+  Future<bool> ping({Duration timeout: const Duration(seconds: 1)}) {
+    Completer completer = new Completer<bool>();
+    SendPort port = singleCompletePort(completer,
+                                       callback: _kTrue,
+                                       timeout: timeout,
+                                       onTimeout: _kFalse);
+    isolate.ping(port);
+    return completer.future;
+  }
+
+  static bool _kTrue(_) => true;
+  static bool _kFalse() => false;
+
+  /**
+   * Pauses the isolate.
+   *
+   * While paused, no normal messages are processed, and calls to [run] will
+   * be delayed until the isolate is resumed.
+   *
+   * Commands like [kill] and [ping] are still executed while the isolate is
+   * paused.
+   *
+   * If [resumeCapability] is omitted, it defaults to the [isolate]'s
+   * [Isolate.pauseCapability].
+   *
+   * Calling pause more than once with the same `resumeCapability`
+   * has no further effect. Only a single call to [resume] is needed
+   * to resume the isolate.
+   */
+  void pause([Capability resumeCapability]) {
+    if (resumeCapability == null) resumeCapability = isolate.pauseCapability;
+    isolate.pause(resumeCapability);
+  }
+
+  /**
+   * Resumes after a [pause].
+   *
+   * If [resumeCapability] is omitted, it defaults to the isolate's
+   * [Isolate.pauseCapability].
+   *
+   * Even if `pause` has been called more than once with the same
+   * `resumeCapability`, a single resume call with stop the pause.
+   */
+  void resume([Capability resumeCapability]) {
+    if (resumeCapability == null) resumeCapability = isolate.pauseCapability;
+    isolate.resume(resumeCapability);
+  }
+
+  /**
+   * Execute `function(argument)` in the isolate and return the result.
+   *
+   * Sends [function] and [argument] to the isolate, runs the call, and
+   * returns the result, whether it returned a value or threw.
+   * If the call returns a [Future], the final result of that future
+   * will be returned.
+   *
+   * This works similar to the arguments to [Isolate.spawn], except that
+   * it runs in the existing isolate and the return value is returned to
+   * the caller.
+   *
+   * Example:
+   *
+   *     IsolateRunner iso = await IsolateRunner.spawn();
+   *     try {
+   *       return await iso.run(heavyComputation, argument);
+   *     } finally {
+   *       await iso.close();
+   *     }
+   */
+  Future run(function(argument), argument,
+             {Duration timeout, onTimeout()}) {
+    return singleResultFuture((SendPort port) {
+      _commandPort.send(
+          list4(_RUN, FunctionRef.from(function), argument, port));
+    }, timeout: timeout, onTimeout: onTimeout);
+  }
+
+  /**
+   * A broadcast stream of uncaught errors from the isolate.
+   *
+   * When listening on the stream, errors from the isolate will be reported
+   * as errors in the stream. Be ready to handle the errors.
+   *
+   * The stream closes when the isolate shuts down.
+   */
+  Stream get errors {
+    StreamController controller;
+    RawReceivePort port;
+    void handleError(message) {
+      if (message == null) {
+        // Isolate shutdown.
+        port.close();
+        controller.close();
+      } else {
+        // Uncaught error.
+        String errorDescription = message[0];
+        String stackDescription = message[1];
+        var error = new RemoteError(errorDescription, stackDescription);
+        controller.addError(error, error.stackTrace);
+      }
+    }
+    controller = new StreamController.broadcast(
+        sync: true,
+        onListen: () {
+          port = new RawReceivePort(handleError);
+          // TODO: When supported, uncomment this.
+          // isolate.addErrorListener(port.sendPort);
+          // isolate.addOnExitListener(port.sendPort);
+          // And remove the send below, which acts as an immediate close.
+          port.sendPort.send(null);
+        },
+        onCancel: () {
+          port.close();
+          // this.removeErrorListener(port.sendPort);
+          // this.removeOnExitListener(port.sendPort);
+          port = null;
+        });
+    return controller.stream;
+  }
+
+  /**
+   * Waits for the [isolate] to terminate.
+   *
+   * Completes the returned future when the isolate terminates.
+   *
+   * If the isolate has already stopped responding to commands,
+   * the returned future will never terminate.
+   */
+  Future get onExit {
+    // TODO(lrn): Is there a way to see if an isolate is dead
+    // so we can close the receive port for this future?
+    if (_onExitFuture == null) {
+      _onExitFuture = singleResponseFuture(isolate.addOnExitListener);
+    }
+    return _onExitFuture;
+  }
+}
+
+/**
+ * The remote part of an [IsolateRunner].
+ *
+ * The `IsolateRunner` sends commands to the controlled isolate through
+ * the `IsolateRunnerRemote` [commandPort].
+ *
+ * Only use this class if you need to set up the isolate manually
+ * instead of relying on [IsolateRunner.spawn].
+ */
+class IsolateRunnerRemote {
+  final RawReceivePort _commandPort = new RawReceivePort();
+  IsolateRunnerRemote() {
+    _commandPort.handler = _handleCommand;
+  }
+
+  /**
+   * The command port that can be used to send commands to this remote.
+   *
+   * Use this as argument to [new IsolateRunner] if creating the link
+   * manually, otherwise it's handled by [IsolateRunner.spawn].
+   */
+  SendPort get commandPort => _commandPort.sendPort;
+
+  static void _create(SendPort initPort) {
+    var remote = new IsolateRunnerRemote();
+    initPort.send(remote.commandPort);
+  }
+
+  void _handleCommand(List command) {
+    switch (command[0]) {
+      case _SHUTDOWN:
+        SendPort responsePort = command[1];
+        _commandPort.close();
+        responsePort.send(null);
+        return;
+      case _RUN:
+        Function function = command[1].function;
+        var argument = command[2];
+        SendPort responsePort = command[3];
+        sendFutureResult(new Future.sync(() => function(argument)),
+                         responsePort);
+        return;
+    }
+  }
+}
diff --git a/lib/loadbalancer.dart b/lib/loadbalancer.dart
new file mode 100644
index 0000000..fdb5d0b
--- /dev/null
+++ b/lib/loadbalancer.dart
@@ -0,0 +1,305 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * A load-balancing runner pool.
+ */
+library dart.pkg.isolate.loadbalancer;
+
+import "runner.dart";
+import "src/errors.dart";
+import "src/lists.dart";
+import "dart:async" show Future;
+
+/**
+ * A pool of runners, ordered by load.
+ *
+ * Keeps a pool of runners,
+ * and allows running function through the runner with the lowest current load.
+ */
+class LoadBalancer implements Runner {
+  // A heap-based priority queue of entries, prioritized by `load`.
+  // Each entry has its own entry in the queue, for faster update.
+  List<_LoadBalancerEntry> _queue;
+
+  // The number of entries currently in the queue.
+  int _length;
+
+  // Whether [stop] has been called.
+  Future _stopFuture = null;
+
+  /**
+   * Create a load balancer for [service] with [size] isolates.
+   */
+  LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));
+
+  LoadBalancer._(List<_LoadBalancerEntry> entries)
+      : _queue = entries,
+        _length = entries.length {
+    for (int i = 0; i < _length; i++) {
+      _queue[i].queueIndex = i;
+    }
+  }
+
+  /**
+   * The number of runners currently in the pool.
+   */
+  int get length => _length;
+
+  /**
+   * Asynchronously create [size] runners and create a `LoadBalancer` of those.
+   *
+   * This is a helper function that makes it easy to create a `LoadBalancer`
+   * with asynchronously created runners, for example:
+   *
+   *     var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
+   */
+  static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) {
+    return Future.wait(new Iterable.generate(size, (_) => createRunner()),
+                       cleanUp: (Runner runner) { runner.close(); })
+           .then((runners) => new LoadBalancer(runners));
+  }
+
+  static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) {
+    var entries = runners.map((runner) => new _LoadBalancerEntry(runner));
+    return new List<_LoadBalancerEntry>.from(entries, growable: false);
+  }
+
+  /**
+   * Execute the command in the currently least loaded isolate.
+   *
+   * The optional [load] parameter represents the load that the command
+   * is causing on the isolate where it runs.
+   * The number has no fixed meaning, but should be seen as relative to
+   * other commands run in the same load balancer.
+   * The `load` must not be negative.
+   *
+   * If [timeout] and [onTimeout] are provided, they are forwarded to
+   * the runner running the function, which will handle a timeout
+   * as normal.
+   */
+  Future run(function(argument), argument, {Duration timeout,
+                                            onTimeout(),
+                                            int load: 100}) {
+    RangeError.checkNotNegative(load, "load");
+    _LoadBalancerEntry entry = _first;
+    _increaseLoad(entry, load);
+    return entry.run(this, load, function, argument, timeout, onTimeout);
+  }
+
+  /**
+   * Execute the same function in the least loaded [count] isolates.
+   *
+   * This guarantees that the function isn't run twice in the same isolate,
+   * so `count` is not allowed to exceed [length].
+   *
+   * The optional [load] parameter represents the load that the command
+   * is causing on the isolate where it runs.
+   * The number has no fixed meaning, but should be seen as relative to
+   * other commands run in the same load balancer.
+   * The `load` must not be negative.
+   *
+   * If [timeout] and [onTimeout] are provided, they are forwarded to
+   * the runners running the function, which will handle any timeouts
+   * as normal.
+   */
+  List<Future> runMultiple(int count, function(argument), argument,
+                           {Duration timeout,
+                            onTimeout(),
+                            int load: 100}) {
+    RangeError.checkValueInInterval(count, 1, _length, "count");
+    RangeError.checkNotNegative(load, "load");
+    if (count == 1) {
+      return list1(run(function, argument, load: load,
+                       timeout: timeout, onTimeout: onTimeout));
+    }
+    List result = new List<Future>(count);
+    if (count == _length) {
+      // No need to change the order of entries in the queue.
+      for (int i = 0; i < count; i++) {
+        _LoadBalancerEntry entry = _queue[i];
+        entry.load += load;
+        result[i] = entry.run(this, load, function, argument,
+                              timeout, onTimeout);
+      }
+    } else {
+      // Remove the [count] least loaded services and run the
+      // command on each, then add them back to the queue.
+      // This avoids running the same command twice in the same
+      // isolate.
+      List entries = new List(count);
+      for (int i = 0; i < count; i++) {
+        entries[i] = _removeFirst();
+      }
+      for (int i = 0; i < count; i++) {
+        _LoadBalancerEntry entry = entries[i];
+        entry.load += load;
+        _add(entry);
+        result[i] = entry.run(this, load, function, argument,
+                              timeout, onTimeout);
+      }
+    }
+    return result;
+  }
+
+  Future close() {
+    if (_stopFuture != null) return _stopFuture;
+    _stopFuture = MultiError.waitUnordered(_queue.take(_length)
+                                                 .map((e) => e.close()));
+    // Remove all entries.
+    for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1;
+    _queue = null;
+    _length = 0;
+    return _stopFuture;
+  }
+
+  /**
+   * Place [element] in heap at [index] or above.
+   *
+   * Put element into the empty cell at `index`.
+   * While the `element` has higher priority than the
+   * parent, swap it with the parent.
+   */
+  void _bubbleUp(_LoadBalancerEntry element, int index) {
+    while (index > 0) {
+      int parentIndex = (index - 1) ~/ 2;
+      _LoadBalancerEntry parent = _queue[parentIndex];
+      if (element.compareTo(parent) > 0) break;
+      _queue[index] = parent;
+      parent.queueIndex = index;
+      index = parentIndex;
+    }
+    _queue[index] = element;
+    element.queueIndex = index;
+  }
+
+  /**
+   * Place [element] in heap at [index] or above.
+   *
+   * Put element into the empty cell at `index`.
+   * While the `element` has lower priority than either child,
+   * swap it with the highest priority child.
+   */
+  void _bubbleDown(_LoadBalancerEntry element, int index) {
+    while (true) {
+      int childIndex = index * 2 + 1;  // Left child index.
+      if (childIndex >= _length) break;
+      _LoadBalancerEntry child = _queue[childIndex];
+      int rightChildIndex = childIndex + 1;
+      if (rightChildIndex < _length) {
+        _LoadBalancerEntry rightChild = _queue[rightChildIndex];
+        if (rightChild.compareTo(child) < 0) {
+          childIndex = rightChildIndex;
+          child = rightChild;
+        }
+      }
+      if (element.compareTo(child) <= 0) break;
+      _queue[index] = child;
+      child.queueIndex = index;
+      index = childIndex;
+    }
+    _queue[index] = element;
+    element.queueIndex = index;
+  }
+
+  /**
+   * Removes the entry from the queue, but doesn't stop its service.
+   *
+   * The entry is expected to be either added back to the queue
+   * immediately or have its stop method called.
+   */
+  void _remove(_LoadBalancerEntry entry) {
+    int index = entry.queueIndex;
+    if (index < 0) return;
+    entry.queueIndex = -1;
+    _length--;
+    _LoadBalancerEntry replacement = _queue[_length];
+    _queue[_length] = null;
+    if (index < _length) {
+      if (entry.compareTo(replacement) < 0) {
+        _bubbleDown(replacement, index);
+      } else {
+        _bubbleUp(replacement, index);
+      }
+    }
+  }
+
+  /**
+   * Adds entry to the queue.
+   */
+  void _add(_LoadBalancerEntry entry) {
+    if (_stopFuture != null) throw new StateError("LoadBalancer is stopped");
+    assert(entry.queueIndex < 0);
+    if (_queue.length == _length) {
+      _grow();
+    }
+    int index = _length;
+    _length = index + 1;
+    _bubbleUp(entry, index);
+  }
+
+  void _increaseLoad(_LoadBalancerEntry entry, int load) {
+    assert(load >= 0);
+    entry.load += load;
+    if (entry.inQueue) {
+      _bubbleDown(entry, entry.queueIndex);
+    }
+  }
+
+  void _decreaseLoad(_LoadBalancerEntry entry, int load) {
+    assert(load >= 0);
+    entry.load -= load;
+    if (entry.inQueue) {
+      _bubbleUp(entry, entry.queueIndex);
+    }
+  }
+
+  void _grow() {
+    List newQueue = new List(_length * 2);
+    newQueue.setRange(0, _length, _queue);
+    _queue = newQueue;
+  }
+
+  _LoadBalancerEntry get _first {
+    assert(_length > 0);
+    return _queue[0];
+  }
+
+  _LoadBalancerEntry _removeFirst() {
+    _LoadBalancerEntry result = _first;
+    _remove(result);
+    return result;
+  }
+}
+
+class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> {
+  // The current load on the isolate.
+  int load = 0;
+  // The current index in the heap-queue.
+  // Negative when the entry is not part of the queue.
+  int queueIndex = -1;
+
+  // The service used to send commands to the other isolate.
+  Runner runner;
+
+  _LoadBalancerEntry(Runner runner)
+      : runner = runner;
+
+  /** Whether the entry is still in the queue. */
+  bool get inQueue => queueIndex >= 0;
+
+  Future run(LoadBalancer balancer, int load, function(argumen), argument,
+             Duration timeout, onTimeout()) {
+    return runner.run(function, argument,
+                      timeout: timeout, onTimeout: onTimeout).whenComplete(() {
+      balancer._decreaseLoad(this, load);
+    });
+  }
+
+  Future close() => runner.close();
+
+  int compareTo(_LoadBalancerEntry other) => load - other.load;
+}
+
+
diff --git a/lib/ports.dart b/lib/ports.dart
new file mode 100644
index 0000000..c8de655
--- /dev/null
+++ b/lib/ports.dart
@@ -0,0 +1,269 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * Utility functions for setting up ports and sending data.
+ *
+ * This library contains a number of functions that handle the
+ * boiler-plate of setting up a receive port and receiving a
+ * single message on the port.
+ *
+ * There are different functions that offer different ways to
+ * handle the incoming message.
+ *
+ * The simplest function, [singleCallbackPort], takes a callback
+ * and returns a port, and then calls the callback for the first
+ * message sent on the port.
+ *
+ * Other functions intercept the returned value and either
+ * does something with it, or puts it into a [Future] or [Completer].
+ */
+library dart.pkg.isolate.ports;
+
+import "dart:isolate";
+import "dart:async";
+import "src/lists.dart";
+
+/**
+ * Create a [SendPort] that accepts only one message.
+ *
+ * The [callback] function is called once, with the first message
+ * received by the receive port.
+ * All futher messages are ignored.
+ *
+ * If [timeout] is supplied, it is used as a limit on how
+ * long it can take before the message is received. If a
+ * message isn't received in time, the `callback` function
+ * is called once with the [timeoutValue] instead.
+ *
+ * Returns the `SendPort` expecting the single message.
+ *
+ * Equivalent to:
+ *
+ *     (new ReceivePort()
+ *         ..first.timeout(duration, () => timeoutValue).then(callback))
+ *         .sendPort
+ */
+SendPort singleCallbackPort(void callback(response),
+                            {Duration timeout,
+                             var timeoutValue}) {
+  RawReceivePort responsePort = new RawReceivePort();
+  Zone zone = Zone.current;
+  callback = zone.registerUnaryCallback(callback);
+  var timer;
+  responsePort.handler = (response) {
+    responsePort.close();
+    if (timer != null) timer.cancel();
+    zone.runUnary(callback, response);
+  };
+  if (timeout != null) {
+    timer = new Timer(timeout, () {
+      responsePort.close();
+      callback(timeoutValue);
+    });
+  }
+  return responsePort.sendPort;
+}
+
+/**
+ * Create a [SendPort] that accepts only one message.
+ *
+ * When the first message is received, the [callback] function is
+ * called with the message as argument,
+ * and the [completer] is completed with the result of that call.
+ * All further messages are ignored.
+ *
+ * If `callback` is omitted, it defaults to an identity function.
+ * The `callback` call may return a future, and the completer will
+ * wait for that future to complete.
+ *
+ * If [timeout] is supplied, it is used as a limit on how
+ * long it can take before the message is received. If a
+ * message isn't received in time, the [onTimeout] is called,
+ * and `completer` is completed with the result of that call
+ * instead.
+ * The [callback] function will not be interrupted by the time-out,
+ * as long as the initial message is received in time.
+ * If `onTimeout` is omitted, it defaults to completing the `completer` with
+ * a [TimeoutException].
+ *
+ * The [completer] may be a synchronous completer. It is only
+ * completed in response to another event, either a port message or a timer.
+ *
+ * Returns the `SendPort` expecting the single message.
+ */
+SendPort singleCompletePort(Completer completer,
+                            {callback(message),
+                             Duration timeout,
+                             onTimeout()}) {
+  if (callback == null && timeout == null) {
+    return singleCallbackPort(completer.complete);
+  }
+  RawReceivePort responsePort = new RawReceivePort();
+  var timer;
+  if (callback == null) {
+    responsePort.handler = (response) {
+      responsePort.close();
+      if (timer != null) timer.cancel();
+      completer.complete(response);
+    };
+  } else {
+    Zone zone = Zone.current;
+    Function action = zone.registerUnaryCallback((response) {
+      completer.complete(new Future.sync(() => callback(response)));
+    });
+    responsePort.handler = (response) {
+      responsePort.close();
+      if (timer != null) timer.cancel();
+      zone.runUnary(action, response);
+    };
+  }
+  if (timeout != null) {
+    timer = new Timer(timeout, () {
+      responsePort.close();
+      if (onTimeout != null) {
+        completer.complete(new Future.sync(onTimeout));
+      } else {
+        completer.completeError(new TimeoutException("Future not completed",
+                                                     timeout));
+      }
+    });
+  }
+  return responsePort.sendPort;
+}
+
+/**
+ * Creates a [Future], and a [SendPort] that can be used to complete that
+ * future.
+ *
+ * Calls [action] with the response `SendPort`, then waits for someone
+ * to send a value on that port
+ * The returned `Future` is completed with the value sent on the port.
+ *
+ * If [action] throws, which it shouldn't,
+ * the returned future is completed with that error.
+ * Any return value of `action` is ignored, and if it is asynchronous,
+ * it should handle its own errors.
+ *
+ * If [timeout] is supplied, it is used as a limit on how
+ * long it can take before the message is received. If a
+ * message isn't received in time, the [timeoutValue] used
+ * as the returned future's value instead.
+ *
+ * If you want a timeout on the returned future, it's recommended to
+ * use the [timeout] parameter, and not [Future.timeout] on the result.
+ * The `Future` method won't be able to close the underlying [ReceivePort].
+ */
+Future singleResponseFuture(void action(SendPort responsePort),
+                            {Duration timeout,
+                             var timeoutValue}) {
+  Completer completer = new Completer.sync();
+  RawReceivePort responsePort = new RawReceivePort();
+  Timer timer;
+  Zone zone = Zone.current;
+  responsePort.handler = (v) {
+    responsePort.close();
+    if (timer != null) timer.cancel();
+    zone.run(() {
+      completer.complete(v);
+    });
+  };
+  if (timeout != null) {
+    timer = new Timer(timeout, () {
+      responsePort.close();
+      completer.complete(timeoutValue);
+    });
+  }
+  try {
+    action(responsePort.sendPort);
+  } catch (e, s) {
+    responsePort.close();
+    if (timer != null) timer.cancel();
+    // Delay completion because completer is sync.
+    scheduleMicrotask(() { completer.completeError(e, s); });
+  }
+  return completer.future;
+}
+
+
+/**
+ * Send the result of a future, either value or error, as a message.
+ *
+ * The result of [future] is sent on [resultPort] in a form expected by
+ * either [receiveFutureResult], [completeFutureResult], or
+ * by the port of [singleResultFuture].
+ */
+void sendFutureResult(Future future, SendPort resultPort) {
+  future.then((v) { resultPort.send(list1(v)); },
+              onError: (e, s) { resultPort.send(list2("$e", "$s")); });
+}
+
+
+/**
+ * Creates a [Future], and a [SendPort] that can be used to complete that
+ * future.
+ *
+ * Calls [action] with the response `SendPort`, then waits for someone
+ * to send a future result on that port using [sendFutureResult].
+ * The returned `Future` is completed with the future result sent on the port.
+ *
+ * If [action] throws, which it shouldn't,
+ * the returned future is completed with that error,
+ * unless someone manages to send a message on the port before `action` throws.
+ *
+ * If [timeout] is supplied, it is used as a limit on how
+ * long it can take before the message is received. If a
+ * message isn't received in time, the [onTimeout] is called,
+ * and the future is completed with the result of that call
+ * instead.
+ * If `onTimeout` is omitted, it defaults to throwing
+ * a [TimeoutException].
+ */
+Future singleResultFuture(void action(SendPort responsePort),
+                          {Duration timeout,
+                           onTimeout()}) {
+  Completer completer = new Completer.sync();
+  SendPort port = singleCompletePort(completer,
+                                     callback: receiveFutureResult,
+                                     timeout: timeout,
+                                     onTimeout: onTimeout);
+  try {
+    action(port);
+  } catch (e, s) {
+    // This should not happen.
+    sendFutureResult(new Future.error(e, s), port);
+  }
+  return completer.future;
+}
+
+/**
+ * Completes a completer with a message created by [sendFutureResult]
+ *
+ * The [response] must be a message on the format sent by [sendFutureResult].
+ */
+void completeFutureResult(var response, Completer completer) {
+  if (response.length == 2) {
+    var error = new RemoteError(response[0], response[1]);
+    completer.completeError(error, error.stackTrace);
+  } else {
+    var result = response[0];
+    completer.complete(result);
+  }
+}
+
+
+/**
+ * Convertes a received message created by [sendFutureResult] to a future
+ * result.
+ *
+ * The [response] must be a message on the format sent by [sendFutureResult].
+ */
+Future receiveFutureResult(var response) {
+  if (response.length == 2) {
+    var error = new RemoteError(response[0], response[1]);
+    return new Future.error(error, error.stackTrace);
+  }
+  var result = response[0];
+  return new Future.value(result);
+}
diff --git a/lib/registry.dart b/lib/registry.dart
new file mode 100644
index 0000000..9e87c11
--- /dev/null
+++ b/lib/registry.dart
@@ -0,0 +1,497 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * An isolate-compatible object registry and lookup service.
+ */
+library dart.pkg.isolate.registry;
+
+import "dart:async" show Future, Completer, TimeoutException;
+import "dart:isolate" show RawReceivePort, SendPort, Capability;
+import "dart:collection" show HashMap, HashSet;
+import "ports.dart";
+import "src/lists.dart";
+
+// Command tags.
+const int _ADD = 0;
+const int _REMOVE = 1;
+const int _ADD_TAGS = 2;
+const int _REMOVE_TAGS = 3;
+const int _GET_TAGS = 4;
+const int _FIND = 5;
+
+/**
+ * An isolate-compatible object registry.
+ *
+ * Objects can be stored as elements of a registry,
+ * have "tags" assigned to them, and be looked up by tag.
+ *
+ * A `Registry` object caches objects found using the [lookup]
+ * method, or added using [add], and returns the same object every time
+ * they are requested.
+ * A different `Registry` object that works on the same registry will not
+ * preserve the identity of elements
+ *
+ * It is recommended to only have one `Registry` object working on the
+ * same registry in each isolate.
+ *
+ * When the registry is shared accross isolates, both elements and tags must
+ * be sendable between the isolates.
+ * Between isolates spawned using [Isolate.spawn] from the same initial
+ * isolate, most objectes can be sent.
+ * Only simple objects can be sent between isolates originating from different
+ * [Isolate.spawnUri] calls.
+ */
+class Registry<T> {
+  // Most operations fail if they haven't received a response for this duration.
+  final Duration _timeout;
+
+  // Each `Registry` object has a cache of objects being controlled by it.
+  // The cache is stored in an [Expando], not on the object.
+  // This allows sending the `Registry` object through a `SendPort` without
+  // also copying the cache.
+  static Expando _caches = new Expando();
+
+  /**
+   * Port for sending command to the central registry mananger.
+   */
+  SendPort _commandPort;
+
+  /**
+   * Create a registry linked to a [RegistryManager] through [commandPort].
+   *
+   * In most cases, a registry is created by using the
+   * [RegistryManager.registry] getter.
+   *
+   * If a registry is used between isolates created using [Isolate.spawnUri],
+   * the `Registry` object can't be sent between the isolates directly.
+   * Instead the [RegistryManager.commandPort] port can be sent and a
+   * `Registry` created from the command port using this constructor.
+   *
+   * The optional [timeout] parameter can be set to the duration
+   * this registry should wait before assuming that an operation
+   * has failed.
+   */
+  Registry.fromPort(SendPort commandPort,
+                    {Duration timeout: const Duration(seconds: 5)})
+      : _commandPort = commandPort,
+        _timeout = timeout;
+
+  _RegistryCache get _cache {
+    _RegistryCache cache = _caches[this];
+    if (cache != null) return cache;
+    cache = new _RegistryCache();
+    _caches[this] = cache;
+    return cache;
+  }
+
+  /**
+   * Check and get the identity of an element.
+   *
+   * Throws if [element] is not an element in the registry.
+   */
+  int _getId(T element) {
+    int id = _cache.id(element);
+    if (id == null) {
+      throw new StateError("Not an element: ${Error.safeToString(element)}");
+    }
+    return id;
+  }
+
+  /**
+   * Adds [element] to the registry with the provided tags.
+   *
+   * Fails if [element] is already in this registry.
+   * An object is already in the registry if it has been added using [add],
+   * or if it was returned by a [lookup] call on this registry object.
+   *
+   * Returns a capability that can be used with [remove] to remove
+   * the element from the registry again.
+   *
+   * The [tags] can be used to distinguish some of the elements
+   * from other elements. Any object can be used as a tag, as long as
+   * it preserves equality when sent through a [SendPort].
+   * This makes [Capability] objects a good choice for tags.
+   */
+  Future<Capability> add(T element, {Iterable tags}) {
+    _RegistryCache cache = _cache;
+    if (cache.contains(element)) {
+      return new Future<Capability>.sync(() {
+        throw new StateError(
+            "Object already in registry: ${Error.safeToString(element)}");
+      });
+    }
+    Completer completer = new Completer<Capability>();
+    SendPort port = singleCompletePort(completer, callback: (List response) {
+      assert(cache.isAdding(element));
+      int id = response[0];
+      Capability removeCapability = response[1];
+      cache.register(id, element);
+      return removeCapability;
+    }, timeout: _timeout, onTimeout: () {
+      cache.stopAdding(element);
+      throw new TimeoutException("Future not completed", _timeout);
+    });
+    if (tags != null) tags = tags.toList(growable: false);
+    cache.setAdding(element);
+    _commandPort.send(list4(_ADD, element, tags, port));
+    return completer.future;
+  }
+
+  /**
+   * Remove the element from the registry.
+   *
+   * Returns `true` if removing the element succeeded, or `false` if the
+   * elements wasn't in the registry, or if it couldn't be removed.
+   *
+   * The [removeCapability] must be the same capability returned by [add]
+   * when the object was added. If the capability is wrong, the
+   * object is not removed, and this function returns false.
+   */
+  Future<bool> remove(T element, Capability removeCapability) {
+    int id = _cache.id(element);
+    if (id == null) {
+      return new Future<bool>.value(false);
+    }
+    Completer completer = new Completer<bool>();
+    SendPort port = singleCompletePort(completer, callback: (bool result) {
+      _cache.remove(id);
+      return result;
+    }, timeout: _timeout);
+    _commandPort.send(list4(_REMOVE, id, removeCapability, port));
+    return completer.future;
+  }
+
+  /**
+   * Add tags to an object in the registry.
+   *
+   * Each element of the registry has a number of tags associated with
+   * it. A tag is either associated with an element or not, adding it more
+   * than once does not make any difference.
+   *
+   * Tags are compared using [Object.==] equality.
+   *
+   * Fails if any of the elements are not in the registry.
+   */
+  Future addTags(Iterable<T> elements, Iterable tags) {
+    List ids = elements.map(_getId).toList(growable: false);
+    return _addTags(ids, tags);
+  }
+
+  /**
+   * Remove tags from an object in the registry.
+   *
+   * After this operation, the [elements] will not be associated to the [tags].
+   * It doesn't matter whether the elements were associated with the tags
+   * before or not.
+   *
+   * Fails if any of the elements are not in the registry.
+   */
+  Future removeTags(Iterable<T> elements, Iterable tags) {
+    List ids = elements.map(_getId).toList(growable: false);
+    tags = tags.toList(growable: false);
+    Completer completer = new Completer();
+    SendPort port = singleCompletePort(completer, timeout: _timeout);
+    _commandPort.send(list4(_REMOVE_TAGS, ids, tags, port));
+    return completer.future;
+  }
+
+  Future _addTags(List<int> ids, Iterable tags) {
+    tags = tags.toList(growable: false);
+    Completer completer = new Completer();
+    SendPort port = singleCompletePort(completer, timeout: _timeout);
+    _commandPort.send(list4(_ADD_TAGS, ids, tags, port));
+    return completer.future;
+  }
+
+  /**
+   * Finds a number of elements that have all the desired [tags].
+   *
+   * If [tags] is omitted or empty, any element of the registry can be
+   * returned.
+   *
+   * If [max] is specified, it must be greater than zero.
+   * In that case, at most the first `max` results are returned,
+   * in whatever order the registry finds its results.
+   * Otherwise all matching elements are returned.
+   */
+  Future<List<T>> lookup({Iterable tags, int max}) {
+    if (max != null && max < 1) {
+      throw new RangeError.range(max, 1, null, "max");
+    }
+    if (tags != null) tags = tags.toList(growable: false);
+    Completer completer = new Completer<List<T>>();
+    SendPort port = singleCompletePort(completer, callback: (List response) {
+      // Response is even-length list of (id, element) pairs.
+      _RegistryCache cache = _cache;
+      int count = response.length ~/ 2;
+      List result = new List(count);
+      for (int i = 0; i < count; i++) {
+        int id = response[i * 2];
+        var element = response[i * 2 + 1];
+        element = cache.register(id, element);
+        result[i] = element;
+      }
+      return result;
+    }, timeout: _timeout);
+    _commandPort.send(list4(_FIND, tags, max, port));
+    return completer.future;
+  }
+}
+
+/**
+ * Isolate-local cache used by a [Registry].
+ *
+ * Maps between id-numbers and elements.
+ * An object is considered an element of the registry if it
+ */
+class _RegistryCache {
+  // Temporary marker until an object gets an id.
+  static const int _BEING_ADDED = -1;
+
+  final Map<int, Object> id2object = new HashMap();
+  final Map<Object, int> object2id = new HashMap.identity();
+
+  int id(Object object) {
+    int result = object2id[object];
+    if (result == _BEING_ADDED) return null;
+    return result;
+  }
+
+  Object operator[](int id) => id2object[id];
+
+  // Register a pair of id/object in the cache.
+  // if the id is already in the cache, just return the existing
+  // object.
+  Object register(int id, Object object) {
+    object = id2object.putIfAbsent(id, () {
+      object2id[object] = id;
+      return object;
+    });
+    return object;
+  }
+
+  bool isAdding(element) => object2id[element] == _BEING_ADDED;
+
+  void setAdding(element)  {
+    assert(!contains(element));
+    object2id[element] = _BEING_ADDED;
+  }
+
+  void stopAdding(element) {
+    assert(object2id[element] == _BEING_ADDED);
+    object2id.remove(element);
+  }
+
+  void remove(int id) {
+    var element = id2object.remove(id);
+    if (element != null) {
+      object2id.remove(element);
+    }
+  }
+
+  bool contains(element) => object2id.containsKey(element);
+}
+
+/**
+ * The central repository used by distributed [Registry] instances.
+ */
+class RegistryManager {
+  final Duration _timeout;
+  int _nextId = 0;
+  RawReceivePort _commandPort;
+
+  /**
+   * Maps id to entry. Each entry contains the id, the element, its tags,
+   * and a capability required to remove it again.
+   */
+  Map<int, _RegistryEntry> _entries = new HashMap();
+  Map<Object, Set<int>> _tag2id = new HashMap();
+
+  /**
+   * Create a new registry managed by the created [RegistryManager].
+   *
+   * The optional [timeout] parameter can be set to the duration
+   * registry objects should wait before assuming that an operation
+   * has failed.
+   */
+  RegistryManager({timeout: const Duration(seconds: 5)})
+      : _timeout = timeout,
+        _commandPort = new RawReceivePort() {
+      _commandPort.handler = _handleCommand;
+  }
+
+  /**
+   * The command port receiving commands for the registry manager.
+   *
+   * Use this port with [Registry.fromPort] to link a registry to the
+   * manager in isolates where you can't send a [Registry] object directly.
+   */
+  SendPort get commandPort => _commandPort.sendPort;
+
+  /**
+   * Get a registry backed by this manager.
+   *
+   * This registry can be sent to other isolates created using
+   * [Isolate.spawn].
+   */
+  Registry get registry => new Registry.fromPort(_commandPort.sendPort,
+                                                 timeout: _timeout);
+
+  // Used as argument to putIfAbsent.
+  static Set _createSet() => new HashSet();
+
+  void _handleCommand(List command) {
+    switch(command[0]) {
+      case _ADD:
+        _add(command[1], command[2], command[3]);
+        return;
+      case _REMOVE:
+        _remove(command[1], command[2], command[3]);
+        return;
+      case _ADD_TAGS:
+        _addTags(command[1], command[2], command[3]);
+        return;
+      case _REMOVE_TAGS:
+        _removeTags(command[1], command[2], command[3]);
+        return;
+      case _GET_TAGS:
+        _getTags(command[1], command[2]);
+        return;
+      case _FIND:
+        _find(command[1], command[2], command[3]);
+        return;
+      default:
+        throw new UnsupportedError("Unknown command: ${command[0]}");
+    }
+  }
+
+  void _add(Object object, List tags, SendPort replyPort) {
+    int id = ++_nextId;
+    var entry = new _RegistryEntry(id, object);
+    _entries[id] = entry;
+    if (tags != null) {
+      for (var tag in tags) {
+        entry.tags.add(tag);
+        _tag2id.putIfAbsent(tag, _createSet).add(id);
+      }
+    }
+    replyPort.send(list2(id, entry.removeCapability));
+  }
+
+  void _remove(int id, Capability removeCapability, SendPort replyPort) {
+    _RegistryEntry entry = _entries[id];
+    if (entry == null || entry.removeCapability != removeCapability) {
+      replyPort.send(false);
+      return;
+    }
+    _entries.remove(id);
+    for (var tag in entry.tags) {
+      _tag2id[tag].remove(id);
+    }
+    replyPort.send(true);
+  }
+
+  void _addTags(List<int> ids, List tags, SendPort replyPort) {
+    assert(tags != null);
+    assert(tags.isNotEmpty);
+    for (int id in ids) {
+      _RegistryEntry entry = _entries[id];
+      if (entry == null) continue;  // Entry was removed.
+      entry.tags.addAll(tags);
+      for (var tag in tags) {
+        Set ids = _tag2id.putIfAbsent(tag, _createSet);
+        ids.add(id);
+      }
+    }
+    replyPort.send(null);
+  }
+
+  void _removeTags(List<int> ids, List tags, SendPort replyPort) {
+    assert(tags != null);
+    assert(tags.isNotEmpty);
+    for (int id in ids) {
+      _RegistryEntry entry = _entries[id];
+      if (entry == null) continue;  // Object was removed.
+      entry.tags.removeAll(tags);
+    }
+    for (var tag in tags) {
+      Set tagIds = _tag2id[tag];
+      if (tagIds == null) continue;
+      tagIds.removeAll(ids);
+    }
+    replyPort.send(null);
+  }
+
+  void _getTags(int id, SendPort replyPort) {
+    _RegistryEntry entry = _entries[id];
+    if (entry != null) {
+      replyPort.send(entry.tags.toList(growable: false));
+    } else {
+      replyPort.send(const []);
+    }
+  }
+
+  Iterable<int> _findTaggedIds(List tags) {
+    var matchingFirstTagIds = _tag2id[tags[0]];
+    if (matchingFirstTagIds == null) {
+      return const [];
+    }
+    if (matchingFirstTagIds.isEmpty || tags.length == 1) {
+      return matchingFirstTagIds;
+    }
+    // Create new set, then start removing ids not also matched
+    // by other tags.
+    Set<int> matchingIds = matchingFirstTagIds.toSet();
+    for (int i = 1; i < tags.length; i++) {
+      var tagIds = _tag2id[tags[i]];
+      if (tagIds == null) return const [];
+      matchingIds.retainAll(tagIds);
+      if (matchingIds.isEmpty) break;
+    }
+    return matchingIds;
+  }
+
+  void _find(List tags, int max, SendPort replyPort) {
+    assert(max == null || max > 0);
+    List result = [];
+    if (tags == null || tags.isEmpty) {
+      var entries = _entries.values;
+      if (max != null) entries = entries.take(max);
+      for (_RegistryEntry entry in entries) {
+        result.add(entry.id);
+        result.add(entry.element);
+      }
+      replyPort.send(result);
+      return;
+    }
+    var matchingIds = _findTaggedIds(tags);
+    if (max == null) max = matchingIds.length;  // All results.
+    for (var id in matchingIds) {
+      result.add(id);
+      result.add(_entries[id].element);
+      max--;
+      if (max == 0) break;
+    }
+    replyPort.send(result);
+  }
+
+  /**
+   * Shut down the registry service.
+   *
+   * After this, all [Registry] operations will time out.
+   */
+  void close() {
+    _commandPort.close();
+  }
+}
+
+/** Entry in [RegistryManager]. */
+class _RegistryEntry {
+  final int id;
+  final Object element;
+  final Set tags = new HashSet();
+  final Capability removeCapability = new Capability();
+  _RegistryEntry(this.id, this.element);
+}
diff --git a/lib/runner.dart b/lib/runner.dart
new file mode 100644
index 0000000..cd26d5a
--- /dev/null
+++ b/lib/runner.dart
@@ -0,0 +1,61 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * A [Runner] runs a function, potentially in a different scope
+ * or even isolate.
+ */
+library dart.pkg.isolate.runner;
+
+import "dart:async" show Future;
+
+/**
+ * Calls a function with an argument.
+ *
+ * The function can be run in a different place from where the `Runner`
+ * resides, e.g., in a different isolate.
+ */
+class Runner {
+  /**
+   * Request that [function] be called with the provided arguments.
+   *
+   * The arguments will be applied to the function in the same way as by
+   * [Function.apply], but it may happen in a diffent isolate or setting.
+   *
+   * It's necessary that the function can be sent through a [SendPort]
+   * if the call is performed in another isolate.
+   * That means the other isolate should be created using [Isolate.spawn]
+   * so that it is running the same code as the sending isoalte,
+   * and the function must be a static or top-level function.
+   *
+   * Waits for the result of the call, and completes the returned future
+   * with the result, whether it's a value or an error.
+   *
+   * If the returned future does not complete before `timeLimit` has passed,
+   * the [onTimeout] action is executed instead, and its result (whether it
+   * returns or throws) is used as the result of the returned future.
+   *
+   * If `onTimeout` is omitted, a timeout will cause the returned future to
+   * complete with a [TimeoutException].
+   *
+   * The default implementation runs the function in the current isolate.
+   */
+  Future run(function(argument), Object argument,
+             {Duration timeout, onTimeout()}) {
+    Future result = new Future.sync(() => function(argument));
+    if (timeout != null) {
+      result = result.timeout(timeout, onTimeout: onTimeout);
+    }
+    return result;
+  }
+
+  /**
+   * Stop the runner.
+   *
+   * If the runner has allocated resources, e.g., an isolate, it should
+   * be released. No further calls to [run] should be made after calling
+   * stop.
+   */
+  Future close() => new Future.value();
+}
diff --git a/lib/src/errors.dart b/lib/src/errors.dart
new file mode 100644
index 0000000..91abe5b
--- /dev/null
+++ b/lib/src/errors.dart
@@ -0,0 +1,188 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// TODO(lrn): This should be in package:async?
+/**
+ * Helper functions for working with errors.
+ *
+ * The [MultiError] class combines multiple errors into one object,
+ * and the [MultiError.wait] function works like [Future.wait] except
+ * that it returns all the errors.
+ */
+library pkg.isolate.errors;
+
+import "dart:async";
+
+class MultiError extends Error {
+  // Limits the number of lines included from each error's error message.
+  // A best-effort attempt is made at keeping below this number of lines
+  // in the output.
+  // If there are too many errors, they will all get at least one line.
+  static const int _MAX_LINES = 55;
+  // Minimum number of lines in the toString for each error.
+  static const int _MIN_LINES_PER_ERROR = 1;
+
+  /** The actual errors. */
+  final List errors;
+
+  /**
+   * Create a `MultiError` based on a list of errors.
+   *
+   * The errors represent errors of a number of individual operations.
+   *
+   * The list may contain `null` values, if the index of the error in the
+   * list is useful.
+   */
+  MultiError(this.errors);
+
+  /**
+   * Waits for all [futures] to complete, like [Future.wait].
+   *
+   * Where `Future.wait` only reports one error, even if multiple
+   * futures complete with errors, this function will complete
+   * with a [MultiError] if more than one future completes with an error.
+   *
+   * The order of values is not preserved (if that is needed, use
+   * [wait]).
+   */
+  static Future<List> waitUnordered(Iterable<Future> futures,
+                                    {cleanUp(successResult)}) {
+    Completer completer;
+    int count = 0;
+    int errors = 0;
+    int values = 0;
+    // Initilized to `new List(count)` when count is known.
+    // Filled up with values on the left, errors on the right.
+    // Order is not preserved.
+    List results;
+    void checkDone() {
+      if (errors + values < count) return;
+      if (errors == 0) {
+        completer.complete(results);
+        return;
+      }
+      var errorList = results.sublist(results.length - errors);
+      completer.completeError(new MultiError(errorList));
+    };
+    var handleValue = (v) {
+      // If this fails because [results] is null, there is a future
+      // which breaks the Future API by completing immediately when
+      // calling Future.then, probably by misusing a synchronous completer.
+      results[values++] = v;
+      if (errors > 0 && cleanUp != null) {
+        new Future.sync(() => cleanUp(v));
+      }
+      checkDone();
+    };
+    var handleError = (e, s) {
+      if (errors == 0 && cleanUp != null) {
+        for (int i = 0; i < values; i++) {
+          var value = results[i];
+          if (value != null) new Future.sync(() => cleanUp(value));
+        }
+      }
+      results[results.length - ++errors] = e;
+      checkDone();
+    };
+    for (Future future in futures) {
+      count++;
+      future.then(handleValue, onError: handleError);
+    }
+    if (count == 0) return new Future.value(new List(0));
+    results = new List(count);
+    completer = new Completer();
+    return completer.future;
+  }
+
+  /**
+   * Waits for all [futures] to complete, like [Future.wait].
+   *
+   * Where `Future.wait` only reports one error, even if multiple
+   * futures complete with errors, this function will complete
+   * with a [MultiError] if more than one future completes with an error.
+   *
+   * The order of values is preserved, and if any error occurs, the
+   * [MultiError.errors] list will have errors in the corresponding slots,
+   * and `null` for non-errors.
+   */
+  Future<List> wait(Iterable<Future> futures,
+                    {cleanUp(successResult)}) {
+    Completer completer;
+    int count = 0;
+    bool hasError = false;
+    int completed = 0;
+    // Initalized to `new List(count)` when count is known.
+    // Filled with values until the first error, then cleared
+    // and filled with errors.
+    List results;
+    void checkDone() {
+      completed++;
+      if (completed < count) return;
+      if (!hasError) {
+        completer.complete(results);
+        return;
+      }
+      completer.completeError(new MultiError(results));
+    };
+    for (Future future in futures) {
+      int i = count;
+      count++;
+      future.then((v) {
+        if (!hasError) {
+          results[i] = v;
+        } else if (cleanUp != null) {
+          new Future.sync(() => cleanUp(v));
+        }
+        checkDone();
+      }, onError: (e, s) {
+        if (!hasError) {
+          if (cleanUp != null) {
+            for (int i = 0; i < results.length; i++) {
+              var result = results[i];
+              if (result != null) new Future.sync(() => cleanUp(result));
+            }
+          }
+          results.fillRange(0, results.length, null);
+          hasError = true;
+        }
+        results[i] = e;
+        checkDone();
+      });
+    }
+    if (count == 0) return new Future.value(new List(0));
+    results = new List(count);
+    completer = new Completer();
+    return completer.future;
+  }
+
+
+  String toString() {
+    StringBuffer buffer = new StringBuffer();
+    buffer.write("Multiple Errors:\n");
+    int linesPerError = _MAX_LINES ~/ errors.length;
+    if (linesPerError < _MIN_LINES_PER_ERROR) {
+      linesPerError = _MIN_LINES_PER_ERROR;
+    }
+
+    for (int index = 0; index < errors.length; index++) {
+      var error = errors[index];
+      if (error == null) continue;
+      String errorString = error.toString();
+      int end = 0;
+      for (int i = 0; i < linesPerError; i++) {
+        end = errorString.indexOf('\n', end) + 1;
+        if (end == 0) {
+          end = errorString.length;
+          break;
+        }
+      }
+      buffer.write("#$index: ");
+      buffer.write(errorString.substring(0, end));
+      if (end < errorString.length) {
+        buffer.write("...\n");
+      }
+    }
+    return buffer.toString();
+  }
+}
diff --git a/lib/src/functionref.dart b/lib/src/functionref.dart
new file mode 100644
index 0000000..d4c0e8e
--- /dev/null
+++ b/lib/src/functionref.dart
@@ -0,0 +1,80 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * Convert top-level or static functions to objects that can be sent to
+ * other isolates.
+ *
+ * This package is only needed until such functions can be sent directly
+ * through send-ports.
+ */
+library pkg.isolate.functionref;
+
+import "dart:mirrors";
+
+abstract class FunctionRef {
+  static FunctionRef from(Function function) {
+    var cm = reflect(function);
+    if (cm is ClosureMirror) {
+      MethodMirror fm = cm.function;
+      if (fm.isRegularMethod && fm.isStatic) {
+        Symbol functionName = fm.simpleName;
+        Symbol className;
+        DeclarationMirror owner = fm.owner;
+        if (owner is ClassMirror) {
+          className = owner.simpleName;
+          owner = owner.owner;
+        }
+        if (owner is LibraryMirror) {
+          LibraryMirror ownerLibrary = owner;
+          Uri libraryUri = ownerLibrary.uri;
+          return new _FunctionRef(libraryUri, className, functionName);
+        }
+      }
+      throw new ArgumentError.value(function, "function",
+                                    "Not a static or top-level function");
+    }
+    // It's a Function but not a closure, so it's a callable object.
+    return new _CallableObjectRef(function);
+  }
+
+  Function get function;
+}
+
+class _FunctionRef implements FunctionRef {
+  final Uri libraryUri;
+  final Symbol className;
+  final Symbol functionName;
+
+  _FunctionRef(this.libraryUri, this.className, this.functionName);
+
+  Function get function {
+    LibraryMirror lm = currentMirrorSystem().libraries[libraryUri];
+    if (lm != null) {
+      ObjectMirror owner = lm;
+      if (className != null) {
+        ClassMirror cm = lm.declarations[className];
+        owner = cm;
+      }
+      if (owner != null) {
+        ClosureMirror function = owner.getField(this.functionName);
+        if (function != null) return function.reflectee;
+      }
+    }
+    String functionName = MirrorSystem.getName(this.functionName);
+    String classQualifier = "";
+    if (this.className != null) {
+      classQualifier  = " in class ${MirrorSystem.getName(this.className)}";
+    }
+    throw new UnsupportedError(
+      "Function $functionName${classQualifier} not found in library $libraryUri"
+    );
+  }
+}
+
+class _CallableObjectRef implements FunctionRef {
+  final Function object;
+  _CallableObjectRef(this.object);
+  Function get function => object;
+}
diff --git a/lib/src/lists.dart b/lib/src/lists.dart
new file mode 100644
index 0000000..011ecf2
--- /dev/null
+++ b/lib/src/lists.dart
@@ -0,0 +1,31 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/** Utility functions to create fixed-length lists. */
+library pkg.isolate.util.lists;
+
+/** Create a single-element fixed-length list. */
+List list1(v1) => new List(1)..[0] = v1;
+
+/** Create a two-element fixed-length list. */
+List list2(v1, v2) => new List(2)..[0] = v1
+                                 ..[1] = v2;
+
+/** Create a three-element fixed-length list. */
+List list3(v1, v2, v3) => new List(3)..[0] = v1
+                                     ..[1] = v2
+                                     ..[2] = v3;
+
+/** Create a four-element fixed-length list. */
+List list4(v1, v2, v3, v4) => new List(4)..[0] = v1
+                                         ..[1] = v2
+                                         ..[2] = v3
+                                         ..[3] = v4;
+
+/** Create a five-element fixed-length list. */
+List list5(v1, v2, v3, v4, v5) => new List(5)..[0] = v1
+                                             ..[1] = v2
+                                             ..[2] = v3
+                                             ..[3] = v4
+                                             ..[4] = v5;
diff --git a/lib/src/multiplexport.dart b/lib/src/multiplexport.dart
new file mode 100644
index 0000000..d0c2f67
--- /dev/null
+++ b/lib/src/multiplexport.dart
@@ -0,0 +1,103 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/**
+ * A multiplexing [RawReceivePort].
+ *
+ * Allows creating a number of [RawReceivePort] implementations that all send
+ * messages through the same real `RawReceivePort`.
+ *
+ * This allows reducing the number of receive ports created, but adds an
+ * overhead to each message.
+ * If a library creates many short-lived receive ports, multiplexing might be
+ * faster.
+ *
+ * To use multiplexing receive ports, create and store a
+ * [RawReceivePortMultiplexer], and create receive ports by calling
+ * `multiplexer.createRawReceivePort(handler)` where you would otherwise
+ * write `new RawReceivePort(handler)`.
+ *
+ * Remember to [close] the multiplexer when it is no longer needed.
+ * `
+ * (TODO: Check if it really is faster - creating a receive port requires a
+ * global mutex, so it may be a bottleneck, but it's not clear how slow it is).
+ */
+library pkg.isolate.multiplexreceiveport;
+
+import "dart:isolate";
+import "dart:collection";
+import "lists.dart";
+
+class _MultiplexRawReceivePort implements RawReceivePort {
+  final RawReceivePortMultiplexer _multiplexer;
+  final int _id;
+  Function _handler;
+
+  _MultiplexRawReceivePort(this._multiplexer, this._id, this._handler);
+
+  void set handler(void handler(response)) {
+    this._handler = handler;
+  }
+
+  void close() {
+    _multiplexer._closePort(_id);
+  }
+
+  SendPort get sendPort => _multiplexer._createSendPort(_id);
+
+  void _invokeHandler(message) { _handler(message); }
+}
+
+class _MultiplexSendPort implements SendPort {
+  final SendPort _sendPort;
+  final int _id;
+  _MultiplexSendPort(this._id, this._sendPort);
+
+  void send(message) {
+    _sendPort.send(list2(_id, message));
+  }
+}
+
+/**
+ * A shared [RawReceivePort] that distributes messages to
+ * [RawReceivePort] instances that it manages.
+ */
+class RawReceivePortMultiplexer {
+  final RawReceivePort _port = new RawReceivePort();
+  final Map<int, _MultiplexRawReceivePort> _map = new HashMap();
+  int _nextId = 0;
+
+  RawReceivePortMultiplexer() {
+    _port.handler = _multiplexResponse;
+  }
+
+  RawReceivePort createRawReceivePort([void handler(value)]) {
+    int id = _nextId++;
+    var result = new _MultiplexRawReceivePort(this, id, handler);
+    _map[id] = result;
+    return result;
+  }
+
+  void close() {
+    _port.close();
+  }
+
+  void _multiplexResponse(list) {
+    int id = list[0];
+    var message = list[1];
+    _MultiplexRawReceivePort receivePort = _map[id];
+    // If the receive port is closed, messages are dropped, just as for
+    // the normal ReceivePort.
+    if (receivePort == null) return;  // Port closed.
+    receivePort._invokeHandler(message);
+  }
+
+  SendPort _createSendPort(int id) {
+    return new _MultiplexSendPort(id, _port.sendPort);
+  }
+
+  void _closePort(int id) {
+    _map.remove(id);
+  }
+}
diff --git a/pubspec.yaml b/pubspec.yaml
new file mode 100644
index 0000000..8e2923e
--- /dev/null
+++ b/pubspec.yaml
@@ -0,0 +1,11 @@
+name: isolate
+version: 0.1.0
+author: Dart Team <misc@dartlang.org>
+description: Utility functions and classes related to the 'dart:isolate' library.
+homepage: https://github.com/dart-lang/isolate
+
+environment:
+  sdk: ">=1.8.0 <2.0.0"
+
+dev_dependencies:
+  unittest: ">=0.10.0 <0.12.0"
diff --git a/test/isolaterunner_test.dart b/test/isolaterunner_test.dart
new file mode 100644
index 0000000..4b98b58
--- /dev/null
+++ b/test/isolaterunner_test.dart
@@ -0,0 +1,114 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.isolaterunner_test;
+
+import "package:isolate/isolaterunner.dart";
+import "package:unittest/unittest.dart";
+import "dart:async" show Future;
+import "dart:isolate" show Capability;
+
+const MS = const Duration(milliseconds: 1);
+
+void main() {
+  test("create-close", testCreateClose);
+  test("create-run-close", testCreateRunClose);
+  test("separate-isolates", testSeparateIsolates);
+  testIsolateFunctions();
+}
+
+Future testCreateClose() {
+  return IsolateRunner.spawn().then((IsolateRunner runner) {
+    return runner.close();
+  });
+}
+
+Future testCreateRunClose() {
+  return IsolateRunner.spawn().then((IsolateRunner runner) {
+    return runner.run(id, "testCreateRunClose").then((v) {
+      expect(v, "testCreateRunClose");
+      return runner.close().then((_) => runner.onExit);
+    });
+  });
+}
+
+Future testSeparateIsolates() {
+  // Check that each isolate has its own _global variable.
+  return Future.wait(new Iterable.generate(2, (_) => IsolateRunner.spawn()))
+    .then((runners) {
+      Future runAll(action(IsolateRunner runner, int index)) {
+        var indices = new Iterable.generate(runners.length);
+        return Future.wait(indices.map((i) => action(runners[i], i)));
+      }
+
+      return runAll((runner, i) => runner.run(setGlobal, i + 1))
+             .then((values) {
+               expect(values, [1, 2]);
+               expect(_global, null);
+               return runAll((runner, _) => runner.run(getGlobal, null));
+             })
+             .then((values) {
+               expect(values, [1, 2]);
+               expect(_global, null);
+               return runAll((runner, _) => runner.close());
+             });
+    });
+}
+
+void testIsolateFunctions() {
+  test("pause", () {
+    bool mayComplete = false;
+    return IsolateRunner.spawn().then((isolate) {
+      isolate.pause();
+      new Future.delayed(MS * 500, () {
+        mayComplete = true;
+        isolate.resume();
+      });
+      isolate.run(id, 42).then((v) {
+        expect(v, 42);
+        expect(mayComplete, isTrue);
+      }).whenComplete(isolate.close);
+    });
+  });
+  test("pause2", () {
+    Capability c1 = new Capability();
+    Capability c2 = new Capability();
+    int mayCompleteCount = 2;
+    return IsolateRunner.spawn().then((isolate) {
+      isolate.pause(c1);
+      isolate.pause(c2);
+      new Future.delayed(MS * 500, () {
+        mayCompleteCount--;
+        isolate.resume(c1);
+      });
+      new Future.delayed(MS * 500, () {
+        mayCompleteCount--;
+        isolate.resume(c2);
+      });
+      isolate.run(id, 42).then((v) {
+        expect(v, 42);
+        expect(mayCompleteCount, 0);
+      }).whenComplete(isolate.close);
+    });
+  });
+  test("ping", () {
+    return IsolateRunner.spawn().then((isolate) {
+      return isolate.ping().then((v) {
+        expect(v, isTrue);
+        return isolate.close();
+      });
+    });
+  });
+  test("kill", () {
+    return IsolateRunner.spawn().then((isolate) {
+      return isolate.kill();
+    });
+  });
+}
+
+id(x) => x;
+
+var _global;
+getGlobal(_) => _global;
+void setGlobal(v) => _global = v;
diff --git a/test/ports_test.dart b/test/ports_test.dart
new file mode 100644
index 0000000..9586b96
--- /dev/null
+++ b/test/ports_test.dart
@@ -0,0 +1,394 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.isolaterunner_test;
+
+import "package:isolate/ports.dart";
+import "package:unittest/unittest.dart";
+import "dart:async";
+import "dart:isolate";
+
+const Duration MS = const Duration(milliseconds: 1);
+
+main() {
+  testSingleCallbackPort();
+  testSingleCompletePort();
+  testSingleResponseFuture();
+  testSingleResultFuture();
+}
+
+void testSingleCallbackPort() {
+  test("singleCallbackValue", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCallbackPort(completer.complete);
+    p.send(42);
+    return completer.future.then((v) {
+      expect(v, 42);
+    });
+  });
+
+  test("singleCallbackFirstValue", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCallbackPort(completer.complete);
+    p.send(42);
+    p.send(37);
+    return completer.future.then((v) {
+      expect(v, 42);
+    });
+  });
+  test("singleCallbackValue", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCallbackPort(completer.complete);
+    p.send(42);
+    return completer.future.then((v) {
+      expect(v, 42);
+    });
+  });
+
+  test("singleCallbackValueBeforeTimeout", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCallbackPort(completer.complete,
+                                    timeout: MS * 500);
+    p.send(42);
+    return completer.future.then((v) {
+      expect(v, 42);
+    });
+  });
+
+  test("singleCallbackTimeout", () {
+    Completer completer = new Completer.sync();
+    singleCallbackPort(completer.complete,
+                       timeout: MS * 100,
+                       timeoutValue: 37);
+    return completer.future.then((v) {
+      expect(v, 37);
+    });
+  });
+
+  test("singleCallbackTimeoutFirst", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCallbackPort(completer.complete,
+                                    timeout: MS * 100,
+                                    timeoutValue: 37);
+    new Timer(MS * 500, () => p.send(42));
+    return completer.future.then((v) {
+      expect(v, 37);
+    });
+  });
+}
+
+
+void testSingleCompletePort() {
+  test("singleCompleteValue", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCompletePort(completer);
+    p.send(42);
+    return completer.future.then((v) {
+      expect(v, 42);
+    });
+  });
+
+  test("singleCompleteValueCallback", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCompletePort(completer,
+                                    callback: (v) {
+                                      expect(42, v);
+                                      return 87;
+                                    });
+    p.send(42);
+    return completer.future.then((v) {
+      expect(v, 87);
+    });
+  });
+
+  test("singleCompleteValueCallbackFuture", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCompletePort(completer,
+                                    callback: (v) {
+                                      expect(42, v);
+                                      return new Future.delayed(MS * 500,
+                                                                () => 88);
+                                    });
+    p.send(42);
+    return completer.future.then((v) {
+      expect(v, 88);
+    });
+  });
+
+  test("singleCompleteValueCallbackThrows", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCompletePort(completer,
+                                    callback: (v) {
+                                      expect(42, v);
+                                      throw 89;
+                                    });
+    p.send(42);
+    return completer.future.then((v) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e, 89);
+    });
+  });
+
+  test("singleCompleteValueCallbackThrowsFuture", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCompletePort(completer,
+                                    callback: (v) {
+                                      expect(42, v);
+                                      return new Future.error(90);
+                                    });
+    p.send(42);
+    return completer.future.then((v) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e, 90);
+    });
+  });
+
+  test("singleCompleteFirstValue", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCompletePort(completer);
+    p.send(42);
+    p.send(37);
+    return completer.future.then((v) {
+      expect(v, 42);
+    });
+  });
+
+  test("singleCompleteFirstValueCallback", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCompletePort(completer, callback: (v) {
+                                                 expect(v, 42);
+                                                 return 87;
+                                               });
+    p.send(42);
+    p.send(37);
+    return completer.future.then((v) {
+      expect(v, 87);
+    });
+  });
+
+  test("singleCompleteValueBeforeTimeout", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCompletePort(completer,
+                                    timeout: MS * 500);
+    p.send(42);
+    return completer.future.then((v) {
+      expect(v, 42);
+    });
+  });
+
+  test("singleCompleteTimeout", () {
+    Completer completer = new Completer.sync();
+    singleCompletePort(completer,
+                       timeout: MS * 100);
+    return completer.future.then((v) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e is TimeoutException, isTrue);
+    });
+  });
+
+  test("singleCompleteTimeoutCallback", () {
+    Completer completer = new Completer.sync();
+    singleCompletePort(completer,
+                       timeout: MS * 100,
+                       onTimeout: () => 87);
+    return completer.future.then((v) {
+      expect(v, 87);
+    });
+  });
+
+  test("singleCompleteTimeoutCallbackThrows", () {
+    Completer completer = new Completer.sync();
+    singleCompletePort(completer,
+                       timeout: MS * 100,
+                       onTimeout: () => throw 91);
+    return completer.future.then((v) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e, 91);
+    });
+  });
+
+  test("singleCompleteTimeoutCallbackFuture", () {
+    Completer completer = new Completer.sync();
+    singleCompletePort(completer,
+                       timeout: MS * 100,
+                       onTimeout: () => new Future.value(87));
+    return completer.future.then((v) {
+      expect(v, 87);
+    });
+  });
+
+  test("singleCompleteTimeoutCallbackThrowsFuture", () {
+    Completer completer = new Completer.sync();
+    singleCompletePort(completer,
+                       timeout: MS * 100,
+                       onTimeout: () => new Future.error(92));
+    return completer.future.then((v) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e, 92);
+    });
+  });
+
+  test("singleCompleteTimeoutCallbackSLow", () {
+    Completer completer = new Completer.sync();
+    singleCompletePort(
+        completer,
+        timeout: MS * 100,
+        onTimeout: () => new Future.delayed(MS * 500, () => 87));
+    return completer.future.then((v) {
+      expect(v, 87);
+    });
+  });
+
+  test("singleCompleteTimeoutCallbackThrowsSlow", () {
+    Completer completer = new Completer.sync();
+    singleCompletePort(
+        completer,
+        timeout: MS * 100,
+        onTimeout: () => new Future.delayed(MS * 500, () => throw 87));
+    return completer.future.then((v) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e, 87);
+    });
+  });
+
+  test("singleCompleteTimeoutFirst", () {
+    Completer completer = new Completer.sync();
+    SendPort p = singleCompletePort(completer,
+                                    timeout: MS * 100,
+                                    onTimeout: () => 37);
+    new Timer(MS * 500, () => p.send(42));
+    return completer.future.then((v) {
+      expect(v, 37);
+    });
+  });
+}
+
+void testSingleResponseFuture() {
+  test("singleResponseFutureValue", () {
+    return singleResponseFuture((SendPort p) {
+      p.send(42);
+    }).then((v) {
+      expect(v, 42);
+    });
+  });
+
+  test("singleResponseFutureValueFirst", () {
+    return singleResponseFuture((SendPort p) {
+      p.send(42);
+      p.send(37);
+    }).then((v) {
+      expect(v, 42);
+    });
+  });
+
+  test("singleResponseFutureError", () {
+    return singleResponseFuture((SendPort p) {
+      throw 93;
+    }).then((v) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e, 93);
+    });
+  });
+
+  test("singleResponseFutureTimeout", () {
+    return singleResponseFuture((SendPort p) {
+      // no-op.
+    }, timeout: MS * 100).then((v) {
+      expect(v, null);
+    });
+  });
+
+  test("singleResponseFutureTimeoutValue", () {
+    return singleResponseFuture((SendPort p) {
+      // no-op.
+    }, timeout: MS * 100, timeoutValue: 42).then((v) {
+      expect(v, 42);
+    });
+  });
+}
+
+void testSingleResultFuture() {
+  test("singleResultFutureValue", () {
+    return singleResultFuture((SendPort p) {
+      sendFutureResult(new Future.value(42), p);
+    }).then((v) {
+      expect(v, 42);
+    });
+  });
+
+  test("singleResultFutureValueFirst", () {
+    return singleResultFuture((SendPort p) {
+      sendFutureResult(new Future.value(42), p);
+      sendFutureResult(new Future.value(37), p);
+    }).then((v) {
+      expect(v, 42);
+    });
+  });
+
+  test("singleResultFutureError", () {
+    return singleResultFuture((SendPort p) {
+      sendFutureResult(new Future.error(94), p);
+    }).then((v) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e is RemoteError, isTrue);
+    });
+  });
+
+  test("singleResultFutureErrorFirst", () {
+    return singleResultFuture((SendPort p) {
+      sendFutureResult(new Future.error(95), p);
+      sendFutureResult(new Future.error(96), p);
+    }).then((v) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e is RemoteError, isTrue);
+    });
+  });
+
+  test("singleResultFutureError", () {
+    return singleResultFuture((SendPort p) {
+      throw 93;
+    }).then((v) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e is RemoteError, isTrue);
+    });
+  });
+
+  test("singleResultFutureTimeout", () {
+    return singleResultFuture((SendPort p) {
+      // no-op.
+    }, timeout: MS * 100).then((v) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e is TimeoutException, isTrue);
+    });
+  });
+
+  test("singleResultFutureTimeoutValue", () {
+    return singleResultFuture((SendPort p) {
+      // no-op.
+    }, timeout: MS * 100, onTimeout: () => 42).then((v) {
+      expect(v, 42);
+    });
+  });
+
+  test("singleResultFutureTimeoutError", () {
+    return singleResultFuture((SendPort p) {
+      // no-op.
+    }, timeout: MS * 100, onTimeout: () => throw 97).then((v) {
+      expect(v, 42);
+    }, onError: (e, s) {
+      expect(e, 97);
+    });
+  });
+}
diff --git a/test/registry_test.dart b/test/registry_test.dart
new file mode 100644
index 0000000..5e95522
--- /dev/null
+++ b/test/registry_test.dart
@@ -0,0 +1,539 @@
+// Copyright (c) 2015, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library dart.pkg.isolate.test.registry;
+
+import "package:isolate/isolaterunner.dart";
+import "package:isolate/registry.dart";
+import "dart:async";
+import "dart:isolate";
+
+import "package:unittest/unittest.dart";
+
+const MS = const Duration(milliseconds: 1);
+
+void main() {
+  testLookup();
+  testAddLookup();
+  testAddRemoveTags();
+  testRemove();
+  testCrossIsolate();
+  testTimeout();
+  testMultiRegistry();
+  testObjectsAndTags();
+}
+
+class Oddity {
+  static const int EVEN = 0;
+  static const int ODD = 1;
+}
+
+Future<List> waitAll(int n, Future action(int n)) {
+  return Future.wait(new Iterable.generate(n, action));
+}
+
+void testLookup() {
+
+  test("lookupAll", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    return waitAll(10, (i) {
+      var element = new Element(i);
+      var tag = i.isEven ? Oddity.EVEN : Oddity.ODD;
+      return registry.add(element, tags: [tag]);
+    })
+    .then((_) => registry.lookup())
+    .then((all) {
+      expect(all.length, 10);
+      expect(all.map((v) => v.id).toList()..sort(),
+             [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
+    })
+    .then((_) { regman.close(); });
+  });
+
+  test("lookupOdd", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    return waitAll(10, (i) {
+      var element = new Element(i);
+      var tag = i.isEven ? Oddity.EVEN : Oddity.ODD;
+      return registry.add(element, tags: [tag]);
+    })
+    .then((_) =>registry.lookup(tags:[Oddity.ODD]))
+    .then((all) {
+      expect(all.length, 5);
+      expect(all.map((v) => v.id).toList()..sort(),
+             [1, 3, 5, 7, 9]);
+    })
+    .then((_) { regman.close(); });
+  });
+
+  test("lookupMax", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    return waitAll(10, (i) {
+      var element = new Element(i);
+      var tag = i.isEven ? Oddity.EVEN : Oddity.ODD;
+      return registry.add(element, tags: [tag]);
+    })
+    .then((_) => registry.lookup(max: 5))
+    .then((all) {
+      expect(all.length, 5);
+    })
+    .then((_) { regman.close(); });
+  });
+
+  test("lookupMultiTag", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    return waitAll(25, (i) {
+      var element = new Element(i);
+      // Collect all numbers dividing i.
+      var tags = [i];
+      for (int j = 2; j < 25; j++) {
+        if (i % j == 0) tags.add(j);
+      }
+      return registry.add(element, tags: tags);
+    })
+    .then((_) => registry.lookup(tags: [2, 3]))
+    .then((all) {
+      expect(all.length, 5);
+      expect(all.map((v) => v.id).toList()..sort(),
+             [0, 6, 12, 18, 24]);
+    })
+    .then((_) { regman.close(); });
+  });
+
+  test("lookupMultiTagMax", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    return waitAll(25, (i) {
+      var element = new Element(i);
+      // Collect all numbers dividing i.
+      var tags = [i];
+      for (int j = 2; j < 25; j++) {
+        if (i % j == 0) tags.add(j);
+      }
+      return registry.add(element, tags: tags);
+    })
+    .then((_) => registry.lookup(tags: [2, 3], max: 3))
+    .then((all) {
+      expect(all.length, 3);
+      expect(all.every((v) => (v.id % 6) == 0), isTrue);
+    })
+    .then((_) { regman.close(); });
+  });
+}
+
+void testAddLookup() {
+  test("Add-lookup-identical", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    var object = new Object();
+    return registry.add(object).then((_) {
+      return registry.lookup().then((entries) {
+        expect(entries, hasLength(1));
+        expect(entries.first, same(object));
+      });
+    }).whenComplete(regman.close);
+  });
+
+  test("Add-multiple-identical", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    var object1 = new Object();
+    var object2 = new Object();
+    var object3 = new Object();
+    var objects = [object1, object2, object3];
+    return Future.wait(objects.map(registry.add))
+                 .then((_) {
+      return registry.lookup().then((entries) {
+        expect(entries, hasLength(3));
+        for (var entry in entries) {
+          expect(entry, isIn(objects));
+        }
+      });
+    }).whenComplete(regman.close);
+  });
+
+  test("Add-twice", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    var object = new Object();
+    return registry.add(object).then((_) {
+      return registry.add(object).then((_) {
+        fail("Unreachable");
+      }, onError: (e, s) {
+        expect(e, isStateError);
+      });
+    }).whenComplete(regman.close);
+  });
+
+  test("Add-lookup-add-lookup", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    var object = new Object();
+    var object2 = new Object();
+    return registry.add(object).then((_) {
+      return registry.lookup();
+    }).then((entries) {
+      expect(entries, hasLength(1));
+      expect(entries.first, same(object));
+      return registry.add(object2);
+    }).then((_) {
+      return registry.lookup().then((entries) {
+        expect(entries, hasLength(2));
+        var entry1 = entries.first;
+        var entry2 = entries.last;
+        if (object == entry1) {
+          expect(entry2, same(object2));
+        } else {
+          expect(entry1, same(object));
+        }
+      });
+    }).whenComplete(regman.close);
+  });
+
+  test("lookup-add-lookup", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    var object = new Object();
+    return registry.lookup().then((entries) {
+      expect(entries, isEmpty);
+      return registry.add(object);
+    }).then((_) {
+      return registry.lookup();
+    }).then((entries) {
+      expect(entries, hasLength(1));
+      expect(entries.first, same(object));
+    }).whenComplete(regman.close);
+  });
+
+  test("Add-multiple-tags", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    var object1 = new Object();
+    var object2 = new Object();
+    var object3 = new Object();
+    return registry.add(object1, tags: [1, 3, 5, 7]).then((_){
+      return registry.add(object2, tags: [2, 3, 6, 7]);
+    }).then((_) {
+      return registry.add(object3, tags: [4, 5, 6, 7]);
+    }).then((_) {
+      return registry.lookup(tags: [3]).then((entries) {
+        expect(entries, hasLength(2));
+        expect(entries.first == object1 || entries.last == object1, isTrue);
+        expect(entries.first == object2 || entries.last == object2, isTrue);
+      });
+    }).then((_) {
+      return registry.lookup(tags: [2]).then((entries) {
+        expect(entries, hasLength(1));
+        expect(entries.first, same(object2));
+      });
+    }).then((_) {
+      return registry.lookup(tags: [3, 6]).then((entries) {
+        expect(entries, hasLength(1));
+        expect(entries.first, same(object2));
+      });
+    }).whenComplete(regman.close);
+  });
+}
+
+void testRemove() {
+  test("Add-remove", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    var object = new Object();
+    return registry.add(object).then((removeCapability) {
+      return registry.lookup().then((entries) {
+        expect(entries, hasLength(1));
+        expect(entries.first, same(object));
+        return registry.remove(object, removeCapability);
+      }).then((removeSuccess) {
+        expect(removeSuccess, isTrue);
+        return registry.lookup();
+      }).then((entries) {
+        expect(entries, isEmpty);
+      });
+    }).whenComplete(regman.close);
+  });
+
+  test("Add-remove-fail", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    var object = new Object();
+    return registry.add(object).then((removeCapability) {
+      return registry.lookup().then((entries) {
+        expect(entries, hasLength(1));
+        expect(entries.first, same(object));
+        return registry.remove(object, new Capability());
+      }).then((removeSuccess) {
+        expect(removeSuccess, isFalse);
+      });
+    }).whenComplete(regman.close);
+  });
+}
+
+void testAddRemoveTags() {
+  test("Add-remove-tag", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    var object = new Object();
+    return registry.add(object).then((removeCapability) {
+      return registry.lookup(tags: ["x"]);
+    }).then((entries) {
+      expect(entries, isEmpty);
+      return registry.addTags([object], ["x"]);
+    }).then((_) {
+      return registry.lookup(tags: ["x"]);
+    }).then((entries) {
+      expect(entries, hasLength(1));
+      expect(entries.first, same(object));
+      return registry.removeTags([object], ["x"]);
+    }).then((_) {
+      return registry.lookup(tags: ["x"]);
+    }).then((entries) {
+      expect(entries, isEmpty);
+    }).whenComplete(regman.close);
+  });
+
+  test("Tag-twice", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    var object = new Object();
+    return registry.add(object, tags: ["x"]).then((removeCapability) {
+      return registry.lookup(tags: ["x"]);
+    }).then((entries) {
+      expect(entries, hasLength(1));
+      expect(entries.first, same(object));
+      // Adding the same tag twice is allowed, but does nothing.
+      return registry.addTags([object], ["x"]);
+    }).then((_) {
+      return registry.lookup(tags: ["x"]);
+    }).then((entries) {
+      expect(entries, hasLength(1));
+      expect(entries.first, same(object));
+      // Removing the tag once is enough to remove it.
+      return registry.removeTags([object], ["x"]);
+    }).then((_) {
+      return registry.lookup(tags: ["x"]);
+    }).then((entries) {
+      expect(entries, isEmpty);
+    }).whenComplete(regman.close);
+  });
+
+  test("Add-remove-multiple", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    var object1 = new Object();
+    var object2 = new Object();
+    var object3 = new Object();
+    var objects = [object1, object2, object3];
+    return Future.wait(objects.map(registry.add)).then((_){
+      return registry.addTags([object1, object2], ["x", "y"]);
+    }).then((_) {
+      return registry.addTags([object1, object3], ["z", "w"]);
+    }).then((_) {
+      return registry.lookup(tags: ["x", "z"]);
+    }).then((entries) {
+      expect(entries, hasLength(1));
+      expect(entries.first, same(object1));
+      return registry.removeTags([object1, object2], ["x", "z"]);
+    }).then((_) {
+      return registry.lookup(tags: ["z"]);
+    }).then((entries) {
+      expect(entries, hasLength(1));
+      expect(entries.first, same(object3));
+    }).whenComplete(regman.close);
+  });
+
+  test("Remove-wrong-object", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry = regman.registry;
+    expect(() => registry.removeTags([new Object()], ["x"]),
+           throws);
+    regman.close();
+  });
+}
+
+var _regmen = {};
+Registry createRegMan(id) {
+  var regman = new RegistryManager();
+  _regmen[id] = regman;
+  return regman.registry;
+}
+void closeRegMan(id) {
+  _regmen.remove(id).close();
+}
+
+void testCrossIsolate() {
+  var object = new Object();
+  test("regman-other-isolate", () {
+    // Add, lookup and remove object in other isolate.
+    return IsolateRunner.spawn().then((isolate) {
+      isolate.run(createRegMan, 1).then((registry) {
+        return registry.add(object, tags: ["a", "b"])
+                       .then((removeCapability) {
+          return registry.lookup(tags: ["a"]).then((entries) {
+            expect(entries, hasLength(1));
+            expect(entries.first, same(object));
+            return registry.remove(entries.first, removeCapability);
+          }).then((removeSuccess) {
+            expect(removeSuccess, isTrue);
+          });
+        });
+      }).whenComplete(() {
+        return isolate.run(closeRegMan, 1);
+      }).whenComplete(() {
+        return isolate.close();
+      });
+    });
+  });
+}
+
+void testTimeout() {
+  test("Timeout-add", () {
+    RegistryManager regman = new RegistryManager(timeout: MS * 500);
+    Registry registry = regman.registry;
+    regman.close();
+    return registry.add(new Object()).then((_) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e is TimeoutException, isTrue);
+    });
+  });
+
+  test("Timeout-remove", () {
+    RegistryManager regman = new RegistryManager(timeout: MS * 500);
+    Registry registry = regman.registry;
+    var object = new Object();
+    return registry.add(object).then((rc) {
+      regman.close();
+      return registry.remove(object, rc).then((_) {
+        fail("unreachable");
+      }, onError: (e, s) {
+        expect(e is TimeoutException, isTrue);
+      });
+    });
+  });
+
+  test("Timeout-addTags", () {
+    RegistryManager regman = new RegistryManager(timeout: MS * 500);
+    Registry registry = regman.registry;
+    var object = new Object();
+    return registry.add(object).then((rc) {
+      regman.close();
+      return registry.addTags([object], ["x"]).then((_) {
+        fail("unreachable");
+      }, onError: (e, s) {
+        expect(e is TimeoutException, isTrue);
+      });
+    });
+  });
+
+  test("Timeout-removeTags", () {
+    RegistryManager regman = new RegistryManager(timeout: MS * 500);
+    Registry registry = regman.registry;
+    var object = new Object();
+    return registry.add(object).then((rc) {
+      regman.close();
+      return registry.removeTags([object], ["x"]).then((_) {
+        fail("unreachable");
+      }, onError: (e, s) {
+        expect(e is TimeoutException, isTrue);
+      });
+    });
+  });
+
+  test("Timeout-lookup", () {
+    RegistryManager regman = new RegistryManager(timeout: MS * 500);
+    Registry registry = regman.registry;
+    regman.close();
+    registry.lookup().then((_) {
+      fail("unreachable");
+    }, onError: (e, s) {
+      expect(e is TimeoutException, isTrue);
+    });
+  });
+}
+
+void testMultiRegistry() {
+  test("dual-registyr", () {
+    RegistryManager regman = new RegistryManager();
+    Registry registry1 = regman.registry;
+    Registry registry2 = regman.registry;
+    var l1 = ["x"];
+    var l2;
+    return registry1.add(l1, tags: ["y"]).then((removeCapability) {
+      return registry2.lookup().then((entries) {
+        expect(entries, hasLength(1));
+        l2 = entries.first;
+        expect(l2, equals(l1));
+        // The object for registry2 is not idential the one for registry1.
+        expect(!identical(l1, l2), isTrue);
+        // Removeing the registry1 object through registry2 doesn't work.
+        return registry2.remove(l1, removeCapability);
+      }).then((removeSuccess) {
+        expect(removeSuccess, isFalse);
+        return registry2.remove(l2, removeCapability);
+      }).then((removeSuccess) {
+        expect(removeSuccess, isTrue);
+        return registry1.lookup();
+      }).then((entries) {
+        expect(entries, isEmpty);
+      });
+    }).whenComplete(regman.close);
+  });
+}
+
+void testObjectsAndTags() {
+  testObject(object) {
+    String name = "Transfer-${object.runtimeType}";
+    test(name, () {
+      RegistryManager regman = new RegistryManager();
+      Registry registry1 = regman.registry;
+      Registry registry2 = regman.registry;
+      return registry1.add(object, tags: [object]).then((removeCapability) {
+        return registry2.lookup().then((entries) {
+          expect(entries, hasLength(1));
+          expect(entries.first, equals(object));
+          return registry2.lookup(tags: [object]);
+        }).then((entries) {
+          expect(entries, hasLength(1));
+          expect(entries.first, equals(object));
+          return registry2.removeTags([entries.first], [object]);
+        }).then((_) {
+          return registry2.lookup();
+        }).then((entries) {
+          expect(entries, hasLength(1));
+          expect(entries.first, equals(object));
+          return registry2.remove(entries.first, removeCapability);
+        }).then((removeSuccess) {
+          expect(removeSuccess, isTrue);
+          return registry2.lookup();
+        }).then((entries) {
+          expect(entries, isEmpty);
+        });
+      }).whenComplete(regman.close);
+    });
+  }
+  // Test objects that are sendable between equivalent isolates and
+  // that has an operator== that works after cloning (for use as tags).
+  testObject(42);
+  testObject(3.14);
+  testObject("string");
+  testObject(true);
+  testObject(null);
+  testObject(new Element(42));
+  testObject(#symbol);
+  testObject(#_privateSymbol);
+  testObject(new Capability());
+}
+
+class Element {
+  final int id;
+  Element(this.id);
+  int get hashCode => id;
+  bool operator==(Object other) => other is Element && id == other.id;
+}