Handle package:tar cancellations
Write the archive to disk before un-taring it.
Co-authored-by: Sigurd Meldgaard <sigurdm@google.com>
diff --git a/lib/src/http.dart b/lib/src/http.dart
index 29b3525..adc98c6 100644
--- a/lib/src/http.dart
+++ b/lib/src/http.dart
@@ -10,6 +10,7 @@
import 'package:http/http.dart' as http;
import 'package:http_retry/http_retry.dart';
+import 'package:pedantic/pedantic.dart';
import 'package:pool/pool.dart';
import 'package:stack_trace/stack_trace.dart';
@@ -382,13 +383,10 @@
rethrow;
}
- var stream = response.stream.transform(
- StreamTransformer<List<int>, List<int>>.fromHandlers(
- handleDone: (sink) {
- resource.release();
- sink.close();
- }));
- return http.StreamedResponse(stream, response.statusCode,
+ final responseController = StreamController<List<int>>(sync: true);
+ unawaited(response.stream.pipe(responseController));
+ unawaited(responseController.done.then((_) => resource.release()));
+ return http.StreamedResponse(responseController.stream, response.statusCode,
contentLength: response.contentLength,
request: response.request,
headers: response.headers,
diff --git a/lib/src/io.dart b/lib/src/io.dart
index 15828a2..66d9cd9 100644
--- a/lib/src/io.dart
+++ b/lib/src/io.dart
@@ -170,6 +170,13 @@
return contents;
}
+/// Reads the contents of the binary file [file] as a [Stream].
+Stream<List<int>> readBinaryFileAsSream(String file) {
+ log.io('Reading binary file $file.');
+ var contents = File(file).openRead();
+ return contents;
+}
+
/// Creates [file] and writes [contents] to it.
///
/// If [dontLogContents] is `true`, the contents of the file will never be
@@ -210,7 +217,7 @@
///
/// Replaces any file already at that path. Completes when the file is done
/// being written.
-Future<String> _createFileFromStream(Stream<List<int>> stream, String file) {
+Future<String> createFileFromStream(Stream<List<int>> stream, String file) {
// TODO(nweiz): remove extra logging when we figure out the windows bot issue.
log.io('Creating $file from stream.');
@@ -849,7 +856,7 @@
// Regular file
deleteIfLink(filePath);
ensureDir(parentDirectory);
- await _createFileFromStream(entry.contents, filePath);
+ await createFileFromStream(entry.contents, filePath);
if (Platform.isLinux || Platform.isMacOS) {
// Apply executable bits from tar header, but don't change r/w bits
diff --git a/lib/src/source/hosted.dart b/lib/src/source/hosted.dart
index d563839..83416ca 100644
--- a/lib/src/source/hosted.dart
+++ b/lib/src/source/hosted.dart
@@ -567,18 +567,29 @@
log.message('Downloading ${log.bold(id.name)} ${id.version}...');
// Download and extract the archive to a temp directory.
- var tempDir = systemCache.createTempDir();
- var response = await httpClient.send(http.Request('GET', url));
- await extractTarGz(response.stream, tempDir);
+ await withTempDir((tempDirForArchive) async {
+ var archivePath =
+ p.join(tempDirForArchive, '$packageName-$version.tar.gz');
+ var response = await httpClient.send(http.Request('GET', url));
- // Remove the existing directory if it exists. This will happen if
- // we're forcing a download to repair the cache.
- if (dirExists(destPath)) deleteEntry(destPath);
+ // We download the archive to disk instead of streaming it directly into
+ // the tar unpacking. This simplifies stream handling.
+ // Package:tar cancels the stream when it reaches end-of-archive, and
+ // cancelling a http stream makes it not reusable.
+ // There are ways around this, and we might revisit this later.
+ await createFileFromStream(response.stream, archivePath);
+ var tempDir = systemCache.createTempDir();
+ await extractTarGz(readBinaryFileAsSream(archivePath), tempDir);
- // Now that the get has succeeded, move it to the real location in the
- // cache. This ensures that we don't leave half-busted ghost
- // directories in the user's pub cache if a get fails.
- renameDir(tempDir, destPath);
+ // Remove the existing directory if it exists. This will happen if
+ // we're forcing a download to repair the cache.
+ if (dirExists(destPath)) deleteEntry(destPath);
+
+ // Now that the get has succeeded, move it to the real location in the
+ // cache. This ensures that we don't leave half-busted ghost
+ // directories in the user's pub cache if a get fails.
+ renameDir(tempDir, destPath);
+ });
}
/// When an error occurs trying to read something about [package] from [url],
diff --git a/test/get/hosted/get_stress_test.dart b/test/get/hosted/get_stress_test.dart
new file mode 100644
index 0000000..da45309
--- /dev/null
+++ b/test/get/hosted/get_stress_test.dart
@@ -0,0 +1,36 @@
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'package:test/test.dart';
+
+import '../../descriptor.dart' as d;
+import '../../test_pub.dart';
+
+void main() {
+ test('gets more than 16 packages from a pub server', () async {
+ await servePackages((builder) {
+ builder.serve('foo', '1.2.3');
+ for (var i = 0; i < 20; i++) {
+ builder.serve('pkg$i', '1.$i.0');
+ }
+ });
+
+ await d.appDir({
+ 'foo': '1.2.3',
+ for (var i = 0; i < 20; i++) 'pkg$i': '^1.$i.0',
+ }).create();
+
+ await pubGet();
+
+ await d.cacheDir({
+ 'foo': '1.2.3',
+ for (var i = 0; i < 20; i++) 'pkg$i': '1.$i.0',
+ }).validate();
+
+ await d.appPackagesFile({
+ 'foo': '1.2.3',
+ for (var i = 0; i < 20; i++) 'pkg$i': '1.$i.0',
+ }).validate();
+ });
+}