Add additional CancelableOperation utilities (#194)

* Add CancelableOperation.fromSubscription

* Add CancelableOperation.race()
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9acccf9..815c523 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 * Add `StreamExtensions.firstOrNull`.
 
+* Add a `CancelableOperation.fromSubscription()` static factory.
+
+* Add a `CancelableOperation.race()` static method.
+
 ## 2.8.2
 
 * Deprecate `EventSinkBase`, `StreamSinkBase`, `IOSinkBase`.
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index e04af90..f03f672 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -39,6 +39,62 @@
     return completer.operation;
   }
 
+  /// Creates a [CancelableOperation] wrapping [subscription].
+  ///
+  /// This overrides [subscription.onDone] and [subscription.onError] so that
+  /// the returned operation will complete when the subscription completes or
+  /// emits an error. When this operation is canceled or when it emits an error,
+  /// the subscription will be canceled (unlike
+  /// `CancelableOperation.fromFuture(subscription.asFuture())`).
+  static CancelableOperation<void> fromSubscription(
+      StreamSubscription<void> subscription) {
+    var completer = CancelableCompleter<void>(onCancel: subscription.cancel);
+    subscription.onDone(completer.complete);
+    subscription.onError((Object error, StackTrace stackTrace) {
+      subscription.cancel().whenComplete(() {
+        completer.completeError(error, stackTrace);
+      });
+    });
+    return completer.operation;
+  }
+
+  /// Returns a [CancelableOperation] that completes with the value of the first
+  /// of [operations] to complete.
+  ///
+  /// Once any of [operations] completes, its result is forwarded to the
+  /// returned [CancelableOperation] and the rest are cancelled. When the
+  /// returned operation is cancelled, all the [operations] are cancelled as
+  /// well.
+  static CancelableOperation<T> race<T>(
+      Iterable<CancelableOperation<T>> operations) {
+    operations = operations.toList();
+    if (operations.isEmpty) {
+      throw ArgumentError.value("May not be empty", "operations");
+    }
+
+    var done = false;
+    // Note: if one of the completers has already completed, it's not actually
+    // cancelled by this.
+    Future<void> _cancelAll() {
+      done = true;
+      return Future.wait(operations.map((operation) => operation.cancel()));
+    }
+
+    var completer = CancelableCompleter<T>(onCancel: _cancelAll);
+    for (var operation in operations) {
+      operation.then((value) {
+        if (!done) completer.complete(_cancelAll().then((_) => value));
+      }, onError: (error, stackTrace) {
+        if (!done) {
+          completer.complete(
+              _cancelAll().then((_) => Future.error(error, stackTrace)));
+        }
+      });
+    }
+
+    return completer.operation;
+  }
+
   /// The value returned by the operation.
   Future<T> get value => _completer._inner.future;
 
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index a46dfe2..fa41001 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -122,6 +122,30 @@
         expect(operation.value, throwsA('error'));
       });
     });
+
+    group('CancelableOperation.fromSubscription', () {
+      test('forwards a done event once it completes', () async {
+        var controller = StreamController<void>();
+        var operationCompleted = false;
+        CancelableOperation.fromSubscription(controller.stream.listen(null))
+            .then((_) {
+          operationCompleted = true;
+        });
+
+        await flushMicrotasks();
+        expect(operationCompleted, isFalse);
+
+        controller.close();
+        await flushMicrotasks();
+        expect(operationCompleted, isTrue);
+      });
+
+      test('forwards errors', () {
+        var operation = CancelableOperation.fromSubscription(
+            Stream.error('error').listen(null));
+        expect(operation.value, throwsA('error'));
+      });
+    });
   });
 
   group('when canceled', () {
@@ -237,6 +261,37 @@
       await flushMicrotasks();
       expect(fired, isTrue);
     });
+
+    test('CancelableOperation.fromSubscription() cancels the subscription',
+        () async {
+      var cancelCompleter = Completer<void>();
+      var canceled = false;
+      var controller = StreamController<void>(onCancel: () {
+        canceled = true;
+        return cancelCompleter.future;
+      });
+      var operation =
+          CancelableOperation.fromSubscription(controller.stream.listen(null));
+
+      await flushMicrotasks();
+      expect(canceled, isFalse);
+
+      // The `cancel()` call shouldn't complete until
+      // `StreamSubscription.cancel` completes.
+      var cancelCompleted = false;
+      expect(
+          operation.cancel().then((_) {
+            cancelCompleted = true;
+          }),
+          completes);
+      await flushMicrotasks();
+      expect(canceled, isTrue);
+      expect(cancelCompleted, isFalse);
+
+      cancelCompleter.complete();
+      await flushMicrotasks();
+      expect(cancelCompleted, isTrue);
+    });
   });
 
   group('asStream()', () {
@@ -440,4 +495,65 @@
       });
     });
   });
+
+  group('race()', () {
+    late bool canceled1;
+    late CancelableCompleter<int> completer1;
+    late bool canceled2;
+    late CancelableCompleter<int> completer2;
+    late bool canceled3;
+    late CancelableCompleter<int> completer3;
+    late CancelableOperation<int> operation;
+    setUp(() {
+      canceled1 = false;
+      completer1 = CancelableCompleter<int>(onCancel: () {
+        canceled1 = true;
+      });
+
+      canceled2 = false;
+      completer2 = CancelableCompleter<int>(onCancel: () {
+        canceled2 = true;
+      });
+
+      canceled3 = false;
+      completer3 = CancelableCompleter<int>(onCancel: () {
+        canceled3 = true;
+      });
+
+      operation = CancelableOperation.race(
+          [completer1.operation, completer2.operation, completer3.operation]);
+    });
+
+    test('returns the first value to complete', () {
+      completer1.complete(1);
+      completer2.complete(2);
+      completer3.complete(3);
+
+      expect(operation.value, completion(equals(1)));
+    });
+
+    test('throws the first error to complete', () {
+      completer1.completeError("error 1");
+      completer2.completeError("error 2");
+      completer3.completeError("error 3");
+
+      expect(operation.value, throwsA("error 1"));
+    });
+
+    test('cancels any completers that haven\'t completed', () async {
+      completer1.complete(1);
+      await expectLater(operation.value, completion(equals(1)));
+      expect(canceled1, isFalse);
+      expect(canceled2, isTrue);
+      expect(canceled3, isTrue);
+    });
+
+    test('cancels all completers when the operation is completed', () async {
+      await operation.cancel();
+
+      expect(canceled1, isTrue);
+      expect(canceled2, isTrue);
+      expect(canceled3, isTrue);
+    });
+  });
 }