Allow pauses to streams right before they end
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index 4fa75b1..2a531e8 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -445,7 +445,7 @@
 
     var state = _StreamState.initial;
 
-    /// Sends trailing data to the stream. Reeturns true if the subscription
+    /// Sends trailing data to the stream. Returns true if the subscription
     /// should still be resumed afterwards.
     bool emitTrailing() {
       // Attempt to serve requests from pending data first.
@@ -512,13 +512,25 @@
     controller
       ..onListen = scheduleInitialEmit
       ..onPause = () {
-        assert(state == _StreamState.initial || state == _StreamState.attached);
+        assert(
+            state == _StreamState.initial ||
+                state == _StreamState.attached ||
+                state == _StreamState.done,
+            'Unexpected pause event in $state ($_remainingBlocksInOutgoing blocks remaining).');
 
         if (state == _StreamState.initial) {
           state = _StreamState.pausedAfterInitial;
-        } else {
+        } else if (state == _StreamState.attached) {
           _pause();
           state = _StreamState.pausedAfterAttached;
+        } else if (state == _StreamState.done) {
+          // It may happen that onPause is called in a state where we believe
+          // the stream to be done already. After the stream is done, we close
+          // the controller in a new microtask. So if the subscription is paused
+          // after the last event it emitted but before we close the controller,
+          // we can get a pause event here.
+          // There's nothing to do in that case.
+          assert(_subscription?.isPaused != false);
         }
       }
       ..onResume = () {
diff --git a/test/utils_test.dart b/test/utils_test.dart
index 93af4e8..e9b3a8b 100644
--- a/test/utils_test.dart
+++ b/test/utils_test.dart
@@ -279,6 +279,27 @@
       await reader.nextBlocks(2).first;
       await reader.close();
     });
+
+    test('can pause stream subscriptions when all data has been emitted',
+        () async {
+      final reader = BlockReader(controller.stream);
+      controller.add(Uint8List(blockSize * 3));
+      await reader.nextBlock();
+
+      late StreamSubscription<Uint8List> subscription;
+      final didPause = Completer<void>();
+
+      subscription = reader.nextBlocks(2).listen((_) {
+        scheduleMicrotask(() {
+          subscription.pause();
+          didPause.complete();
+        });
+      });
+
+      await didPause.future;
+      await subscription.cancel();
+      await reader.close();
+    });
   });
 }