| // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| library watcher.async_queue; |
| |
| import 'dart:async'; |
| import 'dart:collection'; |
| |
/// Signature of the callback that processes one queued [item] of type [T].
///
/// The returned [Future] completes once processing of [item] has finished.
typedef Future<dynamic> ItemProcessor<T>(T item);

/// Signature of the callback that receives an [error] raised while an item
/// was being processed.
typedef void ErrorHandler(dynamic error);
| |
/// A queue of items that are sequentially, asynchronously processed.
///
/// Unlike [Stream.map] or [Stream.forEach], the callback used to process each
/// item returns a [Future], and the queue will not advance to the next item
/// until the current item has finished processing.
///
/// Items can be added at any point in time and processing will be started as
/// needed. When all items are processed, it stops processing until more items
/// are added.
///
/// If processing an item fails, the current run stops: the error is passed to
/// the `onError` handler given to the constructor (when there is one), any
/// items still queued stay queued, and processing starts again on the next
/// call to [add].
class AsyncQueue<T> {
  /// The items waiting to be processed, oldest first.
  final _items = new Queue<T>();

  /// Whether or not the queue is currently waiting on a processing future to
  /// complete.
  bool _isProcessing = false;

  /// The callback to invoke on each queued item.
  ///
  /// The next item in the queue will not be processed until the [Future]
  /// returned by this completes.
  final ItemProcessor<T> _processor;

  /// The handler for errors thrown during processing.
  ///
  /// Used to avoid top-leveling asynchronous errors. This is `null` when no
  /// handler was passed to the constructor.
  final ErrorHandler _errorHandler;

  /// Creates a queue that runs the given processor on each added item.
  ///
  /// [onError] is optional. When it is omitted, a processing failure is left
  /// unhandled on the processing future instead of being passed to a handler.
  AsyncQueue(this._processor, {ErrorHandler onError})
      : _errorHandler = onError;

  /// Enqueues [item] to be processed and starts asynchronously processing it
  /// if a process isn't already running.
  void add(T item) {
    _items.add(item);

    // Start up the asynchronous processing if not already running.
    if (_isProcessing) return;
    _isProcessing = true;

    var processing = _processNextItem();

    // Only attach the handler when there is one: a null error callback cannot
    // be invoked, so registering it would hide the original failure behind a
    // second, unrelated error.
    if (_errorHandler != null) processing.catchError(_errorHandler);
  }

  /// Removes all remaining items to be processed.
  ///
  /// An item that is already being processed is not affected.
  void clear() {
    _items.clear();
  }

  /// Pulls the next item off [_items] and processes it.
  ///
  /// When complete, recursively calls itself to continue processing until the
  /// queue is empty. The returned future fails with the processor's error if
  /// processing an item fails; [_isProcessing] is reset in both cases.
  Future _processNextItem() {
    var item = _items.removeFirst();

    // Future.sync turns a synchronous throw from the processor into a failed
    // future, so it takes the same path as an asynchronous failure.
    return new Future.sync(() => _processor(item)).then((_) {
      if (_items.isNotEmpty) return _processNextItem();

      // We have drained the queue, stop processing and wait until something
      // has been enqueued.
      _isProcessing = false;
    }, onError: (error, stackTrace) {
      // Release the processing flag before re-raising the failure; otherwise
      // every later add() would assume a run is still active and never start
      // a new one.
      _isProcessing = false;
      return new Future.error(error, stackTrace);
    });
  }
}