blob: deb53334b8036eb9bf115fac2a666ae566f2655f [file] [log] [blame]
// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:collection';
import 'package:async/async.dart';
import 'package:stack_trace/stack_trace.dart';
/// Manages an abstract pool of resources with a limit on how many may be in use
/// at once.
///
/// When a resource is needed, the user should call [request]. When the returned
/// future completes with a [PoolResource], the resource may be allocated. Once
/// the resource has been released, the user should call [PoolResource.release].
/// The pool will ensure that only a certain number of [PoolResource]s may be
/// allocated at once.
class Pool {
/// Completers for requests beyond the first [_maxAllocatedResources].
///
/// When an item is released, the next element of [_requestedResources] will
/// be completed.
final _requestedResources = new Queue<Completer<PoolResource>>();
/// Callbacks that must be called before additional resources can be
/// allocated.
///
/// See [PoolResource.allowRelease].
final _onReleaseCallbacks = new Queue<Function>();
/// Completers that will be completed once `onRelease` callbacks are done
/// running.
///
/// These are kept in a queue to ensure that the earliest request completes
/// first regardless of what order the `onRelease` callbacks complete in.
final _onReleaseCompleters = new Queue<Completer<PoolResource>>();
/// The maximum number of resources that may be allocated at once.
final int _maxAllocatedResources;
/// The number of resources that are currently allocated.
int _allocatedResources = 0;
/// The timeout timer.
///
/// This timer is canceled as long as the pool is below the resource limit.
/// It's reset once the resource limit is reached and again every time an
/// resource is released or a new resource is requested. If it fires, that
/// indicates that the caller became deadlocked, likely due to files waiting
/// for additional files to be read before they could be closed.
///
/// This is `null` if this pool shouldn't time out.
RestartableTimer _timer;
/// The amount of time to wait before timing out the pending resources.
final Duration _timeout;
/// A [FutureGroup] that tracks all the `onRelease` callbacks for resources
/// that have been marked releasable.
///
/// This is `null` until [close] is called.
FutureGroup _closeGroup;
/// Whether [close] has been called.
bool get isClosed => _closeGroup != null;
/// Creates a new pool with the given limit on how many resources may be
/// allocated at once.
///
/// If [timeout] is passed, then if that much time passes without any activity
/// all pending [request] futures will throw a [TimeoutException]. This is
/// intended to avoid deadlocks.
Pool(this._maxAllocatedResources, {Duration timeout})
: _timeout = timeout {
if (timeout != null) {
// Start the timer canceled since we only want to start counting down once
// we've run out of available resources.
_timer = new RestartableTimer(timeout, _onTimeout)..cancel();
}
}
/// Request a [PoolResource].
///
/// If the maximum number of resources is already allocated, this will delay
/// until one of them is released.
Future<PoolResource> request() {
if (isClosed) {
throw new StateError("request() may not be called on a closed Pool.");
}
if (_allocatedResources < _maxAllocatedResources) {
_allocatedResources++;
return new Future.value(new PoolResource._(this));
} else if (_onReleaseCallbacks.isNotEmpty) {
return _runOnRelease(_onReleaseCallbacks.removeFirst());
} else {
var completer = new Completer<PoolResource>();
_requestedResources.add(completer);
_resetTimer();
return completer.future;
}
}
/// Requests a resource for the duration of [callback], which may return a
/// Future.
///
/// The return value of [callback] is piped to the returned Future.
Future/*<T>*/ withResource/*<T>*/(/*=T*/ callback()) {
if (isClosed) {
throw new StateError(
"withResource() may not be called on a closed Pool.");
}
// We can't use async/await here because we need to start the request
// synchronously in case the pool is closed immediately afterwards. Async
// functions have an asynchronous gap between calling and running the body,
// and [close] could be called during that gap. See #3.
return request().then((resource) {
return new Future.sync(callback).whenComplete(resource.release);
});
}
/// Closes the pool so that no more resources are requested.
///
/// Existing resource requests remain unchanged.
///
/// Any resources that are marked as releasable using
/// [PoolResource.allowRelease] are released immediately. Once all resources
/// have been released and any `onRelease` callbacks have completed, the
/// returned future completes successfully. If any `onRelease` callback throws
/// an error, the returned future completes with that error.
///
/// This may be called more than once; it returns the same [Future] each time.
Future close() {
if (_closeGroup != null) return _closeGroup.future;
_resetTimer();
_closeGroup = new FutureGroup();
for (var callback in _onReleaseCallbacks) {
_closeGroup.add(new Future.sync(callback));
}
_allocatedResources -= _onReleaseCallbacks.length;
_onReleaseCallbacks.clear();
if (_allocatedResources == 0) _closeGroup.close();
return _closeGroup.future;
}
/// If there are any pending requests, this will fire the oldest one.
void _onResourceReleased() {
_resetTimer();
if (_requestedResources.isNotEmpty) {
var pending = _requestedResources.removeFirst();
pending.complete(new PoolResource._(this));
} else {
_allocatedResources--;
if (isClosed && _allocatedResources == 0) _closeGroup.close();
}
}
/// If there are any pending requests, this will fire the oldest one after
/// running [onRelease].
void _onResourceReleaseAllowed(onRelease()) {
_resetTimer();
if (_requestedResources.isNotEmpty) {
var pending = _requestedResources.removeFirst();
pending.complete(_runOnRelease(onRelease));
} else if (isClosed) {
_closeGroup.add(new Future.sync(onRelease));
_allocatedResources--;
if (_allocatedResources == 0) _closeGroup.close();
} else {
_onReleaseCallbacks.add(
Zone.current.bindCallback(onRelease, runGuarded: false));
}
}
/// Runs [onRelease] and returns a Future that completes to a resource once an
/// [onRelease] callback completes.
///
/// Futures returned by [_runOnRelease] always complete in the order they were
/// created, even if earlier [onRelease] callbacks take longer to run.
Future<PoolResource> _runOnRelease(onRelease()) {
new Future.sync(onRelease).then((value) {
_onReleaseCompleters.removeFirst().complete(new PoolResource._(this));
}).catchError((error, stackTrace) {
_onReleaseCompleters.removeFirst().completeError(error, stackTrace);
});
var completer = new Completer<PoolResource>.sync();
_onReleaseCompleters.add(completer);
return completer.future;
}
/// A resource has been requested, allocated, or released.
void _resetTimer() {
if (_timer == null) return;
if (_requestedResources.isEmpty) {
_timer.cancel();
} else {
_timer.reset();
}
}
/// Handles [_timer] timing out by causing all pending resource completers to
/// emit exceptions.
void _onTimeout() {
for (var completer in _requestedResources) {
completer.completeError(
new TimeoutException("Pool deadlock: all resources have been "
"allocated for too long.",
_timeout),
new Chain.current());
}
_requestedResources.clear();
_timer = null;
}
}
/// A member of a [Pool].
///
/// A [PoolResource] is a token that indicates that a resource is allocated.
/// When the associated resource is released, the user should call [release].
class PoolResource {
final Pool _pool;
/// Whether [this] has been released yet.
bool _released = false;
PoolResource._(this._pool);
/// Tells the parent [Pool] that the resource associated with this resource is
/// no longer allocated, and that a new [PoolResource] may be allocated.
void release() {
if (_released) {
throw new StateError("A PoolResource may only be released once.");
}
_released = true;
_pool._onResourceReleased();
}
/// Tells the parent [Pool] that the resource associated with this resource is
/// no longer necessary, but should remain allocated until more resources are
/// needed.
///
/// When [Pool.request] is called and there are no remaining available
/// resources, the [onRelease] callback is called. It should free the
/// resource, and it may return a Future or `null`. Once that completes, the
/// [Pool.request] call will complete to a new [PoolResource].
///
/// This is useful when a resource's main function is complete, but it may
/// produce additional information later on. For example, an isolate's task
/// may be complete, but it could still emit asynchronous errors.
void allowRelease(onRelease()) {
if (_released) {
throw new StateError("A PoolResource may only be released once.");
}
_released = true;
_pool._onResourceReleaseAllowed(onRelease);
}
}