Make ephemeral async cache clear sooner.

The `AsyncCache.ephemeral()` cache would invalidate
the `fetch` cache after a `Duration.zero` wait.
That allows a large number of microtasks to happen between
the `fetch` completing and the cache being invalidated.

Instead the cache is now invalidated immediately when the fetched
request completes.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index aabfb36..0086e7b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@
 * Update `StreamGroup` methods that return a `Future<dynamic>` today to return
   a `Future<void>` instead.
 
+* Make `AsyncCache.ephemeral` invalidate itself immediately when the returned
+  future completes, rather than wait for a later timer event.
+
 ## 2.8.2
 
 * Deprecate `EventSinkBase`, `StreamSinkBase`, `IOSinkBase`.
diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart
index 99b3881..be7434f 100644
--- a/lib/src/async_cache.dart
+++ b/lib/src/async_cache.dart
@@ -26,7 +26,10 @@
 /// [fake_async]: https://pub.dev/packages/fake_async
 class AsyncCache<T> {
   /// How long cached values stay fresh.
-  final Duration _duration;
+  ///
+  /// Set to `null` for ephemeral caches, which only stay alive until the
+  /// future completes.
+  final Duration? _duration;
 
   /// Cached results of a previous [fetchStream] call.
   StreamSplitter<T>? _cachedStreamSplitter;
@@ -42,14 +45,14 @@
   /// The [duration] starts counting after the Future returned by [fetch]
   /// completes, or after the Stream returned by [fetchStream] emits a done
   /// event.
-  AsyncCache(this._duration);
+  AsyncCache(Duration duration) : _duration = duration;
 
   /// Creates a cache that invalidates after an in-flight request is complete.
   ///
   /// An ephemeral cache guarantees that a callback function will only be
   /// executed at most once concurrently. This is useful for requests for which
   /// data is updated frequently but stale data is acceptable.
-  factory AsyncCache.ephemeral() => AsyncCache(Duration.zero);
+  AsyncCache.ephemeral() : _duration = null;
 
   /// Returns a cached value from a previous call to [fetch], or runs [callback]
   /// to compute a new one.
@@ -60,12 +63,8 @@
     if (_cachedStreamSplitter != null) {
       throw StateError('Previously used to cache via `fetchStream`');
     }
-    final result = _cachedValueFuture ??= callback();
-    try {
-      return await result;
-    } finally {
-      _startStaleTimer();
-    }
+    return _cachedValueFuture ??= callback()
+      ..whenComplete(_startStaleTimer).ignore();
   }
 
   /// Returns a cached stream from a previous call to [fetchStream], or runs
@@ -74,6 +73,13 @@
   /// If [fetchStream] has been called recently enough, returns a copy of its
   /// previous return value. Otherwise, runs [callback] and returns its new
   /// return value.
+  ///
+  /// Each call to this function returns a stream which replays the same events,
+  /// which means that all stream events are cached until this cache is
+  /// invalidated.
+  ///
+  /// Only starts counting time after the stream has been listened to,
+  /// and it has completed with a `done` event.
   Stream<T> fetchStream(Stream<T> Function() callback) {
     if (_cachedValueFuture != null) {
       throw StateError('Previously used to cache via `fetch`');
@@ -98,6 +104,11 @@
   }
 
   void _startStaleTimer() {
-    _stale = Timer(_duration, invalidate);
+    var duration = _duration;
+    if (duration != null) {
+      _stale = Timer(duration, invalidate);
+    } else {
+      invalidate();
+    }
   }
 }
diff --git a/test/async_cache_test.dart b/test/async_cache_test.dart
index 3ceda79..e8ff132 100644
--- a/test/async_cache_test.dart
+++ b/test/async_cache_test.dart
@@ -26,15 +26,43 @@
         'Expensive');
   });
 
-  test('should not fetch via callback when a future is in-flight', () async {
-    // No actual caching is done, just avoid duplicate requests.
-    cache = AsyncCache.ephemeral();
+  group('ephemeral cache', () {
+    test('should not fetch via callback when a future is in-flight', () async {
+      // No actual caching is done, just avoid duplicate requests.
+      cache = AsyncCache.ephemeral();
 
-    var completer = Completer<String>();
-    expect(cache.fetch(() => completer.future), completion('Expensive'));
-    expect(cache.fetch(expectAsync0(() async => 'fake', count: 0)),
-        completion('Expensive'));
-    completer.complete('Expensive');
+      var completer = Completer<String>();
+      expect(cache.fetch(() => completer.future), completion('Expensive'));
+      expect(cache.fetch(expectAsync0(() async => 'fake', count: 0)),
+          completion('Expensive'));
+      completer.complete('Expensive');
+    });
+
+    test('should fetch via callback when the in-flight future completes',
+        () async {
+      // No actual caching is done, just avoid duplicate requests.
+      cache = AsyncCache.ephemeral();
+
+      var fetched = cache.fetch(() async => "first");
+      expect(fetched, completion('first'));
+      expect(
+          cache.fetch(expectAsync0(() async => fail('not called'), count: 0)),
+          completion('first'));
+      await fetched;
+      expect(cache.fetch(() async => 'second'), completion('second'));
+    });
+
+    test('should invalidate even if the future throws an exception', () async {
+      cache = AsyncCache.ephemeral();
+
+      Future<String> throwingCall() async => throw Exception();
+      await expectLater(cache.fetch(throwingCall), throwsA(isException));
+      // To let the timer invalidate the cache
+      await Future.delayed(Duration(milliseconds: 5));
+
+      Future<String> call() async => 'Completed';
+      expect(await cache.fetch(call), 'Completed', reason: 'Cache invalidates');
+    });
   });
 
   test('should fetch via a callback again when cache expires', () {
@@ -158,16 +186,4 @@
     }));
     expect(cache.fetchStream(call).toList(), completion(['1', '2', '3']));
   });
-
-  test('should invalidate even if the future throws an exception', () async {
-    cache = AsyncCache.ephemeral();
-
-    Future<String> throwingCall() async => throw Exception();
-    await expectLater(cache.fetch(throwingCall), throwsA(isException));
-    // To let the timer invalidate the cache
-    await Future.delayed(Duration(milliseconds: 5));
-
-    Future<String> call() async => 'Completed';
-    expect(await cache.fetch(call), 'Completed', reason: 'Cache invalidates');
-  });
 }