Parallel fetching of available versions (#2280)

Implements rate-limited parallel fetching of version information when solving.
Also speculatively pre-fetches version information for the dependencies of the newest version of each package.

In informal benchmarks of pub get for a package whose only direct dependency is package:test, resolution time drops from 7.8 seconds to 1.5 seconds.

Also adds package:pedantic to our direct runtime dependencies.
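
Roughly, the behaviour amounts to the sketch below (a standalone illustration only, not the code in this PR: the `VersionFetcher` class, the hard-coded limit of 10 concurrent requests, and the use of the `latest` field of the pub.dev package listing are assumptions made for the example).

```dart
import 'dart:convert';

import 'package:http/http.dart' as http;
import 'package:pedantic/pedantic.dart';
import 'package:pool/pool.dart';

/// Sketch: rate-limited, memoizing fetching of version listings with
/// speculative prefetching of the newest version's dependencies.
class VersionFetcher {
  final Pool _pool = Pool(10); // At most 10 requests in flight at a time.
  final Map<String, Future<List<String>>> _cache = {};

  Future<List<String>> versionsOf(String package) =>
      _cache.putIfAbsent(package, () => _pool.withResource(() async {
            final doc = jsonDecode(await http
                .read(Uri.parse('https://pub.dev/api/packages/$package')));
            // Speculatively warm the cache for the dependencies of the newest
            // version; the solver is likely to ask for them soon.
            final deps = doc['latest']['pubspec']['dependencies'];
            if (deps is Map) {
              for (final dep in deps.keys) {
                unawaited(
                    versionsOf(dep as String).catchError((_) => <String>[]));
              }
            }
            return [
              for (final v in doc['versions'] as List) v['version'] as String
            ];
          }));
}

Future<void> main() async {
  final fetcher = VersionFetcher();
  // Fetches the listing for `test` and, in the background, the listings of
  // its dependencies, with at most 10 concurrent requests.
  print(await fetcher.versionsOf('test'));
}
```

The real implementation keeps the same two ingredients, a `Pool` for rate limiting and a completer cache for memoization, in the new `RateLimitedScheduler`, and ties prefetching to the solver's lifetime via `withPrescheduling`/`withPrefetching`.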
diff --git a/doc/repository-spec-v2.md b/doc/repository-spec-v2.md
index 8b24da2..4f00f58 100644
--- a/doc/repository-spec-v2.md
+++ b/doc/repository-spec-v2.md
@@ -62,7 +62,9 @@
 }
 ```
 
-### Inspect a specific version of a package
+### (Deprecated) Inspect a specific version of a package
+
+**Deprecated** as of Dart 2.8; use "List all versions of a package" instead.
 
 **GET** `<PUB_HOSTED_URL>/api/packages/<PACKAGE>/versions/<VERSION>`
 
@@ -99,4 +101,3 @@
 
 The API for authenticating and publishing packages is not formalized yet, see
 [#1381](https://github.com/dart-lang/pub/issues/1381).
-
diff --git a/lib/src/rate_limited_scheduler.dart b/lib/src/rate_limited_scheduler.dart
new file mode 100644
index 0000000..99f9ce3
--- /dev/null
+++ b/lib/src/rate_limited_scheduler.dart
@@ -0,0 +1,142 @@
+// Copyright (c) 2019, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+import 'dart:collection';
+
+import 'package:pool/pool.dart';
+import 'package:pedantic/pedantic.dart';
+
+/// Handles rate-limited scheduling of tasks.
+///
+/// Tasks are identified by a jobId of type [J] (which should be usable as a
+/// hash-map key) and run with a supplied async function.
+///
+/// Designed to allow speculatively running tasks that will likely be needed
+/// later with [withPrescheduling].
+///
+/// Errors thrown by tasks scheduled with the `preschedule` callback will only
+/// be triggered when you await the [Future] returned by [schedule].
+///
+/// The operation will run in the [Zone] in which the task was enqueued.
+///
+/// If a task is [preschedule]d and later [schedule]d before the operation has
+/// started, the task will go to the front of the queue and run in the zone of
+/// the [schedule] call.
+///
+/// Example:
+///
+/// ```dart
+/// // A scheduler that, given a uri, gets that page and returns the body
+/// final scheduler = RateLimitedScheduler(http.read);
+///
+/// scheduler.withPrescheduling((preschedule) async {
+///   // Start fetching `pub.dev` and `dart.dev` in the background.
+///   preschedule(Uri.parse('https://pub.dev/'));
+///   preschedule(Uri.parse('https://dart.dev/'));
+///   // ... do time-consuming task.
+///   // Now we actually need `pub.dev`.
+///   final pubDevBody =
+///       await scheduler.schedule(Uri.parse('https://pub.dev/'));
+///   // If the `dart.dev` task has not started yet, it will be canceled when
+///   // leaving `withPrescheduling`.
+/// });
+/// ```
+class RateLimitedScheduler<J, V> {
+  final Future<V> Function(J) _runJob;
+
+  /// The results of ongoing and finished jobs.
+  final Map<J, Completer<V>> _cache = <J, Completer<V>>{};
+
+  /// Tasks that are waiting to be run.
+  final Queue<_Task<J>> _queue = Queue<_Task<J>>();
+
+  /// Rate limits the number of concurrent jobs.
+  final Pool _pool;
+
+  /// Jobs that have started running.
+  final Set<J> _started = {};
+
+  RateLimitedScheduler(Future<V> Function(J) runJob,
+      {int maxConcurrentOperations = 10})
+      : _runJob = runJob,
+        _pool = Pool(maxConcurrentOperations);
+
+  /// Pick the next task in [_queue] and run it.
+  ///
+  /// If the task is already in [_started] it will not be run again.
+  Future<void> _processNextTask() async {
+    if (_queue.isEmpty) {
+      return;
+    }
+    final task = _queue.removeFirst();
+    final completer = _cache[task.jobId];
+
+    if (!_started.add(task.jobId)) {
+      return;
+    }
+
+    // Use an async function to catch sync exceptions from _runJob.
+    Future<V> runJob() async {
+      return await task.zone.runUnary(_runJob, task.jobId);
+    }
+
+    completer.complete(runJob());
+    // Listen for errors on the completer:
+    // this prevents errors thrown by [_runJob] from
+    // becoming uncaught.
+    //
+    // They will still show up for other listeners of the future.
+    await completer.future.catchError((_) {});
+  }
+
+  /// Calls [callback] with a function that can pre-schedule jobs.
+  ///
+  /// When [callback] returns, all jobs that were prescheduled by [callback]
+  /// and have not started running will be removed from the work queue
+  /// (if they have also been added separately by [schedule], they will still
+  /// be executed).
+  Future<R> withPrescheduling<R>(
+    FutureOr<R> Function(void Function(J) preschedule) callback,
+  ) async {
+    final prescheduled = <_Task>{};
+    try {
+      return await callback((jobId) {
+        if (_started.contains(jobId)) return;
+        final task = _Task(jobId, Zone.current);
+        _cache.putIfAbsent(jobId, () => Completer());
+        _queue.addLast(task);
+        prescheduled.add(task);
+
+        unawaited(_pool.withResource(_processNextTask));
+      });
+    } finally {
+      _queue.removeWhere(prescheduled.contains);
+    }
+  }
+
+  /// Returns a future that completes with the result of running [jobId].
+  ///
+  /// If [jobId] has already run, the cached result will be returned.
+  /// If [jobId] is not yet running, it will go to the front of the work queue
+  /// to be scheduled next when there are free resources.
+  Future<V> schedule(J jobId) {
+    final completer = _cache.putIfAbsent(jobId, () => Completer());
+    if (!_started.contains(jobId)) {
+      final task = _Task(jobId, Zone.current);
+      _queue.addFirst(task);
+      scheduleMicrotask(() => _pool.withResource(_processNextTask));
+    }
+    return completer.future;
+  }
+}
+
+class _Task<J> {
+  final J jobId;
+  final Zone zone;
+  _Task(this.jobId, this.zone);
+
+  @override
+  String toString() => jobId.toString();
+}
diff --git a/lib/src/solver/version_solver.dart b/lib/src/solver/version_solver.dart
index 675310c..78c80a1 100644
--- a/lib/src/solver/version_solver.dart
+++ b/lib/src/solver/version_solver.dart
@@ -91,13 +91,15 @@
         [Term(PackageRange.root(_root), false)], IncompatibilityCause.root));
 
     try {
-      var next = _root.name;
-      while (next != null) {
-        _propagate(next);
-        next = await _choosePackageVersion();
-      }
+      return await _systemCache.hosted.withPrefetching(() async {
+        var next = _root.name;
+        while (next != null) {
+          _propagate(next);
+          next = await _choosePackageVersion();
+        }
 
-      return await _result();
+        return await _result();
+      });
     } finally {
       // Gather some solving metrics.
       log.solver('Version solving took ${stopwatch.elapsed} seconds.\n'
diff --git a/lib/src/source/git.dart b/lib/src/source/git.dart
index f85e41f..2dfd4a3 100644
--- a/lib/src/source/git.dart
+++ b/lib/src/source/git.dart
@@ -6,6 +6,7 @@
 import 'dart:io';
 
 import 'package:path/path.dart' as p;
+import 'package:pool/pool.dart';
 import 'package:pub_semver/pub_semver.dart';
 
 import '../git.dart' as git;
@@ -189,6 +190,10 @@
 
 /// The [BoundSource] for [GitSource].
 class BoundGitSource extends CachedSource {
+  /// Limit the number of concurrent git operations to 1.
+  // TODO(sigurdm): Use RateLimitedScheduler.
+  final Pool _pool = Pool(1);
+
   @override
   final GitSource source;
 
@@ -221,27 +226,31 @@
 
   @override
   Future<List<PackageId>> doGetVersions(PackageRef ref) async {
-    await _ensureRepoCache(ref);
-    var path = _repoCachePath(ref);
-    var revision = await _firstRevision(path, ref.description['ref']);
-    var pubspec =
-        await _describeUncached(ref, revision, ref.description['path']);
+    return await _pool.withResource(() async {
+      await _ensureRepoCache(ref);
+      var path = _repoCachePath(ref);
+      var revision = await _firstRevision(path, ref.description['ref']);
+      var pubspec =
+          await _describeUncached(ref, revision, ref.description['path']);
 
-    return [
-      PackageId(ref.name, source, pubspec.version, {
-        'url': ref.description['url'],
-        'ref': ref.description['ref'],
-        'resolved-ref': revision,
-        'path': ref.description['path']
-      })
-    ];
+      return [
+        PackageId(ref.name, source, pubspec.version, {
+          'url': ref.description['url'],
+          'ref': ref.description['ref'],
+          'resolved-ref': revision,
+          'path': ref.description['path']
+        })
+      ];
+    });
   }
 
   /// Since we don't have an easy way to read from a remote Git repo, this
   /// just installs [id] into the system cache, then describes it from there.
   @override
-  Future<Pubspec> describeUncached(PackageId id) => _describeUncached(
-      id.toRef(), id.description['resolved-ref'], id.description['path']);
+  Future<Pubspec> describeUncached(PackageId id) {
+    return _pool.withResource(() => _describeUncached(
+        id.toRef(), id.description['resolved-ref'], id.description['path']));
+  }
 
   /// Like [describeUncached], but takes a separate [ref] and Git [revision]
   /// rather than a single ID.
@@ -284,28 +293,32 @@
   /// in `cache/`.
   @override
   Future<Package> downloadToSystemCache(PackageId id) async {
-    var ref = id.toRef();
-    if (!git.isInstalled) {
-      fail("Cannot get ${id.name} from Git (${ref.description['url']}).\n"
-          'Please ensure Git is correctly installed.');
-    }
-
-    ensureDir(p.join(systemCacheRoot, 'cache'));
-    await _ensureRevision(ref, id.description['resolved-ref']);
-
-    var revisionCachePath = _revisionCachePath(id);
-    await _revisionCacheClones.putIfAbsent(revisionCachePath, () async {
-      if (!entryExists(revisionCachePath)) {
-        await _clone(_repoCachePath(ref), revisionCachePath);
-        await _checkOut(revisionCachePath, id.description['resolved-ref']);
-        _writePackageList(revisionCachePath, [id.description['path']]);
-      } else {
-        _updatePackageList(revisionCachePath, id.description['path']);
+    return await _pool.withResource(() async {
+      var ref = id.toRef();
+      if (!git.isInstalled) {
+        fail("Cannot get ${id.name} from Git (${ref.description['url']}).\n"
+            'Please ensure Git is correctly installed.');
       }
-    });
 
-    return Package.load(id.name,
-        p.join(revisionCachePath, id.description['path']), systemCache.sources);
+      ensureDir(p.join(systemCacheRoot, 'cache'));
+      await _ensureRevision(ref, id.description['resolved-ref']);
+
+      var revisionCachePath = _revisionCachePath(id);
+      await _revisionCacheClones.putIfAbsent(revisionCachePath, () async {
+        if (!entryExists(revisionCachePath)) {
+          await _clone(_repoCachePath(ref), revisionCachePath);
+          await _checkOut(revisionCachePath, id.description['resolved-ref']);
+          _writePackageList(revisionCachePath, [id.description['path']]);
+        } else {
+          _updatePackageList(revisionCachePath, id.description['path']);
+        }
+      });
+
+      return Package.load(
+          id.name,
+          p.join(revisionCachePath, id.description['path']),
+          systemCache.sources);
+    });
   }
 
   /// Returns the path to the revision-specific cache of [id].
diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart
index e89244d..b0eb730 100644
--- a/lib/src/source/hosted.dart
+++ b/lib/src/source/hosted.dart
@@ -6,8 +6,11 @@
 import 'dart:convert';
 import 'dart:io' as io;
 
+import 'package:collection/collection.dart' show maxBy;
 import 'package:http/http.dart' as http;
 import 'package:path/path.dart' as p;
+import 'package:pedantic/pedantic.dart';
+import 'package:pub/src/rate_limited_scheduler.dart';
 import 'package:pub_semver/pub_semver.dart';
 import 'package:stack_trace/stack_trace.dart';
 
@@ -154,36 +157,66 @@
 
   @override
   final SystemCache systemCache;
+  RateLimitedScheduler<PackageRef, Map<PackageId, Pubspec>> _scheduler;
 
-  BoundHostedSource(this.source, this.systemCache);
+  BoundHostedSource(this.source, this.systemCache) {
+    _scheduler =
+        RateLimitedScheduler(_fetchVersions, maxConcurrentOperations: 10);
+  }
 
-  /// Downloads a list of all versions of a package that are available from the
-  /// site.
-  @override
-  Future<List<PackageId>> doGetVersions(PackageRef ref) async {
+  Future<Map<PackageId, Pubspec>> _fetchVersions(PackageRef ref) async {
     var url = _makeUrl(
         ref.description, (server, package) => '$server/api/packages/$package');
-
     log.io('Get versions from $url.');
 
     String body;
     try {
+      // TODO(sigurdm): Implement cancellation of requests. This probably
+      // requires resolution of: https://github.com/dart-lang/sdk/issues/22265.
       body = await httpClient.read(url, headers: pubApiHeaders);
     } catch (error, stackTrace) {
       var parsed = source._parseDescription(ref.description);
       _throwFriendlyError(error, stackTrace, parsed.first, parsed.last);
     }
-
-    var doc = jsonDecode(body);
-    return (doc['versions'] as List).map((map) {
+    final doc = jsonDecode(body);
+    final versions = doc['versions'] as List;
+    final result = Map.fromEntries(versions.map((map) {
       var pubspec = Pubspec.fromMap(map['pubspec'], systemCache.sources,
           expectedName: ref.name, location: url);
       var id = source.idFor(ref.name, pubspec.version,
           url: _serverFor(ref.description));
-      memoizePubspec(id, pubspec);
+      return MapEntry(id, pubspec);
+    }));
 
-      return id;
-    }).toList();
+    // Prefetch the dependencies of the latest version; we are likely to need
+    // them later.
+    final preschedule =
+        Zone.current[_prefetchingKey] as void Function(PackageRef);
+    if (preschedule != null) {
+      final latestVersion =
+          maxBy(result.keys.map((id) => id.version), (e) => e);
+
+      final latestVersionId =
+          PackageId(ref.name, source, latestVersion, ref.description);
+
+      final dependencies = result[latestVersionId]?.dependencies?.values ?? [];
+      unawaited(withDependencyType(DependencyType.none, () async {
+        for (final packageRange in dependencies) {
+          if (packageRange.source is HostedSource) {
+            preschedule(packageRange.toRef());
+          }
+        }
+      }));
+    }
+    return result;
+  }
+
+  /// Downloads a list of all versions of a package that are available from the
+  /// site.
+  @override
+  Future<List<PackageId>> doGetVersions(PackageRef ref) async {
+    final versions = await _scheduler.schedule(ref);
+    return versions.keys.toList();
   }
 
   /// Parses [description] into its server and package name components, then
@@ -198,27 +231,15 @@
     return Uri.parse(pattern(server, package));
   }
 
-  /// Downloads and parses the pubspec for a specific version of a package that
-  /// is available from the site.
+  /// Retrieves the pubspec for a specific version of a package that is
+  /// available from the site.
   @override
   Future<Pubspec> describeUncached(PackageId id) async {
-    // Request it from the server.
-    var url = _makeVersionUrl(
-        id,
-        (server, package, version) =>
-            '$server/api/packages/$package/versions/$version');
-
-    log.io('Describe package at $url.');
-    Map<String, dynamic> version;
-    try {
-      version = jsonDecode(await httpClient.read(url, headers: pubApiHeaders));
-    } catch (error, stackTrace) {
-      var parsed = source._parseDescription(id.description);
-      _throwFriendlyError(error, stackTrace, id.name, parsed.last);
-    }
-
-    return Pubspec.fromMap(version['pubspec'], systemCache.sources,
-        expectedName: id.name, location: url);
+    final versions = await _scheduler.schedule(id.toRef());
+    final url = _makeUrl(
+        id.description, (server, package) => '$server/api/packages/$package');
+    return versions[id] ??
+        (throw PackageNotFoundException('Could not find package $id at $url'));
   }
 
   /// Downloads the package identified by [id] to the system cache.
@@ -441,18 +462,17 @@
   Uri _serverFor(description) =>
       Uri.parse(source._parseDescription(description).last);
 
-  /// Parses [id] into its server, package name, and version components, then
-  /// converts that to a Uri given [pattern].
-  ///
-  /// Ensures the package name is properly URL encoded.
-  Uri _makeVersionUrl(PackageId id,
-      String Function(String server, String package, String version) pattern) {
-    var parsed = source._parseDescription(id.description);
-    var server = parsed.last;
-    var package = Uri.encodeComponent(parsed.first);
-    var version = Uri.encodeComponent(id.version.toString());
-    return Uri.parse(pattern(server, package, version));
+  /// Enables speculative prefetching of dependencies of packages queried with
+  /// [getVersions].
+  Future<T> withPrefetching<T>(Future<T> Function() callback) async {
+    return await _scheduler.withPrescheduling((preschedule) async {
+      return await runZoned(callback,
+          zoneValues: {_prefetchingKey: preschedule});
+    });
   }
+
+  /// Key for storing the prefetch function in the current [Zone].
+  static const _prefetchingKey = #_prefetch;
 }
 
 /// This is the modified hosted source used when pub get or upgrade are run
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index 1401282..89687d8 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -295,20 +295,27 @@
       .reduce((max, element) => compare(element, max) > 0 ? element : max);
 }
 
-/// Like [minBy], but with an asynchronous [orderBy] callback.
+/// Returns the element of [values] for which [orderBy] returns the smallest
+/// value.
+///
+/// Returns the first such value in case of ties.
+///
+/// Starts all the [orderBy] invocations in parallel.
 Future<S> minByAsync<S, T>(
     Iterable<S> values, Future<T> Function(S) orderBy) async {
-  S minValue;
+  int minIndex;
   T minOrderBy;
-  for (var element in values) {
-    var elementOrderBy = await orderBy(element);
+  final valuesList = values.toList();
+  final orderByResults = await Future.wait(values.map(orderBy));
+  for (var i = 0; i < orderByResults.length; i++) {
+    final elementOrderBy = orderByResults[i];
     if (minOrderBy == null ||
         (elementOrderBy as Comparable).compareTo(minOrderBy) < 0) {
-      minValue = element;
+      minIndex = i;
       minOrderBy = elementOrderBy;
     }
   }
-  return minValue;
+  return valuesList[minIndex];
 }
 
 /// Like [List.sublist], but for any iterable.
diff --git a/pubspec.yaml b/pubspec.yaml
index d524cbe..2d76599 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -20,6 +20,7 @@
   oauth2: ^1.0.0
   package_config: ^1.0.0
   path: ^1.2.0
+  pedantic: ^1.9.0
   pool: ^1.0.0
   pub_semver: ^1.4.0
   shelf: ^0.7.0
diff --git a/test/rate_limited_scheduler_test.dart b/test/rate_limited_scheduler_test.dart
new file mode 100644
index 0000000..346c21b
--- /dev/null
+++ b/test/rate_limited_scheduler_test.dart
@@ -0,0 +1,213 @@
+// Copyright (c) 2019, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:pedantic/pedantic.dart';
+import 'package:pub/src/rate_limited_scheduler.dart';
+import 'package:test/test.dart';
+
+void main() {
+  Map<String, Completer> threeCompleters() =>
+      {'a': Completer(), 'b': Completer(), 'c': Completer()};
+
+  test('scheduler is rate limited', () async {
+    final completers = threeCompleters();
+    final isBeingProcessed = threeCompleters();
+
+    Future<String> f(String i) async {
+      isBeingProcessed[i].complete();
+      await completers[i].future;
+      return i.toUpperCase();
+    }
+
+    final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
+    await scheduler.withPrescheduling((preschedule) async {
+      preschedule('a');
+      preschedule('b');
+      preschedule('c');
+      await Future.wait(
+          [isBeingProcessed['a'].future, isBeingProcessed['b'].future]);
+      expect(isBeingProcessed['c'].isCompleted, isFalse);
+      completers['a'].complete();
+      await isBeingProcessed['c'].future;
+      completers['c'].complete();
+      expect(await scheduler.schedule('c'), 'C');
+    });
+  });
+
+  test('scheduler.preschedule cancels unrun prescheduled task after callback',
+      () async {
+    final completers = threeCompleters();
+    final isBeingProcessed = threeCompleters();
+
+    Future<String> f(String i) async {
+      isBeingProcessed[i].complete();
+      await completers[i].future;
+      return i.toUpperCase();
+    }
+
+    final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
+
+    await scheduler.withPrescheduling((preschedule1) async {
+      await scheduler.withPrescheduling((preschedule2) async {
+        preschedule1('a');
+        preschedule2('b');
+        preschedule1('c');
+        await isBeingProcessed['a'].future;
+        // b, c should not start processing due to rate-limiting.
+        expect(isBeingProcessed['b'].isCompleted, isFalse);
+        expect(isBeingProcessed['c'].isCompleted, isFalse);
+      });
+      completers['a'].complete();
+      // b is removed from the queue; now c should start processing.
+      await isBeingProcessed['c'].future;
+      completers['c'].complete();
+      expect(await scheduler.schedule('c'), 'C');
+      // b is not on the queue anymore.
+      expect(isBeingProcessed['b'].isCompleted, isFalse);
+    });
+  });
+
+  test('scheduler.preschedule does not cancel tasks that are scheduled',
+      () async {
+    final completers = threeCompleters();
+    final isBeingProcessed = threeCompleters();
+
+    Future<String> f(String i) async {
+      isBeingProcessed[i].complete();
+      await completers[i].future;
+      return i.toUpperCase();
+    }
+
+    final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
+
+    Future b;
+    await scheduler.withPrescheduling((preschedule) async {
+      preschedule('a');
+      preschedule('b');
+      await isBeingProcessed['a'].future;
+      // b should not start processing due to rate-limiting.
+      expect(isBeingProcessed['b'].isCompleted, isFalse);
+      b = scheduler.schedule('b');
+    });
+    completers['a'].complete();
+    expect(await scheduler.schedule('a'), 'A');
+    // b was scheduled, so it should get processed now.
+    await isBeingProcessed['b'].future;
+    completers['b'].complete();
+    expect(await b, 'B');
+  });
+
+  test('scheduler caches results', () async {
+    final completers = threeCompleters();
+    final isBeingProcessed = threeCompleters();
+
+    Future<String> f(String i) async {
+      isBeingProcessed[i].complete();
+      await completers[i].future;
+      return i.toUpperCase();
+    }
+
+    final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
+
+    completers['a'].complete();
+    expect(await scheduler.schedule('a'), 'A');
+    // Would fail if isBeingProcessed['a'] were completed twice.
+    expect(await scheduler.schedule('a'), 'A');
+  });
+
+  test('scheduler prioritizes fetched tasks before prefetched', () async {
+    final completers = threeCompleters();
+    final isBeingProcessed = threeCompleters();
+
+    Future<String> f(String i) async {
+      isBeingProcessed[i].complete();
+      await completers[i].future;
+      return i.toUpperCase();
+    }
+
+    final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 1);
+    await scheduler.withPrescheduling((preschedule) async {
+      preschedule('a');
+      preschedule('b');
+      await isBeingProcessed['a'].future;
+      final cResult = scheduler.schedule('c');
+      expect(isBeingProcessed['b'].isCompleted, isFalse);
+      completers['a'].complete();
+      completers['c'].complete();
+      await isBeingProcessed['c'].future;
+      // 'c' is done before we allow 'b' to finish processing.
+      expect(await cResult, 'C');
+    });
+  });
+
+  test('Errors trigger when the scheduled future is listened to', () async {
+    final completers = threeCompleters();
+    final isBeingProcessed = threeCompleters();
+
+    Future<String> f(String i) async {
+      isBeingProcessed[i].complete();
+      await completers[i].future;
+      return i.toUpperCase();
+    }
+
+    final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
+
+    await scheduler.withPrescheduling((preschedule) async {
+      preschedule('a');
+      preschedule('b');
+      preschedule('c');
+      await isBeingProcessed['a'].future;
+      await isBeingProcessed['b'].future;
+      expect(isBeingProcessed['c'].isCompleted, isFalse);
+      unawaited(completers['c'].future.catchError((_) {}));
+      completers['c'].completeError('errorC');
+      completers['a'].completeError('errorA');
+      await isBeingProcessed['c'].future;
+      completers['b'].completeError('errorB');
+      expect(() async => await scheduler.schedule('a'), throwsA('errorA'));
+      expect(() async => await scheduler.schedule('b'), throwsA('errorB'));
+      expect(() async => await scheduler.schedule('c'), throwsA('errorC'));
+    });
+  });
+
+  test('tasks run in the zone they were enqueued in', () async {
+    final completers = threeCompleters();
+    final isBeingProcessed = threeCompleters();
+
+    Future<String> f(String i) async {
+      isBeingProcessed[i].complete();
+      await completers[i].future;
+      return Zone.current['zoneValue'];
+    }
+
+    final scheduler = RateLimitedScheduler(f, maxConcurrentOperations: 2);
+    await scheduler.withPrescheduling((preschedule) async {
+      runZoned(() {
+        preschedule('a');
+      }, zoneValues: {'zoneValue': 'A'});
+      runZoned(() {
+        preschedule('b');
+      }, zoneValues: {'zoneValue': 'B'});
+      runZoned(() {
+        preschedule('c');
+      }, zoneValues: {'zoneValue': 'C'});
+
+      await runZoned(() async {
+        await isBeingProcessed['a'].future;
+        await isBeingProcessed['b'].future;
+        // This will put 'c' in front of the queue, but in a zone where
+        // 'zoneValue' is bound to 'S'.
+        final f = expectLater(scheduler.schedule('c'), completion('S'));
+        completers['a'].complete();
+        completers['b'].complete();
+        expect(await scheduler.schedule('a'), 'A');
+        expect(await scheduler.schedule('b'), 'B');
+        completers['c'].complete();
+        await f;
+      }, zoneValues: {'zoneValue': 'S'});
+    });
+  });
+}
diff --git a/test/utils_test.dart b/test/utils_test.dart
index e7de8f0..c0d6067 100644
--- a/test/utils_test.dart
+++ b/test/utils_test.dart
@@ -2,6 +2,8 @@
 // for details. All rights reserved. Use of this source code is governed by a
 // BSD-style license that can be found in the LICENSE file.
 
+import 'dart:async';
+
 import 'package:pub/src/utils.dart';
 import 'package:test/test.dart';
 
@@ -122,4 +124,43 @@
       }
     });
   });
+
+  group('minByAsync', () {
+    test('is stable', () async {
+      {
+        final completers = <String, Completer>{};
+        Completer completer(k) => completers.putIfAbsent(k, () => Completer());
+        Future<int> lengthWhenComplete(String s) async {
+          await completer(s).future;
+          return s.length;
+        }
+
+        final w = expectLater(
+            minByAsync(['aa', 'a', 'b', 'ccc'], lengthWhenComplete),
+            completion('a'));
+        completer('aa').complete();
+        completer('b').complete();
+        completer('a').complete();
+        completer('ccc').complete();
+        await w;
+      }
+      {
+        final completers = <String, Completer>{};
+        Completer completer(k) => completers.putIfAbsent(k, () => Completer());
+        Future<int> lengthWhenComplete(String s) async {
+          await completer(s).future;
+          return s.length;
+        }
+
+        final w = expectLater(
+            minByAsync(['aa', 'a', 'b', 'ccc'], lengthWhenComplete),
+            completion('a'));
+        completer('ccc').complete();
+        completer('a').complete();
+        completer('b').complete();
+        completer('aa').complete();
+        await w;
+      }
+    });
+  });
 }