Refactor HTTP retries (#3325) (#3590)
* Break out networking related OS error codes
* Add mapException feature to retry util
* Add WIP sendWithRetries method
* Fix potential runtime null response in onRetry option
* Remove code to get CI working
* Apply suggestions from code review
Co-authored-by: Jonas Finnemann Jensen <jopsen@gmail.com>
* Update Pub HTTP exceptions
* Remove stack trace from retry methods
* Remove hard-coded domain check for HTTP 500
* Simplify method
* Align retry behavior with requirements
* Move withAuthenticatedClient a layer up and clarify comment
* Clarify usage of global HTTP client
* Take request metadata out of _PubHttpClient
* Use RetryClient for OAuth2 calls
* Retrigger checks
* Wrap versions HTTP call with retryForHttp
* Add more comments
* Use "throwIfNotOk" in versions HTTP call
* Change PackageIntegrityException to always subclass as an intermittent PubHttpException
* Configure OAuth2 HTTP retries
* Make metadata headers method an extension on http.Request and use .throwIfNotOk() in more places
* Add retries to OIDC discovery document request
* Add some retries to upload command
* Add retries to upload package step
* Fix indentation
* Make PubHttpResponseException use BaseResponse
* Fix get_test
* Fix more tests
* Fix SHA-256 tests
* Only send pub.dev API Accept header when necessary and fix tests
Co-authored-by: Jonas Finnemann Jensen <jopsen@gmail.com>
diff --git a/lib/src/authentication/client.dart b/lib/src/authentication/client.dart
index a6001ec..787f394 100644
--- a/lib/src/authentication/client.dart
+++ b/lib/src/authentication/client.dart
@@ -48,19 +48,14 @@
await _credential!.getAuthorizationHeaderValue();
}
- try {
- final response = await _inner.send(request);
- if (response.statusCode == 401) {
- _detectInvalidCredentials = true;
- _throwAuthException(response);
- }
- return response;
- } on PubHttpException catch (e) {
- if (e.response.statusCode == 403) {
- _throwAuthException(e.response);
- }
- rethrow;
+ final response = await _inner.send(request);
+ if (response.statusCode == 401) {
+ _detectInvalidCredentials = true;
}
+ if (response.statusCode == 401 || response.statusCode == 403) {
+ _throwAuthException(response);
+ }
+ return response;
}
/// Throws [AuthenticationException] that includes response status code and
@@ -127,7 +122,7 @@
Future<T> Function(http.Client) fn,
) async {
final credential = systemCache.tokenStore.findCredential(hostedUrl);
- final client = _AuthenticatedClient(httpClient, credential);
+ final client = _AuthenticatedClient(globalHttpClient, credential);
try {
return await fn(client);
diff --git a/lib/src/command.dart b/lib/src/command.dart
index 31f0b17..90f920d 100644
--- a/lib/src/command.dart
+++ b/lib/src/command.dart
@@ -238,7 +238,7 @@
log.message('Logs written to $transcriptPath.');
}
}
- httpClient.close();
+ globalHttpClient.close();
}
}
@@ -253,7 +253,9 @@
exception = exception.innerError!;
}
- if (exception is HttpException ||
+ if (exception is PackageIntegrityException) {
+ return exit_codes.TEMP_FAIL;
+ } else if (exception is HttpException ||
exception is http.ClientException ||
exception is SocketException ||
exception is TlsException ||
diff --git a/lib/src/command/lish.dart b/lib/src/command/lish.dart
index 4b182b2..7090412 100644
--- a/lib/src/command/lish.dart
+++ b/lib/src/command/lish.dart
@@ -93,34 +93,51 @@
try {
await log.progress('Uploading', () async {
- var newUri = host.resolve('api/packages/versions/new');
- var response = await client.get(newUri, headers: pubApiHeaders);
- var parameters = parseJsonResponse(response);
+ /// 1. Initiate upload
+ final parametersResponse =
+ await retryForHttp('initiating upload', () async {
+ final request =
+ http.Request('GET', host.resolve('api/packages/versions/new'));
+ request.attachPubApiHeaders();
+ request.attachMetadataHeaders();
+ return await client.fetch(request);
+ });
+ final parameters = parseJsonResponse(parametersResponse);
- var url = _expectField(parameters, 'url', response);
- if (url is! String) invalidServerResponse(response);
+ /// 2. Upload package
+ var url = _expectField(parameters, 'url', parametersResponse);
+ if (url is! String) invalidServerResponse(parametersResponse);
cloudStorageUrl = Uri.parse(url);
- // TODO(nweiz): Cloud Storage can provide an XML-formatted error. We
- // should report that error and exit.
- var request = http.MultipartRequest('POST', cloudStorageUrl!);
+ final uploadResponse =
+ await retryForHttp('uploading package', () async {
+ // TODO(nweiz): Cloud Storage can provide an XML-formatted error. We
+ // should report that error and exit.
+ var request = http.MultipartRequest('POST', cloudStorageUrl!);
- var fields = _expectField(parameters, 'fields', response);
- if (fields is! Map) invalidServerResponse(response);
- fields.forEach((key, value) {
- if (value is! String) invalidServerResponse(response);
- request.fields[key] = value;
+ var fields = _expectField(parameters, 'fields', parametersResponse);
+ if (fields is! Map) invalidServerResponse(parametersResponse);
+ fields.forEach((key, value) {
+ if (value is! String) invalidServerResponse(parametersResponse);
+ request.fields[key] = value;
+ });
+
+ request.followRedirects = false;
+ request.files.add(http.MultipartFile.fromBytes('file', packageBytes,
+ filename: 'package.tar.gz'));
+ return await client.fetch(request);
});
- request.followRedirects = false;
- request.files.add(http.MultipartFile.fromBytes('file', packageBytes,
- filename: 'package.tar.gz'));
- var postResponse =
- await http.Response.fromStream(await client.send(request));
-
- var location = postResponse.headers['location'];
- if (location == null) throw PubHttpException(postResponse);
- handleJsonSuccess(
- await client.get(Uri.parse(location), headers: pubApiHeaders));
+ /// 3. Finalize publish
+ var location = uploadResponse.headers['location'];
+ if (location == null) throw PubHttpResponseException(uploadResponse);
+ final finalizeResponse =
+ await retryForHttp('finalizing publish', () async {
+ final request = http.Request('GET', Uri.parse(location));
+ request.attachPubApiHeaders();
+ request.attachMetadataHeaders();
+ return await client.fetch(request);
+ });
+ handleJsonSuccess(finalizeResponse);
});
} on AuthenticationException catch (error) {
var msg = '';
@@ -138,7 +155,7 @@
msg += '\n${error.serverMessage!}\n';
}
dataError(msg + log.red('Authentication failed!'));
- } on PubHttpException catch (error) {
+ } on PubHttpResponseException catch (error) {
var url = error.response.request!.url;
if (url == cloudStorageUrl) {
// TODO(nweiz): the response may have XML-formatted information about
@@ -189,7 +206,7 @@
return _publishUsingClient(packageBytes, client);
});
}
- } on PubHttpException catch (error) {
+ } on PubHttpResponseException catch (error) {
var url = error.response.request!.url;
if (Uri.parse(url.origin) == Uri.parse(host.origin)) {
handleJsonError(error.response);
diff --git a/lib/src/command/login.dart b/lib/src/command/login.dart
index 8f428ab..0743845 100644
--- a/lib/src/command/login.dart
+++ b/lib/src/command/login.dart
@@ -7,7 +7,6 @@
import '../command.dart';
import '../command_runner.dart';
-import '../http.dart';
import '../log.dart' as log;
import '../oauth2.dart' as oauth2;
@@ -46,9 +45,8 @@
Future<_UserInfo?> _retrieveUserInfo() async {
return await oauth2.withClient(cache, (client) async {
- final discovery = await httpClient.get(Uri.https(
- 'accounts.google.com', '/.well-known/openid-configuration'));
- final userInfoEndpoint = json.decode(discovery.body)['userinfo_endpoint'];
+ final discovery = await oauth2.fetchOidcDiscoveryDocument();
+ final userInfoEndpoint = discovery['userinfo_endpoint'];
final userInfoRequest = await client.get(Uri.parse(userInfoEndpoint));
if (userInfoRequest.statusCode != 200) return null;
try {
diff --git a/lib/src/exceptions.dart b/lib/src/exceptions.dart
index 39e9c51..90c9d1a 100644
--- a/lib/src/exceptions.dart
+++ b/lib/src/exceptions.dart
@@ -12,6 +12,7 @@
import 'package:yaml/yaml.dart';
import 'dart.dart';
+import 'http.dart';
/// An exception class for exceptions that are intended to be seen by the user.
///
@@ -106,12 +107,9 @@
}
/// A class for exceptions where a package's checksum could not be validated.
-class PackageIntegrityException extends WrappedException {
- PackageIntegrityException(
- String message, {
- Object? innerError,
- StackTrace? innerTrace,
- }) : super(message, innerError, innerTrace);
+class PackageIntegrityException extends PubHttpException {
+ PackageIntegrityException(String message)
+ : super(message, isIntermittent: true);
}
/// Returns whether [error] is a user-facing error object.
diff --git a/lib/src/http.dart b/lib/src/http.dart
index 6473a8e..ccd5146 100644
--- a/lib/src/http.dart
+++ b/lib/src/http.dart
@@ -9,14 +9,10 @@
import 'dart:math' as math;
import 'package:http/http.dart' as http;
-import 'package:http/retry.dart';
import 'package:pool/pool.dart';
-import 'package:stack_trace/stack_trace.dart';
import 'command.dart';
-import 'io.dart';
import 'log.dart' as log;
-import 'oauth2.dart' as oauth2;
import 'package.dart';
import 'sdk.dart';
import 'source/hosted.dart';
@@ -46,22 +42,6 @@
@override
Future<http.StreamedResponse> send(http.BaseRequest request) async {
- if (_shouldAddMetadata(request)) {
- request.headers['X-Pub-OS'] = Platform.operatingSystem;
- request.headers['X-Pub-Command'] = PubCommand.command;
- request.headers['X-Pub-Session-ID'] = _sessionId;
-
- var environment = Platform.environment['PUB_ENVIRONMENT'];
- if (environment != null) {
- request.headers['X-Pub-Environment'] = environment;
- }
-
- var type = Zone.current[#_dependencyType];
- if (type != null && type != DependencyType.none) {
- request.headers['X-Pub-Reason'] = type.toString();
- }
- }
-
_requestStopwatches[request] = Stopwatch()..start();
request.headers[HttpHeaders.userAgentHeader] = 'Dart pub ${sdk.version}';
_logRequest(request);
@@ -73,24 +53,6 @@
return streamedResponse;
}
- /// Whether extra metadata headers should be added to [request].
- bool _shouldAddMetadata(http.BaseRequest request) {
- if (runningFromTest && Platform.environment.containsKey('PUB_HOSTED_URL')) {
- if (request.url.origin != Platform.environment['PUB_HOSTED_URL']) {
- return false;
- }
- } else {
- if (!HostedSource.isPubDevUrl(request.url.toString())) return false;
- }
-
- if (Platform.environment.containsKey('CI') &&
- Platform.environment['CI'] != 'false') {
- return false;
- }
-
- return true;
- }
-
/// Logs the fact that [request] was sent, and information about it.
void _logRequest(http.BaseRequest request) {
var requestLog = StringBuffer();
@@ -155,130 +117,14 @@
void close() => _inner.close();
}
-/// The [_PubHttpClient] wrapped by [httpClient].
+/// The [_PubHttpClient] wrapped by [globalHttpClient].
final _pubClient = _PubHttpClient();
-/// A set of all hostnames for which we've printed a message indicating that
-/// we're waiting for them to come back up.
-final _retriedHosts = <String>{};
-
-/// Intercepts all requests and throws exceptions if the response was not
-/// considered successful.
-class _ThrowingClient extends http.BaseClient {
- final http.Client _inner;
-
- _ThrowingClient(this._inner);
-
- @override
- Future<http.StreamedResponse> send(http.BaseRequest request) async {
- late http.StreamedResponse streamedResponse;
- try {
- streamedResponse = await _inner.send(request);
- } on SocketException catch (error, stackTraceOrNull) {
- // Work around issue 23008.
- var stackTrace = stackTraceOrNull;
-
- if (error.osError == null) rethrow;
-
- // Handle error codes known to be related to DNS or SSL issues. While it
- // is tempting to handle these error codes before retrying, saving time
- // for the end-user, it is known that DNS lookups can fail intermittently
- // in some cloud environments. Furthermore, since these error codes are
- // platform-specific (undocumented) and essentially cargo-culted along
- // skipping retries may lead to intermittent issues that could be fixed
- // with a retry. Failing to retry intermittent issues is likely to cause
- // customers to wrap pub in a retry loop which will not improve the
- // end-user experience.
- if (error.osError!.errorCode == 8 ||
- error.osError!.errorCode == -2 ||
- error.osError!.errorCode == -5 ||
- error.osError!.errorCode == 11001 ||
- error.osError!.errorCode == 11004) {
- fail('Could not resolve URL "${request.url.origin}".', error,
- stackTrace);
- } else if (error.osError!.errorCode == -12276) {
- fail(
- 'Unable to validate SSL certificate for '
- '"${request.url.origin}".',
- error,
- stackTrace);
- } else {
- rethrow;
- }
- }
-
- var status = streamedResponse.statusCode;
- // 401 responses should be handled by the OAuth2 client. It's very
- // unlikely that they'll be returned by non-OAuth2 requests. We also want
- // to pass along 400 responses from the token endpoint.
- var tokenRequest = streamedResponse.request!.url == oauth2.tokenEndpoint;
- if (status < 400 || status == 401 || (status == 400 && tokenRequest)) {
- return streamedResponse;
- }
-
- if (status == 406 && request.headers['Accept'] == pubApiHeaders['Accept']) {
- fail('Pub ${sdk.version} is incompatible with the current version of '
- '${request.url.host}.\n'
- 'Upgrade pub to the latest version and try again.');
- }
-
- if (status == 500 &&
- (request.url.host == 'pub.dev' ||
- request.url.host == 'storage.googleapis.com')) {
- fail('HTTP error 500: Internal Server Error at ${request.url}.\n'
- 'This is likely a transient error. Please try again later.');
- }
-
- throw PubHttpException(await http.Response.fromStream(streamedResponse));
- }
-
- @override
- void close() => _inner.close();
-}
-
/// The HTTP client to use for all HTTP requests.
-final httpClient = _ThrottleClient(
- 16,
- _ThrowingClient(RetryClient(_pubClient,
- retries: math.max(
- 1, // Having less than 1 retry is **always** wrong.
- int.tryParse(Platform.environment['PUB_MAX_HTTP_RETRIES'] ?? '') ?? 7,
- ),
- when: (response) =>
- const [500, 502, 503, 504].contains(response.statusCode),
- whenError: (error, stackTrace) {
- if (error is! IOException) return false;
+final globalHttpClient = _pubClient;
- var chain = Chain.forTrace(stackTrace);
- log.io('HTTP error:\n$error\n\n${chain.terse}');
- return true;
- },
- delay: (retryCount) {
- if (retryCount < 3) {
- // Retry quickly a couple times in case of a short transient error.
- //
- // Add a random delay to avoid retrying a bunch of parallel requests
- // all at the same time.
- return Duration(milliseconds: 500) * math.pow(1.5, retryCount) +
- Duration(milliseconds: random.nextInt(500));
- } else {
- // If the error persists, wait a long time. This works around issues
- // where an AppEngine instance will go down and need to be rebooted,
- // which takes about a minute.
- return Duration(seconds: 30);
- }
- },
- onRetry: (request, response, retryCount) {
- log.io('Retry #${retryCount + 1} for '
- '${request.method} ${request.url}...');
- if (retryCount != 3) return;
- if (!_retriedHosts.add(request.url.host)) return;
- log.message(
- 'It looks like ${request.url.host} is having some trouble.\n'
- 'Pub will wait for a while before trying to connect again.');
- })));
-
-/// The underlying HTTP client wrapped by [httpClient].
+/// The underlying HTTP client wrapped by [globalHttpClient].
+/// This enables the ability to use a mock client in tests.
http.Client get innerHttpClient => _pubClient._inner;
set innerHttpClient(http.Client client) => _pubClient._inner = client;
@@ -294,6 +140,36 @@
return runZoned(callback, zoneValues: {#_dependencyType: type});
}
+extension AttachHeaders on http.Request {
+ /// Adds headers required for pub.dev API requests.
+ void attachPubApiHeaders() {
+ headers.addAll(pubApiHeaders);
+ }
+
+ /// Adds request metadata headers about the Pub tool's environment and the
+ /// currently running command if the request URL indicates the destination is
+ /// a Hosted Pub Repository.
+ void attachMetadataHeaders() {
+ if (!HostedSource.shouldSendAdditionalMetadataFor(url)) {
+ return;
+ }
+
+ headers['X-Pub-OS'] = Platform.operatingSystem;
+ headers['X-Pub-Command'] = PubCommand.command;
+ headers['X-Pub-Session-ID'] = _sessionId;
+
+ var environment = Platform.environment['PUB_ENVIRONMENT'];
+ if (environment != null) {
+ headers['X-Pub-Environment'] = environment;
+ }
+
+ var type = Zone.current[#_dependencyType];
+ if (type != null && type != DependencyType.none) {
+ headers['X-Pub-Reason'] = type.toString();
+ }
+ }
+}
+
/// Handles a successful JSON-formatted response from pub.dev.
///
/// These responses are expected to be of the form `{"success": {"message":
@@ -314,7 +190,12 @@
/// These responses are expected to be of the form `{"error": {"message": "some
/// message"}}`. If the format is correct, the message will be raised as an
/// error; otherwise an [invalidServerResponse] error will be raised.
-void handleJsonError(http.Response response) {
+void handleJsonError(http.BaseResponse response) {
+ if (response is! http.Response) {
+ // Not likely to be a common code path, but necessary.
+ // See https://github.com/dart-lang/pub/pull/3590#discussion_r1012978108
+ fail(log.red('Invalid server response'));
+ }
var errorMap = parseJsonResponse(response);
if (errorMap['error'] is! Map ||
!errorMap['error'].containsKey('message') ||
@@ -345,56 +226,145 @@
/// Exception thrown when an HTTP operation fails.
class PubHttpException implements Exception {
- final http.Response response;
+ final String message;
+ final bool isIntermittent;
- const PubHttpException(this.response);
+ PubHttpException(this.message, {this.isIntermittent = false});
@override
- String toString() => 'HTTP error ${response.statusCode}: '
- '${response.reasonPhrase}';
+ String toString() {
+ return 'PubHttpException: $message';
+ }
}
-/// A middleware client that throttles the number of concurrent requests.
-///
-/// As long as the number of requests is within the limit, this works just like
-/// a normal client. If a request is made beyond the limit, the underlying HTTP
-/// request won't be sent until other requests have completed.
-class _ThrottleClient extends http.BaseClient {
- final Pool _pool;
- final http.Client _inner;
+/// Exception thrown when an HTTP response is not Ok.
+class PubHttpResponseException extends PubHttpException {
+ final http.BaseResponse response;
- /// Creates a new client that allows no more than [maxActiveRequests]
- /// concurrent requests.
- ///
- /// If [inner] is passed, it's used as the inner client for sending HTTP
- /// requests. It defaults to `new http.Client()`.
- _ThrottleClient(int maxActiveRequests, this._inner)
- : _pool = Pool(maxActiveRequests);
+ PubHttpResponseException(this.response,
+ {String message = '', bool isIntermittent = false})
+ : super(message, isIntermittent: isIntermittent);
@override
- Future<http.StreamedResponse> send(http.BaseRequest request) async {
- var resource = await _pool.request();
-
- http.StreamedResponse response;
- try {
- response = await _inner.send(request);
- } catch (_) {
- resource.release();
- rethrow;
+ String toString() {
+ var temp = 'PubHttpResponseException: HTTP error ${response.statusCode} '
+ '${response.reasonPhrase}';
+ if (message != '') {
+ temp += ': $message';
}
+ return temp;
+ }
+}
- final responseController = StreamController<List<int>>(sync: true);
- unawaited(response.stream.pipe(responseController));
- unawaited(responseController.done.then((_) => resource.release()));
- return http.StreamedResponse(responseController.stream, response.statusCode,
- contentLength: response.contentLength,
- request: response.request,
- headers: response.headers,
- isRedirect: response.isRedirect,
- persistentConnection: response.persistentConnection,
- reasonPhrase: response.reasonPhrase);
+/// Whether [e] is one of a few HTTP-related exceptions that subclass
+/// [IOException]. Can be used if your try-catch block contains various
+/// operations in addition to HTTP calls and so a [IOException] instance check
+/// would be too coarse.
+bool isHttpIOException(Object e) {
+ return e is HttpException ||
+ e is TlsException ||
+ e is SocketException ||
+ e is WebSocketException;
+}
+
+/// Program-wide limiter for concurrent network requests.
+final _httpPool = Pool(16);
+
+/// Runs the provided function [fn] and returns the response.
+///
+/// If there is an HTTP-related exception, an intermittent HTTP error response,
+/// or an async timeout, [fn] is run repeatedly until there is a successful
+/// response or at most seven total attempts have been made. If all attempts
+/// fail, the final exception is re-thrown.
+///
+/// Each attempt is run within a [Pool] configured with 16 maximum resources.
+Future<T> retryForHttp<T>(String operation, FutureOr<T> Function() fn) async {
+ return await retry(
+ () async => await _httpPool.withResource(() async => await fn()),
+ retryIf: (e) async =>
+ (e is PubHttpException && e.isIntermittent) ||
+ e is TimeoutException ||
+ isHttpIOException(e),
+ onRetry: (exception, attemptNumber) async =>
+ log.io('Attempt #$attemptNumber for $operation'),
+ maxAttempts: math.max(
+ 1, // Having less than 1 attempt doesn't make sense.
+ int.tryParse(Platform.environment['PUB_MAX_HTTP_RETRIES'] ?? '') ?? 7,
+ ));
+}
+
+extension Throwing on http.BaseResponse {
+ /// See https://api.flutter.dev/flutter/dart-io/HttpClientRequest/followRedirects.html
+ static const _redirectStatusCodes = [
+ HttpStatus.movedPermanently,
+ HttpStatus.movedTemporarily,
+ HttpStatus.seeOther,
+ HttpStatus.temporaryRedirect,
+ HttpStatus.permanentRedirect
+ ];
+
+ /// Throws [PubHttpResponseException], calls [fail], or does nothing depending
+ /// on the status code.
+ ///
+ /// If the code is in the 200 range or if its a 300 range redirect code,
+ /// nothing is done. If the code is 408, 429, or in the 500 range,
+ /// [PubHttpResponseException] is thrown with "isIntermittent" set to `true`.
+ /// Otherwise, [PubHttpResponseException] is thrown with "isIntermittent" set
+ /// to `false`.
+ void throwIfNotOk() {
+ if (statusCode >= 200 && statusCode <= 299) {
+ return;
+ } else if (_redirectStatusCodes.contains(statusCode)) {
+ return;
+ } else if (statusCode == HttpStatus.notAcceptable &&
+ request?.headers['Accept'] == pubApiHeaders['Accept']) {
+ fail('Pub ${sdk.version} is incompatible with the current version of '
+ '${request?.url.host}.\n'
+ 'Upgrade pub to the latest version and try again.');
+ } else if (statusCode >= 500 ||
+ statusCode == HttpStatus.requestTimeout ||
+ statusCode == HttpStatus.tooManyRequests) {
+ // Throw if the response indicates a server error or an intermittent
+ // client error, but mark it as intermittent so it can be retried.
+ throw PubHttpResponseException(this, isIntermittent: true);
+ } else {
+ // Throw for all other status codes.
+ throw PubHttpResponseException(this);
+ }
+ }
+}
+
+extension RequestSending on http.Client {
+ /// Sends an HTTP request, reads the whole response body, validates the
+ /// response headers, and if validation is successful, and returns it.
+ ///
+ /// The send method on [http.Client], which returns a [http.StreamedResponse],
+ /// is the only method that accepts a request object. This method can be used
+ /// when you need to send a request object but want a regular response object.
+ ///
+ /// If false is passed for [throwIfNotOk], the response will not be validated.
+ /// See [http.BaseResponse.throwIfNotOk] extension for validation details.
+ Future<http.Response> fetch(http.BaseRequest request,
+ {bool throwIfNotOk = true}) async {
+ final streamedResponse = await send(request);
+ final response = await http.Response.fromStream(streamedResponse);
+ if (throwIfNotOk) {
+ response.throwIfNotOk();
+ }
+ return response;
}
- @override
- void close() => _inner.close();
+ /// Sends an HTTP request, validates the response headers, and if validation
+ /// is successful, returns a [http.StreamedResponse].
+ ///
+ /// If false is passed for [throwIfNotOk], the response will not be validated.
+ /// See [http.BaseResponse.throwIfNotOk] extension for validation details.
+ Future<http.StreamedResponse> fetchAsStream(http.BaseRequest request,
+ {bool throwIfNotOk = true}) async {
+ final streamedResponse = await send(request);
+ if (throwIfNotOk) {
+ streamedResponse.throwIfNotOk();
+ }
+ return streamedResponse;
+ }
}
diff --git a/lib/src/oauth2.dart b/lib/src/oauth2.dart
index 2159847..41eb5f5 100644
--- a/lib/src/oauth2.dart
+++ b/lib/src/oauth2.dart
@@ -6,6 +6,8 @@
import 'dart:io';
import 'package:collection/collection.dart' show IterableExtension;
+import 'package:http/http.dart' as http;
+import 'package:http/retry.dart';
import 'package:oauth2/oauth2.dart';
import 'package:path/path.dart' as path;
import 'package:shelf/shelf.dart' as shelf;
@@ -17,6 +19,13 @@
import 'system_cache.dart';
import 'utils.dart';
+/// The global HTTP client with basic retries. Used instead of retryForHttp for
+/// OAuth calls because the OAuth2 package requires a client to be passed. While
+/// the retry logic is more basic, this is fine for the publishing process.
+final _retryHttpClient = RetryClient(globalHttpClient,
+ when: (response) => response.statusCode >= 500,
+ whenError: (e, _) => isHttpIOException(e));
+
/// The pub client's OAuth2 identifier.
const _identifier = '818368855108-8grd2eg9tj9f38os6f1urbcvsq399u8n.apps.'
'googleusercontent.com';
@@ -26,6 +35,12 @@
/// This isn't actually meant to be kept a secret.
const _secret = 'SWeqj8seoJW0w7_CpEPFLX0K';
+/// The URL from which the pub client will retrieve Google's OIDC endpoint URIs.
+///
+/// [Google OpenID Connect documentation]: https://developers.google.com/identity/openid-connect/openid-connect#discovery
+final _oidcDiscoveryDocumentEndpoint =
+ Uri.https('accounts.google.com', '/.well-known/openid-configuration');
+
/// The URL to which the user will be directed to authorize the pub client to
/// get an OAuth2 access token.
///
@@ -142,7 +157,7 @@
secret: _secret,
// Google's OAuth2 API doesn't support basic auth.
basicAuth: false,
- httpClient: httpClient);
+ httpClient: _retryHttpClient);
_saveCredentials(cache, client.credentials);
return client;
}
@@ -221,7 +236,7 @@
secret: _secret,
// Google's OAuth2 API doesn't support basic auth.
basicAuth: false,
- httpClient: httpClient);
+ httpClient: _retryHttpClient);
// Spin up a one-shot HTTP server to receive the authorization code from the
// Google OAuth2 server via redirect. This server will close itself as soon as
@@ -258,3 +273,16 @@
log.message('Successfully authorized.\n');
return client;
}
+
+/// Fetches Google's OpenID Connect Discovery document and parses the JSON
+/// response body into a [Map].
+///
+/// See https://developers.google.com/identity/openid-connect/openid-connect#discovery
+Future<Map> fetchOidcDiscoveryDocument() async {
+ final discoveryResponse = await retryForHttp(
+ 'fetching Google\'s OpenID Connect Discovery document', () async {
+ final request = http.Request('GET', _oidcDiscoveryDocumentEndpoint);
+ return await globalHttpClient.fetch(request);
+ });
+ return parseJsonResponse(discoveryResponse);
+}
diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart
index 5922e3c..1cb05d6 100644
--- a/lib/src/source/hosted.dart
+++ b/lib/src/source/hosted.dart
@@ -5,7 +5,7 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io' as io;
-import 'dart:math' as math;
+import 'dart:io';
import 'dart:typed_data';
import 'package:collection/collection.dart'
@@ -172,6 +172,25 @@
}
}();
+ /// Whether extra metadata headers should be sent for HTTP requests to a given
+ /// [url].
+ static bool shouldSendAdditionalMetadataFor(Uri url) {
+ if (runningFromTest && Platform.environment.containsKey('PUB_HOSTED_URL')) {
+ if (url.origin != Platform.environment['PUB_HOSTED_URL']) {
+ return false;
+ }
+ } else {
+ if (!HostedSource.isPubDevUrl(url.toString())) return false;
+ }
+
+ if (Platform.environment.containsKey('CI') &&
+ Platform.environment['CI'] != 'false') {
+ return false;
+ }
+
+ return true;
+ }
+
/// Returns a reference to a hosted package named [name].
///
/// If [url] is passed, it's the URL of the pub server from which the package
@@ -245,8 +264,6 @@
);
}
- HostedDescription _asDescription(desc) => desc as HostedDescription;
-
/// Parses the description for a package.
///
/// If the package parses correctly, this returns a (name, url) pair. If not,
@@ -370,6 +387,7 @@
if (description is! HostedDescription) {
throw ArgumentError('Wrong source');
}
+ final packageName = description.packageName;
final hostedUrl = description.url;
final url = _listVersionsUrl(ref);
log.io('Get versions from $url.');
@@ -379,12 +397,18 @@
final List<_VersionInfo> result;
try {
// TODO(sigurdm): Implement cancellation of requests. This probably
- // requires resolution of: https://github.com/dart-lang/sdk/issues/22265.
- bodyText = await withAuthenticatedClient(
- cache,
- Uri.parse(hostedUrl),
- (client) => client.read(url, headers: pubApiHeaders),
- );
+ // requires resolution of: https://github.com/dart-lang/http/issues/424.
+ bodyText = await withAuthenticatedClient(cache, Uri.parse(hostedUrl),
+ (client) async {
+ return await retryForHttp(
+ 'fetching versions for "$packageName" from "$url"', () async {
+ final request = http.Request('GET', url);
+ request.attachPubApiHeaders();
+ request.attachMetadataHeaders();
+ final response = await client.fetch(request);
+ return response.body;
+ });
+ });
final decoded = jsonDecode(bodyText);
if (decoded is! Map<String, dynamic>) {
throw FormatException('version listing must be a mapping');
@@ -392,7 +416,6 @@
body = decoded;
result = _versionInfoFromPackageListing(body, ref, url, cache);
} on Exception catch (error, stackTrace) {
- final packageName = _asDescription(ref.description).packageName;
_throwFriendlyError(error, stackTrace, packageName, hostedUrl);
}
@@ -1047,43 +1070,33 @@
// download.
final expectedSha256 = versionInfo.archiveSha256;
- // The client from `withAuthenticatedClient` will retry HTTP requests.
- // This wrapper is one layer up and will retry checksum validation errors.
- await retry(
- // Attempt to download archive and validate its checksum.
- () async {
+ await withAuthenticatedClient(cache, Uri.parse(description.url),
+ (client) async {
+ // In addition to HTTP errors, this will retry crc32c/sha256 errors as
+ // well because [PackageIntegrityException] subclasses
+ // [PubHttpException].
+ await retryForHttp('downloading "$archiveUrl"', () async {
final request = http.Request('GET', archiveUrl);
- final response = await withAuthenticatedClient(cache,
- Uri.parse(description.url), (client) => client.send(request));
- final expectedCrc32Checksum =
- _parseCrc32c(response.headers, fileName);
+ request.attachMetadataHeaders();
+ final response = await client.fetchAsStream(request);
Stream<List<int>> stream = response.stream;
- if (expectedCrc32Checksum != null) {
- stream = _validateStreamCrc32Checksum(
- response.stream, expectedCrc32Checksum, id, archiveUrl);
+ final expectedCrc32c = _parseCrc32c(response.headers, fileName);
+ if (expectedCrc32c != null) {
+ stream = _validateCrc32c(
+ response.stream, expectedCrc32c, id, archiveUrl);
}
stream = validateSha256(
stream, (expectedSha256 == null) ? null : Digest(expectedSha256));
+
// We download the archive to disk instead of streaming it directly
// into the tar unpacking. This simplifies stream handling.
// Package:tar cancels the stream when it reaches end-of-archive, and
// cancelling a http stream makes it not reusable.
// There are ways around this, and we might revisit this later.
await createFileFromStream(stream, archivePath);
- },
- // Retry if the checksum response header was malformed or the actual
- // checksum did not match the expected checksum.
- retryIf: (e) => e is PackageIntegrityException,
- onRetry: (e, retryCount) => log
- .io('Retry #${retryCount + 1} because of checksum error with GET '
- '$archiveUrl...'),
- maxAttempts: math.max(
- 1, // Having less than 1 attempt doesn't make sense.
- int.tryParse(io.Platform.environment['PUB_MAX_HTTP_RETRIES'] ?? '') ??
- 7,
- ),
- );
+ });
+ });
var tempDir = cache.createTempDir();
try {
@@ -1193,7 +1206,7 @@
String package,
String hostedUrl,
) {
- if (error is PubHttpException) {
+ if (error is PubHttpResponseException) {
if (error.response.statusCode == 404) {
throw PackageNotFoundException(
'could not find package $package at $hostedUrl',
@@ -1461,7 +1474,7 @@
/// the one present in the checksum response header.
///
/// Throws [PackageIntegrityException] if there is a checksum mismatch.
-Stream<List<int>> _validateStreamCrc32Checksum(Stream<List<int>> stream,
+Stream<List<int>> _validateCrc32c(Stream<List<int>> stream,
int expectedChecksum, PackageId id, Uri archiveUrl) async* {
final crc32c = Crc32c();
@@ -1519,11 +1532,10 @@
return ByteData.view(bytes.buffer).getUint32(0);
} on FormatException catch (e, s) {
+ log.exception(e, s);
throw PackageIntegrityException(
'Package archive "$fileName" has a malformed CRC32C checksum in '
- 'its response headers',
- innerError: e,
- innerTrace: s);
+ 'its response headers');
}
}
}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index 0f1bf51..0719bbb 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -693,7 +693,7 @@
Duration maxDelay = const Duration(seconds: 30),
int maxAttempts = 8,
FutureOr<bool> Function(Exception)? retryIf,
- FutureOr<void> Function(Exception, int retryCount)? onRetry,
+ FutureOr<void> Function(Exception, int attemptNumber)? onRetry,
}) async {
var attempt = 0;
// ignore: literal_only_boolean_expressions
@@ -705,8 +705,9 @@
if (attempt >= maxAttempts || (retryIf != null && !(await retryIf(e)))) {
rethrow;
}
+
if (onRetry != null) {
- await onRetry(e, attempt);
+ await onRetry(e, attempt + 1);
}
}
diff --git a/test/add/hosted/non_default_pub_server_test.dart b/test/add/hosted/non_default_pub_server_test.dart
index 787bf32..f93f292 100644
--- a/test/add/hosted/non_default_pub_server_test.dart
+++ b/test/add/hosted/non_default_pub_server_test.dart
@@ -91,7 +91,8 @@
await pubAdd(
args: ['foo', '--hosted-url', 'https://invalid-url.foo'],
- error: contains('Could not resolve URL "https://invalid-url.foo".'),
+ error: contains('Got socket error trying to find package foo at '
+ 'https://invalid-url.foo.'),
exitCode: exit_codes.DATA,
environment: {
// Limit the retries - the url will never go valid.
diff --git a/test/content_hash_test.dart b/test/content_hash_test.dart
index 6a0725b..2485918 100644
--- a/test/content_hash_test.dart
+++ b/test/content_hash_test.dart
@@ -54,7 +54,8 @@
'e7a7a0f6d9873e4c40cf68cc3cc9ca5b6c8cef6a2220241bdada4b9cb0083279');
await appDir({'foo': 'any'}).create();
await pubGet(
- silent: contains('Retry #2'),
+ exitCode: exit_codes.TEMP_FAIL,
+ silent: contains('Attempt #2'),
error:
contains('Downloaded archive for foo-1.0.0 had wrong content-hash.'),
environment: {
diff --git a/test/get/hosted/get_test.dart b/test/get/hosted/get_test.dart
index 81d4845..837fb32 100644
--- a/test/get/hosted/get_test.dart
+++ b/test/get/hosted/get_test.dart
@@ -139,11 +139,12 @@
}).create();
await pubGet(
+ exitCode: exit_codes.TEMP_FAIL,
error: RegExp(
r'''Package archive for foo 1.2.3 downloaded from "(.+)" has '''
r'''"x-goog-hash: crc32c=(\d+)", which doesn't match the checksum '''
r'''of the archive downloaded\.'''),
- silent: contains('Retry #2 because of checksum error'),
+ silent: contains('Attempt #2'),
environment: {
'PUB_MAX_HTTP_RETRIES': '2',
},
@@ -175,11 +176,11 @@
}).create();
await pubGet(
- exitCode: exit_codes.DATA,
+ exitCode: exit_codes.TEMP_FAIL,
error: contains(
'Package archive "foo-1.2.3.tar.gz" has a malformed CRC32C '
'checksum in its response headers'),
- silent: contains('Retry #2 because of checksum error'),
+ silent: contains('Attempt #2'),
environment: {
'PUB_MAX_HTTP_RETRIES': '2',
},
@@ -195,11 +196,11 @@
}).create();
await pubGet(
- exitCode: exit_codes.DATA,
+ exitCode: exit_codes.TEMP_FAIL,
error: contains(
'Package archive "bar-1.2.3.tar.gz" has a malformed CRC32C '
'checksum in its response headers'),
- silent: contains('Retry #2 because of checksum error'),
+ silent: contains('Attempt #2'),
environment: {
'PUB_MAX_HTTP_RETRIES': '2',
},
@@ -215,11 +216,11 @@
}).create();
await pubGet(
- exitCode: exit_codes.DATA,
+ exitCode: exit_codes.TEMP_FAIL,
error: contains(
'Package archive "baz-1.2.3.tar.gz" has a malformed CRC32C '
'checksum in its response headers'),
- silent: contains('Retry #2 because of checksum error'),
+ silent: contains('Attempt #2'),
environment: {
'PUB_MAX_HTTP_RETRIES': '2',
},
diff --git a/test/hosted/fail_gracefully_on_url_resolve_test.dart b/test/hosted/fail_gracefully_on_url_resolve_test.dart
index 7a91e7a..0dff951 100644
--- a/test/hosted/fail_gracefully_on_url_resolve_test.dart
+++ b/test/hosted/fail_gracefully_on_url_resolve_test.dart
@@ -20,7 +20,8 @@
]).create();
await pubCommand(command,
- error: 'Could not resolve URL "https://invalid-url.foo".',
+ error: 'Got socket error trying to find package foo at '
+ 'https://invalid-url.foo.',
exitCode: exit_codes.UNAVAILABLE,
environment: {
'PUB_MAX_HTTP_RETRIES': '2',
diff --git a/tool/extract_all_pub_dev.dart b/tool/extract_all_pub_dev.dart
index 474c897..ca90cd0 100644
--- a/tool/extract_all_pub_dev.dart
+++ b/tool/extract_all_pub_dev.dart
@@ -20,13 +20,19 @@
Future<List<String>> allPackageNames() async {
var nextUrl = Uri.https('pub.dev', 'api/packages?compact=1');
- final result = json.decode(await httpClient.read(nextUrl));
+ final request = http.Request('GET', nextUrl);
+ request.attachMetadataHeaders();
+ final response = await globalHttpClient.fetch(request);
+ final result = json.decode(response.body);
return List<String>.from(result['packages']);
}
Future<List<String>> versionArchiveUrls(String packageName) async {
final url = Uri.https('pub.dev', 'api/packages/$packageName');
- final result = json.decode(await httpClient.read(url));
+ final request = http.Request('GET', url);
+ request.attachMetadataHeaders();
+ final response = await globalHttpClient.fetch(request);
+ final result = json.decode(response.body);
return List<String>.from(result['versions'].map((v) => v['archive_url']));
}
@@ -81,8 +87,10 @@
log.message('downloading $archiveUrl');
http.StreamedResponse response;
try {
- response = await httpClient
- .send(http.Request('GET', Uri.parse(archiveUrl)));
+ final archiveUri = Uri.parse(archiveUrl);
+ final request = http.Request('GET', archiveUri);
+ request.attachMetadataHeaders();
+ response = await globalHttpClient.fetchAsStream(request);
await extractTarGz(response.stream, tempDir);
log.message('Extracted $archiveUrl');
} catch (e) {