Make the isolate package Dart 2 type-safe. (#21)
Make the isolate package Dart 2 type-safe.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fd6401d..89443ba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 2.0.0
+
+* Make port functions generic so they can be used in a Dart 2 type-safe way.
+
## 1.1.0
* Add generic arguments to `run` in `LoadBalancer` and `IsolateRunner`.
diff --git a/example/http_server.dart b/example/http_server.dart
index 8b47d71..0f6f30e 100644
--- a/example/http_server.dart
+++ b/example/http_server.dart
@@ -8,15 +8,13 @@
import "dart:io";
import "dart:isolate";
-import 'package:isolate/isolate_runner.dart';
+import "package:isolate/isolate_runner.dart";
import "package:isolate/ports.dart";
import "package:isolate/runner.dart";
-typedef Future RemoteStop();
-
-Future<RemoteStop> runHttpServer(
+Future<Future<Object> Function()> runHttpServer(
Runner runner, int port, HttpListener listener) async {
- var stopPort = await runner.run(_startHttpServer, [port, listener]);
+ SendPort stopPort = await runner.run(_startHttpServer, [port, listener]);
return () => _sendStop(stopPort);
}
@@ -101,7 +99,7 @@
isolate.close();
});
- List<RemoteStop> stoppers =
+ List<Future<Object> Function()> stoppers =
await Future.wait(isolates.map((IsolateRunner isolate) {
return runHttpServer(isolate, socket.port, listener);
}), cleanUp: (shutdownServer) {
@@ -114,16 +112,17 @@
print("Server listening on port $port for $count requests");
print("Test with:");
print(" ab -l -c10 -n $count http://localhost:$port/");
+ print("where 'ab' is ApacheBench from, e.g., apache2_tools.");
- await for (var event in counter) {
+ await for (var _ in counter) {
count--;
if (count == 0) {
- print('Shutting down');
+ print("Shutting down");
for (var stopper in stoppers) {
await stopper();
}
counter.close();
}
}
- print('Finished');
+ print("Finished");
}
diff --git a/example/runner_pool.dart b/example/runner_pool.dart
index efb81e5..cf8ef9e 100644
--- a/example/runner_pool.dart
+++ b/example/runner_pool.dart
@@ -4,10 +4,10 @@
library isolate.example.runner_pool;
-import 'dart:async' show Future;
+import "dart:async" show Future;
-import 'package:isolate/load_balancer.dart';
-import 'package:isolate/isolate_runner.dart';
+import "package:isolate/load_balancer.dart";
+import "package:isolate/isolate_runner.dart";
void main() {
int N = 44;
diff --git a/lib/isolate.dart b/lib/isolate.dart
index 268a232..eeb6e7e 100644
--- a/lib/isolate.dart
+++ b/lib/isolate.dart
@@ -5,8 +5,8 @@
/// Utilities for working with isolates and isolate communication.
library isolate;
-export 'isolate_runner.dart';
-export 'load_balancer.dart';
-export 'ports.dart';
-export 'registry.dart';
-export 'runner.dart';
+export "isolate_runner.dart";
+export "load_balancer.dart";
+export "ports.dart";
+export "registry.dart";
+export "runner.dart";
diff --git a/lib/isolate_runner.dart b/lib/isolate_runner.dart
index 5dd8c4f..af7afc8 100644
--- a/lib/isolate_runner.dart
+++ b/lib/isolate_runner.dart
@@ -7,9 +7,9 @@
import "dart:async";
import "dart:isolate";
-import 'ports.dart';
-import 'runner.dart';
-import 'src/lists.dart';
+import "ports.dart";
+import "runner.dart";
+import "src/util.dart";
// Command tags. Shared between IsolateRunner and IsolateRunnerRemote.
const int _SHUTDOWN = 0;
@@ -76,10 +76,10 @@
/// Can be used to create an isolate, use [run] to start a service, and
/// then drop the connection and let the service control the isolate's
/// life cycle.
- Future close() {
+ Future<void> close() {
var channel = new SingleResponseChannel();
_commandPort.send(list2(_SHUTDOWN, channel.port));
- return channel.result;
+ return channel.result.then(ignore);
}
/// Kills the isolate.
@@ -94,20 +94,21 @@
/// If the isolate is already dead, the returned future will not complete.
/// If that may be the case, use [Future.timeout] on the returned future
/// to take extra action after a while. Example:
- ///
- /// var f = isolate.kill();
- /// f.then((_) => print('Dead')
- /// .timeout(new Duration(...), onTimeout: () => print('No response'));
+ /// ```dart
+ /// var f = isolate.kill();
+ /// f.then((_) => print("Dead")
+ /// .timeout(new Duration(...), onTimeout: () => print("No response"));
+ /// ```
Future kill({Duration timeout: const Duration(seconds: 1)}) {
Future onExit = singleResponseFuture(isolate.addOnExitListener);
- if (Duration.ZERO == timeout) {
- isolate.kill(priority: Isolate.IMMEDIATE);
+ if (Duration.zero == timeout) {
+ isolate.kill(priority: Isolate.immediate);
return onExit;
} else {
// Try a more gentle shutdown sequence.
_commandPort.send(list1(_SHUTDOWN));
return onExit.timeout(timeout, onTimeout: () {
- isolate.kill(priority: Isolate.IMMEDIATE);
+ isolate.kill(priority: Isolate.immediate);
return onExit;
});
}
@@ -177,16 +178,17 @@
/// the caller.
///
/// Example:
- ///
- /// IsolateRunner iso = await IsolateRunner.spawn();
- /// try {
- /// return await iso.run(heavyComputation, argument);
- /// } finally {
- /// await iso.close();
- /// }
- Future<R> run<R, P>(R function(P argument), P argument,
+ /// ```dart
+ /// IsolateRunner iso = await IsolateRunner.spawn();
+ /// try {
+ /// return await iso.run(heavyComputation, argument);
+ /// } finally {
+ /// await iso.close();
+ /// }
+ /// ```
+ Future<R> run<R, P>(FutureOr<R> function(P argument), P argument,
{Duration timeout, onTimeout()}) {
- return singleResultFuture((SendPort port) {
+ return singleResultFuture<R>((SendPort port) {
_commandPort.send(list4(_RUN, function, argument, port));
}, timeout: timeout, onTimeout: onTimeout);
}
@@ -237,13 +239,14 @@
/// If the isolate has already stopped responding to commands,
/// the returned future will be completed after one second,
/// using [ping] to check if the isolate is still alive.
- Future get onExit {
+ Future<void> get onExit {
// TODO(lrn): Is there a way to see if an isolate is dead
// so we can close the receive port for this future?
+ // Using [ping] for now.
if (_onExitFuture == null) {
var channel = new SingleResponseChannel();
isolate.addOnExitListener(channel.port);
- _onExitFuture = channel.result;
+ _onExitFuture = channel.result.then(ignore);
ping().then((bool alive) {
if (!alive) {
channel.interrupt();
@@ -280,7 +283,7 @@
initPort.send(remote.commandPort);
}
- void _handleCommand(List command) {
+ void _handleCommand(List<Object> command) {
switch (command[0]) {
case _SHUTDOWN:
SendPort responsePort = command[1];
diff --git a/lib/load_balancer.dart b/lib/load_balancer.dart
index 631d89b..845b0c5 100644
--- a/lib/load_balancer.dart
+++ b/lib/load_balancer.dart
@@ -5,11 +5,11 @@
/// A load-balancing runner pool.
library isolate.load_balancer;
-import 'dart:async' show Future;
+import "dart:async" show Future, FutureOr;
-import 'runner.dart';
-import 'src/errors.dart';
-import 'src/lists.dart';
+import "runner.dart";
+import "src/errors.dart";
+import "src/util.dart";
/// A pool of runners, ordered by load.
///
@@ -44,8 +44,9 @@
///
/// This is a helper function that makes it easy to create a `LoadBalancer`
/// with asynchronously created runners, for example:
- ///
- /// var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
+ /// ```dart
+ /// var isolatePool = LoadBalancer.create(10, IsolateRunner.spawn);
+ /// ```
static Future<LoadBalancer> create(int size, Future<Runner> createRunner()) {
return Future.wait(new Iterable.generate(size, (_) => createRunner()),
cleanUp: (Runner runner) {
@@ -69,8 +70,8 @@
/// If [timeout] and [onTimeout] are provided, they are forwarded to
/// the runner running the function, which will handle a timeout
/// as normal.
- Future<R> run<R, P>(R function(P argument), argument,
- {Duration timeout, onTimeout(), int load: 100}) {
+ Future<R> run<R, P>(FutureOr<R> function(P argument), argument,
+ {Duration timeout, FutureOr<R> onTimeout(), int load: 100}) {
RangeError.checkNotNegative(load, "load");
_LoadBalancerEntry entry = _first;
_increaseLoad(entry, load);
@@ -128,10 +129,11 @@
return result;
}
- Future close() {
+ Future<void> close() {
if (_stopFuture != null) return _stopFuture;
- _stopFuture =
- MultiError.waitUnordered(_queue.take(_length).map((e) => e.close()));
+ _stopFuture = MultiError
+ .waitUnordered(_queue.take(_length).map((e) => e.close()))
+ .then(ignore);
// Remove all entries.
for (int i = 0; i < _length; i++) _queue[i].queueIndex = -1;
_queue = null;
@@ -265,8 +267,13 @@
/// Whether the entry is still in the queue.
bool get inQueue => queueIndex >= 0;
- Future<R> run<R, P>(LoadBalancer balancer, int load, R function(P argument),
- argument, Duration timeout, onTimeout()) {
+ Future<R> run<R, P>(
+ LoadBalancer balancer,
+ int load,
+ FutureOr<R> function(P argument),
+ argument,
+ Duration timeout,
+ FutureOr<R> onTimeout()) {
return runner
.run<R, P>(function, argument, timeout: timeout, onTimeout: onTimeout)
.whenComplete(() {
diff --git a/lib/ports.dart b/lib/ports.dart
index 5ea6866..a65d43e 100644
--- a/lib/ports.dart
+++ b/lib/ports.dart
@@ -22,7 +22,7 @@
import "dart:async";
import "dart:isolate";
-import "src/lists.dart";
+import "src/util.dart";
/// Create a [SendPort] that accepts only one message.
///
@@ -35,23 +35,27 @@
/// message isn't received in time, the `callback` function
/// is called once with the [timeoutValue] instead.
///
+/// If the received value is not a [T], it will cause an uncaught
+/// asynchronous error in the current zone.
+///
/// Returns the `SendPort` expecting the single message.
///
/// Equivalent to:
-///
-/// (new ReceivePort()
-/// ..first.timeout(duration, () => timeoutValue).then(callback))
-/// .sendPort
-SendPort singleCallbackPort<T>(void callback(T response),
- {Duration timeout, T timeoutValue}) {
+/// ```dart
+/// (new ReceivePort()
+/// ..first.timeout(duration, () => timeoutValue).then(callback))
+/// .sendPort
+/// ```
+SendPort singleCallbackPort<P>(void callback(P response),
+ {Duration timeout, P timeoutValue}) {
RawReceivePort responsePort = new RawReceivePort();
Zone zone = Zone.current;
callback = zone.registerUnaryCallback(callback);
Timer timer;
responsePort.handler = (response) {
responsePort.close();
- if (timer != null) timer.cancel();
- zone.runUnary(callback, response as T);
+ timer?.cancel();
+ zone.runUnary(callback, response as P);
};
if (timeout != null) {
timer = new Timer(timeout, () {
@@ -71,7 +75,8 @@
///
/// If `callback` is omitted, it defaults to an identity function.
/// The `callback` call may return a future, and the completer will
-/// wait for that future to complete.
+/// wait for that future to complete. If [callback] is omitted, the
+/// message on the port must be an instance of [R].
///
/// If [timeout] is supplied, it is used as a limit on how
/// long it can take before the message is received. If a
@@ -87,28 +92,37 @@
/// completed in response to another event, either a port message or a timer.
///
/// Returns the `SendPort` expecting the single message.
-SendPort singleCompletePort<T>(Completer completer,
- {callback(T message), Duration timeout, T onTimeout()}) {
+SendPort singleCompletePort<R, P>(Completer<R> completer,
+ {FutureOr<R> callback(P message),
+ Duration timeout,
+ FutureOr<R> onTimeout()}) {
if (callback == null && timeout == null) {
- return singleCallbackPort(completer.complete);
+ return singleCallbackPort<Object>((response) {
+ _castComplete<R>(completer, response);
+ });
}
RawReceivePort responsePort = new RawReceivePort();
Timer timer;
if (callback == null) {
- responsePort.handler = (T response) {
+ responsePort.handler = (response) {
responsePort.close();
- if (timer != null) timer.cancel();
- completer.complete(response);
+ timer?.cancel();
+ _castComplete<R>(completer, response);
};
} else {
Zone zone = Zone.current;
- var action = zone.registerUnaryCallback((T response) {
- completer.complete(new Future.sync(() => callback(response)));
+ var action = zone.registerUnaryCallback((response) {
+ try {
+ // Also catch it if callback throws.
+ completer.complete(callback(response as P));
+ } catch (error, stack) {
+ completer.completeError(error, stack);
+ }
});
responsePort.handler = (response) {
responsePort.close();
- if (timer != null) timer.cancel();
- zone.runUnary(action, response as T);
+ timer?.cancel();
+ zone.runUnary(action, response as P);
};
}
if (timeout != null) {
@@ -145,17 +159,17 @@
/// If you want a timeout on the returned future, it's recommended to
/// use the [timeout] parameter, and not [Future.timeout] on the result.
/// The `Future` method won't be able to close the underlying [ReceivePort].
-Future singleResponseFuture(void action(SendPort responsePort),
- {Duration timeout, var timeoutValue}) {
- Completer completer = new Completer.sync();
+Future<R> singleResponseFuture<R>(void action(SendPort responsePort),
+ {Duration timeout, R timeoutValue}) {
+ Completer<R> completer = new Completer<R>.sync();
RawReceivePort responsePort = new RawReceivePort();
Timer timer;
Zone zone = Zone.current;
- responsePort.handler = (v) {
+ responsePort.handler = (Object response) {
responsePort.close();
- if (timer != null) timer.cancel();
+ timer?.cancel();
zone.run(() {
- completer.complete(v);
+ _castComplete<R>(completer, response);
});
};
if (timeout != null) {
@@ -166,12 +180,12 @@
}
try {
action(responsePort.sendPort);
- } catch (e, s) {
+ } catch (error, stack) {
responsePort.close();
- if (timer != null) timer.cancel();
+ timer?.cancel();
// Delay completion because completer is sync.
scheduleMicrotask(() {
- completer.completeError(e, s);
+ completer.completeError(error, stack);
});
}
return completer.future;
@@ -182,11 +196,11 @@
/// The result of [future] is sent on [resultPort] in a form expected by
/// either [receiveFutureResult], [completeFutureResult], or
/// by the port of [singleResultFuture].
-void sendFutureResult(Future future, SendPort resultPort) {
- future.then((v) {
- resultPort.send(list1(v));
- }, onError: (e, s) {
- resultPort.send(list2("$e", "$s"));
+void sendFutureResult(Future<Object> future, SendPort resultPort) {
+ future.then((value) {
+ resultPort.send(list1(value));
+ }, onError: (error, stack) {
+ resultPort.send(list2("$error", "$stack"));
});
}
@@ -208,10 +222,10 @@
/// instead.
/// If `onTimeout` is omitted, it defaults to throwing
/// a [TimeoutException].
-Future singleResultFuture(void action(SendPort responsePort),
- {Duration timeout, onTimeout()}) {
- Completer completer = new Completer.sync();
- SendPort port = singleCompletePort(completer,
+Future<R> singleResultFuture<R>(void action(SendPort responsePort),
+ {Duration timeout, FutureOr<R> onTimeout()}) {
+ var completer = new Completer<R>.sync();
+ SendPort port = singleCompletePort<R, List<Object>>(completer,
callback: receiveFutureResult, timeout: timeout, onTimeout: onTimeout);
try {
action(port);
@@ -225,12 +239,12 @@
/// Completes a completer with a message created by [sendFutureResult]
///
/// The [response] must be a message on the format sent by [sendFutureResult].
-void completeFutureResult(var response, Completer completer) {
+void completeFutureResult<R>(List<Object> response, Completer<R> completer) {
if (response.length == 2) {
var error = new RemoteError(response[0], response[1]);
completer.completeError(error, error.stackTrace);
} else {
- var result = response[0];
+ R result = response[0];
completer.complete(result);
}
}
@@ -239,23 +253,23 @@
/// result.
///
/// The [response] must be a message on the format sent by [sendFutureResult].
-Future receiveFutureResult(var response) {
+Future<R> receiveFutureResult<R>(List<Object> response) {
if (response.length == 2) {
var error = new RemoteError(response[0], response[1]);
return new Future.error(error, error.stackTrace);
}
- var result = response[0];
- return new Future.value(result);
+ R result = response[0];
+ return new Future<R>.value(result);
}
/// A [Future] and a [SendPort] that can be used to complete the future.
///
/// The first value sent to [port] is used to complete the [result].
/// All following values sent to `port` are ignored.
-class SingleResponseChannel {
+class SingleResponseChannel<R> {
Zone _zone;
final RawReceivePort _receivePort;
- final Completer _completer;
+ final Completer<R> _completer;
final Function _callback;
Timer _timer;
@@ -275,13 +289,13 @@
/// If `onTimeout` is not provided either,
/// the future is completed with `timeoutValue`, which defaults to `null`.
SingleResponseChannel(
- {callback(value),
+ {FutureOr<R> callback(Null value),
Duration timeout,
bool throwOnTimeout: false,
- onTimeout(),
- var timeoutValue})
+ FutureOr<R> onTimeout(),
+ R timeoutValue})
: _receivePort = new RawReceivePort(),
- _completer = new Completer.sync(),
+ _completer = new Completer<R>.sync(),
_callback = callback,
_zone = Zone.current {
_receivePort.handler = _handleResponse;
@@ -292,7 +306,7 @@
if (!_completer.isCompleted) {
if (throwOnTimeout) {
_completer.completeError(
- new TimeoutException('Timeout waiting for response', timeout));
+ new TimeoutException("Timeout waiting for response", timeout));
} else if (onTimeout != null) {
_completer.complete(new Future.sync(onTimeout));
} else {
@@ -307,14 +321,14 @@
SendPort get port => _receivePort.sendPort;
/// Future completed by the first value sent to [port].
- Future get result => _completer.future;
+ Future<R> get result => _completer.future;
/// If the channel hasn't completed yet, interrupt it and complete the result.
///
/// If the channel hasn't received a value yet, or timed out, it is stopped
/// (like by a timeout) and the [SingleResponseChannel.result]
/// is completed with [result].
- void interrupt([result]) {
+ void interrupt([R result]) {
_receivePort.close();
_cancelTimer();
if (!_completer.isCompleted) {
@@ -335,7 +349,11 @@
_receivePort.close();
_cancelTimer();
if (_callback == null) {
- _completer.complete(v);
+ try {
+ _completer.complete(v as R);
+ } catch (e, s) {
+ _completer.completeError(e, s);
+ }
} else {
// The _handleResponse function is the handler of a RawReceivePort.
// As such, it runs in the root zone.
@@ -350,3 +368,13 @@
}
}
}
+
+// Helper function that casts an object to a type and completes a
+// corresponding completer, or completes with the error if the cast fails.
+void _castComplete<R>(Completer<R> completer, Object value) {
+ try {
+ completer.complete(value as R);
+ } catch (error, stack) {
+ completer.completeError(error, stack);
+ }
+}
diff --git a/lib/registry.dart b/lib/registry.dart
index 37014ee..4073e4f 100644
--- a/lib/registry.dart
+++ b/lib/registry.dart
@@ -5,12 +5,12 @@
/// An isolate-compatible object registry and lookup service.
library isolate.registry;
-import 'dart:async' show Future, Completer, TimeoutException;
-import 'dart:collection' show HashMap, HashSet;
-import 'dart:isolate' show RawReceivePort, SendPort, Capability;
+import "dart:async" show Future, Completer, TimeoutException;
+import "dart:collection" show HashMap, HashSet;
+import "dart:isolate" show RawReceivePort, SendPort, Capability;
-import 'ports.dart';
-import 'src/lists.dart';
+import "ports.dart";
+import "src/util.dart";
// Command tags.
const int _ADD = 0;
@@ -111,7 +111,7 @@
"Object already in registry: ${Error.safeToString(element)}");
});
}
- Completer completer = new Completer<Capability>();
+ var completer = new Completer<Capability>();
SendPort port = singleCompletePort(completer,
callback: (List response) {
assert(cache.isAdding(element));
@@ -144,7 +144,7 @@
if (id == null) {
return new Future<bool>.value(false);
}
- Completer completer = new Completer<bool>();
+ var completer = new Completer<bool>();
SendPort port = singleCompletePort(completer, callback: (bool result) {
_cache.remove(id);
return result;
@@ -174,18 +174,18 @@
/// before or not.
///
/// Fails if any of the elements are not in the registry.
- Future removeTags(Iterable<T> elements, Iterable tags) {
+ Future<void> removeTags(Iterable<T> elements, Iterable tags) {
List ids = elements.map(_getId).toList(growable: false);
tags = tags.toList(growable: false);
- Completer completer = new Completer();
+ var completer = new Completer<void>();
SendPort port = singleCompletePort(completer, timeout: _timeout);
_commandPort.send(list4(_REMOVE_TAGS, ids, tags, port));
return completer.future;
}
- Future _addTags(List<int> ids, Iterable tags) {
+ Future<void> _addTags(List<int> ids, Iterable tags) {
tags = tags.toList(growable: false);
- Completer completer = new Completer();
+ var completer = new Completer<void>();
SendPort port = singleCompletePort(completer, timeout: _timeout);
_commandPort.send(list4(_ADD_TAGS, ids, tags, port));
return completer.future;
@@ -205,7 +205,7 @@
throw new RangeError.range(max, 1, null, "max");
}
if (tags != null) tags = tags.toList(growable: false);
- Completer completer = new Completer<List<T>>();
+ var completer = new Completer<List<T>>();
SendPort port = singleCompletePort(completer, callback: (List response) {
// Response is even-length list of (id, element) pairs.
_RegistryCache cache = _cache;
diff --git a/lib/runner.dart b/lib/runner.dart
index a8a848c..b050a38 100644
--- a/lib/runner.dart
+++ b/lib/runner.dart
@@ -6,7 +6,7 @@
/// or even isolate.
library isolate.runner;
-import 'dart:async' show Future;
+import "dart:async" show Future, FutureOr;
/// Calls a function with an argument.
///
@@ -35,8 +35,8 @@
/// complete with a [TimeoutException].
///
/// The default implementation runs the function in the current isolate.
- Future<R> run<R, P>(R function(P argument), P argument,
- {Duration timeout, onTimeout()}) {
+ Future<R> run<R, P>(FutureOr<R> function(P argument), P argument,
+ {Duration timeout, FutureOr<R> onTimeout()}) {
Future result = new Future.sync(() => function(argument));
if (timeout != null) {
result = result.timeout(timeout, onTimeout: onTimeout);
diff --git a/lib/src/errors.dart b/lib/src/errors.dart
index a5813da..81a5d6b 100644
--- a/lib/src/errors.dart
+++ b/lib/src/errors.dart
@@ -10,7 +10,7 @@
/// that it returns all the errors.
library isolate.errors;
-import 'dart:async';
+import "dart:async";
class MultiError extends Error {
// Limits the number of lines included from each error's error message.
@@ -40,16 +40,16 @@
///
/// The order of values is not preserved (if that is needed, use
/// [wait]).
- static Future<List> waitUnordered(Iterable<Future> futures,
- {cleanUp(successResult)}) {
- Completer completer;
+ static Future<List<Object>> waitUnordered<T>(Iterable<Future<T>> futures,
+ {void cleanUp(T successResult)}) {
+ Completer<List<Object>> completer;
int count = 0;
int errors = 0;
int values = 0;
// Initialized to `new List(count)` when count is known.
// Filled up with values on the left, errors on the right.
// Order is not preserved.
- List results;
+ List<Object> results;
void checkDone() {
if (errors + values < count) return;
if (errors == 0) {
@@ -60,7 +60,7 @@
completer.completeError(new MultiError(errorList));
}
- var handleValue = (v) {
+ var handleValue = (T v) {
// If this fails because [results] is null, there is a future
// which breaks the Future API by completing immediately when
// calling Future.then, probably by misusing a synchronous completer.
@@ -80,7 +80,7 @@
results[results.length - ++errors] = e;
checkDone();
};
- for (Future future in futures) {
+ for (var future in futures) {
count++;
future.then(handleValue, onError: handleError);
}
@@ -99,15 +99,16 @@
/// The order of values is preserved, and if any error occurs, the
/// [MultiError.errors] list will have errors in the corresponding slots,
/// and `null` for non-errors.
- Future<List> wait(Iterable<Future> futures, {cleanUp(successResult)}) {
- Completer completer;
+ Future<List<Object>> wait<T>(Iterable<Future<T>> futures,
+ {void cleanUp(T successResult)}) {
+ Completer<List<Object>> completer;
int count = 0;
bool hasError = false;
int completed = 0;
// Initialized to `new List(count)` when count is known.
// Filled with values until the first error, then cleared
// and filled with errors.
- List results;
+ List<Object> results;
void checkDone() {
completed++;
if (completed < count) return;
@@ -118,7 +119,7 @@
completer.completeError(new MultiError(results));
}
- for (Future future in futures) {
+ for (var future in futures) {
int i = count;
count++;
future.then((v) {
@@ -136,7 +137,7 @@
if (result != null) new Future.sync(() => cleanUp(result));
}
}
- results.fillRange(0, results.length, null);
+ results = new List<Object>(count);
hasError = true;
}
results[i] = e;
@@ -144,7 +145,7 @@
});
}
if (count == 0) return new Future.value(new List(0));
- results = new List(count);
+ results = new List<T>(count);
completer = new Completer();
return completer.future;
}
diff --git a/lib/src/lists.dart b/lib/src/lists.dart
deleted file mode 100644
index 24c0010..0000000
--- a/lib/src/lists.dart
+++ /dev/null
@@ -1,35 +0,0 @@
-// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-/// Utility functions to create fixed-length lists.
-library isolate.lists;
-
-/// Create a single-element fixed-length list.
-List list1(v1) => new List(1)..[0] = v1;
-
-/// Create a two-element fixed-length list.
-List list2(v1, v2) => new List(2)
- ..[0] = v1
- ..[1] = v2;
-
-/// Create a three-element fixed-length list.
-List list3(v1, v2, v3) => new List(3)
- ..[0] = v1
- ..[1] = v2
- ..[2] = v3;
-
-/// Create a four-element fixed-length list.
-List list4(v1, v2, v3, v4) => new List(4)
- ..[0] = v1
- ..[1] = v2
- ..[2] = v3
- ..[3] = v4;
-
-/// Create a five-element fixed-length list.
-List list5(v1, v2, v3, v4, v5) => new List(5)
- ..[0] = v1
- ..[1] = v2
- ..[2] = v3
- ..[3] = v4
- ..[4] = v5;
diff --git a/lib/src/raw_receive_port_multiplexer.dart b/lib/src/raw_receive_port_multiplexer.dart
index 00843ad..a284af1 100644
--- a/lib/src/raw_receive_port_multiplexer.dart
+++ b/lib/src/raw_receive_port_multiplexer.dart
@@ -23,10 +23,10 @@
/// global mutex, so it may be a bottleneck, but it's not clear how slow it is).
library isolate.raw_receive_port_multiplexer;
-import 'dart:collection';
-import 'dart:isolate';
+import "dart:collection";
+import "dart:isolate";
-import 'lists.dart';
+import "util.dart";
class _MultiplexRawReceivePort implements RawReceivePort {
final RawReceivePortMultiplexer _multiplexer;
diff --git a/lib/src/util.dart b/lib/src/util.dart
new file mode 100644
index 0000000..7c55ff4
--- /dev/null
+++ b/lib/src/util.dart
@@ -0,0 +1,40 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/// Utility functions.
+
+/// Ignore an argument.
+///
+/// Can be used to drop the result of a future like `future.then(ignore)`.
+void ignore(_) {}
+
+/// Create a single-element fixed-length list.
+List<Object> list1(Object v1) => new List(1)..[0] = v1;
+
+/// Create a two-element fixed-length list.
+List<Object> list2(Object v1, Object v2) => new List(2)
+ ..[0] = v1
+ ..[1] = v2;
+
+/// Create a three-element fixed-length list.
+List<Object> list3(Object v1, Object v2, Object v3) => new List(3)
+ ..[0] = v1
+ ..[1] = v2
+ ..[2] = v3;
+
+/// Create a four-element fixed-length list.
+List<Object> list4(Object v1, Object v2, Object v3, Object v4) => new List(4)
+ ..[0] = v1
+ ..[1] = v2
+ ..[2] = v3
+ ..[3] = v4;
+
+/// Create a five-element fixed-length list.
+List<Object> list5(Object v1, Object v2, Object v3, Object v4, Object v5) =>
+ new List(5)
+ ..[0] = v1
+ ..[1] = v2
+ ..[2] = v3
+ ..[3] = v4
+ ..[4] = v5;
diff --git a/test/isolaterunner_test.dart b/test/isolaterunner_test.dart
index ab22e34..e13415c 100644
--- a/test/isolaterunner_test.dart
+++ b/test/isolaterunner_test.dart
@@ -4,11 +4,11 @@
library isolate.test.isolaterunner_test;
-import 'dart:async' show Future;
-import 'dart:isolate' show Capability;
+import "dart:async" show Future;
+import "dart:isolate" show Capability;
-import 'package:isolate/isolate_runner.dart';
-import 'package:test/test.dart';
+import "package:isolate/isolate_runner.dart";
+import "package:test/test.dart";
const MS = const Duration(milliseconds: 1);
@@ -16,7 +16,7 @@
test("create-close", testCreateClose);
test("create-run-close", testCreateRunClose);
test("separate-isolates", testSeparateIsolates);
- group('isolate functions', testIsolateFunctions);
+ group("isolate functions", testIsolateFunctions);
}
Future testCreateClose() {
diff --git a/test/ports_test.dart b/test/ports_test.dart
index 77f3604..8a80c65 100644
--- a/test/ports_test.dart
+++ b/test/ports_test.dart
@@ -4,20 +4,20 @@
library isolate.test.ports_test;
-import 'dart:async';
-import 'dart:isolate';
+import "dart:async";
+import "dart:isolate";
-import 'package:isolate/ports.dart';
-import 'package:test/test.dart';
+import "package:isolate/ports.dart";
+import "package:test/test.dart";
const Duration MS = const Duration(milliseconds: 1);
main() {
- group('SingleCallbackPort', testSingleCallbackPort);
- group('SingleCompletePort', testSingleCompletePort);
- group('SingleResponseFuture', testSingleResponseFuture);
- group('SingleResponseFuture', testSingleResultFuture);
- group('SingleResponseChannel', testSingleResponseChannel);
+ group("SingleCallbackPort", testSingleCallbackPort);
+ group("SingleCompletePort", testSingleCompletePort);
+ group("SingleResponseFuture", testSingleResponseFuture);
+ group("SingleResponseFuture", testSingleResultFuture);
+ group("SingleResponseChannel", testSingleResponseChannel);
}
void testSingleCallbackPort() {
@@ -393,7 +393,7 @@
});
test("ValueCallback", () {
- var channel = new SingleResponseChannel(callback: (v) => v * 2);
+ var channel = new SingleResponseChannel(callback: (v) => 2 * v);
channel.port.send(42);
return channel.result.then((v) {
expect(v, 84);
@@ -412,7 +412,7 @@
test("AsyncValueCallback", () {
var channel =
- new SingleResponseChannel(callback: (v) => new Future.value(v * 2));
+ new SingleResponseChannel(callback: (v) => new Future.value(2 * v));
channel.port.send(42);
return channel.result.then((v) {
expect(v, 84);
diff --git a/test/registry_test.dart b/test/registry_test.dart
index a69aa1f..76d0420 100644
--- a/test/registry_test.dart
+++ b/test/registry_test.dart
@@ -4,25 +4,25 @@
library isolate.test.registry_test;
-import 'dart:async';
-import 'dart:isolate';
+import "dart:async";
+import "dart:isolate";
-import 'package:isolate/isolate_runner.dart';
-import 'package:isolate/registry.dart';
+import "package:isolate/isolate_runner.dart";
+import "package:isolate/registry.dart";
-import 'package:test/test.dart';
+import "package:test/test.dart";
const MS = const Duration(milliseconds: 1);
void main() {
- group('lookup', testLookup);
- group('AddLookup', testAddLookup);
- group('AddRemoveTags', testAddRemoveTags);
- group('Remove', testRemove);
- group('CrossIsolate', testCrossIsolate);
- group('Timeout', testTimeout);
- group('MultiRegistry', testMultiRegistry);
- group('ObjectsAndTags', testObjectsAndTags);
+ group("lookup", testLookup);
+ group("AddLookup", testAddLookup);
+ group("AddRemoveTags", testAddRemoveTags);
+ group("Remove", testRemove);
+ group("CrossIsolate", testCrossIsolate);
+ group("Timeout", testTimeout);
+ group("MultiRegistry", testMultiRegistry);
+ group("ObjectsAndTags", testObjectsAndTags);
}
class Oddity {