Style tweaks in stream_queue.dart (#167)
- Use a noun phrase to document the `hasNext` getter. The `next` and
`peek` getters keep their verb phrase doc comments since the side
effects are critical distinctions between these methods. If we were
writing this today, `next()` would be a method.
- Change `_failClosed()` which returns an error, to `_checkNotClosed()`
which checks the condition and optionally closes. Avoid nesting the
majority of method behavior in a conditional.
- Use `RangeError.checkNotNegative` over a conditional.
- Change bare `Future` to `Future<void>` in code examples.
- Make `withTransaction` `async` since there is no longer a blocker now
that async methods start running synchronously.
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 5aa6054..f9902b9 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -126,7 +126,7 @@
}
}
- /// Asks if the stream has any more events.
+ /// Whether the stream has any more events.
///
/// Returns a future that completes with `true` if the stream has any
/// more events, whether data or error.
@@ -138,12 +138,10 @@
/// Another alternative is to use `take(1)` which returns either zero or
/// one events.
Future<bool> get hasNext {
- if (!_isClosed) {
- var hasNextRequest = _HasNextRequest<T>();
- _addRequest(hasNextRequest);
- return hasNextRequest.future;
- }
- throw _failClosed();
+ _checkNotClosed();
+ var hasNextRequest = _HasNextRequest<T>();
+ _addRequest(hasNextRequest);
+ return hasNextRequest.future;
}
/// Look at the next [count] data events without consuming them.
@@ -152,13 +150,11 @@
/// If one of the next [count] events is an error, the returned future
/// completes with this error, and the error is still left in the queue.
Future<List<T>> lookAhead(int count) {
- if (count < 0) throw RangeError.range(count, 0, null, 'count');
- if (!_isClosed) {
- var request = _LookAheadRequest<T>(count);
- _addRequest(request);
- return request.future;
- }
- throw _failClosed();
+ RangeError.checkNotNegative(count, 'count');
+ _checkNotClosed();
+ var request = _LookAheadRequest<T>(count);
+ _addRequest(request);
+ return request.future;
}
/// Requests the next (yet unrequested) event from the stream.
@@ -176,12 +172,10 @@
/// and they will be completed in the order they were requested, by the
/// first events that were not consumed by previous requeusts.
Future<T> get next {
- if (!_isClosed) {
- var nextRequest = _NextRequest<T>();
- _addRequest(nextRequest);
- return nextRequest.future;
- }
- throw _failClosed();
+ _checkNotClosed();
+ var nextRequest = _NextRequest<T>();
+ _addRequest(nextRequest);
+ return nextRequest.future;
}
/// Looks at the next (yet unrequested) event from the stream.
@@ -189,15 +183,13 @@
/// Like [next] except that the event is not consumed.
/// If the next event is an error event, it stays in the queue.
Future<T> get peek {
- if (!_isClosed) {
- var nextRequest = _PeekRequest<T>();
- _addRequest(nextRequest);
- return nextRequest.future;
- }
- throw _failClosed();
+ _checkNotClosed();
+ var nextRequest = _PeekRequest<T>();
+ _addRequest(nextRequest);
+ return nextRequest.future;
}
- /// Returns a stream of all the remaning events of the source stream.
+ /// A stream of all the remaning events of the source stream.
///
/// All requested [next], [skip] or [take] operations are completed
/// first, and then any remaining events are provided as events of
@@ -207,9 +199,7 @@
/// `rest` the caller may no longer request other events, like
/// after calling [cancel].
Stream<T> get rest {
- if (_isClosed) {
- throw _failClosed();
- }
+ _checkNotClosed();
var request = _RestRequest<T>(this);
_isClosed = true;
_addRequest(request);
@@ -232,13 +222,11 @@
/// then all events were succssfully skipped. If the value
/// is greater than zero then the stream ended early.
Future<int> skip(int count) {
- if (count < 0) throw RangeError.range(count, 0, null, 'count');
- if (!_isClosed) {
- var request = _SkipRequest<T>(count);
- _addRequest(request);
- return request.future;
- }
- throw _failClosed();
+ RangeError.checkNotNegative(count, 'count');
+ _checkNotClosed();
+ var request = _SkipRequest<T>(count);
+ _addRequest(request);
+ return request.future;
}
/// Requests the next [count] data events as a list.
@@ -257,13 +245,11 @@
/// of data collected so far. That is, the returned
/// list may have fewer than [count] elements.
Future<List<T>> take(int count) {
- if (count < 0) throw RangeError.range(count, 0, null, 'count');
- if (!_isClosed) {
- var request = _TakeRequest<T>(count);
- _addRequest(request);
- return request.future;
- }
- throw _failClosed();
+ RangeError.checkNotNegative(count, 'count');
+ _checkNotClosed();
+ var request = _TakeRequest<T>(count);
+ _addRequest(request);
+ return request.future;
}
/// Requests a transaction that can conditionally consume events.
@@ -285,7 +271,7 @@
///
/// ```dart
/// /// Consumes all empty lines from the beginning of [lines].
- /// Future consumeEmptyLines(StreamQueue<String> lines) async {
+ /// Future<void> consumeEmptyLines(StreamQueue<String> lines) async {
/// while (await lines.hasNext) {
/// var transaction = lines.startTransaction();
/// var queue = transaction.newQueue();
@@ -299,7 +285,7 @@
/// }
/// ```
StreamQueueTransaction<T> startTransaction() {
- if (_isClosed) throw _failClosed();
+ _checkNotClosed();
var request = _TransactionRequest(this);
_addRequest(request);
@@ -320,7 +306,7 @@
///
/// ```dart
/// /// Consumes all empty lines from the beginning of [lines].
- /// Future consumeEmptyLines(StreamQueue<String> lines) async {
+ /// Future<void> consumeEmptyLines(StreamQueue<String> lines) async {
/// while (await lines.hasNext) {
/// // Consume a line if it's empty, otherwise return.
/// if (!await lines.withTransaction(
@@ -330,23 +316,24 @@
/// }
/// }
/// ```
- Future<bool> withTransaction(Future<bool> Function(StreamQueue<T>) callback) {
+ Future<bool> withTransaction(
+ Future<bool> Function(StreamQueue<T>) callback) async {
var transaction = startTransaction();
- /// Avoid async/await to ensure that [startTransaction] is called
- /// synchronously and so ends up in the right place in the request queue.
var queue = transaction.newQueue();
- return callback(queue).then((result) {
- if (result) {
- transaction.commit(queue);
- } else {
- transaction.reject();
- }
- return result;
- }, onError: (Object error) {
+ bool result;
+ try {
+ result = await callback(queue);
+ } catch (_) {
transaction.commit(queue);
- throw error;
- });
+ rethrow;
+ }
+ if (result) {
+ transaction.commit(queue);
+ } else {
+ transaction.reject();
+ }
+ return result;
}
/// Passes a copy of this queue to [callback], and updates this queue to match
@@ -394,13 +381,13 @@
/// stream had closed.
///
/// The returned future completes with the result of calling
- /// `cancel`.
+ /// `cancel` on the subscription to the source stream.
///
/// After calling `cancel`, no further events can be requested.
/// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
/// may be called again.
Future? cancel({bool immediate = false}) {
- if (_isClosed) throw _failClosed();
+ _checkNotClosed();
_isClosed = true;
if (!immediate) {
@@ -529,12 +516,9 @@
// ------------------------------------------------------------------
// Internal helper methods.
- /// Returns an error for when a request is made after cancel.
- ///
- /// Returns a [StateError] with a message saying that either
- /// [cancel] or [rest] have already been called.
- Error _failClosed() {
- return StateError('Already cancelled');
+ /// Throws an error if [cancel] or [rest] have already been called.
+ void _checkNotClosed() {
+ if (_isClosed) throw StateError('Already cancelled');
}
/// Adds a new request to the queue.