Reland "[vm] Fix some async* semantics issues: Only run generator if there's active subscription (not paused/cancelled)"

This fixes an issue where VM would run the async* generator after a
`yield` / `yield*` even though the subscription may be paused or
cancelled.

Furthermore this fixes an issue where `StackTrace.current` used
in async* generator crashes VM and/or produces truncated stack
trace.

This fixes the following existing tests that were failing before:

  * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t08
  * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t09
  * co19/Language/Statements/Yield_and_Yield_Each/Yield_Each/execution_async_t10
  * language/async_star/async_star_cancel_test
  * language/async_star/pause_test

New in reland: Allow the generator to to cause cancelling it's own consumer.
This addresses the issue of original revert
  -> see https://github.com/flutter/flutter/issues/101514

Issue https://github.com/flutter/flutter/issues/100441
Issue https://github.com/dart-lang/sdk/issues/48695
Issue https://github.com/dart-lang/sdk/issues/34775

TEST=vm/dart{,_2}/causal_stacks/flutter_regress_100441_test

Change-Id: I091b7159d59ea15fc31162b4b6b17260d523d7cb
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/242400
Reviewed-by: Lasse Nielsen <lrn@google.com>
Commit-Queue: Martin Kustermann <kustermann@google.com>
diff --git a/pkg/front_end/testcases/general/async_function.dart.weak.transformed.expect b/pkg/front_end/testcases/general/async_function.dart.weak.transformed.expect
index 373a8d8..aea2b09 100644
--- a/pkg/front_end/testcases/general/async_function.dart.weak.transformed.expect
+++ b/pkg/front_end/testcases/general/async_function.dart.weak.transformed.expect
@@ -121,10 +121,10 @@
             return null;
           else
             [yield] null;
-          if(:controller.{asy::_AsyncStarStreamController::addStream}(self::asyncStarString2()){(asy::Stream<core::String>) → core::bool})
+          :controller.{asy::_AsyncStarStreamController::addStream}(self::asyncStarString2()){(asy::Stream<core::String>) → void};
+          [yield] null;
+          if(_in::unsafeCast<core::bool>(:result_or_exception))
             return null;
-          else
-            [yield] null;
           [yield] let dynamic #t1 = asy::_awaitHelper(self::asyncString(), :async_op_then, :async_op_error) in null;
           if(:controller.{asy::_AsyncStarStreamController::add}(_in::unsafeCast<core::String>(:result_or_exception)){(core::String) → core::bool})
             return null;
diff --git a/pkg/front_end/testcases/general/await_complex.dart.weak.transformed.expect b/pkg/front_end/testcases/general/await_complex.dart.weak.transformed.expect
index d3d1139..7af3671 100644
--- a/pkg/front_end/testcases/general/await_complex.dart.weak.transformed.expect
+++ b/pkg/front_end/testcases/general/await_complex.dart.weak.transformed.expect
@@ -652,10 +652,10 @@
                       #L16:
                       {
                         [yield] let dynamic #t55 = asy::_awaitHelper(func<asy::Stream<core::int>>(self::intStream()){(asy::Stream<core::int>) → FutureOr<asy::Stream<core::int>>}, :async_op_then, :async_op_error) in null;
-                        if(:controller.{asy::_AsyncStarStreamController::addStream}(_in::unsafeCast<asy::Stream<core::int>>(:result_or_exception)){(asy::Stream<core::int>) → core::bool})
+                        :controller.{asy::_AsyncStarStreamController::addStream}(_in::unsafeCast<asy::Stream<core::int>>(:result_or_exception)){(asy::Stream<core::int>) → void};
+                        [yield] null;
+                        if(_in::unsafeCast<core::bool>(:result_or_exception))
                           return null;
-                        else
-                          [yield] null;
                       }
                       return;
                     }
diff --git a/pkg/front_end/testcases/general/statements.dart.weak.transformed.expect b/pkg/front_end/testcases/general/statements.dart.weak.transformed.expect
index 012142b..2db299f 100644
--- a/pkg/front_end/testcases/general/statements.dart.weak.transformed.expect
+++ b/pkg/front_end/testcases/general/statements.dart.weak.transformed.expect
@@ -44,10 +44,10 @@
                       return null;
                     else
                       [yield] null;
-                    if(:controller.{asy::_AsyncStarStreamController::addStream}(x as{TypeError,ForDynamic,ForNonNullableByDefault} asy::Stream<dynamic>){(asy::Stream<dynamic>) → core::bool})
+                    :controller.{asy::_AsyncStarStreamController::addStream}(x as{TypeError,ForDynamic,ForNonNullableByDefault} asy::Stream<dynamic>){(asy::Stream<dynamic>) → void};
+                    [yield] null;
+                    if(_in::unsafeCast<core::bool>(:result_or_exception))
                       return null;
-                    else
-                      [yield] null;
                   }
                 }
                 else
diff --git a/pkg/front_end/testcases/inference/block_bodied_lambdas_async_star.dart.weak.transformed.expect b/pkg/front_end/testcases/inference/block_bodied_lambdas_async_star.dart.weak.transformed.expect
index 2e127fbc..5730a20 100644
--- a/pkg/front_end/testcases/inference/block_bodied_lambdas_async_star.dart.weak.transformed.expect
+++ b/pkg/front_end/testcases/inference/block_bodied_lambdas_async_star.dart.weak.transformed.expect
@@ -2,6 +2,7 @@
 import self as self;
 import "dart:async" as asy;
 import "dart:core" as core;
+import "dart:_internal" as _in;
 
 import "dart:async";
 
@@ -25,10 +26,10 @@
             else
               [yield] null;
             asy::Stream<core::double*>* s;
-            if(:controller.{asy::_AsyncStarStreamController::addStream}(s){(asy::Stream<core::num*>) → core::bool})
+            :controller.{asy::_AsyncStarStreamController::addStream}(s){(asy::Stream<core::num*>) → void};
+            [yield] null;
+            if(_in::unsafeCast<core::bool>(:result_or_exception))
               return null;
-            else
-              [yield] null;
           }
           return;
         }
diff --git a/pkg/front_end/testcases/inference/downwards_inference_yield_yield_star.dart.weak.transformed.expect b/pkg/front_end/testcases/inference/downwards_inference_yield_yield_star.dart.weak.transformed.expect
index 7f9840f..02bca84 100644
--- a/pkg/front_end/testcases/inference/downwards_inference_yield_yield_star.dart.weak.transformed.expect
+++ b/pkg/front_end/testcases/inference/downwards_inference_yield_yield_star.dart.weak.transformed.expect
@@ -29,6 +29,7 @@
 import self as self;
 import "dart:core" as core;
 import "dart:async" as asy;
+import "dart:_internal" as _in;
 
 import "dart:async";
 
@@ -109,18 +110,18 @@
             return null;
           else
             [yield] null;
-          if(:controller.{asy::_AsyncStarStreamController::addStream}(invalid-expression "pkg/front_end/testcases/inference/downwards_inference_yield_yield_star.dart:17:64: Error: A value of type 'List<dynamic>' can't be assigned to a variable of type 'Stream<List<int>>'.
+          :controller.{asy::_AsyncStarStreamController::addStream}(invalid-expression "pkg/front_end/testcases/inference/downwards_inference_yield_yield_star.dart:17:64: Error: A value of type 'List<dynamic>' can't be assigned to a variable of type 'Stream<List<int>>'.
  - 'List' is from 'dart:core'.
  - 'Stream' is from 'dart:async'.
   yield* /*error:YIELD_OF_INVALID_TYPE*/ /*@typeArgs=dynamic*/ [];
-                                                               ^" in core::_GrowableList::•<dynamic>(0) as{TypeError} asy::Stream<core::List<core::int*>*>*){(asy::Stream<core::List<core::int*>*>) → core::bool})
+                                                               ^" in core::_GrowableList::•<dynamic>(0) as{TypeError} asy::Stream<core::List<core::int*>*>*){(asy::Stream<core::List<core::int*>*>) → void};
+          [yield] null;
+          if(_in::unsafeCast<core::bool>(:result_or_exception))
             return null;
-          else
-            [yield] null;
-          if(:controller.{asy::_AsyncStarStreamController::addStream}(self::MyStream::•<core::List<core::int*>*>()){(asy::Stream<core::List<core::int*>*>) → core::bool})
+          :controller.{asy::_AsyncStarStreamController::addStream}(self::MyStream::•<core::List<core::int*>*>()){(asy::Stream<core::List<core::int*>*>) → void};
+          [yield] null;
+          if(_in::unsafeCast<core::bool>(:result_or_exception))
             return null;
-          else
-            [yield] null;
         }
         return;
       }
diff --git a/pkg/front_end/testcases/inference/local_return_and_yield.dart.weak.transformed.expect b/pkg/front_end/testcases/inference/local_return_and_yield.dart.weak.transformed.expect
index fbcd719..251c281 100644
--- a/pkg/front_end/testcases/inference/local_return_and_yield.dart.weak.transformed.expect
+++ b/pkg/front_end/testcases/inference/local_return_and_yield.dart.weak.transformed.expect
@@ -10,6 +10,7 @@
 import self as self;
 import "dart:core" as core;
 import "dart:async" as asy;
+import "dart:_internal" as _in;
 
 import "dart:async";
 
@@ -129,10 +130,10 @@
         try {
           #L3:
           {
-            if(:controller.{asy::_AsyncStarStreamController::addStream}(asy::Stream::fromIterable<(core::int*) →* core::int*>(core::_GrowableList::_literal1<(core::int*) →* core::int*>((core::int* x) → core::int* => x))){(asy::Stream<(core::int*) →* core::int*>) → core::bool})
+            :controller.{asy::_AsyncStarStreamController::addStream}(asy::Stream::fromIterable<(core::int*) →* core::int*>(core::_GrowableList::_literal1<(core::int*) →* core::int*>((core::int* x) → core::int* => x))){(asy::Stream<(core::int*) →* core::int*>) → void};
+            [yield] null;
+            if(_in::unsafeCast<core::bool>(:result_or_exception))
               return null;
-            else
-              [yield] null;
           }
           return;
         }
diff --git a/pkg/front_end/testcases/inference/top_level_return_and_yield.dart.weak.transformed.expect b/pkg/front_end/testcases/inference/top_level_return_and_yield.dart.weak.transformed.expect
index 84ea22c..3d5e979 100644
--- a/pkg/front_end/testcases/inference/top_level_return_and_yield.dart.weak.transformed.expect
+++ b/pkg/front_end/testcases/inference/top_level_return_and_yield.dart.weak.transformed.expect
@@ -10,6 +10,7 @@
 import self as self;
 import "dart:core" as core;
 import "dart:async" as asy;
+import "dart:_internal" as _in;
 
 import "dart:async";
 
@@ -128,10 +129,10 @@
       try {
         #L3:
         {
-          if(:controller.{asy::_AsyncStarStreamController::addStream}(asy::Stream::fromIterable<(core::int*) →* core::int*>(core::_GrowableList::_literal1<(core::int*) →* core::int*>((core::int* x) → core::int* => x))){(asy::Stream<(core::int*) →* core::int*>) → core::bool})
+          :controller.{asy::_AsyncStarStreamController::addStream}(asy::Stream::fromIterable<(core::int*) →* core::int*>(core::_GrowableList::_literal1<(core::int*) →* core::int*>((core::int* x) → core::int* => x))){(asy::Stream<(core::int*) →* core::int*>) → void};
+          [yield] null;
+          if(_in::unsafeCast<core::bool>(:result_or_exception))
             return null;
-          else
-            [yield] null;
         }
         return;
       }
diff --git a/pkg/front_end/testcases/nnbd/issue41437c.dart.strong.transformed.expect b/pkg/front_end/testcases/nnbd/issue41437c.dart.strong.transformed.expect
index cfc0b05..c8058e6 100644
--- a/pkg/front_end/testcases/nnbd/issue41437c.dart.strong.transformed.expect
+++ b/pkg/front_end/testcases/nnbd/issue41437c.dart.strong.transformed.expect
@@ -40,6 +40,7 @@
 import self as self;
 import "dart:async" as asy;
 import "dart:core" as core;
+import "dart:_internal" as _in;
 
 static method getNull() → dynamic
   return null;
@@ -160,13 +161,13 @@
       try {
         #L4:
         {
-          if(:controller.{asy::_AsyncStarStreamController::addStream}(invalid-expression "pkg/front_end/testcases/nnbd/issue41437c.dart:21:10: Error: A value of type 'Stream<dynamic>' can't be assigned to a variable of type 'Stream<bool>'.
+          :controller.{asy::_AsyncStarStreamController::addStream}(invalid-expression "pkg/front_end/testcases/nnbd/issue41437c.dart:21:10: Error: A value of type 'Stream<dynamic>' can't be assigned to a variable of type 'Stream<bool>'.
  - 'Stream' is from 'dart:async'.
   yield* getStreamNull(); // error
-         ^" in self::getStreamNull() as{TypeError,ForNonNullableByDefault} asy::Stream<core::bool>){(asy::Stream<core::bool>) → core::bool})
+         ^" in self::getStreamNull() as{TypeError,ForNonNullableByDefault} asy::Stream<core::bool>){(asy::Stream<core::bool>) → void};
+          [yield] null;
+          if(_in::unsafeCast<core::bool>(:result_or_exception))
             return null;
-          else
-            [yield] null;
         }
         return;
       }
@@ -203,10 +204,10 @@
       try {
         #L5:
         {
-          if(:controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → core::bool})
+          :controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → void};
+          [yield] null;
+          if(_in::unsafeCast<core::bool>(:result_or_exception))
             return null;
-          else
-            [yield] null;
         }
         return;
       }
@@ -285,13 +286,13 @@
               try {
                 #L8:
                 {
-                  if(:controller.{asy::_AsyncStarStreamController::addStream}(invalid-expression "pkg/front_end/testcases/nnbd/issue41437c.dart:38:12: Error: A value of type 'Stream<dynamic>' can't be assigned to a variable of type 'Stream<bool>'.
+                  :controller.{asy::_AsyncStarStreamController::addStream}(invalid-expression "pkg/front_end/testcases/nnbd/issue41437c.dart:38:12: Error: A value of type 'Stream<dynamic>' can't be assigned to a variable of type 'Stream<bool>'.
  - 'Stream' is from 'dart:async'.
     yield* getStreamNull(); // error
-           ^" in self::getStreamNull() as{TypeError,ForNonNullableByDefault} asy::Stream<core::bool>){(asy::Stream<core::bool>) → core::bool})
+           ^" in self::getStreamNull() as{TypeError,ForNonNullableByDefault} asy::Stream<core::bool>){(asy::Stream<core::bool>) → void};
+                  [yield] null;
+                  if(_in::unsafeCast<core::bool>(:result_or_exception))
                     return null;
-                  else
-                    [yield] null;
                 }
                 return;
               }
@@ -328,10 +329,10 @@
               try {
                 #L9:
                 {
-                  if(:controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → core::bool})
+                  :controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → void};
+                  [yield] null;
+                  if(_in::unsafeCast<core::bool>(:result_or_exception))
                     return null;
-                  else
-                    [yield] null;
                 }
                 return;
               }
@@ -402,10 +403,10 @@
               try {
                 #L11:
                 {
-                  if(:controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamNull()){(asy::Stream<dynamic>) → core::bool})
+                  :controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamNull()){(asy::Stream<dynamic>) → void};
+                  [yield] null;
+                  if(_in::unsafeCast<core::bool>(:result_or_exception))
                     return null;
-                  else
-                    [yield] null;
                 }
                 return;
               }
@@ -440,10 +441,10 @@
               try {
                 #L12:
                 {
-                  if(:controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → core::bool})
+                  :controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → void};
+                  [yield] null;
+                  if(_in::unsafeCast<core::bool>(:result_or_exception))
                     return null;
-                  else
-                    [yield] null;
                 }
                 return;
               }
diff --git a/pkg/front_end/testcases/nnbd/issue41437c.dart.weak.transformed.expect b/pkg/front_end/testcases/nnbd/issue41437c.dart.weak.transformed.expect
index cfc0b05..c8058e6 100644
--- a/pkg/front_end/testcases/nnbd/issue41437c.dart.weak.transformed.expect
+++ b/pkg/front_end/testcases/nnbd/issue41437c.dart.weak.transformed.expect
@@ -40,6 +40,7 @@
 import self as self;
 import "dart:async" as asy;
 import "dart:core" as core;
+import "dart:_internal" as _in;
 
 static method getNull() → dynamic
   return null;
@@ -160,13 +161,13 @@
       try {
         #L4:
         {
-          if(:controller.{asy::_AsyncStarStreamController::addStream}(invalid-expression "pkg/front_end/testcases/nnbd/issue41437c.dart:21:10: Error: A value of type 'Stream<dynamic>' can't be assigned to a variable of type 'Stream<bool>'.
+          :controller.{asy::_AsyncStarStreamController::addStream}(invalid-expression "pkg/front_end/testcases/nnbd/issue41437c.dart:21:10: Error: A value of type 'Stream<dynamic>' can't be assigned to a variable of type 'Stream<bool>'.
  - 'Stream' is from 'dart:async'.
   yield* getStreamNull(); // error
-         ^" in self::getStreamNull() as{TypeError,ForNonNullableByDefault} asy::Stream<core::bool>){(asy::Stream<core::bool>) → core::bool})
+         ^" in self::getStreamNull() as{TypeError,ForNonNullableByDefault} asy::Stream<core::bool>){(asy::Stream<core::bool>) → void};
+          [yield] null;
+          if(_in::unsafeCast<core::bool>(:result_or_exception))
             return null;
-          else
-            [yield] null;
         }
         return;
       }
@@ -203,10 +204,10 @@
       try {
         #L5:
         {
-          if(:controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → core::bool})
+          :controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → void};
+          [yield] null;
+          if(_in::unsafeCast<core::bool>(:result_or_exception))
             return null;
-          else
-            [yield] null;
         }
         return;
       }
@@ -285,13 +286,13 @@
               try {
                 #L8:
                 {
-                  if(:controller.{asy::_AsyncStarStreamController::addStream}(invalid-expression "pkg/front_end/testcases/nnbd/issue41437c.dart:38:12: Error: A value of type 'Stream<dynamic>' can't be assigned to a variable of type 'Stream<bool>'.
+                  :controller.{asy::_AsyncStarStreamController::addStream}(invalid-expression "pkg/front_end/testcases/nnbd/issue41437c.dart:38:12: Error: A value of type 'Stream<dynamic>' can't be assigned to a variable of type 'Stream<bool>'.
  - 'Stream' is from 'dart:async'.
     yield* getStreamNull(); // error
-           ^" in self::getStreamNull() as{TypeError,ForNonNullableByDefault} asy::Stream<core::bool>){(asy::Stream<core::bool>) → core::bool})
+           ^" in self::getStreamNull() as{TypeError,ForNonNullableByDefault} asy::Stream<core::bool>){(asy::Stream<core::bool>) → void};
+                  [yield] null;
+                  if(_in::unsafeCast<core::bool>(:result_or_exception))
                     return null;
-                  else
-                    [yield] null;
                 }
                 return;
               }
@@ -328,10 +329,10 @@
               try {
                 #L9:
                 {
-                  if(:controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → core::bool})
+                  :controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → void};
+                  [yield] null;
+                  if(_in::unsafeCast<core::bool>(:result_or_exception))
                     return null;
-                  else
-                    [yield] null;
                 }
                 return;
               }
@@ -402,10 +403,10 @@
               try {
                 #L11:
                 {
-                  if(:controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamNull()){(asy::Stream<dynamic>) → core::bool})
+                  :controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamNull()){(asy::Stream<dynamic>) → void};
+                  [yield] null;
+                  if(_in::unsafeCast<core::bool>(:result_or_exception))
                     return null;
-                  else
-                    [yield] null;
                 }
                 return;
               }
@@ -440,10 +441,10 @@
               try {
                 #L12:
                 {
-                  if(:controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → core::bool})
+                  :controller.{asy::_AsyncStarStreamController::addStream}(self::getStreamBool()){(asy::Stream<core::bool>) → void};
+                  [yield] null;
+                  if(_in::unsafeCast<core::bool>(:result_or_exception))
                     return null;
-                  else
-                    [yield] null;
                 }
                 return;
               }
diff --git a/pkg/vm/lib/transformations/continuation.dart b/pkg/vm/lib/transformations/continuation.dart
index 455b281..f158e8e 100644
--- a/pkg/vm/lib/transformations/continuation.dart
+++ b/pkg/vm/lib/transformations/continuation.dart
@@ -1329,10 +1329,21 @@
         functionType: addMethodFunctionType)
       ..fileOffset = stmt.fileOffset;
 
-    statements.add(new IfStatement(
-        addExpression,
-        new ReturnStatement(new NullLiteral()),
-        createContinuationPoint()..fileOffset = stmt.fileOffset));
+    if (stmt.isYieldStar) {
+      statements.add(ExpressionStatement(addExpression));
+      statements.add(createContinuationPoint()..fileOffset = stmt.fileOffset);
+      final wasCancelled = StaticInvocation(
+          helper.unsafeCast,
+          Arguments(<Expression>[VariableGet(expressionRewriter!.asyncResult)],
+              types: <DartType>[helper.coreTypes.boolNonNullableRawType]));
+      statements
+          .add(IfStatement(wasCancelled, ReturnStatement(NullLiteral()), null));
+    } else {
+      statements.add(new IfStatement(
+          addExpression,
+          new ReturnStatement(new NullLiteral()),
+          createContinuationPoint()..fileOffset = stmt.fileOffset));
+    }
     return removalSentinel ?? EmptyStatement();
   }
 
diff --git a/runtime/tests/vm/dart/causal_stacks/flutter_regress_100441_test.dart b/runtime/tests/vm/dart/causal_stacks/flutter_regress_100441_test.dart
new file mode 100644
index 0000000..b5ff8ad
--- /dev/null
+++ b/runtime/tests/vm/dart/causal_stacks/flutter_regress_100441_test.dart
@@ -0,0 +1,77 @@
+// Copyright (c) 2022, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+//
+// VMOptions=--lazy-async-stacks
+
+import 'dart:async';
+
+import 'package:expect/expect.dart';
+
+import 'utils.dart' show assertStack;
+
+String effectOrder = '';
+StackTrace? stackAfterYield = null;
+
+void emit(String m) => effectOrder += m;
+
+main() async {
+  emit('1');
+  await for (final value in produce()) {
+    emit('5');
+    Expect.equals('|value|', value);
+  }
+  emit('8');
+  Expect.equals('12345678', effectOrder);
+
+  assertStack(const <String>[
+    r'^#0      produceInner .*$',
+    r'^<asynchronous suspension>$',
+    r'^#1      produce .*$',
+    r'^<asynchronous suspension>$',
+    r'^#2      main .*$',
+    r'^<asynchronous suspension>$',
+  ], stackAfterYield!);
+
+  effectOrder = '';
+
+  emit('1');
+  await for (final value in produceYieldStar()) {
+    emit('5');
+    Expect.equals('|value|', value);
+    break;
+  }
+  emit('6');
+  Expect.equals('123456', effectOrder);
+}
+
+Stream<dynamic> produce() async* {
+  emit('2');
+  await for (String response in produceInner()) {
+    emit('4');
+    yield response;
+  }
+  emit('7');
+}
+
+Stream produceInner() async* {
+  emit('3');
+  yield '|value|';
+  emit('6');
+  stackAfterYield = StackTrace.current;
+}
+
+Stream<dynamic> produceYieldStar() async* {
+  emit('2');
+  await for (String response in produceInner()) {
+    emit('4');
+    yield response;
+  }
+  emit('x');
+}
+
+Stream produceInnerYieldStar() async* {
+  emit('3');
+  yield* Stream.fromIterable(['|value|', '|value2|']);
+  emit('x');
+}
diff --git a/runtime/tests/vm/dart/flutter_regress_101514_test.dart b/runtime/tests/vm/dart/flutter_regress_101514_test.dart
new file mode 100644
index 0000000..6c78710
--- /dev/null
+++ b/runtime/tests/vm/dart/flutter_regress_101514_test.dart
@@ -0,0 +1,21 @@
+// Copyright (c) 2022, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+import 'dart:async';
+
+import 'package:expect/expect.dart';
+
+late StreamSubscription sub;
+
+main() async {
+  sub = foo().listen((_) => throw 'unexpected item');
+}
+
+Stream<int> foo() async* {
+  // While the generator (i.e. this funtion) runs, we cancel the consumer and
+  // try to yield more.
+  sub.cancel();
+  yield* Stream.fromIterable([1, 2, 3]);
+  throw 'should not run';
+}
diff --git a/runtime/tests/vm/dart_2/causal_stacks/flutter_regress_100441_test.dart b/runtime/tests/vm/dart_2/causal_stacks/flutter_regress_100441_test.dart
new file mode 100644
index 0000000..2d4d2ef
--- /dev/null
+++ b/runtime/tests/vm/dart_2/causal_stacks/flutter_regress_100441_test.dart
@@ -0,0 +1,78 @@
+// Copyright (c) 2022, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+//
+// VMOptions=--lazy-async-stacks
+
+// @dart = 2.9
+import 'dart:async';
+
+import 'package:expect/expect.dart';
+
+import 'utils.dart' show assertStack;
+
+String effectOrder = '';
+StackTrace stackAfterYield = null;
+
+void emit(String m) => effectOrder += m;
+
+main() async {
+  emit('1');
+  await for (final value in produce()) {
+    emit('5');
+    Expect.equals('|value|', value);
+  }
+  emit('8');
+  Expect.equals('12345678', effectOrder);
+
+  assertStack(const <String>[
+    r'^#0      produceInner .*$',
+    r'^<asynchronous suspension>$',
+    r'^#1      produce .*$',
+    r'^<asynchronous suspension>$',
+    r'^#2      main .*$',
+    r'^<asynchronous suspension>$',
+  ], stackAfterYield);
+
+  effectOrder = '';
+
+  emit('1');
+  await for (final value in produceYieldStar()) {
+    emit('5');
+    Expect.equals('|value|', value);
+    break;
+  }
+  emit('6');
+  Expect.equals('123456', effectOrder);
+}
+
+Stream<dynamic> produce() async* {
+  emit('2');
+  await for (String response in produceInner()) {
+    emit('4');
+    yield response;
+  }
+  emit('7');
+}
+
+Stream produceInner() async* {
+  emit('3');
+  yield '|value|';
+  emit('6');
+  stackAfterYield = StackTrace.current;
+}
+
+Stream<dynamic> produceYieldStar() async* {
+  emit('2');
+  await for (String response in produceInner()) {
+    emit('4');
+    yield response;
+  }
+  emit('x');
+}
+
+Stream produceInnerYieldStar() async* {
+  emit('3');
+  yield* Stream.fromIterable(['|value|', '|value2|']);
+  emit('x');
+}
diff --git a/runtime/tests/vm/dart_2/flutter_regress_101514_test.dart b/runtime/tests/vm/dart_2/flutter_regress_101514_test.dart
new file mode 100644
index 0000000..44d029a
--- /dev/null
+++ b/runtime/tests/vm/dart_2/flutter_regress_101514_test.dart
@@ -0,0 +1,23 @@
+// Copyright (c) 2022, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// @dart=2.9
+
+import 'dart:async';
+
+import 'package:expect/expect.dart';
+
+StreamSubscription sub;
+
+main() async {
+  sub = foo().listen((_) => throw 'unexpected item');
+}
+
+Stream<int> foo() async* {
+  // While the generator (i.e. this funtion) runs, we cancel the consumer and
+  // try to yield more.
+  sub.cancel();
+  yield* Stream.fromIterable([1, 2, 3]);
+  throw 'should not run';
+}
diff --git a/runtime/vm/stack_trace.cc b/runtime/vm/stack_trace.cc
index 4c7cbcf..eee21f8 100644
--- a/runtime/vm/stack_trace.cc
+++ b/runtime/vm/stack_trace.cc
@@ -71,7 +71,7 @@
       future_listener_class(Class::Handle(zone)),
       async_start_stream_controller_class(Class::Handle(zone)),
       stream_controller_class(Class::Handle(zone)),
-      async_stream_controller_class(Class::Handle(zone)),
+      sync_stream_controller_class(Class::Handle(zone)),
       controller_subscription_class(Class::Handle(zone)),
       buffering_stream_subscription_class(Class::Handle(zone)),
       stream_iterator_class(Class::Handle(zone)),
@@ -100,9 +100,9 @@
   stream_controller_class =
       async_lib.LookupClassAllowPrivate(Symbols::_StreamController());
   ASSERT(!stream_controller_class.IsNull());
-  async_stream_controller_class =
-      async_lib.LookupClassAllowPrivate(Symbols::_AsyncStreamController());
-  ASSERT(!async_stream_controller_class.IsNull());
+  sync_stream_controller_class =
+      async_lib.LookupClassAllowPrivate(Symbols::_SyncStreamController());
+  ASSERT(!sync_stream_controller_class.IsNull());
   controller_subscription_class =
       async_lib.LookupClassAllowPrivate(Symbols::_ControllerSubscription());
   ASSERT(!controller_subscription_class.IsNull());
@@ -171,7 +171,7 @@
   const Instance& controller = Instance::Cast(context_entry_);
   controller_ = controller.GetField(controller_controller_field);
   ASSERT(!controller_.IsNull());
-  ASSERT(controller_.GetClassId() == async_stream_controller_class.id());
+  ASSERT(controller_.GetClassId() == sync_stream_controller_class.id());
 
   // Get the _StreamController._state field.
   state_ = Instance::Cast(controller_).GetField(state_field);
@@ -209,7 +209,7 @@
     // contains the iterator's value. In that case we cannot unwind anymore.
     //
     // Notice: With correct async* semantics this may never be true: The async*
-    // generator should only be invoked to produce a vaue if there's an
+    // generator should only be invoked to produce a value if there's an
     // in-progress `await streamIterator.moveNext()` call. Once such call has
     // finished the async* generator should be paused/yielded until the next
     // such call - and being paused/yielded means it should not appear in stack
diff --git a/runtime/vm/stack_trace.h b/runtime/vm/stack_trace.h
index 0683cf7..58a4cba 100644
--- a/runtime/vm/stack_trace.h
+++ b/runtime/vm/stack_trace.h
@@ -78,7 +78,7 @@
   Class& future_listener_class;
   Class& async_start_stream_controller_class;
   Class& stream_controller_class;
-  Class& async_stream_controller_class;
+  Class& sync_stream_controller_class;
   Class& controller_subscription_class;
   Class& buffering_stream_subscription_class;
   Class& stream_iterator_class;
diff --git a/runtime/vm/symbols.h b/runtime/vm/symbols.h
index 52ec048..f96d8d3 100644
--- a/runtime/vm/symbols.h
+++ b/runtime/vm/symbols.h
@@ -278,7 +278,6 @@
   V(Values, "values")                                                          \
   V(WeakSerializationReference, "WeakSerializationReference")                  \
   V(_AsyncStarStreamController, "_AsyncStarStreamController")                  \
-  V(_AsyncStreamController, "_AsyncStreamController")                          \
   V(_BufferingStreamSubscription, "_BufferingStreamSubscription")              \
   V(_ByteBuffer, "_ByteBuffer")                                                \
   V(_ByteBufferDot_New, "_ByteBuffer._New")                                    \
@@ -381,6 +380,7 @@
   V(_StreamIterator, "_StreamIterator")                                        \
   V(_String, "String")                                                         \
   V(_SyncIterator, "_SyncIterator")                                            \
+  V(_SyncStreamController, "_SyncStreamController")                            \
   V(_TransferableTypedDataImpl, "_TransferableTypedDataImpl")                  \
   V(_Type, "_Type")                                                            \
   V(_FunctionType, "_FunctionType")                                            \
diff --git a/sdk/lib/_internal/vm/lib/async_patch.dart b/sdk/lib/_internal/vm/lib/async_patch.dart
index 2e322f1..3d4b20b 100644
--- a/sdk/lib/_internal/vm/lib/async_patch.dart
+++ b/sdk/lib/_internal/vm/lib/async_patch.dart
@@ -117,6 +117,13 @@
   bool isSuspendedAtYield = false;
   _Future? cancellationFuture = null;
 
+  /// Argument passed to the generator when it is resumed after an addStream.
+  ///
+  /// `true` if the generator should exit after `yield*` resumes.
+  /// `false` if the generator should continue after `yield*` resumes.
+  /// `null` otherwies.
+  bool? continuationArgument = null;
+
   Stream<T> get stream {
     final Stream<T> local = controller.stream;
     if (local is _StreamImpl<T>) {
@@ -128,7 +135,9 @@
   void runBody() {
     isScheduled = false;
     isSuspendedAtYield = false;
-    asyncStarBody(null, null);
+    final bool? argument = continuationArgument;
+    continuationArgument = null;
+    asyncStarBody(argument, null);
   }
 
   void scheduleGenerator() {
@@ -152,11 +161,11 @@
   bool add(T event) {
     if (!onListenReceived) _fatal("yield before stream is listened to");
     if (isSuspendedAtYield) _fatal("unexpected yield");
-    // If stream is cancelled, tell caller to exit the async generator.
+    controller.add(event);
     if (!controller.hasListener) {
       return true;
     }
-    controller.add(event);
+
     scheduleGenerator();
     isSuspendedAtYield = true;
     return false;
@@ -165,22 +174,41 @@
   // Adds the elements of stream into this controller's stream.
   // The generator will be scheduled again when all of the
   // elements of the added stream have been consumed.
-  // Returns true if the caller should terminate
-  // execution of the generator.
-  bool addStream(Stream<T> stream) {
+  void addStream(Stream<T> stream) {
     if (!onListenReceived) _fatal("yield before stream is listened to");
-    // If stream is cancelled, tell caller to exit the async generator.
-    if (!controller.hasListener) return true;
+
+    if (exitAfterYieldStarIfCancelled()) return;
+
     isAdding = true;
-    var whenDoneAdding = controller.addStream(stream, cancelOnError: false);
+    final whenDoneAdding = controller.addStream(stream, cancelOnError: false);
     whenDoneAdding.then((_) {
       isAdding = false;
-      scheduleGenerator();
-      if (!isScheduled) isSuspendedAtYield = true;
+      if (exitAfterYieldStarIfCancelled()) return;
+      resumeNormallyAfterYieldStar();
     });
+  }
+
+  /// Schedules the generator to exit after `yield*` if stream was cancelled.
+  ///
+  /// Returns `true` if generator is told to exit and `false` otherwise.
+  bool exitAfterYieldStarIfCancelled() {
+    // If consumer cancelled subscription we should tell async* generator to
+    // finish (i.e. run finally clauses and return).
+    if (!controller.hasListener) {
+      continuationArgument = true;
+      scheduleGenerator();
+      return true;
+    }
     return false;
   }
 
+  /// Schedules the generator to resume normally after `yield*`.
+  void resumeNormallyAfterYieldStar() {
+    continuationArgument = false;
+    scheduleGenerator();
+    if (!isScheduled) isSuspendedAtYield = true;
+  }
+
   void addError(Object error, StackTrace stackTrace) {
     // TODO(40614): Remove once non-nullability is sound.
     ArgumentError.checkNotNull(error, "error");
@@ -211,7 +239,7 @@
   }
 
   _AsyncStarStreamController(this.asyncStarBody)
-      : controller = new StreamController() {
+      : controller = new StreamController(sync: true) {
     controller.onListen = this.onListen;
     controller.onResume = this.onResume;
     controller.onCancel = this.onCancel;
diff --git a/tests/language/async_star/pause2_test.dart b/tests/language/async_star/pause2_test.dart
index a240d3d..3df0223 100644
--- a/tests/language/async_star/pause2_test.dart
+++ b/tests/language/async_star/pause2_test.dart
@@ -72,8 +72,8 @@
     f() async* {
       int i = 0;
       while (true) {
-        yield i;
         list.add(i);
+        yield i;
         i++;
       }
     }
diff --git a/tests/language/async_star/throw_in_catch_test.dart b/tests/language/async_star/throw_in_catch_test.dart
index 88982e5..7509a00 100644
--- a/tests/language/async_star/throw_in_catch_test.dart
+++ b/tests/language/async_star/throw_in_catch_test.dart
@@ -115,7 +115,7 @@
 test() async {
   // TODO(sigurdm): These tests are too dependent on scheduling, and buffering
   // behavior.
-  await runTest(foo1, "abcYdgC", null, true);
+  await runTest(foo1, "abcYgC", null, true);
   await runTest(foo2, "abcX", "Error", false);
   await runTest(foo3, "abcYX", "Error", false);
   await runTest(foo4, "abcYdYefX", "Error2", false);
diff --git a/tests/language_2/async_star/pause2_test.dart b/tests/language_2/async_star/pause2_test.dart
index 11e0429..f6e8d5f 100644
--- a/tests/language_2/async_star/pause2_test.dart
+++ b/tests/language_2/async_star/pause2_test.dart
@@ -74,8 +74,8 @@
     f() async* {
       int i = 0;
       while (true) {
-        yield i;
         list.add(i);
+        yield i;
         i++;
       }
     }
diff --git a/tests/language_2/async_star/throw_in_catch_test.dart b/tests/language_2/async_star/throw_in_catch_test.dart
index 57b6b0e..6a26245 100644
--- a/tests/language_2/async_star/throw_in_catch_test.dart
+++ b/tests/language_2/async_star/throw_in_catch_test.dart
@@ -118,7 +118,7 @@
 test() async {
   // TODO(sigurdm): These tests are too dependent on scheduling, and buffering
   // behavior.
-  await runTest(foo1, "abcYdgC", null, true);
+  await runTest(foo1, "abcYgC", null, true);
   await runTest(foo2, "abcX", "Error", false);
   await runTest(foo3, "abcYX", "Error", false);
   await runTest(foo4, "abcYdYefX", "Error2", false);