Download hosted packages in parallel when repairing cache (#2377)

diff --git a/lib/src/command/cache_repair.dart b/lib/src/command/cache_repair.dart
index ed1f5c0..da95c54 100644
--- a/lib/src/command/cache_repair.dart
+++ b/lib/src/command/cache_repair.dart
@@ -26,17 +26,21 @@
 
   @override
   Future run() async {
-    var successes = [];
-    var failures = [];
-
     // Repair every cached source.
-    for (var source in cache.sources.all.map(cache.source)) {
-      if (source is CachedSource) {
-        var results = await source.repairCachedPackages();
-        successes.addAll(results.first);
-        failures.addAll(results.last);
-      }
-    }
+    final repairResults = (await Future.wait(
+            cache.sources.all.map(cache.source).map((source) async {
+      return source is CachedSource
+          ? await source.repairCachedPackages()
+          : <RepairResult>[];
+    })))
+        .expand((x) => x);
+
+    final successes = [
+      for (final result in repairResults) if (result.success) result.package
+    ];
+    final failures = [
+      for (final result in repairResults) if (!result.success) result.package
+    ];
 
     if (successes.isNotEmpty) {
       var packages = pluralize('package', successes.length);
@@ -59,24 +63,27 @@
       log.message(buffer.toString());
     }
 
-    var results = await globals.repairActivatedPackages();
-    if (results.first.isNotEmpty) {
-      var packages = pluralize('package', results.first.length);
-      log.message('Reactivated ${log.green(results.first.length)} $packages.');
+    var globalRepairResults = await globals.repairActivatedPackages();
+    if (globalRepairResults.first.isNotEmpty) {
+      var packages = pluralize('package', globalRepairResults.first.length);
+      log.message(
+          'Reactivated ${log.green(globalRepairResults.first.length)} $packages.');
     }
 
-    if (results.last.isNotEmpty) {
-      var packages = pluralize('package', results.last.length);
+    if (globalRepairResults.last.isNotEmpty) {
+      var packages = pluralize('package', globalRepairResults.last.length);
       log.message(
-          'Failed to reactivate ${log.red(results.last.length)} $packages:\n' +
-              results.last.map((name) => '- ${log.bold(name)}').join('\n'));
+          'Failed to reactivate ${log.red(globalRepairResults.last.length)} $packages:\n' +
+              globalRepairResults.last
+                  .map((name) => '- ${log.bold(name)}')
+                  .join('\n'));
     }
 
     if (successes.isEmpty && failures.isEmpty) {
       log.message('No packages in cache, so nothing to repair.');
     }
 
-    if (failures.isNotEmpty || results.last.isNotEmpty) {
+    if (failures.isNotEmpty || globalRepairResults.last.isNotEmpty) {
       await flushThenExit(exit_codes.UNAVAILABLE);
     }
   }
diff --git a/lib/src/source/cached.dart b/lib/src/source/cached.dart
index 1fe99c3..28f903a 100644
--- a/lib/src/source/cached.dart
+++ b/lib/src/source/cached.dart
@@ -4,6 +4,7 @@
 
 import 'dart:async';
 
+import 'package:meta/meta.dart';
 import 'package:path/path.dart' as path;
 
 import '../io.dart';
@@ -11,7 +12,6 @@
 import '../package_name.dart';
 import '../pubspec.dart';
 import '../source.dart';
-import '../utils.dart';
 
 /// Base class for a [BoundSource] that installs packages into pub's
 /// [SystemCache].
@@ -67,8 +67,19 @@
   /// Reinstalls all packages that have been previously installed into the
   /// system cache by this source.
   ///
-  /// Returns a [Pair] whose first element is the packages that were
-  /// successfully repaired and the second is the packages that failed to be
-  /// repaired.
-  Future<Pair<List<PackageId>, List<PackageId>>> repairCachedPackages();
+  /// Returns a list of results indicating for each if that package was
+  /// successfully repaired.
+  Future<Iterable<RepairResult>> repairCachedPackages();
+}
+
+/// The result of repairing a single cache entry.
+class RepairResult {
+  /// `true` if [package] was repaired successfully.
+  /// `false` if something failed during the repair.
+  ///
+  /// When something goes wrong the package is attempted removed from
+  /// cache (but that might itself have failed).
+  final bool success;
+  final PackageId package;
+  RepairResult(this.package, {@required this.success});
 }
diff --git a/lib/src/source/git.dart b/lib/src/source/git.dart
index 2dfd4a3..f503fd4 100644
--- a/lib/src/source/git.dart
+++ b/lib/src/source/git.dart
@@ -336,11 +336,10 @@
   /// Resets all cached packages back to the pristine state of the Git
   /// repository at the revision they are pinned to.
   @override
-  Future<Pair<List<PackageId>, List<PackageId>>> repairCachedPackages() async {
-    if (!dirExists(systemCacheRoot)) return Pair([], []);
+  Future<Iterable<RepairResult>> repairCachedPackages() async {
+    if (!dirExists(systemCacheRoot)) return [];
 
-    var successes = <PackageId>[];
-    var failures = <PackageId>[];
+    final result = <RepairResult>[];
 
     var packages = listDir(systemCacheRoot)
         .where((entry) => dirExists(p.join(entry, '.git')))
@@ -356,7 +355,9 @@
             } catch (error, stackTrace) {
               log.error('Failed to load package', error, stackTrace);
               var name = p.basename(revisionCachePath).split('-').first;
-              failures.add(PackageId(name, source, Version.none, '???'));
+              result.add(RepairResult(
+                  PackageId(name, source, Version.none, '???'),
+                  success: false));
               tryDeleteEntry(revisionCachePath);
               return null;
             }
@@ -387,19 +388,19 @@
         // Discard all changes to tracked files.
         await git.run(['reset', '--hard', 'HEAD'], workingDir: package.dir);
 
-        successes.add(id);
+        result.add(RepairResult(id, success: true));
       } on git.GitException catch (error, stackTrace) {
         log.error('Failed to reset ${log.bold(package.name)} '
             '${package.version}. Error:\n$error');
         log.fine(stackTrace);
-        failures.add(id);
+        result.add(RepairResult(id, success: false));
 
         // Delete the revision cache path, not the subdirectory that contains the package.
         tryDeleteEntry(getDirectory(id));
       }
     }
 
-    return Pair(successes, failures);
+    return result;
   }
 
   /// Ensures that the canonical clone of the repository referred to by [ref]
diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart
index 146f0f8..28bb36e 100644
--- a/lib/src/source/hosted.dart
+++ b/lib/src/source/hosted.dart
@@ -282,47 +282,51 @@
   /// Re-downloads all packages that have been previously downloaded into the
   /// system cache from any server.
   @override
-  Future<Pair<List<PackageId>, List<PackageId>>> repairCachedPackages() async {
-    if (!dirExists(systemCacheRoot)) return Pair([], []);
+  Future<Iterable<RepairResult>> repairCachedPackages() async {
+    if (!dirExists(systemCacheRoot)) return [];
 
-    var successes = <PackageId>[];
-    var failures = <PackageId>[];
-
-    for (var serverDir in listDir(systemCacheRoot)) {
-      var url = _directoryToUrl(p.basename(serverDir));
-
-      var packages = <Package>[];
-      for (var entry in listDir(serverDir)) {
-        try {
-          packages.add(Package.load(null, entry, systemCache.sources));
-        } catch (error, stackTrace) {
-          log.error('Failed to load package', error, stackTrace);
-          failures.add(_idForBasename(p.basename(entry)));
-          tryDeleteEntry(entry);
+    return (await Future.wait(listDir(systemCacheRoot).map(
+      (serverDir) async {
+        var url = _directoryToUrl(p.basename(serverDir));
+        final results = <RepairResult>[];
+        var packages = <Package>[];
+        for (var entry in listDir(serverDir)) {
+          try {
+            packages.add(Package.load(null, entry, systemCache.sources));
+          } catch (error, stackTrace) {
+            log.error('Failed to load package', error, stackTrace);
+            results.add(RepairResult(_idForBasename(p.basename(entry)),
+                success: false));
+            tryDeleteEntry(entry);
+          }
         }
-      }
 
-      packages.sort(Package.orderByNameAndVersion);
+        packages.sort(Package.orderByNameAndVersion);
 
-      for (var package in packages) {
-        var id = source.idFor(package.name, package.version, url: url);
-        try {
-          await _download(id, package.dir);
-          successes.add(id);
-        } catch (error, stackTrace) {
-          failures.add(id);
-          var message = 'Failed to repair ${log.bold(package.name)} '
-              '${package.version}';
-          if (url != source.defaultUrl) message += ' from $url';
-          log.error('$message. Error:\n$error');
-          log.fine(stackTrace);
+        return results
+          ..addAll(await Future.wait(
+            packages.map(
+              (package) async {
+                var id = source.idFor(package.name, package.version, url: url);
+                try {
+                  await _download(id, package.dir);
+                  return RepairResult(id, success: true);
+                } catch (error, stackTrace) {
+                  var message = 'Failed to repair ${log.bold(package.name)} '
+                      '${package.version}';
+                  if (url != source.defaultUrl) message += ' from $url';
+                  log.error('$message. Error:\n$error');
+                  log.fine(stackTrace);
 
-          tryDeleteEntry(package.dir);
-        }
-      }
-    }
-
-    return Pair(successes, failures);
+                  tryDeleteEntry(package.dir);
+                  return RepairResult(id, success: false);
+                }
+              },
+            ),
+          ));
+      },
+    )))
+        .expand((x) => x);
   }
 
   /// Returns the best-guess package ID for [basename], which should be a