blob: 806f929aa3a976f8edabfcd89d93effa9ea528f6 [file] [log] [blame]
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// VMOptions=--old_gen_heap_size=64 --no-background-compilation
library slow_consumer3_test;
import 'package:async_helper/async_helper.dart';
import "package:expect/expect.dart";
import 'dart:async';
const int KB = 1024;
const int MB = KB * KB;
const int GB = KB * KB * KB;
class SlowConsumer extends StreamConsumer<List<int>> {
int receivedCount = 0;
final int bytesPerSecond;
final int bufferSize;
final List<List<int>> bufferedData = [];
int usedBufferSize = 0;
int finalCount;
SlowConsumer(int this.bytesPerSecond, int this.bufferSize);
Future consume(Stream stream) {
return addStream(stream).then((_) => close());
}
Future addStream(Stream stream) {
Completer result = new Completer();
var subscription;
subscription = stream.listen((Object _data) {
List<int> data = _data;
receivedCount += data.length;
usedBufferSize += data.length;
bufferedData.add(data);
int currentBufferedDataLength = bufferedData.length;
if (usedBufferSize > bufferSize) {
subscription.pause();
usedBufferSize = 0;
int ms = data.length * 1000 ~/ bytesPerSecond;
Duration duration = new Duration(milliseconds: ms);
new Timer(duration, () {
for (int i = 0; i < currentBufferedDataLength; i++) {
bufferedData[i] = null;
}
subscription.resume();
});
}
}, onDone: () {
finalCount = receivedCount;
result.complete(receivedCount);
});
return result.future;
}
Future close() {
return new Future.value(finalCount);
}
}
Stream<List<int>> dataGenerator(int bytesTotal, int chunkSize) {
int chunks = bytesTotal ~/ chunkSize;
return new Stream.fromIterable(new Iterable.generate(chunks, (_) {
// This assumes one byte per entry. In practice it will be more.
return new List<int>(chunkSize);
}));
}
main() {
asyncStart();
// The data provider can deliver 800MBs of data as fast as it is
// requested. The data is sent in 0.5MB chunks. The consumer has a buffer of
// 3MB. That is, it can accept a few packages without pausing its input.
//
// Notice that we aren't really counting bytes, but words, since we use normal
// lists where each entry takes up a full word. In 64-bit VMs this will be
// 8 bytes per entry, so the 3*MB buffer is picked to stay below 32 actual
// MiB.
//
// This test is limited to 32MB of heap-space (see VMOptions on top of the
// file). If the consumer doesn't pause the data-provider it will run out of
// heap-space.
dataGenerator(100 * MB, 512 * KB)
.pipe(new SlowConsumer(200 * MB, 3 * MB))
.then((count) {
Expect.equals(100 * MB, count);
asyncEnd();
});
}