// Source blob: 12dd5e1b06534d06a330a46775cd019ba1c8023e
// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
import 'package:async/async.dart';
import 'package:stack_trace/stack_trace.dart';
/// Manages an abstract pool of resources with a limit on how many may be in use
/// at once.
///
/// When a resource is needed, the user should call [request]. When the returned
/// future completes with a [PoolResource], the resource may be allocated. Once
/// the resource has been released, the user should call [PoolResource.release].
/// The pool will ensure that only a certain number of [PoolResource]s may be
/// allocated at once.
class Pool {
  /// Completers for requests beyond the first [_maxAllocatedResources].
  ///
  /// When an item is released, the next element of [_requestedResources] will
  /// be completed.
  final _requestedResources = Queue<Completer<PoolResource>>();

  /// Callbacks that must be called before additional resources can be
  /// allocated.
  ///
  /// See [PoolResource.allowRelease].
  final _onReleaseCallbacks = Queue<void Function()>();

  /// Completers that will be completed once `onRelease` callbacks are done
  /// running.
  ///
  /// These are kept in a queue to ensure that the earliest request completes
  /// first regardless of what order the `onRelease` callbacks complete in.
  final _onReleaseCompleters = Queue<Completer<PoolResource>>();

  /// The maximum number of resources that may be allocated at once.
  final int _maxAllocatedResources;

  /// The number of resources that are currently allocated.
  ///
  /// A resource whose release has merely been *allowed* (its callback is
  /// parked in [_onReleaseCallbacks]) still counts as allocated.
  int _allocatedResources = 0;

  /// The timeout timer.
  ///
  /// This timer is canceled as long as the pool is below the resource limit.
  /// It's reset once the resource limit is reached and again every time a
  /// resource is released or a new resource is requested. If it fires, that
  /// indicates that the caller became deadlocked, likely due to files waiting
  /// for additional files to be read before they could be closed.
  ///
  /// This is `null` if this pool shouldn't time out.
  RestartableTimer _timer;

  /// The amount of time to wait before timing out the pending resources.
  final Duration _timeout;

  /// A [FutureGroup] that tracks all the `onRelease` callbacks for resources
  /// that have been marked releasable.
  ///
  /// This is `null` until [close] is called.
  FutureGroup _closeGroup;

  /// Whether [close] has been called.
  bool get isClosed => _closeMemo.hasRun;

  /// A future that completes once the pool is closed and all its outstanding
  /// resources have been released.
  ///
  /// If any [PoolResource.allowRelease] callback throws an exception after the
  /// pool is closed, this completes with that exception.
  Future get done => _closeMemo.future;

  /// Creates a new pool with the given limit on how many resources may be
  /// allocated at once.
  ///
  /// If [timeout] is passed, then if that much time passes without any activity
  /// all pending [request] futures will throw a [TimeoutException]. This is
  /// intended to avoid deadlocks.
  ///
  /// Throws an [ArgumentError] if the limit is not greater than zero.
  Pool(this._maxAllocatedResources, {Duration timeout}) : _timeout = timeout {
    if (_maxAllocatedResources <= 0) {
      throw ArgumentError.value(_maxAllocatedResources, 'maxAllocatedResources',
          'Must be greater than zero.');
    }

    if (timeout != null) {
      // Start the timer canceled since we only want to start counting down once
      // we've run out of available resources.
      _timer = RestartableTimer(timeout, _onTimeout)..cancel();
    }
  }

  /// Request a [PoolResource].
  ///
  /// If the maximum number of resources is already allocated, this will delay
  /// until one of them is released.
  ///
  /// Throws a [StateError] if the pool has been closed.
  Future<PoolResource> request() {
    if (isClosed) {
      throw StateError("request() may not be called on a closed Pool.");
    }

    if (_allocatedResources < _maxAllocatedResources) {
      // Capacity is available: hand out a token right away.
      _allocatedResources++;
      return Future.value(PoolResource._(this));
    } else if (_onReleaseCallbacks.isNotEmpty) {
      // At capacity, but some holder has allowed its resource to be reclaimed:
      // run its callback and reuse that slot (the allocated count is
      // unchanged).
      return _runOnRelease(_onReleaseCallbacks.removeFirst());
    } else {
      // At capacity with nothing reclaimable: queue the request until a
      // resource is released, and start the deadlock countdown.
      var completer = Completer<PoolResource>();
      _requestedResources.add(completer);
      _resetTimer();
      return completer.future;
    }
  }

  /// Requests a resource for the duration of [callback], which may return a
  /// Future.
  ///
  /// The return value of [callback] is piped to the returned Future.
  ///
  /// The resource is released when [callback] completes, whether it succeeds
  /// or throws. Throws a [StateError] if the pool has been closed.
  Future<T> withResource<T>(FutureOr<T> Function() callback) async {
    if (isClosed) {
      throw StateError("withResource() may not be called on a closed Pool.");
    }

    var resource = await request();
    try {
      return await callback();
    } finally {
      resource.release();
    }
  }

  /// Returns a [Stream] containing the result of [action] applied to each
  /// element of [elements].
  ///
  /// While [action] is invoked on each element of [elements] in order,
  /// it's possible the returned [Stream] may have items out-of-order –
  /// especially if the completion time of [action] varies.
  ///
  /// If [action] throws an error the source item along with the error object
  /// and [StackTrace] are passed to [onError], if it is provided. If [onError]
  /// returns `true`, the error is added to the returned [Stream], otherwise
  /// it is ignored.
  ///
  /// Errors thrown from iterating [elements] will not be passed to
  /// [onError]. They will always be added to the returned stream as an error.
  ///
  /// Note: all of the resources of this [Pool] will be used when the
  /// returned [Stream] is listened to until it is completed or canceled.
  ///
  /// Note: if this [Pool] is closed before the returned [Stream] is listened
  /// to, a [StateError] is thrown.
  Stream<T> forEach<S, T>(
      Iterable<S> elements, FutureOr<T> Function(S source) action,
      {bool Function(S item, Object error, StackTrace stack) onError}) {
    // By default every error raised by [action] is forwarded to the stream.
    onError ??= (item, e, s) => true;

    var cancelPending = false;

    // Non-null while the subscription is paused; workers await its future.
    Completer resumeCompleter;
    StreamController<T> controller;

    // Shared by all workers; created lazily in `onListen`.
    Iterator<S> iterator;

    // One worker loop. Each worker holds a single pool resource and keeps
    // pulling items from the shared iterator until it is exhausted or the
    // subscription is canceled.
    Future<void> run(int i) async {
      while (iterator.moveNext()) {
        // caching `current` is necessary because there are async breaks
        // in this code and `iterator` is shared across many workers
        final current = iterator.current;

        // Progress is being made, so push the deadlock timeout back.
        _resetTimer();

        if (resumeCompleter != null) {
          await resumeCompleter.future;
        }

        if (cancelPending) {
          break;
        }

        T value;
        try {
          value = await action(current);
        } catch (e, stack) {
          if (onError(current, e, stack)) {
            controller.addError(e, stack);
          }
          continue;
        }
        controller.add(value);
      }
    }

    Future doneFuture;

    void onListen() {
      assert(iterator == null);
      iterator = elements.iterator;

      assert(doneFuture == null);
      // Start one worker per pool slot. Errors escaping a worker (for example
      // from iterating [elements]) are forwarded to the stream.
      doneFuture = Future.wait(
              Iterable<int>.generate(_maxAllocatedResources)
                  .map((i) => withResource(() => run(i))),
              eagerError: true)
          .catchError(controller.addError);

      doneFuture.whenComplete(controller.close);
    }

    controller = StreamController<T>(
      sync: true,
      onListen: onListen,
      onCancel: () async {
        assert(!cancelPending);
        cancelPending = true;
        // Wait for in-flight actions so their resources are released.
        await doneFuture;
      },
      onPause: () {
        assert(resumeCompleter == null);
        resumeCompleter = Completer();
        // A paused consumer is not a deadlock.
        _timer?.cancel();
      },
      onResume: () {
        assert(resumeCompleter != null);
        resumeCompleter.complete();
        resumeCompleter = null;
        _resetTimer();
      },
    );

    return controller.stream;
  }

  /// Closes the pool so that no more resources are requested.
  ///
  /// Existing resource requests remain unchanged.
  ///
  /// Any resources that are marked as releasable using
  /// [PoolResource.allowRelease] are released immediately. Once all resources
  /// have been released and any `onRelease` callbacks have completed, the
  /// returned future completes successfully. If any `onRelease` callback throws
  /// an error, the returned future completes with that error.
  ///
  /// This may be called more than once; it returns the same [Future] each time.
  Future close() => _closeMemo.runOnce(() {
        if (_closeGroup != null) return _closeGroup.future;

        _resetTimer();

        _closeGroup = FutureGroup();
        // Reclaim every resource whose release was already allowed.
        for (var callback in _onReleaseCallbacks) {
          _closeGroup.add(Future.sync(callback));
        }

        _allocatedResources -= _onReleaseCallbacks.length;
        _onReleaseCallbacks.clear();

        if (_allocatedResources == 0) _closeGroup.close();
        return _closeGroup.future;
      });
  final _closeMemo = AsyncMemoizer();

  /// If there are any pending requests, this will fire the oldest one.
  void _onResourceReleased() {
    _resetTimer();

    if (_requestedResources.isNotEmpty) {
      // Hand the freed slot straight to the oldest waiter; the allocated
      // count does not change.
      var pending = _requestedResources.removeFirst();
      pending.complete(PoolResource._(this));
    } else {
      _allocatedResources--;
      if (isClosed && _allocatedResources == 0) _closeGroup.close();
    }
  }

  /// If there are any pending requests, this will fire the oldest one after
  /// running [onRelease].
  void _onResourceReleaseAllowed(Function() onRelease) {
    _resetTimer();

    if (_requestedResources.isNotEmpty) {
      var pending = _requestedResources.removeFirst();
      pending.complete(_runOnRelease(onRelease));
    } else if (isClosed) {
      // Nobody will ever request this slot again: reclaim it now and let
      // [close]'s future track the callback.
      _closeGroup.add(Future.sync(onRelease));
      _allocatedResources--;
      if (_allocatedResources == 0) _closeGroup.close();
    } else {
      // Park the callback until the slot is needed, making sure it later runs
      // in the zone it was registered from.
      var zone = Zone.current;
      var registered = zone.registerCallback(onRelease);
      _onReleaseCallbacks.add(() => zone.run(registered));
    }
  }

  /// Runs [onRelease] and returns a Future that completes to a resource once an
  /// [onRelease] callback completes.
  ///
  /// Futures returned by [_runOnRelease] always complete in the order they were
  /// created, even if earlier [onRelease] callbacks take longer to run.
  Future<PoolResource> _runOnRelease(Function() onRelease) {
    // Whichever callback finishes first completes the *oldest* outstanding
    // completer, which is what preserves request order.
    Future.sync(onRelease).then((value) {
      _onReleaseCompleters.removeFirst().complete(PoolResource._(this));
    }).catchError((error, StackTrace stackTrace) {
      _onReleaseCompleters.removeFirst().completeError(error, stackTrace);
    });

    var completer = Completer<PoolResource>.sync();
    _onReleaseCompleters.add(completer);
    return completer.future;
  }

  /// A resource has been requested, allocated, or released.
  ///
  /// Restarts the countdown while requests are pending and cancels it
  /// otherwise. Does nothing if the pool has no timeout.
  void _resetTimer() {
    if (_timer == null) return;

    if (_requestedResources.isEmpty) {
      _timer.cancel();
    } else {
      _timer.reset();
    }
  }

  /// Handles [_timer] timing out by causing all pending resource completers to
  /// emit exceptions.
  void _onTimeout() {
    for (var completer in _requestedResources) {
      completer.completeError(
          TimeoutException(
              "Pool deadlock: all resources have been "
              "allocated for too long.",
              _timeout),
          Chain.current());
    }
    _requestedResources.clear();
    _timer = null;
  }
}
/// A member of a [Pool].
///
/// A [PoolResource] is a token that indicates that a resource is allocated.
/// When the associated resource is released, the user should call [release].
class PoolResource {
  /// The pool that issued this token and must be told when it is given up.
  final Pool _pool;

  /// Whether `this` has been released yet.
  bool _released = false;

  /// Creates a token owned by [_pool].
  ///
  /// Private: tokens are only handed out by [Pool.request].
  PoolResource._(this._pool);

  /// Tells the parent [Pool] that the resource associated with this resource is
  /// no longer allocated, and that a new [PoolResource] may be allocated.
  ///
  /// Throws a [StateError] if this resource was already released, either via
  /// [release] or [allowRelease].
  void release() {
    if (_released) {
      throw StateError("A PoolResource may only be released once.");
    }
    _released = true;
    _pool._onResourceReleased();
  }

  /// Tells the parent [Pool] that the resource associated with this resource is
  /// no longer necessary, but should remain allocated until more resources are
  /// needed.
  ///
  /// When [Pool.request] is called and there are no remaining available
  /// resources, the [onRelease] callback is called. It should free the
  /// resource, and it may return a Future or `null`. Once that completes, the
  /// [Pool.request] call will complete to a new [PoolResource].
  ///
  /// This is useful when a resource's main function is complete, but it may
  /// produce additional information later on. For example, an isolate's task
  /// may be complete, but it could still emit asynchronous errors.
  ///
  /// Throws a [StateError] if this resource was already released, either via
  /// [release] or [allowRelease].
  void allowRelease(Function() onRelease) {
    if (_released) {
      throw StateError("A PoolResource may only be released once.");
    }
    _released = true;
    _pool._onResourceReleaseAllowed(onRelease);
  }
}