[lib] Avoid empty stack-traces in Stream methods

This change leans on VMs ability to provide a awaiter stack traces (if
the code is written using async syntax sugar or properly annotated with
@pragma('vm:awaiter-link').

Currently doing

    await Stream.fromIterable([]).first;

would result in an empty stack trace. However after this change
we actually get a meaningful stack trace when running on the VM.

Fixes https://github.com/dart-lang/sdk/issues/51025

TEST=vm/dart/awaiter_stacks/stream_methods

CoreLibraryReviewExempt: No functional or API changes. Just better error messages on exceptional code paths.
Change-Id: Ifc3e446c00121cc97f2f870c3e3f5bf56a6a6964
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/453940
Reviewed-by: Lasse Nielsen <lrn@google.com>
Commit-Queue: Slava Egorov <vegorov@google.com>
diff --git a/runtime/tests/vm/dart/awaiter_stacks/stream_methods_test.dart b/runtime/tests/vm/dart/awaiter_stacks/stream_methods_test.dart
index c74ba0f..7334411 100644
--- a/runtime/tests/vm/dart/awaiter_stacks/stream_methods_test.dart
+++ b/runtime/tests/vm/dart/awaiter_stacks/stream_methods_test.dart
@@ -12,6 +12,7 @@
 // stack traces when unwinding through various built-in [Stream] methods.
 import 'dart:async';
 
+import 'package:expect/expect.dart';
 import 'harness.dart' as harness;
 
 Future<void> baz() async {
@@ -34,8 +35,21 @@
   await baz();
 }
 
-Future<void> runTest(Future<void> Function(Stream<String> stream) body) async {
-  await body(foo());
+Stream<String> Function() stream(Iterable<String> elements) {
+  return () => Stream.fromIterable(elements);
+}
+
+Future<void> runTest(
+  Future<void> Function(Stream<String> stream) body, {
+  Stream<String> Function() source = foo,
+  bool expectedToThrow = false,
+}) async {
+  try {
+    await body(source());
+  } catch (e, st) {
+    Expect.isTrue(expectedToThrow, 'Not expecting exception');
+    await harness.checkExpectedStack(st);
+  }
 }
 
 Future<void> main() async {
@@ -59,10 +73,56 @@
   await runTest((s) => s.first);
   await runTest((s) => s.last);
   await runTest((s) => s.single);
+  await runTest((s) => s.singleWhere((element) => true));
   await runTest((s) => s.firstWhere((element) => true));
   await runTest((s) => s.lastWhere((element) => true));
   await runTest((s) => s.elementAt(0));
 
+  // Check that error caused by empty stream also produces good stack traces.
+  await runTest(
+    (s) => s.reduce((a, b) => ''),
+    source: stream([]),
+    expectedToThrow: true,
+  );
+  await runTest((s) => s.first, source: stream([]), expectedToThrow: true);
+  await runTest((s) => s.last, source: stream([]), expectedToThrow: true);
+  await runTest((s) => s.single, source: stream([]), expectedToThrow: true);
+  await runTest(
+    (s) => s.single,
+    source: stream(['1', '2']),
+    expectedToThrow: true,
+  );
+  await runTest(
+    (s) => s.single,
+    source: stream(['1', '2']),
+    expectedToThrow: true,
+  );
+  await runTest(
+    (s) => s.singleWhere((_) => true),
+    source: stream([]),
+    expectedToThrow: true,
+  );
+  await runTest(
+    (s) => s.singleWhere((_) => true),
+    source: stream(['1', '2']),
+    expectedToThrow: true,
+  );
+  await runTest(
+    (s) => s.firstWhere((element) => true),
+    source: stream([]),
+    expectedToThrow: true,
+  );
+  await runTest(
+    (s) => s.lastWhere((element) => true),
+    source: stream([]),
+    expectedToThrow: true,
+  );
+  await runTest(
+    (s) => s.elementAt(0),
+    source: stream([]),
+    expectedToThrow: true,
+  );
+
   harness.updateExpectations();
 }
 
@@ -1226,6 +1286,93 @@
   """
 #0    foo (%test%)
 <asynchronous suspension>
+#1    Stream.singleWhere.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#2    runTest (%test%)
+<asynchronous suspension>
+#3    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    baz (%test%)
+#1    foo (%test%)
+<asynchronous suspension>
+#2    Stream.singleWhere.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#3    runTest (%test%)
+<asynchronous suspension>
+#4    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    baz (%test%)
+<asynchronous suspension>
+#1    foo (%test%)
+<asynchronous suspension>
+#2    Stream.singleWhere.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#3    runTest (%test%)
+<asynchronous suspension>
+#4    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    bar (%test%)
+<asynchronous suspension>
+#1    foo (%test%)
+<asynchronous suspension>
+#2    Stream.singleWhere.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#3    runTest (%test%)
+<asynchronous suspension>
+#4    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    baz (%test%)
+#1    bar (%test%)
+<asynchronous suspension>
+#2    foo (%test%)
+<asynchronous suspension>
+#3    Stream.singleWhere.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#4    runTest (%test%)
+<asynchronous suspension>
+#5    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    baz (%test%)
+<asynchronous suspension>
+#1    bar (%test%)
+<asynchronous suspension>
+#2    foo (%test%)
+<asynchronous suspension>
+#3    Stream.singleWhere.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#4    runTest (%test%)
+<asynchronous suspension>
+#5    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    baz (%test%)
+#1    foo (%test%)
+<asynchronous suspension>
+#2    Stream.singleWhere.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#3    runTest (%test%)
+<asynchronous suspension>
+#4    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    baz (%test%)
+<asynchronous suspension>
+#1    foo (%test%)
+<asynchronous suspension>
+#2    Stream.singleWhere.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#3    runTest (%test%)
+<asynchronous suspension>
+#4    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    foo (%test%)
+<asynchronous suspension>
 #1    Stream.firstWhere.<anonymous closure> (stream.dart)
 <asynchronous suspension>
 #2    runTest (%test%)
@@ -1442,5 +1589,82 @@
 <asynchronous suspension>
 #5    main (%test%)
 <asynchronous suspension>""",
+  """
+#0    Stream.reduce.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#1    runTest (%test%)
+<asynchronous suspension>
+#2    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    Stream.first.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#1    runTest (%test%)
+<asynchronous suspension>
+#2    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    Stream.last.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#1    runTest (%test%)
+<asynchronous suspension>
+#2    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    Stream.single.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#1    runTest (%test%)
+<asynchronous suspension>
+#2    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    Stream.single.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#1    runTest (%test%)
+<asynchronous suspension>
+#2    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    Stream.single.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#1    runTest (%test%)
+<asynchronous suspension>
+#2    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    Stream.singleWhere.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#1    runTest (%test%)
+<asynchronous suspension>
+#2    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    Stream.singleWhere.<anonymous closure>.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#1    runTest (%test%)
+<asynchronous suspension>
+#2    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    Stream.firstWhere.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#1    runTest (%test%)
+<asynchronous suspension>
+#2    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    Stream.lastWhere.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#1    runTest (%test%)
+<asynchronous suspension>
+#2    main (%test%)
+<asynchronous suspension>""",
+  """
+#0    Stream.elementAt.<anonymous closure> (stream.dart)
+<asynchronous suspension>
+#1    runTest (%test%)
+<asynchronous suspension>
+#2    main (%test%)
+<asynchronous suspension>""",
 ];
 // CURRENT EXPECTATIONS END
diff --git a/sdk/lib/async/stream.dart b/sdk/lib/async/stream.dart
index 488573c..f400c84 100644
--- a/sdk/lib/async/stream.dart
+++ b/sdk/lib/async/stream.dart
@@ -1064,6 +1064,7 @@
   /// print(result); // 28
   /// ```
   Future<T> reduce(T combine(T previous, T element)) {
+    @pragma('vm:awaiter-link')
     _Future<T> result = _Future<T>();
     bool seenFirst = false;
     late T value;
@@ -1072,7 +1073,7 @@
       onError: result._completeError,
       onDone: () {
         if (!seenFirst) {
-          var stack = StackTrace.empty;
+          var stack = StackTrace.current;
           var error = IterableElementError.noElement();
           _trySetStackTrace(error, stack);
           _completeWithErrorCallback(result, error, stack);
@@ -1643,12 +1644,13 @@
   /// Except for the type of the error, this method is equivalent to
   /// `this.elementAt(0)`.
   Future<T> get first {
+    @pragma('vm:awaiter-link')
     _Future<T> future = _Future<T>();
     StreamSubscription<T> subscription = this.listen(
       null,
       onError: future._completeError,
       onDone: () {
-        var stack = StackTrace.empty;
+        var stack = StackTrace.current;
         var error = IterableElementError.noElement();
         _trySetStackTrace(error, stack);
         _completeWithErrorCallback(future, error, stack);
@@ -1670,6 +1672,7 @@
   /// If this stream is empty (the done event is the first event),
   /// the returned future completes with an error.
   Future<T> get last {
+    @pragma('vm:awaiter-link')
     _Future<T> future = _Future<T>();
     late T result;
     bool foundResult = false;
@@ -1684,7 +1687,7 @@
           future._complete(result);
           return;
         }
-        var stack = StackTrace.empty;
+        var stack = StackTrace.current;
         var error = IterableElementError.noElement();
         _trySetStackTrace(error, stack);
         _completeWithErrorCallback(future, error, stack);
@@ -1703,6 +1706,7 @@
   /// If this [Stream] is empty or has more than one element,
   /// the returned future completes with an error.
   Future<T> get single {
+    @pragma('vm:awaiter-link')
     _Future<T> future = _Future<T>();
     late T result;
     bool foundResult = false;
@@ -1714,7 +1718,7 @@
           future._complete(result);
           return;
         }
-        var stack = StackTrace.empty;
+        var stack = StackTrace.current;
         var error = IterableElementError.noElement();
         _trySetStackTrace(error, stack);
         _completeWithErrorCallback(future, error, stack);
@@ -1724,7 +1728,7 @@
     subscription.onData((T value) {
       if (foundResult) {
         // This is the second element we get.
-        var stack = StackTrace.empty;
+        var stack = StackTrace.current;
         var error = IterableElementError.tooMany();
         _trySetStackTrace(error, stack);
         _cancelAndErrorWithReplacement(subscription, future, error, stack);
@@ -1773,6 +1777,7 @@
   /// print(result); // -1
   /// ```
   Future<T> firstWhere(bool test(T element), {T orElse()?}) {
+    @pragma('vm:awaiter-link')
     _Future<T> future = _Future();
     StreamSubscription<T> subscription = this.listen(
       null,
@@ -1782,7 +1787,7 @@
           _runUserCode(orElse, future._complete, future._completeError);
           return;
         }
-        var stack = StackTrace.empty;
+        var stack = StackTrace.current;
         var error = IterableElementError.noElement();
         _trySetStackTrace(error, stack);
         _completeWithErrorCallback(future, error, stack);
@@ -1826,6 +1831,7 @@
   /// print(result); // -1
   /// ```
   Future<T> lastWhere(bool test(T element), {T orElse()?}) {
+    @pragma('vm:awaiter-link')
     _Future<T> future = _Future();
     late T result;
     bool foundResult = false;
@@ -1841,7 +1847,7 @@
           _runUserCode(orElse, future._complete, future._completeError);
           return;
         }
-        var stack = StackTrace.empty;
+        var stack = StackTrace.current;
         var error = IterableElementError.noElement();
         _trySetStackTrace(error, stack);
         _completeWithErrorCallback(future, error, stack);
@@ -1893,6 +1899,7 @@
   /// // Throws.
   /// ```
   Future<T> singleWhere(bool test(T element), {T orElse()?}) {
+    @pragma('vm:awaiter-link')
     _Future<T> future = _Future<T>();
     late T result;
     bool foundResult = false;
@@ -1908,7 +1915,7 @@
           _runUserCode(orElse, future._complete, future._completeError);
           return;
         }
-        var stack = StackTrace.empty;
+        var stack = StackTrace.current;
         var error = IterableElementError.noElement();
         _trySetStackTrace(error, stack);
         _completeWithErrorCallback(future, error, stack);
@@ -1920,7 +1927,7 @@
       _runUserCode(() => test(value), (bool isMatch) {
         if (isMatch) {
           if (foundResult) {
-            var stack = StackTrace.empty;
+            var stack = StackTrace.current;
             var error = IterableElementError.tooMany();
             _trySetStackTrace(error, stack);
             _cancelAndErrorWithReplacement(subscription, future, error, stack);
@@ -1950,6 +1957,7 @@
   /// with a [RangeError].
   Future<T> elementAt(int index) {
     RangeError.checkNotNegative(index, "index");
+    @pragma('vm:awaiter-link')
     _Future<T> result = _Future<T>();
     int elementIndex = 0;
     StreamSubscription<T> subscription;
@@ -1964,7 +1972,7 @@
             indexable: this,
             name: "index",
           ),
-          StackTrace.empty,
+          StackTrace.current,
         );
       },
       cancelOnError: true,