Handle async conditions in mayEmit (#2647)

Fix a bug where the allowed `AsyncCondition` is used with `softCheck`
instead of `softCheckAsync`. Make it feasible to catch the exception for
using an asynchronous condition in `describe`, and use that exception to
detect when to fallback on a more general clause description for
`mayEmit` and `mayEmitMultiple`.

Note that callers of `nestAsync` and `expectAsync` should synchronously
surface a potential synchronous exception and update all existing
callers to do so.
diff --git a/pkgs/checks/CHANGELOG.md b/pkgs/checks/CHANGELOG.md
index 27d8623..b53ccc4 100644
--- a/pkgs/checks/CHANGELOG.md
+++ b/pkgs/checks/CHANGELOG.md
@@ -3,6 +3,9 @@
 - Require Dart 3.7
 - Improve speed of pretty printing for large collections.
 - Improve formatting for failures involving unexpected exceptions.
+- Fix a bug when using asynchronous conditions with `mayEmit` or
+  `mayEmitMultiple`. Note that extensions using `nestAsync` should synchronously
+  forward exceptions from that call.
 
 ## 0.3.1
 
diff --git a/pkgs/checks/lib/context.dart b/pkgs/checks/lib/context.dart
index cebba71..428cc01 100644
--- a/pkgs/checks/lib/context.dart
+++ b/pkgs/checks/lib/context.dart
@@ -5,6 +5,7 @@
 export 'src/checks.dart'
     show
         AsyncCondition,
+        AsyncConditionDisallowed,
         CheckFailure,
         Condition,
         Context,
diff --git a/pkgs/checks/lib/src/checks.dart b/pkgs/checks/lib/src/checks.dart
index 3a6818e..c9cad66 100644
--- a/pkgs/checks/lib/src/checks.dart
+++ b/pkgs/checks/lib/src/checks.dart
@@ -436,10 +436,14 @@
   ///
   /// {@macro async_limitations}
   ///
+  /// Extensions which use `expectAsync` should always make that call
+  /// synchronously and return the result so that exceptions stemming from use
+  /// in a synchronous context can be reported synchronously.
+  ///
   /// ```dart
   /// extension CustomChecks on Subject<CustomType> {
-  ///   Future<void> someAsyncExpectation() async {
-  ///     await context.expectAsync(() => ['meets this async expectation'],
+  ///   Future<void> someAsyncExpectation()  {
+  ///     return context.expectAsync(() => ['meets this async expectation'],
   ///         (actual) async {
   ///       if (await _expectationIsMet(actual)) return null;
   ///       return Rejection(which: ['does not meet this async expectation']);
@@ -552,10 +556,14 @@
   ///
   /// {@macro async_limitations}
   ///
+  /// Extensions which use `nestAsync` should always make that call
+  /// synchronously and return the result so that exceptions stemming from use
+  /// in a synchronous context can be reported synchronously.
+  ///
   /// ```dart
   /// Future<void> someAsyncResult(
-  ///     [AsyncCondition<Result> resultCondition]) async {
-  ///   await context.nestAsync(() => ['has someAsyncResult'], (actual) async {
+  ///     [AsyncCondition<Result> resultCondition]) {
+  ///   return context.nestAsync(() => ['has someAsyncResult'], (actual) async {
   ///     if (await _asyncOperationFailed(actual)) {
   ///       return Extracted.rejection(which: ['cannot read someAsyncResult']);
   ///     }
@@ -740,12 +748,15 @@
   Future<void> expectAsync(
     Iterable<String> Function() clause,
     FutureOr<Rejection?> Function(T) predicate,
+  ) {
+    if (!_allowAsync) throw AsyncConditionDisallowed._('Async');
+    return _expectAsync(clause, predicate);
+  }
+
+  Future<void> _expectAsync(
+    Iterable<String> Function() clause,
+    FutureOr<Rejection?> Function(T) predicate,
   ) async {
-    if (!_allowAsync) {
-      throw StateError(
-        'Async expectations cannot be used on a synchronous subject',
-      );
-    }
     _clauses.add(_ExpectationClause(clause));
     final outstandingWork = TestHandle.current.markPending();
     try {
@@ -764,9 +775,7 @@
     Iterable<String> Function() clause,
     void Function(T actual, void Function(Rejection) reject) predicate,
   ) {
-    if (!_allowUnawaited) {
-      throw StateError('Late expectations cannot be used for soft checks');
-    }
+    if (!_allowUnawaited) throw AsyncConditionDisallowed._('Unawaited');
     _clauses.add(_ExpectationClause(clause));
     _value.apply((actual) {
       predicate(actual, (r) => _fail(_failure(r._fillActual(actual))));
@@ -801,14 +810,18 @@
   @override
   Future<void> nestAsync<R>(
     Iterable<String> Function() label,
+    FutureOr<Extracted<R>> Function(T) extract, [
+    AsyncCondition<R>? nestedCondition,
+  ]) {
+    if (!_allowAsync) throw AsyncConditionDisallowed._('Async');
+    return _nestAsync(label, extract, nestedCondition);
+  }
+
+  Future<void> _nestAsync<R>(
+    Iterable<String> Function() label,
     FutureOr<Extracted<R>> Function(T) extract,
     AsyncCondition<R>? nestedCondition,
   ) async {
-    if (!_allowAsync) {
-      throw StateError(
-        'Async expectations cannot be used on a synchronous subject',
-      );
-    }
     final outstandingWork = TestHandle.current.markPending();
     try {
       final result = await _value.mapAsync(
@@ -1054,3 +1067,10 @@
 
   Rejection({this.actual = const [], this.which});
 }
+
+class AsyncConditionDisallowed implements Exception {
+  final String flavor;
+  AsyncConditionDisallowed._(this.flavor);
+  @override
+  String toString() => '$flavor expectations cannot be used on this subject';
+}
diff --git a/pkgs/checks/lib/src/extensions/async.dart b/pkgs/checks/lib/src/extensions/async.dart
index 2ff8ace..cee3d40 100644
--- a/pkgs/checks/lib/src/extensions/async.dart
+++ b/pkgs/checks/lib/src/extensions/async.dart
@@ -18,8 +18,8 @@
   ///
   /// The returned future will complete when the subject future has completed,
   /// and [completionCondition] has optionally been checked.
-  Future<void> completes([AsyncCondition<T>? completionCondition]) async {
-    await context.nestAsync<T>(() => ['completes to a value'], (actual) async {
+  Future<void> completes([AsyncCondition<T>? completionCondition]) {
+    return context.nestAsync<T>(() => ['completes to a value'], (actual) async {
       try {
         return Extracted.value(await actual);
       } catch (e, st) {
@@ -79,10 +79,8 @@
   ///
   /// The returned future will complete when the subject future has completed,
   /// and [errorCondition] has optionally been checked.
-  Future<void> throws<E extends Object>([
-    AsyncCondition<E>? errorCondition,
-  ]) async {
-    await context.nestAsync<E>(
+  Future<void> throws<E extends Object>([AsyncCondition<E>? errorCondition]) {
+    return context.nestAsync<E>(
       () => ['completes to an error${E == Object ? '' : ' of type $E'}'],
       (actual) async {
         try {
@@ -144,8 +142,8 @@
   ///
   /// The returned future will complete when the stream has emitted, errored, or
   /// ended, and the [emittedCondition] has optionally been checked.
-  Future<void> emits([AsyncCondition<T>? emittedCondition]) async {
-    await context.nestAsync<T>(() => ['emits a value'], (actual) async {
+  Future<void> emits([AsyncCondition<T>? emittedCondition]) {
+    return context.nestAsync<T>(() => ['emits a value'], (actual) async {
       if (!await actual.hasNext) {
         return Extracted.rejection(
           actual: ['a stream'],
@@ -184,8 +182,8 @@
   /// ended, and the [errorCondition] has optionally been checked.
   Future<void> emitsError<E extends Object>([
     AsyncCondition<E>? errorCondition,
-  ]) async {
-    await context.nestAsync<E>(
+  ]) {
+    return context.nestAsync<E>(
       () => ['emits an error${E == Object ? '' : ' of type $E'}'],
       (actual) async {
         if (!await actual.hasNext) {
@@ -327,15 +325,13 @@
   /// If this expectation succeeds, consumes the same events from the source
   /// queue as the satisfied condition. If multiple conditions are satisfied,
   /// chooses the condition which consumed the most events.
-  Future<void> anyOf(
-    Iterable<AsyncCondition<StreamQueue<T>>> conditions,
-  ) async {
+  Future<void> anyOf(Iterable<AsyncCondition<StreamQueue<T>>> conditions) {
     conditions = conditions.toList();
     if (conditions.isEmpty) {
       throw ArgumentError('conditions may not be empty');
     }
     final descriptions = <Iterable<String>>[];
-    await context.expectAsync(
+    return context.expectAsync(
       () => descriptions.isEmpty
           ? ['satisfies any of ${conditions.length} conditions']
           : [
@@ -444,14 +440,20 @@
   ///
   /// If a non-matching event is emitted, no events are consumed.
   /// If a matching event is emitted, that event is consumed.
-  Future<void> mayEmit(AsyncCondition<T> condition) async {
-    await context.expectAsync(
-      () => ['may emit a value that:', ...describe(condition)],
+  Future<void> mayEmit(AsyncCondition<T> condition) {
+    return context.expectAsync(
+      () {
+        try {
+          return ['may emit a value that:', ...describe(condition)];
+        } on AsyncConditionDisallowed {
+          return ['may emit a value satisfying an asynchronous condition'];
+        }
+      },
       (actual) async {
         if (!await actual.hasNext) return null;
         try {
           final value = await actual.peek;
-          if (softCheck(value, condition) == null) {
+          if (await softCheckAsync(value, condition) == null) {
             await actual.next;
           }
         } catch (_) {
@@ -470,14 +472,20 @@
   /// - A non-matching event is emitted.
   /// - An error is emitted.
   /// - The stream closes.
-  Future<void> mayEmitMultiple(AsyncCondition<T> condition) async {
-    await context.expectAsync(
-      () => ['may emit a value that:', ...describe(condition)],
+  Future<void> mayEmitMultiple(AsyncCondition<T> condition) {
+    return context.expectAsync(
+      () {
+        try {
+          return ['may emit a value that:', ...describe(condition)];
+        } on AsyncConditionDisallowed {
+          return ['may emit a value satisfying an asynchronous condition'];
+        }
+      },
       (actual) async {
         while (await actual.hasNext) {
           try {
             final value = await actual.peek;
-            if (softCheck(value, condition) == null) {
+            if (await softCheckAsync(value, condition) == null) {
               await actual.next;
             } else {
               return null;
diff --git a/pkgs/checks/test/extensions/async_test.dart b/pkgs/checks/test/extensions/async_test.dart
index 75647be..ea05920 100644
--- a/pkgs/checks/test/extensions/async_test.dart
+++ b/pkgs/checks/test/extensions/async_test.dart
@@ -417,6 +417,14 @@
         );
         await check(queue).emits((it) => it.equals(1));
       });
+      test('consumes a matching async event', () async {
+        final queue = StreamQueue(Stream.value(Future.value(1)));
+        await softCheckAsync<StreamQueue<Future<int>>>(
+          queue,
+          (it) => it.mayEmit((it) => it.completes((it) => it.equals(1))),
+        );
+        await check(queue).isDone();
+      });
       test('does not consume a non-matching event', () async {
         final queue = _countingStream(2);
         await softCheckAsync<StreamQueue<int>>(
@@ -435,6 +443,16 @@
           (it) => it.has((e) => e.message, 'message').equals('Error at 1'),
         );
       });
+      test('can be described when condition is async', () async {
+        await check(
+          (Subject<StreamQueue<Future<int>>> it) =>
+              it.mayEmit((it) => it.completes()),
+        ).hasAsyncDescriptionWhich(
+          (it) => it.deepEquals([
+            '  may emit a value satisfying an asynchronous condition',
+          ]),
+        );
+      });
     });
 
     group('mayEmitMultiple', () {
@@ -475,6 +493,25 @@
           (it) => it.has((e) => e.message, 'message').equals('Error at 1'),
         );
       });
+      test('consumes a matching async event', () async {
+        final queue = StreamQueue(Stream.value(Future.value(1)));
+        await softCheckAsync<StreamQueue<Future<int>>>(
+          queue,
+          (it) =>
+              it.mayEmitMultiple((it) => it.completes((it) => it.equals(1))),
+        );
+        await check(queue).isDone();
+      });
+      test('can be described when condition is async', () async {
+        await check(
+          (Subject<StreamQueue<Future<int>>> it) =>
+              it.mayEmitMultiple((it) => it.completes()),
+        ).hasAsyncDescriptionWhich(
+          (it) => it.deepEquals([
+            '  may emit a value satisfying an asynchronous condition',
+          ]),
+        );
+      });
     });
 
     group('isDone', () {
diff --git a/pkgs/checks/test/test_shared.dart b/pkgs/checks/test/test_shared.dart
index d86ad95..6212cab 100644
--- a/pkgs/checks/test/test_shared.dart
+++ b/pkgs/checks/test/test_shared.dart
@@ -53,10 +53,10 @@
     Condition<T> condition, {
     Iterable<String>? actual,
     Iterable<String>? which,
-  }) async {
+  }) {
     late T actualValue;
     var didRunCallback = false;
-    await context.nestAsync<Rejection>(
+    return context.nestAsync<Rejection>(
       () => ['does not meet an async condition with a Rejection'],
       (value) async {
         actualValue = value;