Allow pauses to streams right before they end
diff --git a/lib/src/utils.dart b/lib/src/utils.dart
index 4fa75b1..2a531e8 100644
--- a/lib/src/utils.dart
+++ b/lib/src/utils.dart
@@ -445,7 +445,7 @@
var state = _StreamState.initial;
- /// Sends trailing data to the stream. Reeturns true if the subscription
+ /// Sends trailing data to the stream. Returns true if the subscription
/// should still be resumed afterwards.
bool emitTrailing() {
// Attempt to serve requests from pending data first.
@@ -512,13 +512,25 @@
controller
..onListen = scheduleInitialEmit
..onPause = () {
- assert(state == _StreamState.initial || state == _StreamState.attached);
+ assert(
+ state == _StreamState.initial ||
+ state == _StreamState.attached ||
+ state == _StreamState.done,
+ 'Unexpected pause event in $state ($_remainingBlocksInOutgoing blocks remaining).');
if (state == _StreamState.initial) {
state = _StreamState.pausedAfterInitial;
- } else {
+ } else if (state == _StreamState.attached) {
_pause();
state = _StreamState.pausedAfterAttached;
+ } else if (state == _StreamState.done) {
+ // It may happen that onPause is called in a state where we believe
+ // the stream to be done already. After the stream is done, we close
+ // the controller in a new microtask. So if the subscription is paused
+ // after the last event it emitted but before we close the controller,
+ // we can get a pause event here.
+ // There's nothing to do in that case.
+ assert(_subscription?.isPaused != false);
}
}
..onResume = () {
diff --git a/test/utils_test.dart b/test/utils_test.dart
index 93af4e8..e9b3a8b 100644
--- a/test/utils_test.dart
+++ b/test/utils_test.dart
@@ -279,6 +279,27 @@
await reader.nextBlocks(2).first;
await reader.close();
});
+
+ test('can pause stream subscriptions when all data has been emitted',
+ () async {
+ final reader = BlockReader(controller.stream);
+ controller.add(Uint8List(blockSize * 3));
+ await reader.nextBlock();
+
+ late StreamSubscription<Uint8List> subscription;
+ final didPause = Completer<void>();
+
+ subscription = reader.nextBlocks(2).listen((_) {
+ scheduleMicrotask(() {
+ subscription.pause();
+ didPause.complete();
+ });
+ });
+
+ await didPause.future;
+ await subscription.cancel();
+ await reader.close();
+ });
});
}