Add CancelableCompleter.completeOperation (#215)

Towards #210

Combined with `CancelableOperation.thenOperation` this allows chaining
cancelable work that can be canceled at multiple points.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 38f8465..6e2add9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
 
 * Add `CancelableOperation.thenOperation` which gives more flexibility to
   complete the resulting operation.
+* Add `CancelableCompleter.completeOperation`.
 
 ## 2.9.0
 
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index ee9ff9d..65ecabc 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -418,6 +418,34 @@
     });
   }
 
+  /// Makes this [CancelableCompleter.operation] complete with the same result
+  /// as [result].
+  ///
+  /// If [propagateCancel] is `true` (the default), and the [operation] of this
+  /// completer is canceled before [result] completes, then [result] is also
+  /// canceled.
+  void completeOperation(CancelableOperation<T> result,
+      {bool propagateCancel = true}) {
+    if (!_mayComplete) throw StateError("Already completed");
+    _mayComplete = false;
+    if (isCanceled) {
+      if (propagateCancel) result.cancel();
+      result.value.ignore();
+      return;
+    }
+    result.then<void>((value) {
+      _inner?.complete(
+          value); // _inner is set to null if this.operation is cancelled.
+    }, onError: (error, stack) {
+      _inner?.completeError(error, stack);
+    }, onCancel: () {
+      operation.cancel();
+    });
+    if (propagateCancel) {
+      _cancelCompleter?.future.whenComplete(result.cancel);
+    }
+  }
+
   /// Completer to use for completing with a result.
   ///
   /// Returns `null` if it's not possible to complete any more.
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index 0914ddc..f073e80 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -54,6 +54,39 @@
       expect(completer.operation.isCompleted, isTrue);
     });
 
+    test('sends values from a cancelable operation to the future', () {
+      expect(completer.operation.value, completion(equals(1)));
+      completer
+          .completeOperation(CancelableOperation.fromFuture(Future.value(1)));
+    });
+
+    test('sends values from a completed cancelable operation to the future',
+        () async {
+      final operation = CancelableOperation.fromFuture(Future.value(1));
+      await operation.value;
+      expect(completer.operation.value, completion(equals(1)));
+      completer.completeOperation(operation);
+    });
+
+    test('sends errors from a cancelable operation to the future', () {
+      expect(completer.operation.value, throwsA('error'));
+      completer.completeOperation(
+          CancelableOperation.fromFuture(Future.error('error')..ignore()));
+    });
+
+    test('sends errors from a completed cancelable operation to the future',
+        () async {
+      final operation =
+          CancelableOperation.fromFuture(Future.error('error')..ignore());
+      try {
+        await operation.value;
+      } on Object {
+        // ignore
+      }
+      expect(completer.operation.value, throwsA('error'));
+      completer.completeOperation(operation);
+    });
+
     test('sends values to valueOrCancellation', () {
       expect(completer.operation.valueOrCancellation(), completion(equals(1)));
       completer.complete(1);
@@ -292,6 +325,64 @@
       await flushMicrotasks();
       expect(cancelCompleted, isTrue);
     });
+
+    group('completeOperation', () {
+      test('sends cancellation from a cancelable operation', () async {
+        final completer = CancelableCompleter<void>();
+        completer.operation.value.whenComplete(expectAsync0(() {}, count: 0));
+        completer
+            .completeOperation(CancelableCompleter<void>().operation..cancel());
+        await completer.operation.valueOrCancellation();
+        expect(completer.operation.isCanceled, true);
+      });
+
+      test('sends errors from a completed cancelable operation to the future',
+          () async {
+        final operation = CancelableCompleter<void>().operation..cancel();
+        await operation.valueOrCancellation();
+        final completer = CancelableCompleter<void>();
+        completer.operation.value.whenComplete(expectAsync0(() {}, count: 0));
+        completer.completeOperation(operation);
+        await completer.operation.valueOrCancellation();
+        expect(completer.operation.isCanceled, true);
+      });
+
+      test('propagates cancellation', () {
+        final completer = CancelableCompleter<void>();
+        final operation =
+            CancelableCompleter<void>(onCancel: expectAsync0(() {}, count: 1))
+                .operation;
+        completer.completeOperation(operation);
+        completer.operation.cancel();
+      });
+
+      test('propagates cancellation from already canceld completer', () async {
+        final completer = CancelableCompleter<void>()..operation.cancel();
+        await completer.operation.valueOrCancellation();
+        final operation =
+            CancelableCompleter<void>(onCancel: expectAsync0(() {}, count: 1))
+                .operation;
+        completer.completeOperation(operation);
+      });
+      test('cancel propagation can be disabled', () {
+        final completer = CancelableCompleter<void>();
+        final operation =
+            CancelableCompleter<void>(onCancel: expectAsync0(() {}, count: 0))
+                .operation;
+        completer.completeOperation(operation, propagateCancel: false);
+        completer.operation.cancel();
+      });
+
+      test('cancel propagation can be disabled from already canceled completed',
+          () async {
+        final completer = CancelableCompleter<void>()..operation.cancel();
+        await completer.operation.valueOrCancellation();
+        final operation =
+            CancelableCompleter<void>(onCancel: expectAsync0(() {}, count: 0))
+                .operation;
+        completer.completeOperation(operation, propagateCancel: false);
+      });
+    });
   });
 
   group('asStream()', () {