[io/file_watcher] Ensure start/stop file watching requests are run on dart thread.

At present start/stop requests are scheduled on RunLoop thread.
This results in deadlocks since same RunLoop thread might be busy
with blocking writes of file watching events, not giving a chance
for Dart to read previously-written events. Reading would unblock
writer.
So this CL moves start/stop requests to run on Dart thread instead.

Fixes https://github.com/dart-lang/sdk/issues/45996

TEST=run analysis_server against flutter_gallery github-backed folder, switch between branches and ensure analyze_server remains responsive

Change-Id: I0464eeecf8e46ba3027fa0ed21cc323495d965c3
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/201442
Commit-Queue: Alexander Aprelev <aam@google.com>
Reviewed-by: Siva Annamalai <asiva@google.com>
diff --git a/runtime/bin/file_system_watcher_macos.cc b/runtime/bin/file_system_watcher_macos.cc
index b73c0d13..88d6986 100644
--- a/runtime/bin/file_system_watcher_macos.cc
+++ b/runtime/bin/file_system_watcher_macos.cc
@@ -82,81 +82,36 @@
     void set_ref(FSEventStreamRef ref) { ref_ = ref; }
 
     void Start() {
-      // Schedule StartCallback to be executed in the RunLoop.
-      CFRunLoopTimerContext context;
-      memset(&context, 0, sizeof(context));
-      context.info = this;
-      CFRunLoopTimerRef timer =
-          CFRunLoopTimerCreate(NULL, 0, 0, 0, 0, Node::StartCallback, &context);
-      CFRunLoopAddTimer(watcher_->run_loop_, timer, kCFRunLoopCommonModes);
-      CFRelease(timer);
-      watcher_->monitor_.Enter();
-      while (!ready_) {
-        watcher_->monitor_.Wait(Monitor::kNoTimeout);
-      }
-      watcher_->monitor_.Exit();
-    }
-
-    static void StartCallback(CFRunLoopTimerRef timer, void* info) {
-      Node* node = reinterpret_cast<Node*>(info);
-      ASSERT(Thread::Compare(node->watcher_->threadId_,
-                             Thread::GetCurrentThreadId()));
       FSEventStreamContext context;
       memset(&context, 0, sizeof(context));
-      context.info = reinterpret_cast<void*>(node);
+      context.info = reinterpret_cast<void*>(this);
       CFArrayRef array = CFArrayCreate(
-          NULL, reinterpret_cast<const void**>(&node->path_ref_), 1, NULL);
+          NULL, reinterpret_cast<const void**>(&path_ref_), 1, NULL);
       FSEventStreamRef ref = FSEventStreamCreate(
           NULL, Callback, &context, array, kFSEventStreamEventIdSinceNow, 0.10,
           kFSEventStreamCreateFlagFileEvents);
       CFRelease(array);
 
-      node->set_ref(ref);
+      set_ref(ref);
+      ready_.store(true, std::memory_order_release);
 
-      FSEventStreamScheduleWithRunLoop(node->ref_, node->watcher_->run_loop_,
+      FSEventStreamScheduleWithRunLoop(ref_, watcher_->run_loop_,
                                        kCFRunLoopDefaultMode);
 
-      FSEventStreamStart(node->ref_);
-      FSEventStreamFlushSync(node->ref_);
-
-      node->watcher_->monitor_.Enter();
-      node->ready_ = true;
-      node->watcher_->monitor_.Notify();
-      node->watcher_->monitor_.Exit();
+      FSEventStreamStart(ref_);
+      FSEventStreamFlushSync(ref_);
     }
 
     void Stop() {
-      // Schedule StopCallback to be executed in the RunLoop.
       ASSERT(ready_);
-      CFRunLoopTimerContext context;
-      memset(&context, 0, sizeof(context));
-      context.info = this;
-      CFRunLoopTimerRef timer =
-          CFRunLoopTimerCreate(NULL, 0, 0, 0, 0, StopCallback, &context);
-      CFRunLoopAddTimer(watcher_->run_loop_, timer, kCFRunLoopCommonModes);
-      CFRelease(timer);
-      watcher_->monitor_.Enter();
-      while (ready_) {
-        watcher_->monitor_.Wait(Monitor::kNoTimeout);
-      }
-      watcher_->monitor_.Exit();
-    }
-
-    static void StopCallback(CFRunLoopTimerRef timer, void* info) {
-      Node* node = reinterpret_cast<Node*>(info);
-      ASSERT(Thread::Compare(node->watcher_->threadId_,
-                             Thread::GetCurrentThreadId()));
-      FSEventStreamStop(node->ref_);
-      FSEventStreamInvalidate(node->ref_);
-      FSEventStreamRelease(node->ref_);
-      node->watcher_->monitor_.Enter();
-      node->ready_ = false;
-      node->watcher_->monitor_.Notify();
-      node->watcher_->monitor_.Exit();
+      FSEventStreamStop(ref_);
+      FSEventStreamInvalidate(ref_);
+      FSEventStreamRelease(ref_);
+      ready_.store(false, std::memory_order_release);
     }
 
     FSEventsWatcher* watcher() const { return watcher_; }
-    bool ready() const { return ready_; }
+    bool ready() const { return ready_.load(std::memory_order_acquire); }
     intptr_t base_path_length() const { return base_path_length_; }
     int read_fd() const { return read_fd_; }
     int write_fd() const { return write_fd_; }
@@ -164,7 +119,7 @@
 
    private:
     FSEventsWatcher* watcher_;
-    bool ready_;
+    std::atomic<bool> ready_;
     intptr_t base_path_length_;
     CFStringRef path_ref_;
     int read_fd_;
@@ -266,8 +221,6 @@
     Node* node = reinterpret_cast<Node*>(client);
     ASSERT(Thread::Compare(node->watcher()->threadId_,
                            Thread::GetCurrentThreadId()));
-    // `ready` is set on same thread as this callback is invoked, so we don't
-    // need to lock here.
     if (!node->ready()) {
       return;
     }