blob: 09b3a75b2360a9adf2a18ad2715fe2fef08cc656 [file] [log] [blame]
// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library async.stream_events;
import 'dart:async';
import 'dart:collection';
import "subscription_stream.dart";
import "stream_completer.dart";
import "../result.dart";
/// An asynchronous pull-based interface for accessing stream events.
///
/// Wraps a stream and makes individual events available on request.
///
/// You can request (and reserve) one or more events from the stream,
/// and after all previous requests have been fulfilled, stream events
/// go towards fulfilling your request.
///
/// For example, if you ask for [next] two times, the returned futures
/// will be completed by the next two unrequested events from the stream.
///
/// The stream subscription is paused when there are no active
/// requests.
///
/// Some streams, including broadcast streams, will buffer
/// events while paused, so waiting too long between requests may
/// cause memory bloat somewhere else.
///
/// This is similar to, but more convenient than, a [StreamIterator].
/// A `StreamIterator` requires you to manually check when a new event is
/// available and you can only access the value of that event until you
/// check for the next one. A `StreamQueue` allows you to request, for example,
/// three events at a time, either individually, as a group using [take]
/// or [skip], or in any combination.
///
/// You can also ask to have the [rest] of the stream provided as
/// a new stream. This allows, for example, taking the first event
/// out of a stream and continuing to use the rest of the stream as a stream.
///
/// Example:
///
/// var events = new StreamQueue<String>(someStreamOfLines);
/// var first = await events.next;
/// while (first.startsWith('#')) {
/// // Skip comments.
/// first = await events.next;
/// }
///
/// if (first.startsWith(MAGIC_MARKER)) {
/// var headerCount =
/// first.parseInt(first.substring(MAGIC_MARKER.length + 1));
/// handleMessage(headers: await events.take(headerCount),
/// body: events.rest);
/// return;
/// }
/// // Error handling.
///
/// When you need no further events the `StreamQueue` should be closed
/// using [cancel]. This releases the underlying stream subscription.
abstract class StreamQueue<T> {
// This class maintains two queues: one of events and one of requests.
// The active request (the one in front of the queue) is called with
// the current event queue when it becomes active, every time a
// new event arrives, and when the event source closes.
//
// If the request returns `true`, it's complete and will be removed from the
// request queue.
// If the request returns `false`, it needs more events, and will be called
// again when new events are available. It may trigger a call itself by
// calling [_updateRequests].
// The request can remove events that it uses, or keep them in the event
// queue until it has all that it needs.
//
// This model is very flexible and easily extensible.
// It allows requests that don't consume events (like [hasNext]) or
// potentially a request that takes either five or zero events, determined
// by the content of the fifth event.
/// Whether the event source is done.
bool _isDone = false;
/// Whether a closing operation has been performed on the stream queue.
///
/// Closing operations are [cancel] and [rest].
bool _isClosed = false;
/// Queue of events not used by a request yet.
final Queue<Result> _eventQueue = new Queue();
/// Queue of pending requests.
///
/// Access through methods below to ensure consistency.
final Queue<_EventRequest> _requestQueue = new Queue();
/// Create a `StreamQueue` of the events of [source].
factory StreamQueue(Stream source) = _StreamQueue<T>;
StreamQueue._();
/// Asks if the stream has any more events.
///
/// Returns a future that completes with `true` if the stream has any
/// more events, whether data or error.
/// If the stream closes without producing any more events, the returned
/// future completes with `false`.
///
/// Can be used before using [next] to avoid getting an error in the
/// future returned by `next` in the case where there are no more events.
/// Another alternative is to use `take(1)` which returns either zero or
/// one events.
Future<bool> get hasNext {
if (!_isClosed) {
var hasNextRequest = new _HasNextRequest();
_addRequest(hasNextRequest);
return hasNextRequest.future;
}
throw _failClosed();
}
/// Requests the next (yet unrequested) event from the stream.
///
/// When the requested event arrives, the returned future is completed with
/// the event.
/// If the event is a data event, the returned future completes
/// with its value.
/// If the event is an error event, the returned future completes with
/// its error and stack trace.
/// If the stream closes before an event arrives, the returned future
/// completes with a [StateError].
///
/// It's possible to have several pending [next] calls (or other requests),
/// and they will be completed in the order they were requested, by the
/// first events that were not consumed by previous requeusts.
Future<T> get next {
if (!_isClosed) {
var nextRequest = new _NextRequest<T>();
_addRequest(nextRequest);
return nextRequest.future;
}
throw _failClosed();
}
/// Returns a stream of all the remaning events of the source stream.
///
/// All requested [next], [skip] or [take] operations are completed
/// first, and then any remaining events are provided as events of
/// the returned stream.
///
/// Using `rest` closes this stream queue. After getting the
/// `rest` the caller may no longer request other events, like
/// after calling [cancel].
Stream<T> get rest {
if (_isClosed) {
throw _failClosed();
}
var request = new _RestRequest<T>(this);
_isClosed = true;
_addRequest(request);
return request.stream;
}
/// Skips the next [count] *data* events.
///
/// The [count] must be non-negative.
///
/// When successful, this is equivalent to using [take]
/// and ignoring the result.
///
/// If an error occurs before `count` data events have been skipped,
/// the returned future completes with that error instead.
///
/// If the stream closes before `count` data events,
/// the remaining unskipped event count is returned.
/// If the returned future completes with the integer `0`,
/// then all events were succssfully skipped. If the value
/// is greater than zero then the stream ended early.
Future<int> skip(int count) {
if (count < 0) throw new RangeError.range(count, 0, null, "count");
if (!_isClosed) {
var request = new _SkipRequest(count);
_addRequest(request);
return request.future;
}
throw _failClosed();
}
/// Requests the next [count] data events as a list.
///
/// The [count] must be non-negative.
///
/// Equivalent to calling [next] `count` times and
/// storing the data values in a list.
///
/// If an error occurs before `count` data events has
/// been collected, the returned future completes with
/// that error instead.
///
/// If the stream closes before `count` data events,
/// the returned future completes with the list
/// of data collected so far. That is, the returned
/// list may have fewer than [count] elements.
Future<List<T>> take(int count) {
if (count < 0) throw new RangeError.range(count, 0, null, "count");
if (!_isClosed) {
var request = new _TakeRequest<T>(count);
_addRequest(request);
return request.future;
}
throw _failClosed();
}
/// Cancels the underlying event source.
///
/// If [immediate] is `false` (the default), the cancel operation waits until
/// all previously requested events have been processed, then it cancels the
/// subscription providing the events.
///
/// If [immediate] is `true`, the source is instead canceled
/// immediately. Any pending events are completed as though the underlying
/// stream had closed.
///
/// The returned future completes with the result of calling
/// `cancel`.
///
/// After calling `cancel`, no further events can be requested.
/// None of [next], [rest], [skip], [take] or [cancel] may be
/// called again.
Future cancel({bool immediate: false}) {
if (_isClosed) throw _failClosed();
_isClosed = true;
if (!immediate) {
var request = new _CancelRequest(this);
_addRequest(request);
return request.future;
}
if (_isDone && _eventQueue.isEmpty) return new Future.value();
return _cancel();
}
// ------------------------------------------------------------------
// Methods that may be called from the request implementations to
// control the even stream.
/// Matches events with requests.
///
/// Called after receiving an event or when the event source closes.
///
/// May be called by requests which have returned `false` (saying they
/// are not yet done) so they can be checked again before any new
/// events arrive.
/// Any request returing `false` from `update` when `isDone` is `true`
/// *must* call `_updateRequests` when they are ready to continue
/// (since no further events will trigger the call).
void _updateRequests() {
while (_requestQueue.isNotEmpty) {
if (_requestQueue.first.update(_eventQueue, _isDone)) {
_requestQueue.removeFirst();
} else {
return;
}
}
if (!_isDone) {
_pause();
}
}
/// Extracts a stream from the event source and makes this stream queue
/// unusable.
///
/// Can only be used by the very last request (the stream queue must
/// be closed by that request).
/// Only used by [rest].
Stream _extractStream();
/// Requests that the event source pauses events.
///
/// This is called automatically when the request queue is empty.
///
/// The event source is restarted by the next call to [_ensureListening].
void _pause();
/// Ensures that we are listening on events from the event source.
///
/// Starts listening for the first time or resumes after a [_pause].
///
/// Is called automatically if a request requires more events.
void _ensureListening();
/// Cancels the underlying event source.
Future _cancel();
// ------------------------------------------------------------------
// Methods called by the event source to add events or say that it's
// done.
/// Called when the event source adds a new data or error event.
/// Always calls [_updateRequests] after adding.
void _addResult(Result result) {
_eventQueue.add(result);
_updateRequests();
}
/// Called when the event source is done.
/// Always calls [_updateRequests] after adding.
void _close() {
_isDone = true;
_updateRequests();
}
// ------------------------------------------------------------------
// Internal helper methods.
/// Returns an error for when a request is made after cancel.
///
/// Returns a [StateError] with a message saying that either
/// [cancel] or [rest] have already been called.
Error _failClosed() {
return new StateError("Already cancelled");
}
/// Adds a new request to the queue.
///
/// If the request queue is empty and the request can be completed
/// immediately, it skips the queue.
void _addRequest(_EventRequest request) {
if (_requestQueue.isEmpty) {
if (request.update(_eventQueue, _isDone)) return;
_ensureListening();
}
_requestQueue.add(request);
}
}
/// The default implementation of [StreamQueue].
///
/// This queue gets its events from a stream which is listened
/// to when a request needs events.
class _StreamQueue<T> extends StreamQueue<T> {
/// Source of events.
final Stream _sourceStream;
/// Subscription on [_sourceStream] while listening for events.
///
/// Set to subscription when listening, and set to `null` when the
/// subscription is done (and [_isDone] is set to true).
StreamSubscription _subscription;
_StreamQueue(this._sourceStream) : super._();
Future _cancel() {
if (_isDone) return null;
if (_subscription == null) _subscription = _sourceStream.listen(null);
var future = _subscription.cancel();
_close();
return future;
}
void _ensureListening() {
assert(!_isDone);
if (_subscription == null) {
_subscription =
_sourceStream.listen(
(data) {
_addResult(new Result.value(data));
},
onError: (error, StackTrace stackTrace) {
_addResult(new Result.error(error, stackTrace));
},
onDone: () {
_subscription = null;
this._close();
});
} else {
_subscription.resume();
}
}
void _pause() {
_subscription.pause();
}
Stream<T> _extractStream() {
assert(_isClosed);
if (_isDone) {
return new Stream<T>.empty();
}
if (_subscription == null) {
return _sourceStream;
}
var subscription = _subscription;
_subscription = null;
_isDone = true;
var wasPaused = subscription.isPaused;
var result = new SubscriptionStream<T>(subscription);
// Resume after creating stream because that pauses the subscription too.
// This way there won't be a short resumption in the middle.
if (wasPaused) subscription.resume();
return result;
}
}
/// Request object that receives events when they arrive, until fulfilled.
///
/// Each request that cannot be fulfilled immediately is represented by
/// an `_EventRequest` object in the request queue.
///
/// Events from the source stream are sent to the first request in the
/// queue until it reports itself as [isComplete].
///
/// When the first request in the queue `isComplete`, either when becoming
/// the first request or after receiving an event, its [close] methods is
/// called.
///
/// The [close] method is also called immediately when the source stream
/// is done.
abstract class _EventRequest {
/// Handle available events.
///
/// The available events are provided as a queue. The `update` function
/// should only remove events from the front of the event queue, e.g.,
/// using [removeFirst].
///
/// Returns `true` if the request is completed, or `false` if it needs
/// more events.
/// The call may keep events in the queue until the requeust is complete,
/// or it may remove them immediately.
///
/// If the method returns true, the request is considered fulfilled, and
/// will never be called again.
///
/// This method is called when a request reaches the front of the request
/// queue, and if it returns `false`, it's called again every time a new event
/// becomes available, or when the stream closes.
/// If the function returns `false` when the stream has already closed
/// ([isDone] is true), then the request must call
/// [StreamQueue._updateRequests] itself when it's ready to continue.
bool update(Queue<Result> events, bool isDone);
}
/// Request for a [StreamQueue.next] call.
///
/// Completes the returned future when receiving the first event,
/// and is then complete.
class _NextRequest<T> implements _EventRequest {
/// Completer for the future returned by [StreamQueue.next].
final Completer _completer;
_NextRequest() : _completer = new Completer<T>();
Future<T> get future => _completer.future;
bool update(Queue<Result> events, bool isDone) {
if (events.isNotEmpty) {
events.removeFirst().complete(_completer);
return true;
}
if (isDone) {
var errorFuture =
new Future.sync(() => throw new StateError("No elements"));
_completer.complete(errorFuture);
return true;
}
return false;
}
}
/// Request for a [StreamQueue.skip] call.
class _SkipRequest implements _EventRequest {
/// Completer for the future returned by the skip call.
final Completer _completer = new Completer<int>();
/// Number of remaining events to skip.
///
/// The request [isComplete] when the values reaches zero.
///
/// Decremented when an event is seen.
/// Set to zero when an error is seen since errors abort the skip request.
int _eventsToSkip;
_SkipRequest(this._eventsToSkip);
/// The future completed when the correct number of events have been skipped.
Future get future => _completer.future;
bool update(Queue<Result> events, bool isDone) {
while (_eventsToSkip > 0) {
if (events.isEmpty) {
if (isDone) break;
return false;
}
_eventsToSkip--;
var event = events.removeFirst();
if (event.isError) {
event.complete(_completer);
return true;
}
}
_completer.complete(_eventsToSkip);
return true;
}
}
/// Request for a [StreamQueue.take] call.
class _TakeRequest<T> implements _EventRequest {
/// Completer for the future returned by the take call.
final Completer _completer;
/// List collecting events until enough have been seen.
final List _list = <T>[];
/// Number of events to capture.
///
/// The request [isComplete] when the length of [_list] reaches
/// this value.
final int _eventsToTake;
_TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>();
/// The future completed when the correct number of events have been captured.
Future get future => _completer.future;
bool update(Queue<Result> events, bool isDone) {
while (_list.length < _eventsToTake) {
if (events.isEmpty) {
if (isDone) break;
return false;
}
var result = events.removeFirst();
if (result.isError) {
result.complete(_completer);
return true;
}
_list.add(result.asValue.value);
}
_completer.complete(_list);
return true;
}
}
/// Request for a [StreamQueue.cancel] call.
///
/// The request needs no events, it just waits in the request queue
/// until all previous events are fulfilled, then it cancels the stream queue
/// source subscription.
class _CancelRequest implements _EventRequest {
/// Completer for the future returned by the `cancel` call.
final Completer _completer = new Completer();
/// The [StreamQueue] object that has this request queued.
///
/// When the event is completed, it needs to cancel the active subscription
/// of the `StreamQueue` object, if any.
final StreamQueue _streamQueue;
_CancelRequest(this._streamQueue);
/// The future completed when the cancel request is completed.
Future get future => _completer.future;
bool update(Queue<Result> events, bool isDone) {
if (_streamQueue._isDone) {
_completer.complete();
} else {
_streamQueue._ensureListening();
_completer.complete(_streamQueue._extractStream().listen(null).cancel());
}
return true;
}
}
/// Request for a [StreamQueue.rest] call.
///
/// The request is always complete, it just waits in the request queue
/// until all previous events are fulfilled, then it takes over the
/// stream events subscription and creates a stream from it.
class _RestRequest<T> implements _EventRequest {
/// Completer for the stream returned by the `rest` call.
final StreamCompleter _completer = new StreamCompleter<T>();
/// The [StreamQueue] object that has this request queued.
///
/// When the event is completed, it needs to cancel the active subscription
/// of the `StreamQueue` object, if any.
final StreamQueue _streamQueue;
_RestRequest(this._streamQueue);
/// The stream which will contain the remaining events of [_streamQueue].
Stream<T> get stream => _completer.stream;
bool update(Queue<Result> events, bool isDone) {
if (events.isEmpty) {
if (_streamQueue._isDone) {
_completer.setEmpty();
} else {
_completer.setSourceStream(_streamQueue._extractStream());
}
} else {
// There are prefetched events which needs to be added before the
// remaining stream.
var controller = new StreamController<T>();
for (var event in events) {
event.addTo(controller);
}
controller.addStream(_streamQueue._extractStream(), cancelOnError: false)
.whenComplete(controller.close);
_completer.setSourceStream(controller.stream);
}
return true;
}
}
/// Request for a [StreamQueue.hasNext] call.
///
/// Completes the [future] with `true` if it sees any event,
/// but doesn't consume the event.
/// If the request is closed without seeing an event, then
/// the [future] is completed with `false`.
class _HasNextRequest<T> implements _EventRequest {
final Completer _completer = new Completer<bool>();
Future<bool> get future => _completer.future;
bool update(Queue<Result> events, bool isDone) {
if (events.isNotEmpty) {
_completer.complete(true);
return true;
}
if (isDone) {
_completer.complete(false);
return true;
}
return false;
}
}