Add onBytesReceived callback to consolidateHttpClientResponseBytes() (#32853)

This will allow us to plumb the chunks in a chunked response
up to the higher levels of the framework to notify interested
parties of network loading progress.

https://github.com/flutter/flutter/issues/32374
diff --git a/packages/flutter/lib/src/foundation/consolidate_response.dart b/packages/flutter/lib/src/foundation/consolidate_response.dart
index 146ca25..b87501d 100644
--- a/packages/flutter/lib/src/foundation/consolidate_response.dart
+++ b/packages/flutter/lib/src/foundation/consolidate_response.dart
@@ -3,31 +3,121 @@
 // found in the LICENSE file.
 
 import 'dart:async';
+import 'dart:convert';
 import 'dart:io';
 import 'dart:typed_data';
 
-/// Efficiently converts the response body of an [HttpClientResponse] into a [Uint8List].
+/// Signature for getting notified when chunks of bytes are received while
+/// consolidating the bytes of an [HttpClientResponse] into a [Uint8List].
 ///
-/// The future returned will forward all errors emitted by [response].
-Future<Uint8List> consolidateHttpClientResponseBytes(HttpClientResponse response) {
-  // response.contentLength is not trustworthy when GZIP is involved
-  // or other cases where an intermediate transformer has been applied
-  // to the stream.
+/// The `cumulative` parameter will contain the total number of bytes received
+/// thus far. If the response has been gzipped, this number will be the number
+/// of compressed bytes that have been received _across the wire_.
+///
+/// The `total` parameter will contain the _expected_ total number of bytes to
+/// be received across the wire (extracted from the value of the
+/// `Content-Length` HTTP response header), or -1 if the size of the response
+/// body is not known in advance (this is common for HTTP chunked transfer
+/// encoding, which itself is common when a large amount of data is being
+/// returned to the client and the total size of the response may not be known
+/// until the request has been fully processed).
+///
+/// This is used in [consolidateHttpClientResponseBytes].
+typedef BytesReceivedCallback = void Function(int cumulative, int total);
+
+/// Efficiently converts the response body of an [HttpClientResponse] into a
+/// [Uint8List].
+///
+/// The future returned will forward any error emitted by `response`.
+///
+/// The `onBytesReceived` callback, if specified, will be invoked for every
+/// chunk of bytes that is received while consolidating the response bytes.
+/// If the callback throws an error, processing of the response will halt, and
+/// the returned future will complete with the error that was thrown by the
+/// callback. For more information on how to interpret the parameters to the
+/// callback, see the documentation on [BytesReceivedCallback].
+///
+/// If the `response` is gzipped and the `autoUncompress` parameter is true,
+/// this will automatically un-compress the bytes in the returned list if it
+/// hasn't already been done via [HttpClient.autoUncompress]. To get compressed
+/// bytes from this method (assuming the response is sending compressed bytes),
+/// set both [HttpClient.autoUncompress] to false and the `autoUncompress`
+/// parameter to false.
+// TODO(tvolkert): Remove the [client] param once https://github.com/dart-lang/sdk/issues/36971 is fixed.
+Future<Uint8List> consolidateHttpClientResponseBytes(
+  HttpClientResponse response, {
+  HttpClient client,
+  bool autoUncompress = true,
+  BytesReceivedCallback onBytesReceived,
+}) {
+  assert(autoUncompress != null);
   final Completer<Uint8List> completer = Completer<Uint8List>.sync();
-  final List<List<int>> chunks = <List<int>>[];
-  int contentLength = 0;
-  response.listen((List<int> chunk) {
-    chunks.add(chunk);
-    contentLength += chunk.length;
-  }, onDone: () {
-    final Uint8List bytes = Uint8List(contentLength);
-    int offset = 0;
-    for (List<int> chunk in chunks) {
-      bytes.setRange(offset, offset + chunk.length, chunk);
-      offset += chunk.length;
+
+  final _OutputBuffer output = _OutputBuffer();
+  ByteConversionSink sink = output;
+  int expectedContentLength = response.contentLength;
+  if (response.headers?.value(HttpHeaders.contentEncodingHeader) == 'gzip') {
+    if (client?.autoUncompress ?? true) {
+      // response.contentLength will not match our bytes stream, so we declare
+      // that we don't know the expected content length.
+      expectedContentLength = -1;
+    } else if (autoUncompress) {
+      // We need to un-compress the bytes as they come in.
+      sink = gzip.decoder.startChunkedConversion(output);
     }
-    completer.complete(bytes);
+  }
+
+  int bytesReceived = 0;
+  StreamSubscription<List<int>> subscription;
+  subscription = response.listen((List<int> chunk) {
+    sink.add(chunk);
+    if (onBytesReceived != null) {
+      bytesReceived += chunk.length;
+      try {
+        onBytesReceived(bytesReceived, expectedContentLength);
+      } catch (error, stackTrace) {
+        completer.completeError(error, stackTrace);
+        subscription.cancel();
+        return;
+      }
+    }
+  }, onDone: () {
+    sink.close();
+    completer.complete(output.bytes);
   }, onError: completer.completeError, cancelOnError: true);
 
   return completer.future;
 }
+
+class _OutputBuffer extends ByteConversionSinkBase {
+  List<List<int>> _chunks = <List<int>>[];
+  int _contentLength = 0;
+  Uint8List _bytes;
+
+  @override
+  void add(List<int> chunk) {
+    assert(_bytes == null);
+    _chunks.add(chunk);
+    _contentLength += chunk.length;
+  }
+
+  @override
+  void close() {
+    if (_bytes != null) {
+      // We've already been closed; this is a no-op
+      return;
+    }
+    _bytes = Uint8List(_contentLength);
+    int offset = 0;
+    for (List<int> chunk in _chunks) {
+      _bytes.setRange(offset, offset + chunk.length, chunk);
+      offset += chunk.length;
+    }
+    _chunks = null;
+  }
+
+  Uint8List get bytes {
+    assert(_bytes != null);
+    return _bytes;
+  }
+}
diff --git a/packages/flutter/test/foundation/consolidate_response_test.dart b/packages/flutter/test/foundation/consolidate_response_test.dart
index cd1d84d..507b66a 100644
--- a/packages/flutter/test/foundation/consolidate_response_test.dart
+++ b/packages/flutter/test/foundation/consolidate_response_test.dart
@@ -14,10 +14,17 @@
   group(consolidateHttpClientResponseBytes, () {
     final List<int> chunkOne = <int>[0, 1, 2, 3, 4, 5];
     final List<int> chunkTwo = <int>[6, 7, 8, 9, 10];
+    MockHttpClient client;
     MockHttpClientResponse response;
+    MockHttpHeaders headers;
 
     setUp(() {
+      client = MockHttpClient();
       response = MockHttpClientResponse();
+      headers = MockHttpHeaders();
+      when(client.autoUncompress).thenReturn(true);
+      when(response.headers).thenReturn(headers);
+      when(headers.value(HttpHeaders.contentEncodingHeader)).thenReturn(null);
       when(response.listen(
          any,
          onDone: anyNamed('onDone'),
@@ -43,7 +50,7 @@
       when(response.contentLength)
           .thenReturn(chunkOne.length + chunkTwo.length);
       final List<int> bytes =
-          await consolidateHttpClientResponseBytes(response);
+          await consolidateHttpClientResponseBytes(response, client: client);
 
       expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
     });
@@ -51,7 +58,7 @@
     test('Converts a compressed HttpClientResponse with contentLength to bytes', () async {
       when(response.contentLength).thenReturn(chunkOne.length);
       final List<int> bytes =
-          await consolidateHttpClientResponseBytes(response);
+          await consolidateHttpClientResponseBytes(response, client: client);
 
       expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
     });
@@ -59,11 +66,31 @@
     test('Converts an HttpClientResponse without contentLength to bytes', () async {
       when(response.contentLength).thenReturn(-1);
       final List<int> bytes =
-          await consolidateHttpClientResponseBytes(response);
+          await consolidateHttpClientResponseBytes(response, client: client);
 
       expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
     });
 
+    test('Notifies onBytesReceived for every chunk of bytes', () async {
+      final int syntheticTotal = (chunkOne.length + chunkTwo.length) * 2;
+      when(response.contentLength).thenReturn(syntheticTotal);
+      final List<int> records = <int>[];
+      await consolidateHttpClientResponseBytes(
+        response,
+        client: client,
+        onBytesReceived: (int cumulative, int total) {
+          records.addAll(<int>[cumulative, total]);
+        },
+      );
+
+      expect(records, <int>[
+        chunkOne.length,
+        syntheticTotal,
+        chunkOne.length + chunkTwo.length,
+        syntheticTotal,
+      ]);
+    });
+
     test('forwards errors from HttpClientResponse', () async {
       when(response.listen(
         any,
@@ -87,10 +114,108 @@
       });
       when(response.contentLength).thenReturn(-1);
 
-      expect(consolidateHttpClientResponseBytes(response),
+      expect(consolidateHttpClientResponseBytes(response, client: client),
           throwsA(isInstanceOf<Exception>()));
     });
+
+    test('Propagates error to Future return value if onBytesReceived throws', () async {
+      when(response.contentLength).thenReturn(-1);
+      final Future<List<int>> result = consolidateHttpClientResponseBytes(
+        response,
+        client: client,
+        onBytesReceived: (int cumulative, int total) {
+          throw 'misbehaving callback';
+        },
+      );
+
+      expect(result, throwsA(equals('misbehaving callback')));
+    });
+
+    group('when gzipped', () {
+      final List<int> gzipped = gzip.encode(chunkOne.followedBy(chunkTwo).toList());
+      final List<int> gzippedChunkOne = gzipped.sublist(0, gzipped.length ~/ 2);
+      final List<int> gzippedChunkTwo = gzipped.sublist(gzipped.length ~/ 2);
+
+      setUp(() {
+        when(headers.value(HttpHeaders.contentEncodingHeader)).thenReturn('gzip');
+        when(response.listen(
+          any,
+          onDone: anyNamed('onDone'),
+          onError: anyNamed('onError'),
+          cancelOnError: anyNamed('cancelOnError'),
+        )).thenAnswer((Invocation invocation) {
+          final void Function(List<int>) onData = invocation.positionalArguments[0];
+          final void Function(Object) onError = invocation.namedArguments[#onError];
+          final void Function() onDone = invocation.namedArguments[#onDone];
+          final bool cancelOnError = invocation.namedArguments[#cancelOnError];
+
+          return Stream<List<int>>.fromIterable(
+              <List<int>>[gzippedChunkOne, gzippedChunkTwo]).listen(
+            onData,
+            onDone: onDone,
+            onError: onError,
+            cancelOnError: cancelOnError,
+          );
+        });
+      });
+
+      test('Uncompresses GZIP bytes if autoUncompress is true and response.autoUncompress is false', () async {
+        when(client.autoUncompress).thenReturn(false);
+        when(response.contentLength).thenReturn(gzipped.length);
+        final List<int> bytes = await consolidateHttpClientResponseBytes(response, client: client);
+        expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+      });
+
+      test('returns gzipped bytes if autoUncompress is false and response.autoUncompress is false', () async {
+        when(client.autoUncompress).thenReturn(false);
+        when(response.contentLength).thenReturn(gzipped.length);
+        final List<int> bytes = await consolidateHttpClientResponseBytes(response, client: client, autoUncompress: false);
+        expect(bytes, gzipped);
+      });
+
+      test('Notifies onBytesReceived with gzipped numbers', () async {
+        when(client.autoUncompress).thenReturn(false);
+        when(response.contentLength).thenReturn(gzipped.length);
+        final List<int> records = <int>[];
+        await consolidateHttpClientResponseBytes(
+          response,
+          client: client,
+          onBytesReceived: (int cumulative, int total) {
+            records.addAll(<int>[cumulative, total]);
+          },
+        );
+
+        expect(records, <int>[
+          gzippedChunkOne.length,
+          gzipped.length,
+          gzipped.length,
+          gzipped.length,
+        ]);
+      });
+
+      test('Notifies onBytesReceived with expectedContentLength of -1 if response.autoUncompress is true', () async {
+        final int syntheticTotal = (chunkOne.length + chunkTwo.length) * 2;
+        when(response.contentLength).thenReturn(syntheticTotal);
+        final List<int> records = <int>[];
+        await consolidateHttpClientResponseBytes(
+          response,
+          client: client,
+          onBytesReceived: (int cumulative, int total) {
+            records.addAll(<int>[cumulative, total]);
+          },
+        );
+
+        expect(records, <int>[
+          gzippedChunkOne.length,
+          -1,
+          gzipped.length,
+          -1,
+        ]);
+      });
+    });
   });
 }
 
+class MockHttpClient extends Mock implements HttpClient {}
 class MockHttpClientResponse extends Mock implements HttpClientResponse {}
+class MockHttpHeaders extends Mock implements HttpHeaders {}