| // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| // Test that transformations like `map` and `where` preserve broadcast flag. |
| library stream_join_test; |
| |
| import 'dart:async'; |
| |
| import 'package:expect/expect.dart'; |
| import 'package:async_helper/async_minitest.dart'; |
| |
| import 'event_helper.dart'; |
| |
| main() { |
| testStream("singlesub", () => new StreamController(), (c) => c.stream); |
| testStream( |
| "broadcast", () => new StreamController.broadcast(), (c) => c.stream); |
| testStream("asBroadcast", () => new StreamController(), |
| (c) => c.stream.asBroadcastStream()); |
| testStream("broadcast.asBroadcast", () => new StreamController.broadcast(), |
| (c) => c.stream.asBroadcastStream()); |
| } |
| |
| void testStream( |
| String name, StreamController create(), Stream getStream(controller)) { |
| test("$name-map", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.map((x) => x + 1); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(43, v); |
| })); |
| c.add(42); |
| c.close(); |
| }); |
| test("$name-where", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.where((x) => x.isEven); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(42, v); |
| })); |
| c.add(37); |
| c.add(42); |
| c.add(87); |
| c.close(); |
| }); |
| test("$name-handleError", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.handleError((x, s) {}); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(42, v); |
| })); |
| c.addError("BAD1"); |
| c.add(42); |
| c.addError("BAD2"); |
| c.close(); |
| }); |
| test("$name-expand", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.expand((x) => x.isEven ? [x] : []); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(42, v); |
| })); |
| c.add(37); |
| c.add(42); |
| c.add(87); |
| c.close(); |
| }); |
| test("$name-transform", () { |
| var c = create(); |
| var s = getStream(c); |
| // TODO: find name of default transformer |
| var t = |
| new StreamTransformer.fromHandlers(handleData: (value, EventSink sink) { |
| sink.add(value); |
| }); |
| Stream newStream = s.transform(t); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(42, v); |
| })); |
| c.add(42); |
| c.close(); |
| }); |
| test("$name-take", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.take(1); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(42, v); |
| })); |
| c.add(42); |
| c.add(37); |
| c.close(); |
| }); |
| test("$name-takeWhile", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.takeWhile((x) => x.isEven); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(42, v); |
| })); |
| c.add(42); |
| c.add(37); |
| c.close(); |
| }); |
| test("$name-skip", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.skip(1); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(42, v); |
| })); |
| c.add(37); |
| c.add(42); |
| c.close(); |
| }); |
| test("$name-skipWhile", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.skipWhile((x) => x.isOdd); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(42, v); |
| })); |
| c.add(37); |
| c.add(42); |
| c.close(); |
| }); |
| test("$name-distinct", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.distinct(); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(42, v); |
| })); |
| c.add(42); |
| c.add(42); |
| c.close(); |
| }); |
| test("$name-timeout", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.timeout(const Duration(seconds: 1)); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(42, v); |
| })); |
| c.add(42); |
| c.close(); |
| }); |
| test("$name-asyncMap", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.asyncMap((x) => new Future.value(x + 1)); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(43, v); |
| })); |
| c.add(42); |
| c.close(); |
| }); |
| test("$name-asyncExpand", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.asyncExpand((x) => new Stream.fromIterable([x + 1])); |
| Expect.equals(s.isBroadcast, newStream.isBroadcast); |
| newStream.single.then(expectAsync((v) { |
| Expect.equals(43, v); |
| })); |
| c.add(42); |
| c.close(); |
| }); |
| |
| // The following tests are only on broadcast streams, they require listening |
| // more than once. |
| if (name.startsWith("singlesub")) return; |
| |
| test("$name-skip-multilisten", () { |
| if (name.startsWith("singlesub") || name.startsWith("asBroadcast")) return; |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.skip(5); |
| // Listen immediately, to ensure that an asBroadcast stream is started. |
| var sub = newStream.listen((_) {}); |
| int i = 0; |
| var expect1 = 11; |
| var expect2 = 21; |
| var handler2 = expectAsync((v) { |
| expect(v, expect2); |
| expect2++; |
| }, count: 5); |
| var handler1 = expectAsync((v) { |
| expect(v, expect1); |
| expect1++; |
| }, count: 15); |
| var loop; |
| loop = expectAsync(() { |
| i++; |
| c.add(i); |
| if (i == 5) { |
| scheduleMicrotask(() { |
| newStream.listen(handler1); |
| }); |
| } |
| if (i == 15) { |
| scheduleMicrotask(() { |
| newStream.listen(handler2); |
| }); |
| } |
| if (i < 25) { |
| scheduleMicrotask(loop); |
| } else { |
| sub.cancel(); |
| c.close(); |
| } |
| }, count: 25); |
| scheduleMicrotask(loop); |
| }); |
| |
| test("$name-take-multilisten", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.take(10); |
| // Listen immediately, to ensure that an asBroadcast stream is started. |
| var sub = newStream.listen((_) {}); |
| int i = 0; |
| var expect1 = 6; |
| var expect2 = 11; |
| var handler2 = expectAsync((v) { |
| expect(v, expect2); |
| expect(v <= 20, isTrue); |
| expect2++; |
| }, count: 10); |
| var handler1 = expectAsync((v) { |
| expect(v, expect1); |
| expect(v <= 15, isTrue); |
| expect1++; |
| }, count: 10); |
| var loop; |
| loop = expectAsync(() { |
| i++; |
| c.add(i); |
| if (i == 5) { |
| scheduleMicrotask(() { |
| newStream.listen(handler1); |
| }); |
| } |
| if (i == 10) { |
| scheduleMicrotask(() { |
| newStream.listen(handler2); |
| }); |
| } |
| if (i < 25) { |
| scheduleMicrotask(loop); |
| } else { |
| sub.cancel(); |
| c.close(); |
| } |
| }, count: 25); |
| scheduleMicrotask(loop); |
| }); |
| |
| test("$name-skipWhile-multilisten", () { |
| if (name.startsWith("singlesub") || name.startsWith("asBroadcast")) return; |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.skipWhile((x) => (x % 10) != 1); |
| // Listen immediately, to ensure that an asBroadcast stream is started. |
| var sub = newStream.listen((_) {}); |
| int i = 0; |
| var expect1 = 11; |
| var expect2 = 21; |
| var handler2 = expectAsync((v) { |
| expect(v, expect2); |
| expect2++; |
| }, count: 5); |
| var handler1 = expectAsync((v) { |
| expect(v, expect1); |
| expect1++; |
| }, count: 15); |
| var loop; |
| loop = expectAsync(() { |
| i++; |
| c.add(i); |
| if (i == 5) { |
| scheduleMicrotask(() { |
| newStream.listen(handler1); |
| }); |
| } |
| if (i == 15) { |
| scheduleMicrotask(() { |
| newStream.listen(handler2); |
| }); |
| } |
| if (i < 25) { |
| scheduleMicrotask(loop); |
| } else { |
| sub.cancel(); |
| c.close(); |
| } |
| }, count: 25); |
| scheduleMicrotask(loop); |
| }); |
| |
| test("$name-takeWhile-multilisten", () { |
| var c = create(); |
| var s = getStream(c); |
| Stream newStream = s.takeWhile((x) => (x % 10) != 5); |
| // Listen immediately, to ensure that an asBroadcast stream is started. |
| var sub = newStream.listen((_) {}); |
| int i = 0; |
| // Non-overlapping ranges means the test must not remember its first |
| // failure. |
| var expect1 = 6; |
| var expect2 = 16; |
| var handler2 = expectAsync((v) { |
| expect(v, expect2); |
| expect(v <= 25, isTrue); |
| expect2++; |
| }, count: 9); |
| var handler1 = expectAsync((v) { |
| expect(v, expect1); |
| expect(v <= 15, isTrue); |
| expect1++; |
| }, count: 9); |
| var loop; |
| loop = expectAsync(() { |
| i++; |
| c.add(i); |
| if (i == 5) { |
| scheduleMicrotask(() { |
| newStream.listen(handler1); |
| }); |
| } |
| if (i == 15) { |
| scheduleMicrotask(() { |
| newStream.listen(handler2); |
| }); |
| } |
| if (i < 25) { |
| scheduleMicrotask(loop); |
| } else { |
| sub.cancel(); |
| c.close(); |
| } |
| }, count: 25); |
| scheduleMicrotask(loop); |
| }); |
| } |