Avoid leaking unreachable listeners on `CancelableOperation`. (#206)

* Make `CancelableOperation` not hold onto unnecessary callbacks.

The existing `CancelableOperation` contains two completers,
where at most one will ever complete. Listeners added to
the other completer's future will never be released as
long as the operation is alive, even after the operation
has otherwise completed.

This change drops reference to the other completer when
one completer is chosen for completion.
That can allow the completer, its future, and that future's listeners
to be GC'ed when it's known that they'll never be relevant again.

The change has one visible effect: If asking for the `value`
future after the operation has been cancelled and the value
completer has been released, you'll get a new future,
which also never completes, which is not identical to
other futures returned by the same getter.
THis only matters if someone checks the future for identity,
their behavior is exactly the same (no behavior, whatsoever).

Since the same class returns a completely new future on each
call read of `valueOrCancellation`, I think that's a reasonable
change, and unlikely to affect anyone.
(Users are not usually expecting asynchronous functions to
return the same future every time since `async` functions
don't.)

Fixes #200.

diff --git a/lib/src/cancelable_operation.dart b/lib/src/cancelable_operation.dart
index 95332dc..394ba07 100644
--- a/lib/src/cancelable_operation.dart
+++ b/lib/src/cancelable_operation.dart
@@ -13,7 +13,7 @@
   /// The completer that produced this operation.
   ///
   /// That completer is canceled when [cancel] is called.
-  final CancelableCompleter<T> _completer;
+  CancelableCompleter<T> _completer;
 
   CancelableOperation._(this._completer);
 
@@ -93,7 +93,7 @@
   /// This future will not complete if the operation is cancelled.
   /// Use [valueOrCancellation] for a future which completes
   /// both if the operation is cancelled and if it isn't.
-  Future<T> get value => _completer._inner.future;
+  Future<T> get value => _completer._inner?.future ?? Completer<T>().future;
 
   /// Creates a [Stream] containing the result of this operation.
   ///
@@ -103,7 +103,7 @@
     var controller =
         StreamController<T>(sync: true, onCancel: _completer._cancel);
 
-    value.then((value) {
+    _completer._inner?.future.then((value) {
       controller.add(value);
       controller.close();
     }, onError: (Object error, StackTrace stackTrace) {
@@ -123,7 +123,7 @@
     var completer = Completer<T?>.sync();
     value.then(completer.complete, onError: completer.completeError);
 
-    _completer._cancelCompleter.future.then((_) {
+    _completer._cancelCompleter?.future.then((_) {
       completer.complete(cancellationValue);
     }, onError: completer.completeError);
 
@@ -153,12 +153,10 @@
     final completer =
         CancelableCompleter<R>(onCancel: propagateCancel ? cancel : null);
 
-    if (!isCanceled) {
-      value
-          .then(onValue, onError: onError)
-          .then(completer.complete, onError: completer.completeError);
-    }
-    _completer._cancelCompleter.future.then((_) {
+    _completer._inner?.future
+        .then(onValue, onError: onError)
+        .then(completer.complete, onError: completer.completeError);
+    _completer._cancelCompleter?.future.then((_) {
       if (onCancel != null) {
         completer.complete(Future.sync(onCancel));
       } else {
@@ -176,50 +174,82 @@
   Future cancel() => _completer._cancel();
 
   /// Whether this operation has been canceled before it completed.
-  bool get isCanceled => _completer.isCanceled;
+  bool get isCanceled => _completer._isCanceled;
 
   /// Whether the result of this operation is ready.
   ///
   /// When ready, the [value] future is completed with the result value
   /// or error, and this operation can no longer be cancelled.
   /// An operation may be complete before the listeners on [value] are invoked.
-  bool get isCompleted => _completer._inner.isCompleted;
+  bool get isCompleted => _completer._isCompleted;
 }
 
 /// A completer for a [CancelableOperation].
 class CancelableCompleter<T> {
+  // The cancelable completer is in one of the following states:
+  // * Initial:
+  //      _inner != null
+  //      _cancelCompleter != null
+  //      _mayComplete: true
+  //
+  // * Async-completed: `complete` called with a future while Initial.
+  //      _inner != null
+  //      _cancelCompleter != null
+  //      _mayComplete: false
+  //
+  // * Completed: `complete` called with a value or `completeError` called
+  //     while Initial, or the future passed in Async-completed completes
+  //     while AsyncCompleted.
+  //      _inner != null
+  //      _cancelCompleter == null
+  //      _mayComplete: false
+  //
+  // * Cancelled may-complete: `_cancel` called while Initial.
+  //      Allows calling `complete`/`completeError` even if it does nothing.
+  //      _inner == null
+  //      _cancelCompleter != null
+  //      _mayComplete: true
+  //
+  // * Cancelled can't-complete: `_cancel` called while Async-completed.
+  //      _inner == null
+  //      _cancelCompleter != null
+  //      _mayComplete: false
+
   /// The completer for the wrapped future.
   ///
   /// At most one of `_inner.future` and `_cancelCompleter.future` will
   /// ever complete.
-  final _inner = Completer<T>();
+  /// Set to `null` when when the operation is canceled, because then
+  /// it's guaranteed that this completer will never complete.
+  Completer<T>? _inner = Completer<T>();
 
   /// Completed when `cancel` is called.
   ///
   /// At most one of `_inner.future` and `_cancelCompleter.future` will
   /// ever complete.
-  final _cancelCompleter = Completer<void>();
+  /// Set to `null` when [_inner] is completed, because then it's
+  /// guaranteed that this completer will never complete.
+  Completer<void>? _cancelCompleter = Completer<void>();
 
   /// The callback to call if the operation is canceled.
   final FutureOr<void> Function()? _onCancel;
 
+  /// Whether [complete] or [completeError] may still be called.
+  ///
+  /// Set to false when calling either.
+  ///
+  /// When completing by calling [complete] with a future,
+  /// it's still possible to cancel until the result is actually
+  /// available.
+  /// You are also allowed to call [complete] or [completeError]
+  /// after the operation has been canceled, as long as you only call it once.
+  /// It just won't do anything after the operation is cancelled.
+  /// This value only guards the calls to [complete] and [completeError].
+  bool _mayComplete = true;
+
   /// The operation controlled by this completer.
   late final operation = CancelableOperation<T>._(this);
 
-  /// Set when [complete] or [completeError] is called.
-  ///
-  /// Completing twice is not allowed.
-  ///
-  /// If [complete] is called with a future, it's still possible to
-  /// cancel the operation until that future completes,
-  /// so this value and [_isCanceled] are not mutually exclusive.
-  bool _isCompleted = false;
-
-  /// Set when [cancel] is called.
-  ///
-  /// Cancelling twice does nothing, nor does completing after cancelling.
-  bool _isCanceled = false;
-
   /// Creates a new completer for a [CancelableOperation].
   ///
   /// The cancelable [operation] can be completed using
@@ -234,6 +264,16 @@
   /// The [onCancel] function will be called at most once.
   CancelableCompleter({FutureOr Function()? onCancel}) : _onCancel = onCancel;
 
+  /// Whether the [_inner] completer has been completed.
+  ///
+  /// At this point it's no longer possible to cancel the operation.
+  bool get _isCompleted => _cancelCompleter == null;
+
+  /// Whether the completer was canceled before the result was ready.
+  ///
+  /// At this point, it's no longer possible to complete the operation.
+  bool get _isCanceled => _inner == null;
+
   /// Whether the [complete] or [completeError] have been called.
   ///
   /// Once this completer has been completed with either a result or error,
@@ -242,7 +282,7 @@
   /// If [complete] was called with a [Future] argument, this completer may be
   /// completed before it's [operation] is completed. In that case the
   /// [operation] may still be canceled before the result is available.
-  bool get isCompleted => _isCompleted;
+  bool get isCompleted => !_mayComplete;
 
   /// Whether the completer was canceled before the result was ready.
   bool get isCanceled => _isCanceled;
@@ -260,41 +300,47 @@
   /// has been called once.
   /// The [isCompleted] is true when either of these methods have been called.
   void complete([FutureOr<T>? value]) {
-    if (_isCompleted) throw StateError('Operation already completed');
-    _isCompleted = true;
+    if (!_mayComplete) throw StateError('Operation already completed');
+    _mayComplete = false;
 
     if (value is! Future<T>) {
-      if (_isCanceled) return;
-      _inner.complete(value);
+      _completeNow()?.complete(value);
       return;
     }
 
-    if (_isCanceled) {
+    if (_inner == null) {
       // Make sure errors from [value] aren't top-leveled.
       value.ignore();
       return;
     }
 
     value.then((result) {
-      if (_isCanceled) return;
-      _inner.complete(result);
+      _completeNow()?.complete(result);
     }, onError: (Object error, StackTrace stackTrace) {
-      if (_isCanceled) return;
-      _inner.completeError(error, stackTrace);
+      _completeNow()?.completeError(error, stackTrace);
     });
   }
 
+  /// Completer to use for completing with a result.
+  ///
+  /// Returns `null` if it's not possible to complete any more.
+  /// Sets [_cancelCompleter] to `null` if returning non-`null`.
+  Completer<T>? _completeNow() {
+    var inner = _inner;
+    if (inner == null) return null;
+    _cancelCompleter = null;
+    return inner;
+  }
+
   /// Completes [operation] with [error] and [stackTrace].
   ///
   /// This method may not be called after either [complete] or [completeError]
   /// has been called once.
   /// The [isCompleted] is true when either of these methods have been called.
   void completeError(Object error, [StackTrace? stackTrace]) {
-    if (_isCompleted) throw StateError('Operation already completed');
-    _isCompleted = true;
-
-    if (_isCanceled) return;
-    _inner.completeError(error, stackTrace);
+    if (!_mayComplete) throw StateError('Operation already completed');
+    _mayComplete = false;
+    _completeNow()?.completeError(error, stackTrace);
   }
 
   /// Cancels the operation.
@@ -310,14 +356,14 @@
   /// The completer can be cancelled until the result becomes available,
   /// even if [isCompleted] is true.
   Future<void> _cancel() {
-    if (_inner.isCompleted) return Future.value(null);
+    var cancelCompleter = _cancelCompleter;
+    if (cancelCompleter == null) return Future.value(null);
 
-    if (!_isCanceled) {
-      _isCanceled = true;
+    if (_inner != null) {
+      _inner = null;
       var onCancel = _onCancel;
-      _cancelCompleter
-          .complete(onCancel == null ? null : Future.sync(onCancel));
+      cancelCompleter.complete(onCancel == null ? null : Future.sync(onCancel));
     }
-    return _cancelCompleter.future;
+    return cancelCompleter.future;
   }
 }