Merge branch 'master' into deprecte-async-cache-stream
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 18043c2..1315bdc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,9 @@
 
 * Deprecated `AsyncCache.fetchStream`.
 
+* Make `AsyncCache.ephemeral` invalidate itself immediately when the returned
+  future completes, rather than wait for a later timer event.
+
 ## 2.8.2
 
 * Deprecate `EventSinkBase`, `StreamSinkBase`, `IOSinkBase`.
diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart
index 4f87ca2..b055182 100644
--- a/lib/src/async_cache.dart
+++ b/lib/src/async_cache.dart
@@ -26,7 +26,10 @@
 /// [fake_async]: https://pub.dev/packages/fake_async
 class AsyncCache<T> {
   /// How long cached values stay fresh.
-  final Duration _duration;
+  ///
+  /// Set to `null` for ephemeral caches, which only stay alive until the
+  /// future completes.
+  final Duration? _duration;
 
   /// Cached results of a previous `fetchStream` call.
   StreamSplitter<T>? _cachedStreamSplitter;
@@ -42,14 +45,14 @@
   /// The [duration] starts counting after the Future returned by [fetch]
   /// completes, or after the Stream returned by `fetchStream` emits a done
   /// event.
-  AsyncCache(this._duration);
+  AsyncCache(Duration duration) : _duration = duration;
 
   /// Creates a cache that invalidates after an in-flight request is complete.
   ///
   /// An ephemeral cache guarantees that a callback function will only be
   /// executed at most once concurrently. This is useful for requests for which
   /// data is updated frequently but stale data is acceptable.
-  factory AsyncCache.ephemeral() => AsyncCache(Duration.zero);
+  AsyncCache.ephemeral() : _duration = null;
 
   /// Returns a cached value from a previous call to [fetch], or runs [callback]
   /// to compute a new one.
@@ -60,12 +63,8 @@
     if (_cachedStreamSplitter != null) {
       throw StateError('Previously used to cache via `fetchStream`');
     }
-    final result = _cachedValueFuture ??= callback();
-    try {
-      return await result;
-    } finally {
-      _startStaleTimer();
-    }
+    return _cachedValueFuture ??= callback()
+      ..whenComplete(_startStaleTimer).ignore();
   }
 
   /// Returns a cached stream from a previous call to [fetchStream], or runs
@@ -74,6 +73,13 @@
   /// If [fetchStream] has been called recently enough, returns a copy of its
   /// previous return value. Otherwise, runs [callback] and returns its new
   /// return value.
+  ///
+  /// Each call to this function returns a stream which replays the same events,
+  /// which means that all stream events are cached until this cache is
+  /// invalidated.
+  ///
+  /// Only starts counting time after the stream has been listened to,
+  /// and it has completed with a `done` event.
   @Deprecated("Feature will be removed")
   Stream<T> fetchStream(Stream<T> Function() callback) {
     if (_cachedValueFuture != null) {
@@ -99,6 +105,11 @@
   }
 
   void _startStaleTimer() {
-    _stale = Timer(_duration, invalidate);
+    var duration = _duration;
+    if (duration != null) {
+      _stale = Timer(duration, invalidate);
+    } else {
+      invalidate();
+    }
   }
 }
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 394ba07..9e3a735 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -153,16 +153,50 @@
     final completer =
         CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);
 
-    _completer._inner?.future
-        .then(onValue, onError: onError)
-        .then(completer.complete, onError: completer.completeError);
-    _completer._cancelCompleter?.future.then((_) {
-      if (onCancel != null) {
-        completer.complete(Future.sync(onCancel));
-      } else {
-        completer._cancel();
+    // If `_completer._inner` completes before `completer` is cancelled,
+    // call `onValue` or `onError` with the result, and complete `completer`
+    // with the result of that call (unless cancelled in the meantime).
+    //
+    // If `_completer._cancelCompleter` completes (always with a value)
+    // before `completer` is cancelled, then call `onCancel` (if supplied)
+    // and complete `completer` with the result of that call (unless cancelled
+    // in the meantime).
+    //
+    // If any of the callbacks throw synchronously, the `completer` is
+    // completed with that error.
+    //
+    // If no `onCancel` is provided, and `_completer._cancelCompleter`
+    // completes before `completer` is cancelled,
+    // then cancel `completer`. (Cancelling twice is safe.)
+
+    _completer._inner?.future.then<void>((value) {
+      if (completer.isCanceled) return;
+      try {
+        completer.complete(onValue(value));
+      } catch (error, stack) {
+        completer.completeError(error, stack);
       }
-    });
+    },
+        onError: onError == null
+            ? completer.completeError // Is ignored if already cancelled.
+            : (Object error, StackTrace stack) {
+                if (completer.isCanceled) return;
+                try {
+                  completer.complete(onError(error, stack));
+                } catch (error2, stack2) {
+                  completer.completeError(error2, stack2);
+                }
+              });
+    _completer._cancelCompleter?.future.whenComplete(onCancel == null
+        ? completer._cancel
+        : () {
+            if (completer.isCanceled) return;
+            try {
+              completer.complete(onCancel());
+            } catch (error, stack) {
+              completer.completeError(error, stack);
+            }
+          });
 
     return completer.operation;
   }
diff --git a/test/async_cache_test.dart b/test/async_cache_test.dart
index 4af6195..4948e53 100644
--- a/test/async_cache_test.dart
+++ b/test/async_cache_test.dart
@@ -28,15 +28,43 @@
         'Expensive');
   });
 
-  test('should not fetch via callback when a future is in-flight', () async {
-    // No actual caching is done, just avoid duplicate requests.
-    cache = AsyncCache.ephemeral();
+  group('ephemeral cache', () {
+    test('should not fetch via callback when a future is in-flight', () async {
+      // No actual caching is done, just avoid duplicate requests.
+      cache = AsyncCache.ephemeral();
 
-    var completer = Completer<String>();
-    expect(cache.fetch(() => completer.future), completion('Expensive'));
-    expect(cache.fetch(expectAsync0(() async => 'fake', count: 0)),
-        completion('Expensive'));
-    completer.complete('Expensive');
+      var completer = Completer<String>();
+      expect(cache.fetch(() => completer.future), completion('Expensive'));
+      expect(cache.fetch(expectAsync0(() async => 'fake', count: 0)),
+          completion('Expensive'));
+      completer.complete('Expensive');
+    });
+
+    test('should fetch via callback when the in-flight future completes',
+        () async {
+      // No actual caching is done, just avoid duplicate requests.
+      cache = AsyncCache.ephemeral();
+
+      var fetched = cache.fetch(() async => "first");
+      expect(fetched, completion('first'));
+      expect(
+          cache.fetch(expectAsync0(() async => fail('not called'), count: 0)),
+          completion('first'));
+      await fetched;
+      expect(cache.fetch(() async => 'second'), completion('second'));
+    });
+
+    test('should invalidate even if the future throws an exception', () async {
+      cache = AsyncCache.ephemeral();
+
+      Future<String> throwingCall() async => throw Exception();
+      await expectLater(cache.fetch(throwingCall), throwsA(isException));
+      // Give the completed future's callbacks time to invalidate the cache.
+      await Future.delayed(Duration(milliseconds: 5));
+
+      Future<String> call() async => 'Completed';
+      expect(await cache.fetch(call), 'Completed', reason: 'Cache invalidates');
+    });
   });
 
   test('should fetch via a callback again when cache expires', () {
@@ -160,16 +188,4 @@
     }));
     expect(cache.fetchStream(call).toList(), completion(['1', '2', '3']));
   });
-
-  test('should invalidate even if the future throws an exception', () async {
-    cache = AsyncCache.ephemeral();
-
-    Future<String> throwingCall() async => throw Exception();
-    await expectLater(cache.fetch(throwingCall), throwsA(isException));
-    // To let the timer invalidate the cache
-    await Future.delayed(Duration(milliseconds: 5));
-
-    Future<String> call() async => 'Completed';
-    expect(await cache.fetch(call), 'Completed', reason: 'Cache invalidates');
-  });
 }
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index cf2fd8b..5e56b61 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -495,6 +495,45 @@
 
         expect(originalCompleter.isCanceled, false);
       });
+
+      test('onValue callback not called after cancel', () async {
+        var called = false;
+        onValue = expectAsync1((_) {
+          called = true;
+          fail("onValue unreachable");
+        }, count: 0);
+
+        await runThen().cancel();
+        originalCompleter.complete(0);
+        await flushMicrotasks();
+        expect(called, false);
+      });
+
+      test('onError callback not called after cancel', () async {
+        var called = false;
+        onError = expectAsync2((_, __) {
+          called = true;
+          fail("onError unreachable");
+        }, count: 0);
+
+        await runThen().cancel();
+        originalCompleter.completeError("Error", StackTrace.empty);
+        await flushMicrotasks();
+        expect(called, false);
+      });
+
+      test('onCancel callback not called after cancel', () async {
+        var called = false;
+        onCancel = expectAsync0(() {
+          called = true;
+          fail("onCancel unreachable");
+        }, count: 0);
+
+        await runThen().cancel();
+        await originalCompleter.operation.cancel();
+        await flushMicrotasks();
+        expect(called, false);
+      });
     });
   });