Add CancelableOperation.valueOrCancellation.

R=lrn@google.com

Review URL: https://codereview.chromium.org//1561323002 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7e7ed4a..6db6f36 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,9 @@
-## 1.4.0
+## 1.6.0
+
+- Added `CancelableOperation.valueOrCancellation()`, which allows users to be
+  notified when an operation is canceled elsewhere.
+
+## 1.5.0
 
 - Added `LazyStream`, which forwards to the return value of a callback that's
   only called when the stream is listened to.
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 89889a3..a48c94f 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -57,6 +57,24 @@
     return controller.stream;
   }
 
+  /// Creates a [Future] that completes when this operation completes *or* when
+  /// it's cancelled.
+  ///
+  /// If this operation completes, this completes to the same result as [value].
+  /// If this operation is cancelled, the returned future waits for the future
+  /// returned by [cancel], then completes to [cancellationValue].
+  Future valueOrCancellation([T cancellationValue]) {
+    var completer = new Completer.sync();
+
+    value.then(completer.complete, onError: completer.completeError);
+
+    _completer._cancelMemo.future.then((_) {
+      completer.complete(cancellationValue);
+    }, onError: completer.completeError);
+
+    return completer.future;
+  }
+
   /// Cancels this operation.
   ///
   /// This returns the [Future] returned by the [CancelableCompleter]'s
@@ -140,9 +158,12 @@
   }
 
   /// Cancel the completer.
-  Future _cancel() => _cancelMemo.runOnce(() {
-    if (_inner.isCompleted) return null;
-    _isCanceled = true;
-    if (_onCancel != null) return _onCancel();
-  });
+  Future _cancel() {
+    if (_inner.isCompleted) return new Future.value();
+
+    return _cancelMemo.runOnce(() {
+      _isCanceled = true;
+      if (_onCancel != null) return _onCancel();
+    });
+  }
 }
diff --git a/pubspec.yaml b/pubspec.yaml
index e98c59e..16e2ea2 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -1,5 +1,5 @@
 name: async
-version: 1.5.0
+version: 1.6.0-dev
 author: Dart Team <misc@dartlang.org>
 description: Utility functions and classes related to the 'dart:async' library.
 homepage: https://www.github.com/dart-lang/async
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index 189c073..4d8c295 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -45,6 +45,16 @@
       expect(completer.isCompleted, isTrue);
     });
 
+    test("sends values to valueOrCancellation", () {
+      expect(completer.operation.valueOrCancellation(), completion(equals(1)));
+      completer.complete(1);
+    });
+
+    test("sends errors to valueOrCancellation", () {
+      expect(completer.operation.valueOrCancellation(), throwsA("error"));
+      completer.completeError("error");
+    });
+
     group("throws a StateError if completed", () {
       test("successfully twice", () {
         completer.complete(1);
@@ -168,6 +178,38 @@
       completer.complete(1);
       expect(() => completer.complete(1), throwsStateError);
     });
+
+    test("fires valueOrCancellation with the given value", () {
+      var completer = new CancelableCompleter();
+      expect(completer.operation.valueOrCancellation(1), completion(equals(1)));
+      completer.operation.cancel();
+    });
+
+    test("pipes an error through valueOrCancellation", () {
+      var completer = new CancelableCompleter(onCancel: () {
+        throw "error";
+      });
+      expect(completer.operation.valueOrCancellation(1), throwsA("error"));
+      completer.operation.cancel();
+    });
+
+    test("valueOrCancellation waits on the onCancel future", () async {
+      var innerCompleter = new Completer();
+      var completer = new CancelableCompleter(onCancel: () => innerCompleter.future);
+
+      var fired = false;
+      completer.operation.valueOrCancellation().then((_) {
+        fired = true;
+      });
+
+      completer.operation.cancel();
+      await flushMicrotasks();
+      expect(fired, isFalse);
+
+      innerCompleter.complete();
+      await flushMicrotasks();
+      expect(fired, isTrue);
+    });
   });
 
   group("asStream()", () {