Simplify implementation of `StreamQueue`.
Only change in behavior is that all event seen by the original queue
while the transaction request is active, are forwarded to the transaction
immediately, and will still be visible after the transaction commits
or rejects.
The existing behavior was based on cancelling a stream subscription
and which events were visible depended on which commands had been
used to request those events.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 839eb91..78926d4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,9 @@
## 2.11.0-dev
* Add `CancelableOperation.fromValue`.
+* Simplify implementation of `StreamQueue`.
+ Remaining queues of committed or rejected transactions now see all events
+ delivered up to the point where the transaction was completed.
## 2.10.0
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index f7ab8ba..fd6a2e4 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -5,12 +5,9 @@
import 'dart:async';
import 'dart:collection';
-import 'package:collection/collection.dart';
-
import 'cancelable_operation.dart';
import 'result/result.dart';
import 'stream_completer.dart';
-import 'stream_splitter.dart';
import 'subscription_stream.dart';
/// An asynchronous pull-based interface for accessing stream events.
@@ -62,6 +59,103 @@
///
/// When you need no further events the `StreamQueue` should be closed
/// using [cancel]. This releases the underlying stream subscription.
+class _StreamStreamQueue<T> extends StreamQueue<T> {
+ final Stream<T> _source;
+
+ /// Subscription on [_source] while listening for events.
+ ///
+ /// Set to subscription when listening, and set to `null` when the
+ /// subscription is done (and [_isDone] is set to true).
+ StreamSubscription<T>? _subscription;
+
+ // Private generative constructor to avoid subclasses.
+ _StreamStreamQueue(this._source) : super._() {
+ // Start listening immediately if we could otherwise lose events.
+ if (_source.isBroadcast) {
+ _ensureListening();
+ _pause();
+ }
+ }
+
+ /// Requests that the event source pauses events.
+ ///
+ /// This is called automatically when the request queue is empty.
+ ///
+ /// The event source is restarted by the next call to [_ensureListening].
+ @override
+ void _pause() {
+ _subscription?.pause();
+ }
+
+ /// Ensures that we are listening on events from the event source.
+ ///
+ /// Starts listening for the first time or resumes after a [_pause].
+ ///
+ /// Is called automatically if a request requires more events.
+ @override
+ void _ensureListening() {
+ if (_isDone) return;
+ var subscription = _subscription;
+ if (subscription != null) {
+ subscription.resume();
+ } else {
+ _subscription = _source.listen((data) {
+ _addResult(Result.value(data));
+ }, onError: (Object error, StackTrace stackTrace) {
+ _addResult(Result.error(error, stackTrace));
+ }, onDone: () {
+ _subscription = null;
+ _close();
+ });
+ }
+ }
+
+ /// Cancels the underlying event source.
+ @override
+ Future? _cancel() {
+ if (_isDone) return null;
+ var cancelFuture = (_subscription ?? _source.listen(null)).cancel();
+ _close(); // Sets _isDone, updates requests.
+ return cancelFuture;
+ }
+
+ @override
+ Stream<T> get rest {
+ _checkNotClosed();
+ var request = _StreamRestRequest<T>(this);
+ _setClosed();
+ _addRequest(request);
+ return request.stream;
+ }
+
+ /// Extracts a stream from the event source and makes this stream queue
+ /// unusable.
+ ///
+ /// Can only be used by the very last request (the stream queue must
+ /// be closed by that request).
+ /// Only used by [rest].
+ Stream<T> _extractStream() {
+ assert(_isClosed);
+ if (_isDone) {
+ return Stream<T>.empty();
+ }
+ _setDone();
+
+ var subscription = _subscription;
+ if (subscription == null) {
+ return _source;
+ }
+ _subscription = null;
+
+ var wasPaused = subscription.isPaused;
+ var result = SubscriptionStream<T>(subscription);
+ // Resume after creating stream because that pauses the subscription too.
+ // This way there won't be a short resumption in the middle.
+ if (wasPaused) subscription.resume();
+ return result;
+ }
+}
+
class StreamQueue<T> {
// This class maintains two queues: one of events and one of requests.
// The active request (the one in front of the queue) is called with
@@ -81,21 +175,42 @@
// potentially a request that takes either five or zero events, determined
// by the content of the fifth event.
- final Stream<T> _source;
+ /// Create a `StreamQueue` of the events of [source].
+ factory StreamQueue(Stream<T> source) = _StreamStreamQueue<T>;
- /// Subscription on [_source] while listening for events.
- ///
- /// Set to subscription when listening, and set to `null` when the
- /// subscription is done (and [_isDone] is set to true).
- StreamSubscription<T>? _subscription;
+ StreamQueue._();
+
+ StreamQueue._init(Iterable<Result<T>> initialEvents, bool isDone) {
+ _eventQueue.addAll(initialEvents);
+ _eventsReceived = _eventQueue.length;
+ if (isDone) _setClosed();
+ }
+
+ static const int _flagDone = 1;
+ static const int _flagClosed = 2;
+ int _flags = 0;
/// Whether the event source is done.
- bool _isDone = false;
+ bool get _isDone => _flags & _flagDone != 0;
+
+ void _setDone() {
+ _flags |= _flagDone;
+ }
/// Whether a closing operation has been performed on the stream queue.
///
/// Closing operations are [cancel] and [rest].
- bool _isClosed = false;
+ bool get _isClosed => _flags & _flagClosed != 0;
+
+ void _setClosed() {
+ _flags |= _flagClosed;
+ }
+
+ Future? _cancel() {
+ if (_isDone) return null;
+ _close();
+ return Future<void>.value(null);
+ }
/// The number of events dispatched by this queue.
///
@@ -107,25 +222,13 @@
var _eventsReceived = 0;
/// Queue of events not used by a request yet.
- final QueueList<Result<T>> _eventQueue = QueueList();
+ final Queue<Result<T>> _eventQueue = Queue();
/// Queue of pending requests.
///
/// Access through methods below to ensure consistency.
final Queue<_EventRequest> _requestQueue = Queue();
- /// Create a `StreamQueue` of the events of [source].
- factory StreamQueue(Stream<T> source) => StreamQueue._(source);
-
- // Private generative constructor to avoid subclasses.
- StreamQueue._(this._source) {
- // Start listening immediately if we could otherwise lose events.
- if (_source.isBroadcast) {
- _ensureListening();
- _pause();
- }
- }
-
/// Whether the stream has any more events.
///
/// Returns a future that completes with `true` if the stream has any
@@ -200,8 +303,8 @@
/// after calling [cancel].
Stream<T> get rest {
_checkNotClosed();
- var request = _RestRequest<T>(this);
- _isClosed = true;
+ var request = _RestRequest<T>();
+ _setClosed();
_addRequest(request);
return request.stream;
}
@@ -377,18 +480,18 @@
/// subscription providing the events.
///
/// If [immediate] is `true`, the source is instead canceled
- /// immediately. Any pending events are completed as though the underlying
+ /// immediately. Any pending requests are completed as though the underlying
/// stream had closed.
///
/// The returned future completes with the result of calling
- /// `cancel` on the subscription to the source stream.
+ /// `cancel` on the subscription to the source stream, if any.
///
/// After calling `cancel`, no further events can be requested.
/// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
/// may be called again.
Future? cancel({bool immediate = false}) {
_checkNotClosed();
- _isClosed = true;
+ _setClosed();
if (!immediate) {
var request = _CancelRequest<T>(this);
@@ -396,7 +499,7 @@
return request.future;
}
- if (_isDone && _eventQueue.isEmpty) return Future.value();
+ if (_isDone && _eventQueue.isEmpty) return Future<void>.value(null);
return _cancel();
}
@@ -428,71 +531,8 @@
}
}
- /// Extracts a stream from the event source and makes this stream queue
- /// unusable.
- ///
- /// Can only be used by the very last request (the stream queue must
- /// be closed by that request).
- /// Only used by [rest].
- Stream<T> _extractStream() {
- assert(_isClosed);
- if (_isDone) {
- return Stream<T>.empty();
- }
- _isDone = true;
-
- var subscription = _subscription;
- if (subscription == null) {
- return _source;
- }
- _subscription = null;
-
- var wasPaused = subscription.isPaused;
- var result = SubscriptionStream<T>(subscription);
- // Resume after creating stream because that pauses the subscription too.
- // This way there won't be a short resumption in the middle.
- if (wasPaused) subscription.resume();
- return result;
- }
-
- /// Requests that the event source pauses events.
- ///
- /// This is called automatically when the request queue is empty.
- ///
- /// The event source is restarted by the next call to [_ensureListening].
- void _pause() {
- _subscription!.pause();
- }
-
- /// Ensures that we are listening on events from the event source.
- ///
- /// Starts listening for the first time or resumes after a [_pause].
- ///
- /// Is called automatically if a request requires more events.
- void _ensureListening() {
- if (_isDone) return;
- if (_subscription == null) {
- _subscription = _source.listen((data) {
- _addResult(Result.value(data));
- }, onError: (Object error, StackTrace stackTrace) {
- _addResult(Result.error(error, stackTrace));
- }, onDone: () {
- _subscription = null;
- _close();
- });
- } else {
- _subscription!.resume();
- }
- }
-
- /// Cancels the underlying event source.
- Future? _cancel() {
- if (_isDone) return null;
- _subscription ??= _source.listen(null);
- var future = _subscription!.cancel();
- _close();
- return future;
- }
+ // Does nothing by default.
+ void _pause() {}
// ------------------------------------------------------------------
// Methods called by the event source to add events or say that it's
@@ -506,10 +546,24 @@
_updateRequests();
}
+ /// Called when zero or more events are added.
+ /// Always calls [_updateRequests] after adding.
+ void _addResults(Queue<Result<T>> results, int start, int end, bool isDone) {
+ for (var i = start; i < end; i++) {
+ _eventQueue.add(results.elementAt(i));
+ _eventsReceived++;
+ }
+ if (isDone) {
+ _close();
+ } else {
+ _updateRequests();
+ }
+ }
+
/// Called when the event source is done.
/// Always calls [_updateRequests] after adding.
void _close() {
- _isDone = true;
+ _setDone();
_updateRequests();
}
@@ -532,6 +586,9 @@
}
_requestQueue.add(request);
}
+
+ /// Allows subclasses to do something when a first request is added.
+ void _ensureListening() {}
}
/// A transaction on a [StreamQueue], created by [StreamQueue.startTransaction].
@@ -543,20 +600,36 @@
/// The parent queue on which this transaction is active.
final StreamQueue<T> _parent;
- /// The splitter that produces copies of the parent queue's stream.
- final StreamSplitter<T> _splitter;
-
/// Queues created using [newQueue].
- final _queues = <StreamQueue>{};
+ final _queues = <StreamQueue<T>>{};
- /// Whether [commit] has been called.
- var _committed = false;
+ /// The value -1 until committed or rejected, then then number of consumed
+ /// results + 1 for committed and 0 for rejected).
+ int _state = -1;
- /// Whether [reject] has been called.
- var _rejected = false;
+ bool get _active => _state < 0;
+ bool get _committed => _state > 0;
+ bool get _rejected => _state == 0;
- StreamQueueTransaction._(this._parent, Stream<T> source)
- : _splitter = StreamSplitter(source);
+ int get _consumedEvents {
+ assert(_state >= 0);
+ return _state == 0 ? 0 : _state - 1;
+ }
+
+ void _update(Queue<Result<T>> events, bool isDone) {
+ if (!_active) return;
+ for (var queue in _queues) {
+ var start = queue._eventsReceived;
+ var end = events.length;
+ if (start < end) {
+ queue._addResults(events, start, end, isDone);
+ } else if (isDone) {
+ queue._close();
+ }
+ }
+ }
+
+ StreamQueueTransaction._(this._parent);
/// Creates a new copy of the parent queue.
///
@@ -564,7 +637,10 @@
/// [StreamQueue.startTransaction] was called. Its position can be committed
/// to the parent queue using [commit].
StreamQueue<T> newQueue() {
- var queue = StreamQueue(_splitter.split());
+ if (!_active) {
+ return StreamQueue._().._flags = StreamQueue._flagDone;
+ }
+ var queue = StreamQueue<T>._init(_parent._eventQueue, _parent._isDone);
_queues.add(queue);
return queue;
}
@@ -583,16 +659,9 @@
if (!_queues.contains(queue)) {
throw ArgumentError("Queue doesn't belong to this transaction.");
} else if (queue._requestQueue.isNotEmpty) {
- throw StateError("A queue with pending requests can't be committed.");
+ throw StateError("A queue with pending requests cannot be committed.");
}
- _committed = true;
-
- // Remove all events from the parent queue that were consumed by the
- // child queue.
- for (var j = 0; j < queue.eventsDispatched; j++) {
- _parent._eventQueue.removeFirst();
- }
-
+ _state = queue.eventsDispatched + 1;
_done();
}
@@ -605,32 +674,29 @@
/// Throws a [StateError] if [commit] or [reject] have already been called.
void reject() {
_assertActive();
- _rejected = true;
+ _state = 0;
_done();
}
// Cancels all [_queues], removes the [_TransactionRequest] from [_parent]'s
// request queue, and runs the next request.
void _done() {
- _splitter.close();
for (var queue in _queues) {
queue._cancel();
}
- // If this is the active request in the queue, mark it as finished.
- var currentRequest = _parent._requestQueue.first;
- if (currentRequest is _TransactionRequest &&
- currentRequest.transaction == this) {
- _parent._requestQueue.removeFirst();
- _parent._updateRequests();
- }
+ _queues.clear();
+ assert(!_active);
+ _parent._updateRequests();
}
/// Throws a [StateError] if [commit] or [reject] has already been called.
void _assertActive() {
- if (_committed) {
- throw StateError('This transaction has already been accepted.');
- } else if (_rejected) {
+ if (_active) return;
+ if (_rejected) {
throw StateError('This transaction has already been rejected.');
+ } else {
+ assert(_committed);
+ throw StateError('This transaction has already been accepted.');
}
}
}
@@ -670,7 +736,7 @@
/// If the function returns `false` when the stream has already closed
/// ([isDone] is true), then the request must call
/// [StreamQueue._updateRequests] itself when it's ready to continue.
- bool update(QueueList<Result<T>> events, bool isDone);
+ bool update(Queue<Result<T>> events, bool isDone);
}
/// Request for a [StreamQueue.next] call.
@@ -686,7 +752,7 @@
Future<T> get future => _completer.future;
@override
- bool update(QueueList<Result<T>> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
events.removeFirst().complete(_completer);
return true;
@@ -712,7 +778,7 @@
Future<T> get future => _completer.future;
@override
- bool update(QueueList<Result<T>> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
events.first.complete(_completer);
return true;
@@ -744,7 +810,7 @@
Future<int> get future => _completer.future;
@override
- bool update(QueueList<Result<T>> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
while (_eventsToSkip > 0) {
if (events.isEmpty) {
if (isDone) break;
@@ -789,7 +855,7 @@
_TakeRequest(super.eventsToTake);
@override
- bool update(QueueList<Result<T>> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
while (_list.length < _eventsToTake) {
if (events.isEmpty) {
if (isDone) break;
@@ -813,7 +879,7 @@
_LookAheadRequest(super.eventsToTake);
@override
- bool update(QueueList<Result<T>> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
while (_list.length < _eventsToTake) {
if (events.length == _list.length) {
if (isDone) break;
@@ -850,12 +916,55 @@
Future get future => _completer.future;
@override
- bool update(QueueList<Result<T>> events, bool isDone) {
- if (_streamQueue._isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
+ if (isDone) {
_completer.complete();
} else {
- _streamQueue._ensureListening();
- _completer.complete(_streamQueue._extractStream().listen(null).cancel());
+ _completer.complete(_streamQueue._cancel()); // ignore: void_checks
+ }
+ return true;
+ }
+}
+
+/// Request for a [StreamQueue.rest] call.
+///
+/// The request is always complete, it just waits in the request queue
+/// until all previous events are fulfilled, then it takes over the
+/// stream events subscription and creates a stream from it.
+class _StreamRestRequest<T> implements _EventRequest<T> {
+ /// Completer for the stream returned by the `rest` call.
+ final _completer = StreamCompleter<T>();
+
+ /// The [StreamQueue] object that has this request queued.
+ ///
+ /// When the event is completed, it needs to cancel the active subscription
+ /// of the `StreamQueue` object, if any.
+ final _StreamStreamQueue<T> _streamQueue;
+
+ _StreamRestRequest(this._streamQueue);
+
+ /// The stream which will contain the remaining events of [_streamQueue].
+ Stream<T> get stream => _completer.stream;
+
+ @override
+ bool update(Queue<Result<T>> events, bool isDone) {
+ if (events.isEmpty) {
+ if (_streamQueue._isDone) {
+ _completer.setEmpty();
+ } else {
+ _completer.setSourceStream(_streamQueue._extractStream());
+ }
+ } else {
+ // There are prefetched events which needs to be added before the
+ // remaining stream.
+ var controller = StreamController<T>(sync: true);
+ for (var event in events) {
+ event.addTo(controller);
+ }
+ controller
+ .addStream(_streamQueue._extractStream(), cancelOnError: false)
+ .whenComplete(controller.close);
+ _completer.setSourceStream(controller.stream);
}
return true;
}
@@ -867,41 +976,23 @@
/// until all previous events are fulfilled, then it takes over the
/// stream events subscription and creates a stream from it.
class _RestRequest<T> implements _EventRequest<T> {
- /// Completer for the stream returned by the `rest` call.
- final _completer = StreamCompleter<T>();
+ _RestRequest();
- /// The [StreamQueue] object that has this request queued.
- ///
- /// When the event is completed, it needs to cancel the active subscription
- /// of the `StreamQueue` object, if any.
- final StreamQueue<T> _streamQueue;
+ final StreamController<T> _controller = StreamController<T>(sync: true);
- _RestRequest(this._streamQueue);
-
- /// The stream which will contain the remaining events of [_streamQueue].
- Stream<T> get stream => _completer.stream;
+ /// The stream which will contain the remaining events.
+ Stream<T> get stream => _controller.stream;
@override
- bool update(QueueList<Result<T>> events, bool isDone) {
- if (events.isEmpty) {
- if (_streamQueue._isDone) {
- _completer.setEmpty();
- } else {
- _completer.setSourceStream(_streamQueue._extractStream());
- }
- } else {
- // There are prefetched events which needs to be added before the
- // remaining stream.
- var controller = StreamController<T>();
- for (var event in events) {
- event.addTo(controller);
- }
- controller
- .addStream(_streamQueue._extractStream(), cancelOnError: false)
- .whenComplete(controller.close);
- _completer.setSourceStream(controller.stream);
+ bool update(Queue<Result<T>> events, bool isDone) {
+ while (events.isNotEmpty) {
+ events.removeFirst().addTo(_controller);
}
- return true;
+ if (isDone) {
+ _controller.close();
+ return true;
+ }
+ return false;
}
}
@@ -917,7 +1008,7 @@
Future<bool> get future => _completer.future;
@override
- bool update(QueueList<Result<T>> events, bool isDone) {
+ bool update(Queue<Result<T>> events, bool isDone) {
if (events.isNotEmpty) {
_completer.complete(true);
return true;
@@ -938,24 +1029,26 @@
/// [StreamQueue._updateRequests].
class _TransactionRequest<T> implements _EventRequest<T> {
/// The transaction created by this request.
- late final StreamQueueTransaction<T> transaction;
+ final StreamQueueTransaction<T> transaction;
- /// The controller that passes events to [transaction].
- final _controller = StreamController<T>(sync: true);
+ int eventsProvided = 0;
- /// The number of events passed to [_controller] so far.
- var _eventsSent = 0;
-
- _TransactionRequest(StreamQueue<T> parent) {
- transaction = StreamQueueTransaction._(parent, _controller.stream);
- }
+ _TransactionRequest(StreamQueue<T> parent)
+ : transaction = StreamQueueTransaction._(parent);
@override
- bool update(QueueList<Result<T>> events, bool isDone) {
- while (_eventsSent < events.length) {
- events[_eventsSent++].addTo(_controller);
+ bool update(Queue<Result<T>> events, bool isDone) {
+ // The transaction never updates the events.
+ // It forwards events to transaction queues,
+ // and we check whether the transaction has been commmitted or not,
+ // and remove consumed events here before returning true.
+ transaction._update(events, isDone);
+ if (transaction._active) return false;
+ // Is committed or rejected. If committed, consumed events may be positive.
+ var consumedEvents = transaction._consumedEvents;
+ for (var i = 0; i < consumedEvents; i++) {
+ events.removeFirst();
}
- if (isDone && !_controller.isClosed) _controller.close();
- return transaction._committed || transaction._rejected;
+ return true;
}
}
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index 626eb16..69bf82b 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -878,8 +878,9 @@
var queue = transaction.newQueue();
// This should emit no more events after the transaction is rejected.
+ // Acts like source closed, so stream closes.
queue.rest.listen(expectAsync1((_) {}, count: 3),
- onDone: expectAsync0(() {}, count: 0));
+ onDone: expectAsync0(() {}, count: 1));
controller.add(1);
controller.add(2);
@@ -939,20 +940,22 @@
transaction.commit(queue1);
});
- test('further child requests act as though the stream was closed',
- () async {
+ test(
+ 'further child requests act as though the stream was closed '
+ 'after already delivered events', () async {
+ // Already delivered events are still visible.
expect(await queue2.next, 2);
transaction.commit(queue2);
- expect(await queue1.hasNext, isFalse);
- expect(queue1.next, throwsStateError);
+ expect(await queue1.rest.toList(), [2]);
});
- test('pending child requests act as though the stream was closed',
- () async {
+ test(
+ 'pending child requests act as though the stream was closed '
+ 'after already delivered events', () async {
expect(await queue2.next, 2);
- expect(queue1.hasNext, completion(isFalse));
- expect(queue1.next, throwsStateError);
+ expect(queue1.hasNext, completion(isTrue));
+ expect(queue1.next, completion(2));
transaction.commit(queue2);
});