Merge _StreamQueue into StreamQueue (#73)
The original reason to split into an abstract and concrete class was to
allow other implementations that would allow for the transaction
implementation. Since that was implemented without a separate subclass
merge the implementation back to one class to avoid unnecessary
abstraction and make it easier to debug.
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 0c81aec..61a1654 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -62,7 +62,7 @@
///
/// When you need no further events the `StreamQueue` should be closed
/// using [cancel]. This releases the underlying stream subscription.
-abstract class StreamQueue<T> {
+class StreamQueue<T> {
// This class maintains two queues: one of events and one of requests.
// The active request (the one in front of the queue) is called with
// the current event queue when it becomes active, every time a
@@ -81,6 +81,14 @@
// potentially a request that takes either five or zero events, determined
// by the content of the fifth event.
+ final Stream<T> _source;
+
+ /// Subscription on [_source] while listening for events.
+ ///
+ /// Set to subscription when listening, and set to `null` when the
+ /// subscription is done (and [_isDone] is set to true).
+ StreamSubscription<T> _subscription;
+
/// Whether the event source is done.
bool _isDone = false;
@@ -107,9 +115,10 @@
final Queue<_EventRequest> _requestQueue = new Queue();
/// Create a `StreamQueue` of the events of [source].
- factory StreamQueue(Stream<T> source) = _StreamQueue<T>;
+ factory StreamQueue(Stream<T> source) => StreamQueue._(source);
- StreamQueue._();
+ // Private generative constructor to avoid subclasses.
+ StreamQueue._(this._source);
/// Asks if the stream has any more events.
///
@@ -432,24 +441,66 @@
/// Can only be used by the very last request (the stream queue must
/// be closed by that request).
/// Only used by [rest].
- Stream<T> _extractStream();
+ Stream<T> _extractStream() {
+ assert(_isClosed);
+ if (_isDone) {
+ return new Stream<T>.empty();
+ }
+ _isDone = true;
+
+ if (_subscription == null) {
+ return _source;
+ }
+
+ var subscription = _subscription;
+ _subscription = null;
+
+ var wasPaused = subscription.isPaused;
+ var result = new SubscriptionStream<T>(subscription);
+ // Resume after creating stream because that pauses the subscription too.
+ // This way there won't be a short resumption in the middle.
+ if (wasPaused) subscription.resume();
+ return result;
+ }
/// Requests that the event source pauses events.
///
/// This is called automatically when the request queue is empty.
///
/// The event source is restarted by the next call to [_ensureListening].
- void _pause();
+ void _pause() {
+ _subscription.pause();
+ }
/// Ensures that we are listening on events from the event source.
///
/// Starts listening for the first time or resumes after a [_pause].
///
/// Is called automatically if a request requires more events.
- void _ensureListening();
+ void _ensureListening() {
+ if (_isDone) return;
+ if (_subscription == null) {
+ _subscription = _source.listen((data) {
+ _addResult(new Result.value(data));
+ }, onError: (error, StackTrace stackTrace) {
+ _addResult(new Result.error(error, stackTrace));
+ }, onDone: () {
+ _subscription = null;
+ this._close();
+ });
+ } else {
+ _subscription.resume();
+ }
+ }
/// Cancels the underlying event source.
- Future _cancel();
+ Future _cancel() {
+ if (_isDone) return null;
+ _subscription ??= _source.listen(null);
+ var future = _subscription.cancel();
+ _close();
+ return future;
+ }
// ------------------------------------------------------------------
// Methods called by the event source to add events or say that it's
@@ -494,73 +545,6 @@
}
}
-/// The default implementation of [StreamQueue].
-///
-/// This queue gets its events from a stream which is listened
-/// to when a request needs events.
-class _StreamQueue<T> extends StreamQueue<T> {
- /// Source of events.
- final Stream<T> _sourceStream;
-
- /// Subscription on [_sourceStream] while listening for events.
- ///
- /// Set to subscription when listening, and set to `null` when the
- /// subscription is done (and [_isDone] is set to true).
- StreamSubscription<T> _subscription;
-
- _StreamQueue(this._sourceStream) : super._();
-
- Future _cancel() {
- if (_isDone) return null;
- if (_subscription == null) _subscription = _sourceStream.listen(null);
- var future = _subscription.cancel();
- _close();
- return future;
- }
-
- void _ensureListening() {
- if (_isDone) return;
- if (_subscription == null) {
- _subscription = _sourceStream.listen((data) {
- _addResult(new Result.value(data));
- }, onError: (error, StackTrace stackTrace) {
- _addResult(new Result.error(error, stackTrace));
- }, onDone: () {
- _subscription = null;
- this._close();
- });
- } else {
- _subscription.resume();
- }
- }
-
- void _pause() {
- _subscription.pause();
- }
-
- Stream<T> _extractStream() {
- assert(_isClosed);
- if (_isDone) {
- return new Stream<T>.empty();
- }
- _isDone = true;
-
- if (_subscription == null) {
- return _sourceStream;
- }
-
- var subscription = _subscription;
- _subscription = null;
-
- var wasPaused = subscription.isPaused;
- var result = new SubscriptionStream<T>(subscription);
- // Resume after creating stream because that pauses the subscription too.
- // This way there won't be a short resumption in the middle.
- if (wasPaused) subscription.resume();
- return result;
- }
-}
-
/// A transaction on a [StreamQueue], created by [StreamQueue.startTransaction].
///
/// Copies of the parent queue may be created using [newQueue]. Calling [commit]