Add CancelableOperation.thenOperation (#211)
Towards #210
Once combined with `Completer.completeOperation` this signature adds
significant flexibility for chaining Cancelable work following other
async work.
Move the existing `.then` implementation to `.thenOperation` since the latter
is more general.
Co-authored-by: Lasse R.H. Nielsen <lrn@google.com>
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b5b5c9e..38f8465 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 2.10.0-dev
+
+* Add `CancelableOperation.thenOperation` which gives more flexibility to
+ complete the resulting operation.
+
## 2.9.0
* **Potentially Breaking** The default `propagateCancel` argument to
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 6ee350e..ee9ff9d 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -133,7 +133,7 @@
/// Creates a new cancelable operation to be completed when this operation
/// completes normally or as an error, or is cancelled.
///
- /// If this operation completes normally the [value] is passed to [onValue]
+ /// If this operation completes normally the value is passed to [onValue]
/// and the returned operation is completed with the result.
///
/// If this operation completes as an error, and no [onError] callback is
@@ -147,6 +147,11 @@
/// If this operation is canceled, and an [onCancel] callback is provided,
/// the returned operation is completed with the result.
///
+ /// At most one of [onValue], [onError], or [onCancel] will be called.
+ /// If any of [onValue], [onError], or [onCancel] throw a synchronous error,
+ /// or return a `Future` that completes as an error, the error will be
+ /// forwarded through the returned operation.
+ ///
/// If the returned operation is canceled before this operation completes or
/// is canceled, the [onValue], [onError], and [onCancel] callbacks will not
/// be invoked. If [propagateCancel] is `true` (the default) then this
@@ -154,8 +159,59 @@
/// listeners on this operation and canceling the [onValue], [onError], and
/// [onCancel] callbacks should not cancel the other listeners.
CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
- {FutureOr<R> Function(Object, StackTrace)? onError,
- FutureOr<R> Function()? onCancel,
+ {FutureOr<R> Function(Object, StackTrace)? onError,
+ FutureOr<R> Function()? onCancel,
+ bool propagateCancel = true}) =>
+ thenOperation<R>((value, completer) {
+ completer.complete(onValue(value));
+ },
+ onError: onError == null
+ ? null
+ : (error, stackTrace, completer) {
+ completer.complete(onError(error, stackTrace));
+ },
+ onCancel: onCancel == null
+ ? null
+ : (completer) {
+ completer.complete(onCancel());
+ },
+ propagateCancel: propagateCancel);
+
+ /// Creates a new cancelable operation to be completed when this operation
+ /// completes normally or as an error, or is cancelled.
+ ///
+ /// If this operation completes normally the value is passed to [onValue]
+ /// with a [CancelableCompleter] controlling the returned operation.
+ ///
+ /// If this operation completes as an error, and no [onError] callback is
+ /// provided, the returned operation is completed with the same error and
+ /// stack trace.
+ /// If this operation completes as an error, and an [onError] callback is
+ /// provided, the error and stack trace are passed to [onError] with a
+ /// [CancelableCompleter] controlling the returned operation.
+ ///
+ /// If this operation is canceled, and no [onCancel] callback is provided,
+ /// the returned operation is canceled.
+ /// If this operation is canceled, and an [onCancel] callback is provided,
+ /// the [onCancel] callback is called with a [CancelableCompleter] controlling
+ /// the returned operation.
+ ///
+ /// At most one of [onValue], [onError], or [onCancel] will be called.
+ /// If any of [onValue], [onError], or [onCancel] throw a synchronous error,
+ /// or return a `Future` that completes as an error, the error will be
+ /// forwarded through the returned operation.
+ ///
+ /// If the returned operation is canceled before this operation completes or
+ /// is canceled, the [onValue], [onError], and [onCancel] callbacks will not
+ /// be invoked. If [propagateCancel] is `true` (the default) then this
+ /// operation is canceled as well. Pass `false` if there are multiple
+ /// listeners on this operation and canceling the [onValue], [onError], and
+ /// [onCancel] callbacks should not cancel the other listeners.
+ CancelableOperation<R> thenOperation<R>(
+ FutureOr<void> Function(T, CancelableCompleter<R>) onValue,
+ {FutureOr<void> Function(Object, StackTrace, CancelableCompleter<R>)?
+ onError,
+ FutureOr<void> Function(CancelableCompleter<R>)? onCancel,
bool propagateCancel = true}) {
final completer =
CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);
@@ -176,35 +232,35 @@
// completes before `completer` is cancelled,
// then cancel `cancelCompleter`. (Cancelling twice is safe.)
- _completer._inner?.future.then<void>((value) {
+ _completer._inner?.future.then<void>((value) async {
if (completer.isCanceled) return;
try {
- completer.complete(onValue(value));
+ await onValue(value, completer);
} catch (error, stack) {
completer.completeError(error, stack);
}
},
onError: onError == null
? completer.completeError // Is ignored if already cancelled.
- : (Object error, StackTrace stack) {
+ : (Object error, StackTrace stack) async {
if (completer.isCanceled) return;
try {
- completer.complete(onError(error, stack));
+ await onError(error, stack, completer);
} catch (error2, stack2) {
- completer.completeError(error2, stack2);
+ completer.completeError(
+ error2, identical(error, error2) ? stack : stack2);
}
});
_completer._cancelCompleter?.future.whenComplete(onCancel == null
? completer._cancel
- : () {
+ : () async {
if (completer.isCanceled) return;
try {
- completer.complete(onCancel());
+ await onCancel(completer);
} catch (error, stack) {
completer.completeError(error, stack);
}
});
-
return completer.operation;
}
diff --git a/pubspec.yaml b/pubspec.yaml
index 2275bce..cb6f348 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
name: async
-version: 2.9.0
+version: 2.10.0-dev
description: Utility functions and classes related to the 'dart:async' library.
repository: https://github.com/dart-lang/async
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index 5e56b61..0914ddc 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -537,6 +537,234 @@
});
});
+ group('thenOperation', () {
+ late void Function(int, CancelableCompleter<String>) onValue;
+ void Function(Object, StackTrace, CancelableCompleter<String>)? onError;
+ void Function(CancelableCompleter<String>)? onCancel;
+ late bool propagateCancel;
+ late CancelableCompleter<int> originalCompleter;
+
+ setUp(() {
+ // Initialize all functions to ones that expect to not be called.
+ onValue = expectAsync2((value, completer) => completer.complete('$value'),
+ count: 0, id: 'onValue');
+ onError = null;
+ onCancel = null;
+ propagateCancel = false;
+ originalCompleter = CancelableCompleter();
+ });
+
+ CancelableOperation<String> runThenOperation() {
+ return originalCompleter.operation.thenOperation(onValue,
+ onError: onError,
+ onCancel: onCancel,
+ propagateCancel: propagateCancel);
+ }
+
+ group('original operation completes successfully', () {
+ test('onValue completes successfully', () {
+ onValue =
+ expectAsync2((v, c) => c.complete('$v'), count: 1, id: 'onValue');
+
+ expect(runThenOperation().value, completion('1'));
+ originalCompleter.complete(1);
+ });
+
+ test('onValue throws error', () {
+ // expectAsync1 only works with functions that do not throw.
+ onValue = (_, __) => throw 'error';
+
+ expect(runThenOperation().value, throwsA('error'));
+ originalCompleter.complete(1);
+ });
+
+ test('onValue completes operation as error', () {
+ onValue = expectAsync2(
+ (_, completer) => completer.completeError('error'),
+ count: 1,
+ id: 'onValue');
+
+ expect(runThenOperation().value, throwsA('error'));
+ originalCompleter.complete(1);
+ });
+
+ test('onValue returns a Future that throws error', () {
+ onValue = expectAsync2((_, completer) => Future.error('error'),
+ count: 1, id: 'onValue');
+
+ expect(runThenOperation().value, throwsA('error'));
+ originalCompleter.complete(1);
+ });
+
+ test('and returned operation is canceled', () async {
+ onValue = expectAsync2((_, __) => throw 'never called', count: 0);
+ runThenOperation().cancel();
+ // onValue should not be called.
+ originalCompleter.complete(1);
+ });
+ });
+
+ group('original operation completes with error', () {
+ test('onError not set', () {
+ onError = null;
+
+ expect(runThenOperation().value, throwsA('error'));
+ originalCompleter.completeError('error');
+ });
+
+ test('onError completes operation', () {
+ onError = expectAsync3((e, s, c) => c.complete('onError caught $e'),
+ count: 1, id: 'onError');
+
+ expect(runThenOperation().value, completion('onError caught error'));
+ originalCompleter.completeError('error');
+ });
+
+ test('onError throws', () {
+ // expectAsync3 does not work with functions that throw.
+ onError = (e, s, c) => throw 'onError caught $e';
+
+ expect(runThenOperation().value, throwsA('onError caught error'));
+ originalCompleter.completeError('error');
+ });
+
+ test('onError returns Future that throws error', () {
+ onError = expectAsync3((e, s, c) => Future.error('onError caught $e'),
+ count: 1, id: 'onError');
+
+ expect(runThenOperation().value, throwsA('onError caught error'));
+ originalCompleter.completeError('error');
+ });
+
+ test('onError completes operation as an error', () {
+ onError = expectAsync3(
+ (e, s, c) => c.completeError('onError caught $e'),
+ count: 1,
+ id: 'onError');
+
+ expect(runThenOperation().value, throwsA('onError caught error'));
+ originalCompleter.completeError('error');
+ });
+
+ test('and returned operation is canceled with propagateCancel = false',
+ () async {
+ onError = expectAsync3((e, s, c) {}, count: 0);
+
+ runThenOperation().cancel();
+
+ // onError should not be called.
+ originalCompleter.completeError('error');
+ });
+ });
+
+ group('original operation canceled', () {
+ test('onCancel not set', () async {
+ onCancel = null;
+
+ final operation = runThenOperation();
+
+ await expectLater(originalCompleter.operation.cancel(), completes);
+ expect(operation.isCanceled, true);
+ });
+
+ test('onCancel completes successfully', () {
+ onCancel = expectAsync1((c) => c.complete('canceled'),
+ count: 1, id: 'onCancel');
+
+ expect(runThenOperation().value, completion('canceled'));
+ originalCompleter.operation.cancel();
+ });
+
+ test('onCancel throws error', () {
+ // expectAsync0 only works with functions that do not throw.
+ onCancel = (_) => throw 'error';
+
+ expect(runThenOperation().value, throwsA('error'));
+ originalCompleter.operation.cancel();
+ });
+
+ test('onCancel completes operation as error', () {
+ onCancel = expectAsync1((c) => c.completeError('error'),
+ count: 1, id: 'onCancel');
+
+ expect(runThenOperation().value, throwsA('error'));
+ originalCompleter.operation.cancel();
+ });
+
+ test('onCancel returns Future that throws error', () {
+ onCancel = expectAsync1((c) => Future.error('error'),
+ count: 1, id: 'onCancel');
+
+ expect(runThenOperation().value, throwsA('error'));
+ originalCompleter.operation.cancel();
+ });
+
+ test('after completing with a future does not invoke `onValue`',
+ () async {
+ onValue = expectAsync2((_, __) {}, count: 0);
+ onCancel = null;
+ var operation = runThenOperation();
+ var workCompleter = Completer<int>();
+ originalCompleter.complete(workCompleter.future);
+ var cancelation = originalCompleter.operation.cancel();
+ expect(originalCompleter.isCanceled, true);
+ workCompleter.complete(0);
+ await cancelation;
+ expect(operation.isCanceled, true);
+ await workCompleter.future;
+ });
+
+ test('after the value is completed invokes `onValue`', () {
+ onValue = expectAsync2((v, c) => c.complete('foo'), count: 1);
+ onCancel = expectAsync1((_) {}, count: 0);
+ originalCompleter.complete(0);
+ originalCompleter.operation.cancel();
+ var operation = runThenOperation();
+ expect(operation.value, completion('foo'));
+ expect(operation.isCanceled, false);
+ });
+ });
+
+ group('returned operation canceled', () {
+ test('propagateCancel is true', () async {
+ propagateCancel = true;
+
+ await runThenOperation().cancel();
+
+ expect(originalCompleter.isCanceled, true);
+ });
+
+ test('propagateCancel is false', () async {
+ propagateCancel = false;
+
+ await runThenOperation().cancel();
+
+ expect(originalCompleter.isCanceled, false);
+ });
+
+ test('onValue callback not called after cancel', () async {
+ onValue = expectAsync2((_, c) {}, count: 0);
+
+ await runThenOperation().cancel();
+ originalCompleter.complete(0);
+ });
+
+ test('onError callback not called after cancel', () async {
+ onError = expectAsync3((_, __, ___) {}, count: 0);
+
+ await runThenOperation().cancel();
+ originalCompleter.completeError("Error", StackTrace.empty);
+ });
+
+ test('onCancel callback not called after cancel', () async {
+ onCancel = expectAsync1((_) {}, count: 0);
+
+ await runThenOperation().cancel();
+ await originalCompleter.operation.cancel();
+ });
+ });
+ });
+
group('race()', () {
late bool canceled1;
late CancelableCompleter<int> completer1;