// Copyright (c) 2012, the Dart project authors.  Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

#include "vm/thread_pool.h"

#include "vm/dart.h"
#include "vm/flags.h"
#include "vm/lockers.h"

namespace dart {

DEFINE_FLAG(int,
            worker_timeout_millis,
            5000,
            "Free workers when they have been idle for this amount of time.");

static int64_t ComputeTimeout(int64_t idle_start) {
  int64_t worker_timeout_micros =
      FLAG_worker_timeout_millis * kMicrosecondsPerMillisecond;
  if (worker_timeout_micros <= 0) {
    // No timeout.
    return 0;
  } else {
    int64_t waited = OS::GetCurrentMonotonicMicros() - idle_start;
    if (waited >= worker_timeout_micros) {
      // We must have gotten a spurious wakeup just before we timed
      // out.  Give the worker one last desperate chance to live.  We
      // are merciful.
      return 1;
    } else {
      return worker_timeout_micros - waited;
    }
  }
}

ThreadPool::ThreadPool() : all_workers_dead_(false) {}

ThreadPool::~ThreadPool() {
  TriggerShutdown();

  // Wait until all workers are dead. Any new death will notify the exit
  // monitor.
  {
    MonitorLocker eml(&exit_monitor_);
    while (!all_workers_dead_) {
      eml.Wait();
    }
  }

  // Join all dead workers.
  WorkerList dead_workers_to_join;
  {
    MonitorLocker ml(&pool_monitor_);
    ObtainDeadWorkersLocked(&dead_workers_to_join);
  }
  JoinDeadWorkersLocked(&dead_workers_to_join);
}

bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
  Worker* new_worker = nullptr;
  {
    MonitorLocker ml(&pool_monitor_);
    if (shutting_down_) {
      return false;
    }
    new_worker = ScheduleTaskLocked(&ml, std::move(task));
  }
  if (new_worker != nullptr) {
    new_worker->StartThread();
  }
  return true;
}

void ThreadPool::WorkerLoop(Worker* worker) {
  WorkerList dead_workers_to_join;

  while (true) {
    MonitorLocker ml(&pool_monitor_);

    if (!tasks_.IsEmpty()) {
      IdleToRunningLocked(worker);
      while (!tasks_.IsEmpty()) {
        std::unique_ptr<Task> task(tasks_.RemoveFirst());
        pending_tasks_--;
        MonitorLeaveScope mls(&ml);
        task->Run();
        ASSERT(Isolate::Current() == nullptr);
        task.reset();
      }
      RunningToIdleLocked(worker);
    }

    if (shutting_down_) {
      ObtainDeadWorkersLocked(&dead_workers_to_join);
      IdleToDeadLocked(worker);
      break;
    }

    // Sleep until we get a new task, we time out or we're shutdown.
    const int64_t idle_start = OS::GetCurrentMonotonicMicros();
    bool done = false;
    while (!done) {
      const auto result = ml.WaitMicros(ComputeTimeout(idle_start));

      // We have to drain all pending tasks.
      if (!tasks_.IsEmpty()) break;

      if (shutting_down_ || result == Monitor::kTimedOut) {
        done = true;
        break;
      }
    }
    if (done) {
      ObtainDeadWorkersLocked(&dead_workers_to_join);
      IdleToDeadLocked(worker);
      break;
    }
  }

  // Before we transitioned to dead we obtained the list of previously died dead
  // workers, which we join here. Since every death of a worker will join
  // previously died workers, we keep the pending non-joined [dead_workers_] to
  // effectively 1.
  JoinDeadWorkersLocked(&dead_workers_to_join);
}

void ThreadPool::TriggerShutdown() {
  MonitorLocker ml(&pool_monitor_);

  // Prevent scheduling of new tasks.
  shutting_down_ = true;

  if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
    // All workers have already died.
    all_workers_dead_ = true;
  } else {
    // Tell workers to drain remaining work and then shut down.
    ml.NotifyAll();
  }
}

void ThreadPool::IdleToRunningLocked(Worker* worker) {
  ASSERT(idle_workers_.ContainsForDebugging(worker));
  idle_workers_.Remove(worker);
  running_workers_.Append(worker);
  count_idle_--;
  count_running_++;
}

void ThreadPool::RunningToIdleLocked(Worker* worker) {
  ASSERT(tasks_.IsEmpty());

  ASSERT(running_workers_.ContainsForDebugging(worker));
  running_workers_.Remove(worker);
  idle_workers_.Append(worker);
  count_running_--;
  count_idle_++;
}

void ThreadPool::IdleToDeadLocked(Worker* worker) {
  ASSERT(tasks_.IsEmpty());

  ASSERT(idle_workers_.ContainsForDebugging(worker));
  idle_workers_.Remove(worker);
  dead_workers_.Append(worker);
  count_idle_--;
  count_dead_++;

  // Notify shutdown thread that the worker thread is about to finish.
  if (shutting_down_) {
    if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
      all_workers_dead_ = true;
      MonitorLocker eml(&exit_monitor_);
      eml.Notify();
    }
  }
}

void ThreadPool::ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join) {
  dead_workers_to_join->AppendList(&dead_workers_);
  ASSERT(dead_workers_.IsEmpty());
  count_dead_ = 0;
}

void ThreadPool::JoinDeadWorkersLocked(WorkerList* dead_workers_to_join) {
  auto it = dead_workers_to_join->begin();
  while (it != dead_workers_to_join->end()) {
    Worker* worker = *it;
    it = dead_workers_to_join->Erase(it);

    OSThread::Join(worker->join_id_);
    delete worker;
  }
  ASSERT(dead_workers_to_join->IsEmpty());
}

ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
                                                   std::unique_ptr<Task> task) {
  // Enqueue the new task.
  tasks_.Append(task.release());
  pending_tasks_++;
  ASSERT(pending_tasks_ >= 1);

  // Notify existing idle worker (if available).
  if (count_idle_ >= pending_tasks_) {
    ASSERT(!idle_workers_.IsEmpty());
    ml->Notify();
    return nullptr;
  }

  // Otherwise start a new worker.
  auto new_worker = new Worker(this);
  idle_workers_.Append(new_worker);
  count_idle_++;
  return new_worker;
}

ThreadPool::Worker::Worker(ThreadPool* pool)
    : pool_(pool), join_id_(OSThread::kInvalidThreadJoinId) {}

void ThreadPool::Worker::StartThread() {
  int result = OSThread::Start("DartWorker", &Worker::Main,
                               reinterpret_cast<uword>(this));
  if (result != 0) {
    FATAL1("Could not start worker thread: result = %d.", result);
  }
}

void ThreadPool::Worker::Main(uword args) {
  OSThread* os_thread = OSThread::Current();
  ASSERT(os_thread != nullptr);

  Worker* worker = reinterpret_cast<Worker*>(args);
  ThreadPool* pool = worker->pool_;

  // Once the worker quits it needs to be joined.
  worker->join_id_ = OSThread::GetCurrentThreadJoinId(os_thread);

#if defined(DEBUG)
  {
    MonitorLocker ml(&pool->pool_monitor_);
    ASSERT(pool->idle_workers_.ContainsForDebugging(worker));
  }
#endif

  pool->WorkerLoop(worker);

  // Call the thread exit hook here to notify the embedder that the
  // thread pool thread is exiting.
  if (Dart::thread_exit_callback() != NULL) {
    (*Dart::thread_exit_callback())();
  }
}

}  // namespace dart
