| // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| import 'dart:async'; |
| |
| import 'package:async/async.dart'; |
| import 'package:test/test.dart'; |
| |
| /// Create an error with the same values as [base], except that it throwsA |
| /// when seeing the value [errorValue]. |
| Stream streamError(Stream base, int errorValue, Object error) { |
| return base.map((x) => (x == errorValue) ? throw error : x); |
| } |
| |
| /// Make a [Stream] from an [Iterable] by adding events to a stream controller |
| /// at periodic intervals. |
| Stream<T> mks<T>(Iterable<T> iterable) { |
| var iterator = iterable.iterator; |
| var controller = StreamController<T>(); |
| // Some varying time between 3 and 10 ms. |
| var ms = ((++ctr) * 5) % 7 + 3; |
| Timer.periodic(Duration(milliseconds: ms), (Timer timer) { |
| if (iterator.moveNext()) { |
| controller.add(iterator.current); |
| } else { |
| controller.close(); |
| timer.cancel(); |
| } |
| }); |
| return controller.stream; |
| } |
| |
| /// Counter used to give varying delays for streams. |
| int ctr = 0; |
| |
| void main() { |
| // Test that zipping [streams] gives the results iterated by [expectedData]. |
| void testZip(Iterable<Stream> streams, Iterable expectedData) { |
| var data = []; |
| Stream zip = StreamZip(streams); |
| zip.listen(data.add, onDone: expectAsync0(() { |
| expect(data, equals(expectedData)); |
| })); |
| } |
| |
| test('Basic', () { |
| testZip([ |
| mks([1, 2, 3]), |
| mks([4, 5, 6]), |
| mks([7, 8, 9]) |
| ], [ |
| [1, 4, 7], |
| [2, 5, 8], |
| [3, 6, 9] |
| ]); |
| }); |
| |
| test('Uneven length 1', () { |
| testZip([ |
| mks([1, 2, 3, 99, 100]), |
| mks([4, 5, 6]), |
| mks([7, 8, 9]) |
| ], [ |
| [1, 4, 7], |
| [2, 5, 8], |
| [3, 6, 9] |
| ]); |
| }); |
| |
| test('Uneven length 2', () { |
| testZip([ |
| mks([1, 2, 3]), |
| mks([4, 5, 6, 99, 100]), |
| mks([7, 8, 9]) |
| ], [ |
| [1, 4, 7], |
| [2, 5, 8], |
| [3, 6, 9] |
| ]); |
| }); |
| |
| test('Uneven length 3', () { |
| testZip([ |
| mks([1, 2, 3]), |
| mks([4, 5, 6]), |
| mks([7, 8, 9, 99, 100]) |
| ], [ |
| [1, 4, 7], |
| [2, 5, 8], |
| [3, 6, 9] |
| ]); |
| }); |
| |
| test('Uneven length 4', () { |
| testZip([ |
| mks([1, 2, 3, 98]), |
| mks([4, 5, 6]), |
| mks([7, 8, 9, 99, 100]) |
| ], [ |
| [1, 4, 7], |
| [2, 5, 8], |
| [3, 6, 9] |
| ]); |
| }); |
| |
| test('Empty 1', () { |
| testZip([ |
| mks([]), |
| mks([4, 5, 6]), |
| mks([7, 8, 9]) |
| ], []); |
| }); |
| |
| test('Empty 2', () { |
| testZip([ |
| mks([1, 2, 3]), |
| mks([]), |
| mks([7, 8, 9]) |
| ], []); |
| }); |
| |
| test('Empty 3', () { |
| testZip([ |
| mks([1, 2, 3]), |
| mks([4, 5, 6]), |
| mks([]) |
| ], []); |
| }); |
| |
| test('Empty source', () { |
| testZip([], []); |
| }); |
| |
| test('Single Source', () { |
| testZip([ |
| mks([1, 2, 3]) |
| ], [ |
| [1], |
| [2], |
| [3] |
| ]); |
| }); |
| |
| test('Other-streams', () { |
| var st1 = mks([1, 2, 3, 4, 5, 6]).where((x) => x < 4); |
| Stream st2 = |
| Stream.periodic(const Duration(milliseconds: 5), (x) => x + 4).take(3); |
| var c = StreamController.broadcast(); |
| var st3 = c.stream; |
| testZip([ |
| st1, |
| st2, |
| st3 |
| ], [ |
| [1, 4, 7], |
| [2, 5, 8], |
| [3, 6, 9] |
| ]); |
| c |
| ..add(7) |
| ..add(8) |
| ..add(9) |
| ..close(); |
| }); |
| |
| test('Error 1', () { |
| expect( |
| StreamZip([ |
| streamError(mks([1, 2, 3]), 2, 'BAD-1'), |
| mks([4, 5, 6]), |
| mks([7, 8, 9]) |
| ]).toList(), |
| throwsA(equals('BAD-1'))); |
| }); |
| |
| test('Error 2', () { |
| expect( |
| StreamZip([ |
| mks([1, 2, 3]), |
| streamError(mks([4, 5, 6]), 5, 'BAD-2'), |
| mks([7, 8, 9]) |
| ]).toList(), |
| throwsA(equals('BAD-2'))); |
| }); |
| |
| test('Error 3', () { |
| expect( |
| StreamZip([ |
| mks([1, 2, 3]), |
| mks([4, 5, 6]), |
| streamError(mks([7, 8, 9]), 8, 'BAD-3') |
| ]).toList(), |
| throwsA(equals('BAD-3'))); |
| }); |
| |
| test('Error at end', () { |
| expect( |
| StreamZip([ |
| mks([1, 2, 3]), |
| streamError(mks([4, 5, 6]), 6, 'BAD-4'), |
| mks([7, 8, 9]) |
| ]).toList(), |
| throwsA(equals('BAD-4'))); |
| }); |
| |
| test('Error before first end', () { |
| // StreamControllers' streams with no "close" called will never be done, |
| // so the fourth event of the first stream is guaranteed to come first. |
| expect( |
| StreamZip([ |
| streamError(mks([1, 2, 3, 4]), 4, 'BAD-5'), |
| (StreamController()..add(4)..add(5)..add(6)).stream, |
| (StreamController()..add(7)..add(8)..add(9)).stream |
| ]).toList(), |
| throwsA(equals('BAD-5'))); |
| }); |
| |
| test('Error after first end', () { |
| var controller = StreamController(); |
| controller..add(7)..add(8)..add(9); |
| // Transformer that puts error into controller when one of the first two |
| // streams have sent a done event. |
| var trans = |
| StreamTransformer<int, int>.fromHandlers(handleDone: (EventSink s) { |
| Timer.run(() { |
| controller.addError('BAD-6'); |
| }); |
| s.close(); |
| }); |
| testZip([ |
| mks([1, 2, 3]).transform(trans), |
| mks([4, 5, 6]).transform(trans), |
| controller.stream |
| ], [ |
| [1, 4, 7], |
| [2, 5, 8], |
| [3, 6, 9] |
| ]); |
| }); |
| |
| test('Pause/Resume', () { |
| var sc1p = 0; |
| var c1 = StreamController(onPause: () { |
| sc1p++; |
| }, onResume: () { |
| sc1p--; |
| }); |
| |
| var sc2p = 0; |
| var c2 = StreamController(onPause: () { |
| sc2p++; |
| }, onResume: () { |
| sc2p--; |
| }); |
| |
| var done = expectAsync0(() { |
| expect(sc1p, equals(1)); |
| expect(sc2p, equals(0)); |
| }); // Call to complete test. |
| |
| Stream zip = StreamZip([c1.stream, c2.stream]); |
| |
| const ms25 = Duration(milliseconds: 25); |
| |
| // StreamIterator uses pause and resume to control flow. |
| var it = StreamIterator(zip); |
| |
| it.moveNext().then((hasMore) { |
| expect(hasMore, isTrue); |
| expect(it.current, equals([1, 2])); |
| return it.moveNext(); |
| }).then((hasMore) { |
| expect(hasMore, isTrue); |
| expect(it.current, equals([3, 4])); |
| c2.add(6); |
| return it.moveNext(); |
| }).then((hasMore) { |
| expect(hasMore, isTrue); |
| expect(it.current, equals([5, 6])); |
| Future.delayed(ms25).then((_) { |
| c2.add(8); |
| }); |
| return it.moveNext(); |
| }).then((hasMore) { |
| expect(hasMore, isTrue); |
| expect(it.current, equals([7, 8])); |
| c2.add(9); |
| return it.moveNext(); |
| }).then((hasMore) { |
| expect(hasMore, isFalse); |
| done(); |
| }); |
| |
| c1 |
| ..add(1) |
| ..add(3) |
| ..add(5) |
| ..add(7) |
| ..close(); |
| c2..add(2)..add(4); |
| }); |
| |
| test('pause-resume2', () { |
| var s1 = Stream.fromIterable([0, 2, 4, 6, 8]); |
| var s2 = Stream.fromIterable([1, 3, 5, 7]); |
| var sz = StreamZip([s1, s2]); |
| var ctr = 0; |
| late StreamSubscription sub; |
| sub = sz.listen(expectAsync1((v) { |
| expect(v, equals([ctr * 2, ctr * 2 + 1])); |
| if (ctr == 1) { |
| sub.pause(Future.delayed(const Duration(milliseconds: 25))); |
| } else if (ctr == 2) { |
| sub.pause(); |
| Future.delayed(const Duration(milliseconds: 25)).then((_) { |
| sub.resume(); |
| }); |
| } |
| ctr++; |
| }, count: 4)); |
| }); |
| } |