Add onBytesReceived callback to consolidateHttpClientResponseBytes() (#32853)
This will allow us to plumb the chunks in a chunked response
up to the higher levels of the framework to notify interested
parties of network loading progress.
https://github.com/flutter/flutter/issues/32374
diff --git a/packages/flutter/lib/src/foundation/consolidate_response.dart b/packages/flutter/lib/src/foundation/consolidate_response.dart
index 146ca25..b87501d 100644
--- a/packages/flutter/lib/src/foundation/consolidate_response.dart
+++ b/packages/flutter/lib/src/foundation/consolidate_response.dart
@@ -3,31 +3,121 @@
// found in the LICENSE file.
import 'dart:async';
+import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
-/// Efficiently converts the response body of an [HttpClientResponse] into a [Uint8List].
+/// Signature for getting notified when chunks of bytes are received while
+/// consolidating the bytes of an [HttpClientResponse] into a [Uint8List].
///
-/// The future returned will forward all errors emitted by [response].
-Future<Uint8List> consolidateHttpClientResponseBytes(HttpClientResponse response) {
- // response.contentLength is not trustworthy when GZIP is involved
- // or other cases where an intermediate transformer has been applied
- // to the stream.
+/// The `cumulative` parameter will contain the total number of bytes received
+/// thus far. If the response has been gzipped, this number will be the number
+/// of compressed bytes that have been received _across the wire_.
+///
+/// The `total` parameter will contain the _expected_ total number of bytes to
+/// be received across the wire (extracted from the value of the
+/// `Content-Length` HTTP response header), or -1 if the size of the response
+/// body is not known in advance (this is common for HTTP chunked transfer
+/// encoding, which itself is common when a large amount of data is being
+/// returned to the client and the total size of the response may not be known
+/// until the request has been fully processed).
+///
+/// This is used in [consolidateHttpClientResponseBytes].
+typedef BytesReceivedCallback = void Function(int cumulative, int total);
+
+/// Efficiently converts the response body of an [HttpClientResponse] into a
+/// [Uint8List].
+///
+/// The future returned will forward any error emitted by `response`.
+///
+/// The `onBytesReceived` callback, if specified, will be invoked for every
+/// chunk of bytes that is received while consolidating the response bytes.
+/// If the callback throws an error, processing of the response will halt, and
+/// the returned future will complete with the error that was thrown by the
+/// callback. For more information on how to interpret the parameters to the
+/// callback, see the documentation on [BytesReceivedCallback].
+///
+/// If the `response` is gzipped and the `autoUncompress` parameter is true,
+/// this will automatically un-compress the bytes in the returned list if it
+/// hasn't already been done via [HttpClient.autoUncompress]. To get compressed
+/// bytes from this method (assuming the response is sending compressed bytes),
+/// set both [HttpClient.autoUncompress] to false and the `autoUncompress`
+/// parameter to false.
+// TODO(tvolkert): Remove the [client] param once https://github.com/dart-lang/sdk/issues/36971 is fixed.
+Future<Uint8List> consolidateHttpClientResponseBytes(
+ HttpClientResponse response, {
+ HttpClient client,
+ bool autoUncompress = true,
+ BytesReceivedCallback onBytesReceived,
+}) {
+ assert(autoUncompress != null);
final Completer<Uint8List> completer = Completer<Uint8List>.sync();
- final List<List<int>> chunks = <List<int>>[];
- int contentLength = 0;
- response.listen((List<int> chunk) {
- chunks.add(chunk);
- contentLength += chunk.length;
- }, onDone: () {
- final Uint8List bytes = Uint8List(contentLength);
- int offset = 0;
- for (List<int> chunk in chunks) {
- bytes.setRange(offset, offset + chunk.length, chunk);
- offset += chunk.length;
+
+ final _OutputBuffer output = _OutputBuffer();
+ ByteConversionSink sink = output;
+ int expectedContentLength = response.contentLength;
+ if (response.headers?.value(HttpHeaders.contentEncodingHeader) == 'gzip') {
+ if (client?.autoUncompress ?? true) {
+ // response.contentLength will not match our bytes stream, so we declare
+ // that we don't know the expected content length.
+ expectedContentLength = -1;
+ } else if (autoUncompress) {
+ // We need to un-compress the bytes as they come in.
+ sink = gzip.decoder.startChunkedConversion(output);
}
- completer.complete(bytes);
+ }
+
+ int bytesReceived = 0;
+ StreamSubscription<List<int>> subscription;
+ subscription = response.listen((List<int> chunk) {
+ sink.add(chunk);
+ if (onBytesReceived != null) {
+ bytesReceived += chunk.length;
+ try {
+ onBytesReceived(bytesReceived, expectedContentLength);
+ } catch (error, stackTrace) {
+ completer.completeError(error, stackTrace);
+ subscription.cancel();
+ return;
+ }
+ }
+ }, onDone: () {
+ sink.close();
+ completer.complete(output.bytes);
}, onError: completer.completeError, cancelOnError: true);
return completer.future;
}
+
+class _OutputBuffer extends ByteConversionSinkBase {
+ List<List<int>> _chunks = <List<int>>[];
+ int _contentLength = 0;
+ Uint8List _bytes;
+
+ @override
+ void add(List<int> chunk) {
+ assert(_bytes == null);
+ _chunks.add(chunk);
+ _contentLength += chunk.length;
+ }
+
+ @override
+ void close() {
+ if (_bytes != null) {
+ // We've already been closed; this is a no-op
+ return;
+ }
+ _bytes = Uint8List(_contentLength);
+ int offset = 0;
+ for (List<int> chunk in _chunks) {
+ _bytes.setRange(offset, offset + chunk.length, chunk);
+ offset += chunk.length;
+ }
+ _chunks = null;
+ }
+
+ Uint8List get bytes {
+ assert(_bytes != null);
+ return _bytes;
+ }
+}
diff --git a/packages/flutter/test/foundation/consolidate_response_test.dart b/packages/flutter/test/foundation/consolidate_response_test.dart
index cd1d84d..507b66a 100644
--- a/packages/flutter/test/foundation/consolidate_response_test.dart
+++ b/packages/flutter/test/foundation/consolidate_response_test.dart
@@ -14,10 +14,17 @@
group(consolidateHttpClientResponseBytes, () {
final List<int> chunkOne = <int>[0, 1, 2, 3, 4, 5];
final List<int> chunkTwo = <int>[6, 7, 8, 9, 10];
+ MockHttpClient client;
MockHttpClientResponse response;
+ MockHttpHeaders headers;
setUp(() {
+ client = MockHttpClient();
response = MockHttpClientResponse();
+ headers = MockHttpHeaders();
+ when(client.autoUncompress).thenReturn(true);
+ when(response.headers).thenReturn(headers);
+ when(headers.value(HttpHeaders.contentEncodingHeader)).thenReturn(null);
when(response.listen(
any,
onDone: anyNamed('onDone'),
@@ -43,7 +50,7 @@
when(response.contentLength)
.thenReturn(chunkOne.length + chunkTwo.length);
final List<int> bytes =
- await consolidateHttpClientResponseBytes(response);
+ await consolidateHttpClientResponseBytes(response, client: client);
expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});
@@ -51,7 +58,7 @@
test('Converts a compressed HttpClientResponse with contentLength to bytes', () async {
when(response.contentLength).thenReturn(chunkOne.length);
final List<int> bytes =
- await consolidateHttpClientResponseBytes(response);
+ await consolidateHttpClientResponseBytes(response, client: client);
expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});
@@ -59,11 +66,31 @@
test('Converts an HttpClientResponse without contentLength to bytes', () async {
when(response.contentLength).thenReturn(-1);
final List<int> bytes =
- await consolidateHttpClientResponseBytes(response);
+ await consolidateHttpClientResponseBytes(response, client: client);
expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});
+ test('Notifies onBytesReceived for every chunk of bytes', () async {
+ final int syntheticTotal = (chunkOne.length + chunkTwo.length) * 2;
+ when(response.contentLength).thenReturn(syntheticTotal);
+ final List<int> records = <int>[];
+ await consolidateHttpClientResponseBytes(
+ response,
+ client: client,
+ onBytesReceived: (int cumulative, int total) {
+ records.addAll(<int>[cumulative, total]);
+ },
+ );
+
+ expect(records, <int>[
+ chunkOne.length,
+ syntheticTotal,
+ chunkOne.length + chunkTwo.length,
+ syntheticTotal,
+ ]);
+ });
+
test('forwards errors from HttpClientResponse', () async {
when(response.listen(
any,
@@ -87,10 +114,108 @@
});
when(response.contentLength).thenReturn(-1);
- expect(consolidateHttpClientResponseBytes(response),
+ expect(consolidateHttpClientResponseBytes(response, client: client),
throwsA(isInstanceOf<Exception>()));
});
+
+ test('Propagates error to Future return value if onBytesReceived throws', () async {
+ when(response.contentLength).thenReturn(-1);
+ final Future<List<int>> result = consolidateHttpClientResponseBytes(
+ response,
+ client: client,
+ onBytesReceived: (int cumulative, int total) {
+ throw 'misbehaving callback';
+ },
+ );
+
+ expect(result, throwsA(equals('misbehaving callback')));
+ });
+
+ group('when gzipped', () {
+ final List<int> gzipped = gzip.encode(chunkOne.followedBy(chunkTwo).toList());
+ final List<int> gzippedChunkOne = gzipped.sublist(0, gzipped.length ~/ 2);
+ final List<int> gzippedChunkTwo = gzipped.sublist(gzipped.length ~/ 2);
+
+ setUp(() {
+ when(headers.value(HttpHeaders.contentEncodingHeader)).thenReturn('gzip');
+ when(response.listen(
+ any,
+ onDone: anyNamed('onDone'),
+ onError: anyNamed('onError'),
+ cancelOnError: anyNamed('cancelOnError'),
+ )).thenAnswer((Invocation invocation) {
+ final void Function(List<int>) onData = invocation.positionalArguments[0];
+ final void Function(Object) onError = invocation.namedArguments[#onError];
+ final void Function() onDone = invocation.namedArguments[#onDone];
+ final bool cancelOnError = invocation.namedArguments[#cancelOnError];
+
+ return Stream<List<int>>.fromIterable(
+ <List<int>>[gzippedChunkOne, gzippedChunkTwo]).listen(
+ onData,
+ onDone: onDone,
+ onError: onError,
+ cancelOnError: cancelOnError,
+ );
+ });
+ });
+
+ test('Uncompresses GZIP bytes if autoUncompress is true and response.autoUncompress is false', () async {
+ when(client.autoUncompress).thenReturn(false);
+ when(response.contentLength).thenReturn(gzipped.length);
+ final List<int> bytes = await consolidateHttpClientResponseBytes(response, client: client);
+ expect(bytes, <int>[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
+ });
+
+ test('returns gzipped bytes if autoUncompress is false and response.autoUncompress is false', () async {
+ when(client.autoUncompress).thenReturn(false);
+ when(response.contentLength).thenReturn(gzipped.length);
+ final List<int> bytes = await consolidateHttpClientResponseBytes(response, client: client, autoUncompress: false);
+ expect(bytes, gzipped);
+ });
+
+ test('Notifies onBytesReceived with gzipped numbers', () async {
+ when(client.autoUncompress).thenReturn(false);
+ when(response.contentLength).thenReturn(gzipped.length);
+ final List<int> records = <int>[];
+ await consolidateHttpClientResponseBytes(
+ response,
+ client: client,
+ onBytesReceived: (int cumulative, int total) {
+ records.addAll(<int>[cumulative, total]);
+ },
+ );
+
+ expect(records, <int>[
+ gzippedChunkOne.length,
+ gzipped.length,
+ gzipped.length,
+ gzipped.length,
+ ]);
+ });
+
+ test('Notifies onBytesReceived with expectedContentLength of -1 if response.autoUncompress is true', () async {
+ final int syntheticTotal = (chunkOne.length + chunkTwo.length) * 2;
+ when(response.contentLength).thenReturn(syntheticTotal);
+ final List<int> records = <int>[];
+ await consolidateHttpClientResponseBytes(
+ response,
+ client: client,
+ onBytesReceived: (int cumulative, int total) {
+ records.addAll(<int>[cumulative, total]);
+ },
+ );
+
+ expect(records, <int>[
+ gzippedChunkOne.length,
+ -1,
+ gzipped.length,
+ -1,
+ ]);
+ });
+ });
});
}
+class MockHttpClient extends Mock implements HttpClient {}
class MockHttpClientResponse extends Mock implements HttpClientResponse {}
+class MockHttpHeaders extends Mock implements HttpHeaders {}