Merge branch 'master' into deprecte-async-cache-stream
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 18043c2..1315bdc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,9 @@
* Deprecated `AsyncCache.fetchStream`.
+* Make `AsyncCache.ephemeral` invalidate itself immediately when the returned
+ future completes, rather than wait for a later timer event.
+
## 2.8.2
* Deprecate `EventSinkBase`, `StreamSinkBase`, `IOSinkBase`.
diff --git a/lib/src/async_cache.dart b/lib/src/async_cache.dart
index 4f87ca2..b055182 100644
--- a/lib/src/async_cache.dart
+++ b/lib/src/async_cache.dart
@@ -26,7 +26,10 @@
/// [fake_async]: https://pub.dev/packages/fake_async
class AsyncCache<T> {
/// How long cached values stay fresh.
- final Duration _duration;
+ ///
+ /// Set to `null` for ephemeral caches, which only stay alive until the
+ /// future completes.
+ final Duration? _duration;
/// Cached results of a previous `fetchStream` call.
StreamSplitter<T>? _cachedStreamSplitter;
@@ -42,14 +45,14 @@
/// The [duration] starts counting after the Future returned by [fetch]
/// completes, or after the Stream returned by `fetchStream` emits a done
/// event.
- AsyncCache(this._duration);
+ AsyncCache(Duration duration) : _duration = duration;
/// Creates a cache that invalidates after an in-flight request is complete.
///
/// An ephemeral cache guarantees that a callback function will only be
/// executed at most once concurrently. This is useful for requests for which
/// data is updated frequently but stale data is acceptable.
- factory AsyncCache.ephemeral() => AsyncCache(Duration.zero);
+ AsyncCache.ephemeral() : _duration = null;
/// Returns a cached value from a previous call to [fetch], or runs [callback]
/// to compute a new one.
@@ -60,12 +63,8 @@
if (_cachedStreamSplitter != null) {
throw StateError('Previously used to cache via `fetchStream`');
}
- final result = _cachedValueFuture ??= callback();
- try {
- return await result;
- } finally {
- _startStaleTimer();
- }
+ return _cachedValueFuture ??= callback()
+ ..whenComplete(_startStaleTimer).ignore();
}
/// Returns a cached stream from a previous call to [fetchStream], or runs
@@ -74,6 +73,13 @@
/// If [fetchStream] has been called recently enough, returns a copy of its
/// previous return value. Otherwise, runs [callback] and returns its new
/// return value.
+ ///
+ /// Each call to this function returns a stream which replays the same events,
+ /// which means that all stream events are cached until this cache is
+ /// invalidated.
+ ///
+ /// Only starts counting time after the stream has been listened to,
+ /// and it has completed with a `done` event.
@Deprecated("Feature will be removed")
Stream<T> fetchStream(Stream<T> Function() callback) {
if (_cachedValueFuture != null) {
@@ -99,6 +105,11 @@
}
void _startStaleTimer() {
- _stale = Timer(_duration, invalidate);
+ var duration = _duration;
+ if (duration != null) {
+ _stale = Timer(duration, invalidate);
+ } else {
+ invalidate();
+ }
}
}
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 394ba07..9e3a735 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -153,16 +153,50 @@
final completer =
CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);
- _completer._inner?.future
- .then(onValue, onError: onError)
- .then(completer.complete, onError: completer.completeError);
- _completer._cancelCompleter?.future.then((_) {
- if (onCancel != null) {
- completer.complete(Future.sync(onCancel));
- } else {
- completer._cancel();
+ // if `_completer._inner` completes before `completer` is cancelled
+ // call `onValue` or `onError` with the result, and complete `completer`
+ // with the result of that call (unless cancelled in the meantime).
+ //
+ // If `_completer._cancelCompleter` completes (always with a value)
+ // before `completer` is cancelled, then call `onCancel` (if supplied)
+ // with that that value and complete `completer` with the result of that
+ // call (unless cancelled in the meantime).
+ //
+ // If any of the callbacks throw synchronously, the `completer` is
+ // completed with that error.
+ //
+ // If no `onCancel` is provided, and `_completer._cancelCompleter`
+ // completes before `completer` is cancelled,
+ // then cancel `cancelCompleter`. (Cancelling twice is safe.)
+
+ _completer._inner?.future.then<void>((value) {
+ if (completer.isCanceled) return;
+ try {
+ completer.complete(onValue(value));
+ } catch (error, stack) {
+ completer.completeError(error, stack);
}
- });
+ },
+ onError: onError == null
+ ? completer.completeError // Is ignored if already cancelled.
+ : (Object error, StackTrace stack) {
+ if (completer.isCanceled) return;
+ try {
+ completer.complete(onError(error, stack));
+ } catch (error2, stack2) {
+ completer.completeError(error2, stack2);
+ }
+ });
+ _completer._cancelCompleter?.future.whenComplete(onCancel == null
+ ? completer._cancel
+ : () {
+ if (completer.isCanceled) return;
+ try {
+ completer.complete(onCancel());
+ } catch (error, stack) {
+ completer.completeError(error, stack);
+ }
+ });
return completer.operation;
}
diff --git a/test/async_cache_test.dart b/test/async_cache_test.dart
index 4af6195..4948e53 100644
--- a/test/async_cache_test.dart
+++ b/test/async_cache_test.dart
@@ -28,15 +28,43 @@
'Expensive');
});
- test('should not fetch via callback when a future is in-flight', () async {
- // No actual caching is done, just avoid duplicate requests.
- cache = AsyncCache.ephemeral();
+ group('ephemeral cache', () {
+ test('should not fetch via callback when a future is in-flight', () async {
+ // No actual caching is done, just avoid duplicate requests.
+ cache = AsyncCache.ephemeral();
- var completer = Completer<String>();
- expect(cache.fetch(() => completer.future), completion('Expensive'));
- expect(cache.fetch(expectAsync0(() async => 'fake', count: 0)),
- completion('Expensive'));
- completer.complete('Expensive');
+ var completer = Completer<String>();
+ expect(cache.fetch(() => completer.future), completion('Expensive'));
+ expect(cache.fetch(expectAsync0(() async => 'fake', count: 0)),
+ completion('Expensive'));
+ completer.complete('Expensive');
+ });
+
+ test('should fetch via callback when the in-flight future completes',
+ () async {
+ // No actual caching is done, just avoid duplicate requests.
+ cache = AsyncCache.ephemeral();
+
+ var fetched = cache.fetch(() async => "first");
+ expect(fetched, completion('first'));
+ expect(
+ cache.fetch(expectAsync0(() async => fail('not called'), count: 0)),
+ completion('first'));
+ await fetched;
+ expect(cache.fetch(() async => 'second'), completion('second'));
+ });
+
+ test('should invalidate even if the future throws an exception', () async {
+ cache = AsyncCache.ephemeral();
+
+ Future<String> throwingCall() async => throw Exception();
+ await expectLater(cache.fetch(throwingCall), throwsA(isException));
+ // To let the timer invalidate the cache
+ await Future.delayed(Duration(milliseconds: 5));
+
+ Future<String> call() async => 'Completed';
+ expect(await cache.fetch(call), 'Completed', reason: 'Cache invalidates');
+ });
});
test('should fetch via a callback again when cache expires', () {
@@ -160,16 +188,4 @@
}));
expect(cache.fetchStream(call).toList(), completion(['1', '2', '3']));
});
-
- test('should invalidate even if the future throws an exception', () async {
- cache = AsyncCache.ephemeral();
-
- Future<String> throwingCall() async => throw Exception();
- await expectLater(cache.fetch(throwingCall), throwsA(isException));
- // To let the timer invalidate the cache
- await Future.delayed(Duration(milliseconds: 5));
-
- Future<String> call() async => 'Completed';
- expect(await cache.fetch(call), 'Completed', reason: 'Cache invalidates');
- });
}
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index cf2fd8b..5e56b61 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -495,6 +495,45 @@
expect(originalCompleter.isCanceled, false);
});
+
+ test('onValue callback not called after cancel', () async {
+ var called = false;
+ onValue = expectAsync1((_) {
+ called = true;
+ fail("onValue unreachable");
+ }, count: 0);
+
+ await runThen().cancel();
+ originalCompleter.complete(0);
+ await flushMicrotasks();
+ expect(called, false);
+ });
+
+ test('onError callback not called after cancel', () async {
+ var called = false;
+ onError = expectAsync2((_, __) {
+ called = true;
+ fail("onError unreachable");
+ }, count: 0);
+
+ await runThen().cancel();
+ originalCompleter.completeError("Error", StackTrace.empty);
+ await flushMicrotasks();
+ expect(called, false);
+ });
+
+ test('onCancel callback not called after cancel', () async {
+ var called = false;
+ onCancel = expectAsync0(() {
+ called = true;
+ fail("onCancel unreachable");
+ }, count: 0);
+
+ await runThen().cancel();
+ await originalCompleter.operation.cancel();
+ await flushMicrotasks();
+ expect(called, false);
+ });
});
});