blob: 694199c70f36c525828d2d92b8bff638f211f808 [file] [log] [blame]
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:stream_transform/src/from_handlers.dart';
import 'package:test/test.dart';
void main() {
late StreamController<int> values;
late List<int> emittedValues;
late bool valuesCanceled;
late bool isDone;
late List<String> errors;
late Stream<int> transformed;
late StreamSubscription<int> subscription;
void setUpForController(StreamController<int> controller,
Stream<int> Function(Stream<int>) transform) {
valuesCanceled = false;
values = controller
..onCancel = () {
valuesCanceled = true;
};
emittedValues = [];
errors = [];
isDone = false;
transformed = transform(values.stream);
subscription =
transformed.listen(emittedValues.add, onError: errors.add, onDone: () {
isDone = true;
});
}
group('default from_handlers', () {
group('Single subscription stream', () {
setUp(() {
setUpForController(StreamController(),
(s) => s.transformByHandlers(onData: (e, sink) => sink.add(e)));
});
test('has correct stream type', () {
expect(transformed.isBroadcast, false);
});
test('forwards values', () async {
values
..add(1)
..add(2);
await Future(() {});
expect(emittedValues, [1, 2]);
});
test('forwards errors', () async {
values.addError('error');
await Future(() {});
expect(errors, ['error']);
});
test('forwards done', () async {
await values.close();
expect(isDone, true);
});
test('forwards cancel', () async {
await subscription.cancel();
expect(valuesCanceled, true);
});
});
group('broadcast stream with muliple listeners', () {
late List<int> emittedValues2;
late List<String> errors2;
late bool isDone2;
late StreamSubscription<int> subscription2;
setUp(() {
setUpForController(StreamController.broadcast(),
(s) => s.transformByHandlers(onData: (e, sink) => sink.add(e)));
emittedValues2 = [];
errors2 = [];
isDone2 = false;
subscription2 = transformed
.listen(emittedValues2.add, onError: errors2.add, onDone: () {
isDone2 = true;
});
});
test('has correct stream type', () {
expect(transformed.isBroadcast, true);
});
test('forwards values', () async {
values
..add(1)
..add(2);
await Future(() {});
expect(emittedValues, [1, 2]);
expect(emittedValues2, [1, 2]);
});
test('forwards errors', () async {
values.addError('error');
await Future(() {});
expect(errors, ['error']);
expect(errors2, ['error']);
});
test('forwards done', () async {
await values.close();
expect(isDone, true);
expect(isDone2, true);
});
test('forwards cancel', () async {
await subscription.cancel();
expect(valuesCanceled, false);
await subscription2.cancel();
expect(valuesCanceled, true);
});
});
});
group('custom handlers', () {
group('single subscription', () {
setUp(() async {
setUpForController(
StreamController(),
(s) => s.transformByHandlers(onData: (value, sink) {
sink.add(value + 1);
}));
});
test('uses transform from handleData', () async {
values
..add(1)
..add(2);
await Future(() {});
expect(emittedValues, [2, 3]);
});
});
group('broadcast stream with multiple listeners', () {
late int dataCallCount;
late int doneCallCount;
late int errorCallCount;
setUp(() async {
dataCallCount = 0;
doneCallCount = 0;
errorCallCount = 0;
setUpForController(
StreamController.broadcast(),
(s) => s.transformByHandlers(onData: (value, sink) {
dataCallCount++;
}, onError: (error, stackTrace, sink) {
errorCallCount++;
sink.addError(error, stackTrace);
}, onDone: (sink) {
doneCallCount++;
}));
transformed.listen((_) {}, onError: (_, __) {});
});
test('handles data once', () async {
values.add(1);
await Future(() {});
expect(dataCallCount, 1);
});
test('handles done once', () async {
await values.close();
expect(doneCallCount, 1);
});
test('handles errors once', () async {
values.addError('error');
await Future(() {});
expect(errorCallCount, 1);
});
});
});
}