| // Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| // Test the basic StreamController and StreamController.broadcast. |
| library stream_controller_async_test; |
| |
| import 'dart:async'; |
| import "package:expect/expect.dart"; |
| import 'package:unittest/unittest.dart'; |
| import 'event_helper.dart'; |
| import 'stream_state_helper.dart'; |
| |
| void cancelSub(StreamSubscription sub) { sub.cancel(); } |
| |
| testController() { |
| // Test fold |
| test("StreamController.fold", () { |
| StreamController c = new StreamController(); |
| Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub); |
| stream.fold(0, (a,b) => a + b) |
| .then(expectAsync((int v) { |
| Expect.equals(42, v); |
| })); |
| c.add(10); |
| c.add(32); |
| c.close(); |
| }); |
| |
| test("StreamController.fold throws", () { |
| StreamController c = new StreamController(); |
| Stream stream = c.stream.asBroadcastStream(onCancel: cancelSub); |
| stream.fold(0, (a,b) { throw "Fnyf!"; }) |
| .catchError(expectAsync((error) { Expect.equals("Fnyf!", error); })); |
| c.add(42); |
| }); |
| } |
| |
| testSingleController() { |
| test("Single-subscription StreamController.fold", () { |
| StreamController c = new StreamController(); |
| Stream stream = c.stream; |
| stream.fold(0, (a,b) => a + b) |
| .then(expectAsync((int v) { Expect.equals(42, v); })); |
| c.add(10); |
| c.add(32); |
| c.close(); |
| }); |
| |
| test("Single-subscription StreamController.fold throws", () { |
| StreamController c = new StreamController(); |
| Stream stream = c.stream; |
| stream.fold(0, (a,b) { throw "Fnyf!"; }) |
| .catchError(expectAsync((e) { Expect.equals("Fnyf!", e); })); |
| c.add(42); |
| }); |
| |
| test("Single-subscription StreamController events are buffered when" |
| " there is no subscriber", |
| () { |
| StreamController c = new StreamController(); |
| EventSink sink = c.sink; |
| Stream stream = c.stream; |
| int counter = 0; |
| sink.add(1); |
| sink.add(2); |
| sink.close(); |
| stream.listen( |
| (data) { |
| counter += data; |
| }, |
| onDone: expectAsync(() { |
| Expect.equals(3, counter); |
| })); |
| }); |
| } |
| |
| testExtraMethods() { |
| Events sentEvents = new Events()..add(7)..add(9)..add(13)..add(87)..close(); |
| |
| test("forEach", () { |
| StreamController c = new StreamController(); |
| Events actualEvents = new Events(); |
| Future f = c.stream.forEach(actualEvents.add); |
| f.then(expectAsync((_) { |
| actualEvents.close(); |
| Expect.listEquals(sentEvents.events, actualEvents.events); |
| })); |
| sentEvents.replay(c); |
| }); |
| |
| test("forEachError", () { |
| Events sentEvents = new Events()..add(7)..error("bad")..add(87)..close(); |
| StreamController c = new StreamController(); |
| Events actualEvents = new Events(); |
| Future f = c.stream.forEach(actualEvents.add); |
| f.catchError(expectAsync((error) { |
| Expect.equals("bad", error); |
| Expect.listEquals((new Events()..add(7)).events, actualEvents.events); |
| })); |
| sentEvents.replay(c); |
| }); |
| |
| test("forEachError2", () { |
| Events sentEvents = new Events()..add(7)..add(9)..add(87)..close(); |
| StreamController c = new StreamController(); |
| Events actualEvents = new Events(); |
| Future f = c.stream.forEach((x) { |
| if (x == 9) throw "bad"; |
| actualEvents.add(x); |
| }); |
| f.catchError(expectAsync((error) { |
| Expect.equals("bad", error); |
| Expect.listEquals((new Events()..add(7)).events, actualEvents.events); |
| })); |
| sentEvents.replay(c); |
| }); |
| |
| test("firstWhere", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.firstWhere((x) => (x % 3) == 0); |
| f.then(expectAsync((v) { Expect.equals(9, v); })); |
| sentEvents.replay(c); |
| }); |
| |
| test("firstWhere 2", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.firstWhere((x) => (x % 4) == 0); |
| f.catchError(expectAsync((e) {})); |
| sentEvents.replay(c); |
| }); |
| |
| test("firstWhere 3", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.firstWhere((x) => (x % 4) == 0, defaultValue: () => 999); |
| f.then(expectAsync((v) { Expect.equals(999, v); })); |
| sentEvents.replay(c); |
| }); |
| |
| |
| test("lastWhere", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.lastWhere((x) => (x % 3) == 0); |
| f.then(expectAsync((v) { Expect.equals(87, v); })); |
| sentEvents.replay(c); |
| }); |
| |
| test("lastWhere 2", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.lastWhere((x) => (x % 4) == 0); |
| f.catchError(expectAsync((e) {})); |
| sentEvents.replay(c); |
| }); |
| |
| test("lastWhere 3", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.lastWhere((x) => (x % 4) == 0, defaultValue: () => 999); |
| f.then(expectAsync((v) { Expect.equals(999, v); })); |
| sentEvents.replay(c); |
| }); |
| |
| test("singleWhere", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.singleWhere((x) => (x % 9) == 0); |
| f.then(expectAsync((v) { Expect.equals(9, v); })); |
| sentEvents.replay(c); |
| }); |
| |
| test("singleWhere 2", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.singleWhere((x) => (x % 3) == 0); // Matches 9 and 87.. |
| f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); })); |
| sentEvents.replay(c); |
| }); |
| |
| test("first", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.first; |
| f.then(expectAsync((v) { Expect.equals(7, v);})); |
| sentEvents.replay(c); |
| }); |
| |
| test("first empty", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.first; |
| f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); })); |
| Events emptyEvents = new Events()..close(); |
| emptyEvents.replay(c); |
| }); |
| |
| test("first error", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.first; |
| f.catchError(expectAsync((error) { Expect.equals("error", error); })); |
| Events errorEvents = new Events()..error("error")..close(); |
| errorEvents.replay(c); |
| }); |
| |
| test("first error 2", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.first; |
| f.catchError(expectAsync((error) { Expect.equals("error", error); })); |
| Events errorEvents = new Events()..error("error")..error("error2")..close(); |
| errorEvents.replay(c); |
| }); |
| |
| test("last", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.last; |
| f.then(expectAsync((v) { Expect.equals(87, v);})); |
| sentEvents.replay(c); |
| }); |
| |
| test("last empty", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.last; |
| f.catchError(expectAsync((error) { Expect.isTrue(error is StateError); })); |
| Events emptyEvents = new Events()..close(); |
| emptyEvents.replay(c); |
| }); |
| |
| test("last error", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.last; |
| f.catchError(expectAsync((error) { Expect.equals("error", error); })); |
| Events errorEvents = new Events()..error("error")..close(); |
| errorEvents.replay(c); |
| }); |
| |
| test("last error 2", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.last; |
| f.catchError(expectAsync((error) { Expect.equals("error", error); })); |
| Events errorEvents = new Events()..error("error")..error("error2")..close(); |
| errorEvents.replay(c); |
| }); |
| |
| test("elementAt", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.elementAt(2); |
| f.then(expectAsync((v) { Expect.equals(13, v);})); |
| sentEvents.replay(c); |
| }); |
| |
| test("elementAt 2", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.elementAt(20); |
| f.catchError(expectAsync((error) { Expect.isTrue(error is RangeError); })); |
| sentEvents.replay(c); |
| }); |
| |
| test("drain", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.drain(); |
| f.then(expectAsync((v) { Expect.equals(null, v);})); |
| sentEvents.replay(c); |
| }); |
| |
| test("drain error", () { |
| StreamController c = new StreamController(); |
| Future f = c.stream.drain(); |
| f.catchError(expectAsync((error) { Expect.equals("error", error); })); |
| Events errorEvents = new Events()..error("error")..error("error2")..close(); |
| errorEvents.replay(c); |
| }); |
| |
| } |
| |
| testPause() { |
| test("pause event-unpause", () { |
| |
| StreamProtocolTest test = new StreamProtocolTest(); |
| Completer completer = new Completer(); |
| test..expectListen() |
| ..expectData(42, () { test.pause(completer.future); }) |
| ..expectPause(() { |
| completer.complete(null); |
| }) |
| ..expectData(43) |
| ..expectData(44) |
| ..expectCancel() |
| ..expectDone(test.terminate); |
| test.listen(); |
| test.add(42); |
| test.add(43); |
| test.add(44); |
| test.close(); |
| }); |
| |
| test("pause twice event-unpause", () { |
| StreamProtocolTest test = new StreamProtocolTest(); |
| Completer completer = new Completer(); |
| Completer completer2 = new Completer(); |
| test..expectListen() |
| ..expectData(42, () { |
| test.pause(completer.future); |
| test.pause(completer2.future); |
| }) |
| ..expectPause(() { |
| completer.future.then(completer2.complete); |
| completer.complete(null); |
| }) |
| ..expectData(43) |
| ..expectData(44) |
| ..expectCancel() |
| ..expectDone(test.terminate); |
| test..listen() |
| ..add(42) |
| ..add(43) |
| ..add(44) |
| ..close(); |
| }); |
| |
| test("pause twice direct-unpause", () { |
| StreamProtocolTest test = new StreamProtocolTest(); |
| test..expectListen() |
| ..expectData(42, () { |
| test.pause(); |
| test.pause(); |
| }) |
| ..expectPause(() { |
| test.resume(); |
| test.resume(); |
| }) |
| ..expectData(43) |
| ..expectData(44) |
| ..expectCancel() |
| ..expectDone(test.terminate); |
| test..listen() |
| ..add(42) |
| ..add(43) |
| ..add(44) |
| ..close(); |
| }); |
| |
| test("pause twice direct-event-unpause", () { |
| StreamProtocolTest test = new StreamProtocolTest(); |
| Completer completer = new Completer(); |
| test..expectListen() |
| ..expectData(42, () { |
| test.pause(); |
| test.pause(completer.future); |
| test.add(43); |
| test.add(44); |
| test.close(); |
| }) |
| ..expectPause(() { |
| completer.future.then((v) => test.resume()); |
| completer.complete(null); |
| }) |
| ..expectData(43) |
| ..expectData(44) |
| ..expectCancel() |
| ..expectDone(test.terminate); |
| test..listen() |
| ..add(42); |
| }); |
| } |
| |
| class TestError { const TestError(); } |
| |
| testRethrow() { |
| TestError error = const TestError(); |
| |
| testStream(name, streamValueTransform) { |
| test("rethrow-$name-value", () { |
| StreamController c = new StreamController(); |
| Stream s = streamValueTransform(c.stream, (v) { throw error; }); |
| s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync( |
| (e) { Expect.identical(error, e); })); |
| c.add(null); |
| c.close(); |
| }); |
| } |
| |
| testStreamError(name, streamErrorTransform) { |
| test("rethrow-$name-error", () { |
| StreamController c = new StreamController(); |
| Stream s = streamErrorTransform(c.stream, (e) { throw error; }); |
| s.listen((_) { Expect.fail("unexpected value"); }, onError: expectAsync( |
| (e) { Expect.identical(error, e); })); |
| c.addError("SOME ERROR"); |
| c.close(); |
| }); |
| } |
| |
| testFuture(name, streamValueTransform) { |
| test("rethrow-$name-value", () { |
| StreamController c = new StreamController(); |
| Future f = streamValueTransform(c.stream, (v) { throw error; }); |
| f.then((v) { Expect.fail("unreachable"); }, |
| onError: expectAsync((e) { Expect.identical(error, e); })); |
| // Need two values to trigger compare for reduce. |
| c.add(0); |
| c.add(1); |
| c.close(); |
| }); |
| } |
| |
| testStream("where", (s, act) => s.where(act)); |
| testStream("map", (s, act) => s.map(act)); |
| testStream("expand", (s, act) => s.expand(act)); |
| testStream("where", (s, act) => s.where(act)); |
| testStreamError("handleError", (s, act) => s.handleError(act)); |
| testStreamError("handleTest", (s, act) => s.handleError((v) {}, test: act)); |
| testFuture("forEach", (s, act) => s.forEach(act)); |
| testFuture("every", (s, act) => s.every(act)); |
| testFuture("any", (s, act) => s.any(act)); |
| testFuture("reduce", (s, act) => s.reduce((a,b) => act(b))); |
| testFuture("fold", (s, act) => s.fold(0, (a,b) => act(b))); |
| testFuture("drain", (s, act) => s.drain().then(act)); |
| } |
| |
| void testBroadcastController() { |
| test("broadcast-controller-basic", () { |
| StreamProtocolTest test = new StreamProtocolTest.broadcast(); |
| test..expectListen() |
| ..expectData(42) |
| ..expectCancel() |
| ..expectDone(test.terminate); |
| test..listen() |
| ..add(42) |
| ..close(); |
| }); |
| |
| test("broadcast-controller-listen-twice", () { |
| StreamProtocolTest test = new StreamProtocolTest.broadcast(); |
| test..expectListen() |
| ..expectData(42, () { |
| test.listen(); |
| test.add(37); |
| test.close(); |
| }) |
| // Order is not guaranteed between subscriptions if not sync. |
| ..expectData(37) |
| ..expectData(37) |
| ..expectDone() |
| ..expectCancel() |
| ..expectDone(test.terminate); |
| test.listen(); |
| test.add(42); |
| }); |
| |
| test("broadcast-controller-listen-twice-non-overlap", () { |
| StreamProtocolTest test = new StreamProtocolTest.broadcast(); |
| test |
| ..expectListen(() { |
| test.add(42); |
| }) |
| ..expectData(42, () { |
| test.cancel(); |
| }) |
| ..expectCancel(() { |
| test.listen(); |
| })..expectListen(() { |
| test.add(37); |
| }) |
| ..expectData(37, () { |
| test.close(); |
| }) |
| ..expectCancel() |
| ..expectDone(test.terminate); |
| test.listen(); |
| }); |
| |
| test("broadcast-controller-individual-pause", () { |
| StreamProtocolTest test = new StreamProtocolTest.broadcast(); |
| var sub1; |
| test..expectListen() |
| ..expectData(42) |
| ..expectData(42, () { sub1.pause(); }) |
| ..expectData(43, () { |
| sub1.cancel(); |
| test.listen(); |
| test.add(44); |
| test.expectData(44); |
| test.expectData(44, test.terminate); |
| }); |
| sub1 = test.listen(); |
| test.listen(); |
| test.add(42); |
| test.add(43); |
| }); |
| |
| test("broadcast-controller-add-in-callback", () { |
| StreamProtocolTest test = new StreamProtocolTest.broadcast(); |
| test.expectListen(); |
| var sub = test.listen(); |
| test.add(42); |
| sub.expectData(42, () { |
| test.add(87); |
| sub.cancel(); |
| }); |
| test.expectCancel(() { |
| test.add(37); |
| test.terminate(); |
| }); |
| }); |
| } |
| |
| void testAsBroadcast() { |
| test("asBroadcast-not-canceled", () { |
| StreamProtocolTest test = new StreamProtocolTest.asBroadcast(); |
| var sub; |
| test..expectListen() |
| ..expectBroadcastListen((_) { |
| test.add(42); |
| }) |
| ..expectData(42, () { |
| sub.cancel(); |
| }) |
| ..expectBroadcastCancel((_) { |
| sub = test.listen(); |
| }) |
| ..expectBroadcastListen((_) { |
| test.terminate(); |
| }); |
| sub = test.listen(); |
| }); |
| |
| test("asBroadcast-canceled", () { |
| StreamProtocolTest test = new StreamProtocolTest.asBroadcast(); |
| var sub; |
| test..expectListen() |
| ..expectBroadcastListen((_) { |
| test.add(42); |
| }) |
| ..expectData(42, () { |
| sub.cancel(); |
| }) |
| ..expectBroadcastCancel((originalSub) { |
| originalSub.cancel(); |
| }) |
| ..expectCancel(test.terminate); |
| sub = test.listen(); |
| }); |
| |
| test("asBroadcast-pause-original", () { |
| StreamProtocolTest test = new StreamProtocolTest.asBroadcast(); |
| var sub; |
| test..expectListen() |
| ..expectBroadcastListen((_) { |
| test.add(42); |
| test.add(43); |
| }) |
| ..expectData(42, () { |
| sub.cancel(); |
| }) |
| ..expectBroadcastCancel((originalSub) { |
| originalSub.pause(); // Pause before sending 43 from original sub. |
| }) |
| ..expectPause(() { |
| sub = test.listen(); |
| }) |
| ..expectBroadcastListen((originalSub) { |
| originalSub.resume(); |
| }) |
| ..expectData(43) |
| ..expectResume(() { |
| test.close(); |
| }) |
| ..expectCancel() |
| ..expectDone() |
| ..expectBroadcastCancel((_) => test.terminate()); |
| sub = test.listen(); |
| }); |
| } |
| |
| void testSink({bool sync, bool broadcast, bool asBroadcast}) { |
| String type = "${sync?"S":"A"}${broadcast?"B":"S"}${asBroadcast?"aB":""}"; |
| test("$type-controller-sink", () { |
| var done = expectAsync((){}); |
| var c = broadcast ? new StreamController.broadcast(sync: sync) |
| : new StreamController(sync: sync); |
| var expected = new Events() |
| ..add(42)..error("error") |
| ..add(1)..add(2)..add(3)..add(4)..add(5) |
| ..add(43)..close(); |
| var actual = new Events.capture(asBroadcast ? c.stream.asBroadcastStream() |
| : c.stream); |
| var sink = c.sink; |
| sink.add(42); |
| sink.addError("error"); |
| sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5])) |
| .then((_) { |
| sink.add(43); |
| return sink.close(); |
| }) |
| .then((_) { |
| Expect.listEquals(expected.events, actual.events); |
| done(); |
| }); |
| }); |
| |
| test("$type-controller-sink-canceled", () { |
| var done = expectAsync((){}); |
| var c = broadcast ? new StreamController.broadcast(sync: sync) |
| : new StreamController(sync: sync); |
| var expected = new Events() |
| ..add(42)..error("error") |
| ..add(1)..add(2)..add(3); |
| var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; |
| var actual = new Events(); |
| var sub; |
| // Cancel subscription after receiving "3" event. |
| sub = stream.listen((v) { |
| if (v == 3) sub.cancel(); |
| actual.add(v); |
| }, onError: actual.error); |
| var sink = c.sink; |
| sink.add(42); |
| sink.addError("error"); |
| sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5])) |
| .then((_) { |
| Expect.listEquals(expected.events, actual.events); |
| // Close controller as well. It has no listener. If it is a broadcast |
| // stream, it will still be open, and we read the "done" future before |
| // closing. A normal stream is already done when its listener cancels. |
| Future doneFuture = sink.done; |
| sink.close(); |
| return doneFuture; |
| }) |
| .then((_) { |
| // No change in events. |
| Expect.listEquals(expected.events, actual.events); |
| done(); |
| }); |
| }); |
| |
| test("$type-controller-sink-paused", () { |
| var done = expectAsync((){}); |
| var c = broadcast ? new StreamController.broadcast(sync: sync) |
| : new StreamController(sync: sync); |
| var expected = new Events() |
| ..add(42)..error("error") |
| ..add(1)..add(2)..add(3) |
| ..add(4)..add(5)..add(43)..close(); |
| var stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; |
| var actual = new Events(); |
| var sub; |
| var pauseIsDone = false; |
| sub = stream.listen( |
| (v) { |
| if (v == 3) { |
| sub.pause(new Future.delayed(const Duration(milliseconds: 15), |
| () { pauseIsDone = true; })); |
| } |
| actual.add(v); |
| }, |
| onError: actual.error, |
| onDone: actual.close); |
| var sink = c.sink; |
| sink.add(42); |
| sink.addError("error"); |
| sink.addStream(new Stream.fromIterable([1, 2, 3, 4, 5])) |
| .then((_) { |
| sink.add(43); |
| return sink.close(); |
| }) |
| .then((_) { |
| if (asBroadcast || broadcast) { |
| // The done-future of the sink completes when it passes |
| // the done event to the asBroadcastStream controller, which is |
| // before the final listener gets the event. |
| // Wait for the done event to be *delivered* before testing the |
| // events. |
| actual.onDone(() { |
| Expect.listEquals(expected.events, actual.events); |
| done(); |
| }); |
| } else { |
| Expect.listEquals(expected.events, actual.events); |
| done(); |
| } |
| }); |
| }); |
| |
| test("$type-controller-addstream-error-stop", () { |
| // Check that addStream defaults to ending after the first error. |
| var done = expectAsync((){}); |
| StreamController c = broadcast |
| ? new StreamController.broadcast(sync: sync) |
| : new StreamController(sync: sync); |
| Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; |
| var actual = new Events.capture(stream); |
| |
| var source = new Events(); |
| source..add(1)..add(2)..error("BAD")..add(3)..error("FAIL")..close(); |
| |
| var expected = new Events()..add(1)..add(2)..error("BAD")..close(); |
| StreamController sourceController = new StreamController(); |
| c.addStream(sourceController.stream).then((_) { |
| c.close().then((_) { |
| Expect.listEquals(expected.events, actual.events); |
| done(); |
| }); |
| }); |
| |
| source.replay(sourceController); |
| }); |
| |
| test("$type-controller-addstream-error-forward", () { |
| // Check that addStream with cancelOnError:false passes all data and errors |
| // to the controller. |
| var done = expectAsync((){}); |
| StreamController c = broadcast |
| ? new StreamController.broadcast(sync: sync) |
| : new StreamController(sync: sync); |
| Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; |
| var actual = new Events.capture(stream); |
| |
| var source = new Events(); |
| source..add(1)..add(2)..addError("BAD")..add(3)..addError("FAIL")..close(); |
| |
| StreamController sourceController = new StreamController(); |
| c.addStream(sourceController.stream, cancelOnError: false).then((_) { |
| c.close().then((_) { |
| Expect.listEquals(source.events, actual.events); |
| done(); |
| }); |
| }); |
| |
| source.replay(sourceController); |
| }); |
| |
| test("$type-controller-addstream-twice", () { |
| // Using addStream twice on the same stream |
| var done = expectAsync((){}); |
| StreamController c = broadcast |
| ? new StreamController.broadcast(sync: sync) |
| : new StreamController(sync: sync); |
| Stream stream = asBroadcast ? c.stream.asBroadcastStream() : c.stream; |
| var actual = new Events.capture(stream); |
| |
| // Streams of five events, throws on 3. |
| Stream s1 = new Stream.fromIterable([1,2,3,4,5]) |
| .map((x) => (x == 3 ? throw x : x)); |
| Stream s2 = new Stream.fromIterable([1,2,3,4,5]) |
| .map((x) => (x == 3 ? throw x : x)); |
| |
| Events expected = new Events(); |
| expected..add(1)..add(2)..error(3); |
| expected..add(1)..add(2)..error(3)..add(4)..add(5); |
| expected..close(); |
| |
| c.addStream(s1).then((_) { |
| c.addStream(s2, cancelOnError: false).then((_) { |
| c.close().then((_) { |
| Expect.listEquals(expected.events, actual.events); |
| done(); |
| }); |
| }); |
| }); |
| }); |
| } |
| |
| main() { |
| testController(); |
| testSingleController(); |
| testExtraMethods(); |
| testPause(); |
| testRethrow(); |
| testBroadcastController(); |
| testAsBroadcast(); |
| testSink(sync: true, broadcast: false, asBroadcast: false); |
| testSink(sync: true, broadcast: false, asBroadcast: true); |
| testSink(sync: true, broadcast: true, asBroadcast: false); |
| testSink(sync: false, broadcast: false, asBroadcast: false); |
| testSink(sync: false, broadcast: false, asBroadcast: true); |
| testSink(sync: false, broadcast: true, asBroadcast: false); |
| } |