blob: 662831c0121259244acef67ebc057f5c93eebb8b [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:pedantic/pedantic.dart';
import 'package:test/test.dart';
import 'package:stream_transform/stream_transform.dart';
void main() {
group('Scan', () {
test('produces intermediate values', () async {
var source = Stream.fromIterable([1, 2, 3, 4]);
int sum(int x, int y) => x + y;
var result = await source.transform(scan(0, sum)).toList();
expect(result, [1, 3, 6, 10]);
});
test('can create a broadcast stream', () {
var source = StreamController.broadcast();
var transformed = source.stream.transform(scan(null, null));
expect(transformed.isBroadcast, true);
});
test('forwards errors from source', () async {
var source = StreamController<int>();
int sum(int x, int y) => x + y;
var errors = [];
source.stream.transform(scan(0, sum)).listen(null, onError: errors.add);
source.addError(StateError('fail'));
await Future(() {});
expect(errors, [isStateError]);
});
group('with async combine', () {
test('returns a Stream of non-futures', () async {
var source = Stream.fromIterable([1, 2, 3, 4]);
Future<int> sum(int x, int y) async => x + y;
var result = await source.transform(scan(0, sum)).toList();
expect(result, [1, 3, 6, 10]);
});
test('can return a Stream of futures when specified', () async {
var source = Stream.fromIterable([1, 2]);
Future<int> sum(Future<int> x, int y) async => (await x) + y;
var result = await source
.transform(scan<int, Future<int>>(Future.value(0), sum))
.toList();
expect(result, [
const TypeMatcher<Future<void>>(),
const TypeMatcher<Future<void>>()
]);
expect(await Future.wait(result), [1, 3]);
});
test('does not call for subsequent values while waiting', () async {
var source = StreamController<int>();
var calledWith = <int>[];
var block = Completer<void>();
Future<int> combine(int x, int y) async {
calledWith.add(y);
await block.future;
return x + y;
}
var results = <int>[];
unawaited(
source.stream.transform(scan(0, combine)).forEach(results.add));
source..add(1)..add(2);
await Future(() {});
expect(calledWith, [1]);
expect(results, isEmpty);
block.complete();
await Future(() {});
expect(calledWith, [1, 2]);
expect(results, [1, 3]);
});
test('forwards async errors', () async {
var source = StreamController<int>();
Future<int> combine(int x, int y) async => throw StateError('fail');
var errors = [];
source.stream
.transform(scan(0, combine))
.listen(null, onError: errors.add);
source.add(1);
await Future(() {});
expect(errors, [isStateError]);
});
});
});
}