// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#define FML_USED_ON_EMBEDDER

#include "flutter/fml/message_loop_task_queues.h"

#include <algorithm>
#include <cstdlib>
#include <thread>

#include "flutter/fml/synchronization/count_down_latch.h"
#include "flutter/fml/synchronization/waitable_event.h"
#include "flutter/fml/time/chrono_timestamp_provider.h"
#include "gtest/gtest.h"

namespace fml {
namespace testing {

class TestWakeable : public fml::Wakeable {
 public:
  using WakeUpCall = std::function<void(const fml::TimePoint)>;

  explicit TestWakeable(WakeUpCall call) : wake_up_call_(call) {}

  void WakeUp(fml::TimePoint time_point) override { wake_up_call_(time_point); }

 private:
  WakeUpCall wake_up_call_;
};

TEST(MessageLoopTaskQueue, StartsWithNoPendingTasks) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto queue_id = task_queue->CreateTaskQueue();
  ASSERT_FALSE(task_queue->HasPendingTasks(queue_id));
}

TEST(MessageLoopTaskQueue, RegisterOneTask) {
  const auto time = fml::TimePoint::Max();

  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto queue_id = task_queue->CreateTaskQueue();
  task_queue->SetWakeable(queue_id,
                          new TestWakeable([&time](fml::TimePoint wake_time) {
                            ASSERT_TRUE(wake_time == time);
                          }));

  task_queue->RegisterTask(
      queue_id, [] {}, time);
  ASSERT_TRUE(task_queue->HasPendingTasks(queue_id));
  ASSERT_TRUE(task_queue->GetNumPendingTasks(queue_id) == 1);
}

TEST(MessageLoopTaskQueue, RegisterTwoTasksAndCount) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto queue_id = task_queue->CreateTaskQueue();
  task_queue->RegisterTask(
      queue_id, [] {}, ChronoTicksSinceEpoch());
  task_queue->RegisterTask(
      queue_id, [] {}, fml::TimePoint::Max());
  ASSERT_TRUE(task_queue->HasPendingTasks(queue_id));
  ASSERT_TRUE(task_queue->GetNumPendingTasks(queue_id) == 2);
}

TEST(MessageLoopTaskQueue, RegisterTasksOnMergedQueuesAndCount) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto platform_queue = task_queue->CreateTaskQueue();
  auto raster_queue = task_queue->CreateTaskQueue();
  // A task in platform_queue
  task_queue->RegisterTask(
      platform_queue, []() {}, fml::TimePoint::Now());
  // A task in raster_queue
  task_queue->RegisterTask(
      raster_queue, []() {}, fml::TimePoint::Now());
  ASSERT_TRUE(task_queue->GetNumPendingTasks(platform_queue) == 1);
  ASSERT_TRUE(task_queue->GetNumPendingTasks(raster_queue) == 1);

  ASSERT_FALSE(task_queue->Owns(platform_queue, raster_queue));
  task_queue->Merge(platform_queue, raster_queue);
  ASSERT_TRUE(task_queue->Owns(platform_queue, raster_queue));

  ASSERT_TRUE(task_queue->HasPendingTasks(platform_queue));
  ASSERT_TRUE(task_queue->GetNumPendingTasks(platform_queue) == 2);
  // The task count of subsumed queue is 0
  ASSERT_FALSE(task_queue->HasPendingTasks(raster_queue));
  ASSERT_TRUE(task_queue->GetNumPendingTasks(raster_queue) == 0);

  task_queue->Unmerge(platform_queue, raster_queue);
  ASSERT_FALSE(task_queue->Owns(platform_queue, raster_queue));
  ASSERT_TRUE(task_queue->GetNumPendingTasks(platform_queue) == 1);
  ASSERT_TRUE(task_queue->GetNumPendingTasks(raster_queue) == 1);
}

TEST(MessageLoopTaskQueue, PreserveTaskOrdering) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto queue_id = task_queue->CreateTaskQueue();
  int test_val = 0;

  // order: 0
  task_queue->RegisterTask(
      queue_id, [&test_val]() { test_val = 1; }, ChronoTicksSinceEpoch());

  // order: 1
  task_queue->RegisterTask(
      queue_id, [&test_val]() { test_val = 2; }, ChronoTicksSinceEpoch());

  const auto now = ChronoTicksSinceEpoch();
  int expected_value = 1;
  while (true) {
    fml::closure invocation = task_queue->GetNextTaskToRun(queue_id, now);
    if (!invocation) {
      break;
    }
    invocation();
    ASSERT_TRUE(test_val == expected_value);
    expected_value++;
  }
}

TEST(MessageLoopTaskQueue, RegisterTasksOnMergedQueuesPreserveTaskOrdering) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto platform_queue = task_queue->CreateTaskQueue();
  auto raster1_queue = task_queue->CreateTaskQueue();
  auto raster2_queue = task_queue->CreateTaskQueue();
  int test_val = 0;

  // order 0 in raster1_queue
  task_queue->RegisterTask(
      raster1_queue, [&test_val]() { test_val = 0; }, fml::TimePoint::Now());

  // order 1 in platform_queue
  task_queue->RegisterTask(
      platform_queue, [&test_val]() { test_val = 1; }, fml::TimePoint::Now());

  // order 2 in raster2_queue
  task_queue->RegisterTask(
      raster2_queue, [&test_val]() { test_val = 2; }, fml::TimePoint::Now());

  task_queue->Merge(platform_queue, raster1_queue);
  ASSERT_TRUE(task_queue->Owns(platform_queue, raster1_queue));
  task_queue->Merge(platform_queue, raster2_queue);
  ASSERT_TRUE(task_queue->Owns(platform_queue, raster2_queue));
  const auto now = fml::TimePoint::Now();
  int expected_value = 0;
  // Right order:
  // "test_val = 0" in raster1_queue
  // "test_val = 1" in platform_queue
  // "test_val = 2" in raster2_queue
  while (true) {
    fml::closure invocation = task_queue->GetNextTaskToRun(platform_queue, now);
    if (!invocation) {
      break;
    }
    invocation();
    ASSERT_TRUE(test_val == expected_value);
    expected_value++;
  }
}

TEST(MessageLoopTaskQueue, UnmergeRespectTheOriginalTaskOrderingInQueues) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto platform_queue = task_queue->CreateTaskQueue();
  auto raster_queue = task_queue->CreateTaskQueue();
  int test_val = 0;

  // order 0 in platform_queue
  task_queue->RegisterTask(
      platform_queue, [&test_val]() { test_val = 0; }, fml::TimePoint::Now());
  // order 1 in platform_queue
  task_queue->RegisterTask(
      platform_queue, [&test_val]() { test_val = 1; }, fml::TimePoint::Now());
  // order 2 in raster_queue
  task_queue->RegisterTask(
      raster_queue, [&test_val]() { test_val = 2; }, fml::TimePoint::Now());
  // order 3 in raster_queue
  task_queue->RegisterTask(
      raster_queue, [&test_val]() { test_val = 3; }, fml::TimePoint::Now());
  // order 4 in platform_queue
  task_queue->RegisterTask(
      platform_queue, [&test_val]() { test_val = 4; }, fml::TimePoint::Now());
  // order 5 in raster_queue
  task_queue->RegisterTask(
      raster_queue, [&test_val]() { test_val = 5; }, fml::TimePoint::Now());

  ASSERT_TRUE(task_queue->Merge(platform_queue, raster_queue));
  ASSERT_TRUE(task_queue->Owns(platform_queue, raster_queue));
  const auto now = fml::TimePoint::Now();
  // The right order after merged and consumed 3 tasks:
  // "test_val = 0" in platform_queue
  // "test_val = 1" in platform_queue
  // "test_val = 2" in raster_queue (running on platform)
  for (int i = 0; i < 3; i++) {
    fml::closure invocation = task_queue->GetNextTaskToRun(platform_queue, now);
    ASSERT_FALSE(!invocation);
    invocation();
    ASSERT_TRUE(test_val == i);
  }
  ASSERT_TRUE(task_queue->GetNumPendingTasks(platform_queue) == 3);
  ASSERT_TRUE(task_queue->GetNumPendingTasks(raster_queue) == 0);

  ASSERT_TRUE(task_queue->Unmerge(platform_queue, raster_queue));
  ASSERT_FALSE(task_queue->Owns(platform_queue, raster_queue));

  // The right order after unmerged and left 3 tasks:
  // "test_val = 3" in raster_queue
  // "test_val = 4" in platform_queue
  // "test_val = 5" in raster_queue

  // platform_queue has 1 task left: "test_val = 4"
  {
    ASSERT_TRUE(task_queue->GetNumPendingTasks(platform_queue) == 1);
    fml::closure invocation = task_queue->GetNextTaskToRun(platform_queue, now);
    ASSERT_FALSE(!invocation);
    invocation();
    ASSERT_TRUE(test_val == 4);
    ASSERT_TRUE(task_queue->GetNumPendingTasks(platform_queue) == 0);
  }

  // raster_queue has 2 tasks left: "test_val = 3" and "test_val = 5"
  {
    ASSERT_TRUE(task_queue->GetNumPendingTasks(raster_queue) == 2);
    fml::closure invocation = task_queue->GetNextTaskToRun(raster_queue, now);
    ASSERT_FALSE(!invocation);
    invocation();
    ASSERT_TRUE(test_val == 3);
  }
  {
    ASSERT_TRUE(task_queue->GetNumPendingTasks(raster_queue) == 1);
    fml::closure invocation = task_queue->GetNextTaskToRun(raster_queue, now);
    ASSERT_FALSE(!invocation);
    invocation();
    ASSERT_TRUE(test_val == 5);
    ASSERT_TRUE(task_queue->GetNumPendingTasks(raster_queue) == 0);
  }
}

void TestNotifyObservers(fml::TaskQueueId queue_id) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  std::vector<fml::closure> observers =
      task_queue->GetObserversToNotify(queue_id);
  for (const auto& observer : observers) {
    observer();
  }
}

TEST(MessageLoopTaskQueue, AddRemoveNotifyObservers) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto queue_id = task_queue->CreateTaskQueue();

  int test_val = 0;
  intptr_t key = 123;

  task_queue->AddTaskObserver(queue_id, key, [&test_val]() { test_val = 1; });
  TestNotifyObservers(queue_id);
  ASSERT_TRUE(test_val == 1);

  test_val = 0;
  task_queue->RemoveTaskObserver(queue_id, key);
  TestNotifyObservers(queue_id);
  ASSERT_TRUE(test_val == 0);
}

TEST(MessageLoopTaskQueue, WakeUpIndependentOfTime) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto queue_id = task_queue->CreateTaskQueue();

  int num_wakes = 0;
  task_queue->SetWakeable(
      queue_id, new TestWakeable(
                    [&num_wakes](fml::TimePoint wake_time) { ++num_wakes; }));

  task_queue->RegisterTask(
      queue_id, []() {}, ChronoTicksSinceEpoch());
  task_queue->RegisterTask(
      queue_id, []() {}, fml::TimePoint::Max());

  ASSERT_TRUE(num_wakes == 2);
}

TEST(MessageLoopTaskQueue, WokenUpWithNewerTime) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto queue_id = task_queue->CreateTaskQueue();
  fml::CountDownLatch latch(2);

  fml::TimePoint expected = fml::TimePoint::Max();

  task_queue->SetWakeable(
      queue_id, new TestWakeable([&latch, &expected](fml::TimePoint wake_time) {
        ASSERT_TRUE(wake_time == expected);
        latch.CountDown();
      }));

  task_queue->RegisterTask(
      queue_id, []() {}, fml::TimePoint::Max());

  const auto now = ChronoTicksSinceEpoch();
  expected = now;
  task_queue->RegisterTask(
      queue_id, []() {}, now);

  latch.Wait();
}

TEST(MessageLoopTaskQueue, NotifyObserversWhileCreatingQueues) {
  auto task_queues = fml::MessageLoopTaskQueues::GetInstance();
  fml::TaskQueueId queue_id = task_queues->CreateTaskQueue();
  fml::AutoResetWaitableEvent first_observer_executing, before_second_observer;

  task_queues->AddTaskObserver(queue_id, queue_id + 1, [&]() {
    first_observer_executing.Signal();
    before_second_observer.Wait();
  });

  for (int i = 0; i < 100; i++) {
    task_queues->AddTaskObserver(queue_id, queue_id + i + 2, [] {});
  }

  std::thread notify_observers([&]() { TestNotifyObservers(queue_id); });

  first_observer_executing.Wait();

  for (int i = 0; i < 100; i++) {
    task_queues->CreateTaskQueue();
  }

  before_second_observer.Signal();
  notify_observers.join();
}

TEST(MessageLoopTaskQueue, QueueDoNotOwnItself) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto queue_id = task_queue->CreateTaskQueue();
  ASSERT_FALSE(task_queue->Owns(queue_id, queue_id));
}

TEST(MessageLoopTaskQueue, QueueDoNotOwnUnmergedTaskQueueId) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  ASSERT_FALSE(task_queue->Owns(task_queue->CreateTaskQueue(), _kUnmerged));
  ASSERT_FALSE(task_queue->Owns(_kUnmerged, task_queue->CreateTaskQueue()));
  ASSERT_FALSE(task_queue->Owns(_kUnmerged, _kUnmerged));
}

TEST(MessageLoopTaskQueue, QueueOwnsMergedTaskQueueId) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto platform_queue = task_queue->CreateTaskQueue();
  auto raster_queue = task_queue->CreateTaskQueue();
  ASSERT_FALSE(task_queue->Owns(platform_queue, raster_queue));
  ASSERT_FALSE(task_queue->Owns(raster_queue, platform_queue));
  task_queue->Merge(platform_queue, raster_queue);
  ASSERT_TRUE(task_queue->Owns(platform_queue, raster_queue));
  ASSERT_FALSE(task_queue->Owns(raster_queue, platform_queue));
}

//------------------------------------------------------------------------------
/// Verifies that tasks can be added to task queues concurrently.
///
TEST(MessageLoopTaskQueue, ConcurrentQueueAndTaskCreatingCounts) {
  auto task_queues = fml::MessageLoopTaskQueues::GetInstance();

  // kThreadCount threads post kThreadTaskCount tasks each to kTaskQueuesCount
  // task queues. Each thread picks a task queue randomly for each task.
  constexpr size_t kThreadCount = 4;
  constexpr size_t kTaskQueuesCount = 2;
  constexpr size_t kThreadTaskCount = 500;

  std::vector<TaskQueueId> task_queue_ids;
  for (size_t i = 0; i < kTaskQueuesCount; ++i) {
    task_queue_ids.emplace_back(task_queues->CreateTaskQueue());
  }

  ASSERT_EQ(task_queue_ids.size(), kTaskQueuesCount);

  fml::CountDownLatch tasks_posted_latch(kThreadCount);

  auto thread_main = [&]() {
    for (size_t i = 0; i < kThreadTaskCount; i++) {
      const auto current_task_queue_id =
          task_queue_ids[std::rand() % kTaskQueuesCount];
      const auto empty_task = []() {};
      // The timepoint doesn't matter as the queue is never going to be drained.
      const auto task_timepoint = ChronoTicksSinceEpoch();

      task_queues->RegisterTask(current_task_queue_id, empty_task,
                                task_timepoint);
    }

    tasks_posted_latch.CountDown();
  };

  std::vector<std::thread> threads;

  for (size_t i = 0; i < kThreadCount; i++) {
    threads.emplace_back(std::thread{thread_main});
  }

  ASSERT_EQ(threads.size(), kThreadCount);

  for (size_t i = 0; i < kThreadCount; i++) {
    threads[i].join();
  }

  // All tasks have been posted by now. Check that they are all pending.

  size_t pending_tasks = 0u;
  std::for_each(task_queue_ids.begin(), task_queue_ids.end(),
                [&](const auto& queue) {
                  pending_tasks += task_queues->GetNumPendingTasks(queue);
                });

  ASSERT_EQ(pending_tasks, kThreadCount * kThreadTaskCount);
}

TEST(MessageLoopTaskQueue, RegisterTaskWakesUpOwnerQueue) {
  auto task_queue = fml::MessageLoopTaskQueues::GetInstance();
  auto platform_queue = task_queue->CreateTaskQueue();
  auto raster_queue = task_queue->CreateTaskQueue();

  std::vector<fml::TimePoint> wakes;

  task_queue->SetWakeable(platform_queue,
                          new TestWakeable([&wakes](fml::TimePoint wake_time) {
                            wakes.push_back(wake_time);
                          }));

  task_queue->SetWakeable(raster_queue,
                          new TestWakeable([](fml::TimePoint wake_time) {
                            // The raster queue is owned by the platform queue.
                            ASSERT_FALSE(true);
                          }));

  auto time1 = ChronoTicksSinceEpoch() + fml::TimeDelta::FromMilliseconds(1);
  auto time2 = ChronoTicksSinceEpoch() + fml::TimeDelta::FromMilliseconds(2);

  ASSERT_EQ(0UL, wakes.size());

  task_queue->RegisterTask(
      platform_queue, []() {}, time1);

  ASSERT_EQ(1UL, wakes.size());
  ASSERT_EQ(time1, wakes[0]);

  task_queue->Merge(platform_queue, raster_queue);

  task_queue->RegisterTask(
      raster_queue, []() {}, time2);

  ASSERT_EQ(3UL, wakes.size());
  ASSERT_EQ(time1, wakes[1]);
  ASSERT_EQ(time1, wakes[2]);
}

}  // namespace testing
}  // namespace fml
