Add `peek` and `lookAhead` to `StreamQueue`.

These allow users to look at events (similar to `next` and `take`)
without consuming them. For simple cases, they can be used instead of
`startTransaction` to decide what to do before doing it.

R=nweiz@google.com

Review-Url: https://codereview.chromium.org//2649033006 .
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1ac341e..5809285 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,9 @@
 * Add an `AsyncCache` class that caches asynchronous operations for a period of
   time.
 
+* Add `StreamQueue.peek` and `StreamQueue.lookAheead`.
+  These allow users to look at events without consuming them.
+
 * Add `StreamQueue.startTransaction()` and `StreamQueue.withTransaction()`.
   These allow users to conditionally consume events based on their values.
 
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 7b73eac..14eb7a0 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -131,6 +131,22 @@
     throw _failClosed();
   }
 
+
+  /// Look at the next [count] data events without consuming them.
+  ///
+  /// Works like [take] except that the events are left in the queue.
+  /// If one of the next [count] events is an error, the returned future
+  /// completes with this error, and the error is still left in the queue.
+  Future<List<T>> lookAhead(int count) {
+    if (count < 0) throw new RangeError.range(count, 0, null, "count");
+    if (!_isClosed) {
+      var request = new _LookAheadRequest<T>(count);
+      _addRequest(request);
+      return request.future;
+    }
+    throw _failClosed();
+  }
+
   /// Requests the next (yet unrequested) event from the stream.
   ///
   /// When the requested event arrives, the returned future is completed with
@@ -154,6 +170,19 @@
     throw _failClosed();
   }
 
+  /// Looks at the next (yet unrequested) event from the stream.
+  ///
+  /// Like [next] except that the event is not consumed.
+  /// If the next event is an error event, it stays in the queue.
+  Future<T> get peek {
+    if (!_isClosed) {
+      var nextRequest = new _PeekRequest<T>();
+      _addRequest(nextRequest);
+      return nextRequest.future;
+    }
+    throw _failClosed();
+  }
+
   /// Returns a stream of all the remaning events of the source stream.
   ///
   /// All requested [next], [skip] or [take] operations are completed
@@ -353,8 +382,8 @@
   /// `cancel`.
   ///
   /// After calling `cancel`, no further events can be requested.
-  /// None of [next], [rest], [skip], [take] or [cancel] may be
-  /// called again.
+  /// None of [lookAhead], [next], [peek], [rest], [skip], [take] or [cancel]
+  /// may be called again.
   Future cancel({bool immediate: false}) {
     if (_isClosed) throw _failClosed();
     _isClosed = true;
@@ -692,15 +721,42 @@
       return true;
     }
     if (isDone) {
-      var errorFuture =
-          new Future.sync(() => throw new StateError("No elements"));
-      _completer.complete(errorFuture);
+      _completer.completeError(new StateError("No elements"),
+                               StackTrace.current);
       return true;
     }
     return false;
   }
 }
 
+
+/// Request for a [StreamQueue.peek] call.
+///
+/// Completes the returned future when receiving the first event,
+/// and is then complete, but doesn't consume the event.
+class _PeekRequest<T> implements _EventRequest<T> {
+  /// Completer for the future returned by [StreamQueue.next].
+  final _completer = new Completer<T>();
+
+  _PeekRequest();
+
+  Future<T> get future => _completer.future;
+
+  bool update(QueueList<Result<T>> events, bool isDone) {
+    if (events.isNotEmpty) {
+      events.first.complete(_completer);
+      return true;
+    }
+    if (isDone) {
+      _completer.completeError(new StateError("No elements"),
+                               StackTrace.current);
+      return true;
+    }
+    return false;
+  }
+}
+
+
 /// Request for a [StreamQueue.skip] call.
 class _SkipRequest<T> implements _EventRequest<T> {
   /// Completer for the future returned by the skip call.
@@ -738,8 +794,8 @@
   }
 }
 
-/// Request for a [StreamQueue.take] call.
-class _TakeRequest<T> implements _EventRequest<T> {
+/// Common superclass for [_TakeRequest] and [_LookAheadRequest].
+abstract class _ListRequest<T> implements _EventRequest<T> {
   /// Completer for the future returned by the take call.
   final _completer = new Completer<List<T>>();
 
@@ -752,10 +808,16 @@
   /// this value.
   final int _eventsToTake;
 
-  _TakeRequest(this._eventsToTake);
+  _ListRequest(this._eventsToTake);
 
   /// The future completed when the correct number of events have been captured.
   Future<List<T>> get future => _completer.future;
+}
+
+
+/// Request for a [StreamQueue.take] call.
+class _TakeRequest<T> extends _ListRequest<T> {
+  _TakeRequest(int eventsToTake) : super(eventsToTake);
 
   bool update(QueueList<Result<T>> events, bool isDone) {
     while (_list.length < _eventsToTake) {
@@ -766,7 +828,7 @@
 
       var event = events.removeFirst();
       if (event.isError) {
-        _completer.completeError(event.asError.error, event.asError.stackTrace);
+        event.asError.complete(_completer);
         return true;
       }
       _list.add(event.asValue.value);
@@ -776,6 +838,30 @@
   }
 }
 
+
+/// Request for a [StreamQueue.lookAhead] call.
+class _LookAheadRequest<T> extends _ListRequest<T> {
+  _LookAheadRequest(int eventsToTake) : super(eventsToTake);
+
+  bool update(QueueList<Result<T>> events, bool isDone) {
+    while (_list.length < _eventsToTake) {
+      if (events.length == _list.length) {
+        if (isDone) break;
+        return false;
+      }
+      var event = events.elementAt(_list.length);
+      if (event.isError) {
+        event.asError.complete(_completer);
+        return true;
+      }
+      _list.add(event.asValue.value);
+    }
+    _completer.complete(_list);
+    return true;
+  }
+}
+
+
 /// Request for a [StreamQueue.cancel] call.
 ///
 /// The request needs no events, it just waits in the request queue
diff --git a/test/stream_queue_test.dart b/test/stream_queue_test.dart
index 0aebb17..9668f0a 100644
--- a/test/stream_queue_test.dart
+++ b/test/stream_queue_test.dart
@@ -83,6 +83,65 @@
     });
   });
 
+  group("lookAhead operation", () {
+    test("as simple list of events", () async {
+      var events = new StreamQueue<int>(createStream());
+      expect(await events.lookAhead(4), [1, 2, 3, 4]);
+      expect(await events.next, 1);
+      expect(await events.lookAhead(2), [2, 3]);
+      expect(await events.take(2), [2, 3]);
+      expect(await events.next, 4);
+      await events.cancel();
+    });
+
+    test("of 0 events", () async {
+      var events = new StreamQueue<int>(createStream());
+      expect(events.lookAhead(0), completion([]));
+      expect(events.next, completion(1));
+      expect(events.lookAhead(0), completion([]));
+      expect(events.next, completion(2));
+      expect(events.lookAhead(0), completion([]));
+      expect(events.next, completion(3));
+      expect(events.lookAhead(0), completion([]));
+      expect(events.next, completion(4));
+      expect(events.lookAhead(0), completion([]));
+      expect(events.lookAhead(5), completion([]));
+      expect(events.next, throwsStateError);
+      await events.cancel();
+    });
+
+    test("with bad arguments throws", () async {
+      var events = new StreamQueue<int>(createStream());
+      expect(() => events.lookAhead(-1), throwsArgumentError);
+      expect(await events.next, 1);  // Did not consume event.
+      expect(() => events.lookAhead(-1), throwsArgumentError);
+      expect(await events.next, 2);  // Did not consume event.
+      await events.cancel();
+    });
+
+    test("of too many arguments", () async {
+      var events = new StreamQueue<int>(createStream());
+      expect(await events.lookAhead(6), [1, 2, 3, 4]);
+      await events.cancel();
+    });
+
+    test("too large later", () async {
+      var events = new StreamQueue<int>(createStream());
+      expect(await events.next, 1);
+      expect(await events.next, 2);
+      expect(await events.lookAhead(6), [3, 4]);
+      await events.cancel();
+    });
+
+    test("error", () async {
+      var events = new StreamQueue<int>(createErrorStream());
+      expect(events.lookAhead(4), throwsA("To err is divine!"));
+      expect(events.take(4), throwsA("To err is divine!"));
+      expect(await events.next, 4);
+      await events.cancel();
+    });
+  });
+
   group("next operation", () {
     test("simple sequence of requests", () async {
       var events = new StreamQueue<int>(createStream());
@@ -374,11 +433,46 @@
     });
   });
 
+  group("peek operation", () {
+    test("peeks one event", () async {
+      var events = new StreamQueue<int>(createStream());
+      expect(await events.peek, 1);
+      expect(await events.next, 1);
+      expect(await events.peek, 2);
+      expect(await events.take(2), [2, 3]);
+      expect(await events.peek, 4);
+      expect(await events.next, 4);
+      // Throws at end.
+      expect(events.peek, throws);
+      await events.cancel();
+    });
+    test("multiple requests at the same time", () async {
+      var events = new StreamQueue<int>(createStream());
+      var result = await Future.wait(
+          [events.peek, events.peek, events.next, events.peek, events.peek]);
+      expect(result, [1, 1, 1, 2, 2]);
+      await events.cancel();
+    });
+    test("sequence of requests with error", () async {
+      var events = new StreamQueue<int>(createErrorStream());
+      expect(await events.next, 1);
+      expect(await events.next, 2);
+      expect(events.peek, throwsA("To err is divine!"));
+      // Error stays in queue.
+      expect(events.peek, throwsA("To err is divine!"));
+      expect(events.next, throwsA("To err is divine!"));
+      expect(await events.next, 4);
+      await events.cancel();
+    });
+  });
+
   group("cancel operation", () {
     test("closes the events, prevents any other operation", () async {
       var events = new StreamQueue<int>(createStream());
       await events.cancel();
+      expect(() => events.lookAhead(1), throwsStateError);
       expect(() => events.next, throwsStateError);
+      expect(() => events.peek, throwsStateError);
       expect(() => events.skip(1), throwsStateError);
       expect(() => events.take(1), throwsStateError);
       expect(() => events.rest, throwsStateError);