Refactor HTTP retries (#3325) (#3590)

* Break out networking related OS error codes

* Add mapException feature to retry util

* Add WIP sendWithRetries method

* Fix potential runtime null response in onRetry option

* Remove code to get CI working

* Apply suggestions from code review

Co-authored-by: Jonas Finnemann Jensen <jopsen@gmail.com>

* Update Pub HTTP exceptions

* Remove stack trace from retry methods

* Remove hard-coded domain check for HTTP 500

* Simplify method

* Align retry behavior with requirements

* Move withAuthenticatedClient a layer up and clarify comment

* Clarify usage of global HTTP client

* Take request metadata out of _PubHttpClient

* Use RetryClient for OAuth2 calls

* Retrigger checks

* Wrap versions HTTP call with retryForHttp

* Add more comments

* Use "throwIfNotOk" in versions HTTP call

* Change PackageIntegrityException to always subclass as an intermittent PubHttpException

* Configure OAuth2 HTTP retries

* Make metadata headers method an extension on http.Request and use .throwIfNotOk() in more places

* Add retries to OIDC discovery document request

* Add some retries to upload command

* Add retries to upload package step

* Fix indentation

* Make PubHttpResponseException use BaseResponse

* Fix get_test

* Fix more tests

* Fix SHA-256 tests

* Only send pub.dev API Accept header when necessary and fix tests

Co-authored-by: Jonas Finnemann Jensen <jopsen@gmail.com>
diff --git a/lib/src/authentication/client.dart b/lib/src/authentication/client.dart
index a6001ec..787f394 100644
--- a/lib/src/authentication/client.dart
+++ b/lib/src/authentication/client.dart
@@ -48,19 +48,14 @@
           await _credential!.getAuthorizationHeaderValue();
     }
 
-    try {
-      final response = await _inner.send(request);
-      if (response.statusCode == 401) {
-        _detectInvalidCredentials = true;
-        _throwAuthException(response);
-      }
-      return response;
-    } on PubHttpException catch (e) {
-      if (e.response.statusCode == 403) {
-        _throwAuthException(e.response);
-      }
-      rethrow;
+    final response = await _inner.send(request);
+    if (response.statusCode == 401) {
+      _detectInvalidCredentials = true;
     }
+    if (response.statusCode == 401 || response.statusCode == 403) {
+      _throwAuthException(response);
+    }
+    return response;
   }
 
   /// Throws [AuthenticationException] that includes response status code and
@@ -127,7 +122,7 @@
   Future<T> Function(http.Client) fn,
 ) async {
   final credential = systemCache.tokenStore.findCredential(hostedUrl);
-  final client = _AuthenticatedClient(httpClient, credential);
+  final client = _AuthenticatedClient(globalHttpClient, credential);
 
   try {
     return await fn(client);
diff --git a/lib/src/command.dart b/lib/src/command.dart
index 31f0b17..90f920d 100644
--- a/lib/src/command.dart
+++ b/lib/src/command.dart
@@ -238,7 +238,7 @@
           log.message('Logs written to $transcriptPath.');
         }
       }
-      httpClient.close();
+      globalHttpClient.close();
     }
   }
 
@@ -253,7 +253,9 @@
       exception = exception.innerError!;
     }
 
-    if (exception is HttpException ||
+    if (exception is PackageIntegrityException) {
+      return exit_codes.TEMP_FAIL;
+    } else if (exception is HttpException ||
         exception is http.ClientException ||
         exception is SocketException ||
         exception is TlsException ||
diff --git a/lib/src/command/lish.dart b/lib/src/command/lish.dart
index 4b182b2..7090412 100644
--- a/lib/src/command/lish.dart
+++ b/lib/src/command/lish.dart
@@ -93,34 +93,51 @@
 
     try {
       await log.progress('Uploading', () async {
-        var newUri = host.resolve('api/packages/versions/new');
-        var response = await client.get(newUri, headers: pubApiHeaders);
-        var parameters = parseJsonResponse(response);
+        /// 1. Initiate upload
+        final parametersResponse =
+            await retryForHttp('initiating upload', () async {
+          final request =
+              http.Request('GET', host.resolve('api/packages/versions/new'));
+          request.attachPubApiHeaders();
+          request.attachMetadataHeaders();
+          return await client.fetch(request);
+        });
+        final parameters = parseJsonResponse(parametersResponse);
 
-        var url = _expectField(parameters, 'url', response);
-        if (url is! String) invalidServerResponse(response);
+        /// 2. Upload package
+        var url = _expectField(parameters, 'url', parametersResponse);
+        if (url is! String) invalidServerResponse(parametersResponse);
         cloudStorageUrl = Uri.parse(url);
-        // TODO(nweiz): Cloud Storage can provide an XML-formatted error. We
-        // should report that error and exit.
-        var request = http.MultipartRequest('POST', cloudStorageUrl!);
+        final uploadResponse =
+            await retryForHttp('uploading package', () async {
+          // TODO(nweiz): Cloud Storage can provide an XML-formatted error. We
+          // should report that error and exit.
+          var request = http.MultipartRequest('POST', cloudStorageUrl!);
 
-        var fields = _expectField(parameters, 'fields', response);
-        if (fields is! Map) invalidServerResponse(response);
-        fields.forEach((key, value) {
-          if (value is! String) invalidServerResponse(response);
-          request.fields[key] = value;
+          var fields = _expectField(parameters, 'fields', parametersResponse);
+          if (fields is! Map) invalidServerResponse(parametersResponse);
+          fields.forEach((key, value) {
+            if (value is! String) invalidServerResponse(parametersResponse);
+            request.fields[key] = value;
+          });
+
+          request.followRedirects = false;
+          request.files.add(http.MultipartFile.fromBytes('file', packageBytes,
+              filename: 'package.tar.gz'));
+          return await client.fetch(request);
         });
 
-        request.followRedirects = false;
-        request.files.add(http.MultipartFile.fromBytes('file', packageBytes,
-            filename: 'package.tar.gz'));
-        var postResponse =
-            await http.Response.fromStream(await client.send(request));
-
-        var location = postResponse.headers['location'];
-        if (location == null) throw PubHttpException(postResponse);
-        handleJsonSuccess(
-            await client.get(Uri.parse(location), headers: pubApiHeaders));
+        /// 3. Finalize publish
+        var location = uploadResponse.headers['location'];
+        if (location == null) throw PubHttpResponseException(uploadResponse);
+        final finalizeResponse =
+            await retryForHttp('finalizing publish', () async {
+          final request = http.Request('GET', Uri.parse(location));
+          request.attachPubApiHeaders();
+          request.attachMetadataHeaders();
+          return await client.fetch(request);
+        });
+        handleJsonSuccess(finalizeResponse);
       });
     } on AuthenticationException catch (error) {
       var msg = '';
@@ -138,7 +155,7 @@
         msg += '\n${error.serverMessage!}\n';
       }
       dataError(msg + log.red('Authentication failed!'));
-    } on PubHttpException catch (error) {
+    } on PubHttpResponseException catch (error) {
       var url = error.response.request!.url;
       if (url == cloudStorageUrl) {
         // TODO(nweiz): the response may have XML-formatted information about
@@ -189,7 +206,7 @@
           return _publishUsingClient(packageBytes, client);
         });
       }
-    } on PubHttpException catch (error) {
+    } on PubHttpResponseException catch (error) {
       var url = error.response.request!.url;
       if (Uri.parse(url.origin) == Uri.parse(host.origin)) {
         handleJsonError(error.response);
diff --git a/lib/src/command/login.dart b/lib/src/command/login.dart
index 8f428ab..0743845 100644
--- a/lib/src/command/login.dart
+++ b/lib/src/command/login.dart
@@ -7,7 +7,6 @@
 
 import '../command.dart';
 import '../command_runner.dart';
-import '../http.dart';
 import '../log.dart' as log;
 import '../oauth2.dart' as oauth2;
 
@@ -46,9 +45,8 @@
 
   Future<_UserInfo?> _retrieveUserInfo() async {
     return await oauth2.withClient(cache, (client) async {
-      final discovery = await httpClient.get(Uri.https(
-          'accounts.google.com', '/.well-known/openid-configuration'));
-      final userInfoEndpoint = json.decode(discovery.body)['userinfo_endpoint'];
+      final discovery = await oauth2.fetchOidcDiscoveryDocument();
+      final userInfoEndpoint = discovery['userinfo_endpoint'];
       final userInfoRequest = await client.get(Uri.parse(userInfoEndpoint));
       if (userInfoRequest.statusCode != 200) return null;
       try {
diff --git a/lib/src/exceptions.dart b/lib/src/exceptions.dart
index 39e9c51..90c9d1a 100644
--- a/lib/src/exceptions.dart
+++ b/lib/src/exceptions.dart
@@ -12,6 +12,7 @@
 import 'package:yaml/yaml.dart';
 
 import 'dart.dart';
+import 'http.dart';
 
 /// An exception class for exceptions that are intended to be seen by the user.
 ///
@@ -106,12 +107,9 @@
 }
 
 /// A class for exceptions where a package's checksum could not be validated.
-class PackageIntegrityException extends WrappedException {
-  PackageIntegrityException(
-    String message, {
-    Object? innerError,
-    StackTrace? innerTrace,
-  }) : super(message, innerError, innerTrace);
+class PackageIntegrityException extends PubHttpException {
+  PackageIntegrityException(String message)
+      : super(message, isIntermittent: true);
 }
 
 /// Returns whether [error] is a user-facing error object.
diff --git a/lib/src/http.dart b/lib/src/http.dart
index 6473a8e..ccd5146 100644
--- a/lib/src/http.dart
+++ b/lib/src/http.dart
@@ -9,14 +9,10 @@
 import 'dart:math' as math;
 
 import 'package:http/http.dart' as http;
-import 'package:http/retry.dart';
 import 'package:pool/pool.dart';
-import 'package:stack_trace/stack_trace.dart';
 
 import 'command.dart';
-import 'io.dart';
 import 'log.dart' as log;
-import 'oauth2.dart' as oauth2;
 import 'package.dart';
 import 'sdk.dart';
 import 'source/hosted.dart';
@@ -46,22 +42,6 @@
 
   @override
   Future<http.StreamedResponse> send(http.BaseRequest request) async {
-    if (_shouldAddMetadata(request)) {
-      request.headers['X-Pub-OS'] = Platform.operatingSystem;
-      request.headers['X-Pub-Command'] = PubCommand.command;
-      request.headers['X-Pub-Session-ID'] = _sessionId;
-
-      var environment = Platform.environment['PUB_ENVIRONMENT'];
-      if (environment != null) {
-        request.headers['X-Pub-Environment'] = environment;
-      }
-
-      var type = Zone.current[#_dependencyType];
-      if (type != null && type != DependencyType.none) {
-        request.headers['X-Pub-Reason'] = type.toString();
-      }
-    }
-
     _requestStopwatches[request] = Stopwatch()..start();
     request.headers[HttpHeaders.userAgentHeader] = 'Dart pub ${sdk.version}';
     _logRequest(request);
@@ -73,24 +53,6 @@
     return streamedResponse;
   }
 
-  /// Whether extra metadata headers should be added to [request].
-  bool _shouldAddMetadata(http.BaseRequest request) {
-    if (runningFromTest && Platform.environment.containsKey('PUB_HOSTED_URL')) {
-      if (request.url.origin != Platform.environment['PUB_HOSTED_URL']) {
-        return false;
-      }
-    } else {
-      if (!HostedSource.isPubDevUrl(request.url.toString())) return false;
-    }
-
-    if (Platform.environment.containsKey('CI') &&
-        Platform.environment['CI'] != 'false') {
-      return false;
-    }
-
-    return true;
-  }
-
   /// Logs the fact that [request] was sent, and information about it.
   void _logRequest(http.BaseRequest request) {
     var requestLog = StringBuffer();
@@ -155,130 +117,14 @@
   void close() => _inner.close();
 }
 
-/// The [_PubHttpClient] wrapped by [httpClient].
+/// The [_PubHttpClient] wrapped by [globalHttpClient].
 final _pubClient = _PubHttpClient();
 
-/// A set of all hostnames for which we've printed a message indicating that
-/// we're waiting for them to come back up.
-final _retriedHosts = <String>{};
-
-/// Intercepts all requests and throws exceptions if the response was not
-/// considered successful.
-class _ThrowingClient extends http.BaseClient {
-  final http.Client _inner;
-
-  _ThrowingClient(this._inner);
-
-  @override
-  Future<http.StreamedResponse> send(http.BaseRequest request) async {
-    late http.StreamedResponse streamedResponse;
-    try {
-      streamedResponse = await _inner.send(request);
-    } on SocketException catch (error, stackTraceOrNull) {
-      // Work around issue 23008.
-      var stackTrace = stackTraceOrNull;
-
-      if (error.osError == null) rethrow;
-
-      // Handle error codes known to be related to DNS or SSL issues. While it
-      // is tempting to handle these error codes before retrying, saving time
-      // for the end-user, it is known that DNS lookups can fail intermittently
-      // in some cloud environments. Furthermore, since these error codes are
-      // platform-specific (undocumented) and essentially cargo-culted along
-      // skipping retries may lead to intermittent issues that could be fixed
-      // with a retry. Failing to retry intermittent issues is likely to cause
-      // customers to wrap pub in a retry loop which will not improve the
-      // end-user experience.
-      if (error.osError!.errorCode == 8 ||
-          error.osError!.errorCode == -2 ||
-          error.osError!.errorCode == -5 ||
-          error.osError!.errorCode == 11001 ||
-          error.osError!.errorCode == 11004) {
-        fail('Could not resolve URL "${request.url.origin}".', error,
-            stackTrace);
-      } else if (error.osError!.errorCode == -12276) {
-        fail(
-            'Unable to validate SSL certificate for '
-            '"${request.url.origin}".',
-            error,
-            stackTrace);
-      } else {
-        rethrow;
-      }
-    }
-
-    var status = streamedResponse.statusCode;
-    // 401 responses should be handled by the OAuth2 client. It's very
-    // unlikely that they'll be returned by non-OAuth2 requests. We also want
-    // to pass along 400 responses from the token endpoint.
-    var tokenRequest = streamedResponse.request!.url == oauth2.tokenEndpoint;
-    if (status < 400 || status == 401 || (status == 400 && tokenRequest)) {
-      return streamedResponse;
-    }
-
-    if (status == 406 && request.headers['Accept'] == pubApiHeaders['Accept']) {
-      fail('Pub ${sdk.version} is incompatible with the current version of '
-          '${request.url.host}.\n'
-          'Upgrade pub to the latest version and try again.');
-    }
-
-    if (status == 500 &&
-        (request.url.host == 'pub.dev' ||
-            request.url.host == 'storage.googleapis.com')) {
-      fail('HTTP error 500: Internal Server Error at ${request.url}.\n'
-          'This is likely a transient error. Please try again later.');
-    }
-
-    throw PubHttpException(await http.Response.fromStream(streamedResponse));
-  }
-
-  @override
-  void close() => _inner.close();
-}
-
 /// The HTTP client to use for all HTTP requests.
-final httpClient = _ThrottleClient(
-    16,
-    _ThrowingClient(RetryClient(_pubClient,
-        retries: math.max(
-          1, // Having less than 1 retry is **always** wrong.
-          int.tryParse(Platform.environment['PUB_MAX_HTTP_RETRIES'] ?? '') ?? 7,
-        ),
-        when: (response) =>
-            const [500, 502, 503, 504].contains(response.statusCode),
-        whenError: (error, stackTrace) {
-          if (error is! IOException) return false;
+final globalHttpClient = _pubClient;
 
-          var chain = Chain.forTrace(stackTrace);
-          log.io('HTTP error:\n$error\n\n${chain.terse}');
-          return true;
-        },
-        delay: (retryCount) {
-          if (retryCount < 3) {
-            // Retry quickly a couple times in case of a short transient error.
-            //
-            // Add a random delay to avoid retrying a bunch of parallel requests
-            // all at the same time.
-            return Duration(milliseconds: 500) * math.pow(1.5, retryCount) +
-                Duration(milliseconds: random.nextInt(500));
-          } else {
-            // If the error persists, wait a long time. This works around issues
-            // where an AppEngine instance will go down and need to be rebooted,
-            // which takes about a minute.
-            return Duration(seconds: 30);
-          }
-        },
-        onRetry: (request, response, retryCount) {
-          log.io('Retry #${retryCount + 1} for '
-              '${request.method} ${request.url}...');
-          if (retryCount != 3) return;
-          if (!_retriedHosts.add(request.url.host)) return;
-          log.message(
-              'It looks like ${request.url.host} is having some trouble.\n'
-              'Pub will wait for a while before trying to connect again.');
-        })));
-
-/// The underlying HTTP client wrapped by [httpClient].
+/// The underlying HTTP client wrapped by [globalHttpClient].
+/// This enables the ability to use a mock client in tests.
 http.Client get innerHttpClient => _pubClient._inner;
 set innerHttpClient(http.Client client) => _pubClient._inner = client;
 
@@ -294,6 +140,36 @@
   return runZoned(callback, zoneValues: {#_dependencyType: type});
 }
 
+extension AttachHeaders on http.Request {
+  /// Adds headers required for pub.dev API requests.
+  void attachPubApiHeaders() {
+    headers.addAll(pubApiHeaders);
+  }
+
+  /// Adds request metadata headers about the Pub tool's environment and the
+  /// currently running command if the request URL indicates the destination is
+  /// a Hosted Pub Repository.
+  void attachMetadataHeaders() {
+    if (!HostedSource.shouldSendAdditionalMetadataFor(url)) {
+      return;
+    }
+
+    headers['X-Pub-OS'] = Platform.operatingSystem;
+    headers['X-Pub-Command'] = PubCommand.command;
+    headers['X-Pub-Session-ID'] = _sessionId;
+
+    var environment = Platform.environment['PUB_ENVIRONMENT'];
+    if (environment != null) {
+      headers['X-Pub-Environment'] = environment;
+    }
+
+    var type = Zone.current[#_dependencyType];
+    if (type != null && type != DependencyType.none) {
+      headers['X-Pub-Reason'] = type.toString();
+    }
+  }
+}
+
 /// Handles a successful JSON-formatted response from pub.dev.
 ///
 /// These responses are expected to be of the form `{"success": {"message":
@@ -314,7 +190,12 @@
 /// These responses are expected to be of the form `{"error": {"message": "some
 /// message"}}`. If the format is correct, the message will be raised as an
 /// error; otherwise an [invalidServerResponse] error will be raised.
-void handleJsonError(http.Response response) {
+void handleJsonError(http.BaseResponse response) {
+  if (response is! http.Response) {
+    // Not likely to be a common code path, but necessary.
+    // See https://github.com/dart-lang/pub/pull/3590#discussion_r1012978108
+    fail(log.red('Invalid server response'));
+  }
   var errorMap = parseJsonResponse(response);
   if (errorMap['error'] is! Map ||
       !errorMap['error'].containsKey('message') ||
@@ -345,56 +226,145 @@
 
 /// Exception thrown when an HTTP operation fails.
 class PubHttpException implements Exception {
-  final http.Response response;
+  final String message;
+  final bool isIntermittent;
 
-  const PubHttpException(this.response);
+  PubHttpException(this.message, {this.isIntermittent = false});
 
   @override
-  String toString() => 'HTTP error ${response.statusCode}: '
-      '${response.reasonPhrase}';
+  String toString() {
+    return 'PubHttpException: $message';
+  }
 }
 
-/// A middleware client that throttles the number of concurrent requests.
-///
-/// As long as the number of requests is within the limit, this works just like
-/// a normal client. If a request is made beyond the limit, the underlying HTTP
-/// request won't be sent until other requests have completed.
-class _ThrottleClient extends http.BaseClient {
-  final Pool _pool;
-  final http.Client _inner;
+/// Exception thrown when an HTTP response is not Ok.
+class PubHttpResponseException extends PubHttpException {
+  final http.BaseResponse response;
 
-  /// Creates a new client that allows no more than [maxActiveRequests]
-  /// concurrent requests.
-  ///
-  /// If [inner] is passed, it's used as the inner client for sending HTTP
-  /// requests. It defaults to `new http.Client()`.
-  _ThrottleClient(int maxActiveRequests, this._inner)
-      : _pool = Pool(maxActiveRequests);
+  PubHttpResponseException(this.response,
+      {String message = '', bool isIntermittent = false})
+      : super(message, isIntermittent: isIntermittent);
 
   @override
-  Future<http.StreamedResponse> send(http.BaseRequest request) async {
-    var resource = await _pool.request();
-
-    http.StreamedResponse response;
-    try {
-      response = await _inner.send(request);
-    } catch (_) {
-      resource.release();
-      rethrow;
+  String toString() {
+    var temp = 'PubHttpResponseException: HTTP error ${response.statusCode} '
+        '${response.reasonPhrase}';
+    if (message != '') {
+      temp += ': $message';
     }
+    return temp;
+  }
+}
 
-    final responseController = StreamController<List<int>>(sync: true);
-    unawaited(response.stream.pipe(responseController));
-    unawaited(responseController.done.then((_) => resource.release()));
-    return http.StreamedResponse(responseController.stream, response.statusCode,
-        contentLength: response.contentLength,
-        request: response.request,
-        headers: response.headers,
-        isRedirect: response.isRedirect,
-        persistentConnection: response.persistentConnection,
-        reasonPhrase: response.reasonPhrase);
+/// Whether [e] is one of a few HTTP-related exceptions that subclass
+/// [IOException]. Can be used if your try-catch block contains various
+/// operations in addition to HTTP calls and so a [IOException] instance check
+/// would be too coarse.
+bool isHttpIOException(Object e) {
+  return e is HttpException ||
+      e is TlsException ||
+      e is SocketException ||
+      e is WebSocketException;
+}
+
+/// Program-wide limiter for concurrent network requests.
+final _httpPool = Pool(16);
+
+/// Runs the provided function [fn] and returns the response.
+///
+/// If there is an HTTP-related exception, an intermittent HTTP error response,
+/// or an async timeout, [fn] is run repeatedly until there is a successful
+/// response or at most seven total attempts have been made. If all attempts
+/// fail, the final exception is re-thrown.
+///
+/// Each attempt is run within a [Pool] configured with 16 maximum resources.
+Future<T> retryForHttp<T>(String operation, FutureOr<T> Function() fn) async {
+  return await retry(
+      () async => await _httpPool.withResource(() async => await fn()),
+      retryIf: (e) async =>
+          (e is PubHttpException && e.isIntermittent) ||
+          e is TimeoutException ||
+          isHttpIOException(e),
+      onRetry: (exception, attemptNumber) async =>
+          log.io('Attempt #$attemptNumber for $operation'),
+      maxAttempts: math.max(
+        1, // Having less than 1 attempt doesn't make sense.
+        int.tryParse(Platform.environment['PUB_MAX_HTTP_RETRIES'] ?? '') ?? 7,
+      ));
+}
+
+extension Throwing on http.BaseResponse {
+  /// See https://api.flutter.dev/flutter/dart-io/HttpClientRequest/followRedirects.html
+  static const _redirectStatusCodes = [
+    HttpStatus.movedPermanently,
+    HttpStatus.movedTemporarily,
+    HttpStatus.seeOther,
+    HttpStatus.temporaryRedirect,
+    HttpStatus.permanentRedirect
+  ];
+
+  /// Throws [PubHttpResponseException], calls [fail], or does nothing depending
+  /// on the status code.
+  ///
+  /// If the code is in the 200 range or if its a 300 range redirect code,
+  /// nothing is done. If the code is 408, 429, or in the 500 range,
+  /// [PubHttpResponseException] is thrown with "isIntermittent" set to `true`.
+  /// Otherwise, [PubHttpResponseException] is thrown with "isIntermittent" set
+  /// to `false`.
+  void throwIfNotOk() {
+    if (statusCode >= 200 && statusCode <= 299) {
+      return;
+    } else if (_redirectStatusCodes.contains(statusCode)) {
+      return;
+    } else if (statusCode == HttpStatus.notAcceptable &&
+        request?.headers['Accept'] == pubApiHeaders['Accept']) {
+      fail('Pub ${sdk.version} is incompatible with the current version of '
+          '${request?.url.host}.\n'
+          'Upgrade pub to the latest version and try again.');
+    } else if (statusCode >= 500 ||
+        statusCode == HttpStatus.requestTimeout ||
+        statusCode == HttpStatus.tooManyRequests) {
+      // Throw if the response indicates a server error or an intermittent
+      // client error, but mark it as intermittent so it can be retried.
+      throw PubHttpResponseException(this, isIntermittent: true);
+    } else {
+      // Throw for all other status codes.
+      throw PubHttpResponseException(this);
+    }
+  }
+}
+
+extension RequestSending on http.Client {
+  /// Sends an HTTP request, reads the whole response body, validates the
+  /// response headers, and if validation is successful, and returns it.
+  ///
+  /// The send method on [http.Client], which returns a [http.StreamedResponse],
+  /// is the only method that accepts a request object. This method can be used
+  /// when you need to send a request object but want a regular response object.
+  ///
+  /// If false is passed for [throwIfNotOk], the response will not be validated.
+  /// See [http.BaseResponse.throwIfNotOk] extension for validation details.
+  Future<http.Response> fetch(http.BaseRequest request,
+      {bool throwIfNotOk = true}) async {
+    final streamedResponse = await send(request);
+    final response = await http.Response.fromStream(streamedResponse);
+    if (throwIfNotOk) {
+      response.throwIfNotOk();
+    }
+    return response;
   }
 
-  @override
-  void close() => _inner.close();
+  /// Sends an HTTP request, validates the response headers, and if validation
+  /// is successful, returns a [http.StreamedResponse].
+  ///
+  /// If false is passed for [throwIfNotOk], the response will not be validated.
+  /// See [http.BaseResponse.throwIfNotOk] extension for validation details.
+  Future<http.StreamedResponse> fetchAsStream(http.BaseRequest request,
+      {bool throwIfNotOk = true}) async {
+    final streamedResponse = await send(request);
+    if (throwIfNotOk) {
+      streamedResponse.throwIfNotOk();
+    }
+    return streamedResponse;
+  }
 }
diff --git a/lib/src/oauth2.dart b/lib/src/oauth2.dart
index 2159847..41eb5f5 100644
--- a/lib/src/oauth2.dart
+++ b/lib/src/oauth2.dart
@@ -6,6 +6,8 @@
 import 'dart:io';
 
 import 'package:collection/collection.dart' show IterableExtension;
+import 'package:http/http.dart' as http;
+import 'package:http/retry.dart';
 import 'package:oauth2/oauth2.dart';
 import 'package:path/path.dart' as path;
 import 'package:shelf/shelf.dart' as shelf;
@@ -17,6 +19,13 @@
 import 'system_cache.dart';
 import 'utils.dart';
 
+/// The global HTTP client with basic retries. Used instead of retryForHttp for
+/// OAuth calls because the OAuth2 package requires a client to be passed. While
+/// the retry logic is more basic, this is fine for the publishing process.
+final _retryHttpClient = RetryClient(globalHttpClient,
+    when: (response) => response.statusCode >= 500,
+    whenError: (e, _) => isHttpIOException(e));
+
 /// The pub client's OAuth2 identifier.
 const _identifier = '818368855108-8grd2eg9tj9f38os6f1urbcvsq399u8n.apps.'
     'googleusercontent.com';
@@ -26,6 +35,12 @@
 /// This isn't actually meant to be kept a secret.
 const _secret = 'SWeqj8seoJW0w7_CpEPFLX0K';
 
+/// The URL from which the pub client will retrieve Google's OIDC endpoint URIs.
+///
+/// [Google OpenID Connect documentation]: https://developers.google.com/identity/openid-connect/openid-connect#discovery
+final _oidcDiscoveryDocumentEndpoint =
+    Uri.https('accounts.google.com', '/.well-known/openid-configuration');
+
 /// The URL to which the user will be directed to authorize the pub client to
 /// get an OAuth2 access token.
 ///
@@ -142,7 +157,7 @@
       secret: _secret,
       // Google's OAuth2 API doesn't support basic auth.
       basicAuth: false,
-      httpClient: httpClient);
+      httpClient: _retryHttpClient);
   _saveCredentials(cache, client.credentials);
   return client;
 }
@@ -221,7 +236,7 @@
           secret: _secret,
           // Google's OAuth2 API doesn't support basic auth.
           basicAuth: false,
-          httpClient: httpClient);
+          httpClient: _retryHttpClient);
 
   // Spin up a one-shot HTTP server to receive the authorization code from the
   // Google OAuth2 server via redirect. This server will close itself as soon as
@@ -258,3 +273,16 @@
   log.message('Successfully authorized.\n');
   return client;
 }
+
+/// Fetches Google's OpenID Connect Discovery document and parses the JSON
+/// response body into a [Map].
+///
+/// See https://developers.google.com/identity/openid-connect/openid-connect#discovery
+Future<Map> fetchOidcDiscoveryDocument() async {
+  final discoveryResponse = await retryForHttp(
+      'fetching Google\'s OpenID Connect Discovery document', () async {
+    final request = http.Request('GET', _oidcDiscoveryDocumentEndpoint);
+    return await globalHttpClient.fetch(request);
+  });
+  return parseJsonResponse(discoveryResponse);
+}
diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart
index 5922e3c..1cb05d6 100644
--- a/lib/src/source/hosted.dart
+++ b/lib/src/source/hosted.dart
@@ -5,7 +5,7 @@
 import 'dart:async';
 import 'dart:convert';
 import 'dart:io' as io;
-import 'dart:math' as math;
+import 'dart:io';
 import 'dart:typed_data';
 
 import 'package:collection/collection.dart'
@@ -172,6 +172,25 @@
     }
   }();
 
+  /// Whether extra metadata headers should be sent for HTTP requests to a given
+  /// [url].
+  static bool shouldSendAdditionalMetadataFor(Uri url) {
+    if (runningFromTest && Platform.environment.containsKey('PUB_HOSTED_URL')) {
+      if (url.origin != Platform.environment['PUB_HOSTED_URL']) {
+        return false;
+      }
+    } else {
+      if (!HostedSource.isPubDevUrl(url.toString())) return false;
+    }
+
+    if (Platform.environment.containsKey('CI') &&
+        Platform.environment['CI'] != 'false') {
+      return false;
+    }
+
+    return true;
+  }
+
   /// Returns a reference to a hosted package named [name].
   ///
   /// If [url] is passed, it's the URL of the pub server from which the package
@@ -245,8 +264,6 @@
     );
   }
 
-  HostedDescription _asDescription(desc) => desc as HostedDescription;
-
   /// Parses the description for a package.
   ///
   /// If the package parses correctly, this returns a (name, url) pair. If not,
@@ -370,6 +387,7 @@
     if (description is! HostedDescription) {
       throw ArgumentError('Wrong source');
     }
+    final packageName = description.packageName;
     final hostedUrl = description.url;
     final url = _listVersionsUrl(ref);
     log.io('Get versions from $url.');
@@ -379,12 +397,18 @@
     final List<_VersionInfo> result;
     try {
       // TODO(sigurdm): Implement cancellation of requests. This probably
-      // requires resolution of: https://github.com/dart-lang/sdk/issues/22265.
-      bodyText = await withAuthenticatedClient(
-        cache,
-        Uri.parse(hostedUrl),
-        (client) => client.read(url, headers: pubApiHeaders),
-      );
+      // requires resolution of: https://github.com/dart-lang/http/issues/424.
+      bodyText = await withAuthenticatedClient(cache, Uri.parse(hostedUrl),
+          (client) async {
+        return await retryForHttp(
+            'fetching versions for "$packageName" from "$url"', () async {
+          final request = http.Request('GET', url);
+          request.attachPubApiHeaders();
+          request.attachMetadataHeaders();
+          final response = await client.fetch(request);
+          return response.body;
+        });
+      });
       final decoded = jsonDecode(bodyText);
       if (decoded is! Map<String, dynamic>) {
         throw FormatException('version listing must be a mapping');
@@ -392,7 +416,6 @@
       body = decoded;
       result = _versionInfoFromPackageListing(body, ref, url, cache);
     } on Exception catch (error, stackTrace) {
-      final packageName = _asDescription(ref.description).packageName;
       _throwFriendlyError(error, stackTrace, packageName, hostedUrl);
     }
 
@@ -1047,43 +1070,33 @@
       // download.
       final expectedSha256 = versionInfo.archiveSha256;
 
-      // The client from `withAuthenticatedClient` will retry HTTP requests.
-      // This wrapper is one layer up and will retry checksum validation errors.
-      await retry(
-        // Attempt to download archive and validate its checksum.
-        () async {
+      await withAuthenticatedClient(cache, Uri.parse(description.url),
+          (client) async {
+        // In addition to HTTP errors, this will retry crc32c/sha256 errors as
+        // well because [PackageIntegrityException] subclasses
+        // [PubHttpException].
+        await retryForHttp('downloading "$archiveUrl"', () async {
           final request = http.Request('GET', archiveUrl);
-          final response = await withAuthenticatedClient(cache,
-              Uri.parse(description.url), (client) => client.send(request));
-          final expectedCrc32Checksum =
-              _parseCrc32c(response.headers, fileName);
+          request.attachMetadataHeaders();
+          final response = await client.fetchAsStream(request);
 
           Stream<List<int>> stream = response.stream;
-          if (expectedCrc32Checksum != null) {
-            stream = _validateStreamCrc32Checksum(
-                response.stream, expectedCrc32Checksum, id, archiveUrl);
+          final expectedCrc32c = _parseCrc32c(response.headers, fileName);
+          if (expectedCrc32c != null) {
+            stream = _validateCrc32c(
+                response.stream, expectedCrc32c, id, archiveUrl);
           }
           stream = validateSha256(
               stream, (expectedSha256 == null) ? null : Digest(expectedSha256));
+
           // We download the archive to disk instead of streaming it directly
           // into the tar unpacking. This simplifies stream handling.
           // Package:tar cancels the stream when it reaches end-of-archive, and
           // cancelling a http stream makes it not reusable.
           // There are ways around this, and we might revisit this later.
           await createFileFromStream(stream, archivePath);
-        },
-        // Retry if the checksum response header was malformed or the actual
-        // checksum did not match the expected checksum.
-        retryIf: (e) => e is PackageIntegrityException,
-        onRetry: (e, retryCount) => log
-            .io('Retry #${retryCount + 1} because of checksum error with GET '
-                '$archiveUrl...'),
-        maxAttempts: math.max(
-          1, // Having less than 1 attempt doesn't make sense.
-          int.tryParse(io.Platform.environment['PUB_MAX_HTTP_RETRIES'] ?? '') ??
-              7,
-        ),
-      );
+        });
+      });
 
       var tempDir = cache.createTempDir();
       try {
@@ -1193,7 +1206,7 @@
     String package,
     String hostedUrl,
   ) {
-    if (error is PubHttpException) {
+    if (error is PubHttpResponseException) {
       if (error.response.statusCode == 404) {
         throw PackageNotFoundException(
             'could not find package $package at $hostedUrl',
@@ -1461,7 +1474,7 @@
 /// the one present in the checksum response header.
 ///
 /// Throws [PackageIntegrityException] if there is a checksum mismatch.
-Stream<List<int>> _validateStreamCrc32Checksum(Stream<List<int>> stream,
+Stream<List<int>> _validateCrc32c(Stream<List<int>> stream,
     int expectedChecksum, PackageId id, Uri archiveUrl) async* {
   final crc32c = Crc32c();
 
@@ -1519,11 +1532,10 @@
 
         return ByteData.view(bytes.buffer).getUint32(0);
       } on FormatException catch (e, s) {
+        log.exception(e, s);
         throw PackageIntegrityException(
             'Package archive "$fileName" has a malformed CRC32C checksum in '
-            'its response headers',
-            innerError: e,
-            innerTrace: s);
+            'its response headers');
       }
     }
   }
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index 0f1bf51..0719bbb 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -693,7 +693,7 @@
   Duration maxDelay = const Duration(seconds: 30),
   int maxAttempts = 8,
   FutureOr<bool> Function(Exception)? retryIf,
-  FutureOr<void> Function(Exception, int retryCount)? onRetry,
+  FutureOr<void> Function(Exception, int attemptNumber)? onRetry,
 }) async {
   var attempt = 0;
   // ignore: literal_only_boolean_expressions
@@ -705,8 +705,9 @@
       if (attempt >= maxAttempts || (retryIf != null && !(await retryIf(e)))) {
         rethrow;
       }
+
       if (onRetry != null) {
-        await onRetry(e, attempt);
+        await onRetry(e, attempt + 1);
       }
     }
 
diff --git a/test/add/hosted/non_default_pub_server_test.dart b/test/add/hosted/non_default_pub_server_test.dart
index 787bf32..f93f292 100644
--- a/test/add/hosted/non_default_pub_server_test.dart
+++ b/test/add/hosted/non_default_pub_server_test.dart
@@ -91,7 +91,8 @@
 
     await pubAdd(
       args: ['foo', '--hosted-url', 'https://invalid-url.foo'],
-      error: contains('Could not resolve URL "https://invalid-url.foo".'),
+      error: contains('Got socket error trying to find package foo at '
+          'https://invalid-url.foo.'),
       exitCode: exit_codes.DATA,
       environment: {
         // Limit the retries - the url will never go valid.
diff --git a/test/content_hash_test.dart b/test/content_hash_test.dart
index 6a0725b..2485918 100644
--- a/test/content_hash_test.dart
+++ b/test/content_hash_test.dart
@@ -54,7 +54,8 @@
         'e7a7a0f6d9873e4c40cf68cc3cc9ca5b6c8cef6a2220241bdada4b9cb0083279');
     await appDir({'foo': 'any'}).create();
     await pubGet(
-      silent: contains('Retry #2'),
+      exitCode: exit_codes.TEMP_FAIL,
+      silent: contains('Attempt #2'),
       error:
           contains('Downloaded archive for foo-1.0.0 had wrong content-hash.'),
       environment: {
diff --git a/test/get/hosted/get_test.dart b/test/get/hosted/get_test.dart
index 81d4845..837fb32 100644
--- a/test/get/hosted/get_test.dart
+++ b/test/get/hosted/get_test.dart
@@ -139,11 +139,12 @@
     }).create();
 
     await pubGet(
+      exitCode: exit_codes.TEMP_FAIL,
       error: RegExp(
           r'''Package archive for foo 1.2.3 downloaded from "(.+)" has '''
           r'''"x-goog-hash: crc32c=(\d+)", which doesn't match the checksum '''
           r'''of the archive downloaded\.'''),
-      silent: contains('Retry #2 because of checksum error'),
+      silent: contains('Attempt #2'),
       environment: {
         'PUB_MAX_HTTP_RETRIES': '2',
       },
@@ -175,11 +176,11 @@
       }).create();
 
       await pubGet(
-        exitCode: exit_codes.DATA,
+        exitCode: exit_codes.TEMP_FAIL,
         error: contains(
             'Package archive "foo-1.2.3.tar.gz" has a malformed CRC32C '
             'checksum in its response headers'),
-        silent: contains('Retry #2 because of checksum error'),
+        silent: contains('Attempt #2'),
         environment: {
           'PUB_MAX_HTTP_RETRIES': '2',
         },
@@ -195,11 +196,11 @@
       }).create();
 
       await pubGet(
-        exitCode: exit_codes.DATA,
+        exitCode: exit_codes.TEMP_FAIL,
         error: contains(
             'Package archive "bar-1.2.3.tar.gz" has a malformed CRC32C '
             'checksum in its response headers'),
-        silent: contains('Retry #2 because of checksum error'),
+        silent: contains('Attempt #2'),
         environment: {
           'PUB_MAX_HTTP_RETRIES': '2',
         },
@@ -215,11 +216,11 @@
       }).create();
 
       await pubGet(
-        exitCode: exit_codes.DATA,
+        exitCode: exit_codes.TEMP_FAIL,
         error: contains(
             'Package archive "baz-1.2.3.tar.gz" has a malformed CRC32C '
             'checksum in its response headers'),
-        silent: contains('Retry #2 because of checksum error'),
+        silent: contains('Attempt #2'),
         environment: {
           'PUB_MAX_HTTP_RETRIES': '2',
         },
diff --git a/test/hosted/fail_gracefully_on_url_resolve_test.dart b/test/hosted/fail_gracefully_on_url_resolve_test.dart
index 7a91e7a..0dff951 100644
--- a/test/hosted/fail_gracefully_on_url_resolve_test.dart
+++ b/test/hosted/fail_gracefully_on_url_resolve_test.dart
@@ -20,7 +20,8 @@
       ]).create();
 
       await pubCommand(command,
-          error: 'Could not resolve URL "https://invalid-url.foo".',
+          error: 'Got socket error trying to find package foo at '
+              'https://invalid-url.foo.',
           exitCode: exit_codes.UNAVAILABLE,
           environment: {
             'PUB_MAX_HTTP_RETRIES': '2',
diff --git a/tool/extract_all_pub_dev.dart b/tool/extract_all_pub_dev.dart
index 474c897..ca90cd0 100644
--- a/tool/extract_all_pub_dev.dart
+++ b/tool/extract_all_pub_dev.dart
@@ -20,13 +20,19 @@
 
 Future<List<String>> allPackageNames() async {
   var nextUrl = Uri.https('pub.dev', 'api/packages?compact=1');
-  final result = json.decode(await httpClient.read(nextUrl));
+  final request = http.Request('GET', nextUrl);
+  request.attachMetadataHeaders();
+  final response = await globalHttpClient.fetch(request);
+  final result = json.decode(response.body);
   return List<String>.from(result['packages']);
 }
 
 Future<List<String>> versionArchiveUrls(String packageName) async {
   final url = Uri.https('pub.dev', 'api/packages/$packageName');
-  final result = json.decode(await httpClient.read(url));
+  final request = http.Request('GET', url);
+  request.attachMetadataHeaders();
+  final response = await globalHttpClient.fetch(request);
+  final result = json.decode(response.body);
   return List<String>.from(result['versions'].map((v) => v['archive_url']));
 }
 
@@ -81,8 +87,10 @@
               log.message('downloading $archiveUrl');
               http.StreamedResponse response;
               try {
-                response = await httpClient
-                    .send(http.Request('GET', Uri.parse(archiveUrl)));
+                final archiveUri = Uri.parse(archiveUrl);
+                final request = http.Request('GET', archiveUri);
+                request.attachMetadataHeaders();
+                response = await globalHttpClient.fetchAsStream(request);
                 await extractTarGz(response.stream, tempDir);
                 log.message('Extracted $archiveUrl');
               } catch (e) {