blob: 37a46f7006a6d3618999d0c54fd8b650cd4e8658 [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:test/test.dart';
import 'package:stream_transform/stream_transform.dart';
void main() {
var streamTypes = {
'single subscription': () => new StreamController(),
'broadcast': () => new StreamController.broadcast()
};
for (var firstType in streamTypes.keys) {
for (var secondType in streamTypes.keys) {
group('concat [$firstType] with [$secondType]', () {
StreamController first;
StreamController second;
List emittedValues;
bool firstCanceled;
bool secondCanceled;
bool secondListened;
bool isDone;
List errors;
Stream transformed;
StreamSubscription subscription;
setUp(() async {
firstCanceled = false;
secondCanceled = false;
secondListened = false;
first = streamTypes[firstType]()
..onCancel = () {
firstCanceled = true;
};
second = streamTypes[secondType]()
..onCancel = () {
secondCanceled = true;
}
..onListen = () {
secondListened = true;
};
emittedValues = [];
errors = [];
isDone = false;
transformed = first.stream.transform(concat(second.stream));
subscription = transformed
.listen(emittedValues.add, onError: errors.add, onDone: () {
isDone = true;
});
});
test('adds all values from both streams', () async {
first..add(1)..add(2);
await first.close();
await new Future(() {});
second..add(3)..add(4);
await new Future(() {});
expect(emittedValues, [1, 2, 3, 4]);
});
test('Does not listen to second stream before first stream finishes',
() async {
expect(secondListened, false);
await first.close();
expect(secondListened, true);
});
test('closes stream after both inputs close', () async {
await first.close();
await second.close();
expect(isDone, true);
});
test('cancels any type of first stream on cancel', () async {
await subscription.cancel();
expect(firstCanceled, true);
});
if (firstType == 'single subscription') {
test(
'cancels any type of second stream on cancel if first is '
'broadcast', () async {
await first.close();
await subscription.cancel();
expect(secondCanceled, true);
});
if (secondType == 'broadcast') {
test('can pause and resume during second stream - dropping values',
() async {
await first.close();
subscription.pause();
second.add(1);
await new Future(() {});
subscription.resume();
second.add(2);
await new Future(() {});
expect(emittedValues, [2]);
});
} else {
test('can pause and resume during second stream - buffering values',
() async {
await first.close();
subscription.pause();
second.add(1);
await new Future(() {});
subscription.resume();
second.add(2);
await new Future(() {});
expect(emittedValues, [1, 2]);
});
}
}
if (firstType == 'broadcast') {
test('can cancel and relisten during first stream', () async {
await subscription.cancel();
first.add(1);
subscription = transformed.listen(emittedValues.add);
first.add(2);
await new Future(() {});
expect(emittedValues, [2]);
});
test('can cancel and relisten during second stream', () async {
await first.close();
await subscription.cancel();
second.add(2);
await new Future(() {});
subscription = transformed.listen(emittedValues.add);
second.add(3);
await new Future((() {}));
expect(emittedValues, [3]);
});
test('forwards values to multiple listeners', () async {
var otherValues = [];
transformed.listen(otherValues.add);
first.add(1);
await first.close();
second.add(2);
await new Future(() {});
var thirdValues = [];
transformed.listen(thirdValues.add);
second.add(3);
await new Future(() {});
expect(emittedValues, [1, 2, 3]);
expect(otherValues, [1, 2, 3]);
expect(thirdValues, [3]);
});
}
});
}
}
}