Add CancelableOperation.thenOperation (#211)

Towards #210

Once combined with `Completer.completeOperation` this signature adds
significant flexibility for chaining Cancelable work following other
async work.

Move the existing `.then` implementation to `.thenOperation` since the latter
is more general.

Co-authored-by: Lasse R.H. Nielsen <lrn@google.com>
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b5b5c9e..38f8465 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+## 2.10.0-dev
+
+* Add `CancelableOperation.thenOperation` which gives more flexibility to
+  complete the resulting operation.
+
 ## 2.9.0
 
 * **Potentially Breaking** The default `propagateCancel` argument to
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 6ee350e..ee9ff9d 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -133,7 +133,7 @@
   /// Creates a new cancelable operation to be completed when this operation
   /// completes normally or as an error, or is cancelled.
   ///
-  /// If this operation completes normally the [value] is passed to [onValue]
+  /// If this operation completes normally the value is passed to [onValue]
   /// and the returned operation is completed with the result.
   ///
   /// If this operation completes as an error, and no [onError] callback is
@@ -147,6 +147,11 @@
   /// If this operation is canceled, and an [onCancel] callback is provided,
   /// the returned operation is completed with the result.
   ///
+  /// At most one of [onValue], [onError], or [onCancel] will be called.
+  /// If any of [onValue], [onError], or [onCancel] throw a synchronous error,
+  /// or return a `Future` that completes as an error, the error will be
+  /// forwarded through the returned operation.
+  ///
   /// If the returned operation is canceled before this operation completes or
   /// is canceled, the [onValue], [onError], and [onCancel] callbacks will not
   /// be invoked. If [propagateCancel] is `true` (the default) then this
@@ -154,8 +159,59 @@
   /// listeners on this operation and canceling the [onValue], [onError], and
   /// [onCancel] callbacks should not cancel the other listeners.
   CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
-      {FutureOr<R> Function(Object, StackTrace)? onError,
-      FutureOr<R> Function()? onCancel,
+          {FutureOr<R> Function(Object, StackTrace)? onError,
+          FutureOr<R> Function()? onCancel,
+          bool propagateCancel = true}) =>
+      thenOperation<R>((value, completer) {
+        completer.complete(onValue(value));
+      },
+          onError: onError == null
+              ? null
+              : (error, stackTrace, completer) {
+                  completer.complete(onError(error, stackTrace));
+                },
+          onCancel: onCancel == null
+              ? null
+              : (completer) {
+                  completer.complete(onCancel());
+                },
+          propagateCancel: propagateCancel);
+
+  /// Creates a new cancelable operation to be completed when this operation
+  /// completes normally or as an error, or is cancelled.
+  ///
+  /// If this operation completes normally the value is passed to [onValue]
+  /// with a [CancelableCompleter] controlling the returned operation.
+  ///
+  /// If this operation completes as an error, and no [onError] callback is
+  /// provided, the returned operation is completed with the same error and
+  /// stack trace.
+  /// If this operation completes as an error, and an [onError] callback is
+  /// provided, the error and stack trace are passed to [onError] with a
+  /// [CancelableCompleter] controlling the returned operation.
+  ///
+  /// If this operation is canceled, and no [onCancel] callback is provided,
+  /// the returned operation is canceled.
+  /// If this operation is canceled, and an [onCancel] callback is provided,
+  /// the [onCancel] callback is called with a [CancelableCompleter] controlling
+  /// the returned operation.
+  ///
+  /// At most one of [onValue], [onError], or [onCancel] will be called.
+  /// If any of [onValue], [onError], or [onCancel] throw a synchronous error,
+  /// or return a `Future` that completes as an error, the error will be
+  /// forwarded through the returned operation.
+  ///
+  /// If the returned operation is canceled before this operation completes or
+  /// is canceled, the [onValue], [onError], and [onCancel] callbacks will not
+  /// be invoked. If [propagateCancel] is `true` (the default) then this
+  /// operation is canceled as well. Pass `false` if there are multiple
+  /// listeners on this operation and canceling the [onValue], [onError], and
+  /// [onCancel] callbacks should not cancel the other listeners.
+  CancelableOperation<R> thenOperation<R>(
+      FutureOr<void> Function(T, CancelableCompleter<R>) onValue,
+      {FutureOr<void> Function(Object, StackTrace, CancelableCompleter<R>)?
+          onError,
+      FutureOr<void> Function(CancelableCompleter<R>)? onCancel,
       bool propagateCancel = true}) {
     final completer =
         CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);
@@ -176,35 +232,35 @@
     // completes before `completer` is cancelled,
     // then cancel `cancelCompleter`. (Cancelling twice is safe.)
 
-    _completer._inner?.future.then<void>((value) {
+    _completer._inner?.future.then<void>((value) async {
       if (completer.isCanceled) return;
       try {
-        completer.complete(onValue(value));
+        await onValue(value, completer);
       } catch (error, stack) {
         completer.completeError(error, stack);
       }
     },
         onError: onError == null
             ? completer.completeError // Is ignored if already cancelled.
-            : (Object error, StackTrace stack) {
+            : (Object error, StackTrace stack) async {
                 if (completer.isCanceled) return;
                 try {
-                  completer.complete(onError(error, stack));
+                  await onError(error, stack, completer);
                 } catch (error2, stack2) {
-                  completer.completeError(error2, stack2);
+                  completer.completeError(
+                      error2, identical(error, error2) ? stack : stack2);
                 }
               });
     _completer._cancelCompleter?.future.whenComplete(onCancel == null
         ? completer._cancel
-        : () {
+        : () async {
             if (completer.isCanceled) return;
             try {
-              completer.complete(onCancel());
+              await onCancel(completer);
             } catch (error, stack) {
               completer.completeError(error, stack);
             }
           });
-
     return completer.operation;
   }
 
diff --git a/pubspec.yaml b/pubspec.yaml
index 2275bce..cb6f348 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 2.9.0
+version: 2.10.0-dev
 
 description: Utility functions and classes related to the 'dart:async' library.
 repository: https://github.com/dart-lang/async
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index 5e56b61..0914ddc 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -537,6 +537,234 @@
     });
   });
 
+  group('thenOperation', () {
+    late void Function(int, CancelableCompleter<String>) onValue;
+    void Function(Object, StackTrace, CancelableCompleter<String>)? onError;
+    void Function(CancelableCompleter<String>)? onCancel;
+    late bool propagateCancel;
+    late CancelableCompleter<int> originalCompleter;
+
+    setUp(() {
+      // Initialize all functions to ones that expect to not be called.
+      onValue = expectAsync2((value, completer) => completer.complete('$value'),
+          count: 0, id: 'onValue');
+      onError = null;
+      onCancel = null;
+      propagateCancel = false;
+      originalCompleter = CancelableCompleter();
+    });
+
+    CancelableOperation<String> runThenOperation() {
+      return originalCompleter.operation.thenOperation(onValue,
+          onError: onError,
+          onCancel: onCancel,
+          propagateCancel: propagateCancel);
+    }
+
+    group('original operation completes successfully', () {
+      test('onValue completes successfully', () {
+        onValue =
+            expectAsync2((v, c) => c.complete('$v'), count: 1, id: 'onValue');
+
+        expect(runThenOperation().value, completion('1'));
+        originalCompleter.complete(1);
+      });
+
+      test('onValue throws error', () {
+        // expectAsync1 only works with functions that do not throw.
+        onValue = (_, __) => throw 'error';
+
+        expect(runThenOperation().value, throwsA('error'));
+        originalCompleter.complete(1);
+      });
+
+      test('onValue completes operation as error', () {
+        onValue = expectAsync2(
+            (_, completer) => completer.completeError('error'),
+            count: 1,
+            id: 'onValue');
+
+        expect(runThenOperation().value, throwsA('error'));
+        originalCompleter.complete(1);
+      });
+
+      test('onValue returns a Future that throws error', () {
+        onValue = expectAsync2((_, completer) => Future.error('error'),
+            count: 1, id: 'onValue');
+
+        expect(runThenOperation().value, throwsA('error'));
+        originalCompleter.complete(1);
+      });
+
+      test('and returned operation is canceled', () async {
+        onValue = expectAsync2((_, __) => throw 'never called', count: 0);
+        runThenOperation().cancel();
+        // onValue should not be called.
+        originalCompleter.complete(1);
+      });
+    });
+
+    group('original operation completes with error', () {
+      test('onError not set', () {
+        onError = null;
+
+        expect(runThenOperation().value, throwsA('error'));
+        originalCompleter.completeError('error');
+      });
+
+      test('onError completes operation', () {
+        onError = expectAsync3((e, s, c) => c.complete('onError caught $e'),
+            count: 1, id: 'onError');
+
+        expect(runThenOperation().value, completion('onError caught error'));
+        originalCompleter.completeError('error');
+      });
+
+      test('onError throws', () {
+        // expectAsync3 does not work with functions that throw.
+        onError = (e, s, c) => throw 'onError caught $e';
+
+        expect(runThenOperation().value, throwsA('onError caught error'));
+        originalCompleter.completeError('error');
+      });
+
+      test('onError returns Future that throws error', () {
+        onError = expectAsync3((e, s, c) => Future.error('onError caught $e'),
+            count: 1, id: 'onError');
+
+        expect(runThenOperation().value, throwsA('onError caught error'));
+        originalCompleter.completeError('error');
+      });
+
+      test('onError completes operation as an error', () {
+        onError = expectAsync3(
+            (e, s, c) => c.completeError('onError caught $e'),
+            count: 1,
+            id: 'onError');
+
+        expect(runThenOperation().value, throwsA('onError caught error'));
+        originalCompleter.completeError('error');
+      });
+
+      test('and returned operation is canceled with propagateCancel = false',
+          () async {
+        onError = expectAsync3((e, s, c) {}, count: 0);
+
+        runThenOperation().cancel();
+
+        // onError should not be called.
+        originalCompleter.completeError('error');
+      });
+    });
+
+    group('original operation canceled', () {
+      test('onCancel not set', () async {
+        onCancel = null;
+
+        final operation = runThenOperation();
+
+        await expectLater(originalCompleter.operation.cancel(), completes);
+        expect(operation.isCanceled, true);
+      });
+
+      test('onCancel completes successfully', () {
+        onCancel = expectAsync1((c) => c.complete('canceled'),
+            count: 1, id: 'onCancel');
+
+        expect(runThenOperation().value, completion('canceled'));
+        originalCompleter.operation.cancel();
+      });
+
+      test('onCancel throws error', () {
+        // expectAsync0 only works with functions that do not throw.
+        onCancel = (_) => throw 'error';
+
+        expect(runThenOperation().value, throwsA('error'));
+        originalCompleter.operation.cancel();
+      });
+
+      test('onCancel completes operation as error', () {
+        onCancel = expectAsync1((c) => c.completeError('error'),
+            count: 1, id: 'onCancel');
+
+        expect(runThenOperation().value, throwsA('error'));
+        originalCompleter.operation.cancel();
+      });
+
+      test('onCancel returns Future that throws error', () {
+        onCancel = expectAsync1((c) => Future.error('error'),
+            count: 1, id: 'onCancel');
+
+        expect(runThenOperation().value, throwsA('error'));
+        originalCompleter.operation.cancel();
+      });
+
+      test('after completing with a future does not invoke `onValue`',
+          () async {
+        onValue = expectAsync2((_, __) {}, count: 0);
+        onCancel = null;
+        var operation = runThenOperation();
+        var workCompleter = Completer<int>();
+        originalCompleter.complete(workCompleter.future);
+        var cancelation = originalCompleter.operation.cancel();
+        expect(originalCompleter.isCanceled, true);
+        workCompleter.complete(0);
+        await cancelation;
+        expect(operation.isCanceled, true);
+        await workCompleter.future;
+      });
+
+      test('after the value is completed invokes `onValue`', () {
+        onValue = expectAsync2((v, c) => c.complete('foo'), count: 1);
+        onCancel = expectAsync1((_) {}, count: 0);
+        originalCompleter.complete(0);
+        originalCompleter.operation.cancel();
+        var operation = runThenOperation();
+        expect(operation.value, completion('foo'));
+        expect(operation.isCanceled, false);
+      });
+    });
+
+    group('returned operation canceled', () {
+      test('propagateCancel is true', () async {
+        propagateCancel = true;
+
+        await runThenOperation().cancel();
+
+        expect(originalCompleter.isCanceled, true);
+      });
+
+      test('propagateCancel is false', () async {
+        propagateCancel = false;
+
+        await runThenOperation().cancel();
+
+        expect(originalCompleter.isCanceled, false);
+      });
+
+      test('onValue callback not called after cancel', () async {
+        onValue = expectAsync2((_, c) {}, count: 0);
+
+        await runThenOperation().cancel();
+        originalCompleter.complete(0);
+      });
+
+      test('onError callback not called after cancel', () async {
+        onError = expectAsync3((_, __, ___) {}, count: 0);
+
+        await runThenOperation().cancel();
+        originalCompleter.completeError("Error", StackTrace.empty);
+      });
+
+      test('onCancel callback not called after cancel', () async {
+        onCancel = expectAsync1((_) {}, count: 0);
+
+        await runThenOperation().cancel();
+        await originalCompleter.operation.cancel();
+      });
+    });
+  });
+
   group('race()', () {
     late bool canceled1;
     late CancelableCompleter<int> completer1;