blob: e4453d81b62f99ad95a1182038010c7ce0284f2a [file] [log] [blame]
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#include "vm/thread_pool.h"
#include "vm/flags.h"
#include "vm/lockers.h"
namespace dart {
DEFINE_FLAG(int, worker_timeout_millis, 5000,
"Free workers when they have been idle for this amount of time.");
ThreadPool::ThreadPool()
: shutting_down_(false),
all_workers_(NULL),
idle_workers_(NULL),
count_started_(0),
count_stopped_(0),
count_running_(0),
count_idle_(0),
shutting_down_workers_(NULL),
join_list_(NULL) {
}
ThreadPool::~ThreadPool() {
Shutdown();
}
bool ThreadPool::Run(Task* task) {
Worker* worker = NULL;
bool new_worker = false;
{
// We need ThreadPool::mutex_ to access worker lists and other
// ThreadPool state.
MutexLocker ml(&mutex_);
if (shutting_down_) {
return false;
}
if (idle_workers_ == NULL) {
worker = new Worker(this);
ASSERT(worker != NULL);
new_worker = true;
count_started_++;
// Add worker to the all_workers_ list.
worker->all_next_ = all_workers_;
all_workers_ = worker;
worker->owned_ = true;
count_running_++;
} else {
// Get the first worker from the idle worker list.
worker = idle_workers_;
idle_workers_ = worker->idle_next_;
worker->idle_next_ = NULL;
count_idle_--;
count_running_++;
}
}
// Release ThreadPool::mutex_ before calling Worker functions.
ASSERT(worker != NULL);
worker->SetTask(task);
if (new_worker) {
// Call StartThread after we've assigned the first task.
worker->StartThread();
}
return true;
}
void ThreadPool::Shutdown() {
Worker* saved = NULL;
{
MutexLocker ml(&mutex_);
shutting_down_ = true;
saved = all_workers_;
all_workers_ = NULL;
idle_workers_ = NULL;
Worker* current = saved;
while (current != NULL) {
Worker* next = current->all_next_;
current->idle_next_ = NULL;
current->owned_ = false;
current = next;
count_stopped_++;
}
count_idle_ = 0;
count_running_ = 0;
ASSERT(count_started_ == count_stopped_);
}
// Release ThreadPool::mutex_ before calling Worker functions.
{
MonitorLocker eml(&exit_monitor_);
// First tell all the workers to shut down.
Worker* current = saved;
ThreadId id = OSThread::GetCurrentThreadId();
while (current != NULL) {
Worker* next = current->all_next_;
ThreadId currentId = current->id();
if (currentId != id) {
AddWorkerToShutdownList(current);
}
current->Shutdown();
current = next;
}
saved = NULL;
// Wait until all workers will exit.
while (shutting_down_workers_ != NULL) {
// Here, we are waiting for workers to exit. When a worker exits we will
// be notified.
eml.Wait();
}
}
// Extract the join list, and join on the threads.
JoinList* list = NULL;
{
MutexLocker ml(&mutex_);
list = join_list_;
join_list_ = NULL;
}
// Join non-idle threads.
JoinList::Join(&list);
#if defined(DEBUG)
{
MutexLocker ml(&mutex_);
ASSERT(join_list_ == NULL);
}
#endif
}
bool ThreadPool::IsIdle(Worker* worker) {
ASSERT(worker != NULL && worker->owned_);
for (Worker* current = idle_workers_;
current != NULL;
current = current->idle_next_) {
if (current == worker) {
return true;
}
}
return false;
}
bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) {
ASSERT(worker != NULL && worker->owned_);
if (idle_workers_ == NULL) {
return false;
}
// Special case head of list.
if (idle_workers_ == worker) {
idle_workers_ = worker->idle_next_;
worker->idle_next_ = NULL;
return true;
}
for (Worker* current = idle_workers_;
current->idle_next_ != NULL;
current = current->idle_next_) {
if (current->idle_next_ == worker) {
current->idle_next_ = worker->idle_next_;
worker->idle_next_ = NULL;
return true;
}
}
return false;
}
bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) {
ASSERT(worker != NULL && worker->owned_);
if (all_workers_ == NULL) {
return false;
}
// Special case head of list.
if (all_workers_ == worker) {
all_workers_ = worker->all_next_;
worker->all_next_ = NULL;
worker->owned_ = false;
worker->done_ = true;
return true;
}
for (Worker* current = all_workers_;
current->all_next_ != NULL;
current = current->all_next_) {
if (current->all_next_ == worker) {
current->all_next_ = worker->all_next_;
worker->all_next_ = NULL;
worker->owned_ = false;
return true;
}
}
return false;
}
void ThreadPool::SetIdleAndReapExited(Worker* worker) {
JoinList* list = NULL;
{
MutexLocker ml(&mutex_);
if (shutting_down_) {
return;
}
ASSERT(worker->owned_ && !IsIdle(worker));
worker->idle_next_ = idle_workers_;
idle_workers_ = worker;
count_idle_++;
count_running_--;
// While we have the lock, opportunistically grab and clear the join_list_.
list = join_list_;
join_list_ = NULL;
}
JoinList::Join(&list);
}
bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
MutexLocker ml(&mutex_);
if (shutting_down_) {
return false;
}
// Remove from idle list.
if (!RemoveWorkerFromIdleList(worker)) {
return false;
}
// Remove from all list.
bool found = RemoveWorkerFromAllList(worker);
ASSERT(found);
// The thread for worker will exit. Add its ThreadId to the join_list_
// so that we can join on it at the next opportunity.
JoinList::AddLocked(OSThread::GetCurrentThreadJoinId(), &join_list_);
count_stopped_++;
count_idle_--;
return true;
}
// Only call while holding the exit_monitor_
void ThreadPool::AddWorkerToShutdownList(Worker* worker) {
worker->shutdown_next_ = shutting_down_workers_;
shutting_down_workers_ = worker;
}
// Only call while holding the exit_monitor_
bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) {
ASSERT(worker != NULL);
ASSERT(shutting_down_workers_ != NULL);
// Special case head of list.
if (shutting_down_workers_ == worker) {
shutting_down_workers_ = worker->shutdown_next_;
worker->shutdown_next_ = NULL;
return true;
}
for (Worker* current = shutting_down_workers_;
current->shutdown_next_ != NULL;
current = current->shutdown_next_) {
if (current->shutdown_next_ == worker) {
current->shutdown_next_ = worker->shutdown_next_;
worker->shutdown_next_ = NULL;
return true;
}
}
return false;
}
void ThreadPool::JoinList::AddLocked(ThreadJoinId id, JoinList** list) {
*list = new JoinList(id, *list);
}
void ThreadPool::JoinList::Join(JoinList** list) {
while (*list != NULL) {
JoinList* current = *list;
*list = current->next();
OSThread::Join(current->id());
delete current;
}
}
ThreadPool::Task::Task() {
}
ThreadPool::Task::~Task() {
}
ThreadPool::Worker::Worker(ThreadPool* pool)
: pool_(pool),
task_(NULL),
id_(OSThread::kInvalidThreadId),
done_(false),
owned_(false),
all_next_(NULL),
idle_next_(NULL),
shutdown_next_(NULL) {
}
ThreadId ThreadPool::Worker::id() {
MonitorLocker ml(&monitor_);
return id_;
}
void ThreadPool::Worker::StartThread() {
#if defined(DEBUG)
// Must call SetTask before StartThread.
{ // NOLINT
MonitorLocker ml(&monitor_);
ASSERT(task_ != NULL);
}
#endif
int result = OSThread::Start(&Worker::Main, reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Could not start worker thread: result = %d.", result);
}
}
void ThreadPool::Worker::SetTask(Task* task) {
MonitorLocker ml(&monitor_);
ASSERT(task_ == NULL);
task_ = task;
ml.Notify();
}
static int64_t ComputeTimeout(int64_t idle_start) {
if (FLAG_worker_timeout_millis <= 0) {
// No timeout.
return 0;
} else {
int64_t waited = OS::GetCurrentTimeMillis() - idle_start;
if (waited >= FLAG_worker_timeout_millis) {
// We must have gotten a spurious wakeup just before we timed
// out. Give the worker one last desperate chance to live. We
// are merciful.
return 1;
} else {
return FLAG_worker_timeout_millis - waited;
}
}
}
bool ThreadPool::Worker::Loop() {
MonitorLocker ml(&monitor_);
int64_t idle_start;
while (true) {
ASSERT(task_ != NULL);
Task* task = task_;
task_ = NULL;
// Release monitor while handling the task.
monitor_.Exit();
task->Run();
ASSERT(Isolate::Current() == NULL);
delete task;
monitor_.Enter();
ASSERT(task_ == NULL);
if (IsDone()) {
return false;
}
ASSERT(!done_);
pool_->SetIdleAndReapExited(this);
idle_start = OS::GetCurrentTimeMillis();
while (true) {
Monitor::WaitResult result = ml.Wait(ComputeTimeout(idle_start));
if (task_ != NULL) {
// We've found a task. Process it, regardless of whether the
// worker is done_.
break;
}
if (IsDone()) {
return false;
}
if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) {
return true;
}
}
}
UNREACHABLE();
return false;
}
void ThreadPool::Worker::Shutdown() {
MonitorLocker ml(&monitor_);
done_ = true;
ml.Notify();
}
// static
void ThreadPool::Worker::Main(uword args) {
Thread::EnsureInit();
Worker* worker = reinterpret_cast<Worker*>(args);
ThreadId id = OSThread::GetCurrentThreadId();
ThreadJoinId join_id = OSThread::GetCurrentThreadJoinId();
ThreadPool* pool;
{
MonitorLocker ml(&worker->monitor_);
ASSERT(worker->task_);
worker->id_ = id;
pool = worker->pool_;
}
bool released = worker->Loop();
// It should be okay to access these unlocked here in this assert.
// worker->all_next_ is retained by the pool for shutdown monitoring.
ASSERT(!worker->owned_ && (worker->idle_next_ == NULL));
if (!released) {
// This worker is exiting because the thread pool is being shut down.
// Inform the thread pool that we are exiting. We remove this worker from
// shutting_down_workers_ list because there will be no need for the
// ThreadPool to take action for this worker.
{
MutexLocker ml(&pool->mutex_);
JoinList::AddLocked(join_id, &pool->join_list_);
}
// worker->id_ should never be read again, so set to invalid in debug mode
// for asserts.
#if defined(DEBUG)
{
MonitorLocker ml(&worker->monitor_);
worker->id_ = OSThread::kInvalidThreadId;
}
#endif
// Remove from the shutdown list, delete, and notify the thread pool.
{
MonitorLocker eml(&pool->exit_monitor_);
pool->RemoveWorkerFromShutdownList(worker);
delete worker;
eml.Notify();
}
} else {
// This worker is going down because it was idle for too long. This case
// is not due to a ThreadPool Shutdown. Thus, we simply delete the worker.
// The worker's id is added to the thread pool's join list by
// ReleaseIdleWorker, so in the case that the thread pool begins shutting
// down immediately after returning from worker->Loop() above, we still
// wait for the thread to exit by joining on it in Shutdown().
delete worker;
}
#if defined(TARGET_OS_WINDOWS)
Thread::CleanUp();
#endif
}
} // namespace dart