[vm, gc] Remove busy waiting from parallel mark and scavenge.

TEST=ci
Bug: https://github.com/dart-lang/sdk/issues/40695
Change-Id: I1da72df5019f2befc3a456b7e956e32c40150fa1
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/202520
Commit-Queue: Ryan Macnak <rmacnak@google.com>
Reviewed-by: Slava Egorov <vegorov@google.com>
Reviewed-by: Martin Kustermann <kustermann@google.com>
diff --git a/runtime/vm/heap/marker.cc b/runtime/vm/heap/marker.cc
index 3a79f14..f6a984c 100644
--- a/runtime/vm/heap/marker.cc
+++ b/runtime/vm/heap/marker.cc
@@ -212,6 +212,10 @@
     }
   }
 
+  bool WaitForWork(RelaxedAtomic<uintptr_t>* num_busy) {
+    return work_list_.WaitForWork(num_busy);
+  }
+
   void AbandonWork() {
     work_list_.AbandonWork();
     deferred_work_list_.AbandonWork();
@@ -552,23 +556,7 @@
       do {
         do {
           visitor_->DrainMarkingStack();
-
-          // I can't find more work right now. If no other task is busy,
-          // then there will never be more work (NB: 1 is *before* decrement).
-          if (num_busy_->fetch_sub(1u) == 1) break;
-
-          // Wait for some work to appear.
-          // TODO(40695): Replace busy-waiting with a solution using Monitor,
-          // and redraw the boundaries between stack/visitor/task as needed.
-          while (marking_stack_->IsEmpty() && num_busy_->load() > 0) {
-          }
-
-          // If no tasks are busy, there will never be more work.
-          if (num_busy_->load() == 0) break;
-
-          // I saw some work; get busy and compete for it.
-          num_busy_->fetch_add(1u);
-        } while (true);
+        } while (visitor_->WaitForWork(num_busy_));
         // Wait for all markers to stop.
         barrier_->Sync();
 #if defined(DEBUG)
diff --git a/runtime/vm/heap/pointer_block.cc b/runtime/vm/heap/pointer_block.cc
index 2814465..daedab8 100644
--- a/runtime/vm/heap/pointer_block.cc
+++ b/runtime/vm/heap/pointer_block.cc
@@ -41,7 +41,7 @@
 }
 
 template <int BlockSize>
-BlockStack<BlockSize>::BlockStack() : mutex_() {}
+BlockStack<BlockSize>::BlockStack() : monitor_() {}
 
 template <int BlockSize>
 BlockStack<BlockSize>::~BlockStack() {
@@ -50,7 +50,7 @@
 
 template <int BlockSize>
 void BlockStack<BlockSize>::Reset() {
-  MutexLocker local_mutex_locker(&mutex_);
+  MonitorLocker local_mutex_locker(&monitor_);
   {
     // Empty all blocks and move them to the global cache.
     MutexLocker global_mutex_locker(global_mutex_);
@@ -70,7 +70,7 @@
 
 template <int BlockSize>
 typename BlockStack<BlockSize>::Block* BlockStack<BlockSize>::TakeBlocks() {
-  MutexLocker ml(&mutex_);
+  MonitorLocker ml(&monitor_);
   while (!partial_.IsEmpty()) {
     full_.Push(partial_.Pop());
   }
@@ -81,15 +81,45 @@
 void BlockStack<BlockSize>::PushBlockImpl(Block* block) {
   ASSERT(block->next() == NULL);  // Should be just a single block.
   if (block->IsFull()) {
-    MutexLocker ml(&mutex_);
+    MonitorLocker ml(&monitor_);
+    bool was_empty = IsEmptyLocked();
     full_.Push(block);
+    if (was_empty) ml.Notify();
   } else if (block->IsEmpty()) {
     MutexLocker ml(global_mutex_);
     global_empty_->Push(block);
     TrimGlobalEmpty();
   } else {
-    MutexLocker ml(&mutex_);
+    MonitorLocker ml(&monitor_);
+    bool was_empty = IsEmptyLocked();
     partial_.Push(block);
+    if (was_empty) ml.Notify();
+  }
+}
+
+template <int BlockSize>
+typename BlockStack<BlockSize>::Block* BlockStack<BlockSize>::WaitForWork(
+    RelaxedAtomic<uintptr_t>* num_busy) {
+  MonitorLocker ml(&monitor_);
+  if (num_busy->fetch_sub(1u) == 1 /* 1 is before subtraction */) {
+    // This is the last worker, wake the others now that we know no further work
+    // will come.
+    ml.NotifyAll();
+    return NULL;
+  }
+  for (;;) {
+    if (!full_.IsEmpty()) {
+      num_busy->fetch_add(1u);
+      return full_.Pop();
+    }
+    if (!partial_.IsEmpty()) {
+      num_busy->fetch_add(1u);
+      return partial_.Pop();
+    }
+    ml.Wait();
+    if (num_busy->load() == 0) {
+      return NULL;
+    }
   }
 }
 
@@ -103,7 +133,7 @@
 void StoreBuffer::PushBlock(Block* block, ThresholdPolicy policy) {
   BlockStack<Block::kSize>::PushBlockImpl(block);
   if ((policy == kCheckThreshold) && Overflowed()) {
-    MutexLocker ml(&mutex_);
+    MonitorLocker ml(&monitor_);
     Thread* thread = Thread::Current();
     // Sanity check: it makes no sense to schedule the GC in another isolate
     // group.
@@ -118,7 +148,7 @@
 typename BlockStack<BlockSize>::Block*
 BlockStack<BlockSize>::PopNonFullBlock() {
   {
-    MutexLocker ml(&mutex_);
+    MonitorLocker ml(&monitor_);
     if (!partial_.IsEmpty()) {
       return partial_.Pop();
     }
@@ -140,7 +170,7 @@
 template <int BlockSize>
 typename BlockStack<BlockSize>::Block*
 BlockStack<BlockSize>::PopNonEmptyBlock() {
-  MutexLocker ml(&mutex_);
+  MonitorLocker ml(&monitor_);
   if (!full_.IsEmpty()) {
     return full_.Pop();
   } else if (!partial_.IsEmpty()) {
@@ -152,7 +182,12 @@
 
 template <int BlockSize>
 bool BlockStack<BlockSize>::IsEmpty() {
-  MutexLocker ml(&mutex_);
+  MonitorLocker ml(&monitor_);
+  return IsEmptyLocked();
+}
+
+template <int BlockSize>
+bool BlockStack<BlockSize>::IsEmptyLocked() {
   return full_.IsEmpty() && partial_.IsEmpty();
 }
 
@@ -189,7 +224,7 @@
 }
 
 bool StoreBuffer::Overflowed() {
-  MutexLocker ml(&mutex_);
+  MonitorLocker ml(&monitor_);
   return (full_.length() + partial_.length()) > kMaxNonEmpty;
 }
 
diff --git a/runtime/vm/heap/pointer_block.h b/runtime/vm/heap/pointer_block.h
index 168a1cb..5ef6dc2 100644
--- a/runtime/vm/heap/pointer_block.h
+++ b/runtime/vm/heap/pointer_block.h
@@ -108,6 +108,8 @@
 
   bool IsEmpty();
 
+  Block* WaitForWork(RelaxedAtomic<uintptr_t>* num_busy);
+
  protected:
   class List {
    public:
@@ -126,6 +128,8 @@
     DISALLOW_COPY_AND_ASSIGN(List);
   };
 
+  bool IsEmptyLocked();
+
   // Adds and transfers ownership of the block to the buffer.
   void PushBlockImpl(Block* block);
 
@@ -134,7 +138,7 @@
 
   List full_;
   List partial_;
-  Mutex mutex_;
+  Monitor monitor_;
 
   // Note: This is shared on the basis of block size.
   static const intptr_t kMaxGlobalEmpty = 100;
@@ -191,6 +195,17 @@
     local_output_->Push(raw_obj);
   }
 
+  bool WaitForWork(RelaxedAtomic<uintptr_t>* num_busy) {
+    ASSERT(local_input_->IsEmpty());
+    Block* new_work = stack_->WaitForWork(num_busy);
+    if (new_work == NULL) {
+      return false;
+    }
+    stack_->PushBlock(local_input_);
+    local_input_ = new_work;
+    return true;
+  }
+
   void Finalize() {
     ASSERT(local_output_->IsEmpty());
     stack_->PushBlock(local_output_);
diff --git a/runtime/vm/heap/scavenger.cc b/runtime/vm/heap/scavenger.cc
index cf13c43..3faf889 100644
--- a/runtime/vm/heap/scavenger.cc
+++ b/runtime/vm/heap/scavenger.cc
@@ -261,6 +261,10 @@
            !promoted_list_.IsEmpty();
   }
 
+  bool WaitForWork(RelaxedAtomic<uintptr_t>* num_busy) {
+    return promoted_list_.WaitForWork(num_busy);
+  }
+
   void Finalize() {
     if (scavenger_->abort_) {
       promoted_list_.AbandonWork();
@@ -562,23 +566,7 @@
     do {
       do {
         visitor_->ProcessSurvivors();
-
-        // I can't find more work right now. If no other task is busy,
-        // then there will never be more work (NB: 1 is *before* decrement).
-        if (num_busy_->fetch_sub(1u) == 1) break;
-
-        // Wait for some work to appear.
-        // TODO(iposva): Replace busy-waiting with a solution using Monitor,
-        // and redraw the boundaries between stack/visitor/task as needed.
-        while (!visitor_->HasWork() && num_busy_->load() > 0) {
-        }
-
-        // If no tasks are busy, there will never be more work.
-        if (num_busy_->load() == 0) break;
-
-        // I saw some work; get busy and compete for it.
-        num_busy_->fetch_add(1u);
-      } while (true);
+      } while (visitor_->WaitForWork(num_busy_));
       // Wait for all scavengers to stop.
       barrier_->Sync();
 #if defined(DEBUG)