Add additional CancelableOperation utilities (#194)
* Add CancelableOperation.fromSubscription
* Add CancelableOperation.race()
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9acccf9..815c523 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
* Add `StreamExtensions.firstOrNull`.
+* Add a `CancelableOperation.fromSubscription()` static factory.
+
+* Add a `CancelableOperation.race()` static method.
+
## 2.8.2
* Deprecate `EventSinkBase`, `StreamSinkBase`, `IOSinkBase`.
diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index e04af90..f03f672 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -39,6 +39,62 @@
return completer.operation;
}
+ /// Creates a [CancelableOperation] wrapping [subscription].
+ ///
+ /// This overrides [subscription.onDone] and [subscription.onError] so that
+ /// the returned operation will complete when the subscription completes or
+ /// emits an error. When this operation is canceled or when it emits an error,
+ /// the subscription will be canceled (unlike
+ /// `CancelableOperation.fromFuture(subscription.asFuture())`).
+ static CancelableOperation<void> fromSubscription(
+ StreamSubscription<void> subscription) {
+ var completer = CancelableCompleter<void>(onCancel: subscription.cancel);
+ subscription.onDone(completer.complete);
+ subscription.onError((Object error, StackTrace stackTrace) {
+ subscription.cancel().whenComplete(() {
+ completer.completeError(error, stackTrace);
+ });
+ });
+ return completer.operation;
+ }
+
+ /// Returns a [CancelableOperation] that completes with the value of the first
+ /// of [operations] to complete.
+ ///
+ /// Once any of [operations] completes, its result is forwarded to the
+ /// returned [CancelableOperation] and the rest are cancelled. When the
+ /// returned operation is cancelled, all the [operations] are cancelled as
+ /// well.
+ static CancelableOperation<T> race<T>(
+ Iterable<CancelableOperation<T>> operations) {
+ operations = operations.toList();
+ if (operations.isEmpty) {
+ throw ArgumentError.value("May not be empty", "operations");
+ }
+
+ var done = false;
+ // Note: if one of the completers has already completed, it's not actually
+ // cancelled by this.
+ Future<void> _cancelAll() {
+ done = true;
+ return Future.wait(operations.map((operation) => operation.cancel()));
+ }
+
+ var completer = CancelableCompleter<T>(onCancel: _cancelAll);
+ for (var operation in operations) {
+ operation.then((value) {
+ if (!done) completer.complete(_cancelAll().then((_) => value));
+ }, onError: (error, stackTrace) {
+ if (!done) {
+ completer.complete(
+ _cancelAll().then((_) => Future.error(error, stackTrace)));
+ }
+ });
+ }
+
+ return completer.operation;
+ }
+
/// The value returned by the operation.
Future<T> get value => _completer._inner.future;
diff --git a/test/cancelable_operation_test.dart b/test/cancelable_operation_test.dart
index a46dfe2..fa41001 100644
--- a/test/cancelable_operation_test.dart
+++ b/test/cancelable_operation_test.dart
@@ -122,6 +122,30 @@
expect(operation.value, throwsA('error'));
});
});
+
+ group('CancelableOperation.fromSubscription', () {
+ test('forwards a done event once it completes', () async {
+ var controller = StreamController<void>();
+ var operationCompleted = false;
+ CancelableOperation.fromSubscription(controller.stream.listen(null))
+ .then((_) {
+ operationCompleted = true;
+ });
+
+ await flushMicrotasks();
+ expect(operationCompleted, isFalse);
+
+ controller.close();
+ await flushMicrotasks();
+ expect(operationCompleted, isTrue);
+ });
+
+ test('forwards errors', () {
+ var operation = CancelableOperation.fromSubscription(
+ Stream.error('error').listen(null));
+ expect(operation.value, throwsA('error'));
+ });
+ });
});
group('when canceled', () {
@@ -237,6 +261,37 @@
await flushMicrotasks();
expect(fired, isTrue);
});
+
+ test('CancelableOperation.fromSubscription() cancels the subscription',
+ () async {
+ var cancelCompleter = Completer<void>();
+ var canceled = false;
+ var controller = StreamController<void>(onCancel: () {
+ canceled = true;
+ return cancelCompleter.future;
+ });
+ var operation =
+ CancelableOperation.fromSubscription(controller.stream.listen(null));
+
+ await flushMicrotasks();
+ expect(canceled, isFalse);
+
+ // The `cancel()` call shouldn't complete until
+ // `StreamSubscription.cancel` completes.
+ var cancelCompleted = false;
+ expect(
+ operation.cancel().then((_) {
+ cancelCompleted = true;
+ }),
+ completes);
+ await flushMicrotasks();
+ expect(canceled, isTrue);
+ expect(cancelCompleted, isFalse);
+
+ cancelCompleter.complete();
+ await flushMicrotasks();
+ expect(cancelCompleted, isTrue);
+ });
});
group('asStream()', () {
@@ -440,4 +495,65 @@
});
});
});
+
+ group('race()', () {
+ late bool canceled1;
+ late CancelableCompleter<int> completer1;
+ late bool canceled2;
+ late CancelableCompleter<int> completer2;
+ late bool canceled3;
+ late CancelableCompleter<int> completer3;
+ late CancelableOperation<int> operation;
+ setUp(() {
+ canceled1 = false;
+ completer1 = CancelableCompleter<int>(onCancel: () {
+ canceled1 = true;
+ });
+
+ canceled2 = false;
+ completer2 = CancelableCompleter<int>(onCancel: () {
+ canceled2 = true;
+ });
+
+ canceled3 = false;
+ completer3 = CancelableCompleter<int>(onCancel: () {
+ canceled3 = true;
+ });
+
+ operation = CancelableOperation.race(
+ [completer1.operation, completer2.operation, completer3.operation]);
+ });
+
+ test('returns the first value to complete', () {
+ completer1.complete(1);
+ completer2.complete(2);
+ completer3.complete(3);
+
+ expect(operation.value, completion(equals(1)));
+ });
+
+ test('throws the first error to complete', () {
+ completer1.completeError("error 1");
+ completer2.completeError("error 2");
+ completer3.completeError("error 3");
+
+ expect(operation.value, throwsA("error 1"));
+ });
+
+ test('cancels any completers that haven\'t completed', () async {
+ completer1.complete(1);
+ await expectLater(operation.value, completion(equals(1)));
+ expect(canceled1, isFalse);
+ expect(canceled2, isTrue);
+ expect(canceled3, isTrue);
+ });
+
+ test('cancels all completers when the operation is completed', () async {
+ await operation.cancel();
+
+ expect(canceled1, isTrue);
+ expect(canceled2, isTrue);
+ expect(canceled3, isTrue);
+ });
+ });
}