Update Isolate package.
No longer contains FunctionRef class since implementations can now send
functions.
Enable error/done listening on the isolate now that the VM implements it.
Add a new abstraction to ports.dart, the "SingleResponseChannel", which is like
the other features in that it creates a Future and a SendPort and forwards the
first value from the SendPort to the Future. This one exposes the two values as
an object with two getters instead of returning one and calling a callback with
the other.
Clean up some formatting suggested by formatter (but far from all).
R=sgjesse@google.com
Review URL: https://codereview.chromium.org//1012053002
diff --git a/example/http-server.dart b/example/http-server.dart
index fc3b442..00aa66b 100644
--- a/example/http-server.dart
+++ b/example/http-server.dart
@@ -36,24 +36,20 @@
});
}
-/**
- * An [HttpRequest] handler setup. Gets called when with the server, and
- * is told when to stop listening.
- *
- * These callbacks allow the listener to set up handlers for HTTP requests.
- * The object should be sendable to an equivalent isolate.
- */
+/// An [HttpRequest] handler setup. Gets called when with the server, and
+/// is told when to stop listening.
+///
+/// These callbacks allow the listener to set up handlers for HTTP requests.
+/// The object should be sendable to an equivalent isolate.
abstract class HttpListener {
Future start(HttpServer server);
Future stop();
}
-/**
- * An [HttpListener] that sets itself up as an echo server.
- *
- * Returns the message content plus an ID describing the isolate that
- * handled the request.
- */
+/// An [HttpListener] that sets itself up as an echo server.
+///
+/// Returns the message content plus an ID describing the isolate that
+/// handled the request.
class EchoHttpListener implements HttpListener {
StreamSubscription _subscription;
static int _id = new Object().hashCode;
@@ -68,7 +64,7 @@
_counter.send(null);
print("Request to $_id");
request.response.write("#$_id\n");
- var t0 = new DateTime.now().add(new Duration(seconds:2));
+ var t0 = new DateTime.now().add(new Duration(seconds: 2));
while (new DateTime.now().isBefore(t0));
print("Response from $_id");
request.response.close();
diff --git a/example/runner-pool.dart b/example/runner-pool.dart
index e81e496..a14e06d 100644
--- a/example/runner-pool.dart
+++ b/example/runner-pool.dart
@@ -8,7 +8,6 @@
import "package:isolate/isolaterunner.dart";
import "dart:async" show Future, Completer;
-
void main() {
int N = 44;
var sw = new Stopwatch()..start();
diff --git a/lib/isolate.dart b/lib/isolate.dart
index 9e838b8..b0a9a31 100644
--- a/lib/isolate.dart
+++ b/lib/isolate.dart
@@ -2,9 +2,7 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-/**
- * Utilities for working with isolates and isolate communication.
- */
+/// Utilities for working with isolates and isolate communication.
library dart.pkg.isolate;
export "isolaterunner.dart";
diff --git a/lib/isolaterunner.dart b/lib/isolaterunner.dart
index 7e15a40..795a10f 100644
--- a/lib/isolaterunner.dart
+++ b/lib/isolaterunner.dart
@@ -8,104 +8,95 @@
import "dart:async";
import "runner.dart";
import "ports.dart";
-import "src/functionref.dart";
import "src/lists.dart";
// Command tags. Shared between IsolateRunner and IsolateRunnerRemote.
const int _SHUTDOWN = 0;
const int _RUN = 1;
-/**
- * An easier to use interface on top of an [Isolate].
- *
- * Wraps an `Isolate` and allows pausing, killing and inspecting
- * the isolate more conveniently than the raw `Isolate` methods.
- *
- * Also allows running simple functions in the other isolate, and get back
- * the result.
- */
+/// An easier to use interface on top of an [Isolate].
+///
+/// Wraps an `Isolate` and allows pausing, killing and inspecting
+/// the isolate more conveniently than the raw `Isolate` methods.
+///
+/// Also allows running simple functions in the other isolate, and get back
+/// the result.
class IsolateRunner implements Runner {
- /** The underlying [Isolate] object of the isolate being controlled. */
+ /// The underlying [Isolate] object of the isolate being controlled.
final Isolate isolate;
- /** Command port for the [IsolateRunnerRemote]. */
+ /// Command port for the [IsolateRunnerRemote].
final SendPort _commandPort;
- /** Future returned by [onExit]. Set when [onExit] is first read. */
+ /// Future returned by [onExit]. Set when [onExit] is first read.
Future _onExitFuture;
- /**
- * Create an [IsolateRunner] wrapper for [isolate]
- *
- * The preferred way to create an `IsolateRunner` is to use [spawn]
- * to create a new isolate and a runner for it.
- *
- * This constructor allows creating a runner for an already existing
- * isolate.
- * The [commandPort] must be the [IsolateRunnerRemote.commandPort] of
- * a remote running in that isolate.
- */
+ /// Create an [IsolateRunner] wrapper for [isolate]
+ ///
+ /// The preferred way to create an `IsolateRunner` is to use [spawn]
+ /// to create a new isolate and a runner for it.
+ ///
+ /// This constructor allows creating a runner for an already existing
+ /// isolate.
+ /// The [commandPort] must be the [IsolateRunnerRemote.commandPort] of
+ /// a remote running in that isolate.
IsolateRunner(this.isolate, SendPort commandPort)
: _commandPort = commandPort;
- /**
- * Create a new [Isolate], as by [Isolate.spawn] and wrap that.
- *
- * The returned [IsolateRunner] forwards operations to the new isolate,
- * and keeps a port open in the new isolate that receives commands
- * from the `IsolateRunner`. Remember to [close] the `IsolateRunner` when
- * it's no longer needed.
- */
- static Future<IsolateRunner> spawn() {
- Completer portCompleter = new Completer.sync();
- SendPort initPort = singleCompletePort(portCompleter);
- return Isolate.spawn(IsolateRunnerRemote._create, initPort)
- .then((Isolate isolate) {
- // TODO: Add when VM supports it.
- // isolate.setErrorsFatal(false);
- return portCompleter.future.then((SendPort commandPort) {
- var result = new IsolateRunner(isolate, commandPort);
- // Guarantees that setErrorsFatal has completed.
- return result.ping().then((_) => result);
- });
- });
+ /// Create a new [Isolate], as by [Isolate.spawn] and wrap that.
+ ///
+ /// The returned [IsolateRunner] forwards operations to the new isolate,
+ /// and keeps a port open in the new isolate that receives commands
+ /// from the `IsolateRunner`. Remember to [close] the `IsolateRunner` when
+ /// it's no longer needed.
+ ///
+ /// The created isolate is set to have errors not be fatal.
+ static Future<IsolateRunner> spawn() async {
+ var channel = new SingleResponseChannel();
+ var isolate = await Isolate.spawn(IsolateRunnerRemote._create,
+ channel.port);
+ // The runner can be used to run multiple independent functions.
+ // An accidentally uncaught error shouldn't ruin it for everybody else.
+ isolate.setErrorsFatal(false);
+ var pingChannel = new SingleResponseChannel();
+ isolate.ping(pingChannel.port);
+ var commandPort = await channel.result;
+ var result = new IsolateRunner(isolate, commandPort);
+ // Guarantees that setErrorsFatal has completed.
+ await pingChannel.result;
+ return result;
}
- /**
- * Closes the `IsolateRunner` communication down.
- *
- * If the isolate isn't running something else to keep it alive,
- * this will also make the isolate shut down.
- *
- * Can be used to create an isolate, use [run] to start a service, and
- * then drop the connection and let the service control the isolate's
- * life cycle.
- */
+ /// Closes the `IsolateRunner` communication down.
+ ///
+ /// If the isolate isn't running something else to keep it alive,
+ /// this will also make the isolate shut down.
+ ///
+ /// Can be used to create an isolate, use [run] to start a service, and
+ /// then drop the connection and let the service control the isolate's
+ /// life cycle.
Future close() {
- Completer portCompleter = new Completer.sync();
- SendPort closePort = singleCallbackPort(portCompleter.complete);
- _commandPort.send(list2(_SHUTDOWN, closePort));
- return portCompleter.future;
+ var channel = new SingleResponseChannel();
+ _commandPort.send(list2(_SHUTDOWN, channel.port));
+ return channel.result;
}
- /**
- * Kills the isolate.
- *
- * Starts by calling [close], but if that doesn't cause the isolate to
- * shut down in a timely manner, as given by [timeout], it follows up
- * with [Isolate.kill], with increasing urgency if necessary.
- *
- * If [timeout] is a zero duration, it goes directly to the most urgent
- * kill.
- *
- * If the isolate is already dead, the returned future will not complete.
- * If that may be the case, use [Future.timeout] on the returned future
- * to take extra action after a while. Example:
- *
- * var f = isolate.kill();
- * f.then((_) => print('Dead')
- * .timeout(new Duration(...), onTimeout: () => print('No response'));
- */
+ /// Kills the isolate.
+ ///
+ /// Starts by calling [close], but if that doesn't cause the isolate to
+ /// shut down in a timely manner, as given by [timeout], it follows up
+ /// with [Isolate.kill], with increasing urgency if necessary.
+ ///
+ /// If [timeout] is a zero duration, it goes directly to the most urgent
+ /// kill.
+ ///
+ /// If the isolate is already dead, the returned future will not complete.
+ /// If that may be the case, use [Future.timeout] on the returned future
+ /// to take extra action after a while. Example:
+ ///
+ /// var f = isolate.kill();
+ /// f.then((_) => print('Dead')
+ /// .timeout(new Duration(...), onTimeout: () => print('No response'));
Future kill({Duration timeout: const Duration(seconds: 1)}) {
Future onExit = singleResponseFuture(isolate.addOnExitListener);
if (Duration.ZERO == timeout) {
@@ -121,105 +112,91 @@
}
}
- /**
- * Queries the isolate on whether it's alive.
- *
- * If the isolate is alive and responding to commands, the
- * returned future completes with `true`.
- *
- * If the other isolate is not alive (like after calling [kill]),
- * or doesn't answer within [timeout] for any other reason,
- * the returned future completes with `false`.
- *
- * Guaranteed to only complete after all previous sent isolate commands
- * (like pause and resume) have been handled.
- * Paused isolates do respond to ping requests.
- */
+ /// Queries the isolate on whether it's alive.
+ ///
+ /// If the isolate is alive and responding to commands, the
+ /// returned future completes with `true`.
+ ///
+ /// If the other isolate is not alive (like after calling [kill]),
+ /// or doesn't answer within [timeout] for any other reason,
+ /// the returned future completes with `false`.
+ ///
+ /// Guaranteed to only complete after all previous sent isolate commands
+ /// (like pause and resume) have been handled.
+ /// Paused isolates do respond to ping requests.
Future<bool> ping({Duration timeout: const Duration(seconds: 1)}) {
- Completer completer = new Completer<bool>();
- SendPort port = singleCompletePort(completer,
- callback: _kTrue,
- timeout: timeout,
- onTimeout: _kFalse);
- isolate.ping(port);
- return completer.future;
+ var channel = new SingleResponseChannel(callback: _kTrue,
+ timeout: timeout,
+ timeoutValue: false);
+ isolate.ping(channel.port);
+ return channel.result;
}
static bool _kTrue(_) => true;
static bool _kFalse() => false;
- /**
- * Pauses the isolate.
- *
- * While paused, no normal messages are processed, and calls to [run] will
- * be delayed until the isolate is resumed.
- *
- * Commands like [kill] and [ping] are still executed while the isolate is
- * paused.
- *
- * If [resumeCapability] is omitted, it defaults to the [isolate]'s
- * [Isolate.pauseCapability].
- *
- * Calling pause more than once with the same `resumeCapability`
- * has no further effect. Only a single call to [resume] is needed
- * to resume the isolate.
- */
+ /// Pauses the isolate.
+ ///
+ /// While paused, no normal messages are processed, and calls to [run] will
+ /// be delayed until the isolate is resumed.
+ ///
+ /// Commands like [kill] and [ping] are still executed while the isolate is
+ /// paused.
+ ///
+ /// If [resumeCapability] is omitted, it defaults to the [isolate]'s
+ /// [Isolate.pauseCapability].
+ ///
+ /// Calling pause more than once with the same `resumeCapability`
+ /// has no further effect. Only a single call to [resume] is needed
+ /// to resume the isolate.
void pause([Capability resumeCapability]) {
if (resumeCapability == null) resumeCapability = isolate.pauseCapability;
isolate.pause(resumeCapability);
}
- /**
- * Resumes after a [pause].
- *
- * If [resumeCapability] is omitted, it defaults to the isolate's
- * [Isolate.pauseCapability].
- *
- * Even if `pause` has been called more than once with the same
- * `resumeCapability`, a single resume call with stop the pause.
- */
+ /// Resumes after a [pause].
+ ///
+ /// If [resumeCapability] is omitted, it defaults to the isolate's
+ /// [Isolate.pauseCapability].
+ ///
+ /// Even if `pause` has been called more than once with the same
+ /// `resumeCapability`, a single resume call with stop the pause.
void resume([Capability resumeCapability]) {
if (resumeCapability == null) resumeCapability = isolate.pauseCapability;
isolate.resume(resumeCapability);
}
- /**
- * Execute `function(argument)` in the isolate and return the result.
- *
- * Sends [function] and [argument] to the isolate, runs the call, and
- * returns the result, whether it returned a value or threw.
- * If the call returns a [Future], the final result of that future
- * will be returned.
- *
- * This works similar to the arguments to [Isolate.spawn], except that
- * it runs in the existing isolate and the return value is returned to
- * the caller.
- *
- * Example:
- *
- * IsolateRunner iso = await IsolateRunner.spawn();
- * try {
- * return await iso.run(heavyComputation, argument);
- * } finally {
- * await iso.close();
- * }
- */
- Future run(function(argument), argument,
- {Duration timeout, onTimeout()}) {
+ /// Execute `function(argument)` in the isolate and return the result.
+ ///
+ /// Sends [function] and [argument] to the isolate, runs the call, and
+ /// returns the result, whether it returned a value or threw.
+ /// If the call returns a [Future], the final result of that future
+ /// will be returned.
+ ///
+ /// This works similar to the arguments to [Isolate.spawn], except that
+ /// it runs in the existing isolate and the return value is returned to
+ /// the caller.
+ ///
+ /// Example:
+ ///
+ /// IsolateRunner iso = await IsolateRunner.spawn();
+ /// try {
+ /// return await iso.run(heavyComputation, argument);
+ /// } finally {
+ /// await iso.close();
+ /// }
+ Future run(function(argument), argument, {Duration timeout, onTimeout()}) {
return singleResultFuture((SendPort port) {
- _commandPort.send(
- list4(_RUN, FunctionRef.from(function), argument, port));
+ _commandPort.send(list4(_RUN, function, argument, port));
}, timeout: timeout, onTimeout: onTimeout);
}
- /**
- * A broadcast stream of uncaught errors from the isolate.
- *
- * When listening on the stream, errors from the isolate will be reported
- * as errors in the stream. Be ready to handle the errors.
- *
- * The stream closes when the isolate shuts down.
- */
+ /// A broadcast stream of uncaught errors from the isolate.
+ ///
+ /// When listening on the stream, errors from the isolate will be reported
+ /// as errors in the stream. Be ready to handle the errors.
+ ///
+ /// The stream closes when the isolate shuts down.
Stream get errors {
StreamController controller;
RawReceivePort port;
@@ -240,60 +217,60 @@
sync: true,
onListen: () {
port = new RawReceivePort(handleError);
- // TODO: When supported, uncomment this.
- // isolate.addErrorListener(port.sendPort);
- // isolate.addOnExitListener(port.sendPort);
- // And remove the send below, which acts as an immediate close.
- port.sendPort.send(null);
+ isolate.addErrorListener(port.sendPort);
+ isolate.addOnExitListener(port.sendPort);
},
onCancel: () {
+ isolate.removeErrorListener(port.sendPort);
+ isolate.removeOnExitListener(port.sendPort);
port.close();
- // this.removeErrorListener(port.sendPort);
- // this.removeOnExitListener(port.sendPort);
port = null;
});
return controller.stream;
}
- /**
- * Waits for the [isolate] to terminate.
- *
- * Completes the returned future when the isolate terminates.
- *
- * If the isolate has already stopped responding to commands,
- * the returned future will never terminate.
- */
+ /// Waits for the [isolate] to terminate.
+ ///
+ /// Completes the returned future when the isolate terminates.
+ ///
+ /// If the isolate has already stopped responding to commands,
+ /// the returned future will be completed after one second,
+ /// using [ping] to check if the isolate is still alive.
Future get onExit {
// TODO(lrn): Is there a way to see if an isolate is dead
// so we can close the receive port for this future?
if (_onExitFuture == null) {
- _onExitFuture = singleResponseFuture(isolate.addOnExitListener);
+ var channel = new SingleResponseChannel();
+ isolate.addOnExitListener(channel.port);
+ _onExitFuture = channel.result;
+ ping().then((bool alive) {
+ if (!alive) {
+ channel.interrupt();
+ _onExitFuture = null;
+ }
+ });
}
return _onExitFuture;
}
}
-/**
- * The remote part of an [IsolateRunner].
- *
- * The `IsolateRunner` sends commands to the controlled isolate through
- * the `IsolateRunnerRemote` [commandPort].
- *
- * Only use this class if you need to set up the isolate manually
- * instead of relying on [IsolateRunner.spawn].
- */
+/// The remote part of an [IsolateRunner].
+///
+/// The `IsolateRunner` sends commands to the controlled isolate through
+/// the `IsolateRunnerRemote` [commandPort].
+///
+/// Only use this class if you need to set up the isolate manually
+/// instead of relying on [IsolateRunner.spawn].
class IsolateRunnerRemote {
final RawReceivePort _commandPort = new RawReceivePort();
IsolateRunnerRemote() {
_commandPort.handler = _handleCommand;
}
- /**
- * The command port that can be used to send commands to this remote.
- *
- * Use this as argument to [new IsolateRunner] if creating the link
- * manually, otherwise it's handled by [IsolateRunner.spawn].
- */
+ /// The command port that can be used to send commands to this remote.
+ ///
+ /// Use this as argument to [new IsolateRunner] if creating the link
+ /// manually, otherwise it's handled by [IsolateRunner.spawn].
SendPort get commandPort => _commandPort.sendPort;
static void _create(SendPort initPort) {
@@ -309,7 +286,7 @@
responsePort.send(null);
return;
case _RUN:
- Function function = command[1].function;
+ Function function = command[1];
var argument = command[2];
SendPort responsePort = command[3];
sendFutureResult(new Future.sync(() => function(argument)),
diff --git a/lib/loadbalancer.dart b/lib/loadbalancer.dart
index fdb5d0b..d71d023 100644
--- a/lib/loadbalancer.dart
+++ b/lib/loadbalancer.dart
@@ -2,9 +2,7 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-/**
- * A load-balancing runner pool.
- */
+/// A load-balancing runner pool.
library dart.pkg.isolate.loadbalancer;
import "runner.dart";
@@ -12,12 +10,10 @@
import "src/lists.dart";
import "dart:async" show Future;
-/**
- * A pool of runners, ordered by load.
- *
- * Keeps a pool of runners,
- * and allows running function through the runner with the lowest current load.
- */
+/// A pool of runners, ordered by load.
+///
+/// Keeps a pool of runners,
+/// and allows running function through the runner with the lowest current load.
class LoadBalancer implements Runner {
// A heap-based priority queue of entries, prioritized by `load`.
// Each entry has its own entry in the queue, for faster update.
@@ -29,9 +25,7 @@
// Whether [stop] has been called.
Future _stopFuture = null;
- /**
- * Create a load balancer for [service] with [size] isolates.
- */
+ /// Create a load balancer for [service] with [size] isolates.
LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));
LoadBalancer._(List<_LoadBalancerEntry> entries)
@@ -42,19 +36,15 @@
}
}
- /**
- * The number of runners currently in the pool.
- */
+ /// The number of runners currently in the pool.
int get length => _length;
- /**
- * Asynchronously create [size] runners and create a `LoadBalancer` of those.
- *
- * This is a helper function that makes it easy to create a `LoadBalancer`
- * with asynchronously created runners, for example:
- *
- * var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
- */
+ /// Asynchronously create [size] runners and create a `LoadBalancer` of those.
+ ///
+ /// This is a helper function that makes it easy to create a `LoadBalancer`
+ /// with asynchronously created runners, for example:
+ ///
+ /// var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) {
return Future.wait(new Iterable.generate(size, (_) => createRunner()),
cleanUp: (Runner runner) { runner.close(); })
@@ -66,19 +56,17 @@
return new List<_LoadBalancerEntry>.from(entries, growable: false);
}
- /**
- * Execute the command in the currently least loaded isolate.
- *
- * The optional [load] parameter represents the load that the command
- * is causing on the isolate where it runs.
- * The number has no fixed meaning, but should be seen as relative to
- * other commands run in the same load balancer.
- * The `load` must not be negative.
- *
- * If [timeout] and [onTimeout] are provided, they are forwarded to
- * the runner running the function, which will handle a timeout
- * as normal.
- */
+ /// Execute the command in the currently least loaded isolate.
+ ///
+ /// The optional [load] parameter represents the load that the command
+ /// is causing on the isolate where it runs.
+ /// The number has no fixed meaning, but should be seen as relative to
+ /// other commands run in the same load balancer.
+ /// The `load` must not be negative.
+ ///
+ /// If [timeout] and [onTimeout] are provided, they are forwarded to
+ /// the runner running the function, which will handle a timeout
+ /// as normal.
Future run(function(argument), argument, {Duration timeout,
onTimeout(),
int load: 100}) {
@@ -88,22 +76,20 @@
return entry.run(this, load, function, argument, timeout, onTimeout);
}
- /**
- * Execute the same function in the least loaded [count] isolates.
- *
- * This guarantees that the function isn't run twice in the same isolate,
- * so `count` is not allowed to exceed [length].
- *
- * The optional [load] parameter represents the load that the command
- * is causing on the isolate where it runs.
- * The number has no fixed meaning, but should be seen as relative to
- * other commands run in the same load balancer.
- * The `load` must not be negative.
- *
- * If [timeout] and [onTimeout] are provided, they are forwarded to
- * the runners running the function, which will handle any timeouts
- * as normal.
- */
+ /// Execute the same function in the least loaded [count] isolates.
+ ///
+ /// This guarantees that the function isn't run twice in the same isolate,
+ /// so `count` is not allowed to exceed [length].
+ ///
+ /// The optional [load] parameter represents the load that the command
+ /// is causing on the isolate where it runs.
+ /// The number has no fixed meaning, but should be seen as relative to
+ /// other commands run in the same load balancer.
+ /// The `load` must not be negative.
+ ///
+ /// If [timeout] and [onTimeout] are provided, they are forwarded to
+ /// the runners running the function, which will handle any timeouts
+ /// as normal.
List<Future> runMultiple(int count, function(argument), argument,
{Duration timeout,
onTimeout(),
@@ -120,8 +106,8 @@
for (int i = 0; i < count; i++) {
_LoadBalancerEntry entry = _queue[i];
entry.load += load;
- result[i] = entry.run(this, load, function, argument,
- timeout, onTimeout);
+ result[i] =
+ entry.run(this, load, function, argument, timeout, onTimeout);
}
} else {
// Remove the [count] least loaded services and run the
@@ -136,8 +122,8 @@
_LoadBalancerEntry entry = entries[i];
entry.load += load;
_add(entry);
- result[i] = entry.run(this, load, function, argument,
- timeout, onTimeout);
+ result[i] =
+ entry.run(this, load, function, argument, timeout, onTimeout);
}
}
return result;
@@ -145,8 +131,8 @@
Future close() {
if (_stopFuture != null) return _stopFuture;
- _stopFuture = MultiError.waitUnordered(_queue.take(_length)
- .map((e) => e.close()));
+ _stopFuture =
+ MultiError.waitUnordered(_queue.take(_length).map((e) => e.close()));
// Remove all entries.
for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1;
_queue = null;
@@ -154,13 +140,11 @@
return _stopFuture;
}
- /**
- * Place [element] in heap at [index] or above.
- *
- * Put element into the empty cell at `index`.
- * While the `element` has higher priority than the
- * parent, swap it with the parent.
- */
+ /// Place [element] in heap at [index] or above.
+ ///
+ /// Put element into the empty cell at `index`.
+ /// While the `element` has higher priority than the
+ /// parent, swap it with the parent.
void _bubbleUp(_LoadBalancerEntry element, int index) {
while (index > 0) {
int parentIndex = (index - 1) ~/ 2;
@@ -174,13 +158,11 @@
element.queueIndex = index;
}
- /**
- * Place [element] in heap at [index] or above.
- *
- * Put element into the empty cell at `index`.
- * While the `element` has lower priority than either child,
- * swap it with the highest priority child.
- */
+ /// Place [element] in heap at [index] or above.
+ ///
+ /// Put element into the empty cell at `index`.
+ /// While the `element` has lower priority than either child,
+ /// swap it with the highest priority child.
void _bubbleDown(_LoadBalancerEntry element, int index) {
while (true) {
int childIndex = index * 2 + 1; // Left child index.
@@ -203,12 +185,10 @@
element.queueIndex = index;
}
- /**
- * Removes the entry from the queue, but doesn't stop its service.
- *
- * The entry is expected to be either added back to the queue
- * immediately or have its stop method called.
- */
+ /// Removes the entry from the queue, but doesn't stop its service.
+ ///
+ /// The entry is expected to be either added back to the queue
+ /// immediately or have its stop method called.
void _remove(_LoadBalancerEntry entry) {
int index = entry.queueIndex;
if (index < 0) return;
@@ -225,9 +205,7 @@
}
}
- /**
- * Adds entry to the queue.
- */
+ /// Adds entry to the queue.
void _add(_LoadBalancerEntry entry) {
if (_stopFuture != null) throw new StateError("LoadBalancer is stopped");
assert(entry.queueIndex < 0);
@@ -283,10 +261,9 @@
// The service used to send commands to the other isolate.
Runner runner;
- _LoadBalancerEntry(Runner runner)
- : runner = runner;
+ _LoadBalancerEntry(Runner runner) : runner = runner;
- /** Whether the entry is still in the queue. */
+ /// Whether the entry is still in the queue.
bool get inQueue => queueIndex >= 0;
Future run(LoadBalancer balancer, int load, function(argumen), argument,
@@ -301,5 +278,3 @@
int compareTo(_LoadBalancerEntry other) => load - other.load;
}
-
-
diff --git a/lib/ports.dart b/lib/ports.dart
index c8de655..a007ff3 100644
--- a/lib/ports.dart
+++ b/lib/ports.dart
@@ -2,49 +2,45 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-/**
- * Utility functions for setting up ports and sending data.
- *
- * This library contains a number of functions that handle the
- * boiler-plate of setting up a receive port and receiving a
- * single message on the port.
- *
- * There are different functions that offer different ways to
- * handle the incoming message.
- *
- * The simplest function, [singleCallbackPort], takes a callback
- * and returns a port, and then calls the callback for the first
- * message sent on the port.
- *
- * Other functions intercept the returned value and either
- * does something with it, or puts it into a [Future] or [Completer].
- */
+/// Utility functions for setting up ports and sending data.
+///
+/// This library contains a number of functions that handle the
+/// boiler-plate of setting up a receive port and receiving a
+/// single message on the port.
+///
+/// There are different functions that offer different ways to
+/// handle the incoming message.
+///
+/// The simplest function, [singleCallbackPort], takes a callback
+/// and returns a port, and then calls the callback for the first
+/// message sent on the port.
+///
+/// Other functions intercept the returned value and either
+/// does something with it, or puts it into a [Future] or [Completer].
library dart.pkg.isolate.ports;
import "dart:isolate";
import "dart:async";
import "src/lists.dart";
-/**
- * Create a [SendPort] that accepts only one message.
- *
- * The [callback] function is called once, with the first message
- * received by the receive port.
- * All futher messages are ignored.
- *
- * If [timeout] is supplied, it is used as a limit on how
- * long it can take before the message is received. If a
- * message isn't received in time, the `callback` function
- * is called once with the [timeoutValue] instead.
- *
- * Returns the `SendPort` expecting the single message.
- *
- * Equivalent to:
- *
- * (new ReceivePort()
- * ..first.timeout(duration, () => timeoutValue).then(callback))
- * .sendPort
- */
+/// Create a [SendPort] that accepts only one message.
+///
+/// The [callback] function is called once, with the first message
+/// received by the receive port.
+/// All futher messages are ignored.
+///
+/// If [timeout] is supplied, it is used as a limit on how
+/// long it can take before the message is received. If a
+/// message isn't received in time, the `callback` function
+/// is called once with the [timeoutValue] instead.
+///
+/// Returns the `SendPort` expecting the single message.
+///
+/// Equivalent to:
+///
+/// (new ReceivePort()
+/// ..first.timeout(duration, () => timeoutValue).then(callback))
+/// .sendPort
SendPort singleCallbackPort(void callback(response),
{Duration timeout,
var timeoutValue}) {
@@ -66,33 +62,31 @@
return responsePort.sendPort;
}
-/**
- * Create a [SendPort] that accepts only one message.
- *
- * When the first message is received, the [callback] function is
- * called with the message as argument,
- * and the [completer] is completed with the result of that call.
- * All further messages are ignored.
- *
- * If `callback` is omitted, it defaults to an identity function.
- * The `callback` call may return a future, and the completer will
- * wait for that future to complete.
- *
- * If [timeout] is supplied, it is used as a limit on how
- * long it can take before the message is received. If a
- * message isn't received in time, the [onTimeout] is called,
- * and `completer` is completed with the result of that call
- * instead.
- * The [callback] function will not be interrupted by the time-out,
- * as long as the initial message is received in time.
- * If `onTimeout` is omitted, it defaults to completing the `completer` with
- * a [TimeoutException].
- *
- * The [completer] may be a synchronous completer. It is only
- * completed in response to another event, either a port message or a timer.
- *
- * Returns the `SendPort` expecting the single message.
- */
+/// Create a [SendPort] that accepts only one message.
+///
+/// When the first message is received, the [callback] function is
+/// called with the message as argument,
+/// and the [completer] is completed with the result of that call.
+/// All further messages are ignored.
+///
+/// If `callback` is omitted, it defaults to an identity function.
+/// The `callback` call may return a future, and the completer will
+/// wait for that future to complete.
+///
+/// If [timeout] is supplied, it is used as a limit on how
+/// long it can take before the message is received. If a
+/// message isn't received in time, the [onTimeout] is called,
+/// and `completer` is completed with the result of that call
+/// instead.
+/// The [callback] function will not be interrupted by the time-out,
+/// as long as the initial message is received in time.
+/// If `onTimeout` is omitted, it defaults to completing the `completer` with
+/// a [TimeoutException].
+///
+/// The [completer] may be a synchronous completer. It is only
+/// completed in response to another event, either a port message or a timer.
+///
+/// Returns the `SendPort` expecting the single message.
SendPort singleCompletePort(Completer completer,
{callback(message),
Duration timeout,
@@ -125,36 +119,34 @@
if (onTimeout != null) {
completer.complete(new Future.sync(onTimeout));
} else {
- completer.completeError(new TimeoutException("Future not completed",
- timeout));
+ completer.completeError(
+ new TimeoutException("Future not completed", timeout));
}
});
}
return responsePort.sendPort;
}
-/**
- * Creates a [Future], and a [SendPort] that can be used to complete that
- * future.
- *
- * Calls [action] with the response `SendPort`, then waits for someone
- * to send a value on that port
- * The returned `Future` is completed with the value sent on the port.
- *
- * If [action] throws, which it shouldn't,
- * the returned future is completed with that error.
- * Any return value of `action` is ignored, and if it is asynchronous,
- * it should handle its own errors.
- *
- * If [timeout] is supplied, it is used as a limit on how
- * long it can take before the message is received. If a
- * message isn't received in time, the [timeoutValue] used
- * as the returned future's value instead.
- *
- * If you want a timeout on the returned future, it's recommended to
- * use the [timeout] parameter, and not [Future.timeout] on the result.
- * The `Future` method won't be able to close the underlying [ReceivePort].
- */
+/// Creates a [Future], and a [SendPort] that can be used to complete that
+/// future.
+///
+/// Calls [action] with the response `SendPort`, then waits for someone
+/// to send a value on that port
+/// The returned `Future` is completed with the value sent on the port.
+///
+/// If [action] throws, which it shouldn't,
+/// the returned future is completed with that error.
+/// Any return value of `action` is ignored, and if it is asynchronous,
+/// it should handle its own errors.
+///
+/// If [timeout] is supplied, it is used as a limit on how
+/// long it can take before the message is received. If a
+/// message isn't received in time, the [timeoutValue] used
+/// as the returned future's value instead.
+///
+/// If you want a timeout on the returned future, it's recommended to
+/// use the [timeout] parameter, and not [Future.timeout] on the result.
+/// The `Future` method won't be able to close the underlying [ReceivePort].
Future singleResponseFuture(void action(SendPort responsePort),
{Duration timeout,
var timeoutValue}) {
@@ -181,45 +173,46 @@
responsePort.close();
if (timer != null) timer.cancel();
// Delay completion because completer is sync.
- scheduleMicrotask(() { completer.completeError(e, s); });
+ scheduleMicrotask(() {
+ completer.completeError(e, s);
+ });
}
return completer.future;
}
-/**
- * Send the result of a future, either value or error, as a message.
- *
- * The result of [future] is sent on [resultPort] in a form expected by
- * either [receiveFutureResult], [completeFutureResult], or
- * by the port of [singleResultFuture].
- */
+/// Send the result of a future, either value or error, as a message.
+///
+/// The result of [future] is sent on [resultPort] in a form expected by
+/// either [receiveFutureResult], [completeFutureResult], or
+/// by the port of [singleResultFuture].
void sendFutureResult(Future future, SendPort resultPort) {
- future.then((v) { resultPort.send(list1(v)); },
- onError: (e, s) { resultPort.send(list2("$e", "$s")); });
+ future.then(
+ (v) { resultPort.send(list1(v));
+ }, onError: (e, s) {
+ resultPort.send(list2("$e", "$s"));
+ });
}
-/**
- * Creates a [Future], and a [SendPort] that can be used to complete that
- * future.
- *
- * Calls [action] with the response `SendPort`, then waits for someone
- * to send a future result on that port using [sendFutureResult].
- * The returned `Future` is completed with the future result sent on the port.
- *
- * If [action] throws, which it shouldn't,
- * the returned future is completed with that error,
- * unless someone manages to send a message on the port before `action` throws.
- *
- * If [timeout] is supplied, it is used as a limit on how
- * long it can take before the message is received. If a
- * message isn't received in time, the [onTimeout] is called,
- * and the future is completed with the result of that call
- * instead.
- * If `onTimeout` is omitted, it defaults to throwing
- * a [TimeoutException].
- */
+/// Creates a [Future], and a [SendPort] that can be used to complete that
+/// future.
+///
+/// Calls [action] with the response `SendPort`, then waits for someone
+/// to send a future result on that port using [sendFutureResult].
+/// The returned `Future` is completed with the future result sent on the port.
+///
+/// If [action] throws, which it shouldn't,
+/// the returned future is completed with that error,
+/// unless someone manages to send a message on the port before `action` throws.
+///
+/// If [timeout] is supplied, it is used as a limit on how
+/// long it can take before the message is received. If a
+/// message isn't received in time, the [onTimeout] is called,
+/// and the future is completed with the result of that call
+/// instead.
+/// If `onTimeout` is omitted, it defaults to throwing
+/// a [TimeoutException].
Future singleResultFuture(void action(SendPort responsePort),
{Duration timeout,
onTimeout()}) {
@@ -237,11 +230,9 @@
return completer.future;
}
-/**
- * Completes a completer with a message created by [sendFutureResult]
- *
- * The [response] must be a message on the format sent by [sendFutureResult].
- */
+/// Completes a completer with a message created by [sendFutureResult]
+///
+/// The [response] must be a message on the format sent by [sendFutureResult].
void completeFutureResult(var response, Completer completer) {
if (response.length == 2) {
var error = new RemoteError(response[0], response[1]);
@@ -253,12 +244,10 @@
}
-/**
- * Convertes a received message created by [sendFutureResult] to a future
- * result.
- *
- * The [response] must be a message on the format sent by [sendFutureResult].
- */
+/// Convertes a received message created by [sendFutureResult] to a future
+/// result.
+///
+/// The [response] must be a message on the format sent by [sendFutureResult].
Future receiveFutureResult(var response) {
if (response.length == 2) {
var error = new RemoteError(response[0], response[1]);
@@ -267,3 +256,104 @@
var result = response[0];
return new Future.value(result);
}
+
+/// A [Future] and a [SendPort] that can be used to complete the future.
+///
+/// The first value sent to [port] is used to complete the [result].
+/// All following values sent to `port` are ignored.
+class SingleResponseChannel {
+ Zone _zone;
+ final RawReceivePort _receivePort;
+ final Completer _completer;
+ final Function _callback;
+ Timer _timer;
+
+ /// Creates a response channel.
+ ///
+ /// The [result] is completed with the first value sent to [port].
+ ///
+ /// If [callback] is provided, the value sent to [port] is first passed
+ /// to `callback`, and the result of that is used to complete `result`.
+ ///
+ /// If [timeout] is provided, the future is completed after that
+ /// duration if it hasn't received a value from the port earlier.
+ /// If [throwOnTimeout] is true, the the future is completed with a
+ /// [TimeoutException] as an error if it times out.
+ /// Otherwise, if [onTimeout] is provided,
+ /// the future is completed with the result of running `onTimeout()`.
+ /// If `onTimeout` is not provided either,
+ /// the future is completed with `timeoutValue`, which defaults to `null`.
+ SingleResponseChannel({callback(value),
+ Duration timeout,
+ bool throwOnTimeout: false,
+ onTimeout(),
+ var timeoutValue})
+ : _receivePort = new RawReceivePort(),
+ _completer = new Completer.sync(),
+ _callback = callback,
+ _zone = Zone.current {
+ _receivePort.handler = _handleResponse;
+ if (timeout != null) {
+ _timer = new Timer(timeout, () {
+ // Executed as a timer event.
+ _receivePort.close();
+ if (!_completer.isCompleted) {
+ if (throwOnTimeout) {
+ _completer.completeError(new TimeoutException(timeout));
+ } else if (onTimeout != null) {
+ _completer.complete(new Future.sync(onTimeout));
+ } else {
+ _completer.complete(timeoutValue);
+ }
+ }
+ });
+ }
+ }
+
+ /// The port expecting a value that will complete [result].
+ SendPort get port => _receivePort.sendPort;
+
+ /// Future completed by the first value sent to [port].
+ Future get result => _completer.future;
+
+ /// If the channel hasn't completed yet, interrupt it and complete the result.
+ ///
+ /// If the channel hasn't received a value yet, or timed out, it is stopped
+ /// (like by a timeout) and the [SingleResponseChannel.result]
+ /// is completed with [result].
+ void interrupt([result]) {
+ _receivePort.close();
+ _cancelTimer();
+ if (!_completer.isCompleted) {
+ // Not in event tail position, so complete the sync completer later.
+ _completer.complete(new Future.microtask(() => result));
+ }
+ }
+
+ void _cancelTimer() {
+ if (_timer != null) {
+ _timer.cancel();
+ _timer = null;
+ }
+ }
+
+ void _handleResponse(v) {
+ // Executed as a port event.
+ _receivePort.close();
+ _cancelTimer();
+ if (_callback == null) {
+ _completer.complete(v);
+ } else {
+ // The _handleResponse function is the handler of a RawReceivePort.
+ // As such, it runs in the root zone.
+ // The _callback should be run in the original zone, both because it's
+ // what the user expects, and because it may create an error that needs
+ // to be propagated to the original completer. If that completer was
+ // created in a different error zone, an error from the root zone
+ // would become uncaught.
+ _zone.run(() {
+ _completer.complete(new Future.sync(() => _callback(v)));
+ });
+ }
+ }
+}
diff --git a/lib/registry.dart b/lib/registry.dart
index 9e87c11..6b9448c 100644
--- a/lib/registry.dart
+++ b/lib/registry.dart
@@ -2,9 +2,7 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-/**
- * An isolate-compatible object registry and lookup service.
- */
+/// An isolate-compatible object registry and lookup service.
library dart.pkg.isolate.registry;
import "dart:async" show Future, Completer, TimeoutException;
@@ -21,28 +19,26 @@
const int _GET_TAGS = 4;
const int _FIND = 5;
-/**
- * An isolate-compatible object registry.
- *
- * Objects can be stored as elements of a registry,
- * have "tags" assigned to them, and be looked up by tag.
- *
- * A `Registry` object caches objects found using the [lookup]
- * method, or added using [add], and returns the same object every time
- * they are requested.
- * A different `Registry` object that works on the same registry will not
- * preserve the identity of elements
- *
- * It is recommended to only have one `Registry` object working on the
- * same registry in each isolate.
- *
- * When the registry is shared accross isolates, both elements and tags must
- * be sendable between the isolates.
- * Between isolates spawned using [Isolate.spawn] from the same initial
- * isolate, most objectes can be sent.
- * Only simple objects can be sent between isolates originating from different
- * [Isolate.spawnUri] calls.
- */
+/// An isolate-compatible object registry.
+///
+/// Objects can be stored as elements of a registry,
+/// have "tags" assigned to them, and be looked up by tag.
+///
+/// A `Registry` object caches objects found using the [lookup]
+/// method, or added using [add], and returns the same object every time
+/// they are requested.
+/// A different `Registry` object that works on the same registry will not
+/// preserve the identity of elements
+///
+/// It is recommended to only have one `Registry` object working on the
+/// same registry in each isolate.
+///
+/// When the registry is shared accross isolates, both elements and tags must
+/// be sendable between the isolates.
+/// Between isolates spawned using [Isolate.spawn] from the same initial
+/// isolate, most objectes can be sent.
+/// Only simple objects can be sent between isolates originating from different
+/// [Isolate.spawnUri] calls.
class Registry<T> {
// Most operations fail if they haven't received a response for this duration.
final Duration _timeout;
@@ -53,26 +49,22 @@
// also copying the cache.
static Expando _caches = new Expando();
- /**
- * Port for sending command to the central registry mananger.
- */
+ /// Port for sending command to the central registry mananger.
SendPort _commandPort;
- /**
- * Create a registry linked to a [RegistryManager] through [commandPort].
- *
- * In most cases, a registry is created by using the
- * [RegistryManager.registry] getter.
- *
- * If a registry is used between isolates created using [Isolate.spawnUri],
- * the `Registry` object can't be sent between the isolates directly.
- * Instead the [RegistryManager.commandPort] port can be sent and a
- * `Registry` created from the command port using this constructor.
- *
- * The optional [timeout] parameter can be set to the duration
- * this registry should wait before assuming that an operation
- * has failed.
- */
+ /// Create a registry linked to a [RegistryManager] through [commandPort].
+ ///
+ /// In most cases, a registry is created by using the
+ /// [RegistryManager.registry] getter.
+ ///
+ /// If a registry is used between isolates created using [Isolate.spawnUri],
+ /// the `Registry` object can't be sent between the isolates directly.
+ /// Instead the [RegistryManager.commandPort] port can be sent and a
+ /// `Registry` created from the command port using this constructor.
+ ///
+ /// The optional [timeout] parameter can be set to the duration
+ /// this registry should wait before assuming that an operation
+ /// has failed.
Registry.fromPort(SendPort commandPort,
{Duration timeout: const Duration(seconds: 5)})
: _commandPort = commandPort,
@@ -86,11 +78,9 @@
return cache;
}
- /**
- * Check and get the identity of an element.
- *
- * Throws if [element] is not an element in the registry.
- */
+ /// Check and get the identity of an element.
+ ///
+ /// Throws if [element] is not an element in the registry.
int _getId(T element) {
int id = _cache.id(element);
if (id == null) {
@@ -99,21 +89,19 @@
return id;
}
- /**
- * Adds [element] to the registry with the provided tags.
- *
- * Fails if [element] is already in this registry.
- * An object is already in the registry if it has been added using [add],
- * or if it was returned by a [lookup] call on this registry object.
- *
- * Returns a capability that can be used with [remove] to remove
- * the element from the registry again.
- *
- * The [tags] can be used to distinguish some of the elements
- * from other elements. Any object can be used as a tag, as long as
- * it preserves equality when sent through a [SendPort].
- * This makes [Capability] objects a good choice for tags.
- */
+ /// Adds [element] to the registry with the provided tags.
+ ///
+ /// Fails if [element] is already in this registry.
+ /// An object is already in the registry if it has been added using [add],
+ /// or if it was returned by a [lookup] call on this registry object.
+ ///
+ /// Returns a capability that can be used with [remove] to remove
+ /// the element from the registry again.
+ ///
+ /// The [tags] can be used to distinguish some of the elements
+ /// from other elements. Any object can be used as a tag, as long as
+ /// it preserves equality when sent through a [SendPort].
+ /// This makes [Capability] objects a good choice for tags.
Future<Capability> add(T element, {Iterable tags}) {
_RegistryCache cache = _cache;
if (cache.contains(element)) {
@@ -139,16 +127,14 @@
return completer.future;
}
- /**
- * Remove the element from the registry.
- *
- * Returns `true` if removing the element succeeded, or `false` if the
- * elements wasn't in the registry, or if it couldn't be removed.
- *
- * The [removeCapability] must be the same capability returned by [add]
- * when the object was added. If the capability is wrong, the
- * object is not removed, and this function returns false.
- */
+ /// Remove the element from the registry.
+ ///
+ /// Returns `true` if removing the element succeeded, or `false` if the
+ /// elements wasn't in the registry, or if it couldn't be removed.
+ ///
+ /// The [removeCapability] must be the same capability returned by [add]
+ /// when the object was added. If the capability is wrong, the
+ /// object is not removed, and this function returns false.
Future<bool> remove(T element, Capability removeCapability) {
int id = _cache.id(element);
if (id == null) {
@@ -163,31 +149,27 @@
return completer.future;
}
- /**
- * Add tags to an object in the registry.
- *
- * Each element of the registry has a number of tags associated with
- * it. A tag is either associated with an element or not, adding it more
- * than once does not make any difference.
- *
- * Tags are compared using [Object.==] equality.
- *
- * Fails if any of the elements are not in the registry.
- */
+ /// Add tags to an object in the registry.
+ ///
+ /// Each element of the registry has a number of tags associated with
+ /// it. A tag is either associated with an element or not, adding it more
+ /// than once does not make any difference.
+ ///
+ /// Tags are compared using [Object.==] equality.
+ ///
+ /// Fails if any of the elements are not in the registry.
Future addTags(Iterable<T> elements, Iterable tags) {
List ids = elements.map(_getId).toList(growable: false);
return _addTags(ids, tags);
}
- /**
- * Remove tags from an object in the registry.
- *
- * After this operation, the [elements] will not be associated to the [tags].
- * It doesn't matter whether the elements were associated with the tags
- * before or not.
- *
- * Fails if any of the elements are not in the registry.
- */
+ /// Remove tags from an object in the registry.
+ ///
+ /// After this operation, the [elements] will not be associated to the [tags].
+ /// It doesn't matter whether the elements were associated with the tags
+ /// before or not.
+ ///
+ /// Fails if any of the elements are not in the registry.
Future removeTags(Iterable<T> elements, Iterable tags) {
List ids = elements.map(_getId).toList(growable: false);
tags = tags.toList(growable: false);
@@ -205,17 +187,15 @@
return completer.future;
}
- /**
- * Finds a number of elements that have all the desired [tags].
- *
- * If [tags] is omitted or empty, any element of the registry can be
- * returned.
- *
- * If [max] is specified, it must be greater than zero.
- * In that case, at most the first `max` results are returned,
- * in whatever order the registry finds its results.
- * Otherwise all matching elements are returned.
- */
+ /// Finds a number of elements that have all the desired [tags].
+ ///
+ /// If [tags] is omitted or empty, any element of the registry can be
+ /// returned.
+ ///
+ /// If [max] is specified, it must be greater than zero.
+ /// In that case, at most the first `max` results are returned,
+ /// in whatever order the registry finds its results.
+ /// Otherwise all matching elements are returned.
Future<List<T>> lookup({Iterable tags, int max}) {
if (max != null && max < 1) {
throw new RangeError.range(max, 1, null, "max");
@@ -240,12 +220,10 @@
}
}
-/**
- * Isolate-local cache used by a [Registry].
- *
- * Maps between id-numbers and elements.
- * An object is considered an element of the registry if it
- */
+/// Isolate-local cache used by a [Registry].
+///
+/// Maps between id-numbers and elements.
+/// An object is considered an element of the registry if it
class _RegistryCache {
// Temporary marker until an object gets an id.
static const int _BEING_ADDED = -1;
@@ -259,7 +237,7 @@
return result;
}
- Object operator[](int id) => id2object[id];
+ Object operator [](int id) => id2object[id];
// Register a pair of id/object in the cache.
// if the id is already in the cache, just return the existing
@@ -274,7 +252,7 @@
bool isAdding(element) => object2id[element] == _BEING_ADDED;
- void setAdding(element) {
+ void setAdding(element) {
assert(!contains(element));
object2id[element] = _BEING_ADDED;
}
@@ -294,56 +272,46 @@
bool contains(element) => object2id.containsKey(element);
}
-/**
- * The central repository used by distributed [Registry] instances.
- */
+/// The central repository used by distributed [Registry] instances.
class RegistryManager {
final Duration _timeout;
int _nextId = 0;
RawReceivePort _commandPort;
- /**
- * Maps id to entry. Each entry contains the id, the element, its tags,
- * and a capability required to remove it again.
- */
+ /// Maps id to entry. Each entry contains the id, the element, its tags,
+ /// and a capability required to remove it again.
Map<int, _RegistryEntry> _entries = new HashMap();
Map<Object, Set<int>> _tag2id = new HashMap();
- /**
- * Create a new registry managed by the created [RegistryManager].
- *
- * The optional [timeout] parameter can be set to the duration
- * registry objects should wait before assuming that an operation
- * has failed.
- */
+ /// Create a new registry managed by the created [RegistryManager].
+ ///
+ /// The optional [timeout] parameter can be set to the duration
+ /// registry objects should wait before assuming that an operation
+ /// has failed.
RegistryManager({timeout: const Duration(seconds: 5)})
: _timeout = timeout,
_commandPort = new RawReceivePort() {
- _commandPort.handler = _handleCommand;
+ _commandPort.handler = _handleCommand;
}
- /**
- * The command port receiving commands for the registry manager.
- *
- * Use this port with [Registry.fromPort] to link a registry to the
- * manager in isolates where you can't send a [Registry] object directly.
- */
+ /// The command port receiving commands for the registry manager.
+ ///
+ /// Use this port with [Registry.fromPort] to link a registry to the
+ /// manager in isolates where you can't send a [Registry] object directly.
SendPort get commandPort => _commandPort.sendPort;
- /**
- * Get a registry backed by this manager.
- *
- * This registry can be sent to other isolates created using
- * [Isolate.spawn].
- */
- Registry get registry => new Registry.fromPort(_commandPort.sendPort,
- timeout: _timeout);
+ /// Get a registry backed by this manager.
+ ///
+ /// This registry can be sent to other isolates created using
+ /// [Isolate.spawn].
+ Registry get registry =>
+ new Registry.fromPort(_commandPort.sendPort, timeout: _timeout);
// Used as argument to putIfAbsent.
static Set _createSet() => new HashSet();
void _handleCommand(List command) {
- switch(command[0]) {
+ switch (command[0]) {
case _ADD:
_add(command[1], command[2], command[3]);
return;
@@ -477,17 +445,15 @@
replyPort.send(result);
}
- /**
- * Shut down the registry service.
- *
- * After this, all [Registry] operations will time out.
- */
+ /// Shut down the registry service.
+ ///
+ /// After this, all [Registry] operations will time out.
void close() {
_commandPort.close();
}
}
-/** Entry in [RegistryManager]. */
+/// Entry in [RegistryManager].
class _RegistryEntry {
final int id;
final Object element;
diff --git a/lib/runner.dart b/lib/runner.dart
index cd26d5a..4f0a187 100644
--- a/lib/runner.dart
+++ b/lib/runner.dart
@@ -2,45 +2,39 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-/**
- * A [Runner] runs a function, potentially in a different scope
- * or even isolate.
- */
+/// A [Runner] runs a function, potentially in a different scope
+/// or even isolate.
library dart.pkg.isolate.runner;
import "dart:async" show Future;
-/**
- * Calls a function with an argument.
- *
- * The function can be run in a different place from where the `Runner`
- * resides, e.g., in a different isolate.
- */
+/// Calls a function with an argument.
+///
+/// The function can be run in a different place from where the `Runner`
+/// resides, e.g., in a different isolate.
class Runner {
- /**
- * Request that [function] be called with the provided arguments.
- *
- * The arguments will be applied to the function in the same way as by
- * [Function.apply], but it may happen in a diffent isolate or setting.
- *
- * It's necessary that the function can be sent through a [SendPort]
- * if the call is performed in another isolate.
- * That means the other isolate should be created using [Isolate.spawn]
- * so that it is running the same code as the sending isoalte,
- * and the function must be a static or top-level function.
- *
- * Waits for the result of the call, and completes the returned future
- * with the result, whether it's a value or an error.
- *
- * If the returned future does not complete before `timeLimit` has passed,
- * the [onTimeout] action is executed instead, and its result (whether it
- * returns or throws) is used as the result of the returned future.
- *
- * If `onTimeout` is omitted, a timeout will cause the returned future to
- * complete with a [TimeoutException].
- *
- * The default implementation runs the function in the current isolate.
- */
+ /// Request that [function] be called with the provided arguments.
+ ///
+ /// The arguments will be applied to the function in the same way as by
+ /// [Function.apply], but it may happen in a diffent isolate or setting.
+ ///
+ /// It's necessary that the function can be sent through a [SendPort]
+ /// if the call is performed in another isolate.
+ /// That means the other isolate should be created using [Isolate.spawn]
+ /// so that it is running the same code as the sending isoalte,
+ /// and the function must be a static or top-level function.
+ ///
+ /// Waits for the result of the call, and completes the returned future
+ /// with the result, whether it's a value or an error.
+ ///
+ /// If the returned future does not complete before `timeLimit` has passed,
+ /// the [onTimeout] action is executed instead, and its result (whether it
+ /// returns or throws) is used as the result of the returned future.
+ ///
+ /// If `onTimeout` is omitted, a timeout will cause the returned future to
+ /// complete with a [TimeoutException].
+ ///
+ /// The default implementation runs the function in the current isolate.
Future run(function(argument), Object argument,
{Duration timeout, onTimeout()}) {
Future result = new Future.sync(() => function(argument));
@@ -50,12 +44,10 @@
return result;
}
- /**
- * Stop the runner.
- *
- * If the runner has allocated resources, e.g., an isolate, it should
- * be released. No further calls to [run] should be made after calling
- * stop.
- */
+ /// Stop the runner.
+ ///
+ /// If the runner has allocated resources, e.g., an isolate, it should
+ /// be released. No further calls to [run] should be made after calling
+ /// stop.
Future close() => new Future.value();
}
diff --git a/lib/src/errors.dart b/lib/src/errors.dart
index 91abe5b..f724580 100644
--- a/lib/src/errors.dart
+++ b/lib/src/errors.dart
@@ -3,13 +3,11 @@
// BSD-style license that can be found in the LICENSE file.
// TODO(lrn): This should be in package:async?
-/**
- * Helper functions for working with errors.
- *
- * The [MultiError] class combines multiple errors into one object,
- * and the [MultiError.wait] function works like [Future.wait] except
- * that it returns all the errors.
- */
+/// Helper functions for working with errors.
+///
+/// The [MultiError] class combines multiple errors into one object,
+/// and the [MultiError.wait] function works like [Future.wait] except
+/// that it returns all the errors.
library pkg.isolate.errors;
import "dart:async";
@@ -23,29 +21,25 @@
// Minimum number of lines in the toString for each error.
static const int _MIN_LINES_PER_ERROR = 1;
- /** The actual errors. */
+ /// The actual errors.
final List errors;
- /**
- * Create a `MultiError` based on a list of errors.
- *
- * The errors represent errors of a number of individual operations.
- *
- * The list may contain `null` values, if the index of the error in the
- * list is useful.
- */
+ /// Create a `MultiError` based on a list of errors.
+ ///
+ /// The errors represent errors of a number of individual operations.
+ ///
+ /// The list may contain `null` values, if the index of the error in the
+ /// list is useful.
MultiError(this.errors);
- /**
- * Waits for all [futures] to complete, like [Future.wait].
- *
- * Where `Future.wait` only reports one error, even if multiple
- * futures complete with errors, this function will complete
- * with a [MultiError] if more than one future completes with an error.
- *
- * The order of values is not preserved (if that is needed, use
- * [wait]).
- */
+ /// Waits for all [futures] to complete, like [Future.wait].
+ ///
+ /// Where `Future.wait` only reports one error, even if multiple
+ /// futures complete with errors, this function will complete
+ /// with a [MultiError] if more than one future completes with an error.
+ ///
+ /// The order of values is not preserved (if that is needed, use
+ /// [wait]).
static Future<List> waitUnordered(Iterable<Future> futures,
{cleanUp(successResult)}) {
Completer completer;
@@ -95,19 +89,16 @@
return completer.future;
}
- /**
- * Waits for all [futures] to complete, like [Future.wait].
- *
- * Where `Future.wait` only reports one error, even if multiple
- * futures complete with errors, this function will complete
- * with a [MultiError] if more than one future completes with an error.
- *
- * The order of values is preserved, and if any error occurs, the
- * [MultiError.errors] list will have errors in the corresponding slots,
- * and `null` for non-errors.
- */
- Future<List> wait(Iterable<Future> futures,
- {cleanUp(successResult)}) {
+ /// Waits for all [futures] to complete, like [Future.wait].
+ ///
+ /// Where `Future.wait` only reports one error, even if multiple
+ /// futures complete with errors, this function will complete
+ /// with a [MultiError] if more than one future completes with an error.
+ ///
+ /// The order of values is preserved, and if any error occurs, the
+ /// [MultiError.errors] list will have errors in the corresponding slots,
+ /// and `null` for non-errors.
+ Future<List> wait(Iterable<Future> futures, {cleanUp(successResult)}) {
Completer completer;
int count = 0;
bool hasError = false;
@@ -156,7 +147,6 @@
return completer.future;
}
-
String toString() {
StringBuffer buffer = new StringBuffer();
buffer.write("Multiple Errors:\n");
diff --git a/lib/src/functionref.dart b/lib/src/functionref.dart
deleted file mode 100644
index d4c0e8e..0000000
--- a/lib/src/functionref.dart
+++ /dev/null
@@ -1,80 +0,0 @@
-// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-/**
- * Convert top-level or static functions to objects that can be sent to
- * other isolates.
- *
- * This package is only needed until such functions can be sent directly
- * through send-ports.
- */
-library pkg.isolate.functionref;
-
-import "dart:mirrors";
-
-abstract class FunctionRef {
- static FunctionRef from(Function function) {
- var cm = reflect(function);
- if (cm is ClosureMirror) {
- MethodMirror fm = cm.function;
- if (fm.isRegularMethod && fm.isStatic) {
- Symbol functionName = fm.simpleName;
- Symbol className;
- DeclarationMirror owner = fm.owner;
- if (owner is ClassMirror) {
- className = owner.simpleName;
- owner = owner.owner;
- }
- if (owner is LibraryMirror) {
- LibraryMirror ownerLibrary = owner;
- Uri libraryUri = ownerLibrary.uri;
- return new _FunctionRef(libraryUri, className, functionName);
- }
- }
- throw new ArgumentError.value(function, "function",
- "Not a static or top-level function");
- }
- // It's a Function but not a closure, so it's a callable object.
- return new _CallableObjectRef(function);
- }
-
- Function get function;
-}
-
-class _FunctionRef implements FunctionRef {
- final Uri libraryUri;
- final Symbol className;
- final Symbol functionName;
-
- _FunctionRef(this.libraryUri, this.className, this.functionName);
-
- Function get function {
- LibraryMirror lm = currentMirrorSystem().libraries[libraryUri];
- if (lm != null) {
- ObjectMirror owner = lm;
- if (className != null) {
- ClassMirror cm = lm.declarations[className];
- owner = cm;
- }
- if (owner != null) {
- ClosureMirror function = owner.getField(this.functionName);
- if (function != null) return function.reflectee;
- }
- }
- String functionName = MirrorSystem.getName(this.functionName);
- String classQualifier = "";
- if (this.className != null) {
- classQualifier = " in class ${MirrorSystem.getName(this.className)}";
- }
- throw new UnsupportedError(
- "Function $functionName${classQualifier} not found in library $libraryUri"
- );
- }
-}
-
-class _CallableObjectRef implements FunctionRef {
- final Function object;
- _CallableObjectRef(this.object);
- Function get function => object;
-}
diff --git a/lib/src/lists.dart b/lib/src/lists.dart
index 011ecf2..5ed7066 100644
--- a/lib/src/lists.dart
+++ b/lib/src/lists.dart
@@ -2,28 +2,28 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-/** Utility functions to create fixed-length lists. */
+/// Utility functions to create fixed-length lists.
library pkg.isolate.util.lists;
-/** Create a single-element fixed-length list. */
+/// Create a single-element fixed-length list.
List list1(v1) => new List(1)..[0] = v1;
-/** Create a two-element fixed-length list. */
+/// Create a two-element fixed-length list.
List list2(v1, v2) => new List(2)..[0] = v1
..[1] = v2;
-/** Create a three-element fixed-length list. */
+/// Create a three-element fixed-length list.
List list3(v1, v2, v3) => new List(3)..[0] = v1
..[1] = v2
..[2] = v3;
-/** Create a four-element fixed-length list. */
+/// Create a four-element fixed-length list.
List list4(v1, v2, v3, v4) => new List(4)..[0] = v1
..[1] = v2
..[2] = v3
..[3] = v4;
-/** Create a five-element fixed-length list. */
+/// Create a five-element fixed-length list.
List list5(v1, v2, v3, v4, v5) => new List(5)..[0] = v1
..[1] = v2
..[2] = v3
diff --git a/lib/src/multiplexport.dart b/lib/src/multiplexport.dart
index d0c2f67..2711c45 100644
--- a/lib/src/multiplexport.dart
+++ b/lib/src/multiplexport.dart
@@ -2,27 +2,25 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-/**
- * A multiplexing [RawReceivePort].
- *
- * Allows creating a number of [RawReceivePort] implementations that all send
- * messages through the same real `RawReceivePort`.
- *
- * This allows reducing the number of receive ports created, but adds an
- * overhead to each message.
- * If a library creates many short-lived receive ports, multiplexing might be
- * faster.
- *
- * To use multiplexing receive ports, create and store a
- * [RawReceivePortMultiplexer], and create receive ports by calling
- * `multiplexer.createRawReceivePort(handler)` where you would otherwise
- * write `new RawReceivePort(handler)`.
- *
- * Remember to [close] the multiplexer when it is no longer needed.
- * `
- * (TODO: Check if it really is faster - creating a receive port requires a
- * global mutex, so it may be a bottleneck, but it's not clear how slow it is).
- */
+/// A multiplexing [RawReceivePort].
+///
+/// Allows creating a number of [RawReceivePort] implementations that all send
+/// messages through the same real `RawReceivePort`.
+///
+/// This allows reducing the number of receive ports created, but adds an
+/// overhead to each message.
+/// If a library creates many short-lived receive ports, multiplexing might be
+/// faster.
+///
+/// To use multiplexing receive ports, create and store a
+/// [RawReceivePortMultiplexer], and create receive ports by calling
+/// `multiplexer.createRawReceivePort(handler)` where you would otherwise
+/// write `new RawReceivePort(handler)`.
+///
+/// Remember to [close] the multiplexer when it is no longer needed.
+/// `
+/// (TODO: Check if it really is faster - creating a receive port requires a
+/// global mutex, so it may be a bottleneck, but it's not clear how slow it is).
library pkg.isolate.multiplexreceiveport;
import "dart:isolate";
@@ -46,7 +44,9 @@
SendPort get sendPort => _multiplexer._createSendPort(_id);
- void _invokeHandler(message) { _handler(message); }
+ void _invokeHandler(message) {
+ _handler(message);
+ }
}
class _MultiplexSendPort implements SendPort {
@@ -59,10 +59,8 @@
}
}
-/**
- * A shared [RawReceivePort] that distributes messages to
- * [RawReceivePort] instances that it manages.
- */
+/// A shared [RawReceivePort] that distributes messages to
+/// [RawReceivePort] instances that it manages.
class RawReceivePortMultiplexer {
final RawReceivePort _port = new RawReceivePort();
final Map<int, _MultiplexRawReceivePort> _map = new HashMap();
diff --git a/pubspec.yaml b/pubspec.yaml
index 8e2923e..f0b2e2c 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: isolate
-version: 0.1.0
+version: 0.1.1
author: Dart Team <misc@dartlang.org>
description: Utility functions and classes related to the 'dart:isolate' library.
homepage: https://github.com/dart-lang/isolate
diff --git a/test/ports_test.dart b/test/ports_test.dart
index 9586b96..795cdb9 100644
--- a/test/ports_test.dart
+++ b/test/ports_test.dart
@@ -16,6 +16,7 @@
testSingleCompletePort();
testSingleResponseFuture();
testSingleResultFuture();
+ testSingleResponseChannel();
}
void testSingleCallbackPort() {
@@ -48,8 +49,7 @@
test("singleCallbackValueBeforeTimeout", () {
Completer completer = new Completer.sync();
- SendPort p = singleCallbackPort(completer.complete,
- timeout: MS * 500);
+ SendPort p = singleCallbackPort(completer.complete, timeout: MS * 500);
p.send(42);
return completer.future.then((v) {
expect(v, 42);
@@ -58,9 +58,7 @@
test("singleCallbackTimeout", () {
Completer completer = new Completer.sync();
- singleCallbackPort(completer.complete,
- timeout: MS * 100,
- timeoutValue: 37);
+ singleCallbackPort(completer.complete, timeout: MS * 100, timeoutValue: 37);
return completer.future.then((v) {
expect(v, 37);
});
@@ -91,11 +89,10 @@
test("singleCompleteValueCallback", () {
Completer completer = new Completer.sync();
- SendPort p = singleCompletePort(completer,
- callback: (v) {
- expect(42, v);
- return 87;
- });
+ SendPort p = singleCompletePort(completer, callback: (v) {
+ expect(42, v);
+ return 87;
+ });
p.send(42);
return completer.future.then((v) {
expect(v, 87);
@@ -104,12 +101,11 @@
test("singleCompleteValueCallbackFuture", () {
Completer completer = new Completer.sync();
- SendPort p = singleCompletePort(completer,
- callback: (v) {
- expect(42, v);
- return new Future.delayed(MS * 500,
- () => 88);
- });
+ SendPort p = singleCompletePort(completer, callback: (v) {
+ expect(42, v);
+ return new Future.delayed(MS * 500,
+ () => 88);
+ });
p.send(42);
return completer.future.then((v) {
expect(v, 88);
@@ -118,11 +114,10 @@
test("singleCompleteValueCallbackThrows", () {
Completer completer = new Completer.sync();
- SendPort p = singleCompletePort(completer,
- callback: (v) {
- expect(42, v);
- throw 89;
- });
+ SendPort p = singleCompletePort(completer, callback: (v) {
+ expect(42, v);
+ throw 89;
+ });
p.send(42);
return completer.future.then((v) {
fail("unreachable");
@@ -133,11 +128,10 @@
test("singleCompleteValueCallbackThrowsFuture", () {
Completer completer = new Completer.sync();
- SendPort p = singleCompletePort(completer,
- callback: (v) {
- expect(42, v);
- return new Future.error(90);
- });
+ SendPort p = singleCompletePort(completer, callback: (v) {
+ expect(42, v);
+ return new Future.error(90);
+ });
p.send(42);
return completer.future.then((v) {
fail("unreachable");
@@ -159,9 +153,9 @@
test("singleCompleteFirstValueCallback", () {
Completer completer = new Completer.sync();
SendPort p = singleCompletePort(completer, callback: (v) {
- expect(v, 42);
- return 87;
- });
+ expect(v, 42);
+ return 87;
+ });
p.send(42);
p.send(37);
return completer.future.then((v) {
@@ -171,8 +165,7 @@
test("singleCompleteValueBeforeTimeout", () {
Completer completer = new Completer.sync();
- SendPort p = singleCompletePort(completer,
- timeout: MS * 500);
+ SendPort p = singleCompletePort(completer, timeout: MS * 500);
p.send(42);
return completer.future.then((v) {
expect(v, 42);
@@ -181,8 +174,7 @@
test("singleCompleteTimeout", () {
Completer completer = new Completer.sync();
- singleCompletePort(completer,
- timeout: MS * 100);
+ singleCompletePort(completer, timeout: MS * 100);
return completer.future.then((v) {
fail("unreachable");
}, onError: (e, s) {
@@ -192,9 +184,7 @@
test("singleCompleteTimeoutCallback", () {
Completer completer = new Completer.sync();
- singleCompletePort(completer,
- timeout: MS * 100,
- onTimeout: () => 87);
+ singleCompletePort(completer, timeout: MS * 100, onTimeout: () => 87);
return completer.future.then((v) {
expect(v, 87);
});
@@ -202,9 +192,7 @@
test("singleCompleteTimeoutCallbackThrows", () {
Completer completer = new Completer.sync();
- singleCompletePort(completer,
- timeout: MS * 100,
- onTimeout: () => throw 91);
+ singleCompletePort(completer, timeout: MS * 100, onTimeout: () => throw 91);
return completer.future.then((v) {
fail("unreachable");
}, onError: (e, s) {
@@ -260,9 +248,8 @@
test("singleCompleteTimeoutFirst", () {
Completer completer = new Completer.sync();
- SendPort p = singleCompletePort(completer,
- timeout: MS * 100,
- onTimeout: () => 37);
+ SendPort p =
+ singleCompletePort(completer, timeout: MS * 100, onTimeout: () => 37);
new Timer(MS * 500, () => p.send(42));
return completer.future.then((v) {
expect(v, 37);
@@ -392,3 +379,138 @@
});
});
}
+
+void testSingleResponseChannel() {
+ test("singleResponseChannelValue", () {
+ var channel = new SingleResponseChannel();
+ channel.port.send(42);
+ return channel.result.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResponseChannelValueFirst", () {
+ var channel = new SingleResponseChannel();
+ channel.port.send(42);
+ channel.port.send(37);
+ return channel.result.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResponseChannelValueCallback", () {
+ var channel = new SingleResponseChannel(callback: (v) => v * 2);
+ channel.port.send(42);
+ return channel.result.then((v) {
+ expect(v, 84);
+ });
+ });
+
+ test("singleResponseChannelErrorCallback", () {
+ var channel = new SingleResponseChannel(callback: (v) => throw 42);
+ channel.port.send(37);
+ return channel.result.then((v) { fail("unreachable"); },
+ onError: (v, s) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResponseChannelAsyncValueCallback", () {
+ var channel = new SingleResponseChannel(
+ callback: (v) => new Future.value(v * 2));
+ channel.port.send(42);
+ return channel.result.then((v) {
+ expect(v, 84);
+ });
+ });
+
+ test("singleResponseChannelAsyncErrorCallback", () {
+ var channel = new SingleResponseChannel(callback:
+ (v) => new Future.error(42));
+ channel.port.send(37);
+ return channel.result.then((v) { fail("unreachable"); },
+ onError: (v, s) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResponseChannelTimeout", () {
+ var channel = new SingleResponseChannel(timeout: MS * 100);
+ return channel.result.then((v) {
+ expect(v, null);
+ });
+ });
+
+ test("singleResponseChannelTimeoutThrow", () {
+ var channel = new SingleResponseChannel(timeout: MS * 100,
+ throwOnTimeout: true);
+ return channel.result.then((v) { fail("unreachable"); },
+ onError: (v, s) {
+ expect(v is TimeoutException, isTrue);
+ });
+ });
+
+ test("singleResponseChannelTimeoutThrowOnTimeoutAndValue", () {
+ var channel = new SingleResponseChannel(timeout: MS * 100,
+ throwOnTimeout: true,
+ onTimeout: () => 42,
+ timeoutValue: 42);
+ return channel.result.then((v) { fail("unreachable"); },
+ onError: (v, s) {
+ expect(v is TimeoutException, isTrue);
+ });
+ });
+
+ test("singleResponseChannelTimeoutOnTimeout", () {
+ var channel = new SingleResponseChannel(timeout: MS * 100,
+ onTimeout: () => 42);
+ return channel.result.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResponseChannelTimeoutOnTimeoutAndValue", () {
+ var channel = new SingleResponseChannel(timeout: MS * 100,
+ onTimeout: () => 42,
+ timeoutValue: 37);
+ return channel.result.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResponseChannelTimeoutValue", () {
+ var channel = new SingleResponseChannel(timeout: MS * 100,
+ timeoutValue: 42);
+ return channel.result.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResponseChannelTimeoutOnTimeoutError", () {
+ var channel = new SingleResponseChannel(timeout: MS * 100,
+ onTimeout: () => throw 42);
+ return channel.result.then((v) { fail("unreachable"); },
+ onError: (v, s) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResponseChannelTimeoutOnTimeoutAsync", () {
+ var channel = new SingleResponseChannel(timeout: MS * 100,
+ onTimeout:
+ () => new Future.value(42));
+ return channel.result.then((v) {
+ expect(v, 42);
+ });
+ });
+
+ test("singleResponseChannelTimeoutOnTimeoutAsyncError", () {
+ var channel = new SingleResponseChannel(timeout: MS * 100,
+ onTimeout:
+ () => new Future.error(42));
+ return channel.result.then((v) { fail("unreachable"); },
+ onError: (v, s) {
+ expect(v, 42);
+ });
+ });
+}
diff --git a/test/registry_test.dart b/test/registry_test.dart
index 5e95522..c85bfcc 100644
--- a/test/registry_test.dart
+++ b/test/registry_test.dart
@@ -34,7 +34,6 @@
}
void testLookup() {
-
test("lookupAll", () {
RegistryManager regman = new RegistryManager();
Registry registry = regman.registry;
@@ -42,14 +41,14 @@
var element = new Element(i);
var tag = i.isEven ? Oddity.EVEN : Oddity.ODD;
return registry.add(element, tags: [tag]);
- })
- .then((_) => registry.lookup())
- .then((all) {
+ }).then((_) {
+ return registry.lookup();
+ }).then((all) {
expect(all.length, 10);
expect(all.map((v) => v.id).toList()..sort(),
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
})
- .then((_) { regman.close(); });
+ .whenComplete(regman.close);
});
test("lookupOdd", () {
@@ -59,14 +58,14 @@
var element = new Element(i);
var tag = i.isEven ? Oddity.EVEN : Oddity.ODD;
return registry.add(element, tags: [tag]);
- })
- .then((_) =>registry.lookup(tags:[Oddity.ODD]))
- .then((all) {
+ }).then((_) {
+ return registry.lookup(tags:[Oddity.ODD]);
+ }).then((all) {
expect(all.length, 5);
expect(all.map((v) => v.id).toList()..sort(),
[1, 3, 5, 7, 9]);
})
- .then((_) { regman.close(); });
+ .whenComplete(regman.close);
});
test("lookupMax", () {
@@ -76,12 +75,12 @@
var element = new Element(i);
var tag = i.isEven ? Oddity.EVEN : Oddity.ODD;
return registry.add(element, tags: [tag]);
- })
- .then((_) => registry.lookup(max: 5))
- .then((all) {
+ }).then((_) {
+ return registry.lookup(max: 5);
+ }).then((all) {
expect(all.length, 5);
})
- .then((_) { regman.close(); });
+ .whenComplete(regman.close);
});
test("lookupMultiTag", () {
@@ -95,14 +94,14 @@
if (i % j == 0) tags.add(j);
}
return registry.add(element, tags: tags);
- })
- .then((_) => registry.lookup(tags: [2, 3]))
- .then((all) {
+ }).then((_) {
+ return registry.lookup(tags: [2, 3]);
+ }).then((all) {
expect(all.length, 5);
expect(all.map((v) => v.id).toList()..sort(),
[0, 6, 12, 18, 24]);
})
- .then((_) { regman.close(); });
+ .whenComplete(regman.close);
});
test("lookupMultiTagMax", () {
@@ -116,13 +115,13 @@
if (i % j == 0) tags.add(j);
}
return registry.add(element, tags: tags);
- })
- .then((_) => registry.lookup(tags: [2, 3], max: 3))
- .then((all) {
+ }).then((_) {
+ return registry.lookup(tags: [2, 3], max: 3);
+ }).then((all) {
expect(all.length, 3);
expect(all.every((v) => (v.id % 6) == 0), isTrue);
})
- .then((_) { regman.close(); });
+ .whenComplete(regman.close);
});
}
@@ -132,10 +131,10 @@
Registry registry = regman.registry;
var object = new Object();
return registry.add(object).then((_) {
- return registry.lookup().then((entries) {
- expect(entries, hasLength(1));
- expect(entries.first, same(object));
- });
+ return registry.lookup();
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object));
}).whenComplete(regman.close);
});
@@ -146,14 +145,13 @@
var object2 = new Object();
var object3 = new Object();
var objects = [object1, object2, object3];
- return Future.wait(objects.map(registry.add))
- .then((_) {
- return registry.lookup().then((entries) {
- expect(entries, hasLength(3));
- for (var entry in entries) {
- expect(entry, isIn(objects));
- }
- });
+ return Future.wait(objects.map(registry.add)).then((_) {
+ return registry.lookup();
+ }).then((entries) {
+ expect(entries, hasLength(3));
+ for (var entry in entries) {
+ expect(entry, isIn(objects));
+ }
}).whenComplete(regman.close);
});
@@ -162,11 +160,11 @@
Registry registry = regman.registry;
var object = new Object();
return registry.add(object).then((_) {
- return registry.add(object).then((_) {
- fail("Unreachable");
- }, onError: (e, s) {
- expect(e, isStateError);
- });
+ return registry.add(object);
+ }).then((_) {
+ fail("Unreachable");
+ }, onError: (e, s) {
+ expect(e, isStateError);
}).whenComplete(regman.close);
});
@@ -182,16 +180,16 @@
expect(entries.first, same(object));
return registry.add(object2);
}).then((_) {
- return registry.lookup().then((entries) {
- expect(entries, hasLength(2));
- var entry1 = entries.first;
- var entry2 = entries.last;
- if (object == entry1) {
- expect(entry2, same(object2));
- } else {
- expect(entry1, same(object));
- }
- });
+ return registry.lookup();
+ }).then((entries) {
+ expect(entries, hasLength(2));
+ var entry1 = entries.first;
+ var entry2 = entries.last;
+ if (object == entry1) {
+ expect(entry2, same(object2));
+ } else {
+ expect(entry1, same(object));
+ }
}).whenComplete(regman.close);
});
@@ -216,26 +214,26 @@
var object1 = new Object();
var object2 = new Object();
var object3 = new Object();
- return registry.add(object1, tags: [1, 3, 5, 7]).then((_){
+ return registry.add(object1, tags: [1, 3, 5, 7]).then((_) {
return registry.add(object2, tags: [2, 3, 6, 7]);
}).then((_) {
return registry.add(object3, tags: [4, 5, 6, 7]);
}).then((_) {
- return registry.lookup(tags: [3]).then((entries) {
- expect(entries, hasLength(2));
- expect(entries.first == object1 || entries.last == object1, isTrue);
- expect(entries.first == object2 || entries.last == object2, isTrue);
- });
+ return registry.lookup(tags: [3]);
+ }).then((entries) {
+ expect(entries, hasLength(2));
+ expect(entries.first == object1 || entries.last == object1, isTrue);
+ expect(entries.first == object2 || entries.last == object2, isTrue);
}).then((_) {
- return registry.lookup(tags: [2]).then((entries) {
- expect(entries, hasLength(1));
- expect(entries.first, same(object2));
- });
+ return registry.lookup(tags: [2]);
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object2));
}).then((_) {
- return registry.lookup(tags: [3, 6]).then((entries) {
- expect(entries, hasLength(1));
- expect(entries.first, same(object2));
- });
+ return registry.lookup(tags: [3, 6]);
+ }).then((entries) {
+ expect(entries, hasLength(1));
+ expect(entries.first, same(object2));
}).whenComplete(regman.close);
});
}
@@ -250,12 +248,12 @@
expect(entries, hasLength(1));
expect(entries.first, same(object));
return registry.remove(object, removeCapability);
- }).then((removeSuccess) {
- expect(removeSuccess, isTrue);
- return registry.lookup();
- }).then((entries) {
- expect(entries, isEmpty);
});
+ }).then((removeSuccess) {
+ expect(removeSuccess, isTrue);
+ return registry.lookup();
+ }).then((entries) {
+ expect(entries, isEmpty);
}).whenComplete(regman.close);
});
@@ -268,9 +266,9 @@
expect(entries, hasLength(1));
expect(entries.first, same(object));
return registry.remove(object, new Capability());
- }).then((removeSuccess) {
- expect(removeSuccess, isFalse);
});
+ }).then((removeSuccess) {
+ expect(removeSuccess, isFalse);
}).whenComplete(regman.close);
});
}
@@ -330,7 +328,7 @@
var object2 = new Object();
var object3 = new Object();
var objects = [object1, object2, object3];
- return Future.wait(objects.map(registry.add)).then((_){
+ return Future.wait(objects.map(registry.add)).then((_) {
return registry.addTags([object1, object2], ["x", "y"]);
}).then((_) {
return registry.addTags([object1, object3], ["z", "w"]);
@@ -351,8 +349,7 @@
test("Remove-wrong-object", () {
RegistryManager regman = new RegistryManager();
Registry registry = regman.registry;
- expect(() => registry.removeTags([new Object()], ["x"]),
- throws);
+ expect(() => registry.removeTags([new Object()], ["x"]), throws);
regman.close();
});
}
@@ -373,8 +370,7 @@
// Add, lookup and remove object in other isolate.
return IsolateRunner.spawn().then((isolate) {
isolate.run(createRegMan, 1).then((registry) {
- return registry.add(object, tags: ["a", "b"])
- .then((removeCapability) {
+ return registry.add(object, tags: ["a", "b"]).then((removeCapability) {
return registry.lookup(tags: ["a"]).then((entries) {
expect(entries, hasLength(1));
expect(entries.first, same(object));
@@ -459,7 +455,7 @@
}
void testMultiRegistry() {
- test("dual-registyr", () {
+ test("dual-registry", () {
RegistryManager regman = new RegistryManager();
Registry registry1 = regman.registry;
Registry registry2 = regman.registry;
@@ -529,11 +525,14 @@
testObject(#symbol);
testObject(#_privateSymbol);
testObject(new Capability());
+ testObject(topLevelFunction);
}
class Element {
final int id;
Element(this.id);
int get hashCode => id;
- bool operator==(Object other) => other is Element && id == other.id;
+ bool operator ==(Object other) => other is Element && id == other.id;
}
+
+void topLevelFunction() {}