|  | // Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file | 
|  | // for details. All rights reserved. Use of this source code is governed by a | 
|  | // BSD-style license that can be found in the LICENSE file. | 
|  |  | 
|  | // @dart = 2.9 | 
|  |  | 
|  | // VMOptions=--old_gen_heap_size=64 --no-background-compilation | 
|  |  | 
|  | library slow_consumer3_test; | 
|  |  | 
|  | import 'package:async_helper/async_helper.dart'; | 
|  | import "package:expect/expect.dart"; | 
|  | import 'dart:async'; | 
|  |  | 
|  | const int KB = 1024; | 
|  | const int MB = KB * KB; | 
|  | const int GB = KB * KB * KB; | 
|  |  | 
|  | class SlowConsumer extends StreamConsumer<List<int>> { | 
|  | int receivedCount = 0; | 
|  | final int bytesPerSecond; | 
|  | final int bufferSize; | 
|  | final List<List<int>> bufferedData = []; | 
|  | int usedBufferSize = 0; | 
|  | int finalCount; | 
|  |  | 
|  | SlowConsumer(int this.bytesPerSecond, int this.bufferSize); | 
|  |  | 
|  | Future consume(Stream stream) { | 
|  | return addStream(stream).then((_) => close()); | 
|  | } | 
|  |  | 
|  | Future addStream(Stream stream) { | 
|  | Completer result = new Completer(); | 
|  | var subscription; | 
|  | subscription = stream.listen((Object _data) { | 
|  | List<int> data = _data; | 
|  | receivedCount += data.length; | 
|  | usedBufferSize += data.length; | 
|  | bufferedData.add(data); | 
|  | int currentBufferedDataLength = bufferedData.length; | 
|  | if (usedBufferSize > bufferSize) { | 
|  | subscription.pause(); | 
|  | usedBufferSize = 0; | 
|  | int ms = data.length * 1000 ~/ bytesPerSecond; | 
|  | Duration duration = new Duration(milliseconds: ms); | 
|  | new Timer(duration, () { | 
|  | for (int i = 0; i < currentBufferedDataLength; i++) { | 
|  | bufferedData[i] = null; | 
|  | } | 
|  | subscription.resume(); | 
|  | }); | 
|  | } | 
|  | }, onDone: () { | 
|  | finalCount = receivedCount; | 
|  | result.complete(receivedCount); | 
|  | }); | 
|  | return result.future; | 
|  | } | 
|  |  | 
|  | Future close() { | 
|  | return new Future.value(finalCount); | 
|  | } | 
|  | } | 
|  |  | 
|  | Stream<List<int>> dataGenerator(int bytesTotal, int chunkSize) { | 
|  | int chunks = bytesTotal ~/ chunkSize; | 
|  | return new Stream.fromIterable(new Iterable.generate(chunks, (_) { | 
|  | // This assumes one byte per entry. In practice it will be more. | 
|  | return new List<int>.filled(chunkSize, null); | 
|  | })); | 
|  | } | 
|  |  | 
|  | main() { | 
|  | asyncStart(); | 
|  | // The data provider can deliver 800MBs of data as fast as it is | 
|  | // requested. The data is sent in 0.5MB chunks. The consumer has a buffer of | 
|  | // 3MB. That is, it can accept a few packages without pausing its input. | 
|  | // | 
|  | // Notice that we aren't really counting bytes, but words, since we use normal | 
|  | // lists where each entry takes up a full word. In 64-bit VMs this will be | 
|  | // 8 bytes per entry, so the 3*MB buffer is picked to stay below 32 actual | 
|  | // MiB. | 
|  | // | 
|  | // This test is limited to 32MB of heap-space (see VMOptions on top of the | 
|  | // file). If the consumer doesn't pause the data-provider it will run out of | 
|  | // heap-space. | 
|  |  | 
|  | dataGenerator(100 * MB, 512 * KB) | 
|  | .pipe(new SlowConsumer(200 * MB, 3 * MB)) | 
|  | .then((count) { | 
|  | Expect.equals(100 * MB, count); | 
|  | asyncEnd(); | 
|  | }); | 
|  | } |