blob: f8156bce46bd0d916bad3a1c5b5df86183fd81e7 [file] [log] [blame]
// Copyright (c) 2019, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:pedantic/pedantic.dart';
import 'package:test/test.dart';
import 'package:stream_transform/stream_transform.dart';
void main() {
group('combineLatest', () {
test('flows through combine callback', () async {
var source = StreamController<int>();
var other = StreamController<int>();
int sum(int a, int b) => a + b;
var results = <int>[];
unawaited(source.stream
.transform(combineLatest(other.stream, sum))
.forEach(results.add));
source.add(1);
await Future(() {});
expect(results, isEmpty);
other.add(2);
await Future(() {});
expect(results, [3]);
source.add(3);
await Future(() {});
expect(results, [3, 5]);
source.add(4);
await Future(() {});
expect(results, [3, 5, 6]);
other.add(5);
await Future(() {});
expect(results, [3, 5, 6, 9]);
});
test('can combine different typed streams', () async {
var source = StreamController<String>();
var other = StreamController<int>();
String times(String a, int b) => a * b;
var results = <String>[];
unawaited(source.stream
.transform(combineLatest(other.stream, times))
.forEach(results.add));
source..add('a')..add('b');
await Future(() {});
expect(results, isEmpty);
other.add(2);
await Future(() {});
expect(results, ['bb']);
other.add(3);
await Future(() {});
expect(results, ['bb', 'bbb']);
source.add('c');
await Future(() {});
expect(results, ['bb', 'bbb', 'ccc']);
});
test('ends after both streams have ended', () async {
var source = StreamController<int>();
var other = StreamController<int>();
int sum(int a, int b) => a + b;
var done = false;
source.stream
.transform(combineLatest(other.stream, sum))
.listen(null, onDone: () => done = true);
source.add(1);
await source.close();
await Future(() {});
expect(done, false);
await other.close();
await Future(() {});
expect(done, true);
});
test('ends if source stream closes without ever emitting a value',
() async {
var source = const Stream<int>.empty();
var other = StreamController<int>();
int sum(int a, int b) => a + b;
var done = false;
source
.transform(combineLatest(other.stream, sum))
.listen(null, onDone: () => done = true);
await Future(() {});
// Nothing can ever be emitted on the result, may as well close.
expect(done, true);
});
test('ends if other stream closes without ever emitting a value', () async {
var source = StreamController<int>();
var other = const Stream<int>.empty();
int sum(int a, int b) => a + b;
var done = false;
source.stream
.transform(combineLatest(other, sum))
.listen(null, onDone: () => done = true);
await Future(() {});
// Nothing can ever be emitted on the result, may as well close.
expect(done, true);
});
test('forwards errors', () async {
var source = StreamController<int>();
var other = StreamController<int>();
int sum(int a, int b) => throw _NumberedException(3);
var errors = [];
source.stream
.transform(combineLatest(other.stream, sum))
.listen(null, onError: errors.add);
source.addError(_NumberedException(1));
other.addError(_NumberedException(2));
source.add(1);
other.add(2);
await Future(() {});
expect(errors, [_isException(1), _isException(2), _isException(3)]);
});
group('broadcast source', () {
test('can cancel and relisten to broadcast stream', () async {
var source = StreamController<int>.broadcast();
var other = StreamController<int>();
int combine(int a, int b) => a + b;
var emittedValues = <int>[];
var transformed =
source.stream.transform(combineLatest(other.stream, combine));
var subscription = transformed.listen(emittedValues.add);
source.add(1);
other.add(2);
await Future(() {});
expect(emittedValues, [3]);
await subscription.cancel();
subscription = transformed.listen(emittedValues.add);
source.add(3);
await Future(() {});
expect(emittedValues, [3, 5]);
});
});
});
}
class _NumberedException implements Exception {
final int id;
_NumberedException(this.id);
}
Matcher _isException(int id) =>
const TypeMatcher<_NumberedException>().having((n) => n.id, 'id', id);