Handle package:tar cancellations

Write the archive to disk before un-taring it.

Co-authored-by: Sigurd Meldgaard <sigurdm@google.com>
diff --git a/lib/src/http.dart b/lib/src/http.dart
index 29b3525..adc98c6 100644
--- a/lib/src/http.dart
+++ b/lib/src/http.dart
@@ -10,6 +10,7 @@
 
 import 'package:http/http.dart' as http;
 import 'package:http_retry/http_retry.dart';
+import 'package:pedantic/pedantic.dart';
 import 'package:pool/pool.dart';
 import 'package:stack_trace/stack_trace.dart';
 
@@ -382,13 +383,10 @@
       rethrow;
     }
 
-    var stream = response.stream.transform(
-        StreamTransformer<List<int>, List<int>>.fromHandlers(
-            handleDone: (sink) {
-      resource.release();
-      sink.close();
-    }));
-    return http.StreamedResponse(stream, response.statusCode,
+    final responseController = StreamController<List<int>>(sync: true);
+    unawaited(response.stream.pipe(responseController));
+    unawaited(responseController.done.then((_) => resource.release()));
+    return http.StreamedResponse(responseController.stream, response.statusCode,
         contentLength: response.contentLength,
         request: response.request,
         headers: response.headers,
diff --git a/lib/src/io.dart b/lib/src/io.dart
index 15828a2..66d9cd9 100644
--- a/lib/src/io.dart
+++ b/lib/src/io.dart
@@ -170,6 +170,13 @@
   return contents;
 }
 
+/// Reads the contents of the binary file [file] as a [Stream].
+Stream<List<int>> readBinaryFileAsSream(String file) {
+  log.io('Reading binary file $file.');
+  var contents = File(file).openRead();
+  return contents;
+}
+
 /// Creates [file] and writes [contents] to it.
 ///
 /// If [dontLogContents] is `true`, the contents of the file will never be
@@ -210,7 +217,7 @@
 ///
 /// Replaces any file already at that path. Completes when the file is done
 /// being written.
-Future<String> _createFileFromStream(Stream<List<int>> stream, String file) {
+Future<String> createFileFromStream(Stream<List<int>> stream, String file) {
   // TODO(nweiz): remove extra logging when we figure out the windows bot issue.
   log.io('Creating $file from stream.');
 
@@ -849,7 +856,7 @@
         // Regular file
         deleteIfLink(filePath);
         ensureDir(parentDirectory);
-        await _createFileFromStream(entry.contents, filePath);
+        await createFileFromStream(entry.contents, filePath);
 
         if (Platform.isLinux || Platform.isMacOS) {
           // Apply executable bits from tar header, but don't change r/w bits
diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart
index d563839..83416ca 100644
--- a/lib/src/source/hosted.dart
+++ b/lib/src/source/hosted.dart
@@ -567,18 +567,29 @@
     log.message('Downloading ${log.bold(id.name)} ${id.version}...');
 
     // Download and extract the archive to a temp directory.
-    var tempDir = systemCache.createTempDir();
-    var response = await httpClient.send(http.Request('GET', url));
-    await extractTarGz(response.stream, tempDir);
+    await withTempDir((tempDirForArchive) async {
+      var archivePath =
+          p.join(tempDirForArchive, '$packageName-$version.tar.gz');
+      var response = await httpClient.send(http.Request('GET', url));
 
-    // Remove the existing directory if it exists. This will happen if
-    // we're forcing a download to repair the cache.
-    if (dirExists(destPath)) deleteEntry(destPath);
+      // We download the archive to disk instead of streaming it directly into
+      // the tar unpacking. This simplifies stream handling.
+      // Package:tar cancels the stream when it reaches end-of-archive, and
+      // cancelling a http stream makes it not reusable.
+      // There are ways around this, and we might revisit this later.
+      await createFileFromStream(response.stream, archivePath);
+      var tempDir = systemCache.createTempDir();
+      await extractTarGz(readBinaryFileAsSream(archivePath), tempDir);
 
-    // Now that the get has succeeded, move it to the real location in the
-    // cache. This ensures that we don't leave half-busted ghost
-    // directories in the user's pub cache if a get fails.
-    renameDir(tempDir, destPath);
+      // Remove the existing directory if it exists. This will happen if
+      // we're forcing a download to repair the cache.
+      if (dirExists(destPath)) deleteEntry(destPath);
+
+      // Now that the get has succeeded, move it to the real location in the
+      // cache. This ensures that we don't leave half-busted ghost
+      // directories in the user's pub cache if a get fails.
+      renameDir(tempDir, destPath);
+    });
   }
 
   /// When an error occurs trying to read something about [package] from [url],
diff --git a/test/get/hosted/get_stress_test.dart b/test/get/hosted/get_stress_test.dart
new file mode 100644
index 0000000..da45309
--- /dev/null
+++ b/test/get/hosted/get_stress_test.dart
@@ -0,0 +1,36 @@
+// Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:test/test.dart';
+
+import '../../descriptor.dart' as d;
+import '../../test_pub.dart';
+
+void main() {
+  test('gets more than 16 packages from a pub server', () async {
+    await servePackages((builder) {
+      builder.serve('foo', '1.2.3');
+      for (var i = 0; i < 20; i++) {
+        builder.serve('pkg$i', '1.$i.0');
+      }
+    });
+
+    await d.appDir({
+      'foo': '1.2.3',
+      for (var i = 0; i < 20; i++) 'pkg$i': '^1.$i.0',
+    }).create();
+
+    await pubGet();
+
+    await d.cacheDir({
+      'foo': '1.2.3',
+      for (var i = 0; i < 20; i++) 'pkg$i': '1.$i.0',
+    }).validate();
+
+    await d.appPackagesFile({
+      'foo': '1.2.3',
+      for (var i = 0; i < 20; i++) 'pkg$i': '1.$i.0',
+    }).validate();
+  });
+}