[vm] Make event handler use appropriate data structure for timers

Right now each isolate can have one port registered at the eventhandler.
When the timer expires the isolate will be notified. The isolate can
also cancel it's timer.

The timer implementation in the eventhandler is implemented as a linked
list, sorted by expiration date. Removing an entry for a given port will
perform a linear search. This O(N) search can be very problematic as the
number of isolates scales, effectively leading to O(N^2) behavior if N
isolates register/cancel N timers.

The priority_heap.h/priority_heap_test.cc was imported from the
implementation in "github.com/dartino/sdk".

Issue https://github.com/dart-lang/sdk/issues/44457

TEST=runtime/bin/priority_heap_test.cc

Change-Id: I657c1c1ac0c68ade5295b2569e3fb1a3478325ec
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/175729
Commit-Queue: Martin Kustermann <kustermann@google.com>
Reviewed-by: Vyacheslav Egorov <vegorov@google.com>
diff --git a/runtime/bin/builtin_impl_sources.gni b/runtime/bin/builtin_impl_sources.gni
index 58323d0..1045fdc 100644
--- a/runtime/bin/builtin_impl_sources.gni
+++ b/runtime/bin/builtin_impl_sources.gni
@@ -77,4 +77,5 @@
   "eventhandler_test.cc",
   "file_test.cc",
   "hashmap_test.cc",
+  "priority_heap_test.cc",
 ]
diff --git a/runtime/bin/eventhandler.cc b/runtime/bin/eventhandler.cc
index 6a02a0f..83e93a0 100644
--- a/runtime/bin/eventhandler.cc
+++ b/runtime/bin/eventhandler.cc
@@ -15,46 +15,6 @@
 namespace dart {
 namespace bin {
 
-void TimeoutQueue::UpdateTimeout(Dart_Port port, int64_t timeout) {
-  // Find port if present.
-  Timeout* last = NULL;
-  Timeout* current = timeouts_;
-  while (current != NULL) {
-    if (current->port() == port) {
-      // Found.
-      if (timeout < 0) {
-        // Remove from list and delete existing.
-        if (last != NULL) {
-          last->set_next(current->next());
-        } else {
-          timeouts_ = current->next();
-        }
-        delete current;
-      } else {
-        // Update timeout.
-        current->set_timeout(timeout);
-      }
-      break;
-    }
-    last = current;
-    current = current->next();
-  }
-  if (current == NULL && timeout >= 0) {
-    // Not found, create a new.
-    timeouts_ = new Timeout(port, timeout, timeouts_);
-  }
-  // Clear and find next timeout.
-  next_timeout_ = NULL;
-  current = timeouts_;
-  while (current != NULL) {
-    if ((next_timeout_ == NULL) ||
-        (current->timeout() < next_timeout_->timeout())) {
-      next_timeout_ = current;
-    }
-    current = current->next();
-  }
-}
-
 static EventHandler* event_handler = NULL;
 static Monitor* shutdown_monitor = NULL;
 
diff --git a/runtime/bin/eventhandler.h b/runtime/bin/eventhandler.h
index a305f65..69a8e3a 100644
--- a/runtime/bin/eventhandler.h
+++ b/runtime/bin/eventhandler.h
@@ -10,6 +10,7 @@
 #include "bin/isolate_data.h"
 
 #include "platform/hashmap.h"
+#include "platform/priority_queue.h"
 
 namespace dart {
 namespace bin {
@@ -57,56 +58,39 @@
 // clang-format on
 
 class TimeoutQueue {
- private:
-  class Timeout {
-   public:
-    Timeout(Dart_Port port, int64_t timeout, Timeout* next)
-        : port_(port), timeout_(timeout), next_(next) {}
-
-    Dart_Port port() const { return port_; }
-
-    int64_t timeout() const { return timeout_; }
-    void set_timeout(int64_t timeout) {
-      ASSERT(timeout >= 0);
-      timeout_ = timeout;
-    }
-
-    Timeout* next() const { return next_; }
-    void set_next(Timeout* next) { next_ = next; }
-
-   private:
-    Dart_Port port_;
-    int64_t timeout_;
-    Timeout* next_;
-  };
-
  public:
-  TimeoutQueue() : next_timeout_(NULL), timeouts_(NULL) {}
+  TimeoutQueue() {}
 
   ~TimeoutQueue() {
     while (HasTimeout())
       RemoveCurrent();
   }
 
-  bool HasTimeout() const { return next_timeout_ != NULL; }
+  bool HasTimeout() const { return !timeouts_.IsEmpty(); }
 
   int64_t CurrentTimeout() const {
-    ASSERT(next_timeout_ != NULL);
-    return next_timeout_->timeout();
+    ASSERT(!timeouts_.IsEmpty());
+    return timeouts_.Minimum().priority;
   }
 
   Dart_Port CurrentPort() const {
-    ASSERT(next_timeout_ != NULL);
-    return next_timeout_->port();
+    ASSERT(!timeouts_.IsEmpty());
+    return timeouts_.Minimum().value;
   }
 
-  void RemoveCurrent() { UpdateTimeout(CurrentPort(), -1); }
+  void RemoveCurrent() { timeouts_.RemoveMinimum(); }
 
-  void UpdateTimeout(Dart_Port port, int64_t timeout);
+  void UpdateTimeout(Dart_Port port, int64_t timeout) {
+    if (timeout < 0) {
+      timeouts_.RemoveByValue(port);
+    } else {
+      timeouts_.InsertOrChangePriority(timeout, port);
+    }
+  }
 
  private:
-  Timeout* next_timeout_;
-  Timeout* timeouts_;
+  PriorityQueue<int64_t, Dart_Port> timeouts_;
+  int64_t next_timeout_;
 
   DISALLOW_COPY_AND_ASSIGN(TimeoutQueue);
 };
@@ -399,7 +383,7 @@
     intptr_t is_reading;
     intptr_t token_count;
 
-    bool IsReady() { return token_count > 0 && is_reading; }
+    bool IsReady() { return token_count > 0 && is_reading != 0; }
   };
 
  public:
diff --git a/runtime/bin/priority_heap_test.cc b/runtime/bin/priority_heap_test.cc
new file mode 100644
index 0000000..165f1a4
--- /dev/null
+++ b/runtime/bin/priority_heap_test.cc
@@ -0,0 +1,167 @@
+// Copyright (c) 2020, the dart project authors. Please see the AUTHORS file
+// for details. all rights reserved. use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE.md file.
+
+#include "vm/unit_test.h"
+
+#include "platform/priority_queue.h"
+
+namespace dart {
+
+UNIT_TEST_CASE(PRIORITY_HEAP_WITH_INDEX__INCREASING) {
+  const word kSize = PriorityQueue<word, word>::kMinimumSize;
+
+  PriorityQueue<word, word> heap;
+  for (word i = 0; i < kSize; i++) {
+    heap.Insert(i, 10 + i);
+  }
+  ASSERT(heap.min_heap_size() == kSize);
+  for (word i = 0; i < kSize; i++) {
+    EXPECT(!heap.IsEmpty());
+    EXPECT_EQ(i, heap.Minimum().priority);
+    EXPECT_EQ(10 + i, heap.Minimum().value);
+    EXPECT(heap.ContainsValue(10 + i));
+    heap.RemoveMinimum();
+    EXPECT(!heap.ContainsValue(10 + i));
+  }
+  EXPECT(heap.IsEmpty());
+}
+
+UNIT_TEST_CASE(PRIORITY_HEAP_WITH_INDEX__DECREASING) {
+  const word kSize = PriorityQueue<word, word>::kMinimumSize;
+
+  PriorityQueue<word, word> heap;
+  for (word i = kSize - 1; i >= 0; i--) {
+    heap.Insert(i, 10 + i);
+  }
+  ASSERT(heap.min_heap_size() == kSize);
+  for (word i = 0; i < kSize; i++) {
+    EXPECT(!heap.IsEmpty());
+    EXPECT_EQ(i, heap.Minimum().priority);
+    EXPECT_EQ(10 + i, heap.Minimum().value);
+    EXPECT(heap.ContainsValue(10 + i));
+    heap.RemoveMinimum();
+    EXPECT(!heap.ContainsValue(10 + i));
+  }
+  EXPECT(heap.IsEmpty());
+}
+
+UNIT_TEST_CASE(PRIORITY_HEAP_WITH_INDEX__DELETE_BY_VALUES) {
+  const word kSize = PriorityQueue<word, word>::kMinimumSize;
+
+  PriorityQueue<word, word> heap;
+  for (word i = kSize - 1; i >= 0; i--) {
+    heap.Insert(i, 10 + i);
+  }
+
+  ASSERT(heap.min_heap_size() == kSize);
+
+  EXPECT(heap.RemoveByValue(10 + 0));
+  EXPECT(!heap.RemoveByValue(10 + 0));
+
+  EXPECT(heap.RemoveByValue(10 + 5));
+  EXPECT(!heap.RemoveByValue(10 + 5));
+
+  EXPECT(heap.RemoveByValue(10 + kSize - 1));
+  EXPECT(!heap.RemoveByValue(10 + kSize - 1));
+
+  for (word i = 0; i < kSize; i++) {
+    // Jump over the removed [i]s in the loop.
+    if (i != 0 && i != 5 && i != (kSize - 1)) {
+      EXPECT(!heap.IsEmpty());
+      EXPECT_EQ(i, heap.Minimum().priority);
+      EXPECT_EQ(10 + i, heap.Minimum().value);
+      EXPECT(heap.ContainsValue(10 + i));
+      heap.RemoveMinimum();
+      EXPECT(!heap.ContainsValue(10 + i));
+    }
+  }
+  EXPECT(heap.IsEmpty());
+}
+
+UNIT_TEST_CASE(PRIORITY_HEAP_WITH_INDEX__GROW_SHRINK) {
+  const word kSize = 1024;
+  const word kMinimumSize = PriorityQueue<word, word>::kMinimumSize;
+
+  PriorityQueue<word, word> heap;
+  for (word i = 0; i < kSize; i++) {
+    heap.Insert(i, 10 + i);
+  }
+
+  ASSERT(heap.min_heap_size() == kSize);
+
+  for (word i = 0; i < kSize; i++) {
+    EXPECT(!heap.IsEmpty());
+    EXPECT_EQ(i, heap.Minimum().priority);
+    EXPECT_EQ(10 + i, heap.Minimum().value);
+    EXPECT(heap.ContainsValue(10 + i));
+    heap.RemoveMinimum();
+    EXPECT(!heap.ContainsValue(10 + i));
+  }
+
+  EXPECT(heap.IsEmpty());
+  ASSERT(heap.min_heap_size() == kMinimumSize);
+
+  for (word i = 0; i < kSize; i++) {
+    heap.Insert(i, 10 + i);
+  }
+
+  for (word i = 0; i < kSize; i++) {
+    EXPECT(!heap.IsEmpty());
+    EXPECT_EQ(i, heap.Minimum().priority);
+    EXPECT_EQ(10 + i, heap.Minimum().value);
+    EXPECT(heap.ContainsValue(10 + i));
+    heap.RemoveMinimum();
+    EXPECT(!heap.ContainsValue(10 + i));
+  }
+
+  EXPECT(heap.IsEmpty());
+  ASSERT(heap.min_heap_size() == kMinimumSize);
+}
+
+UNIT_TEST_CASE(PRIORITY_HEAP_WITH_INDEX__CHANGE_PRIORITY) {
+  const word kSize = PriorityQueue<word, word>::kMinimumSize;
+
+  PriorityQueue<word, word> heap;
+  for (word i = 0; i < kSize; i++) {
+    if (i % 2 == 0) {
+      heap.Insert(i, 10 + i);
+    }
+  }
+  ASSERT(heap.min_heap_size() == kSize);
+  for (word i = 0; i < kSize; i++) {
+    bool was_inserted = i % 2 == 0;
+    bool increase = i % 3 == 0;
+    word new_priority = i + (increase ? 100 : -100);
+
+    EXPECT(was_inserted != heap.InsertOrChangePriority(new_priority, 10 + i));
+  }
+
+  for (word i = 0; i < kSize; i++) {
+    bool increase = i % 3 == 0;
+    if (!increase) {
+      word expected_priority = i + (increase ? 100 : -100);
+      EXPECT(!heap.IsEmpty());
+      EXPECT_EQ(expected_priority, heap.Minimum().priority);
+      EXPECT_EQ(10 + i, heap.Minimum().value);
+      EXPECT(heap.ContainsValue(10 + i));
+      heap.RemoveMinimum();
+      EXPECT(!heap.ContainsValue(10 + i));
+    }
+  }
+  for (word i = 0; i < kSize; i++) {
+    bool increase = i % 3 == 0;
+    if (increase) {
+      word expected_priority = i + (increase ? 100 : -100);
+      EXPECT(!heap.IsEmpty());
+      EXPECT_EQ(expected_priority, heap.Minimum().priority);
+      EXPECT_EQ(10 + i, heap.Minimum().value);
+      EXPECT(heap.ContainsValue(10 + i));
+      heap.RemoveMinimum();
+      EXPECT(!heap.ContainsValue(10 + i));
+    }
+  }
+  EXPECT(heap.IsEmpty());
+}
+
+}  // namespace dart.
diff --git a/runtime/platform/priority_queue.h b/runtime/platform/priority_queue.h
new file mode 100644
index 0000000..2064fcc
--- /dev/null
+++ b/runtime/platform/priority_queue.h
@@ -0,0 +1,269 @@
+// Copyright (c) 2020, the Dart project authors.  Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+#ifndef RUNTIME_PLATFORM_PRIORITY_QUEUE_H_
+#define RUNTIME_PLATFORM_PRIORITY_QUEUE_H_
+
+#include "platform/assert.h"
+#include "platform/globals.h"
+#include "platform/hashmap.h"
+#include "platform/utils.h"
+
+namespace dart {
+
+// A min-priority queue with deletion support.
+//
+// The [PriorityQueue] allows insertion of entries with a priority [P] and a
+// value [V]. The minimum element can be queried in O(1) time.
+// Insertion/Deletion operations have O(N) time.
+//
+// In addition to the normal insert/minimum/remove-minimum operations this
+// priority queue allows deletion-by-value. We have therefore an invariant
+// is that the value must be unique amongst all entries.
+template <typename P, typename V>
+class PriorityQueue {
+ public:
+  static const intptr_t kMinimumSize = 16;
+
+  struct Entry {
+    P priority;
+    V value;
+  };
+
+  PriorityQueue() : hashmap_(&MatchFun, kMinimumSize) {
+    min_heap_size_ = kMinimumSize;
+    min_heap_ =
+        reinterpret_cast<Entry*>(malloc(sizeof(Entry) * min_heap_size_));
+    if (min_heap_ == nullptr) FATAL("Cannot allocate memory.");
+    size_ = 0;
+  }
+
+  ~PriorityQueue() { free(min_heap_); }
+
+  // Whether the queue is empty.
+  bool IsEmpty() const { return size_ == 0; }
+
+  // Inserts a new entry with [priority] and [value], requires there to be no
+  // existing entry with given [value].
+  void Insert(const P& priority, const V& value) {
+    ASSERT(!ContainsValue(value));
+
+    if (size_ == min_heap_size_) {
+      Resize(min_heap_size_ << 1);
+    }
+
+    Set(size_, {priority, value});
+    BubbleUp(size_);
+
+    size_++;
+  }
+
+  // Returns a reference to the minimum entry.
+  //
+  // The caller can access it's priority and value in read-only mode only.
+  const Entry& Minimum() const {
+    ASSERT(!IsEmpty());
+    return min_heap_[0];
+  }
+
+  // Removes the minimum entry.
+  void RemoveMinimum() {
+    ASSERT(!IsEmpty());
+    RemoveAt(0);
+  }
+
+  // Removes an existing entry with the given [value].
+  //
+  // Returns true if such an entry was removed.
+  bool RemoveByValue(const V& value) {
+    auto entry = FindMapEntry(value);
+    if (entry != nullptr) {
+      const intptr_t offset = ValueOfMapEntry(entry);
+      RemoveAt(offset);
+
+      ASSERT(hashmap_.size() == size_);
+      return true;
+    }
+    return false;
+  }
+
+  // Whether the priority queue contains an entry with the given [value].
+  bool ContainsValue(const V& value) { return FindMapEntry(value) != nullptr; }
+
+  // Changes the priority of an existing entry with given [value] or adds a
+  // new entry.
+  bool InsertOrChangePriority(const P& priority, const V& value) {
+    auto map_entry = FindMapEntry(value);
+    if (map_entry == nullptr) {
+      Insert(priority, value);
+      return true;
+    }
+
+    const intptr_t offset = ValueOfMapEntry(map_entry);
+    ASSERT(offset < size_);
+
+    Entry& entry = min_heap_[offset];
+    entry.priority = priority;
+    if (offset == 0) {
+      BubbleDown(offset);
+    } else {
+      intptr_t parent = (offset - 1) / 2;
+      intptr_t diff = entry.priority - min_heap_[parent].priority;
+      if (diff < 0) {
+        BubbleUp(offset);
+      } else if (diff > 0) {
+        BubbleDown(offset);
+      }
+    }
+    return false;
+  }
+
+#ifdef TESTING
+  intptr_t min_heap_size() { return min_heap_size_; }
+#endif  // TESTING
+
+ private:
+  // Utility functions dealing with the SimpleHashMap interface.
+  static bool MatchFun(void* key1, void* key2) { return key1 == key2; }
+
+  SimpleHashMap::Entry* FindMapEntry(const V& key, bool insert = false) {
+    return hashmap_.Lookup(CastKey(key), HashKey(key), insert);
+  }
+  void RemoveMapEntry(const V& key) {
+    ASSERT(FindMapEntry(key) != nullptr);
+    hashmap_.Remove(CastKey(key), HashKey(key));
+  }
+  void SetMapEntry(const V& key, intptr_t value) {
+    FindMapEntry(key, /*insert=*/true)->value = reinterpret_cast<void*>(value);
+  }
+  static uint32_t HashKey(const V& key) {
+    return static_cast<uint32_t>(reinterpret_cast<intptr_t>(CastKey(key)));
+  }
+  static intptr_t ValueOfMapEntry(SimpleHashMap::Entry* entry) {
+    return reinterpret_cast<intptr_t>(entry->value);
+  }
+  static void* CastKey(const V& key) {
+    return reinterpret_cast<void*>((const_cast<V&>(key)));
+  }
+
+  void RemoveAt(intptr_t offset) {
+    ASSERT(offset < size_);
+
+    size_--;
+
+    if (offset == size_) {
+      RemoveMapEntry(min_heap_[offset].value);
+    } else {
+      Replace(offset, size_);
+      BubbleDown(offset);
+    }
+
+    if (size_ <= (min_heap_size_ >> 2) &&
+        kMinimumSize <= (min_heap_size_ >> 1)) {
+      Resize(min_heap_size_ >> 1);
+    }
+  }
+
+  void BubbleUp(intptr_t offset) {
+    while (true) {
+      if (offset == 0) return;
+
+      intptr_t parent = (offset - 1) / 2;
+      if (min_heap_[parent].priority > min_heap_[offset].priority) {
+        Swap(parent, offset);
+      }
+      offset = parent;
+    }
+  }
+
+  void BubbleDown(intptr_t offset) {
+    while (true) {
+      intptr_t left_child_index = 2 * offset + 1;
+      bool has_left_child = left_child_index < size_;
+
+      if (!has_left_child) return;
+
+      intptr_t smallest_index = offset;
+
+      if (min_heap_[left_child_index].priority < min_heap_[offset].priority) {
+        smallest_index = left_child_index;
+      }
+
+      intptr_t right_child_index = left_child_index + 1;
+      bool has_right_child = right_child_index < size_;
+      if (has_right_child) {
+        if (min_heap_[right_child_index].priority <
+            min_heap_[smallest_index].priority) {
+          smallest_index = right_child_index;
+        }
+      }
+
+      if (offset == smallest_index) {
+        return;
+      }
+
+      Swap(offset, smallest_index);
+      offset = smallest_index;
+    }
+  }
+
+  void Set(intptr_t offset1, const Entry& entry) {
+    min_heap_[offset1] = entry;
+    SetMapEntry(entry.value, offset1);
+  }
+
+  void Swap(intptr_t offset1, intptr_t offset2) {
+    Entry temp = min_heap_[offset1];
+    min_heap_[offset1] = min_heap_[offset2];
+    min_heap_[offset2] = temp;
+
+    SetMapEntry(min_heap_[offset1].value, offset1);
+    SetMapEntry(min_heap_[offset2].value, offset2);
+  }
+
+  void Replace(intptr_t index, intptr_t with_other) {
+    RemoveMapEntry(min_heap_[index].value);
+
+    const Entry& entry = min_heap_[with_other];
+    SetMapEntry(entry.value, index);
+    min_heap_[index] = entry;
+  }
+
+  void Resize(intptr_t new_min_heap_size) {
+    ASSERT(size_ < new_min_heap_size);
+    ASSERT(new_min_heap_size != min_heap_size_);
+
+    Entry* new_backing = reinterpret_cast<Entry*>(
+        realloc(min_heap_, sizeof(Entry) * new_min_heap_size));
+
+    if (new_backing == NULL) FATAL("Cannot allocate memory.");
+
+    min_heap_ = new_backing;
+    min_heap_size_ = new_min_heap_size;
+  }
+
+  // The array is representing a tree structure with guaranteed log(n) height.
+  // It has the property that the value of node N is always equal or smaller
+  // than the value of N's children. Furthermore it is a "dense" tree in the
+  // sense that all rows/layers of the tree are fully occupied except the last
+  // one. The way to represent such "dense" trees is via an array that allows
+  // finding left/right children by <2*index+1><2*index+2> and the parent by
+  // <(index-1)/2>.
+  //
+  // Insertion operations can be performed by adding one more entry at the end
+  // (bottom right) and bubbling it up until the tree invariant is satisfied
+  // again.
+  //
+  // Deletion operations can be performed by replacing the minimum element
+  // (first entry) by the last entry (bottom right) and bubbling it down until
+  // the tree invariant is satisified again.
+  Entry* min_heap_;
+  intptr_t min_heap_size_;
+  intptr_t size_;
+  SimpleHashMap hashmap_;
+};
+
+}  // namespace dart
+
+#endif  // RUNTIME_PLATFORM_PRIORITY_QUEUE_H_