blob: 980f8c7c4d8ace14311535336169e700b71df995 [file] [log] [blame]
// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart = 2.9
// Test that `await for` and `async*` interact correctly.
// An `await for` must pause its subscription immediately
// if the `await for` body does anything asynchronous
// (any `await`, `await for`, or pausing at a `yield`/`yield*`)
// A pause happening synchronously in an event delivery
// must pause the `sync*` method at the `yield` sending the event.
// A break happening synchronously in an event delivery,
// or while paused at a `yield`, must exit at that `yield`.
import "dart:async";
import "package:expect/expect.dart";
import "package:async_helper/async_helper.dart";
Stream<int> stream(List<String> log) async* {
log.add("^");
try {
log.add("?1");
yield 1;
log.add("?2");
yield 2;
log.add("?3");
yield 3;
} finally {
log.add(r"$");
}
}
Stream<int> consume(List<String> log,
{int breakAt = -1,
int yieldAt = -1,
int yieldStarAt = -1,
int pauseAt = -1}) async* {
// Create stream.
var s = stream(log);
log.add("(");
// The "consume loop".
await for (var event in s) {
// Should be acting synchronously wrt. the delivery of the event.
// The source stream should be at the yield now.
log.add("!$event");
if (event == pauseAt) {
log.add("p$event[");
// Async operation causes subscription to pause.
// Nothing should happen in the source stream
// until the end of the loop body where the subscription is resumed.
await Future.delayed(Duration(microseconds: 1));
log.add("]");
}
if (event == yieldAt) {
log.add("y$event[");
// Yield may cause subscription to pause or cancel.
// This loop should stay at the yield until the event has been delieverd.
// If the receiver pauses or cancels, we delay or break the loop here.
yield event;
log.add("]");
}
if (event == yieldStarAt) {
log.add("Y$event[");
// Yield* will always cause the subscription for this loop to pause.
// If the listener pauses, this stream is paused. If the listener cancels,
// this stream is cancelled, and the yield* acts like return, cancelling
// the loop subscription and waiting for the cancel future.
yield* Stream<int>.fromIterable([event]);
log.add("]");
}
if (event == breakAt) {
log.add("b$event");
// Breaks the loop. This cancels the loop subscription and waits for the
// cancel future.
break;
}
}
// Done event from stream or cancel future has completed.
log.add(")");
}
main() async {
asyncStart();
// Just run the loop over the stream. The consume stream emits no events.
{
var log = <String>[];
await for (var _ in consume(log)) {
throw "unreachable";
}
await Future.delayed(Duration(milliseconds: 1));
var trace = log.join("");
Expects.equals(r"(^?1!1?2!2?3!3$)", trace, "straight through");
}
// Pause at 1, then resume.
// Consume loop forces a pause when it receives the 1 event.
// Nothing should happen until that pause is resumed.
{
var log = <String>[];
await for (var _ in consume(log, pauseAt: 1)) {
throw "unreachable";
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "pause at 1";
if (trace.contains("p1[?2")) {
message += " (did not pause in time)";
}
Expects.equals(r"(^?1!1p1[]?2!2?3!3$)", trace, message);
}
// Break at 1.
// Consume loop breaks after receiving the 1 event.
// The consume stream emits no events.
{
var log = <String>[];
await for (var _ in consume(log, breakAt: 1)) {
throw "unreachable";
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "break at 1";
if (trace.contains("b1?2")) {
message += " (did not cancel in time)";
}
Expects.equals(r"(^?1!1b1$)", trace, message);
}
// Pause then break at 1.
// Consume loop pauses after receiving the 1 event,
// then breaks before resuming. It should still be at the yield.
// The consume stream emits no events.
{
var log = <String>[];
await for (var _ in consume(log, pauseAt: 1, breakAt: 1)) {
throw "unreachable";
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "pause then break at 1";
if (trace.contains("p1[?2")) {
message += " (did not pause in time)";
}
if (trace.contains("b1?2")) {
message += " (did not cancel in time)";
}
Expects.equals(r"(^?1!1p1[]b1$)", trace, message);
}
// Yield at 1.
// The consume loop re-emits the 1 event.
// The test loop should receive that event while the consume loop is still
// at the yield statement.
// The consume loop may or may not pause, it should make no difference.
{
var log = <String>[];
await for (var s in consume(log, yieldAt: 1)) {
log.add("e$s");
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "yield at 1";
if (trace.contains("y1[?2")) {
message += " (did not wait for delivery)";
}
Expects.equals(r"(^?1!1y1[e1]?2!2?3!3$)", trace, message);
}
// Yield at 1, then pause at yield.
// The consume loop re-emits the 1 event.
// The test loop should receive that event while the consume loop is still
// at the yield statement.
// The test loop then pauses.
// Nothing should happen in either the original yield
// or the consume-function yield until the test loop ends.
{
var log = <String>[];
await for (var s in consume(log, yieldAt: 1)) {
log.add("e$s<");
// Force pause at yield.
await Future.delayed(Duration(milliseconds: 1));
log.add(">");
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "yield at 1, pause at yield";
if (trace.contains("y1[?2")) {
message += " (did not wait for delivery)";
}
if (trace.contains("e1<?2")) {
message += " (did not pause in time)";
}
Expects.equals(r"(^?1!1y1[e1<>]?2!2?3!3$)", trace, message);
}
// Yield at 1, then break at yield.
// The consume loop re-emits the 1 event.
// The test loop should receive that event while the consume loop is still
// at the yield statement.
// The test loop then breaks. That makes the consume loop yield return,
// breaking the consume loop, which makes the source yield return.
{
var log = <String>[];
await for (var s in consume(log, yieldAt: 1)) {
log.add("e${s}B$s");
break; // Force break at yield*.
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "yield at 1, break at yield";
if (trace.contains("y1[?2")) {
message += " (did not wait for delivery)";
}
if (trace.contains("B1?2")) {
message += " (did not break in time)";
}
Expects.equals(r"(^?1!1y1[e1B1$)", trace, message);
}
// Yield* at 1.
// The consume loop re-emits a stream containing the 1 event.
// The test loop should receive that event before the consume loop
// continues from the `yield*`, which again happens before the source
// stream continues from its `yield`.
{
var log = <String>[];
await for (var s in consume(log, yieldStarAt: 1)) {
log.add("e$s");
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "yield* at 1";
if (trace.contains("Y1[?2")) {
message += " (did not wait for delivery)";
}
Expects.equals(r"(^?1!1Y1[e1]?2!2?3!3$)", trace, message);
}
// Yield* at 1, pause at yield.
// The consume loop re-emits a stream containing the 1 event.
// The test loop should receive that event before the consume loop
// continues from the `yield*`. The test loop then force a pause.
// Nothing further should happen during that pause.
{
var log = <String>[];
await for (var s in consume(log, yieldStarAt: 1)) {
log.add("e$s<");
await Future.delayed(Duration(milliseconds: 1)); // force pause.
log.add(">");
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "yield* then pause at 1";
if (trace.contains("Y1[?2")) {
message += " (did not wait for delivery)";
}
if (trace.contains("e1<?2")) {
message += " (did not pause in time)";
}
Expects.equals(r"(^?1!1Y1[e1<>]?2!2?3!3$)", trace, message);
}
// Yield* at 1, then break at 1.
// The consume loop re-emits a stream containing the 1 event.
// The test loop should receive that event before the consume loop
// continues from the `yield*`.
// When the consume loop continues, it breaks,
// forcing the waiting source yield to return.
{
var log = <String>[];
await for (var s in consume(log, yieldStarAt: 1, breakAt: 1)) {
log.add("e$s");
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "yield* then pause at 1";
if (trace.contains("Y1[?2")) {
message += " (did not wait for delivery)";
}
Expects.equals(r"(^?1!1Y1[e1]b1$)", trace, message);
}
// Yield* at 1, pause at yield, then break at 1.
// The consume loop re-emits a stream containing the 1 event.
// The test loop should receive that event before the consume loop
// continues from the `yield*`. After the `yield*`, the consume loop breaks.
// This forces the waiting source yield to return.
{
var log = <String>[];
await for (var s in consume(log, yieldStarAt: 1, breakAt: 1)) {
log.add("e$s<");
await Future.delayed(Duration(milliseconds: 1)); // force pause.
log.add(">");
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "yield* then pause at 1";
Expects.equals(r"(^?1!1Y1[e1<>]b1$)", trace, message);
}
// Yield* at 1, break at yield.
// The consume loop re-emits a stream containing the 1 event.
// The test loop should receive that event before the consume loop
// continues from the `yield*`. The test loop then breaks,
// forcing the two waiting yields to return.
{
var log = <String>[];
await for (var s in consume(log, yieldStarAt: 1)) {
log.add("e${s}B$s");
break;
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "yield* then break at 1";
if (trace.contains("Y1[?2")) {
message += " (did not deliver event in time)";
}
if (trace.contains("e1?2")) {
message += " (did not cancel in time)";
}
Expects.equals(r"(^?1!1Y1[e1B1$)", trace, message);
}
// Yield* at 1, pause at yield, then break at yield.
// The consume loop re-emits a stream containing the 1 event.
// The test loop should receive that event before the consume loop
// continues from the `yield*`. The test loop then forces a pause,
// and then breaks before that pause is resumed.
// This forces the two waiting yields to return.
{
var log = <String>[];
await for (var s in consume(log, yieldStarAt: 1)) {
log.add("e$s<");
await Future.delayed(Duration(milliseconds: 1)); // force pause.
log.add(">B$s");
break; // And break.
}
await Future.delayed(Duration(milliseconds: 10));
var trace = log.join("");
String message = "yield* then pause then break at 1";
Expects.equals(r"(^?1!1Y1[e1<>B1$)", trace, message);
}
Expects.summarize();
asyncEnd();
}
class Expects {
static var _errors = [];
static int _tests = 0;
static void summarize() {
if (_errors.isNotEmpty) {
var buffer = StringBuffer();
for (var es in _errors) {
buffer.writeln("FAILURE:");
buffer.writeln(es[0]); // error
buffer.writeln(es[1]); // stack trace
}
;
buffer.writeln("Expectations failed: ${_errors.length}"
", succeeded: ${_tests - _errors.length}");
throw ExpectException(buffer.toString());
}
}
static void equals(o1, o2, String message) {
_tests++;
try {
Expect.equals(o1, o2, message);
} on ExpectException catch (e) {
var stack = StackTrace.current;
_errors.add([e, stack]);
}
}
}