blob: 9460dec594a490cc1208acc3dc3f54804ba6676f [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import "dart:async";
import "package:async/async.dart" show SubscriptionStream;
import "package:test/test.dart";
import "utils.dart";
main() {
test("subscription stream of an entire subscription", () async {
var stream = createStream();
var subscription = stream.listen(null);
var subscriptionStream = SubscriptionStream<int>(subscription);
await flushMicrotasks();
expect(subscriptionStream.toList(), completion([1, 2, 3, 4]));
});
test("subscription stream after two events", () async {
var stream = createStream();
var skips = 0;
var completer = Completer();
StreamSubscription<int> subscription;
subscription = stream.listen((value) {
++skips;
expect(value, skips);
if (skips == 2) {
completer.complete(SubscriptionStream<int>(subscription));
}
});
var subscriptionStream = await completer.future;
await flushMicrotasks();
expect(subscriptionStream.toList(), completion([3, 4]));
});
test("listening twice fails", () async {
var stream = createStream();
var sourceSubscription = stream.listen(null);
var subscriptionStream = SubscriptionStream<int>(sourceSubscription);
var subscription = subscriptionStream.listen(null);
expect(() => subscriptionStream.listen(null), throwsA(anything));
await subscription.cancel();
});
test("pause and cancel passed through to original stream", () async {
var controller = StreamController(onCancel: () async => 42);
var sourceSubscription = controller.stream.listen(null);
var subscriptionStream = SubscriptionStream(sourceSubscription);
expect(controller.isPaused, isTrue);
dynamic lastEvent;
var subscription = subscriptionStream.listen((value) {
lastEvent = value;
});
controller.add(1);
await flushMicrotasks();
expect(lastEvent, 1);
expect(controller.isPaused, isFalse);
subscription.pause();
expect(controller.isPaused, isTrue);
subscription.resume();
expect(controller.isPaused, isFalse);
expect(await subscription.cancel(), 42);
expect(controller.hasListener, isFalse);
});
group("cancelOnError source:", () {
for (var sourceCancels in [false, true]) {
group("${sourceCancels ? "yes" : "no"}:", () {
SubscriptionStream subscriptionStream;
Future onCancel; // Completes if source stream is canceled before done.
setUp(() {
var cancelCompleter = Completer();
var source = createErrorStream(cancelCompleter);
onCancel = cancelCompleter.future;
var sourceSubscription =
source.listen(null, cancelOnError: sourceCancels);
subscriptionStream = SubscriptionStream<int>(sourceSubscription);
});
test("- subscriptionStream: no", () async {
var done = Completer();
var events = [];
subscriptionStream.listen(events.add,
onError: events.add, onDone: done.complete, cancelOnError: false);
var expected = [1, 2, "To err is divine!"];
if (sourceCancels) {
await onCancel;
// And [done] won't complete at all.
bool isDone = false;
done.future.then((_) {
isDone = true;
});
await Future.delayed(const Duration(milliseconds: 5));
expect(isDone, false);
} else {
expected.add(4);
await done.future;
}
expect(events, expected);
});
test("- subscriptionStream: yes", () async {
var completer = Completer();
var events = [];
subscriptionStream.listen(events.add,
onError: (value) {
events.add(value);
completer.complete();
},
onDone: () => throw "should not happen",
cancelOnError: true);
await completer.future;
await flushMicrotasks();
expect(events, [1, 2, "To err is divine!"]);
});
});
}
for (var cancelOnError in [false, true]) {
group(cancelOnError ? "yes" : "no", () {
test("- no error, value goes to asFuture", () async {
var stream = createStream();
var sourceSubscription =
stream.listen(null, cancelOnError: cancelOnError);
var subscriptionStream = SubscriptionStream(sourceSubscription);
var subscription =
subscriptionStream.listen(null, cancelOnError: cancelOnError);
expect(subscription.asFuture(42), completion(42));
});
test("- error goes to asFuture", () async {
var stream = createErrorStream();
var sourceSubscription =
stream.listen(null, cancelOnError: cancelOnError);
var subscriptionStream = SubscriptionStream(sourceSubscription);
var subscription =
subscriptionStream.listen(null, cancelOnError: cancelOnError);
expect(subscription.asFuture(), throwsA(anything));
});
});
}
});
}
Stream<int> createStream() async* {
yield 1;
await flushMicrotasks();
yield 2;
await flushMicrotasks();
yield 3;
await flushMicrotasks();
yield 4;
}
Stream<int> createErrorStream([Completer onCancel]) async* {
bool canceled = true;
try {
yield 1;
await flushMicrotasks();
yield 2;
await flushMicrotasks();
yield* Future<int>.error("To err is divine!").asStream();
await flushMicrotasks();
yield 4;
await flushMicrotasks();
canceled = false;
} finally {
// Completes before the "done", but should be after all events.
if (canceled && onCancel != null) {
await flushMicrotasks();
onCancel.complete();
}
}
}
Stream<int> createLongStream() async* {
for (int i = 0; i < 200; i++) yield i;
}