// blob: 6941229b3e084d18f2d2096ed0630569b688f9bb [file] [log] [blame]
// Copyright (c) 2014, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
library pool;
import 'dart:async';
import 'dart:collection';
import 'package:stack_trace/stack_trace.dart';
/// Manages an abstract pool of resources with a limit on how many may be in use
/// at once.
///
/// When a resource is needed, the user should call [request]. When the returned
/// future completes with a [PoolResource], the resource may be allocated. Once
/// the resource has been released, the user should call [PoolResource.release].
/// The pool will ensure that only a certain number of [PoolResource]s may be
/// allocated at once.
class Pool {
  /// Completers for requests beyond the first [_maxAllocatedResources].
  ///
  /// When an item is released, the next element of [_requestedResources] will
  /// be completed.
  final _requestedResources = new Queue<Completer<PoolResource>>();

  /// Callbacks that must be called before additional resources can be
  /// allocated.
  ///
  /// Each callback is bound to the zone that was current when it was
  /// registered. See [PoolResource.allowRelease].
  final _onReleaseCallbacks = new Queue<Function>();

  /// Completers that will be completed once `onRelease` callbacks are done
  /// running.
  ///
  /// These are kept in a queue to ensure that the earliest request completes
  /// first regardless of what order the `onRelease` callbacks complete in.
  final _onReleaseCompleters = new Queue<Completer<PoolResource>>();

  /// The maximum number of resources that may be allocated at once.
  final int _maxAllocatedResources;

  /// The number of resources that are currently allocated.
  int _allocatedResources = 0;

  /// The timeout timer.
  ///
  /// If [_timeout] isn't null, this timer is set as soon as the resource limit
  /// is reached and is reset every time a resource is released or a new
  /// resource is requested. If it fires, that indicates that the caller became
  /// deadlocked, likely due to files waiting for additional files to be read
  /// before they could be closed.
  ///
  /// This is `null` whenever no request is waiting in [_requestedResources].
  Timer _timer;

  /// The amount of time to wait before timing out the pending resources.
  ///
  /// A `null` value disables the timeout entirely.
  final Duration _timeout;

  /// Creates a new pool with the given limit on how many resources may be
  /// allocated at once.
  ///
  /// If [timeout] is passed, then if that much time passes without any activity
  /// all pending [request] futures will throw a [TimeoutException]. This is
  /// intended to avoid deadlocks.
  ///
  /// The limit is not validated here. With a limit of zero or less, no
  /// [PoolResource] is ever handed out, so [request] futures only ever
  /// complete (with an error) if a [timeout] was given.
  Pool(this._maxAllocatedResources, {Duration timeout}) : _timeout = timeout;

  /// Request a [PoolResource].
  ///
  /// If the maximum number of resources is already allocated, this will delay
  /// until one of them is released.
  Future<PoolResource> request() {
    if (_allocatedResources < _maxAllocatedResources) {
      // There is spare capacity: hand out a resource right away.
      _allocatedResources++;
      return new Future.value(new PoolResource._(this));
    } else if (_onReleaseCallbacks.isNotEmpty) {
      // A resource was released via [PoolResource.allowRelease] and is only
      // waiting for its callback to run; run it now and reuse that slot.
      return _runOnRelease(_onReleaseCallbacks.removeFirst());
    } else {
      // The pool is exhausted: queue the request until a resource is released.
      var completer = new Completer<PoolResource>();
      _requestedResources.add(completer);
      _resetTimer();
      return completer.future;
    }
  }

  /// Requests a resource for the duration of [callback], which may return a
  /// Future.
  ///
  /// The return value of [callback] is piped to the returned Future. The
  /// resource is released once [callback] has finished, whether it succeeded
  /// or failed.
  Future withResource(callback()) {
    return request().then((resource) =>
        Chain.track(new Future.sync(callback)).whenComplete(resource.release));
  }

  /// If there are any pending requests, this will fire the oldest one.
  ///
  /// Otherwise the slot is simply returned to the pool.
  void _onResourceReleased() {
    if (_requestedResources.isEmpty) {
      _allocatedResources--;
      _resetTimer();
      return;
    }

    // The released slot is handed straight to the oldest waiter, so
    // [_allocatedResources] stays unchanged.
    var pending = _requestedResources.removeFirst();

    // Reset the timer only after dequeuing so that it is cancelled, rather
    // than re-armed, when the last waiter has just been served.
    _resetTimer();
    pending.complete(new PoolResource._(this));
  }

  /// If there are any pending requests, this will fire the oldest one after
  /// running [onRelease].
  ///
  /// Otherwise [onRelease] is stored, bound to the current zone, until a later
  /// [request] needs the slot.
  void _onResourceReleaseAllowed(onRelease()) {
    if (_requestedResources.isEmpty) {
      _onReleaseCallbacks
          .add(Zone.current.bindCallback(onRelease, runGuarded: false));
      _resetTimer();
      return;
    }

    var pending = _requestedResources.removeFirst();

    // Bring the timer in line with the shortened queue before [onRelease]
    // (which runs synchronously inside [_runOnRelease]) gets a chance to call
    // back into the pool.
    _resetTimer();
    pending.complete(_runOnRelease(onRelease));
  }

  /// Runs [onRelease] and returns a Future that completes to a resource once an
  /// [onRelease] callback completes.
  ///
  /// Futures returned by [_runOnRelease] always complete in the order they were
  /// created, even if earlier [onRelease] callbacks take longer to run.
  Future<PoolResource> _runOnRelease(onRelease()) {
    // Whichever callback finishes, it settles the *oldest* outstanding
    // completer rather than its own, which is what preserves request order.
    new Future.sync(onRelease).then((value) {
      _onReleaseCompleters.removeFirst().complete(new PoolResource._(this));
    }).catchError((error, stackTrace) {
      _onReleaseCompleters.removeFirst().completeError(error, stackTrace);
    });

    // The handlers registered above never run synchronously, so the completer
    // is enqueued before either of them can dequeue it. It is explicitly typed
    // so that it matches the element type of [_onReleaseCompleters] and the
    // declared return type.
    var completer = new Completer<PoolResource>.sync();
    _onReleaseCompleters.add(completer);
    return completer.future;
  }

  /// A resource has been requested, allocated, or released.
  ///
  /// Cancels any running timer and starts a fresh one only if a timeout is
  /// configured and at least one request is still waiting.
  void _resetTimer() {
    if (_timer != null) _timer.cancel();
    if (_timeout == null || _requestedResources.isEmpty) {
      _timer = null;
    } else {
      _timer = new Timer(_timeout, _onTimeout);
    }
  }

  /// Handles [_timer] timing out by causing all pending resource completers to
  /// emit exceptions.
  void _onTimeout() {
    for (var completer in _requestedResources) {
      completer.completeError(
          new TimeoutException(
              "Pool deadlock: all resources have been "
              "allocated for too long.",
              _timeout),
          new Chain.current());
    }
    _requestedResources.clear();
    _timer = null;
  }
}
/// A member of a [Pool].
///
/// A [PoolResource] is a token that indicates that a resource is allocated.
/// When the associated resource is released, the user should call [release].
class PoolResource {
  /// The pool that handed out this token.
  final Pool _pool;

  /// Whether [this] has been released yet.
  bool _released = false;

  PoolResource._(this._pool);

  /// Tells the parent [Pool] that the resource associated with this resource is
  /// no longer allocated, and that a new [PoolResource] may be allocated.
  ///
  /// Throws a [StateError] if this resource has already been released, either
  /// through [release] or through [allowRelease].
  void release() {
    _markReleased();
    _pool._onResourceReleased();
  }

  /// Tells the parent [Pool] that the resource associated with this resource is
  /// no longer necessary, but should remain allocated until more resources are
  /// needed.
  ///
  /// When [Pool.request] is called and there are no remaining available
  /// resources, the [onRelease] callback is called. It should free the
  /// resource, and it may return a Future or `null`. Once that completes, the
  /// [Pool.request] call will complete to a new [PoolResource]. If a request
  /// is already waiting when this is called, [onRelease] is run right away on
  /// its behalf.
  ///
  /// This is useful when a resource's main function is complete, but it may
  /// produce additional information later on. For example, an isolate's task
  /// may be complete, but it could still emit asynchronous errors.
  ///
  /// Throws a [StateError] if this resource has already been released, either
  /// through [release] or through [allowRelease].
  void allowRelease(onRelease()) {
    _markReleased();
    _pool._onResourceReleaseAllowed(onRelease);
  }

  /// Flags this token as released, or throws a [StateError] if it already was.
  void _markReleased() {
    if (!_released) {
      _released = true;
      return;
    }
    throw new StateError("A PoolResource may only be released once.");
  }
}