[vm/io] On Windows do not consider sockets with pending operations closed.

ClientSocket might have pending operations associated with it, so we should
not delete it until those operations fire.

This aligns ClientSocket::IsClosed with IsClosed implementation for other
handles, which check if handle has any associated pending operations.

The situation that sometimes arises it that if we issue Read, Disconnect operation,
they might complete in the opposite order DisconnectComplete, ReadComplete, which
is why we need to check for pending operations.

MSDN contains sentences like these: "Please note that while the packets are
queued in FIFO order they may be dequeued in a different order."

It seems that a more uniform fix would be to it increment a reference count
of a handle whenever an asynchronous operation is started and decrement it
whenever it is completed - but such fix requires much more thorough
refactoring of eventhandler_win.cc.

Bug: https://github.com/flutter/flutter/issues/22558
Change-Id: I4e6a7d5fdeaa85b9903d005b5bb95338033228f1
Reviewed-on: https://dart-review.googlesource.com/c/90484
Commit-Queue: Vyacheslav Egorov <vegorov@google.com>
Reviewed-by: Jonas Termansen <sortie@google.com>
Reviewed-by: Martin Kustermann <kustermann@google.com>
diff --git a/runtime/bin/eventhandler_win.cc b/runtime/bin/eventhandler_win.cc
index 7dc6561..7b73b7e 100644
--- a/runtime/bin/eventhandler_win.cc
+++ b/runtime/bin/eventhandler_win.cc
@@ -166,13 +166,11 @@
 }
 
 bool Handle::HasPendingRead() {
-  MonitorLocker ml(monitor_);
-  return pending_read_ != NULL;
+  return pending_read_ != nullptr;
 }
 
 bool Handle::HasPendingWrite() {
-  MonitorLocker ml(monitor_);
-  return pending_write_ != NULL;
+  return pending_write_ != nullptr;
 }
 
 void Handle::WaitForReadThreadStarted() {
@@ -258,7 +256,7 @@
 
 void Handle::ReadSyncCompleteAsync() {
   NotifyReadThreadStarted();
-  ASSERT(pending_read_ != NULL);
+  ASSERT(HasPendingRead());
   ASSERT(pending_read_->GetBufferSize() >= kStdOverlappedBufferSize);
 
   DWORD buffer_size = pending_read_->GetBufferSize();
@@ -283,7 +281,7 @@
 
 bool Handle::IssueRead() {
   ASSERT(type_ != kListenSocket);
-  ASSERT(pending_read_ == NULL);
+  ASSERT(!HasPendingRead());
   OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
   if (SupportsOverlappedIO()) {
     ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
@@ -319,7 +317,7 @@
   MonitorLocker ml(monitor_);
   ASSERT(type_ != kListenSocket);
   ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
-  ASSERT(pending_write_ != NULL);
+  ASSERT(HasPendingWrite());
   ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
 
   OverlappedBuffer* buffer = pending_write_;
@@ -395,13 +393,13 @@
 }
 
 bool DirectoryWatchHandle::IsClosed() {
-  return IsClosing() && (pending_read_ == NULL);
+  return IsClosing() && !HasPendingRead();
 }
 
 bool DirectoryWatchHandle::IssueRead() {
   // It may have been started before, as we start the directory-handler when
   // we create it.
-  if ((pending_read_ != NULL) || (data_ready_ != NULL)) {
+  if (HasPendingRead() || (data_ready_ != NULL)) {
     return true;
   }
   OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
@@ -422,7 +420,7 @@
   MonitorLocker ml(monitor_);
   // Stop the outstanding read, so we can close the handle.
 
-  if (pending_read_ != NULL) {
+  if (HasPendingRead()) {
     CancelIoEx(handle(), pending_read_->GetCleanOverlapped());
     // Don't dispose of the buffer, as it will still complete (with length 0).
   }
@@ -660,7 +658,7 @@
 
 intptr_t Handle::Write(const void* buffer, intptr_t num_bytes) {
   MonitorLocker ml(monitor_);
-  if (pending_write_ != NULL) {
+  if (HasPendingWrite()) {
     return 0;
   }
   if (num_bytes > kBufferSize) {
@@ -684,7 +682,7 @@
                         struct sockaddr* sa,
                         socklen_t sa_len) {
   MonitorLocker ml(monitor_);
-  if (pending_write_ != NULL) {
+  if (HasPendingWrite()) {
     return 0;
   }
   if (num_bytes > kBufferSize) {
@@ -728,7 +726,7 @@
 
   while (write_thread_running_) {
     ml.Wait(Monitor::kNoTimeout);
-    if (pending_write_ != NULL) {
+    if (HasPendingWrite()) {
       // We woke up and had a pending write. Execute it.
       WriteSyncCompleteAsync();
     }
@@ -739,7 +737,7 @@
 }
 
 void StdHandle::WriteSyncCompleteAsync() {
-  ASSERT(pending_write_ != NULL);
+  ASSERT(HasPendingWrite());
 
   DWORD bytes_written = -1;
   BOOL ok = WriteFile(handle_, pending_write_->GetBufferStart(),
@@ -759,7 +757,7 @@
 
 intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
   MonitorLocker ml(monitor_);
-  if (pending_write_ != NULL) {
+  if (HasPendingWrite()) {
     return 0;
   }
   if (num_bytes > kBufferSize) {
@@ -861,7 +859,7 @@
 bool ClientSocket::IssueRead() {
   MonitorLocker ml(monitor_);
   ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
-  ASSERT(pending_read_ == NULL);
+  ASSERT(!HasPendingRead());
 
   // TODO(sgjesse): Use a MTU value here. Only the loopback adapter can
   // handle 64k datagrams.
@@ -884,7 +882,7 @@
 bool ClientSocket::IssueWrite() {
   MonitorLocker ml(monitor_);
   ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
-  ASSERT(pending_write_ != NULL);
+  ASSERT(HasPendingWrite());
   ASSERT(pending_write_->operation() == OverlappedBuffer::kWrite);
 
   int rc = WSASend(socket(), pending_write_->GetWASBUF(), 1, NULL, 0,
@@ -959,13 +957,13 @@
 }
 
 bool ClientSocket::IsClosed() {
-  return connected_ && closed_;
+  return connected_ && closed_ && !HasPendingRead() && !HasPendingWrite();
 }
 
 bool DatagramSocket::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
   MonitorLocker ml(monitor_);
   ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
-  ASSERT(pending_write_ != NULL);
+  ASSERT(HasPendingWrite());
   ASSERT(pending_write_->operation() == OverlappedBuffer::kSendTo);
 
   int rc = WSASendTo(socket(), pending_write_->GetWASBUF(), 1, NULL, 0, sa,
@@ -982,7 +980,7 @@
 bool DatagramSocket::IssueRecvFrom() {
   MonitorLocker ml(monitor_);
   ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
-  ASSERT(pending_read_ == NULL);
+  ASSERT(!HasPendingRead());
 
   OverlappedBuffer* buffer =
       OverlappedBuffer::AllocateRecvFromBuffer(kMaxUDPPackageLength);