Tweak null safety (#54)
* Tweak the Null Safety migrated APIs and implementation.
* Revert depending on a separate priority queue.
Update the existing code be tailored for this particular use-case,
which also avoids the nullability issues leading to changing it in the first place.
* Update changelog and some documentation.
* Add tests for load_balancer.
Fix bugs found by tests.
Change type of LoadBalancer.runMultiple back to List<Future<R>>.
* Use `package:lints` for linting.
Ignore misfiring lints.
diff --git a/.status b/.status
deleted file mode 100644
index a69ed9e..0000000
--- a/.status
+++ /dev/null
@@ -1,11 +0,0 @@
-# Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
-# for details. All rights reserved. Use of this source code is governed by a
-# BSD-style license that can be found in the LICENSE file.
-
-[ $runtime == vm ]
-test/isolaterunner_test: RuntimeError # addOnExitListener not implemented.
-
-[ $compiler == dart2js ]
-test/registry_test: CompileTimeError # Unimplemented: private symbol literals.
-# FunctionRef will be removed when the VM supports sending functions.
-test/isolaterunner_test: RuntimeError # Mirrors not working - FunctionRef broken.
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cd8b7e3..d9bd7c2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+## 2.1.0
+
+* Migrate to null safety.
+* Add `singleResponseFutureWithTimeout` and `singleCallbackPortWithTimeout`,
+ while deprecating the timeout functionality of
+ `singleResponseFuture` and `singleCallbackPort`.
+
## 2.0.3
* Update SDK requirements.
diff --git a/analysis_options.yaml b/analysis_options.yaml
index 0711aca..0b0a7b5 100644
--- a/analysis_options.yaml
+++ b/analysis_options.yaml
@@ -1,43 +1,13 @@
-include: package:pedantic/analysis_options.yaml
+include: package:lints/recommended.yaml
analyzer:
- strong-mode:
- implicit-casts: false
+ errors:
+ deprecated_member_use_from_same_package: ignore # https://github.com/dart-lang/linter/issues/2703
+ void_checks: ignore # https://github.com/dart-lang/linter/issues/2685
linter:
rules:
- - avoid_empty_else
- - avoid_init_to_null
- - avoid_null_checks_in_equality_operators
- avoid_unused_constructor_parameters
- - await_only_futures
- - camel_case_types
- cancel_subscriptions
- - constant_identifier_names
- - control_flow_in_finally
- directives_ordering
- - empty_catches
- - empty_constructor_bodies
- - empty_statements
- - hash_and_equals
- - implementation_imports
- - iterable_contains_unrelated_type
- - library_names
- - library_prefixes
- - list_remove_unrelated_type
- - non_constant_identifier_names
- - overridden_fields
- package_api_docs
- - package_names
- - package_prefixed_library_names
- - prefer_equal_for_default_values
- - prefer_final_fields
- - prefer_generic_function_type_aliases
- - prefer_is_not_empty
- - slash_for_doc_comments
- test_types_in_equals
- throw_in_finally
- - type_init_formals
- - unnecessary_brace_in_string_interps
- - unnecessary_const
- - unnecessary_new
- - unrelated_type_equality_checks
- - valid_regexps
diff --git a/example/http_server.dart b/example/http_server.dart
index c93a504..5e4857f 100644
--- a/example/http_server.dart
+++ b/example/http_server.dart
@@ -20,7 +20,7 @@
}
Future<Object?> _sendStop(SendPort stopPort) =>
- singleResponseFutureWithoutTimeout(stopPort.send);
+ singleResponseFuture(stopPort.send);
Future<SendPort> _startHttpServer(List<Object?> args) async {
var port = args[0] as int;
@@ -30,7 +30,7 @@
await HttpServer.bind(InternetAddress.anyIPv6, port, shared: true);
await listener.start(server);
- return singleCallbackPortWithoutTimeout((SendPort resultPort) {
+ return singleCallbackPort((SendPort resultPort) {
sendFutureResult(Future.sync(listener.stop), resultPort);
});
}
diff --git a/example/runner_pool.dart b/example/runner_pool.dart
index 000ca33..b479a60 100644
--- a/example/runner_pool.dart
+++ b/example/runner_pool.dart
@@ -48,11 +48,6 @@
});
}
-int computeFib(int n) {
- var result = fib(n);
- return result;
-}
-
int fib(int n) {
if (n < 2) return n;
return fib(n - 1) + fib(n - 2);
diff --git a/lib/isolate_runner.dart b/lib/isolate_runner.dart
index c16a4aa..15e0f65 100644
--- a/lib/isolate_runner.dart
+++ b/lib/isolate_runner.dart
@@ -101,8 +101,7 @@
/// .timeout(new Duration(...), onTimeout: () => print("No response"));
/// ```
Future<void> kill({Duration timeout = const Duration(seconds: 1)}) {
- final onExit =
- singleResponseFutureWithoutTimeout(isolate.addOnExitListener);
+ final onExit = singleResponseFuture(isolate.addOnExitListener);
if (Duration.zero == timeout) {
isolate.kill(priority: Isolate.immediate);
return onExit;
@@ -128,14 +127,10 @@
/// Guaranteed to only complete after all previous sent isolate commands
/// (like pause and resume) have been handled.
/// Paused isolates do respond to ping requests.
- Future<bool> ping({Duration timeout = const Duration(seconds: 1)}) {
- var channel = SingleResponseChannel(
- callback: _kTrue, timeout: timeout, timeoutValue: false);
- isolate.ping(channel.port);
- return channel.result.then((result) => result ?? false);
- }
-
- static bool _kTrue(_) => true;
+ Future<bool> ping({Duration timeout = const Duration(seconds: 1)}) =>
+ singleResponseFutureWithTimeout((port) {
+ isolate.ping(port, response: true);
+ }, timeout, false);
/// Pauses the isolate.
///
@@ -147,12 +142,14 @@
///
/// If [resumeCapability] is omitted, it defaults to the [isolate]'s
/// [Isolate.pauseCapability].
+ /// If the isolate has no pause capability, nothing happens.
///
/// Calling pause more than once with the same `resumeCapability`
/// has no further effect. Only a single call to [resume] is needed
/// to resume the isolate.
void pause([Capability? resumeCapability]) {
resumeCapability ??= isolate.pauseCapability;
+ if (resumeCapability == null) return;
isolate.pause(resumeCapability);
}
@@ -160,6 +157,7 @@
///
/// If [resumeCapability] is omitted, it defaults to the isolate's
/// [Isolate.pauseCapability].
+ /// If the isolate has no pause capability, nothing happens.
///
/// Even if `pause` has been called more than once with the same
/// `resumeCapability`, a single resume call with stop the pause.
@@ -176,6 +174,12 @@
/// If the call returns a [Future], the final result of that future
/// will be returned.
///
+ /// If [timeout] is provided, and the returned future does not complete
+ /// before that duration has passed,
+ /// the [onTimeout] action is executed instead, and its result (whether it
+ /// returns or throws) is used as the result of the returned future.
+ /// If [onTimeout] is omitted, it defaults to throwing a[TimeoutException].
+ ///
/// This works similar to the arguments to [Isolate.spawn], except that
/// it runs in the existing isolate and the return value is returned to
/// the caller.
@@ -203,34 +207,35 @@
/// as errors in the stream. Be ready to handle the errors.
///
/// The stream closes when the isolate shuts down.
- Stream get errors {
- var controller = StreamController.broadcast(sync: true);
- var port = RawReceivePort();
- void handleError(message) {
- if (message == null) {
- // Isolate shutdown.
+ ///
+ /// If the isolate shuts down while noone is listening on this stream,
+ /// the stream will not be closed, and listening to the stream again
+ /// after the isolate has shut down will not yield any events.
+ Stream<Never> get errors {
+ var controller = StreamController<Never>.broadcast(sync: true);
+ controller.onListen = () {
+ var port = RawReceivePort();
+ port.handler = (message) {
+ if (message == null) {
+ // Isolate shutdown.
+ port.close();
+ controller.close();
+ } else {
+ // Uncaught error.
+ final errorDescription = message[0] as String;
+ final stackDescription = message[1] as String;
+ var error = RemoteError(errorDescription, stackDescription);
+ controller.addError(error, error.stackTrace);
+ }
+ };
+ isolate.addErrorListener(port.sendPort);
+ isolate.addOnExitListener(port.sendPort);
+ controller.onCancel = () {
port.close();
- controller.close();
- } else {
- // Uncaught error.
- final errorDescription = message[0] as String;
- final stackDescription = message[1] as String;
- var error = RemoteError(errorDescription, stackDescription);
- controller.addError(error, error.stackTrace);
- }
- }
-
- controller
- ..onListen = () {
- port.handler = handleError;
- isolate.addErrorListener(port.sendPort);
- isolate.addOnExitListener(port.sendPort);
- }
- ..onCancel = () {
isolate.removeErrorListener(port.sendPort);
isolate.removeOnExitListener(port.sendPort);
- port.close();
};
+ };
return controller.stream;
}
@@ -242,9 +247,8 @@
/// the returned future will be completed after one second,
/// using [ping] to check if the isolate is still alive.
Future<void>? get onExit {
- // TODO(lrn): Is there a way to see if an isolate is dead
- // so we can close the receive port for this future?
- // Using [ping] for now.
+ // Using [ping] to see if the isolate is dead.
+ // Can't distinguish that from a slow-to-answer isolate.
if (_onExitFuture == null) {
var channel = SingleResponseChannel<void>();
isolate.addOnExitListener(channel.port);
diff --git a/lib/load_balancer.dart b/lib/load_balancer.dart
index 170e02a..a04ab9c 100644
--- a/lib/load_balancer.dart
+++ b/lib/load_balancer.dart
@@ -5,32 +5,71 @@
/// A load-balancing runner pool.
library isolate.load_balancer;
-import 'dart:async' show Future, FutureOr;
+import 'dart:async' show Completer, Future, FutureOr;
import 'runner.dart';
import 'src/errors.dart';
-import 'src/priority_queue.dart';
import 'src/util.dart';
/// A pool of runners, ordered by load.
///
/// Keeps a pool of runners,
/// and allows running function through the runner with the lowest current load.
+///
+/// The number of pool runner entries is fixed when the pool is created.
+/// When the pool is [close]d, all runners are closed as well.
+///
+/// The load balancer is not reentrant.
+/// Executing a [run] function should not *synchronously*
+/// call methods on the load balancer.
class LoadBalancer implements Runner {
- // A heap-based priority queue of entries, prioritized by `load`.
- // Each entry has its own entry in the queue, for faster update.
- PriorityQueue<_LoadBalancerEntry> _queue;
+ /// A stand-in future which can be used as a default value.
+ ///
+ /// The future never completes, so it should not be exposed to users.
+ static final _defaultFuture = Completer<Never>().future;
- // Whether [stop] has been called.
+ /// Reusable empty fixed-length list.
+ static final _emptyQueue = List<_LoadBalancerEntry>.empty(growable: false);
+
+ /// A heap-based priority queue of entries, prioritized by `load`.
+ ///
+ /// The entries of the list never change, only their positions.
+ /// Those with positions below `_length`
+ /// are considered to currently be in the queue.
+ /// All operations except [close] should end up with all entries
+ /// still in the pool. Some entries may be removed temporarily in order
+ /// to change their load and then add them back.
+ ///
+ /// Each [_LoadBalancerEntry] has its current position in the queue
+ /// as [_LoadBalancerEntry.queueIndex].
+ ///
+ /// Is set to an empty list on clear.
+ List<_LoadBalancerEntry> _queue;
+
+ /// Current number of elements in [_queue].
+ ///
+ /// Always a number between zero and [_queue.length].
+ /// Elements with indices below this value are
+ /// in the queue, and maintain the heap invariant.
+ /// Elements with indices above this value are temporarily
+ /// removed from the queue and are ordered by when they
+ /// were removed.
+ int _length;
+
+ /// The future returned by [stop].
+ ///
+ /// Is `null` until [stop] is first called.
Future<void>? _stopFuture;
/// Create a load balancer backed by the [Runner]s of [runners].
LoadBalancer(Iterable<Runner> runners) : this._(_createEntries(runners));
- LoadBalancer._(PriorityQueue<_LoadBalancerEntry> entries) : _queue = entries;
+ LoadBalancer._(List<_LoadBalancerEntry> entries)
+ : _queue = entries,
+ _length = entries.length;
/// The number of runners currently in the pool.
- int get length => _queue.length;
+ int get length => _length;
/// Asynchronously create [size] runners and create a `LoadBalancer` of those.
///
@@ -47,10 +86,12 @@
}).then((runners) => LoadBalancer(runners));
}
- static PriorityQueue<_LoadBalancerEntry> _createEntries(
- Iterable<Runner> runners) =>
- PriorityQueue<_LoadBalancerEntry>()
- ..addAll(runners.map((runner) => _LoadBalancerEntry(runner)));
+ static List<_LoadBalancerEntry> _createEntries(Iterable<Runner> runners) {
+ var index = 0;
+ return runners
+ .map((runner) => _LoadBalancerEntry(runner, index++))
+ .toList(growable: false);
+ }
/// Execute the command in the currently least loaded isolate.
///
@@ -68,9 +109,17 @@
Future<R> run<R, P>(FutureOr<R> Function(P argument) function, P argument,
{Duration? timeout, FutureOr<R> Function()? onTimeout, int load = 100}) {
RangeError.checkNotNegative(load, 'load');
- final entry = _queue.removeFirst();
+ if (_length == 0) {
+ // Can happen if created with zero runners,
+ // or after being closed.
+ if (_stopFuture != null) {
+ throw StateError("Load balancer has been closed");
+ }
+ throw StateError("No runners in pool");
+ }
+ var entry = _queue.first;
entry.load += load;
- _queue.add(entry);
+ _bubbleDown(entry, 0);
return entry.run(this, load, function, argument, timeout, onTimeout);
}
@@ -88,22 +137,22 @@
/// If [timeout] and [onTimeout] are provided, they are forwarded to
/// the runners running the function, which will handle any timeouts
/// as normal.
- List<FutureOr<R>> runMultiple<R, P>(
+ List<Future<R>> runMultiple<R, P>(
int count, FutureOr<R> Function(P argument) function, P argument,
{Duration? timeout, FutureOr<R> Function()? onTimeout, int load = 100}) {
- RangeError.checkValueInInterval(count, 1, length, 'count');
+ RangeError.checkValueInInterval(count, 1, _length, 'count');
RangeError.checkNotNegative(load, 'load');
if (count == 1) {
- return List<FutureOr<R>>.filled(
+ return List<Future<R>>.filled(
1,
run(function, argument,
load: load, timeout: timeout, onTimeout: onTimeout));
}
- final result = List<FutureOr<R>>.filled(count, _defaultFuture);
- if (count == length) {
+ var result = List<Future<R>>.filled(count, _defaultFuture);
+ if (count == _length) {
// No need to change the order of entries in the queue.
- for (var i = 0; i < _queue.unorderedElements.length; i++) {
- var entry = _queue.unorderedElements.elementAt(i);
+ for (var i = 0; i < _length; i++) {
+ var entry = _queue[i];
entry.load += load;
result[i] =
entry.run(this, load, function, argument, timeout, onTimeout);
@@ -111,19 +160,15 @@
} else {
// Remove the [count] least loaded services and run the
// command on each, then add them back to the queue.
- // This avoids running the same command twice in the same
- // isolate.
- // We can't assume that the first [count] entries in the
- // heap list are the least loaded.
- var entries = List<_LoadBalancerEntry>.generate(
- count,
- (_) => _queue.removeFirst(),
- growable: false,
- );
for (var i = 0; i < count; i++) {
- var entry = entries[i];
+ _removeFirst();
+ }
+ // The removed entries are stored in `_queue` in positions from
+ // `_length` to `_length + count - 1`.
+ for (var i = 0; i < count; i++) {
+ var entry = _queue[_length];
entry.load += load;
- _queue.add(entry);
+ _addNext();
result[i] =
entry.run(this, load, function, argument, timeout, onTimeout);
}
@@ -131,26 +176,127 @@
return result;
}
- static final _defaultFuture = Future<Never>.error('')..catchError((_) {});
-
@override
Future<void> close() {
var stopFuture = _stopFuture;
if (stopFuture != null) return stopFuture;
- return _stopFuture ??= MultiError.waitUnordered(
- _queue.removeAll().map((e) => e.close()),
+ var queue = _queue;
+ var length = _length;
+ _queue = _emptyQueue;
+ _length = 0;
+ return _stopFuture = MultiError.waitUnordered(
+ [for (var i = 0; i < length; i++) queue[i].close()],
).then(ignore);
}
+
+ /// Place [element] in heap at [index] or above.
+ ///
+ /// Put element into the empty cell at `index`.
+ /// While the `element` has higher priority than the
+ /// parent, swap it with the parent.
+ ///
+ /// Ignores [element]'s initial [_LoadBalancerEntry.queueIndex],
+ /// but sets it to the final position when the element has
+ /// been placed.
+ void _bubbleUp(_LoadBalancerEntry element, int index) {
+ while (index > 0) {
+ var parentIndex = (index - 1) ~/ 2;
+ var parent = _queue[parentIndex];
+ if (element.compareTo(parent) > 0) break;
+ _queue[index] = parent..queueIndex = index;
+ index = parentIndex;
+ }
+ _queue[index] = element..queueIndex = index;
+ }
+
+ /// Place [element] in heap at [index] or above.
+ ///
+ /// Put element into the empty cell at `index`.
+ /// While the `element` has lower priority than either child,
+ /// swap it with the highest priority child.
+ ///
+ /// Ignores [element]'s initial [_LoadBalancerEntry.queueIndex],
+ /// but sets it to the final position when the element has
+ /// been placed.
+ void _bubbleDown(_LoadBalancerEntry element, int index) {
+ while (true) {
+ var childIndex = index * 2 + 1; // Left child index.
+ if (childIndex >= _length) break;
+ var child = _queue[childIndex];
+ var rightChildIndex = childIndex + 1;
+ if (rightChildIndex < _length) {
+ var rightChild = _queue[rightChildIndex];
+ if (rightChild.compareTo(child) < 0) {
+ childIndex = rightChildIndex;
+ child = rightChild;
+ }
+ }
+ if (element.compareTo(child) <= 0) break;
+ _queue[index] = child..queueIndex = index;
+ index = childIndex;
+ }
+ _queue[index] = element..queueIndex = index;
+ }
+
+ /// Removes the first entry from the queue, but doesn't stop its service.
+ ///
+ /// The entry is expected to be either added back to the queue
+ /// immediately or have its stop method called.
+ ///
+ /// After the remove, the entry is stored as `_queue[_length]`.
+ _LoadBalancerEntry _removeFirst() {
+ assert(_length > 0);
+ _LoadBalancerEntry entry = _queue.first;
+ _length--;
+ if (_length > 0) {
+ var replacement = _queue[_length];
+ _queue[_length] = entry..queueIndex = _length;
+ _bubbleDown(replacement, 0);
+ }
+ return entry;
+ }
+
+ /// Adds next unused entry to the queue.
+ ///
+ /// Adds the entry at [_length] to the queue.
+ void _addNext() {
+ assert(_length < _queue.length);
+ var index = _length;
+ var entry = _queue[index];
+ _length = index + 1;
+ _bubbleUp(entry, index);
+ }
+
+ /// Decreases the load of an element which is currently in the queue.
+ ///
+ /// Elements outside the queue can just have their `load` modified directly.
+ void _decreaseLoad(_LoadBalancerEntry entry, int load) {
+ assert(load >= 0);
+ entry.load -= load;
+ var index = entry.queueIndex;
+ // Should always be the case unless the load balancer
+ // has been closed, or events are happening out of their
+ // proper order.
+ if (index < _length) {
+ _bubbleUp(entry, index);
+ }
+ }
}
class _LoadBalancerEntry implements Comparable<_LoadBalancerEntry> {
+ /// The position in the heap queue.
+ ///
+ /// Maintained when entries move around the queue.
+ /// Only needed for [LoadBalancer._decreaseLoad].
+ int queueIndex;
+
// The current load on the isolate.
int load = 0;
- // The service used to send commands to the other isolate.
+ // The service used to execute commands.
Runner runner;
- _LoadBalancerEntry(Runner runner) : runner = runner;
+ _LoadBalancerEntry(this.runner, this.queueIndex);
Future<R> run<R, P>(
LoadBalancer balancer,
@@ -162,8 +308,7 @@
return runner
.run<R, P>(function, argument, timeout: timeout, onTimeout: onTimeout)
.whenComplete(() {
- this.load -= load;
- balancer._queue.add(this);
+ balancer._decreaseLoad(this, load);
});
}
diff --git a/lib/ports.dart b/lib/ports.dart
index 9363cc7..0bc1eda 100644
--- a/lib/ports.dart
+++ b/lib/ports.dart
@@ -28,14 +28,21 @@
///
/// The [callback] function is called once, with the first message
/// received by the receive port.
-/// All further messages are ignored.
+/// All further messages are ignored and the port is closed.
///
/// If [timeout] is supplied, it is used as a limit on how
-/// long it can take before the message is received. If a
-/// message isn't received in time, the `callback` function
-/// is called once with the [timeoutValue] instead.
+/// long it can take before the message is received.
+/// If a message has not been received within the [timeout] duration,
+/// the callback is called with the [timeoutValue] instead, and
+/// the port is closed.
+/// If the message type, [P], does not allow `null` and [timeout] is
+/// non-`null`, then [timeoutValue] must be provided and non-`null`.
///
-/// If the received value is not a [T], it will cause an uncaught
+/// Use [singleCallbackPortWithTimeout] instead of the deprecated
+/// members. That will prevent run-time error arising from calling
+/// this method with a non-nullable [P] type and a null [timeoutValue].
+///
+/// If the received value is not a [P], it will cause an uncaught
/// asynchronous error in the current zone.
///
/// Returns the `SendPort` expecting the single message.
@@ -46,28 +53,58 @@
/// ..first.timeout(duration, () => timeoutValue).then(callback))
/// .sendPort
/// ```
-SendPort singleCallbackPort<P>(void Function(P? response) callback,
- {Duration? timeout, P? timeoutValue}) =>
- singleCallbackPortWithTimeout(
- callback,
- timeoutValue: timeoutValue,
- timeout: timeout,
- );
-
-/// Same as [singleResponseFuture], but without [timeout],
-/// this allows us not to require a nullable value in the [callback]
-SendPort singleCallbackPortWithoutTimeout<P>(
- void Function(P response) callback) {
- return singleCallbackPortWithTimeout<P?>(
- (response) => callback(response as P),
- timeoutValue: null,
- );
+/// when [timeout] is provided.
+SendPort singleCallbackPort<P>(void Function(P response) callback,
+ {@Deprecated("Use singleCallbackPortWithTimeout instead") Duration? timeout,
+ @Deprecated("Use singleCallbackPortWithTimeout instead") P? timeoutValue}) {
+ if (timeout == null) {
+ return _singleCallbackPort<P>(callback);
+ }
+ if (timeoutValue is! P) {
+ throw ArgumentError.value(
+ null, "timeoutValue", "The result type is non-null");
+ }
+ return singleCallbackPortWithTimeout<P>(callback, timeout, timeoutValue);
}
-/// Same as [singleResponseFuture], but with required [timeoutValue],
-/// this allows us not to require a nullable value in the [callback]
-SendPort singleCallbackPortWithTimeout<P>(void Function(P response) callback,
- {Duration? timeout, required P timeoutValue}) {
+/// Helper function for [singleCallbackPort].
+///
+/// Replace [singleCallbackPort] with this
+/// when removing the deprecated parameters.
+SendPort _singleCallbackPort<P>(void Function(P) callback) {
+ var responsePort = RawReceivePort();
+ var zone = Zone.current;
+ callback = zone.registerUnaryCallback(callback);
+ responsePort.handler = (response) {
+ responsePort.close();
+ zone.runUnary(callback, response as P);
+ };
+ return responsePort.sendPort;
+}
+
+/// Create a [SendPort] that accepts only one message.
+///
+/// The [callback] function is called once, with the first message
+/// received by the receive port.
+/// All further messages are ignored and the port is closed.
+///
+/// If a message has not been received within the [timeout] duration,
+/// the callback is called with the [timeoutValue] instead, and
+/// the port is closed.
+///
+/// If the received value is not a [P], it will cause an uncaught
+/// asynchronous error in the current zone.
+///
+/// Returns the `SendPort` expecting the single message.
+///
+/// Equivalent to:
+/// ```dart
+/// (new ReceivePort()
+/// ..first.timeout(duration, () => timeoutValue).then(callback))
+/// .sendPort
+/// ```
+SendPort singleCallbackPortWithTimeout<P>(
+ void Function(P response) callback, Duration timeout, P timeoutValue) {
var responsePort = RawReceivePort();
var zone = Zone.current;
callback = zone.registerUnaryCallback(callback);
@@ -78,12 +115,10 @@
zone.runUnary(callback, response as P);
};
- if (timeout != null) {
- timer = Timer(timeout, () {
- responsePort.close();
- callback(timeoutValue);
- });
- }
+ timer = Timer(timeout, () {
+ responsePort.close();
+ callback(timeoutValue);
+ });
return responsePort.sendPort;
}
@@ -121,7 +156,7 @@
FutureOr<R> Function()? onTimeout,
}) {
if (callback == null && timeout == null) {
- return singleCallbackPortWithoutTimeout<Object>((response) {
+ return _singleCallbackPort<Object>((response) {
_castComplete<R>(completer, response);
});
}
@@ -185,57 +220,85 @@
/// long it can take before the message is received. If a
/// message isn't received in time, the [timeoutValue] used
/// as the returned future's value instead.
+/// If the result type, [R], does not allow `null`, and [timeout] is provided,
+/// then [timeoutValue] must also be non-`null`.
+/// Use [singleResponseFutureWithTimeout] instead of providing
+/// the optional parameters to this function. It prevents getting run-time
+/// errors from providing a [timeout] and no [timeoutValue] with a non-nullable
+/// result type.
///
-/// If you want a timeout on the returned future, it's recommended to
-/// use the [timeout] parameter, and not [Future.timeout] on the result.
-/// The `Future` method won't be able to close the underlying [ReceivePort].
-Future<R?> singleResponseFuture<R>(
+/// If you need a timeout on the operation, it's recommended to specify
+/// a timeout using [singleResponseFutureWithTimeout],
+/// and not use [Future.timeout] on the returned `Future`.
+/// The `Future` method won't be able to close the underlying [ReceivePort],
+/// and will keep waiting for the first response anyway.
+Future<R> singleResponseFuture<R>(
void Function(SendPort responsePort) action, {
- Duration? timeout,
- R? timeoutValue,
-}) =>
- singleResponseFutureWithTimeout(
- action,
- timeout: timeout,
- timeoutValue: timeoutValue,
- );
-
-/// Same as [singleResponseFuture], but without [timeout],
-/// this allows us not to require a nullable return value
-Future<R> singleResponseFutureWithoutTimeout<R>(
- void Function(SendPort responsePort) action) =>
- singleResponseFutureWithTimeout<R?>(action, timeoutValue: null)
- .then((value) => value as R);
-
-/// Same as [singleResponseFuture], but with required [timeoutValue],
-/// this allows us not to require a nullable return value
-Future<R> singleResponseFutureWithTimeout<R>(
- void Function(SendPort responsePort) action, {
- Duration? timeout,
- required R timeoutValue,
+ @Deprecated("Use singleResponseFutureWithTimeout instead") Duration? timeout,
+ @Deprecated("Use singleResponseFutureWithTimeout instead") R? timeoutValue,
}) {
+ if (timeout == null) {
+ return _singleResponseFuture<R>(action);
+ }
+ if (timeoutValue is! R) {
+ throw ArgumentError.value(
+ null, "timeoutValue", "The result type is non-null");
+ }
+ return singleResponseFutureWithTimeout(action, timeout, timeoutValue);
+}
+
+/// Helper function for [singleResponseFuture].
+///
+/// Use this as the implementation of [singleResponseFuture]
+/// when removing the deprecated parameters.
+Future<R> _singleResponseFuture<R>(
+ void Function(SendPort responsePort) action) {
var completer = Completer<R>.sync();
var responsePort = RawReceivePort();
- Timer? timer;
var zone = Zone.current;
responsePort.handler = (response) {
responsePort.close();
- timer?.cancel();
zone.run(() {
_castComplete<R>(completer, response);
});
};
- if (timeout != null) {
- timer = Timer(timeout, () {
- responsePort.close();
- completer.complete(timeoutValue);
- });
- }
try {
action(responsePort.sendPort);
} catch (error, stack) {
responsePort.close();
- timer?.cancel();
+ // Delay completion because completer is sync.
+ scheduleMicrotask(() {
+ completer.completeError(error, stack);
+ });
+ }
+ return completer.future;
+}
+
+/// Same as [singleResponseFuture], but with required [timeoutValue],
+/// this allows us not to require a nullable return value
+Future<R> singleResponseFutureWithTimeout<R>(
+ void Function(SendPort responsePort) action,
+ Duration timeout,
+ R timeoutValue) {
+ var completer = Completer<R>.sync();
+ var responsePort = RawReceivePort();
+ var timer = Timer(timeout, () {
+ responsePort.close();
+ completer.complete(timeoutValue);
+ });
+ var zone = Zone.current;
+ responsePort.handler = (response) {
+ responsePort.close();
+ timer.cancel();
+ zone.run(() {
+ _castComplete<R>(completer, response);
+ });
+ };
+ try {
+ action(responsePort.sendPort);
+ } catch (error, stack) {
+ responsePort.close();
+ timer.cancel();
// Delay completion because completer is sync.
scheduleMicrotask(() {
completer.completeError(error, stack);
@@ -297,7 +360,7 @@
var error = RemoteError(response[0] as String, response[1] as String);
completer.completeError(error, error.stackTrace);
} else {
- final result = response[0] as R;
+ var result = response[0] as R;
completer.complete(result);
}
}
@@ -311,7 +374,7 @@
var error = RemoteError(response[0] as String, response[1] as String);
return Future.error(error, error.stackTrace);
}
- final result = response[0] as R;
+ var result = response[0] as R;
return Future<R>.value(result);
}
@@ -322,7 +385,7 @@
class SingleResponseChannel<R> {
final Zone _zone;
final RawReceivePort _receivePort;
- final Completer<R?> _completer;
+ final Completer<R> _completer;
final FutureOr<R> Function(dynamic)? _callback;
Timer? _timer;
@@ -334,13 +397,18 @@
/// to `callback`, and the result of that is used to complete `result`.
///
/// If [timeout] is provided, the future is completed after that
- /// duration if it hasn't received a value from the port earlier.
+ /// duration if it hasn't received a value from the port earlier,
+ /// with a value determined as follows:
/// If [throwOnTimeout] is true, the the future is completed with a
- /// [TimeoutException] as an error if it times out.
+ /// [TimeoutException] as an error.
/// Otherwise, if [onTimeout] is provided,
/// the future is completed with the result of running `onTimeout()`.
- /// If `onTimeout` is not provided either,
- /// the future is completed with `timeoutValue`, which defaults to `null`.
+ /// If `onTimeout` is also not provided,
+ /// then the future is completed with the provided [timeoutValue],
+ /// which defaults to `null`.
+ /// If the result type, [R], is not nullable, and [timeoutValue]
+ /// is to be used as the result of the future,
+ /// then it must have a non-`null` value.
SingleResponseChannel(
{FutureOr<R> Function(dynamic value)? callback,
Duration? timeout,
@@ -348,11 +416,19 @@
FutureOr<R> Function()? onTimeout,
R? timeoutValue})
: _receivePort = RawReceivePort(),
- _completer = Completer<R?>.sync(),
+ _completer = Completer<R>.sync(),
_callback = callback,
_zone = Zone.current {
_receivePort.handler = _handleResponse;
if (timeout != null) {
+ if (!throwOnTimeout &&
+ onTimeout == null &&
+ timeoutValue == null &&
+ timeoutValue is! R) {
+ _receivePort.close();
+ throw ArgumentError.value(null, "timeoutValue",
+ "The value is needed and the result must not be null");
+ }
_timer = Timer(timeout, () {
// Executed as a timer event.
_receivePort.close();
@@ -363,7 +439,7 @@
} else if (onTimeout != null) {
_completer.complete(Future.sync(onTimeout));
} else {
- _completer.complete(timeoutValue);
+ _completer.complete(timeoutValue as R);
}
}
});
@@ -374,14 +450,19 @@
SendPort get port => _receivePort.sendPort;
/// Future completed by the first value sent to [port].
- Future<R?> get result => _completer.future;
+ Future<R> get result => _completer.future;
/// If the channel hasn't completed yet, interrupt it and complete the result.
///
/// If the channel hasn't received a value yet, or timed out, it is stopped
/// (like by a timeout) and the [SingleResponseChannel.result]
/// is completed with [result].
+ /// If the result type is not nullable, the [result] must not be `null`.
void interrupt([R? result]) {
+ if (result is! R) {
+ throw ArgumentError.value(null, "result",
+ "The value is needed and the result must not be null");
+ }
_receivePort.close();
_cancelTimer();
if (!_completer.isCompleted) {
@@ -418,7 +499,7 @@
// created in a different error zone, an error from the root zone
// would become uncaught.
_zone.run(() {
- _completer.complete(Future<R?>.sync(() => callback(v)));
+ _completer.complete(Future<R>.sync(() => callback(v)));
});
}
}
diff --git a/lib/registry.dart b/lib/registry.dart
index a444f8b..42442a5 100644
--- a/lib/registry.dart
+++ b/lib/registry.dart
@@ -157,10 +157,10 @@
}
var completer = Completer<Capability>();
var port = singleCompletePort(completer,
- callback: (List response) {
+ callback: (List<Object?> response) {
assert(cache.isAdding(element));
- final id = response[0] as int;
- final removeCapability = response[1] as Capability;
+ var id = response[0] as int;
+ var removeCapability = response[1] as Capability;
cache.register(id, element);
return removeCapability;
},
@@ -175,22 +175,24 @@
return completer.future;
}
- /// Remove the element from the registry.
+ /// Removes the [element] from the registry.
///
- /// Returns `true` if removing the element succeeded, or `false` if the
- /// elements wasn't in the registry, or if it couldn't be removed.
+ /// Returns `true` if removing the element succeeded, and `false` if the
+ /// elements either wasn't in the registry, or it couldn't be removed.
///
/// The [removeCapability] must be the same capability returned by [add]
/// when the object was added. If the capability is wrong, the
- /// object is not removed, and this function returns false.
+ /// object is not removed, and this function returns `false`.
Future<bool> remove(T element, Capability removeCapability) {
var id = _cache.id(element);
if (id == null) {
+ // If the element is not in the cache, then it was not a value
+ // that originally came from the registry.
return Future<bool>.value(false);
}
var completer = Completer<bool>();
var port = singleCompletePort(completer, callback: (bool result) {
- _cache.remove(id);
+ if (result) _cache.remove(id);
return result;
}, timeout: _timeout);
_commandPort.send(list4(_removeValue, id, removeCapability, port));
@@ -218,8 +220,8 @@
/// before or not.
///
/// Fails if any of the elements are not in the registry.
- Future<void> removeTags(Iterable<T> elements, Iterable tags) {
- List ids = elements.map(_getId).toList(growable: false);
+ Future<void> removeTags(Iterable<T> elements, Iterable<Object?> tags) {
+ var ids = elements.map(_getId).toList(growable: false);
tags = tags.toList(growable: false);
var completer = Completer<void>();
var port = singleCompletePort(completer, timeout: _timeout);
@@ -227,7 +229,7 @@
return completer.future;
}
- Future<void> _addTags(List<int> ids, Iterable tags) {
+ Future<void> _addTags(List<int> ids, Iterable<Object?> tags) {
tags = tags.toList(growable: false);
var completer = Completer<void>();
var port = singleCompletePort(completer, timeout: _timeout);
@@ -268,8 +270,15 @@
/// Isolate-local cache used by a [Registry].
///
-/// Maps between id-numbers and elements.
-/// An object is considered an element of the registry if it
+/// Maps between id numbers and elements.
+///
+/// Each instance of [Registry] has its own cache,
+/// and only considers elements part of the registry
+/// if they are registered in its cache.
+/// An object becomes registered either when calling
+/// [add] on that particular [Registry] instance,
+/// or when fetched using [lookup] through that
+/// registry instance.
class _RegistryCache {
// Temporary marker until an object gets an id.
static const int _beingAdded = -1;
@@ -419,7 +428,7 @@
replyPort.send(true);
}
- void _addTags(List<int> ids, List tags, SendPort replyPort) {
+ void _addTags(List<int> ids, List<Object?> tags, SendPort replyPort) {
assert(tags.isNotEmpty);
for (var id in ids) {
var entry = _entries[id];
diff --git a/lib/runner.dart b/lib/runner.dart
index 200680c..5d31194 100644
--- a/lib/runner.dart
+++ b/lib/runner.dart
@@ -27,12 +27,11 @@
/// Waits for the result of the call, and completes the returned future
/// with the result, whether it's a value or an error.
///
- /// If the returned future does not complete before `timeLimit` has passed,
+ /// If [timeout] is provided, and the returned future does not complete
+ /// before that duration has passed,
/// the [onTimeout] action is executed instead, and its result (whether it
/// returns or throws) is used as the result of the returned future.
- ///
- /// If `onTimeout` is omitted, a timeout will cause the returned future to
- /// complete with a [TimeoutException].
+ /// If [onTimeout] is omitted, it defaults to throwing a[TimeoutException].
///
/// The default implementation runs the function in the current isolate.
Future<R> run<R, P>(FutureOr<R> Function(P argument) function, P argument,
diff --git a/lib/src/errors.dart b/lib/src/errors.dart
index 3d89734..4c76642 100644
--- a/lib/src/errors.dart
+++ b/lib/src/errors.dart
@@ -2,7 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-// TODO(lrn): This should be in package:async?
/// Helper functions for working with errors.
///
/// The [MultiError] class combines multiple errors into one object,
@@ -127,7 +126,7 @@
for (var future in futures) {
var i = count;
count++;
- future.then<Null>((v) {
+ future.then<void>((v) {
if (!hasError) {
results[i] = v;
} else if (cleanUp != null) {
diff --git a/lib/src/priority_queue.dart b/lib/src/priority_queue.dart
deleted file mode 100644
index 8b7d3d0..0000000
--- a/lib/src/priority_queue.dart
+++ /dev/null
@@ -1,499 +0,0 @@
-// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
-// for details. All rights reserved. Use of this source code is governed by a
-// BSD-style license that can be found in the LICENSE file.
-
-import 'dart:collection';
-
-import 'util.dart';
-
-/// A priority queue is a priority based work-list of elements.
-///
-/// The queue allows adding elements, and removing them again in priority order.
-/// The same object can be added to the queue more than once.
-/// There is no specified ordering for objects with the same priority
-/// (where the `comparison` function returns zero).
-///
-/// Operations which care about object equality, [contains] and [remove],
-/// use [Object.==] for testing equality.
-/// In most situations this will be the same as identity ([identical]),
-/// but there are types, like [String], where users can reasonably expect
-/// distinct objects to represent the same value.
-/// If elements override [Object.==], the `comparison` function must
-/// always give equal objects the same priority,
-/// otherwise [contains] or [remove] might not work correctly.
-abstract class PriorityQueue<E> {
- /// Creates an empty [PriorityQueue].
- ///
- /// The created [PriorityQueue] is a plain [HeapPriorityQueue].
- ///
- /// The [comparison] is a [Comparator] used to compare the priority of
- /// elements. An element that compares as less than another element has
- /// a higher priority.
- ///
- /// If [comparison] is omitted, it defaults to [Comparable.compare]. If this
- /// is the case, `E` must implement [Comparable], and this is checked at
- /// runtime for every comparison.
- factory PriorityQueue([int Function(E, E)? comparison]) =
- HeapPriorityQueue<E>;
-
- /// Number of elements in the queue.
- int get length;
-
- /// Whether the queue is empty.
- bool get isEmpty;
-
- /// Whether the queue has any elements.
- bool get isNotEmpty;
-
- /// Checks if [object] is in the queue.
- ///
- /// Returns true if the element is found.
- ///
- /// Uses the [Object.==] of elements in the queue to check
- /// for whether they are equal to [object].
- /// Equal objects objects must have the same priority
- /// according to the [comparison] function.
- /// That is, if `a == b` then `comparison(a, b) == 0`.
- /// If that is not the case, this check might fail to find
- /// an object.
- bool contains(E object);
-
- /// Provides efficient access to all the elements curently in the queue.
- ///
- /// The operation should be performed without copying or moving
- /// the elements, if at all possible.
- ///
- /// The elements are iterated in no particular order.
- /// The order is stable as long as the queue is not modified.
- /// The queue must not be modified during an iteration.
- Iterable<E> get unorderedElements;
-
- /// Adds element to the queue.
- ///
- /// The element will become the next to be removed by [removeFirst]
- /// when all elements with higher priority have been removed.
- void add(E element);
-
- /// Adds all [elements] to the queue.
- void addAll(Iterable<E> elements);
-
- /// Returns the next element that will be returned by [removeFirst].
- ///
- /// The element is not removed from the queue.
- ///
- /// The queue must not be empty when this method is called.
- E get first;
-
- /// Removes and returns the element with the highest priority.
- ///
- /// Repeatedly calling this method, without adding element in between,
- /// is guaranteed to return elements in non-decreasing order as, specified by
- /// [comparison].
- ///
- /// The queue must not be empty when this method is called.
- E removeFirst();
-
- /// Removes an element of the queue that compares equal to [element].
- ///
- /// Returns true if an element is found and removed,
- /// and false if no equal element is found.
- ///
- /// If the queue contains more than one object equal to [element],
- /// only one of them is removed.
- ///
- /// Uses the [Object.==] of elements in the queue to check
- /// for whether they are equal to [element].
- /// Equal objects objects must have the same priority
- /// according to the [comparison] function.
- /// That is, if `a == b` then `comparison(a, b) == 0`.
- /// If that is not the case, this check might fail to find
- /// an object.
- bool remove(E element);
-
- /// Removes all the elements from this queue and returns them.
- ///
- /// The returned iterable has no specified order.
- Iterable<E> removeAll();
-
- /// Removes all the elements from this queue.
- void clear();
-
- /// Returns a list of the elements of this queue in priority order.
- ///
- /// The queue is not modified.
- ///
- /// The order is the order that the elements would be in if they were
- /// removed from this queue using [removeFirst].
- List<E> toList();
-
- /// Returns a list of the elements of this queue in no specific order.
- ///
- /// The queue is not modified.
- ///
- /// The order of the elements is implementation specific.
- /// The order may differ between different calls on the same queue.
- List<E> toUnorderedList();
-
- /// Return a comparator based set using the comparator of this queue.
- ///
- /// The queue is not modified.
- ///
- /// The returned [Set] is currently a [SplayTreeSet],
- /// but this may change as other ordered sets are implemented.
- ///
- /// The set contains all the elements of this queue.
- /// If an element occurs more than once in the queue,
- /// the set will contain it only once.
- Set<E> toSet();
-}
-
-/// Heap based priority queue.
-///
-/// The elements are kept in a heap structure,
-/// where the element with the highest priority is immediately accessible,
-/// and modifying a single element takes
-/// logarithmic time in the number of elements on average.
-///
-/// * The [add] and [removeFirst] operations take amortized logarithmic time,
-/// O(log(n)), but may occasionally take linear time when growing the capacity
-/// of the heap.
-/// * The [addAll] operation works as doing repeated [add] operations.
-/// * The [first] getter takes constant time, O(1).
-/// * The [clear] and [removeAll] methods also take constant time, O(1).
-/// * The [contains] and [remove] operations may need to search the entire
-/// queue for the elements, taking O(n) time.
-/// * The [toList] operation effectively sorts the elements, taking O(n*log(n))
-/// time.
-/// * The [toUnorderedList] operation copies, but does not sort, the elements,
-/// and is linear, O(n).
-/// * The [toSet] operation effectively adds each element to the new set, taking
-/// an expected O(n*log(n)) time.
-class HeapPriorityQueue<E> implements PriorityQueue<E> {
- /// Initial capacity of a queue when created, or when added to after a
- /// [clear].
- ///
- /// Number can be any positive value. Picking a size that gives a whole
- /// number of "tree levels" in the heap is only done for aesthetic reasons.
- static const int _initialCapacity = 7;
-
- /// The comparison being used to compare the priority of elements.
- final Comparator<E> comparison;
-
- /// List implementation of a heap.
- List<E?> _queue = List<E?>.filled(_initialCapacity, null);
-
- /// Number of elements in queue.
- ///
- /// The heap is implemented in the first [_length] entries of [_queue].
- int _length = 0;
-
- /// Modification count.
- ///
- /// Used to detect concurrent modifications during iteration.
- int _modificationCount = 0;
-
- /// Create a new priority queue.
- ///
- /// The [comparison] is a [Comparator] used to compare the priority of
- /// elements. An element that compares as less than another element has
- /// a higher priority.
- ///
- /// If [comparison] is omitted, it defaults to [Comparable.compare]. If this
- /// is the case, `E` must implement [Comparable], and this is checked at
- /// runtime for every comparison.
- HeapPriorityQueue([int Function(E, E)? comparison])
- : comparison = comparison ?? defaultCompare;
-
- E _elementAt(int index) => _queue[index] ?? (null as E);
-
- @override
- void add(E element) {
- _modificationCount++;
- _add(element);
- }
-
- @override
- void addAll(Iterable<E> elements) {
- var modified = 0;
- for (var element in elements) {
- modified = 1;
- _add(element);
- }
- _modificationCount += modified;
- }
-
- @override
- void clear() {
- _modificationCount++;
- _queue = const [];
- _length = 0;
- }
-
- @override
- bool contains(E object) => _locate(object) >= 0;
-
- /// Provides efficient access to all the elements curently in the queue.
- ///
- /// The operation is performed in the order they occur
- /// in the underlying heap structure.
- ///
- /// The order is stable as long as the queue is not modified.
- /// The queue must not be modified during an iteration.
- @override
- Iterable<E> get unorderedElements => _UnorderedElementsIterable<E>(this);
-
- @override
- E get first {
- if (_length == 0) throw StateError('No element');
- return _elementAt(0);
- }
-
- @override
- bool get isEmpty => _length == 0;
-
- @override
- bool get isNotEmpty => _length != 0;
-
- @override
- int get length => _length;
-
- @override
- bool remove(E element) {
- var index = _locate(element);
- if (index < 0) return false;
- _modificationCount++;
- var last = _removeLast();
- if (index < _length) {
- var comp = comparison(last, element);
- if (comp <= 0) {
- _bubbleUp(last, index);
- } else {
- _bubbleDown(last, index);
- }
- }
- return true;
- }
-
- /// Removes all the elements from this queue and returns them.
- ///
- /// The returned iterable has no specified order.
- /// The operation does not copy the elements,
- /// but instead keeps them in the existing heap structure,
- /// and iterates over that directly.
- @override
- Iterable<E> removeAll() {
- _modificationCount++;
- var result = _queue;
- var length = _length;
- _queue = const [];
- _length = 0;
- return result.take(length).cast();
- }
-
- @override
- E removeFirst() {
- if (_length == 0) throw StateError('No element');
- _modificationCount++;
- var result = _elementAt(0);
- var last = _removeLast();
- if (_length > 0) {
- _bubbleDown(last, 0);
- }
- return result;
- }
-
- @override
- List<E> toList() => _toUnorderedList()..sort(comparison);
-
- @override
- Set<E> toSet() {
- var set = SplayTreeSet<E>(comparison);
- for (var i = 0; i < _length; i++) {
- set.add(_elementAt(i));
- }
- return set;
- }
-
- @override
- List<E> toUnorderedList() => _toUnorderedList();
-
- List<E> _toUnorderedList() =>
- [for (var i = 0; i < _length; i++) _elementAt(i)];
-
- /// Returns some representation of the queue.
- ///
- /// The format isn't significant, and may change in the future.
- @override
- String toString() {
- return _queue.take(_length).toString();
- }
-
- /// Add element to the queue.
- ///
- /// Grows the capacity if the backing list is full.
- void _add(E element) {
- if (_length == _queue.length) _grow();
- _bubbleUp(element, _length++);
- }
-
- /// Find the index of an object in the heap.
- ///
- /// Returns -1 if the object is not found.
- ///
- /// A matching object, `o`, must satisfy that
- /// `comparison(o, object) == 0 && o == object`.
- int _locate(E object) {
- if (_length == 0) return -1;
- // Count positions from one instead of zero. This gives the numbers
- // some nice properties. For example, all right children are odd,
- // their left sibling is even, and the parent is found by shifting
- // right by one.
- // Valid range for position is [1.._length], inclusive.
- var position = 1;
- // Pre-order depth first search, omit child nodes if the current
- // node has lower priority than [object], because all nodes lower
- // in the heap will also have lower priority.
- do {
- var index = position - 1;
- var element = _elementAt(index);
- var comp = comparison(element, object);
- if (comp <= 0) {
- if (comp == 0 && element == object) return index;
- // Element may be in subtree.
- // Continue with the left child, if it is there.
- var leftChildPosition = position * 2;
- if (leftChildPosition <= _length) {
- position = leftChildPosition;
- continue;
- }
- }
- // Find the next right sibling or right ancestor sibling.
- do {
- while (position.isOdd) {
- // While position is a right child, go to the parent.
- position >>= 1;
- }
- // Then go to the right sibling of the left-child.
- position += 1;
- } while (position > _length); // Happens if last element is a left child.
- } while (position != 1); // At root again. Happens for right-most element.
- return -1;
- }
-
- E _removeLast() {
- var newLength = _length - 1;
- var last = _elementAt(newLength);
- _queue[newLength] = null;
- _length = newLength;
- return last;
- }
-
- /// Place [element] in heap at [index] or above.
- ///
- /// Put element into the empty cell at `index`.
- /// While the `element` has higher priority than the
- /// parent, swap it with the parent.
- void _bubbleUp(E element, int index) {
- while (index > 0) {
- var parentIndex = (index - 1) ~/ 2;
- var parent = _elementAt(parentIndex);
- if (comparison(element, parent) > 0) break;
- _queue[index] = parent;
- index = parentIndex;
- }
- _queue[index] = element;
- }
-
- /// Place [element] in heap at [index] or above.
- ///
- /// Put element into the empty cell at `index`.
- /// While the `element` has lower priority than either child,
- /// swap it with the highest priority child.
- void _bubbleDown(E element, int index) {
- var rightChildIndex = index * 2 + 2;
- while (rightChildIndex < _length) {
- var leftChildIndex = rightChildIndex - 1;
- var leftChild = _elementAt(leftChildIndex);
- var rightChild = _elementAt(rightChildIndex);
- var comp = comparison(leftChild, rightChild);
- int minChildIndex;
- E minChild;
- if (comp < 0) {
- minChild = leftChild;
- minChildIndex = leftChildIndex;
- } else {
- minChild = rightChild;
- minChildIndex = rightChildIndex;
- }
- comp = comparison(element, minChild);
- if (comp <= 0) {
- _queue[index] = element;
- return;
- }
- _queue[index] = minChild;
- index = minChildIndex;
- rightChildIndex = index * 2 + 2;
- }
- var leftChildIndex = rightChildIndex - 1;
- if (leftChildIndex < _length) {
- var child = _elementAt(leftChildIndex);
- var comp = comparison(element, child);
- if (comp > 0) {
- _queue[index] = child;
- index = leftChildIndex;
- }
- }
- _queue[index] = element;
- }
-
- /// Grows the capacity of the list holding the heap.
- ///
- /// Called when the list is full.
- void _grow() {
- var newCapacity = _queue.length * 2 + 1;
- if (newCapacity < _initialCapacity) newCapacity = _initialCapacity;
- var newQueue = List<E?>.filled(newCapacity, null);
- newQueue.setRange(0, _length, _queue);
- _queue = newQueue;
- }
-}
-
-/// Implementation of [HeapPriorityQueue.unorderedElements].
-class _UnorderedElementsIterable<E> extends Iterable<E> {
- final HeapPriorityQueue<E> _queue;
-
- _UnorderedElementsIterable(this._queue);
-
- @override
- Iterator<E> get iterator => _UnorderedElementsIterator<E>(_queue);
-}
-
-class _UnorderedElementsIterator<E> implements Iterator<E> {
- final HeapPriorityQueue<E> _queue;
- final int _initialModificationCount;
- E? _current;
- int _index = -1;
-
- _UnorderedElementsIterator(this._queue)
- : _initialModificationCount = _queue._modificationCount;
-
- @override
- bool moveNext() {
- if (_initialModificationCount != _queue._modificationCount) {
- throw ConcurrentModificationError(_queue);
- }
- var nextIndex = _index + 1;
- if (0 <= nextIndex && nextIndex < _queue.length) {
- _current = _queue._queue[nextIndex];
- _index = nextIndex;
- return true;
- }
- _current = null;
- _index = -2;
- return false;
- }
-
- @override
- E get current =>
- _index < 0 ? throw StateError('No element') : (_current ?? null as E);
-}
diff --git a/lib/src/util.dart b/lib/src/util.dart
index ec7e190..cc4ba25 100644
--- a/lib/src/util.dart
+++ b/lib/src/util.dart
@@ -21,27 +21,26 @@
void ignore(_) {}
/// Create a single-element fixed-length list.
-List<Object?> list1(Object? v1) => List.filled(1, v1);
+List<T> list1<T extends Object?>(T v1) => List<T>.filled(1, v1);
/// Create a two-element fixed-length list.
-List<Object?> list2(Object? v1, Object? v2) => List.filled(2, v1)..[1] = v2;
+List<T> list2<T extends Object?>(T v1, T v2) => List<T>.filled(2, v1)..[1] = v2;
/// Create a three-element fixed-length list.
-List<Object?> list3(Object? v1, Object? v2, Object? v3) => List.filled(3, v1)
+List<T> list3<T extends Object?>(T v1, T v2, T v3) => List<T>.filled(3, v1)
..[1] = v2
..[2] = v3;
/// Create a four-element fixed-length list.
-List<Object?> list4(Object? v1, Object? v2, Object? v3, Object? v4) =>
- List.filled(4, v1)
+List<T> list4<T extends Object?>(T v1, T v2, T v3, T v4) =>
+ List<T>.filled(4, v1)
..[1] = v2
..[2] = v3
..[3] = v4;
/// Create a five-element fixed-length list.
-List<Object?> list5(
- Object? v1, Object? v2, Object? v3, Object? v4, Object? v5) =>
- List.filled(5, v1)
+List<T> list5<T extends Object?>(T v1, T v2, T v3, T v4, T v5) =>
+ List<T>.filled(5, v1)
..[1] = v2
..[2] = v3
..[3] = v4
diff --git a/pubspec.yaml b/pubspec.yaml
index b9c500f..a1ca11f 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: isolate
-version: 2.0.3
+version: 2.1.0
author: Dart Team <misc@dartlang.org>
description: >-
Utility functions and classes related to the 'dart:isolate' library.
@@ -9,5 +9,5 @@
sdk: '>=2.12.0 <3.0.0'
dev_dependencies:
- pedantic: ^1.0.0
+ lints: ^1.0.0
test: ^1.0.0
diff --git a/test/isolaterunner_test.dart b/test/isolaterunner_test.dart
index c7ba387..a292eb5 100644
--- a/test/isolaterunner_test.dart
+++ b/test/isolaterunner_test.dart
@@ -2,9 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-library isolate.test.isolaterunner_test;
-
-import 'dart:async' show Future;
import 'dart:isolate' show Capability;
import 'package:isolate/isolate_runner.dart';
@@ -108,7 +105,7 @@
dynamic id(x) => x;
-var _global;
+Object? _global;
dynamic getGlobal(_) => _global;
diff --git a/test/load_balancer_test.dart b/test/load_balancer_test.dart
new file mode 100644
index 0000000..4b5a3d8
--- /dev/null
+++ b/test/load_balancer_test.dart
@@ -0,0 +1,158 @@
+// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:isolate/isolate.dart';
+import 'package:test/test.dart';
+
+void main() {
+ test('Simple use', () async {
+ // The simplest possible case. Should not throw anywhere.
+ // Load balancer with at least one isolate-runner.
+ var lb = await LoadBalancer.create(1, () => IsolateRunner.spawn());
+ // Run at least one task.
+ await lb.run(runTask, null);
+ // Close it.
+ await lb.close();
+ });
+
+ /// Run multiple tasks one at a time.
+ test("Run multiple indivudual jobs", () async {
+ var lb = await createIsolateRunners(4);
+ List<List<int>> results =
+ await Future.wait([for (var i = 0; i < 10; i++) lb.run(getId, i)]);
+ // All request numbers should be accounted for.
+ var first = {for (var result in results) result.first};
+ expect(first, {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+ // All isolate runners should have been used.
+ var last = {for (var result in results) result.last};
+ expect(last, {1, 2, 3, 4});
+ await lb.close();
+ });
+ test("Run multiple - zero", () async {
+ var lb = createLocalRunners(4);
+ expect(() => lb.runMultiple(0, runTask, 0), throwsArgumentError);
+ });
+ test("Run multiple - more than count", () async {
+ var lb = createLocalRunners(4);
+ expect(() => lb.runMultiple(5, runTask, 0), throwsArgumentError);
+ });
+ test("Run multiple - 1", () async {
+ var lb = createLocalRunners(4);
+ var results = await Future.wait(lb.runMultiple(1, LocalRunner.getId, 0));
+ expect(results, hasLength(1));
+ });
+ test("Run multiple - more", () async {
+ var lb = createLocalRunners(4);
+ var results = await Future.wait(lb.runMultiple(2, LocalRunner.getId, 0));
+ expect(results, hasLength(2));
+ // Different runners.
+ expect({for (var result in results) result.last}, hasLength(2));
+ });
+ test("Run multiple - all", () async {
+ var lb = createLocalRunners(4);
+ var results = await Future.wait(lb.runMultiple(4, LocalRunner.getId, 0));
+ expect(results, hasLength(4));
+ // Different runners.
+ expect({for (var result in results) result.last}, hasLength(4));
+ });
+ test("Always lowest load runner", () async {
+ var lb = createLocalRunners(4);
+ // Run tasks with input numbers cooresponding to the other tasks they
+ // expect to share runner with.
+ // Loads: 0, 0, 0, 0.
+ var r1 = lb.run(LocalRunner.getId, 0, load: 100);
+ // Loads: 0, 0, 0, 100(1).
+ var r2 = lb.run(LocalRunner.getId, 1, load: 50);
+ // Loads: 0, 0, 50(2), 100(1).
+ var r3 = lb.run(LocalRunner.getId, 2, load: 25);
+ // Loads: 0, 25(3), 50(2), 100(1).
+ var r4 = lb.run(LocalRunner.getId, 3, load: 10);
+ // Loads: 10(4), 25(3), 50(2), 100(1).
+ var r5 = lb.run(LocalRunner.getId, 3, load: 10);
+ // Loads: 20(4, 4), 25(3), 50(2), 100(1).
+ var r6 = lb.run(LocalRunner.getId, 3, load: 90);
+ // Loads: 25(3), 50(2), 100(1), 110(4, 4, 4).
+ var r7 = lb.run(LocalRunner.getId, 2, load: 90);
+ // Loads: 50(2), 100(1), 110(4, 4, 4), 115(3, 3).
+ var r8 = lb.run(LocalRunner.getId, 1, load: 64);
+ // Loads: 100(1), 110(4, 4, 4), 114(2, 2), 115(3, 3).
+ var r9 = lb.run(LocalRunner.getId, 0, load: 100);
+ // Loads: 110(4, 4, 4), 114(2, 2), 115(3, 3), 200(1, 1).
+ var results = await Future.wait([r1, r2, r3, r4, r5, r6, r7, r8, r9]);
+
+ // Check that tasks with the same input numbers ran on the same runner.
+ var runnerIds = [-1, -1, -1, -1];
+ for (var result in results) {
+ var expectedRunner = result.first;
+ var actualRunnerId = result.last;
+ var seenId = runnerIds[expectedRunner];
+ if (seenId == -1) {
+ runnerIds[expectedRunner] = actualRunnerId;
+ } else if (seenId != actualRunnerId) {
+ fail("Task did not run on lowest loaded runner\n$result");
+ }
+ }
+ // Check that all four runners were used.
+ var uniqueRunnerIds = {...runnerIds};
+ expect(uniqueRunnerIds, {1, 2, 3, 4});
+ await lb.close();
+ });
+}
+
+void runTask(_) {
+ // Trivial task.
+}
+
+// An isolate local ID.
+int localId = -1;
+void setId(int value) {
+ localId = value;
+}
+
+// Request a response including input and local ID.
+// Allows detecting which isolate the request is run on.
+List<int> getId(int input) => [input, localId];
+
+/// Creates isolate runners with `localId` in the range 1..count.
+Future<LoadBalancer> createIsolateRunners(int count) async {
+ var runners = <Runner>[];
+ for (var i = 1; i <= count; i++) {
+ var runner = await IsolateRunner.spawn();
+ await runner.run(setId, i);
+ runners.add(runner);
+ }
+ return LoadBalancer(runners);
+}
+
+LoadBalancer createLocalRunners(int count) =>
+ LoadBalancer([for (var i = 1; i <= count; i++) LocalRunner(i)]);
+
+class LocalRunner implements Runner {
+ final int id;
+ LocalRunner(this.id);
+
+ static Future<List<int>> getId(int input) async =>
+ [input, Zone.current[#runner].id as int];
+
+ @override
+ Future<void> close() async {}
+
+ @override
+ Future<R> run<R, P>(FutureOr<R> Function(P argument) function, P argument,
+ {Duration? timeout, FutureOr<R> Function()? onTimeout}) {
+ return runZoned(() {
+ var result = Future.sync(() => function(argument));
+ if (timeout != null) {
+ result = result.timeout(timeout, onTimeout: onTimeout ?? _throwTimeout);
+ }
+ return result;
+ }, zoneValues: {#runner: this});
+ }
+
+ static Never _throwTimeout() {
+ throw TimeoutException("timeout");
+ }
+}
diff --git a/test/ports_test.dart b/test/ports_test.dart
index 0742933..9032c76 100644
--- a/test/ports_test.dart
+++ b/test/ports_test.dart
@@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-library isolate.test.ports_test;
-
import 'dart:async';
import 'dart:isolate';
@@ -25,25 +23,25 @@
var completer = Completer.sync();
var p = singleCallbackPort(completer.complete);
p.send(42);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 42);
});
});
- test('ValueWithoutTimeout non-nullable', () {
+ test('Value without timeout non-nullable', () {
var completer = Completer<int>.sync();
- var p = singleCallbackPortWithoutTimeout(completer.complete);
+ var p = singleCallbackPort(completer.complete);
p.send(42);
- return completer.future.then<Null>((int v) {
+ return completer.future.then<void>((int v) {
expect(v, 42);
});
});
- test('ValueWithoutTimeout nullable', () {
+ test('Value without timeout nullable', () {
var completer = Completer<int?>.sync();
- var p = singleCallbackPortWithoutTimeout(completer.complete);
+ var p = singleCallbackPort(completer.complete);
p.send(null);
- return completer.future.then<Null>((int? v) {
+ return completer.future.then<void>((int? v) {
expect(v, null);
});
});
@@ -53,7 +51,7 @@
var p = singleCallbackPort(completer.complete);
p.send(42);
p.send(37);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 42);
});
});
@@ -62,7 +60,7 @@
var completer = Completer.sync();
var p = singleCallbackPort(completer.complete, timeout: _ms * 500);
p.send(42);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 42);
});
});
@@ -71,7 +69,7 @@
var completer = Completer.sync();
singleCallbackPort(completer.complete,
timeout: _ms * 100, timeoutValue: 37);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 37);
});
});
@@ -81,7 +79,7 @@
var p = singleCallbackPort(completer.complete,
timeout: _ms * 100, timeoutValue: 37);
Timer(_ms * 500, () => p.send(42));
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 37);
});
});
@@ -92,7 +90,7 @@
var p = singleCallbackPort(completer.complete,
timeout: _ms * 100, timeoutValue: null);
Timer(_ms * 500, () => p.send(42));
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, null);
});
});
@@ -100,10 +98,9 @@
/// invalid null is a compile time error
test('TimeoutFirstWithTimeout with valid null', () {
var completer = Completer.sync();
- var p = singleCallbackPortWithTimeout(completer.complete,
- timeout: _ms * 100, timeoutValue: null);
+ var p = singleCallbackPortWithTimeout(completer.complete, _ms * 100, null);
Timer(_ms * 500, () => p.send(42));
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, null);
});
});
@@ -114,7 +111,7 @@
var completer = Completer.sync();
var p = singleCompletePort(completer);
p.send(42);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 42);
});
});
@@ -126,7 +123,7 @@
return 87;
});
p.send(42);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 87);
});
});
@@ -138,7 +135,7 @@
return Future.delayed(_ms * 500, () => 88);
});
p.send(42);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 88);
});
});
@@ -150,7 +147,7 @@
throw 89;
});
p.send(42);
- return completer.future.then<Null>((v) async {
+ return completer.future.then<void>((v) async {
fail('unreachable');
}, onError: (e, s) {
expect(e, 89);
@@ -164,7 +161,7 @@
return Future.error(90);
});
p.send(42);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((_) {
fail('unreachable');
}, onError: (e, s) {
expect(e, 90);
@@ -176,7 +173,7 @@
var p = singleCompletePort(completer);
p.send(42);
p.send(37);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 42);
});
});
@@ -189,7 +186,7 @@
});
p.send(42);
p.send(37);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 87);
});
});
@@ -198,7 +195,7 @@
var completer = Completer.sync();
var p = singleCompletePort(completer, timeout: _ms * 500);
p.send(42);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 42);
});
});
@@ -206,7 +203,7 @@
test('Timeout', () {
var completer = Completer.sync();
singleCompletePort(completer, timeout: _ms * 100);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
fail('unreachable');
}, onError: (e, s) {
expect(e is TimeoutException, isTrue);
@@ -216,7 +213,7 @@
test('TimeoutCallback', () {
var completer = Completer.sync();
singleCompletePort(completer, timeout: _ms * 100, onTimeout: () => 87);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 87);
});
});
@@ -225,7 +222,7 @@
var completer = Completer.sync();
singleCompletePort(completer,
timeout: _ms * 100, onTimeout: () => throw 91);
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
fail('unreachable');
}, onError: (e, s) {
expect(e, 91);
@@ -236,7 +233,7 @@
var completer = Completer.sync();
singleCompletePort(completer,
timeout: _ms * 100, onTimeout: () => Future.value(87));
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 87);
});
});
@@ -245,7 +242,7 @@
var completer = Completer.sync();
singleCompletePort(completer,
timeout: _ms * 100, onTimeout: () => Future.error(92));
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
fail('unreachable');
}, onError: (e, s) {
expect(e, 92);
@@ -257,7 +254,7 @@
singleCompletePort(completer,
timeout: _ms * 100,
onTimeout: () => Future.delayed(_ms * 500, () => 87));
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 87);
});
});
@@ -267,7 +264,7 @@
singleCompletePort(completer,
timeout: _ms * 100,
onTimeout: () => Future.delayed(_ms * 500, () => throw 87));
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
fail('unreachable');
}, onError: (e, s) {
expect(e, 87);
@@ -279,7 +276,7 @@
var p =
singleCompletePort(completer, timeout: _ms * 100, onTimeout: () => 37);
Timer(_ms * 500, () => p.send(42));
- return completer.future.then<Null>((v) {
+ return completer.future.then<void>((v) {
expect(v, 37);
});
});
@@ -311,29 +308,29 @@
test('FutureValue', () {
return singleResponseFuture((SendPort p) {
p.send(42);
- }).then<Null>((v) {
+ }).then<void>((v) {
expect(v, 42);
});
});
- test('FutureValueWithoutTimeout', () {
- return singleResponseFutureWithoutTimeout<int>((SendPort p) {
+ test('FutureValue without timeout', () {
+ return singleResponseFuture<int>((SendPort p) {
p.send(42);
- }).then<Null>((v) {
+ }).then<void>((v) {
expect(v, 42);
});
});
- test('FutureValueWithoutTimeout valid null', () {
- return singleResponseFutureWithoutTimeout<int?>((SendPort p) {
+ test('FutureValue without timeout valid null', () {
+ return singleResponseFuture<int?>((SendPort p) {
p.send(null);
- }).then<Null>((v) {
+ }).then<void>((v) {
expect(v, null);
});
});
- test('FutureValueWithoutTimeout invalid null', () {
- return expectLater(singleResponseFutureWithoutTimeout<int>((SendPort p) {
+ test('FutureValue without timeout invalid null', () {
+ return expectLater(singleResponseFuture<int>((SendPort p) {
p.send(null);
}), throwsA(isA<TypeError>()));
});
@@ -342,7 +339,7 @@
return singleResponseFuture((SendPort p) {
p.send(42);
p.send(37);
- }).then<Null>((v) {
+ }).then<void>((v) {
expect(v, 42);
});
});
@@ -350,7 +347,7 @@
test('FutureError', () {
return singleResponseFuture((SendPort p) {
throw 93;
- }).then<Null>((v) {
+ }).then<void>((v) {
fail('unreachable');
}, onError: (e, s) {
expect(e, 93);
@@ -361,7 +358,7 @@
return singleResponseFuture((SendPort p) {
// no-op.
}, timeout: _ms * 100)
- .then<Null>((v) {
+ .then<void>((v) {
expect(v, null);
});
});
@@ -370,25 +367,25 @@
return singleResponseFuture((SendPort p) {
// no-op.
}, timeout: _ms * 100, timeoutValue: 42)
- .then<Null>((int? v) {
+ .then<void>((int? v) {
expect(v, 42);
});
});
test('FutureTimeoutValue with valid null timeoutValue', () {
- return singleResponseFutureWithTimeout((SendPort p) {
+ return singleResponseFuture<int?>((SendPort p) {
// no-op.
}, timeout: _ms * 100, timeoutValue: null)
- .then<Null>((int? v) {
+ .then<void>((int? v) {
expect(v, null);
});
});
test('FutureTimeoutValue with non-null timeoutValue', () {
- return singleResponseFutureWithTimeout((SendPort p) {
+ return singleResponseFuture<int>((SendPort p) {
// no-op.
}, timeout: _ms * 100, timeoutValue: 42)
- .then<Null>((int v) {
+ .then<void>((int v) {
expect(v, 42);
});
});
@@ -398,7 +395,7 @@
test('Value', () {
return singleResultFuture((SendPort p) {
sendFutureResult(Future.value(42), p);
- }).then<Null>((v) {
+ }).then<void>((v) {
expect(v, 42);
});
});
@@ -407,7 +404,7 @@
return singleResultFuture((SendPort p) {
sendFutureResult(Future.value(42), p);
sendFutureResult(Future.value(37), p);
- }).then<Null>((v) {
+ }).then<void>((v) {
expect(v, 42);
});
});
@@ -415,7 +412,7 @@
test('Error', () {
return singleResultFuture((SendPort p) {
sendFutureResult(Future.error(94), p);
- }).then<Null>((v) {
+ }).then<void>((v) {
fail('unreachable');
}, onError: (e, s) {
expect(e is RemoteError, isTrue);
@@ -426,7 +423,7 @@
return singleResultFuture((SendPort p) {
sendFutureResult(Future.error(95), p);
sendFutureResult(Future.error(96), p);
- }).then<Null>((v) {
+ }).then<void>((v) {
fail('unreachable');
}, onError: (e, s) {
expect(e is RemoteError, isTrue);
@@ -436,7 +433,7 @@
test('Error', () {
return singleResultFuture((SendPort p) {
throw 93;
- }).then<Null>((v) {
+ }).then<void>((v) {
fail('unreachable');
}, onError: (e, s) {
expect(e is RemoteError, isTrue);
@@ -447,7 +444,7 @@
return singleResultFuture((SendPort p) {
// no-op.
}, timeout: _ms * 100)
- .then<Null>((v) {
+ .then<void>((v) {
fail('unreachable');
}, onError: (e, s) {
expect(e is TimeoutException, isTrue);
@@ -457,15 +454,14 @@
test('TimeoutValue', () {
return singleResultFuture((SendPort p) {
// no-op.
- }, timeout: _ms * 100, onTimeout: () => 42).then<Null>((v) {
+ }, timeout: _ms * 100, onTimeout: () => 42).then<void>((v) {
expect(v, 42);
});
});
test('TimeoutError', () {
- return singleResultFuture((SendPort p) {
- return null;
- }, timeout: _ms * 100, onTimeout: () => throw 97).then<Null>((v) {
+ return singleResultFuture((SendPort p) {},
+ timeout: _ms * 100, onTimeout: () => throw 97).then<void>((v) {
expect(v, 42);
}, onError: (e, s) {
expect(e, 97);
@@ -477,7 +473,7 @@
test('Value', () {
final channel = SingleResponseChannel();
channel.port.send(42);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
expect(v, 42);
});
});
@@ -486,7 +482,7 @@
final channel = SingleResponseChannel();
channel.port.send(42);
channel.port.send(37);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
expect(v, 42);
});
});
@@ -494,7 +490,7 @@
test('ValueCallback', () {
final channel = SingleResponseChannel(callback: ((v) => 2 * (v as num)));
channel.port.send(42);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
expect(v, 84);
});
});
@@ -502,7 +498,7 @@
test('ErrorCallback', () {
final channel = SingleResponseChannel(callback: ((v) => throw 42));
channel.port.send(37);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
fail('unreachable');
}, onError: (v, s) {
expect(v, 42);
@@ -513,7 +509,7 @@
final channel =
SingleResponseChannel(callback: ((v) => Future.value(2 * (v as num))));
channel.port.send(42);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
expect(v, 84);
});
});
@@ -521,7 +517,7 @@
test('AsyncErrorCallback', () {
final channel = SingleResponseChannel(callback: ((v) => Future.error(42)));
channel.port.send(37);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((_) {
fail('unreachable');
}, onError: (v, s) {
expect(v, 42);
@@ -530,7 +526,7 @@
test('Timeout', () {
final channel = SingleResponseChannel(timeout: _ms * 100);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
expect(v, null);
});
});
@@ -538,7 +534,7 @@
test('TimeoutThrow', () {
final channel =
SingleResponseChannel(timeout: _ms * 100, throwOnTimeout: true);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
fail('unreachable');
}, onError: (v, s) {
expect(v is TimeoutException, isTrue);
@@ -551,7 +547,7 @@
throwOnTimeout: true,
onTimeout: () => 42,
timeoutValue: 42);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
fail('unreachable');
}, onError: (v, s) {
expect(v is TimeoutException, isTrue);
@@ -561,7 +557,7 @@
test('TimeoutOnTimeout', () {
final channel =
SingleResponseChannel(timeout: _ms * 100, onTimeout: () => 42);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
expect(v, 42);
});
});
@@ -569,14 +565,14 @@
test('TimeoutOnTimeoutAndValue', () {
final channel = SingleResponseChannel(
timeout: _ms * 100, onTimeout: () => 42, timeoutValue: 37);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
expect(v, 42);
});
});
test('TimeoutValue', () {
final channel = SingleResponseChannel(timeout: _ms * 100, timeoutValue: 42);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
expect(v, 42);
});
});
@@ -584,7 +580,7 @@
test('TimeoutOnTimeoutError', () {
final channel =
SingleResponseChannel(timeout: _ms * 100, onTimeout: () => throw 42);
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
fail('unreachable');
}, onError: (v, s) {
expect(v, 42);
@@ -594,7 +590,7 @@
test('TimeoutOnTimeoutAsync', () {
final channel = SingleResponseChannel(
timeout: _ms * 100, onTimeout: () => Future.value(42));
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
expect(v, 42);
});
});
@@ -602,7 +598,7 @@
test('TimeoutOnTimeoutAsyncError', () {
final channel = SingleResponseChannel(
timeout: _ms * 100, onTimeout: () => Future.error(42));
- return channel.result.then<Null>((v) {
+ return channel.result.then<void>((v) {
fail('unreachable');
}, onError: (v, s) {
expect(v, 42);
diff --git a/test/registry_test.dart b/test/registry_test.dart
index 9059400..763bc9c 100644
--- a/test/registry_test.dart
+++ b/test/registry_test.dart
@@ -2,8 +2,6 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
-library isolate.test.registry_test;
-
import 'dart:async';
import 'dart:isolate';
@@ -43,7 +41,7 @@
return registry.add(element, tags: [tag]);
}).then((_) {
return registry.lookup();
- }).then<Null>((all) {
+ }).then<void>((all) {
expect(all.length, 10);
expect(all.map((v) => v.id).toList()..sort(),
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
@@ -59,7 +57,7 @@
return registry.add(element, tags: [tag]);
}).then((_) {
return registry.lookup(tags: [Oddity.odd]);
- }).then<Null>((all) {
+ }).then<void>((all) {
expect(all.length, 5);
expect(all.map((v) => v.id).toList()..sort(), [1, 3, 5, 7, 9]);
}).whenComplete(regman.close);
@@ -74,7 +72,7 @@
return registry.add(element, tags: [tag]);
}).then((_) {
return registry.lookup(max: 5);
- }).then<Null>((all) {
+ }).then<void>((all) {
expect(all.length, 5);
}).whenComplete(regman.close);
});
@@ -92,7 +90,7 @@
return registry.add(element, tags: tags);
}).then((_) {
return registry.lookup(tags: [2, 3]);
- }).then<Null>((all) {
+ }).then<void>((all) {
expect(all.length, 5);
expect(all.map((v) => v.id).toList()..sort(), [0, 6, 12, 18, 24]);
}).whenComplete(regman.close);
@@ -111,7 +109,7 @@
return registry.add(element, tags: tags);
}).then((_) {
return registry.lookup(tags: [2, 3], max: 3);
- }).then<Null>((all) {
+ }).then<void>((all) {
expect(all.length, 3);
expect(all.every((v) => (v.id % 6) == 0), isTrue);
}).whenComplete(regman.close);
@@ -125,7 +123,7 @@
var object = Object();
return registry.add(object).then((_) {
return registry.lookup();
- }).then<Null>((entries) {
+ }).then<void>((entries) {
expect(entries, hasLength(1));
expect(entries.first, same(object));
}).whenComplete(regman.close);
@@ -140,7 +138,7 @@
var objects = [object1, object2, object3];
return Future.wait(objects.map(registry.add)).then((_) {
return registry.lookup();
- }).then<Null>((entries) {
+ }).then<void>((entries) {
expect(entries, hasLength(3));
for (var entry in entries) {
expect(entry, isIn(objects));
@@ -154,7 +152,7 @@
var object = Object();
return registry.add(object).then((_) {
return registry.add(object);
- }).then<Null>((_) {
+ }).then<void>((_) {
fail('Unreachable');
}, onError: (e, s) {
expect(e, isStateError);
@@ -174,7 +172,7 @@
return registry.add(object2);
}).then((_) {
return registry.lookup();
- }).then<Null>((entries) {
+ }).then<void>((entries) {
expect(entries, hasLength(2));
var entry1 = entries.first;
var entry2 = entries.last;
@@ -195,7 +193,7 @@
return registry.add(object);
}).then((_) {
return registry.lookup();
- }).then<Null>((entries) {
+ }).then<void>((entries) {
expect(entries, hasLength(1));
expect(entries.first, same(object));
}).whenComplete(regman.close);
@@ -213,18 +211,18 @@
return registry.add(object3, tags: [4, 5, 6, 7]);
}).then((_) {
return registry.lookup(tags: [3]);
- }).then<Null>((entries) {
+ }).then<void>((entries) {
expect(entries, hasLength(2));
expect(entries.first == object1 || entries.last == object1, isTrue);
expect(entries.first == object2 || entries.last == object2, isTrue);
}).then((_) {
return registry.lookup(tags: [2]);
- }).then<Null>((entries) {
+ }).then<void>((entries) {
expect(entries, hasLength(1));
expect(entries.first, same(object2));
}).then((_) {
return registry.lookup(tags: [3, 6]);
- }).then<Null>((entries) {
+ }).then<void>((entries) {
expect(entries, hasLength(1));
expect(entries.first, same(object2));
}).whenComplete(regman.close);
@@ -245,7 +243,7 @@
}).then((removeSuccess) {
expect(removeSuccess, isTrue);
return registry.lookup();
- }).then<Null>((entries) {
+ }).then<void>((entries) {
expect(entries, isEmpty);
}).whenComplete(regman.close);
});
@@ -260,7 +258,7 @@
expect(entries.first, same(object));
return registry.remove(object, Capability());
});
- }).then<Null>((removeSuccess) {
+ }).then<void>((removeSuccess) {
expect(removeSuccess, isFalse);
}).whenComplete(regman.close);
});
@@ -284,7 +282,7 @@
return registry.removeTags([object], ['x']);
}).then((_) {
return registry.lookup(tags: ['x']);
- }).then<Null>((entries) {
+ }).then<void>((entries) {
expect(entries, isEmpty);
}).whenComplete(regman.close);
});
@@ -370,7 +368,7 @@
expect(entries, hasLength(1));
expect(entries.first, same(object));
return registry.remove(entries.first, removeCapability);
- }).then<Null>((removeSuccess) {
+ }).then<void>((removeSuccess) {
expect(removeSuccess, isTrue);
});
});
@@ -388,7 +386,7 @@
var regman = RegistryManager(timeout: _ms * 500);
var registry = regman.registry;
regman.close();
- return registry.add(Object()).then<Null>((_) {
+ return registry.add(Object()).then<void>((_) {
fail('unreachable');
}, onError: (e, s) {
expect(e is TimeoutException, isTrue);
@@ -401,7 +399,7 @@
var object = Object();
return registry.add(object).then((rc) {
regman.close();
- return registry.remove(object, rc).then<Null>((_) {
+ return registry.remove(object, rc).then<void>((_) {
fail('unreachable');
}, onError: (e, s) {
expect(e is TimeoutException, isTrue);
@@ -415,7 +413,7 @@
var object = Object();
return registry.add(object).then((rc) {
regman.close();
- return registry.addTags([object], ['x']).then<Null>((_) {
+ return registry.addTags([object], ['x']).then<void>((_) {
fail('unreachable');
}, onError: (e, s) {
expect(e is TimeoutException, isTrue);
@@ -429,7 +427,7 @@
var object = Object();
return registry.add(object).then((rc) {
regman.close();
- return registry.removeTags([object], ['x']).then<Null>((_) {
+ return registry.removeTags([object], ['x']).then<void>((_) {
fail('unreachable');
}, onError: (e, s) {
expect(e is TimeoutException, isTrue);
@@ -441,7 +439,7 @@
var regman = RegistryManager(timeout: _ms * 500);
var registry = regman.registry;
regman.close();
- registry.lookup().then<Null>((_) {
+ registry.lookup().then<void>((_) {
fail('unreachable');
}, onError: (e, s) {
expect(e is TimeoutException, isTrue);
@@ -455,7 +453,7 @@
var registry1 = regman.registry;
var registry2 = regman.registry;
var l1 = ['x'];
- var l2;
+ dynamic l2;
return registry1.add(l1, tags: ['y']).then((removeCapability) {
return registry2.lookup().then((entries) {
expect(entries, hasLength(1));
@@ -471,7 +469,7 @@
}).then((removeSuccess) {
expect(removeSuccess, isTrue);
return registry1.lookup();
- }).then<Null>((entries) {
+ }).then<void>((entries) {
expect(entries, isEmpty);
});
}).whenComplete(regman.close);