Parallel fetching of available versions (#2280)
Implements rate-limited parallel fetching of version information when solving.
Also does speculative pre-fetching of version information of dependencies of the newest versions of each package.
In informal benchmarks of pub get for a package whose only direct dependency is package:test the resolution time goes from 7.8 seconds to 1.5 seconds.
Also adds package:pedantic to our direct runtime dependencies
diff --git a/doc/repository-spec-v2.md b/doc/repository-spec-v2.md
index 8b24da2..4f00f58 100644
--- a/doc/repository-spec-v2.md
+++ b/doc/repository-spec-v2.md
@@ -62,7 +62,9 @@
}
```
-### Inspect a specific version of a package
+### (Deprecated) Inspect a specific version of a package
+
+**Deprecated** as of Dart 2.8, use "List all versions of a package" instead.
**GET** `<PUB_HOSTED_URL>/api/packages/<PACKAGE>/versions/<VERSION>`
@@ -99,4 +101,3 @@
The API for authenticating and publishing packages is not formalized yet, see
[#1381](https://github.com/dart-lang/pub/issues/1381).
-
diff --git a/lib/src/rate_limited_scheduler.dart b/lib/src/rate_limited_scheduler.dart
new file mode 100644
index 0000000..99f9ce3
--- /dev/null
+++ b/lib/src/rate_limited_scheduler.dart
@@ -0,0 +1,142 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+
+import 'package:pool/pool.dart';
+import 'package:pedantic/pedantic.dart';
+
+/// Handles rate-limited scheduling of tasks.
+///
+/// Tasks are identified by a jobId of type [J] (should be useful as a Hash-key)
+/// and run with a supplied async function.
+///
+/// Designed to allow speculatively running tasks that will likely be needed
+/// later with [withPrescheduling].
+///
+/// Errors thrown by tasks scheduled with the `preschedule` callback will only
+/// be triggered when you await the [Future] returned by [schedule].
+///
+/// The operation will run in the [Zone] that the task was in when enqueued.
+///
+/// If a task if [preschedule]d and later [schedule]d before the operation is
+/// started, the task will go in front of the queue with the zone of the
+/// [schedule] operation.
+///
+/// Example:
+///
+/// ```dart
+/// // A scheduler that, given a uri, gets that page and returns the body
+/// final scheduler = RateLimitedScheduler(http.read);
+///
+/// scheduler.withPresceduling((preschedule) {
+/// // Start fetching `pub.dev` and `dart.dev` in the background.
+/// scheduler.preschedule(Uri.parse('https://pub.dev/'));
+/// scheduler.preschedule(Uri.parse('https://dart.dev/'));
+/// // ... do time-consuming task.
+/// // Now we actually need `pub.dev`.
+/// final pubDevBody =
+/// await scheduler.schedule(Uri.parse('https://pub.dev/'));
+/// // if the `dart.dev` task has not started yet, it will be canceled when
+/// // leaving `withPresceduling`.
+/// });
+/// ```
+class RateLimitedScheduler<J, V> {
+ final Future<V> Function(J) _runJob;
+
+ /// The results of ongoing and finished jobs.
+ final Map<J, Completer<V>> _cache = <J, Completer<V>>{};
+
+ /// Tasks that are waiting to be run.
+ final Queue<_Task<J>> _queue = Queue<_Task<J>>();
+
+ /// Rate limits the number of concurrent jobs.
+ final Pool _pool;
+
+ /// Jobs that have started running.
+ final Set<J> _started = {};
+
+ RateLimitedScheduler(Future<V> Function(J) runJob,
+ {maxConcurrentOperations = 10})
+ : _runJob = runJob,
+ _pool = Pool(maxConcurrentOperations);
+
+ /// Pick the next task in [_queue] and run it.
+ ///
+ /// If the task is already in [_started] it will not be run again.
+ Future<void> _processNextTask() async {
+ if (_queue.isEmpty) {
+ return;
+ }
+ final task = _queue.removeFirst();
+ final completer = _cache[task.jobId];
+
+ if (!_started.add(task.jobId)) {
+ return;
+ }
+
+ // Use an async function to catch sync exceptions from _runJob.
+ Future<V> runJob() async {
+ return await task.zone.runUnary(_runJob, task.jobId);
+ }
+
+ completer.complete(runJob());
+ // Listen to errors on the completer:
+ // this will make errors thrown by [_run] not
+ // become uncaught.
+ //
+ // They will still show up for other listeners of the future.
+ await completer.future.catchError((_) {});
+ }
+
+ /// Calls [callback] with a function that can pre-schedule jobs.
+ ///
+ /// When [callback] returns, all jobs that where prescheduled by [callback]
+ /// that have not started running will be removed from the work queue
+ /// (if they have been added seperately by [schedule] they will still be
+ /// executed).
+ Future<R> withPrescheduling<R>(
+ FutureOr<R> Function(void Function(J) preschedule) callback,
+ ) async {
+ final prescheduled = <_Task>{};
+ try {
+ return await callback((jobId) {
+ if (_started.contains(jobId)) return;
+ final task = _Task(jobId, Zone.current);
+ _cache.putIfAbsent(jobId, () => Completer());
+ _queue.addLast(task);
+ prescheduled.add(task);
+
+ unawaited(_pool.withResource(_processNextTask));
+ });
+ } finally {
+ _queue.removeWhere(prescheduled.contains);
+ }
+ }
+
+ /// Returns a future that completed with the result of running [jobId].
+ ///
+ /// If [jobId] has already run, the cached result will be returned.
+ /// If [jobId] is not yet running, it will go to the front of the work queue
+ /// to be scheduled next when there are free resources.
+ Future<V> schedule(J jobId) {
+ final completer = _cache.putIfAbsent(jobId, () => Completer());
+ if (!_started.contains(jobId)) {
+ final task = _Task(jobId, Zone.current);
+ _queue.addFirst(task);
+ scheduleMicrotask(() => _pool.withResource(_processNextTask));
+ }
+ return completer.future;
+ }
+}
+
+class _Task<J> {
+ final J jobId;
+ final Zone zone;
+ _Task(this.jobId, this.zone);
+
+ @override
+ String toString() => jobId.toString();
+}
diff --git a/lib/src/solver/version_solver.dart b/lib/src/solver/version_solver.dart
index 675310c..78c80a1 100644
--- a/lib/src/solver/version_solver.dart
+++ b/lib/src/solver/version_solver.dart
@@ -91,13 +91,15 @@
[Term(PackageRange.root(_root), false)], IncompatibilityCause.root));
try {
- var next = _root.name;
- while (next != null) {
- _propagate(next);
- next = await _choosePackageVersion();
- }
+ return await _systemCache.hosted.withPrefetching(() async {
+ var next = _root.name;
+ while (next != null) {
+ _propagate(next);
+ next = await _choosePackageVersion();
+ }
- return await _result();
+ return await _result();
+ });
} finally {
// Gather some solving metrics.
log.solver('Version solving took ${stopwatch.elapsed} seconds.\n'
diff --git a/lib/src/source/git.dart b/lib/src/source/git.dart
index f85e41f..2dfd4a3 100644
--- a/lib/src/source/git.dart
+++ b/lib/src/source/git.dart
@@ -6,6 +6,7 @@
import 'dart:io';
import 'package:path/path.dart' as p;
+import 'package:pool/pool.dart';
import 'package:pub_semver/pub_semver.dart';
import '../git.dart' as git;
@@ -189,6 +190,10 @@
/// The [BoundSource] for [GitSource].
class BoundGitSource extends CachedSource {
+ /// Limit the number of concurrent git operations to 1.
+ // TODO(sigurdm): Use RateLimitedScheduler.
+ final Pool _pool = Pool(1);
+
@override
final GitSource source;
@@ -221,27 +226,31 @@
@override
Future<List<PackageId>> doGetVersions(PackageRef ref) async {
- await _ensureRepoCache(ref);
- var path = _repoCachePath(ref);
- var revision = await _firstRevision(path, ref.description['ref']);
- var pubspec =
- await _describeUncached(ref, revision, ref.description['path']);
+ return await _pool.withResource(() async {
+ await _ensureRepoCache(ref);
+ var path = _repoCachePath(ref);
+ var revision = await _firstRevision(path, ref.description['ref']);
+ var pubspec =
+ await _describeUncached(ref, revision, ref.description['path']);
- return [
- PackageId(ref.name, source, pubspec.version, {
- 'url': ref.description['url'],
- 'ref': ref.description['ref'],
- 'resolved-ref': revision,
- 'path': ref.description['path']
- })
- ];
+ return [
+ PackageId(ref.name, source, pubspec.version, {
+ 'url': ref.description['url'],
+ 'ref': ref.description['ref'],
+ 'resolved-ref': revision,
+ 'path': ref.description['path']
+ })
+ ];
+ });
}
/// Since we don't have an easy way to read from a remote Git repo, this
/// just installs [id] into the system cache, then describes it from there.
@override
- Future<Pubspec> describeUncached(PackageId id) => _describeUncached(
- id.toRef(), id.description['resolved-ref'], id.description['path']);
+ Future<Pubspec> describeUncached(PackageId id) {
+ return _pool.withResource(() => _describeUncached(
+ id.toRef(), id.description['resolved-ref'], id.description['path']));
+ }
/// Like [describeUncached], but takes a separate [ref] and Git [revision]
/// rather than a single ID.
@@ -284,28 +293,32 @@
/// in `cache/`.
@override
Future<Package> downloadToSystemCache(PackageId id) async {
- var ref = id.toRef();
- if (!git.isInstalled) {
- fail("Cannot get ${id.name} from Git (${ref.description['url']}).\n"
- 'Please ensure Git is correctly installed.');
- }
-
- ensureDir(p.join(systemCacheRoot, 'cache'));
- await _ensureRevision(ref, id.description['resolved-ref']);
-
- var revisionCachePath = _revisionCachePath(id);
- await _revisionCacheClones.putIfAbsent(revisionCachePath, () async {
- if (!entryExists(revisionCachePath)) {
- await _clone(_repoCachePath(ref), revisionCachePath);
- await _checkOut(revisionCachePath, id.description['resolved-ref']);
- _writePackageList(revisionCachePath, [id.description['path']]);
- } else {
- _updatePackageList(revisionCachePath, id.description['path']);
+ return await _pool.withResource(() async {
+ var ref = id.toRef();
+ if (!git.isInstalled) {
+ fail("Cannot get ${id.name} from Git (${ref.description['url']}).\n"
+ 'Please ensure Git is correctly installed.');
}
- });
- return Package.load(id.name,
- p.join(revisionCachePath, id.description['path']), systemCache.sources);
+ ensureDir(p.join(systemCacheRoot, 'cache'));
+ await _ensureRevision(ref, id.description['resolved-ref']);
+
+ var revisionCachePath = _revisionCachePath(id);
+ await _revisionCacheClones.putIfAbsent(revisionCachePath, () async {
+ if (!entryExists(revisionCachePath)) {
+ await _clone(_repoCachePath(ref), revisionCachePath);
+ await _checkOut(revisionCachePath, id.description['resolved-ref']);
+ _writePackageList(revisionCachePath, [id.description['path']]);
+ } else {
+ _updatePackageList(revisionCachePath, id.description['path']);
+ }
+ });
+
+ return Package.load(
+ id.name,
+ p.join(revisionCachePath, id.description['path']),
+ systemCache.sources);
+ });
}
/// Returns the path to the revision-specific cache of [id].
diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart
index e89244d..b0eb730 100644
--- a/lib/src/source/hosted.dart
+++ b/lib/src/source/hosted.dart
@@ -6,8 +6,11 @@
import 'dart:convert';
import 'dart:io' as io;
+import 'package:collection/collection.dart' show maxBy;
import 'package:http/http.dart' as http;
import 'package:path/path.dart' as p;
+import 'package:pedantic/pedantic.dart';
+import 'package:pub/src/rate_limited_scheduler.dart';
import 'package:pub_semver/pub_semver.dart';
import 'package:stack_trace/stack_trace.dart';
@@ -154,36 +157,66 @@
@override
final SystemCache systemCache;
+ RateLimitedScheduler<PackageRef, Map<PackageId, Pubspec>> _scheduler;
- BoundHostedSource(this.source, this.systemCache);
+ BoundHostedSource(this.source, this.systemCache) {
+ _scheduler =
+ RateLimitedScheduler(_fetchVersions, maxConcurrentOperations: 10);
+ }
- /// Downloads a list of all versions of a package that are available from the
- /// site.
- @override
- Future<List<PackageId>> doGetVersions(PackageRef ref) async {
+ Future<Map<PackageId, Pubspec>> _fetchVersions(PackageRef ref) async {
var url = _makeUrl(
ref.description, (server, package) => '$server/api/packages/$package');
-
log.io('Get versions from $url.');
String body;
try {
+ // TODO(sigurdm): Implement cancellation of requests. This probably
+ // requires resolution of: https://github.com/dart-lang/sdk/issues/22265.
body = await httpClient.read(url, headers: pubApiHeaders);
} catch (error, stackTrace) {
var parsed = source._parseDescription(ref.description);
_throwFriendlyError(error, stackTrace, parsed.first, parsed.last);
}
-
- var doc = jsonDecode(body);
- return (doc['versions'] as List).map((map) {
+ final doc = jsonDecode(body);
+ final versions = doc['versions'] as List;
+ final result = Map.fromEntries(versions.map((map) {
var pubspec = Pubspec.fromMap(map['pubspec'], systemCache.sources,
expectedName: ref.name, location: url);
var id = source.idFor(ref.name, pubspec.version,
url: _serverFor(ref.description));
- memoizePubspec(id, pubspec);
+ return MapEntry(id, pubspec);
+ }));
- return id;
- }).toList();
+ // Prefetch the dependencies of the latest version, we are likely to need
+ // them later.
+ final preschedule =
+ Zone.current[_prefetchingKey] as void Function(PackageRef);
+ if (preschedule != null) {
+ final latestVersion =
+ maxBy(result.keys.map((id) => id.version), (e) => e);
+
+ final latestVersionId =
+ PackageId(ref.name, source, latestVersion, ref.description);
+
+ final dependencies = result[latestVersionId]?.dependencies?.values ?? [];
+ unawaited(withDependencyType(DependencyType.none, () async {
+ for (final packageRange in dependencies) {
+ if (packageRange.source is HostedSource) {
+ preschedule(packageRange.toRef());
+ }
+ }
+ }));
+ }
+ return result;
+ }
+
+ /// Downloads a list of all versions of a package that are available from the
+ /// site.
+ @override
+ Future<List<PackageId>> doGetVersions(PackageRef ref) async {
+ final versions = await _scheduler.schedule(ref);
+ return versions.keys.toList();
}
/// Parses [description] into its server and package name components, then
@@ -198,27 +231,15 @@
return Uri.parse(pattern(server, package));
}
- /// Downloads and parses the pubspec for a specific version of a package that
- /// is available from the site.
+ /// Retrieves the pubspec for a specific version of a package that is
+ /// available from the site.
@override
Future<Pubspec> describeUncached(PackageId id) async {
- // Request it from the server.
- var url = _makeVersionUrl(
- id,
- (server, package, version) =>
- '$server/api/packages/$package/versions/$version');
-
- log.io('Describe package at $url.');
- Map<String, dynamic> version;
- try {
- version = jsonDecode(await httpClient.read(url, headers: pubApiHeaders));
- } catch (error, stackTrace) {
- var parsed = source._parseDescription(id.description);
- _throwFriendlyError(error, stackTrace, id.name, parsed.last);
- }
-
- return Pubspec.fromMap(version['pubspec'], systemCache.sources,
- expectedName: id.name, location: url);
+ final versions = await _scheduler.schedule(id.toRef());
+ final url = _makeUrl(
+ id.description, (server, package) => '$server/api/packages/$package');
+ return versions[id] ??
+ (throw PackageNotFoundException('Could not find package $id at $url'));
}
/// Downloads the package identified by [id] to the system cache.
@@ -441,18 +462,17 @@
Uri _serverFor(description) =>
Uri.parse(source._parseDescription(description).last);
- /// Parses [id] into its server, package name, and version components, then
- /// converts that to a Uri given [pattern].
- ///
- /// Ensures the package name is properly URL encoded.
- Uri _makeVersionUrl(PackageId id,
- String Function(String server, String package, String version) pattern) {
- var parsed = source._parseDescription(id.description);
- var server = parsed.last;
- var package = Uri.encodeComponent(parsed.first);
- var version = Uri.encodeComponent(id.version.toString());
- return Uri.parse(pattern(server, package, version));
+ /// Enables speculative prefetching of dependencies of packages queried with
+ /// [getVersions].
+ Future<T> withPrefetching<T>(Future<T> Function() callback) async {
+ return await _scheduler.withPrescheduling((preschedule) async {
+ return await runZoned(callback,
+ zoneValues: {_prefetchingKey: preschedule});
+ });
}
+
+ /// Key for storing the current prefetch function in the current [Zone].
+ static const _prefetchingKey = #_prefetch;
}
/// This is the modified hosted source used when pub get or upgrade are run
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index 1401282..89687d8 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -295,20 +295,27 @@
.reduce((max, element) => compare(element, max) > 0 ? element : max);
}
-/// Like [minBy], but with an asynchronous [orderBy] callback.
+/// Returns the element of [values] for which [orderBy] returns the smallest
+/// value.
+///
+/// Returns the first such value in case of ties.
+///
+/// Starts all the [orderBy] invocations in parallel.
Future<S> minByAsync<S, T>(
Iterable<S> values, Future<T> Function(S) orderBy) async {
- S minValue;
+ int minIndex;
T minOrderBy;
- for (var element in values) {
- var elementOrderBy = await orderBy(element);
+ List valuesList = values.toList();
+ final orderByResults = await Future.wait(values.map(orderBy));
+ for (var i = 0; i < orderByResults.length; i++) {
+ final elementOrderBy = orderByResults[i];
if (minOrderBy == null ||
(elementOrderBy as Comparable).compareTo(minOrderBy) < 0) {
- minValue = element;
+ minIndex = i;
minOrderBy = elementOrderBy;
}
}
- return minValue;
+ return valuesList[minIndex];
}
/// Like [List.sublist], but for any iterable.
diff --git a/pubspec.yaml b/pubspec.yaml
index d524cbe..2d76599 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -20,6 +20,7 @@
oauth2: ^1.0.0
package_config: ^1.0.0
path: ^1.2.0
+ pedantic: ^1.9.0
pool: ^1.0.0
pub_semver: ^1.4.0
shelf: ^0.7.0
diff --git a/test/rate_limited_scheduler_test.dart b/test/rate_limited_scheduler_test.dart
new file mode 100644
index 0000000..346c21b
--- /dev/null
+++ b/test/rate_limited_scheduler_test.dart
@@ -0,0 +1,213 @@
+// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:test/test.dart';
+import 'package:pedantic/pedantic.dart';
+import 'package:pub/src/rate_limited_scheduler.dart';
+
+void main() {
+ Map<String, Completer> threeCompleters() =>
+ {'a': Completer(), 'b': Completer(), 'c': Completer()};
+
+ test('scheduler is rate limited', () async {
+ final completers = threeCompleters();
+ final isBeingProcessed = threeCompleters();
+
+ Future<String> f(String i) async {
+ isBeingProcessed[i].complete();
+ await completers[i].future;
+ return i.toUpperCase();
+ }
+
+ final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
+ await scheduler.withPrescheduling((preschedule) async {
+ preschedule('a');
+ preschedule('b');
+ preschedule('c');
+ await Future.wait(
+ [isBeingProcessed['a'].future, isBeingProcessed['b'].future]);
+ expect(isBeingProcessed['c'].isCompleted, isFalse);
+ completers['a'].complete();
+ await isBeingProcessed['c'].future;
+ completers['c'].complete();
+ expect(await scheduler.schedule('c'), 'C');
+ });
+ });
+
+ test('scheduler.preschedule cancels unrun prescheduled task after callback',
+ () async {
+ final completers = threeCompleters();
+ final isBeingProcessed = threeCompleters();
+
+ Future<String> f(String i) async {
+ isBeingProcessed[i].complete();
+ await completers[i].future;
+ return i.toUpperCase();
+ }
+
+ final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
+
+ await scheduler.withPrescheduling((preschedule1) async {
+ await scheduler.withPrescheduling((preschedule2) async {
+ preschedule1('a');
+ preschedule2('b');
+ preschedule1('c');
+ await isBeingProcessed['a'].future;
+ // b, c should not start processing due to rate-limiting.
+ expect(isBeingProcessed['b'].isCompleted, isFalse);
+ expect(isBeingProcessed['c'].isCompleted, isFalse);
+ });
+ completers['a'].complete();
+ // b is removed from the queue, now c should start processing.
+ await isBeingProcessed['c'].future;
+ completers['c'].complete();
+ expect(await scheduler.schedule('c'), 'C');
+ // b is not on the queue anymore.
+ expect(isBeingProcessed['b'].isCompleted, isFalse);
+ });
+ });
+
+ test('scheduler.preschedule does not cancel tasks that are scheduled',
+ () async {
+ final completers = threeCompleters();
+ final isBeingProcessed = threeCompleters();
+
+ Future<String> f(String i) async {
+ isBeingProcessed[i].complete();
+ await completers[i].future;
+ return i.toUpperCase();
+ }
+
+ final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
+
+ Future b;
+ await scheduler.withPrescheduling((preschedule) async {
+ preschedule('a');
+ preschedule('b');
+ await isBeingProcessed['a'].future;
+ // b should not start processing due to rate-limiting.
+ expect(isBeingProcessed['b'].isCompleted, isFalse);
+ b = scheduler.schedule('b');
+ });
+ completers['a'].complete();
+ expect(await scheduler.schedule('a'), 'A');
+ // b was scheduled, so it should get processed now
+ await isBeingProcessed['b'].future;
+ completers['b'].complete();
+ expect(await b, 'B');
+ });
+
+ test('scheduler caches results', () async {
+ final completers = threeCompleters();
+ final isBeingProcessed = threeCompleters();
+
+ Future<String> f(String i) async {
+ isBeingProcessed[i].complete();
+ await completers[i].future;
+ return i.toUpperCase();
+ }
+
+ final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
+
+ completers['a'].complete();
+ expect(await scheduler.schedule('a'), 'A');
+ // Would fail if isBeingProcessed['a'] was completed twice
+ expect(await scheduler.schedule('a'), 'A');
+ });
+
+ test('scheduler prioritizes fetched tasks before prefetched', () async {
+ final completers = threeCompleters();
+ final isBeingProcessed = threeCompleters();
+
+ Future<String> f(String i) async {
+ isBeingProcessed[i].complete();
+ await completers[i].future;
+ return i.toUpperCase();
+ }
+
+ final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
+ await scheduler.withPrescheduling((preschedule) async {
+ preschedule('a');
+ preschedule('b');
+ await isBeingProcessed['a'].future;
+ final cResult = scheduler.schedule('c');
+ expect(isBeingProcessed['b'].isCompleted, isFalse);
+ completers['a'].complete();
+ completers['c'].complete();
+ await isBeingProcessed['c'].future;
+ // 'c' is done before we allow 'b' to finish processing
+ expect(await cResult, 'C');
+ });
+ });
+
+ test('Errors trigger when the scheduled future is listened to', () async {
+ final completers = threeCompleters();
+ final isBeingProcessed = threeCompleters();
+
+ Future<String> f(String i) async {
+ isBeingProcessed[i].complete();
+ await completers[i].future;
+ return i.toUpperCase();
+ }
+
+ final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
+
+ await scheduler.withPrescheduling((preschedule) async {
+ preschedule('a');
+ preschedule('b');
+ preschedule('c');
+ await isBeingProcessed['a'].future;
+ await isBeingProcessed['b'].future;
+ expect(isBeingProcessed['c'].isCompleted, isFalse);
+ unawaited(completers['c'].future.catchError((_) {}));
+ completers['c'].completeError('errorC');
+ completers['a'].completeError('errorA');
+ await isBeingProcessed['c'].future;
+ completers['b'].completeError('errorB');
+ expect(() async => await scheduler.schedule('a'), throwsA('errorA'));
+ expect(() async => await scheduler.schedule('b'), throwsA('errorB'));
+ expect(() async => await scheduler.schedule('c'), throwsA('errorC'));
+ });
+ });
+
+ test('tasks run in the zone they where enqueued in', () async {
+ final completers = threeCompleters();
+ final isBeingProcessed = threeCompleters();
+
+ Future<String> f(String i) async {
+ isBeingProcessed[i].complete();
+ await completers[i].future;
+ return Zone.current['zoneValue'];
+ }
+
+ final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
+ await scheduler.withPrescheduling((preschedule) async {
+ runZoned(() {
+ preschedule('a');
+ }, zoneValues: {'zoneValue': 'A'});
+ runZoned(() {
+ preschedule('b');
+ }, zoneValues: {'zoneValue': 'B'});
+ runZoned(() {
+ preschedule('c');
+ }, zoneValues: {'zoneValue': 'C'});
+
+ await runZoned(() async {
+ await isBeingProcessed['a'].future;
+ await isBeingProcessed['b'].future;
+ // This will put 'c' in front of the queue, but in a zone with zoneValue
+ // bound to S.
+ final f = expectLater(scheduler.schedule('c'), completion('S'));
+ completers['a'].complete();
+ completers['b'].complete();
+ expect(await scheduler.schedule('a'), 'A');
+ expect(await scheduler.schedule('b'), 'B');
+ completers['c'].complete();
+ await f;
+ }, zoneValues: {'zoneValue': 'S'});
+ });
+ });
+}
diff --git a/test/utils_test.dart b/test/utils_test.dart
index e7de8f0..c0d6067 100644
--- a/test/utils_test.dart
+++ b/test/utils_test.dart
@@ -2,6 +2,8 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
+import 'dart:async';
+
import 'package:pub/src/utils.dart';
import 'package:test/test.dart';
@@ -122,4 +124,43 @@
}
});
});
+
+ group('minByAsync', () {
+ test('is stable', () async {
+ {
+ final completers = <String, Completer>{};
+ Completer completer(k) => completers.putIfAbsent(k, () => Completer());
+ Future<int> lengthWhenComplete(String s) async {
+ await completer(s).future;
+ return s.length;
+ }
+
+ final w = expectLater(
+ minByAsync(['aa', 'a', 'b', 'ccc'], lengthWhenComplete),
+ completion('a'));
+ completer('aa').complete();
+ completer('b').complete();
+ completer('a').complete();
+ completer('ccc').complete();
+ await w;
+ }
+ {
+ final completers = <String, Completer>{};
+ Completer completer(k) => completers.putIfAbsent(k, () => Completer());
+ Future<int> lengthWhenComplete(String s) async {
+ await completer(s).future;
+ return s.length;
+ }
+
+ final w = expectLater(
+ minByAsync(['aa', 'a', 'b', 'ccc'], lengthWhenComplete),
+ completion('a'));
+ completer('ccc').complete();
+ completer('a').complete();
+ completer('b').complete();
+ completer('aa').complete();
+ await w;
+ }
+ });
+ });
}