Crc32c checksum validation and retry of archive downloads (#3546)

diff --git a/lib/src/crc32c.dart b/lib/src/crc32c.dart
new file mode 100644
index 0000000..e346a59
--- /dev/null
+++ b/lib/src/crc32c.dart
@@ -0,0 +1,103 @@
+// Copyright (c) 2022, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+/// Computes a crc32c checksum.
+class Crc32c {
+  int _current = mask;
+  static const mask = 0xFFFFFFFF;
+
+  // Algorithm based on https://en.wikipedia.org/wiki/Cyclic_redundancy_check
+  void update(List<int> data) {
+    for (var i = 0; i < data.length; i++) {
+      final lookupIndex = (_current ^ data[i]) & 0xff;
+      _current = (_current >> 8) ^ _crcTable[lookupIndex];
+    }
+  }
+
+  int finalize() {
+    // Finalize the CRC-32 value by inverting all the bits
+    return _current ^ mask & mask;
+  }
+
+  /// Consumes the entirety of "stream" and returns the CRC32C checksum of its
+  /// data once the stream is finished.
+  static Future<int> computeByConsumingStream(Stream<List<int>> stream) async {
+    final checksumComputer = Crc32c();
+
+    await for (final chunk in stream) {
+      checksumComputer.update(chunk);
+    }
+
+    return checksumComputer.finalize();
+  }
+}
+
+// Generated by ./pycrc.py --algorithm=table-driven --model=crc-32c --generate=c
+// See: https://pycrc.org/
+const _crcTable = [
+  0x00000000, 0xf26b8303, 0xe13b70f7, 0x1350f3f4, //
+  0xc79a971f, 0x35f1141c, 0x26a1e7e8, 0xd4ca64eb,
+  0x8ad958cf, 0x78b2dbcc, 0x6be22838, 0x9989ab3b,
+  0x4d43cfd0, 0xbf284cd3, 0xac78bf27, 0x5e133c24,
+  0x105ec76f, 0xe235446c, 0xf165b798, 0x030e349b,
+  0xd7c45070, 0x25afd373, 0x36ff2087, 0xc494a384,
+  0x9a879fa0, 0x68ec1ca3, 0x7bbcef57, 0x89d76c54,
+  0x5d1d08bf, 0xaf768bbc, 0xbc267848, 0x4e4dfb4b,
+  0x20bd8ede, 0xd2d60ddd, 0xc186fe29, 0x33ed7d2a,
+  0xe72719c1, 0x154c9ac2, 0x061c6936, 0xf477ea35,
+  0xaa64d611, 0x580f5512, 0x4b5fa6e6, 0xb93425e5,
+  0x6dfe410e, 0x9f95c20d, 0x8cc531f9, 0x7eaeb2fa,
+  0x30e349b1, 0xc288cab2, 0xd1d83946, 0x23b3ba45,
+  0xf779deae, 0x05125dad, 0x1642ae59, 0xe4292d5a,
+  0xba3a117e, 0x4851927d, 0x5b016189, 0xa96ae28a,
+  0x7da08661, 0x8fcb0562, 0x9c9bf696, 0x6ef07595,
+  0x417b1dbc, 0xb3109ebf, 0xa0406d4b, 0x522bee48,
+  0x86e18aa3, 0x748a09a0, 0x67dafa54, 0x95b17957,
+  0xcba24573, 0x39c9c670, 0x2a993584, 0xd8f2b687,
+  0x0c38d26c, 0xfe53516f, 0xed03a29b, 0x1f682198,
+  0x5125dad3, 0xa34e59d0, 0xb01eaa24, 0x42752927,
+  0x96bf4dcc, 0x64d4cecf, 0x77843d3b, 0x85efbe38,
+  0xdbfc821c, 0x2997011f, 0x3ac7f2eb, 0xc8ac71e8,
+  0x1c661503, 0xee0d9600, 0xfd5d65f4, 0x0f36e6f7,
+  0x61c69362, 0x93ad1061, 0x80fde395, 0x72966096,
+  0xa65c047d, 0x5437877e, 0x4767748a, 0xb50cf789,
+  0xeb1fcbad, 0x197448ae, 0x0a24bb5a, 0xf84f3859,
+  0x2c855cb2, 0xdeeedfb1, 0xcdbe2c45, 0x3fd5af46,
+  0x7198540d, 0x83f3d70e, 0x90a324fa, 0x62c8a7f9,
+  0xb602c312, 0x44694011, 0x5739b3e5, 0xa55230e6,
+  0xfb410cc2, 0x092a8fc1, 0x1a7a7c35, 0xe811ff36,
+  0x3cdb9bdd, 0xceb018de, 0xdde0eb2a, 0x2f8b6829,
+  0x82f63b78, 0x709db87b, 0x63cd4b8f, 0x91a6c88c,
+  0x456cac67, 0xb7072f64, 0xa457dc90, 0x563c5f93,
+  0x082f63b7, 0xfa44e0b4, 0xe9141340, 0x1b7f9043,
+  0xcfb5f4a8, 0x3dde77ab, 0x2e8e845f, 0xdce5075c,
+  0x92a8fc17, 0x60c37f14, 0x73938ce0, 0x81f80fe3,
+  0x55326b08, 0xa759e80b, 0xb4091bff, 0x466298fc,
+  0x1871a4d8, 0xea1a27db, 0xf94ad42f, 0x0b21572c,
+  0xdfeb33c7, 0x2d80b0c4, 0x3ed04330, 0xccbbc033,
+  0xa24bb5a6, 0x502036a5, 0x4370c551, 0xb11b4652,
+  0x65d122b9, 0x97baa1ba, 0x84ea524e, 0x7681d14d,
+  0x2892ed69, 0xdaf96e6a, 0xc9a99d9e, 0x3bc21e9d,
+  0xef087a76, 0x1d63f975, 0x0e330a81, 0xfc588982,
+  0xb21572c9, 0x407ef1ca, 0x532e023e, 0xa145813d,
+  0x758fe5d6, 0x87e466d5, 0x94b49521, 0x66df1622,
+  0x38cc2a06, 0xcaa7a905, 0xd9f75af1, 0x2b9cd9f2,
+  0xff56bd19, 0x0d3d3e1a, 0x1e6dcdee, 0xec064eed,
+  0xc38d26c4, 0x31e6a5c7, 0x22b65633, 0xd0ddd530,
+  0x0417b1db, 0xf67c32d8, 0xe52cc12c, 0x1747422f,
+  0x49547e0b, 0xbb3ffd08, 0xa86f0efc, 0x5a048dff,
+  0x8ecee914, 0x7ca56a17, 0x6ff599e3, 0x9d9e1ae0,
+  0xd3d3e1ab, 0x21b862a8, 0x32e8915c, 0xc083125f,
+  0x144976b4, 0xe622f5b7, 0xf5720643, 0x07198540,
+  0x590ab964, 0xab613a67, 0xb831c993, 0x4a5a4a90,
+  0x9e902e7b, 0x6cfbad78, 0x7fab5e8c, 0x8dc0dd8f,
+  0xe330a81a, 0x115b2b19, 0x020bd8ed, 0xf0605bee,
+  0x24aa3f05, 0xd6c1bc06, 0xc5914ff2, 0x37faccf1,
+  0x69e9f0d5, 0x9b8273d6, 0x88d28022, 0x7ab90321,
+  0xae7367ca, 0x5c18e4c9, 0x4f48173d, 0xbd23943e,
+  0xf36e6f75, 0x0105ec76, 0x12551f82, 0xe03e9c81,
+  0x34f4f86a, 0xc69f7b69, 0xd5cf889d, 0x27a40b9e,
+  0x79b737ba, 0x8bdcb4b9, 0x988c474d, 0x6ae7c44e,
+  0xbe2da0a5, 0x4c4623a6, 0x5f16d052, 0xad7d5351
+];
diff --git a/lib/src/exceptions.dart b/lib/src/exceptions.dart
index fcde747..ec9f876 100644
--- a/lib/src/exceptions.dart
+++ b/lib/src/exceptions.dart
@@ -104,6 +104,15 @@
   String toString() => 'Package not available ($message).';
 }
 
+/// A class for exceptions where a package's checksum could not be validated.
+class PackageIntegrityException extends WrappedException {
+  PackageIntegrityException(
+    String message, {
+    Object? innerError,
+    StackTrace? innerTrace,
+  }) : super(message, innerError, innerTrace);
+}
+
 /// Returns whether [error] is a user-facing error object.
 ///
 /// This includes both [ApplicationException] and any dart:io errors.
diff --git a/lib/src/io.dart b/lib/src/io.dart
index 50f7eb9..2b5d0cd 100644
--- a/lib/src/io.dart
+++ b/lib/src/io.dart
@@ -172,7 +172,7 @@
 }
 
 /// Reads the contents of the binary file [file] as a [Stream].
-Stream<List<int>> readBinaryFileAsSream(String file) {
+Stream<List<int>> readBinaryFileAsStream(String file) {
   log.io('Reading binary file $file.');
   var contents = File(file).openRead();
   return contents;
diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart
index 160eed1..451dce1 100644
--- a/lib/src/source/hosted.dart
+++ b/lib/src/source/hosted.dart
@@ -5,15 +5,19 @@
 import 'dart:async';
 import 'dart:convert';
 import 'dart:io' as io;
+import 'dart:math' as math;
+import 'dart:typed_data';
 
 import 'package:collection/collection.dart'
     show maxBy, IterableNullableExtension;
 import 'package:http/http.dart' as http;
+import 'package:meta/meta.dart';
 import 'package:path/path.dart' as p;
 import 'package:pub_semver/pub_semver.dart';
 import 'package:stack_trace/stack_trace.dart';
 
 import '../authentication/client.dart';
+import '../crc32c.dart';
 import '../exceptions.dart';
 import '../http.dart';
 import '../io.dart';
@@ -872,27 +876,53 @@
           'Package $packageName has no version $version');
     }
 
-    var url = versionInfo.archiveUrl;
-    log.io('Get package from $url.');
+    final archiveUrl = versionInfo.archiveUrl;
+    log.io('Get package from $archiveUrl.');
     log.message('Downloading ${log.bold(id.name)} ${id.version}...');
 
     // Download and extract the archive to a temp directory.
     await withTempDir((tempDirForArchive) async {
-      var archivePath =
-          p.join(tempDirForArchive, '$packageName-$version.tar.gz');
-      var response = await withAuthenticatedClient(
-          cache,
-          Uri.parse(description.url),
-          (client) => client.send(http.Request('GET', url)));
+      var fileName = '$packageName-$version.tar.gz';
+      var archivePath = p.join(tempDirForArchive, fileName);
 
-      // We download the archive to disk instead of streaming it directly into
-      // the tar unpacking. This simplifies stream handling.
-      // Package:tar cancels the stream when it reaches end-of-archive, and
-      // cancelling a http stream makes it not reusable.
-      // There are ways around this, and we might revisit this later.
-      await createFileFromStream(response.stream, archivePath);
+      // The client from `withAuthenticatedClient` will retry HTTP requests.
+      // This wrapper is one layer up and will retry checksum validation errors.
+      await retry(
+        // Attempt to download archive and validate its checksum.
+        () async {
+          final request = http.Request('GET', archiveUrl);
+          final response = await withAuthenticatedClient(cache,
+              Uri.parse(description.url), (client) => client.send(request));
+          final expectedChecksum = _parseCrc32c(response.headers, fileName);
+
+          Stream<List<int>> stream = response.stream;
+          if (expectedChecksum != null) {
+            stream = _validateStream(
+                response.stream, expectedChecksum, id, archiveUrl);
+          }
+
+          // We download the archive to disk instead of streaming it directly
+          // into the tar unpacking. This simplifies stream handling.
+          // Package:tar cancels the stream when it reaches end-of-archive, and
+          // cancelling a http stream makes it not reusable.
+          // There are ways around this, and we might revisit this later.
+          await createFileFromStream(stream, archivePath);
+        },
+        // Retry if the checksum response header was malformed or the actual
+        // checksum did not match the expected checksum.
+        retryIf: (e) => e is PackageIntegrityException,
+        onRetry: (e, retryCount) => log
+            .io('Retry #${retryCount + 1} because of checksum error with GET '
+                '$archiveUrl...'),
+        maxAttempts: math.max(
+          1, // Having less than 1 attempt doesn't make sense.
+          int.tryParse(io.Platform.environment['PUB_MAX_HTTP_RETRIES'] ?? '') ??
+              7,
+        ),
+      );
+
       var tempDir = cache.createTempDir();
-      await extractTarGz(readBinaryFileAsSream(archivePath), tempDir);
+      await extractTarGz(readBinaryFileAsStream(archivePath), tempDir);
 
       // Now that the get has succeeded, move it to the real location in the
       // cache.
@@ -1121,3 +1151,84 @@
   @override
   bool operator ==(Object other) => other is _RefAndCache && other.ref == ref;
 }
+
+@visibleForTesting
+const checksumHeaderName = 'x-goog-hash';
+
+/// Adds a checksum validation "tap" to the response stream and returns a
+/// wrapped `Stream` object, which should be used to consume the incoming data.
+///
+/// As chunks are received, a CRC32C checksum is updated.
+/// Once the download is completed, the final checksum is compared with
+/// the one present in the checksum response header.
+///
+/// Throws [PackageIntegrityException] if there is a checksum mismatch.
+Stream<List<int>> _validateStream(Stream<List<int>> stream,
+    int expectedChecksum, PackageId id, Uri archiveUrl) async* {
+  final crc32c = Crc32c();
+
+  await for (final chunk in stream) {
+    crc32c.update(chunk);
+    yield chunk;
+  }
+
+  final actualChecksum = crc32c.finalize();
+
+  log.fine(
+      'Computed checksum $actualChecksum for ${id.name} ${id.version} with '
+      'expected CRC32C of $expectedChecksum.');
+
+  if (actualChecksum != expectedChecksum) {
+    throw PackageIntegrityException(
+        'Package archive for ${id.name} ${id.version} downloaded from '
+        '"$archiveUrl" has "x-goog-hash: crc32c=$expectedChecksum", which '
+        'doesn\'t match the checksum of the archive downloaded.');
+  }
+}
+
+/// Parses response [headers] and returns the archive's CRC32C checksum.
+///
+/// In most cases, GCS provides both MD5 and CRC32C checksums in its response
+/// headers. It uses the header name "x-goog-hash" for these values. It has
+/// been documented and observed that GCS will send multiple response headers
+/// with the same "x-goog-hash" token as the key.
+/// https://cloud.google.com/storage/docs/xml-api/reference-headers#xgooghash
+///
+/// Additionally, when the Dart http client encounters multiple response
+/// headers with the same key, it concatenates their values with a comma
+/// before inserting a single item with that key and concatenated value into
+/// its response "headers" Map.
+/// See https://github.com/dart-lang/http/issues/24
+/// https://github.com/dart-lang/http/blob/06649afbb5847dbb0293816ba8348766b116e419/pkgs/http/lib/src/base_response.dart#L29
+///
+/// Throws [PackageIntegrityException] if the CRC32C checksum cannot be parsed.
+int? _parseCrc32c(Map<String, String> headers, String fileName) {
+  final checksumHeader = headers[checksumHeaderName];
+  if (checksumHeader == null) return null;
+
+  final parts = checksumHeader.split(',');
+  for (final part in parts) {
+    if (part.startsWith('crc32c=')) {
+      final undecoded = part.substring('crc32c='.length);
+
+      try {
+        final bytes = base64Decode(undecoded);
+
+        // CRC32C must be 32 bits, or 4 bytes.
+        if (bytes.length != 4) {
+          throw FormatException('CRC32C checksum has invalid length', bytes);
+        }
+
+        return ByteData.view(bytes.buffer).getUint32(0);
+      } on FormatException catch (e, s) {
+        throw PackageIntegrityException(
+            'Package archive "$fileName" has a malformed CRC32C checksum in '
+            'its response headers',
+            innerError: e,
+            innerTrace: s);
+      }
+    }
+  }
+
+  return null;
+}
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index e4451db..dbc7622 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -638,3 +638,63 @@
       key(entry.key, entry.value): value(entry.key, entry.value),
   };
 }
+
+/// Call [fn] retrying so long as [retryIf] return `true` for the exception
+/// thrown, up-to [maxAttempts] times.
+///
+/// Defaults to 8 attempts, sleeping as following after 1st, 2nd, 3rd, ...,
+/// 7th attempt:
+///  1. 400 ms +/- 25%
+///  2. 800 ms +/- 25%
+///  3. 1600 ms +/- 25%
+///  4. 3200 ms +/- 25%
+///  5. 6400 ms +/- 25%
+///  6. 12800 ms +/- 25%
+///  7. 25600 ms +/- 25%
+///
+/// ```dart
+/// final response = await retry(
+///   // Make a GET request
+///   () => http.get('https://google.com').timeout(Duration(seconds: 5)),
+///   // Retry on SocketException or TimeoutException
+///   retryIf: (e) => e is SocketException || e is TimeoutException,
+/// );
+/// print(response.body);
+/// ```
+///
+/// If no [retryIf] function is given this will retry any for any [Exception]
+/// thrown. To retry on an [Error], the error must be caught and _rethrown_
+/// as an [Exception].
+///
+/// See https://github.com/google/dart-neats/blob/master/retry/lib/retry.dart
+Future<T> retry<T>(
+  FutureOr<T> Function() fn, {
+  Duration delayFactor = const Duration(milliseconds: 200),
+  double randomizationFactor = 0.25,
+  Duration maxDelay = const Duration(seconds: 30),
+  int maxAttempts = 8,
+  FutureOr<bool> Function(Exception)? retryIf,
+  FutureOr<void> Function(Exception, int retryCount)? onRetry,
+}) async {
+  var attempt = 0;
+  // ignore: literal_only_boolean_expressions
+  while (true) {
+    attempt++; // first invocation is the first attempt
+    try {
+      return await fn();
+    } on Exception catch (e) {
+      if (attempt >= maxAttempts || (retryIf != null && !(await retryIf(e)))) {
+        rethrow;
+      }
+      if (onRetry != null) {
+        await onRetry(e, attempt);
+      }
+    }
+
+    // Sleep for a delay
+    final rf = randomizationFactor * (random.nextDouble() * 2 - 1) + 1;
+    final exp = math.min(attempt, 31); // prevent overflows.
+    final delay = delayFactor * math.pow(2.0, exp) * rf;
+    await Future.delayed(delay < maxDelay ? delay : maxDelay);
+  }
+}
diff --git a/test/embedding/embedding_test.dart b/test/embedding/embedding_test.dart
index 73370d7..9f74405 100644
--- a/test/embedding/embedding_test.dart
+++ b/test/embedding/embedding_test.dart
@@ -357,6 +357,18 @@
         RegExp(r'Writing \d+ characters', multiLine: true),
         r'Writing $N characters',
       )
+      .replaceAll(
+        RegExp(r'x-goog-hash(.*)$', multiLine: true),
+        r'x-goog-hash: $CHECKSUM_HEADER',
+      )
+      .replaceAll(
+        RegExp(
+            r'Computed checksum \d+ for foo 1.0.0 with expected CRC32C of '
+            r'\d+\.',
+            multiLine: true),
+        r'Computed checksum $CRC32C for foo 1.0.0 with expected CRC32C of '
+        r'$CRC32C.',
+      )
 
       /// TODO(sigurdm): This hack suppresses differences in stack-traces
       /// between dart 2.17 and 2.18. Remove when 2.18 is stable.
diff --git a/test/get/hosted/get_test.dart b/test/get/hosted/get_test.dart
index f0270a3..81d4845 100644
--- a/test/get/hosted/get_test.dart
+++ b/test/get/hosted/get_test.dart
@@ -12,10 +12,13 @@
 import '../../test_pub.dart';
 
 void main() {
-  test('gets a package from a pub server', () async {
+  test('gets a package from a pub server and validates its CRC32C checksum',
+      () async {
     final server = await servePackages();
     server.serve('foo', '1.2.3');
 
+    expect(await server.peekArchiveChecksumHeader('foo', '1.2.3'), isNotNull);
+
     await d.appDir({'foo': '1.2.3'}).create();
 
     await pubGet();
@@ -26,6 +29,62 @@
     ]).validate();
   });
 
+  group('gets a package from a pub server without validating its checksum', () {
+    late PackageServer server;
+
+    setUp(() async {
+      server = await servePackages()
+        ..serveChecksums = false
+        ..serve('foo', '1.2.3')
+        ..serve('bar', '1.2.3', headers: {
+          'x-goog-hash': ['']
+        })
+        ..serve('baz', '1.2.3', headers: {
+          'x-goog-hash': ['md5=loremipsum']
+        });
+    });
+
+    test('because of omitted checksum header', () async {
+      expect(await server.peekArchiveChecksumHeader('foo', '1.2.3'), isNull);
+
+      await d.appDir({'foo': '1.2.3'}).create();
+
+      await pubGet();
+
+      await d.cacheDir({'foo': '1.2.3'}).validate();
+      await d.appPackageConfigFile([
+        d.packageConfigEntry(name: 'foo', version: '1.2.3'),
+      ]).validate();
+    });
+
+    test('because of empty checksum header', () async {
+      expect(await server.peekArchiveChecksumHeader('bar', '1.2.3'), '');
+
+      await d.appDir({'bar': '1.2.3'}).create();
+
+      await pubGet();
+
+      await d.cacheDir({'bar': '1.2.3'}).validate();
+      await d.appPackageConfigFile([
+        d.packageConfigEntry(name: 'bar', version: '1.2.3'),
+      ]).validate();
+    });
+
+    test('because of missing CRC32C in checksum header', () async {
+      expect(await server.peekArchiveChecksumHeader('baz', '1.2.3'),
+          'md5=loremipsum');
+
+      await d.appDir({'baz': '1.2.3'}).create();
+
+      await pubGet();
+
+      await d.cacheDir({'baz': '1.2.3'}).validate();
+      await d.appPackageConfigFile([
+        d.packageConfigEntry(name: 'baz', version: '1.2.3'),
+      ]).validate();
+    });
+  });
+
   test('URL encodes the package name', () async {
     await servePackages();
 
@@ -64,6 +123,148 @@
     ]).validate();
   });
 
+  test('recognizes and retries a package with a CRC32C checksum mismatch',
+      () async {
+    var server = await startPackageServer();
+
+    server.serve('foo', '1.2.3', headers: {
+      'x-goog-hash': PackageServer.composeChecksumHeader(crc32c: 3381945770)
+    });
+
+    await d.appDir({
+      'foo': {
+        'version': '1.2.3',
+        'hosted': {'name': 'foo', 'url': 'http://localhost:${server.port}'}
+      }
+    }).create();
+
+    await pubGet(
+      error: RegExp(
+          r'''Package archive for foo 1.2.3 downloaded from "(.+)" has '''
+          r'''"x-goog-hash: crc32c=(\d+)", which doesn't match the checksum '''
+          r'''of the archive downloaded\.'''),
+      silent: contains('Retry #2 because of checksum error'),
+      environment: {
+        'PUB_MAX_HTTP_RETRIES': '2',
+      },
+    );
+  });
+
+  group('recognizes bad checksum header and retries', () {
+    late PackageServer server;
+
+    setUp(() async {
+      server = await servePackages()
+        ..serve('foo', '1.2.3', headers: {
+          'x-goog-hash': ['crc32c=,md5=']
+        })
+        ..serve('bar', '1.2.3', headers: {
+          'x-goog-hash': ['crc32c=loremipsum,md5=loremipsum']
+        })
+        ..serve('baz', '1.2.3', headers: {
+          'x-goog-hash': ['crc32c=MTIzNDU=,md5=NTQzMjE=']
+        });
+    });
+
+    test('when the CRC32C checksum is empty', () async {
+      await d.appDir({
+        'foo': {
+          'version': '1.2.3',
+          'hosted': {'name': 'foo', 'url': 'http://localhost:${server.port}'}
+        }
+      }).create();
+
+      await pubGet(
+        exitCode: exit_codes.DATA,
+        error: contains(
+            'Package archive "foo-1.2.3.tar.gz" has a malformed CRC32C '
+            'checksum in its response headers'),
+        silent: contains('Retry #2 because of checksum error'),
+        environment: {
+          'PUB_MAX_HTTP_RETRIES': '2',
+        },
+      );
+    });
+
+    test('when the CRC32C checksum has bad encoding', () async {
+      await d.appDir({
+        'bar': {
+          'version': '1.2.3',
+          'hosted': {'name': 'bar', 'url': 'http://localhost:${server.port}'}
+        }
+      }).create();
+
+      await pubGet(
+        exitCode: exit_codes.DATA,
+        error: contains(
+            'Package archive "bar-1.2.3.tar.gz" has a malformed CRC32C '
+            'checksum in its response headers'),
+        silent: contains('Retry #2 because of checksum error'),
+        environment: {
+          'PUB_MAX_HTTP_RETRIES': '2',
+        },
+      );
+    });
+
+    test('when the CRC32C checksum is malformed', () async {
+      await d.appDir({
+        'baz': {
+          'version': '1.2.3',
+          'hosted': {'name': 'baz', 'url': 'http://localhost:${server.port}'}
+        }
+      }).create();
+
+      await pubGet(
+        exitCode: exit_codes.DATA,
+        error: contains(
+            'Package archive "baz-1.2.3.tar.gz" has a malformed CRC32C '
+            'checksum in its response headers'),
+        silent: contains('Retry #2 because of checksum error'),
+        environment: {
+          'PUB_MAX_HTTP_RETRIES': '2',
+        },
+      );
+    });
+  });
+
+  test('gets a package from a pub server that uses gzip response compression',
+      () async {
+    final server = await servePackages();
+    server.autoCompress = true;
+    server.serveChecksums = false;
+    server.serve('foo', '1.2.3');
+
+    expect(await server.peekArchiveChecksumHeader('foo', '1.2.3'), isNull);
+
+    await d.appDir({'foo': '1.2.3'}).create();
+
+    await pubGet();
+
+    await d.cacheDir({'foo': '1.2.3'}).validate();
+    await d.appPackageConfigFile([
+      d.packageConfigEntry(name: 'foo', version: '1.2.3'),
+    ]).validate();
+  });
+
+  test(
+      'gets a package from a pub server that uses gzip response compression '
+      'and validates its CRC32C checksum', () async {
+    final server = await servePackages();
+    server.autoCompress = true;
+    server.serve('foo', '1.2.3');
+
+    expect(await server.peekArchiveChecksumHeader('foo', '1.2.3'), isNotNull);
+
+    await d.appDir({'foo': '1.2.3'}).create();
+
+    await pubGet();
+
+    await d.cacheDir({'foo': '1.2.3'}).validate();
+    await d.appPackageConfigFile([
+      d.packageConfigEntry(name: 'foo', version: '1.2.3'),
+    ]).validate();
+  });
+
   group('categorizes dependency types in the lockfile', () {
     setUp(() async {
       await servePackages()
diff --git a/test/package_server.dart b/test/package_server.dart
index 6aaea68..b7587e7 100644
--- a/test/package_server.dart
+++ b/test/package_server.dart
@@ -5,8 +5,11 @@
 import 'dart:async';
 import 'dart:convert';
 import 'dart:io';
+import 'dart:typed_data';
 
 import 'package:path/path.dart' as p;
+import 'package:pub/src/crc32c.dart';
+import 'package:pub/src/source/hosted.dart';
 import 'package:pub/src/third_party/tar/tar.dart';
 import 'package:pub_semver/pub_semver.dart';
 import 'package:shelf/shelf.dart' as shelf;
@@ -18,8 +21,8 @@
 import 'test_pub.dart';
 
 class PackageServer {
-  /// The inner [DescriptorServer] that this uses to serve its descriptors.
-  final shelf.Server _inner;
+  /// The inner [IOServer] that this uses to serve its descriptors.
+  final shelf_io.IOServer _inner;
 
   /// Handlers of requests. Last matching handler will be used.
   final List<_PatternAndHandler> _handlers = [];
@@ -27,6 +30,16 @@
   // A list of all the requests recieved up till now.
   final List<String> requestedPaths = <String>[];
 
+  /// Whether the [IOServer] should compress the content, if possible.
+  /// The default value is `false` (compression disabled).
+  /// See [HttpServer.autoCompress] for details.
+  bool get autoCompress => _inner.server.autoCompress;
+  set autoCompress(bool shouldAutoCompress) =>
+      _inner.server.autoCompress = shouldAutoCompress;
+
+  // Setting this to false will disable automatic calculation of checksums.
+  bool serveChecksums = true;
+
   PackageServer._(this._inner) {
     _inner.mount((request) {
       final path = request.url.path;
@@ -63,26 +76,30 @@
         if (package == null) {
           return shelf.Response.notFound('No package named $name');
         }
-        return shelf.Response.ok(jsonEncode({
-          'name': name,
-          'uploaders': ['nweiz@google.com'],
-          'versions': package.versions.values
-              .map((version) => packageVersionApiMap(
-                    server._inner.url.toString(),
-                    version.pubspec,
-                    retracted: version.isRetracted,
-                  ))
-              .toList(),
-          if (package.isDiscontinued) 'isDiscontinued': true,
-          if (package.discontinuedReplacementText != null)
-            'replacedBy': package.discontinuedReplacementText,
-        }));
+        return shelf.Response.ok(
+            jsonEncode({
+              'name': name,
+              'uploaders': ['nweiz@google.com'],
+              'versions': package.versions.values
+                  .map((version) => packageVersionApiMap(
+                        server._inner.url.toString(),
+                        version.pubspec,
+                        retracted: version.isRetracted,
+                      ))
+                  .toList(),
+              if (package.isDiscontinued) 'isDiscontinued': true,
+              if (package.discontinuedReplacementText != null)
+                'replacedBy': package.discontinuedReplacementText,
+            }),
+            headers: {
+              HttpHeaders.contentTypeHeader: 'application/vnd.pub.v2+json'
+            });
       },
     );
 
     server.handle(
       _downloadPattern,
-      (shelf.Request request) {
+      (shelf.Request request) async {
         final parts = request.url.pathSegments;
         assert(parts[0] == 'packages');
         final name = parts[1];
@@ -98,7 +115,21 @@
 
         for (final packageVersion in package.versions.values) {
           if (packageVersion.version == version) {
-            return shelf.Response.ok(packageVersion.contents());
+            final headers = packageVersion.headers ?? {};
+            headers[HttpHeaders.contentTypeHeader] ??= [
+              'application/octet-stream'
+            ];
+
+            // This gate enables tests to validate the CRC32C parser by
+            // passing in arbitrary values for the checksum header.
+            if (server.serveChecksums &&
+                !headers.containsKey(checksumHeaderName)) {
+              headers[checksumHeaderName] = composeChecksumHeader(
+                  crc32c: await packageVersion.computeArchiveCrc32c());
+            }
+
+            return shelf.Response.ok(packageVersion.contents(),
+                headers: headers);
           }
         }
         return shelf.Response.notFound('No version $version of $name');
@@ -178,7 +209,8 @@
   void serve(String name, String version,
       {Map<String, dynamic>? deps,
       Map<String, dynamic>? pubspec,
-      List<d.Descriptor>? contents}) {
+      List<d.Descriptor>? contents,
+      Map<String, List<String>>? headers}) {
     var pubspecFields = <String, dynamic>{'name': name, 'version': version};
     if (pubspec != null) pubspecFields.addAll(pubspec);
     if (deps != null) pubspecFields['dependencies'] = deps;
@@ -189,6 +221,7 @@
     var package = _packages.putIfAbsent(name, _ServedPackage.new);
     package.versions[version] = _ServedPackageVersion(
       pubspecFields,
+      headers: headers,
       contents: () {
         final entries = <TarEntry>[];
 
@@ -243,6 +276,37 @@
   void retractPackageVersion(String name, String version) {
     _packages[name]!.versions[version]!.isRetracted = true;
   }
+
+  Future<String?> peekArchiveChecksumHeader(String name, String version) async {
+    final v = _packages[name]!.versions[version]!;
+
+    // If the test configured an overriding header value.
+    var checksumHeader = v.headers?[checksumHeaderName];
+
+    // Otherwise, compute from package contents.
+    if (serveChecksums) {
+      checksumHeader ??=
+          composeChecksumHeader(crc32c: await v.computeArchiveCrc32c());
+    }
+
+    return checksumHeader?.join(',');
+  }
+
+  static List<String> composeChecksumHeader(
+      {int? crc32c, String? md5 = '5f4dcc3b5aa765d61d8327deb882cf99'}) {
+    List<String> header = [];
+
+    if (crc32c != null) {
+      final bytes = Uint8List(4)..buffer.asByteData().setUint32(0, crc32c);
+      header.add('crc32c=${base64.encode(bytes)}');
+    }
+
+    if (md5 != null) {
+      header.add('md5=${base64.encode(utf8.encode(md5))}');
+    }
+
+    return header;
+  }
 }
 
 class _ServedPackage {
@@ -255,11 +319,16 @@
 class _ServedPackageVersion {
   final Map pubspec;
   final Stream<List<int>> Function() contents;
+  final Map<String, List<String>>? headers;
   bool isRetracted = false;
 
   Version get version => Version.parse(pubspec['version']);
 
-  _ServedPackageVersion(this.pubspec, {required this.contents});
+  _ServedPackageVersion(this.pubspec, {required this.contents, this.headers});
+
+  Future<int> computeArchiveCrc32c() async {
+    return await Crc32c.computeByConsumingStream(contents());
+  }
 }
 
 class _PatternAndHandler {
diff --git a/test/test_pub.dart b/test/test_pub.dart
index 7fb445d..be89bda 100644
--- a/test/test_pub.dart
+++ b/test/test_pub.dart
@@ -182,6 +182,7 @@
   Iterable<String>? args,
   Object? output,
   Object? error,
+  Object? silent,
   Object? warning,
   int? exitCode,
   Map<String, String?>? environment,
@@ -193,6 +194,7 @@
       args: args,
       output: output,
       error: error,
+      silent: silent,
       warning: warning,
       exitCode: exitCode,
       environment: environment,
diff --git a/test/testdata/goldens/embedding/embedding_test/logfile is written with --verbose and on unexpected exceptions.txt b/test/testdata/goldens/embedding/embedding_test/logfile is written with --verbose and on unexpected exceptions.txt
index 14ea76f..dc09988 100644
--- a/test/testdata/goldens/embedding/embedding_test/logfile is written with --verbose and on unexpected exceptions.txt
+++ b/test/testdata/goldens/embedding/embedding_test/logfile is written with --verbose and on unexpected exceptions.txt
@@ -27,7 +27,7 @@
 [E]    | date: $TIME
 [E]    | content-length: 197
 [E]    | x-frame-options: SAMEORIGIN
-[E]    | content-type: text/plain; charset=utf-8
+[E]    | content-type: application/vnd.pub.v2+json
 [E]    | x-xss-protection: 1; mode=block
 [E]    | x-content-type-options: nosniff
 [E] IO  : Writing $N characters to text file $SANDBOX/cache/hosted/localhost%58$PORT/.cache/foo-versions.json.
@@ -49,13 +49,15 @@
 [E] IO  : HTTP response 200 OK for GET http://localhost:$PORT/packages/foo/versions/1.0.0.tar.gz
 [E]    | took: $TIME
 [E]    | x-powered-by: Dart with package:shelf
-[E]    | transfer-encoding: chunked
 [E]    | date: $TIME
+[E]    | transfer-encoding: chunked
+[E]    | x-goog-hash: $CHECKSUM_HEADER
 [E]    | x-frame-options: SAMEORIGIN
-[E]    | content-type: text/plain; charset=utf-8
+[E]    | content-type: application/octet-stream
 [E]    | x-xss-protection: 1; mode=block
 [E]    | x-content-type-options: nosniff
 [E] IO  : Creating $FILE from stream
+[E] FINE: Computed checksum $CRC32C for foo 1.0.0 with expected CRC32C of $CRC32C.
 [E] FINE: Created $FILE from stream
 [E] IO  : Created temp directory $DIR
 [E] IO  : Reading binary file $FILE.
@@ -163,7 +165,7 @@
    | date: $TIME
    | content-length: 197
    | x-frame-options: SAMEORIGIN
-   | content-type: text/plain; charset=utf-8
+   | content-type: application/vnd.pub.v2+json
    | x-xss-protection: 1; mode=block
    | x-content-type-options: nosniff
 IO  : Writing $N characters to text file $SANDBOX/cache/hosted/localhost%58$PORT/.cache/foo-versions.json.
@@ -187,13 +189,15 @@
 IO  : HTTP response 200 OK for GET http://localhost:$PORT/packages/foo/versions/1.0.0.tar.gz
    | took: $TIME
    | x-powered-by: Dart with package:shelf
-   | transfer-encoding: chunked
    | date: $TIME
+   | transfer-encoding: chunked
+   | x-goog-hash: $CHECKSUM_HEADER
    | x-frame-options: SAMEORIGIN
-   | content-type: text/plain; charset=utf-8
+   | content-type: application/octet-stream
    | x-xss-protection: 1; mode=block
    | x-content-type-options: nosniff
 IO  : Creating $FILE from stream
+FINE: Computed checksum $CRC32C for foo 1.0.0 with expected CRC32C of $CRC32C.
 FINE: Created $FILE from stream
 IO  : Created temp directory $DIR
 IO  : Reading binary file $FILE.