blob: d8536103919d75c90bdd82ecc4a655041fdad35d [file] [log] [blame] [edit]
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#include "vm/thread_pool.h"
#include "vm/dart.h"
#include "vm/flags.h"
#include "vm/lockers.h"
namespace dart {
DEFINE_FLAG(int,
worker_timeout_millis,
5000,
"Free workers when they have been idle for this amount of time.");
static int64_t ComputeTimeout(int64_t idle_start) {
int64_t worker_timeout_micros =
FLAG_worker_timeout_millis * kMicrosecondsPerMillisecond;
if (worker_timeout_micros <= 0) {
// No timeout.
return 0;
} else {
int64_t waited = OS::GetCurrentMonotonicMicros() - idle_start;
if (waited >= worker_timeout_micros) {
// We must have gotten a spurious wakeup just before we timed
// out. Give the worker one last desperate chance to live. We
// are merciful.
return 1;
} else {
return worker_timeout_micros - waited;
}
}
}
ThreadPool::ThreadPool(uintptr_t max_pool_size)
: all_workers_dead_(false), max_pool_size_(max_pool_size) {}
ThreadPool::~ThreadPool() {
Shutdown();
}
void ThreadPool::RequestWorkersToShutdown() {
MutexLocker ml(&pool_mutex_);
// If we are just starting to shutdown threads then this should be done
// before OSThread::DisableOSThreadCreation is called. If |OSThread| creation
// is disabled after |Worker::StartThread| is called but before
// |ThreadPool::Worker::Main| is called then a worker will be stuck in the
// state idle but will never properly start and thus will never transition to
// dead - leading to a deadlock.
RELEASE_ASSERT(shutting_down_ || OSThread::CanCreateOSThreads());
// Prevent scheduling of new tasks.
shutting_down_ = true;
if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
// All workers have already died.
all_workers_dead_ = true;
} else {
// Tell all idling workers to drain remaining work and then shut down.
for (auto worker : idle_workers_) {
worker->Wakeup();
}
}
}
void ThreadPool::RequestShutdown(
ThreadPool* pool,
std::function<void(void)>&& shutdown_complete) {
pool->RequestWorkersToShutdown();
{
MonitorLocker eml(&pool->exit_monitor_);
if (!pool->all_workers_dead_) {
// Workers are still doing some work. Mark this pool for asynchronous
// deletion. When the last worker finishes it will delete itself and
// call shutdown_complete.
pool->shutdown_complete_callback_ = std::move(shutdown_complete);
return;
}
// Threads are in the process of exiting already and there is no way to ask
// them to do additional cleanup asynchronously. We will just join the
// last dead worker and delete it synchronously.
}
pool->DeleteLastDeadWorker();
shutdown_complete();
}
void ThreadPool::Shutdown() {
// Should not combine |Shutdown| and |RequestShutdown| on the same pool.
ASSERT(shutdown_complete_callback_ == nullptr);
RequestWorkersToShutdown();
// Wait until all workers are dead. Any new death will notify the exit
// monitor.
{
MonitorLocker eml(&exit_monitor_);
while (!all_workers_dead_) {
eml.Wait();
}
}
DeleteLastDeadWorker();
}
void ThreadPool::DeleteLastDeadWorker() {
ASSERT(all_workers_dead_);
ASSERT(count_idle_ == 0);
ASSERT(count_running_ == 0);
ASSERT(idle_workers_.IsEmpty());
ASSERT(running_workers_.IsEmpty());
JoinDeadWorker(last_dead_worker_);
last_dead_worker_ = nullptr;
}
bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
Worker* new_worker = nullptr;
{
MutexLocker ml(&pool_mutex_);
if (shutting_down_) {
return false;
}
new_worker = ScheduleTaskLocked(std::move(task));
}
if (new_worker != nullptr) {
new_worker->StartThread();
}
return true;
}
bool ThreadPool::CurrentThreadIsWorker() {
auto worker =
static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
return worker != nullptr && worker->pool_ == this;
}
void ThreadPool::MarkCurrentWorkerAsBlocked() {
auto worker =
static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
Worker* new_worker = nullptr;
if (worker != nullptr) {
MutexLocker ml(&pool_mutex_);
ASSERT(!worker->is_blocked_);
worker->is_blocked_ = true;
if (max_pool_size_ > 0) {
++max_pool_size_;
// This thread is blocked and therefore no longer usable as a worker.
// If we have pending tasks and there are no idle workers, we will spawn a
// new thread (temporarily allow exceeding the maximum pool size) to
// handle the pending tasks.
if (idle_workers_.IsEmpty() && pending_tasks_ > 0) {
new_worker = new Worker(this);
idle_workers_.Append(new_worker);
count_idle_++;
}
}
}
if (new_worker != nullptr) {
new_worker->StartThread();
}
}
void ThreadPool::MarkCurrentWorkerAsUnBlocked() {
auto worker =
static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
if (worker != nullptr) {
MutexLocker ml(&pool_mutex_);
if (worker->is_blocked_) {
worker->is_blocked_ = false;
if (max_pool_size_ > 0) {
--max_pool_size_;
ASSERT(max_pool_size_ > 0);
}
}
}
}
std::unique_ptr<ThreadPool::Task> ThreadPool::TakeNextAvailableTaskLocked() {
std::unique_ptr<Task> task(tasks_.RemoveFirst());
pending_tasks_--;
if (pending_tasks_ > 0 && !idle_workers_.IsEmpty()) {
// Wake up one more worker if more tasks are left.
idle_workers_.Last()->Wakeup();
}
return task;
}
void ThreadPool::WorkerLoop(Worker* worker) {
Worker* previous_dead_worker = nullptr;
while (true) {
MutexLocker ml(&pool_mutex_);
if (!tasks_.IsEmpty()) {
IdleToRunningLocked(worker);
while (!tasks_.IsEmpty()) {
auto task = TakeNextAvailableTaskLocked();
MutexUnlocker mls(&ml);
task->Run();
ASSERT(Isolate::Current() == nullptr);
task.reset(); // Delete the task while unlocked.
}
RunningToIdleLocked(worker);
}
if (running_workers_.IsEmpty()) {
ASSERT(tasks_.IsEmpty());
OnEnterIdleLocked(&ml, worker);
if (!tasks_.IsEmpty()) {
continue;
}
}
if (shutting_down_) {
previous_dead_worker = IdleToDeadLocked(worker);
break;
}
// Sleep until we get a new task, we time out or we're shutdown.
const int64_t idle_start = OS::GetCurrentMonotonicMicros();
bool done = false;
while (!done) {
const auto result = worker->Sleep(ComputeTimeout(idle_start));
// We have to drain all pending tasks.
if (!tasks_.IsEmpty()) break;
if (shutting_down_ || result == ConditionVariable::kTimedOut) {
done = true;
break;
}
}
if (done) {
previous_dead_worker = IdleToDeadLocked(worker);
break;
}
}
// |IdleToDeadLocked| obtained the worker which died before us, which we will
// join here. Since every dead worker will join the previous one, all dead
// workers effectively form a chain and it is enough to join the worker which
// died last to join all workers which died before it.
JoinDeadWorker(previous_dead_worker);
}
void ThreadPool::IdleToRunningLocked(Worker* worker) {
ASSERT(idle_workers_.ContainsForDebugging(worker));
idle_workers_.Remove(worker);
running_workers_.Append(worker);
count_idle_--;
count_running_++;
}
void ThreadPool::RunningToIdleLocked(Worker* worker) {
ASSERT(tasks_.IsEmpty());
ASSERT(running_workers_.ContainsForDebugging(worker));
running_workers_.Remove(worker);
idle_workers_.Append(worker);
count_running_--;
count_idle_++;
}
ThreadPool::Worker* ThreadPool::IdleToDeadLocked(Worker* worker) {
ASSERT(tasks_.IsEmpty());
Worker* previous_dead = last_dead_worker_;
ASSERT(idle_workers_.ContainsForDebugging(worker));
idle_workers_.Remove(worker);
last_dead_worker_ = worker;
count_idle_--;
// Notify shutdown thread that the worker thread is about to finish.
if (shutting_down_) {
if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
all_workers_dead_ = true;
MonitorLocker eml(&exit_monitor_);
eml.Notify();
}
}
return previous_dead;
}
void ThreadPool::JoinDeadWorker(Worker* worker) {
if (worker != nullptr) {
OSThread::Join(worker->join_id_);
delete worker;
}
}
ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(std::unique_ptr<Task> task) {
// Enqueue the new task.
tasks_.Append(task.release());
pending_tasks_++;
ASSERT(pending_tasks_ >= 1);
// Notify existing idle worker (if available).
if (count_idle_ >= pending_tasks_) {
ASSERT(!idle_workers_.IsEmpty());
// We always notify only the last worker which became idle. It will wake up
// more workers if needed.
idle_workers_.Last()->Wakeup();
return nullptr;
}
// If we have maxed out the number of threads running, we will not start a
// new one.
if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) {
if (!idle_workers_.IsEmpty()) {
// We always notify only the last worker which became idle. It will
// wake up more workers if needed.
idle_workers_.Last()->Wakeup();
}
return nullptr;
}
// Otherwise start a new worker.
auto new_worker = new Worker(this);
idle_workers_.Append(new_worker);
count_idle_++;
return new_worker;
}
ThreadPool::Worker::Worker(ThreadPool* pool)
: pool_(pool), join_id_(OSThread::kInvalidThreadJoinId) {}
void ThreadPool::Worker::StartThread() {
OSThread::Start("DartWorker", &Worker::Main, reinterpret_cast<uword>(this));
}
void ThreadPool::Worker::Main(uword args) {
// Call the thread start hook here to notify the embedder that the
// thread pool thread has started.
Dart_ThreadStartCallback start_cb = Dart::thread_start_callback();
if (start_cb != nullptr) {
start_cb();
}
OSThread* os_thread = OSThread::Current();
ASSERT(os_thread != nullptr);
Worker* worker = reinterpret_cast<Worker*>(args);
ThreadPool* pool = worker->pool_;
os_thread->owning_thread_pool_worker_ = worker;
worker->os_thread_ = os_thread;
// Once the worker quits it needs to be joined.
worker->join_id_ = OSThread::GetCurrentThreadJoinId(os_thread);
#if defined(DEBUG)
{
MutexLocker ml(&pool->pool_mutex_);
ASSERT(pool->idle_workers_.ContainsForDebugging(worker));
}
#endif
pool->WorkerLoop(worker);
worker->os_thread_ = nullptr;
os_thread->owning_thread_pool_worker_ = nullptr;
// Call the thread exit hook here to notify the embedder that the
// thread pool thread is exiting.
Dart_ThreadExitCallback exit_cb = Dart::thread_exit_callback();
if (exit_cb != nullptr) {
exit_cb();
}
ThreadPool::WorkerThreadExit(pool, worker);
}
void ThreadPool::WorkerThreadExit(ThreadPool* pool, Worker* worker) {
if (pool->shutdown_complete_callback_ != nullptr && pool->all_workers_dead_ &&
pool->last_dead_worker_ == worker) {
// Asynchronous shutdown was requested and this is the last exiting worker.
// It needs to delete itself and notify the code which requested the
// shutdown that we are done.
// Start by detaching the thread (because nobody is going to join it) so
// that we don't keep any thread related data structures behind.
OSThread::Detach(worker->join_id_);
delete worker;
pool->last_dead_worker_ = nullptr;
// Run the callback. It might (and most likely will) delete |pool| so this
// should be the last time we touch the |pool| pointer.
auto callback = pool->shutdown_complete_callback_;
pool->shutdown_complete_callback_ = nullptr;
callback();
}
}
} // namespace dart