Add `peek` and `lookAhead` to `StreamQueue`.
These allow users to look at events (similar to `next` and `take`)
without consuming them. For simple cases, they can be used instead of
`startTransaction` to decide what to do before doing it.
R=nweiz@google.com
Review-Url: https://codereview.chromium.org//2649033006 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1ac341e..5809285 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,9 @@
* Add an `AsyncCache` class that caches asynchronous operations for a period of
time.
+* Add `StreamQueue.peek` and `StreamQueue.lookAheead`.
+ These allow users to look at events without consuming them.
+
* Add `StreamQueue.startTransaction()` and `StreamQueue.withTransaction()`.
These allow users to conditionally consume events based on their values.
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 7b73eac..14eb7a0 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -131,6 +131,22 @@
throw _failClosed();
}
+
+ /// Look at the next [count] data events without consuming them.
+ ///
+ /// Works like [take] except that the events are left in the queue.
+ /// If one of the next [count] events is an error, the returned future
+ /// completes with this error, and the error is still left in the queue.
+ Future<List<T>> lookAhead(int count) {
+ if (count < 0) throw new RangeError.range(count, 0, null, "count");
+ if (!_isClosed) {
+ var request = new _LookAheadRequest<T>(count);
+ _addRequest(request);
+ return request.future;
+ }
+ throw _failClosed();
+ }
+
/// Requests the next (yet unrequested) event from the stream.
///
/// When the requested event arrives, the returned future is completed with
@@ -154,6 +170,19 @@
throw _failClosed();
}
+ /// Looks at the next (yet unrequested) event from the stream.
+ ///
+ /// Like [next] except that the event is not consumed.
+ /// If the next event is an error event, it stays in the queue.
+ Future<T> get peek {
+ if (!_isClosed) {
+ var nextRequest = new _PeekRequest<T>();
+ _addRequest(nextRequest);
+ return nextRequest.future;
+ }
+ throw _failClosed();
+ }
+
/// Returns a stream of all the remaning events of the source stream.
///
/// All requested [next], [skip] or [take] operations are completed
@@ -353,8 +382,8 @@
/// `cancel`.
///
/// After calling `cancel`, no further events can be requested.
- /// None of [next], [rest], [skip], [take] or [cancel] may be
- /// called again.
+ /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
+ /// may be called again.
Future cancel({bool immediate: false}) {
if (_isClosed) throw _failClosed();
_isClosed = true;
@@ -692,15 +721,42 @@
return true;
}
if (isDone) {
- var errorFuture =
- new Future.sync(() => throw new StateError("No elements"));
- _completer.complete(errorFuture);
+ _completer.completeError(new StateError("No elements"),
+ StackTrace.current);
return true;
}
return false;
}
}
+
+/// Request for a [StreamQueue.peek] call.
+///
+/// Completes the returned future when receiving the first event,
+/// and is then complete, but doesn't consume the event.
+class _PeekRequest<T> implements _EventRequest<T> {
+ /// Completer for the future returned by [StreamQueue.next].
+ final _completer = new Completer<T>();
+
+ _PeekRequest();
+
+ Future<T> get future => _completer.future;
+
+ bool update(QueueList<Result<T>> events, bool isDone) {
+ if (events.isNotEmpty) {
+ events.first.complete(_completer);
+ return true;
+ }
+ if (isDone) {
+ _completer.completeError(new StateError("No elements"),
+ StackTrace.current);
+ return true;
+ }
+ return false;
+ }
+}
+
+
/// Request for a [StreamQueue.skip] call.
class _SkipRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the skip call.
@@ -738,8 +794,8 @@
}
}
-/// Request for a [StreamQueue.take] call.
-class _TakeRequest<T> implements _EventRequest<T> {
+/// Common superclass for [_TakeRequest] and [_LookAheadRequest].
+abstract class _ListRequest<T> implements _EventRequest<T> {
/// Completer for the future returned by the take call.
final _completer = new Completer<List<T>>();
@@ -752,10 +808,16 @@
/// this value.
final int _eventsToTake;
- _TakeRequest(this._eventsToTake);
+ _ListRequest(this._eventsToTake);
/// The future completed when the correct number of events have been captured.
Future<List<T>> get future => _completer.future;
+}
+
+
+/// Request for a [StreamQueue.take] call.
+class _TakeRequest<T> extends _ListRequest<T> {
+ _TakeRequest(int eventsToTake) : super(eventsToTake);
bool update(QueueList<Result<T>> events, bool isDone) {
while (_list.length < _eventsToTake) {
@@ -766,7 +828,7 @@
var event = events.removeFirst();
if (event.isError) {
- _completer.completeError(event.asError.error, event.asError.stackTrace);
+ event.asError.complete(_completer);
return true;
}
_list.add(event.asValue.value);
@@ -776,6 +838,30 @@
}
}
+
+/// Request for a [StreamQueue.lookAhead] call.
+class _LookAheadRequest<T> extends _ListRequest<T> {
+ _LookAheadRequest(int eventsToTake) : super(eventsToTake);
+
+ bool update(QueueList<Result<T>> events, bool isDone) {
+ while (_list.length < _eventsToTake) {
+ if (events.length == _list.length) {
+ if (isDone) break;
+ return false;
+ }
+ var event = events.elementAt(_list.length);
+ if (event.isError) {
+ event.asError.complete(_completer);
+ return true;
+ }
+ _list.add(event.asValue.value);
+ }
+ _completer.complete(_list);
+ return true;
+ }
+}
+
+
/// Request for a [StreamQueue.cancel] call.
///
/// The request needs no events, it just waits in the request queue
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index 0aebb17..9668f0a 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -83,6 +83,65 @@
});
});
+ group("lookAhead operation", () {
+ test("as simple list of events", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(await events.lookAhead(4), [1, 2, 3, 4]);
+ expect(await events.next, 1);
+ expect(await events.lookAhead(2), [2, 3]);
+ expect(await events.take(2), [2, 3]);
+ expect(await events.next, 4);
+ await events.cancel();
+ });
+
+ test("of 0 events", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(events.lookAhead(0), completion([]));
+ expect(events.next, completion(1));
+ expect(events.lookAhead(0), completion([]));
+ expect(events.next, completion(2));
+ expect(events.lookAhead(0), completion([]));
+ expect(events.next, completion(3));
+ expect(events.lookAhead(0), completion([]));
+ expect(events.next, completion(4));
+ expect(events.lookAhead(0), completion([]));
+ expect(events.lookAhead(5), completion([]));
+ expect(events.next, throwsStateError);
+ await events.cancel();
+ });
+
+ test("with bad arguments throws", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(() => events.lookAhead(-1), throwsArgumentError);
+ expect(await events.next, 1); // Did not consume event.
+ expect(() => events.lookAhead(-1), throwsArgumentError);
+ expect(await events.next, 2); // Did not consume event.
+ await events.cancel();
+ });
+
+ test("of too many arguments", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(await events.lookAhead(6), [1, 2, 3, 4]);
+ await events.cancel();
+ });
+
+ test("too large later", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(await events.next, 1);
+ expect(await events.next, 2);
+ expect(await events.lookAhead(6), [3, 4]);
+ await events.cancel();
+ });
+
+ test("error", () async {
+ var events = new StreamQueue<int>(createErrorStream());
+ expect(events.lookAhead(4), throwsA("To err is divine!"));
+ expect(events.take(4), throwsA("To err is divine!"));
+ expect(await events.next, 4);
+ await events.cancel();
+ });
+ });
+
group("next operation", () {
test("simple sequence of requests", () async {
var events = new StreamQueue<int>(createStream());
@@ -374,11 +433,46 @@
});
});
+ group("peek operation", () {
+ test("peeks one event", () async {
+ var events = new StreamQueue<int>(createStream());
+ expect(await events.peek, 1);
+ expect(await events.next, 1);
+ expect(await events.peek, 2);
+ expect(await events.take(2), [2, 3]);
+ expect(await events.peek, 4);
+ expect(await events.next, 4);
+ // Throws at end.
+ expect(events.peek, throws);
+ await events.cancel();
+ });
+ test("multiple requests at the same time", () async {
+ var events = new StreamQueue<int>(createStream());
+ var result = await Future.wait(
+ [events.peek, events.peek, events.next, events.peek, events.peek]);
+ expect(result, [1, 1, 1, 2, 2]);
+ await events.cancel();
+ });
+ test("sequence of requests with error", () async {
+ var events = new StreamQueue<int>(createErrorStream());
+ expect(await events.next, 1);
+ expect(await events.next, 2);
+ expect(events.peek, throwsA("To err is divine!"));
+ // Error stays in queue.
+ expect(events.peek, throwsA("To err is divine!"));
+ expect(events.next, throwsA("To err is divine!"));
+ expect(await events.next, 4);
+ await events.cancel();
+ });
+ });
+
group("cancel operation", () {
test("closes the events, prevents any other operation", () async {
var events = new StreamQueue<int>(createStream());
await events.cancel();
+ expect(() => events.lookAhead(1), throwsStateError);
expect(() => events.next, throwsStateError);
+ expect(() => events.peek, throwsStateError);
expect(() => events.skip(1), throwsStateError);
expect(() => events.take(1), throwsStateError);
expect(() => events.rest, throwsStateError);