Implement CancelableOperation.then (#83)

* Implement CancelableOperation.then

* Update version to 2.2.0
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3d543da..8436cd4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 2.2.0
+
+* Add `then` to `CancelableOperation`.
+
 ## 2.1.0
 
 * Fix `CancelableOperation.valueOrCancellation`'s type signature
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index faf9076..94601c9 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -75,6 +75,47 @@
     return completer.future;
   }
 
+  /// Registers callbacks to be called when this operation completes.
+  ///
+  /// [onValue] and [onError] behave in the same way as [Future.then].
+  ///
+  /// If this operation is canceled, [onValue] is never called. Instead, if
+  /// [onCancel] is provided it is called and the returned operation completes
+  /// with its result; otherwise the returned operation is canceled.
+  ///
+  /// If [propagateCancel] is `true` and the returned operation is canceled then
+  /// this operation is canceled. The default is `false`.
+  CancelableOperation<R> then<R>(FutureOr<R> Function(T) onValue,
+      {FutureOr<R> Function(Object, StackTrace) onError,
+      FutureOr<R> Function() onCancel,
+      bool propagateCancel = false}) {
+    final completer =
+        CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);
+
+    valueOrCancellation().then((T result) {
+      if (!completer.isCanceled) {
+        // [isCompleted] can also be true for an operation that was canceled;
+        // then [result] is only a cancellation placeholder, not a real value.
+        if (isCompleted && !isCanceled) {
+          completer.complete(Future.sync(() => onValue(result)));
+        } else if (onCancel != null) {
+          completer.complete(Future.sync(onCancel));
+        } else {
+          completer._cancel();
+        }
+      }
+    }, onError: (error, stackTrace) {
+      if (!completer.isCanceled) {
+        if (onError != null) {
+          completer.complete(Future.sync(() => onError(error, stackTrace)));
+        } else {
+          completer.completeError(error, stackTrace);
+        }
+      }
+    });
+    return completer.operation;
+  }
+
   /// Cancels this operation.
   ///
   /// This returns the [Future] returned by the [CancelableCompleter]'s
diff --git a/pubspec.yaml b/pubspec.yaml
index 0b79178..1b2e96e 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 2.1.0
+version: 2.2.0
 
 description: Utility functions and classes related to the 'dart:async' library.
 author: Dart Team <misc@dartlang.org>
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index 384a023..a4c9308 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -241,4 +241,158 @@
       expect(completer.isCanceled, isTrue);
     });
   });
+
+  group("then", () {
+    // Fixtures: [runThen] reads the callbacks/flag and sets the completer.
+    FutureOr<String> Function(int) onValue;
+    FutureOr<String> Function(Object, StackTrace) onError;
+    FutureOr<String> Function() onCancel;
+    bool propagateCancel;
+    CancelableCompleter<int> originalCompleter;
+
+    setUp(() {
+      // Default every callback to one that fails the test if it is called.
+      onValue = expectAsync1((_) {}, count: 0, id: "onValue");
+      onError = expectAsync2((e, s) {}, count: 0, id: "onError");
+      onCancel = expectAsync0(() {}, count: 0, id: "onCancel");
+      propagateCancel = false;
+    });
+
+    // Calls `then` on the operation of a fresh [originalCompleter].
+    CancelableOperation<String> runThen() {
+      originalCompleter = CancelableCompleter();
+      return originalCompleter.operation.then(onValue,
+          onError: onError,
+          onCancel: onCancel,
+          propagateCancel: propagateCancel);
+    }
+
+    group("original operation completes successfully", () {
+      test("onValue completes successfully", () {
+        onValue = expectAsync1((v) => v.toString(), count: 1, id: "onValue");
+
+        expect(runThen().value, completion("1"));
+        originalCompleter.complete(1);
+      });
+
+      test("onValue throws error", () {
+        // expectAsync1 only works with functions that do not throw.
+        onValue = (_) => throw "error";
+
+        expect(runThen().value, throwsA("error"));
+        originalCompleter.complete(1);
+      });
+
+      test("onValue returns Future that throws error", () {
+        onValue =
+            expectAsync1((v) => Future.error("error"), count: 1, id: "onValue");
+
+        expect(runThen().value, throwsA("error"));
+        originalCompleter.complete(1);
+      });
+
+      test("and returned operation is canceled with propagateCancel = false",
+          () async {
+        propagateCancel = false;
+        runThen().cancel();
+
+        // onValue should not be called.
+        originalCompleter.complete(1);
+      });
+    });
+
+    group("original operation completes with error", () {
+      test("onError not set", () {
+        onError = null;
+
+        expect(runThen().value, throwsA("error"));
+        originalCompleter.completeError("error");
+      });
+
+      test("onError completes successfully", () {
+        onError = expectAsync2((e, s) => "onError caught $e",
+            count: 1, id: "onError");
+
+        expect(runThen().value, completion("onError caught error"));
+        originalCompleter.completeError("error");
+      });
+
+      test("onError throws", () {
+        // expectAsync2 does not work with functions that throw.
+        onError = (e, s) => throw "onError caught $e";
+
+        expect(runThen().value, throwsA("onError caught error"));
+        originalCompleter.completeError("error");
+      });
+
+      test("onError returns Future that throws", () {
+        onError = expectAsync2((e, s) => Future.error("onError caught $e"),
+            count: 1, id: "onError");
+
+        expect(runThen().value, throwsA("onError caught error"));
+        originalCompleter.completeError("error");
+      });
+
+      test("and returned operation is canceled with propagateCancel = false",
+          () async {
+        propagateCancel = false;
+        runThen().cancel();
+
+        // onError should not be called.
+        originalCompleter.completeError("error");
+      });
+    });
+
+    group("original operation canceled", () {
+      test("onCancel not set", () async {
+        onCancel = null;
+        final operation = runThen();
+
+        // The returned operation is only canceled asynchronously.
+        await originalCompleter.operation.cancel();
+        expect(operation.isCanceled, true);
+      });
+
+      test("onCancel completes successfully", () {
+        onCancel = expectAsync0(() => "canceled", count: 1, id: "onCancel");
+
+        expect(runThen().value, completion("canceled"));
+        originalCompleter.operation.cancel();
+      });
+
+      test("onCancel throws error", () {
+        // expectAsync0 only works with functions that do not throw.
+        onCancel = () => throw "error";
+
+        expect(runThen().value, throwsA("error"));
+        originalCompleter.operation.cancel();
+      });
+
+      test("onCancel returns Future that throws error", () {
+        onCancel =
+            expectAsync0(() => Future.error("error"), count: 1, id: "onCancel");
+
+        expect(runThen().value, throwsA("error"));
+        originalCompleter.operation.cancel();
+      });
+    });
+
+    group("returned operation canceled", () {
+      test("propagateCancel is true", () async {
+        propagateCancel = true;
+
+        await runThen().cancel();
+
+        expect(originalCompleter.isCanceled, true);
+      });
+
+      test("propagateCancel is false", () async {
+        propagateCancel = false;
+
+        await runThen().cancel();
+
+        expect(originalCompleter.isCanceled, false);
+      });
+    });
+  });
 }