[vm] Simplify implementation of native ports

This reland commit 5a32d8bc7c5569efde3aec844b7f26d33e1bd86e with a fix
for thread leak (Issue #56717): when `ThreadPool` is shutting down
asynchronously the last worker should detach itself to prevent
leaking associated low-level data structures, because no thread will
join it.

A hang in service isolate shutdown (caused by an existing bug) was fixed by commit 157a0dc7f98be66a32329d097e6fef096ea62d84.

This CL turns native ports into a thin abstraction over underlying
thread pool instead of building them as full fledged MessageHandler.

This allows to easily implement a variation of native ports which can
handle messages concurrently with the given degree of concurrency.
This type of port can be used to greatly simplify implementation of
IOService - which previously had to do its own concurrency management
on top of "single threaded" native ports. This capability is exposed
as `Dart_NewConcurrentNativePort` API.

The new implementation is in general much cleaner then the old one
with one exception: `Dart_CloseNativePort` API has unfortunate design
where underlying message handler is destroyed asynchronously and
`Dart_CloseNativePort` returns immediately without waiting for pending
tasks to complete. Implementing this on top of `ThreadPool` requires
some changes to thread pool implementation.

Issue https://github.com/dart-lang/sdk/issues/55844

Closes https://github.com/dart-lang/sdk/issues/56717

TEST=ci

Change-Id: Ic68bfb60757685afd75c80a70cdec66cc13c149b
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/385000
Reviewed-by: Martin Kustermann <kustermann@google.com>
Commit-Queue: Slava Egorov <vegorov@google.com>
diff --git a/runtime/bin/io_service.cc b/runtime/bin/io_service.cc
index b651f6d..d16dda2 100644
--- a/runtime/bin/io_service.cc
+++ b/runtime/bin/io_service.cc
@@ -55,7 +55,8 @@
 }
 
 Dart_Port IOService::GetServicePort() {
-  return Dart_NewNativePort("IOService", IOServiceCallback, true);
+  return Dart_NewConcurrentNativePort("IOService", IOServiceCallback,
+                                      /*max_concurrency=*/32);
 }
 
 void FUNCTION_NAME(IOService_NewServicePort)(Dart_NativeArguments args) {
diff --git a/runtime/bin/io_service_no_ssl.cc b/runtime/bin/io_service_no_ssl.cc
index ecf4428..7e6d83c 100644
--- a/runtime/bin/io_service_no_ssl.cc
+++ b/runtime/bin/io_service_no_ssl.cc
@@ -53,7 +53,8 @@
 }
 
 Dart_Port IOService::GetServicePort() {
-  return Dart_NewNativePort("IOService", IOServiceCallback, true);
+  return Dart_NewConcurrentNativePort("IOService", IOServiceCallback,
+                                      /*max_concurrency=*/32);
 }
 
 void FUNCTION_NAME(IOService_NewServicePort)(Dart_NativeArguments args) {
diff --git a/runtime/include/dart_api.h b/runtime/include/dart_api.h
index 75e5edb..035c909 100644
--- a/runtime/include/dart_api.h
+++ b/runtime/include/dart_api.h
@@ -57,14 +57,14 @@
 #endif
 
 #if __GNUC__
-#define DART_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
-#define DART_DEPRECATED(msg) __attribute__((deprecated(msg)))
+#define DART_API_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
+#define DART_API_DEPRECATED(msg) __attribute__((deprecated(msg)))
 #elif _MSC_VER
-#define DART_WARN_UNUSED_RESULT _Check_return_
-#define DART_DEPRECATED(msg) __declspec(deprecated(msg))
+#define DART_API_WARN_UNUSED_RESULT _Check_return_
+#define DART_API_DEPRECATED(msg) __declspec(deprecated(msg))
 #else
-#define DART_WARN_UNUSED_RESULT
-#define DART_DEPRECATED(msg)
+#define DART_API_WARN_UNUSED_RESULT
+#define DART_API_DEPRECATED(msg)
 #endif
 
 /*
@@ -1000,7 +1000,7 @@
  * \return NULL if initialization is successful. Returns an error message
  *   otherwise. The caller is responsible for freeing the error message.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT char* Dart_Initialize(
+DART_EXPORT DART_API_WARN_UNUSED_RESULT char* Dart_Initialize(
     Dart_InitializeParams* params);
 
 /**
@@ -1012,7 +1012,7 @@
  * NOTE: This function must not be called on a thread that was created by the VM
  * itself.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT char* Dart_Cleanup(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT char* Dart_Cleanup(void);
 
 /**
  * Sets command line flags. Should be called before Dart_Initialize.
@@ -1025,8 +1025,9 @@
  *
  * NOTE: This call does not store references to the passed in c-strings.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT char* Dart_SetVMFlags(int argc,
-                                                          const char** argv);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT char* Dart_SetVMFlags(
+    int argc,
+    const char** argv);
 
 /**
  * Returns true if the named VM flag is of boolean type, specified, and set to
@@ -1480,7 +1481,7 @@
  *
  * \return A valid handle if no error occurs during the operation.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_CreateSnapshot(uint8_t** vm_snapshot_data_buffer,
                     intptr_t* vm_snapshot_data_size,
                     uint8_t** isolate_snapshot_data_buffer,
@@ -1510,7 +1511,7 @@
  * \return NULL if successful. Returns an error message otherwise. The caller
  * is responsible for freeing the error message.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT char* Dart_IsolateMakeRunnable(
+DART_EXPORT DART_API_WARN_UNUSED_RESULT char* Dart_IsolateMakeRunnable(
     Dart_Isolate isolate);
 
 /*
@@ -1680,7 +1681,7 @@
  *
  * \return A valid handle if no error occurs during the operation.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_HandleMessage(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle Dart_HandleMessage(void);
 
 /**
  * Handles any pending messages for the vm service for the current
@@ -1720,7 +1721,7 @@
  *   exception or other error occurs while processing messages, an
  *   error handle is returned.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_RunLoop(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle Dart_RunLoop(void);
 
 /**
  * Lets the VM run message processing for the isolate.
@@ -1740,7 +1741,7 @@
  *   of its message loop. If not successful the caller retains ownership of the
  *   isolate.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT bool Dart_RunLoopAsync(
+DART_EXPORT DART_API_WARN_UNUSED_RESULT bool Dart_RunLoopAsync(
     bool errors_are_fatal,
     Dart_Port on_error_port,
     Dart_Port on_exit_port,
@@ -2785,7 +2786,7 @@
  *   then the new object. If an error occurs during execution, then an
  *   error handle is returned.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_New(Dart_Handle type,
          Dart_Handle constructor_name,
          int number_of_arguments,
@@ -2799,7 +2800,8 @@
  * \return The new object. If an error occurs during execution, then an
  *   error handle is returned.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_Allocate(Dart_Handle type);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
+Dart_Allocate(Dart_Handle type);
 
 /**
  * Allocate a new object without invoking a constructor, and sets specified
@@ -2840,7 +2842,7 @@
  *   successfully, then the return value is returned. If an error
  *   occurs during execution, then an error handle is returned.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_Invoke(Dart_Handle target,
             Dart_Handle name,
             int number_of_arguments,
@@ -2856,7 +2858,7 @@
  *   invoking the closure is returned. If an error occurs during
  *   execution, then an error handle is returned.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_InvokeClosure(Dart_Handle closure,
                    int number_of_arguments,
                    Dart_Handle* arguments);
@@ -2881,7 +2883,7 @@
  *   successfully, then the object is returned. If an error
  *   occurs during execution, then an error handle is returned.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_InvokeConstructor(Dart_Handle object,
                        Dart_Handle name,
                        int number_of_arguments,
@@ -2907,7 +2909,7 @@
  * \return If no error occurs, then the value of the field is
  *   returned. Otherwise an error handle is returned.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_GetField(Dart_Handle container, Dart_Handle name);
 
 /**
@@ -2930,7 +2932,7 @@
  *
  * \return A valid handle if no error occurs.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_SetField(Dart_Handle container, Dart_Handle name, Dart_Handle value);
 
 /*
@@ -3505,7 +3507,7 @@
  * Requires the current isolate to be the same current isolate during the
  * invocation of the Dart_DeferredLoadHandler.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_DeferredLoadComplete(intptr_t loading_unit_id,
                           const uint8_t* snapshot_data,
                           const uint8_t* snapshot_instructions);
@@ -3522,7 +3524,7 @@
  * Requires the current isolate to be the same current isolate during the
  * invocation of the Dart_DeferredLoadHandler.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_DeferredLoadCompleteError(intptr_t loading_unit_id,
                                const char* error_message,
                                bool transient);
@@ -3538,7 +3540,7 @@
  *
  * \return A handle to the root library, or an error.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_LoadScriptFromKernel(const uint8_t* kernel_buffer, intptr_t kernel_size);
 
 /**
@@ -3706,10 +3708,10 @@
  *
  * \return A handle to the main library of the compilation unit, or an error.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_LoadLibraryFromKernel(const uint8_t* kernel_buffer,
                            intptr_t kernel_buffer_size);
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_LoadLibrary(Dart_Handle kernel_buffer);
 
 /**
@@ -3725,7 +3727,7 @@
  * \return Success if all classes have been finalized and deferred library
  *   futures are completed. Otherwise, returns an error.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_FinalizeLoading(bool complete_futures);
 
 /*
@@ -3994,12 +3996,12 @@
  *
  *  \return A valid handle if no error occurs during the operation.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_CreateAppAOTSnapshotAsAssembly(Dart_StreamingWriteCallback callback,
                                     void* callback_data,
                                     bool stripped,
                                     void* debug_callback_data);
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_CreateAppAOTSnapshotAsAssemblies(
     Dart_CreateLoadingUnitCallback next_callback,
     void* next_callback_data,
@@ -4034,12 +4036,12 @@
  *
  * \return A valid handle if no error occurs during the operation.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_CreateAppAOTSnapshotAsElf(Dart_StreamingWriteCallback callback,
                                void* callback_data,
                                bool stripped,
                                void* debug_callback_data);
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_CreateAppAOTSnapshotAsElfs(Dart_CreateLoadingUnitCallback next_callback,
                                 void* next_callback_data,
                                 bool stripped,
@@ -4052,7 +4054,7 @@
  *  not strip DWARF information from the generated assembly or allow for
  *  separate debug information.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_CreateVMAOTSnapshotAsAssembly(Dart_StreamingWriteCallback callback,
                                    void* callback_data);
 
@@ -4063,7 +4065,7 @@
  *
  * \return A valid handle if no error occurs during the operation.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_SortClasses(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle Dart_SortClasses(void);
 
 /**
  *  Creates a snapshot that caches compiled code and type feedback for faster
@@ -4086,7 +4088,7 @@
  *
  * \return A valid handle if no error occurs during the operation.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_CreateAppJITSnapshotAsBlobs(uint8_t** isolate_snapshot_data_buffer,
                                  intptr_t* isolate_snapshot_data_size,
                                  uint8_t** isolate_snapshot_instructions_buffer,
@@ -4101,7 +4103,7 @@
  * \return Returns an error handler if the VM was built in a mode that does not
  * support obfuscation.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
 Dart_GetObfuscationMap(uint8_t** buffer, intptr_t* buffer_length);
 
 /**
diff --git a/runtime/include/dart_embedder_api.h b/runtime/include/dart_embedder_api.h
index bc03773..8ccf9e5 100644
--- a/runtime/include/dart_embedder_api.h
+++ b/runtime/include/dart_embedder_api.h
@@ -18,7 +18,7 @@
 //
 // Returns true on success and false otherwise, in which case error would
 // contain error message.
-DART_WARN_UNUSED_RESULT bool InitOnce(char** error);
+DART_API_WARN_UNUSED_RESULT bool InitOnce(char** error);
 
 // Cleans up all subsystems of the embedder.
 //
@@ -51,7 +51,7 @@
 // script_uri.
 // The isolate is created from the given snapshot (might be kernel data or
 // app-jit snapshot).
-DART_WARN_UNUSED_RESULT Dart_Isolate
+DART_API_WARN_UNUSED_RESULT Dart_Isolate
 CreateKernelServiceIsolate(const IsolateCreationData& data,
                            const uint8_t* buffer,
                            intptr_t buffer_size,
@@ -81,7 +81,7 @@
 // is expected to contain all necessary 'vm-service' libraries.
 // This method should be used when VM invokes isolate creation callback with
 // DART_VM_SERVICE_ISOLATE_NAME as script_uri.
-DART_WARN_UNUSED_RESULT Dart_Isolate
+DART_API_WARN_UNUSED_RESULT Dart_Isolate
 CreateVmServiceIsolate(const IsolateCreationData& data,
                        const VmServiceConfiguration& config,
                        const uint8_t* isolate_data,
@@ -92,7 +92,7 @@
 // is expected to contain all necessary 'vm-service' libraries.
 // This method should be used when VM invokes isolate creation callback with
 // DART_VM_SERVICE_ISOLATE_NAME as script_uri.
-DART_WARN_UNUSED_RESULT Dart_Isolate
+DART_API_WARN_UNUSED_RESULT Dart_Isolate
 CreateVmServiceIsolateFromKernel(const IsolateCreationData& data,
                                  const VmServiceConfiguration& config,
                                  const uint8_t* kernel_buffer,
diff --git a/runtime/include/dart_native_api.h b/runtime/include/dart_native_api.h
index 79194e0..dbdaeb3 100644
--- a/runtime/include/dart_native_api.h
+++ b/runtime/include/dart_native_api.h
@@ -166,7 +166,23 @@
 DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
                                          Dart_NativeMessageHandler handler,
                                          bool handle_concurrently);
-/* TODO(turnidge): Currently handle_concurrently is ignored. */
+
+/**
+ * Creates a new native port.  When messages are received on this
+ * native port, then they will be dispatched to the provided native
+ * message handler using up to |max_concurrency| concurrent threads.
+ *
+ * \param name The name of this port in debugging messages.
+ * \param handler The C handler to run when messages arrive on the port.
+ * \param max_concurrency Size of the thread pool used by the native port.
+ *
+ * \return If successful, returns the port id for the native port.  In
+ *   case of error, returns ILLEGAL_PORT.
+ */
+DART_EXPORT Dart_Port
+Dart_NewConcurrentNativePort(const char* name,
+                             Dart_NativeMessageHandler handler,
+                             intptr_t max_concurrency);
 
 /**
  * Closes the native port with the given id.
@@ -191,12 +207,13 @@
  *
  * TODO(turnidge): Document.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_CompileAll(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle Dart_CompileAll(void);
 
 /**
  * Finalizes all classes.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_FinalizeAllClasses(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
+Dart_FinalizeAllClasses(void);
 
 /*  This function is intentionally undocumented.
  *
diff --git a/runtime/include/dart_tools_api.h b/runtime/include/dart_tools_api.h
index c118bc4..367e1eb 100644
--- a/runtime/include/dart_tools_api.h
+++ b/runtime/include/dart_tools_api.h
@@ -573,7 +573,7 @@
  * \return The UserTag's label. NULL if the user_tag is invalid. The caller is
  *   responsible for freeing the returned label.
  */
-DART_EXPORT DART_WARN_UNUSED_RESULT char* Dart_GetUserTagLabel(
+DART_EXPORT DART_API_WARN_UNUSED_RESULT char* Dart_GetUserTagLabel(
     Dart_Handle user_tag);
 
 /*
diff --git a/runtime/platform/globals.h b/runtime/platform/globals.h
index 29bbb96..34bda32 100644
--- a/runtime/platform/globals.h
+++ b/runtime/platform/globals.h
@@ -797,6 +797,14 @@
 #error Target operating system detection failed.
 #endif
 
+#if __GNUC__
+#define DART_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
+#elif _MSC_VER
+#define DART_WARN_UNUSED_RESULT _Check_return_
+#else
+#define DART_WARN_UNUSED_RESULT
+#endif
+
 }  // namespace dart
 
 #endif  // RUNTIME_PLATFORM_GLOBALS_H_
diff --git a/runtime/tests/vm/dart/exported_symbols_test.dart b/runtime/tests/vm/dart/exported_symbols_test.dart
index c2cddbb..290c1e4 100644
--- a/runtime/tests/vm/dart/exported_symbols_test.dart
+++ b/runtime/tests/vm/dart/exported_symbols_test.dart
@@ -247,6 +247,7 @@
     "Dart_NewListOfType",
     "Dart_NewListOfTypeFilled",
     "Dart_NewNativePort",
+    "Dart_NewConcurrentNativePort",
     "Dart_NewPersistentHandle",
     "Dart_NewSendPort",
     "Dart_NewSendPortEx",
diff --git a/runtime/vm/dart.cc b/runtime/vm/dart.cc
index 3c4502b..a71313d 100644
--- a/runtime/vm/dart.cc
+++ b/runtime/vm/dart.cc
@@ -702,6 +702,7 @@
                  UptimeMillis());
   }
   DartInitializationState::SetUnInitialized();
+  PortMap::Shutdown();
   thread_pool_->Shutdown();
   delete thread_pool_;
   thread_pool_ = nullptr;
diff --git a/runtime/vm/isolate.cc b/runtime/vm/isolate.cc
index 0f04c1a..4be42f8 100644
--- a/runtime/vm/isolate.cc
+++ b/runtime/vm/isolate.cc
@@ -1069,25 +1069,23 @@
   explicit IsolateMessageHandler(Isolate* isolate);
   ~IsolateMessageHandler();
 
-  const char* name() const;
-  void MessageNotify(Message::Priority priority);
-  MessageStatus HandleMessage(std::unique_ptr<Message> message);
+  const char* name() const override;
+  void MessageNotify(Message::Priority priority) override;
+  MessageStatus HandleMessage(std::unique_ptr<Message> message) override;
 #ifndef PRODUCT
-  void NotifyPauseOnStart();
-  void NotifyPauseOnExit();
+  void NotifyPauseOnStart() override;
+  void NotifyPauseOnExit() override;
 #endif  // !PRODUCT
 
 #if defined(DEBUG)
   // Check that it is safe to access this handler.
-  void CheckAccess() const;
+  void CheckAccess() const override;
 #endif
-  bool IsCurrentIsolate() const;
-  virtual Isolate* isolate() const { return isolate_; }
-  virtual IsolateGroup* isolate_group() const { return isolate_->group(); }
 
-  virtual bool KeepAliveLocked() {
-    // If the message handler was asked to shutdown we shut down.
-    if (!MessageHandler::KeepAliveLocked()) return false;
+  Isolate* isolate() const override { return isolate_; }
+  IsolateGroup* isolate_group() const { return isolate_->group(); }
+
+  bool KeepAliveLocked() override {
     // Otherwise we only stay alive as long as there's active receive ports, or
     // there are FFI callbacks keeping the isolate alive.
     return isolate_->HasLivePorts() || isolate_->HasOpenNativeCallables();
@@ -1350,7 +1348,9 @@
 
 MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(
     std::unique_ptr<Message> message) {
-  ASSERT(IsCurrentIsolate());
+#ifdef DEBUG
+  CheckAccess();
+#endif
   Thread* thread = Thread::Current();
   StackZone stack_zone(thread);
   Zone* zone = stack_zone.GetZone();
@@ -1498,14 +1498,10 @@
 
 #if defined(DEBUG)
 void IsolateMessageHandler::CheckAccess() const {
-  ASSERT(IsCurrentIsolate());
+  ASSERT(isolate() == Isolate::Current());
 }
 #endif
 
-bool IsolateMessageHandler::IsCurrentIsolate() const {
-  return (I == Isolate::Current());
-}
-
 static MessageHandler::MessageStatus StoreError(Thread* thread,
                                                 const Error& error) {
   thread->set_sticky_error(error);
diff --git a/runtime/vm/message_handler.cc b/runtime/vm/message_handler.cc
index 10e0d45..9be103c 100644
--- a/runtime/vm/message_handler.cc
+++ b/runtime/vm/message_handler.cc
@@ -67,7 +67,6 @@
       paused_timestamp_(-1),
 #endif
       task_running_(false),
-      delete_me_(false),
       pool_(nullptr),
       start_callback_(nullptr),
       end_callback_(nullptr),
@@ -88,12 +87,6 @@
   return "<unnamed>";
 }
 
-#if defined(DEBUG)
-void MessageHandler::CheckAccess() const {
-  // By default there is no checking.
-}
-#endif
-
 void MessageHandler::MessageNotify(Message::Priority priority) {
   // By default, there is no custom message notification.
 }
@@ -110,7 +103,6 @@
         name());
   }
   ASSERT(pool_ == nullptr);
-  ASSERT(!delete_me_);
   pool_ = pool;
   start_callback_ = start_callback;
   end_callback_ = end_callback;
@@ -165,7 +157,6 @@
     }
 
     if (pool_ != nullptr && !task_running_) {
-      ASSERT(!delete_me_);
       task_running_ = true;
       const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
       ASSERT(launched_successfully);
@@ -292,7 +283,6 @@
   // assigned to a thread pool.
   MonitorLocker ml(&monitor_);
   ASSERT(pool_ == nullptr);
-  ASSERT(!delete_me_);
 #if defined(DEBUG)
   CheckAccess();
 #endif
@@ -303,7 +293,6 @@
     int64_t timeout_millis) {
   MonitorLocker ml(&monitor_, /*no_safepoint_scope=*/false);
   ASSERT(task_running_);
-  ASSERT(!delete_me_);
 #if defined(DEBUG)
   CheckAccess();
 #endif
@@ -317,7 +306,6 @@
       wr = ml.Wait(timeout_millis);
     }
     ASSERT(task_running_);
-    ASSERT(!delete_me_);
     if (wr == Monitor::kTimedOut) {
       break;
     }
@@ -340,7 +328,6 @@
     return kOK;
   }
   MonitorLocker ml(&monitor_);
-  ASSERT(!delete_me_);
 #if defined(DEBUG)
   CheckAccess();
 #endif
@@ -391,7 +378,6 @@
   ASSERT(Isolate::Current() == nullptr);
   MessageStatus status = kOK;
   bool run_end_callback = false;
-  bool delete_me = false;
   EndCallback end_callback = nullptr;
   CallbackData callback_data = 0;
   {
@@ -504,7 +490,6 @@
       end_callback = end_callback_;
       callback_data = callback_data_;
       run_end_callback = end_callback_ != nullptr;
-      delete_me = delete_me_;
     }
 
     // Clear task_running_ last.  This allows other tasks to potentially start
@@ -516,20 +501,14 @@
   // The handler may have been deleted by another thread here if it is a native
   // message handler.
 
-  // Message handlers either use delete_me or end_callback but not both.
-  ASSERT(!delete_me || !run_end_callback);
-
   if (run_end_callback) {
     ASSERT(end_callback != nullptr);
     end_callback(callback_data);
     // The handler may have been deleted after this point.
   }
-  if (delete_me) {
-    delete this;
-  }
 }
 
-void MessageHandler::ClosePort(Dart_Port port) {
+void MessageHandler::OnPortClosed(Dart_Port port) {
   if (FLAG_trace_isolates) {
     MonitorLocker ml(&monitor_);
     OS::PrintErr(
@@ -540,7 +519,7 @@
   }
 }
 
-void MessageHandler::CloseAllPorts() {
+void MessageHandler::OnAllPortsClosed() {
   MonitorLocker ml(&monitor_);
   if (FLAG_trace_isolates) {
     OS::PrintErr(
@@ -552,20 +531,6 @@
   oob_queue_->Clear();
 }
 
-void MessageHandler::RequestDeletion() {
-  {
-    MonitorLocker ml(&monitor_);
-    if (task_running_) {
-      // This message handler currently has a task running on the thread pool.
-      delete_me_ = true;
-      return;
-    }
-  }
-
-  // This message handler has no current task.  Delete it.
-  delete this;
-}
-
 #if !defined(PRODUCT)
 void MessageHandler::DebugDump() {
   PortMap::DebugDumpForMessageHandler(this);
diff --git a/runtime/vm/message_handler.h b/runtime/vm/message_handler.h
index 229ba93..b0f5061 100644
--- a/runtime/vm/message_handler.h
+++ b/runtime/vm/message_handler.h
@@ -11,13 +11,14 @@
 #include "vm/lockers.h"
 #include "vm/message.h"
 #include "vm/os_thread.h"
+#include "vm/port.h"
 #include "vm/port_set.h"
 #include "vm/thread_pool.h"
 
 namespace dart {
 
 // A MessageHandler is an entity capable of accepting messages.
-class MessageHandler {
+class MessageHandler : public PortHandler {
  protected:
   MessageHandler();
 
@@ -31,9 +32,6 @@
 
   virtual ~MessageHandler();
 
-  // Allow subclasses to provide a handler name.
-  virtual const char* name() const;
-
   typedef uword CallbackData;
   typedef MessageStatus (*StartCallback)(CallbackData data);
   typedef void (*EndCallback)(CallbackData data);
@@ -85,15 +83,7 @@
   bool HasMessages();
 
   // Whether to keep this message handler alive or whether it should shutdown.
-  virtual bool KeepAliveLocked() {
-    // By default we keep alive until the message handler was asked to shutdown
-    // via [RequestDeletion].
-    return !delete_me_;
-  }
-
-  // Requests deletion of this message handler when the next task
-  // completes.
-  void RequestDeletion();
+  virtual bool KeepAliveLocked() { return true; }
 
   bool paused() const { return paused_ > 0; }
 
@@ -161,39 +151,7 @@
     friend class MessageHandler;
   };
 
-#if defined(DEBUG)
-  // Check that it is safe to access this message handler.
-  //
-  // For example, if this MessageHandler is an isolate, then it is
-  // only safe to access it when the MessageHandler is the current
-  // isolate.
-  virtual void CheckAccess() const;
-#endif
-
  protected:
-  // ------------ START PortMap API ------------
-  // These functions should only be called from the PortMap.
-
-  // Does this message handler correspond to the current isolate?
-  virtual bool IsCurrentIsolate() const { return false; }
-
-  // Return Isolate to which this message handler corresponds to.
-  virtual Isolate* isolate() const { return nullptr; }
-
-  // Posts a message on this handler's message queue.
-  // If before_events is true, then the message is enqueued before any pending
-  // events, but after any pending isolate library events.
-  void PostMessage(std::unique_ptr<Message> message,
-                   bool before_events = false);
-
-  // Notifies this handler that a port is being closed.
-  void ClosePort(Dart_Port port);
-
-  // Notifies this handler that all ports are being closed.
-  void CloseAllPorts();
-
-  // ------------ END PortMap API ------------
-
   // Custom message notification.  Optionally provided by subclass.
   virtual void MessageNotify(Message::Priority priority);
 
@@ -208,6 +166,12 @@
   // TODO(iposva): Set a local field before entering MessageHandler methods.
   Thread* thread() const { return Thread::Current(); }
 
+  // Posts a message on this handler's message queue.
+  // If before_events is true, then the message is enqueued before any pending
+  // events, but after any pending isolate library events.
+  void PostMessage(std::unique_ptr<Message> message,
+                   bool before_events = false) override;
+
  private:
   template <typename GCVisitorType>
   friend void MournFinalizerEntry(GCVisitorType*, FinalizerEntryPtr);
@@ -215,7 +179,28 @@
   friend class MessageHandlerTestPeer;
   friend class MessageHandlerTask;
 
-  struct PortSetEntry : public PortSet<PortSetEntry>::Entry {};
+  // ------------ START PortMap API ------------
+  // These functions should only be called from the PortMap.
+  // Implementaion of PortHandler API.
+
+  const char* name() const override;
+
+  void OnPortClosed(Dart_Port port) override;
+
+  void Shutdown() override {
+    // Nothing to do.
+  }
+
+  // Return Isolate to which this message handler corresponds to.
+  Isolate* isolate() const override { return nullptr; }
+
+  PortSet<PortSetEntry>* ports(PortMap::Locker& locker) override {
+    return &ports_;
+  }
+
+  // Notifies this handler that all ports are being closed.
+  void OnAllPortsClosed();
+  // ------------ END PortMap API ------------
 
   // Called by MessageHandlerTask to process our task queue.
   void TaskCallback();
@@ -254,8 +239,11 @@
   // thread.
   bool oob_message_handling_allowed_;
   bool paused_for_messages_;
-  PortSet<PortSetEntry>
-      ports_;  // Only accessed by [PortMap], protected by [PortMap]s lock.
+
+  // Only accessed by [PortMap], protected by [PortMap]s lock. See ports()
+  // getter.
+  PortSet<PortSetEntry> ports_;
+
   intptr_t paused_;  // The number of pause messages received.
 #if !defined(PRODUCT)
   bool should_pause_on_start_;
@@ -268,7 +256,6 @@
   int64_t paused_timestamp_;
 #endif
   bool task_running_;
-  bool delete_me_;
   ThreadPool* pool_;
   StartCallback start_callback_;
   EndCallback end_callback_;
diff --git a/runtime/vm/message_handler_test.cc b/runtime/vm/message_handler_test.cc
index a6ebe56..7369dbd 100644
--- a/runtime/vm/message_handler_test.cc
+++ b/runtime/vm/message_handler_test.cc
@@ -18,8 +18,8 @@
   void PostMessage(std::unique_ptr<Message> message) {
     handler_->PostMessage(std::move(message));
   }
-  void ClosePort(Dart_Port port) { handler_->ClosePort(port); }
-  void CloseAllPorts() { handler_->CloseAllPorts(); }
+  void OnPortClosed(Dart_Port port) { handler_->OnPortClosed(port); }
+  void OnAllPortsClosed() { handler_->OnAllPortsClosed(); }
 
   MessageQueue* queue() const { return handler_->queue_; }
   MessageQueue* oob_queue() const { return handler_->oob_queue_; }
@@ -193,7 +193,7 @@
   }
 
   // Delete all pending messages.
-  handler_peer.CloseAllPorts();
+  handler_peer.OnAllPortsClosed();
 }
 
 VM_UNIT_TEST_CASE(MessageHandler_ClosePort) {
@@ -207,20 +207,20 @@
   Message* raw_message2 = message.get();
   handler_peer.PostMessage(std::move(message));
 
-  handler_peer.ClosePort(1);
+  handler_peer.OnPortClosed(1);
 
   // Closing the port does not drop the messages from the queue.
   EXPECT(raw_message1 == handler_peer.queue()->Dequeue().get());
   EXPECT(raw_message2 == handler_peer.queue()->Dequeue().get());
 }
 
-VM_UNIT_TEST_CASE(MessageHandler_CloseAllPorts) {
+VM_UNIT_TEST_CASE(MessageHandler_OnAllPortsClosed) {
   TestMessageHandler handler;
   MessageHandlerTestPeer handler_peer(&handler);
   handler_peer.PostMessage(BlankMessage(1, Message::kNormalPriority));
   handler_peer.PostMessage(BlankMessage(2, Message::kNormalPriority));
 
-  handler_peer.CloseAllPorts();
+  handler_peer.OnAllPortsClosed();
 
   // All messages are dropped from the queue.
   EXPECT(nullptr == handler_peer.queue()->Dequeue());
@@ -269,7 +269,7 @@
   Dart_Port* ports = handler.port_buffer();
   EXPECT_EQ(port2, ports[0]);  // oob_message1, error
   EXPECT_EQ(port3, ports[1]);  // oob_message2, ok
-  handler_peer.CloseAllPorts();
+  handler_peer.OnAllPortsClosed();
 }
 
 VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_Shutdown) {
@@ -302,7 +302,7 @@
     MessageHandler::AcquiredQueues aq(&handler);
     EXPECT(aq.oob_queue()->Length() == 0);
   }
-  handler_peer.CloseAllPorts();
+  handler_peer.OnAllPortsClosed();
 }
 
 VM_UNIT_TEST_CASE(MessageHandler_HandleOOBMessages) {
@@ -323,7 +323,7 @@
   Dart_Port* ports = handler.port_buffer();
   EXPECT_EQ(port3, ports[0]);
   EXPECT_EQ(port4, ports[1]);
-  handler_peer.CloseAllPorts();
+  handler_peer.OnAllPortsClosed();
 }
 
 struct ThreadStartInfo {
diff --git a/runtime/vm/native_api_impl.cc b/runtime/vm/native_api_impl.cc
index 2e7602a..f42bac4 100644
--- a/runtime/vm/native_api_impl.cc
+++ b/runtime/vm/native_api_impl.cc
@@ -74,6 +74,13 @@
 DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
                                          Dart_NativeMessageHandler handler,
                                          bool handle_concurrently) {
+  return Dart_NewConcurrentNativePort(name, handler, /*max_concurrency=*/1);
+}
+
+DART_EXPORT Dart_Port
+Dart_NewConcurrentNativePort(const char* name,
+                             Dart_NativeMessageHandler handler,
+                             intptr_t max_concurrency) {
   if (name == nullptr) {
     name = "<UnnamedNativePort>";
   }
@@ -88,15 +95,9 @@
   // Start the native port without a current isolate.
   IsolateLeaveScope saver(Isolate::Current());
 
-  NativeMessageHandler* nmh = new NativeMessageHandler(name, handler);
+  NativeMessageHandler* nmh =
+      new NativeMessageHandler(name, handler, max_concurrency);
   Dart_Port port_id = PortMap::CreatePort(nmh);
-  if (port_id != ILLEGAL_PORT) {
-    if (!nmh->Run(Dart::thread_pool(), nullptr, nullptr, 0)) {
-      PortMap::ClosePort(port_id);
-      nmh->RequestDeletion();
-      port_id = ILLEGAL_PORT;
-    }
-  }
   Dart::ResetActiveApiCall();
   return port_id;
 }
@@ -105,10 +106,11 @@
   // Close the native port without a current isolate.
   IsolateLeaveScope saver(Isolate::Current());
 
-  MessageHandler* handler = nullptr;
+  PortHandler* handler = nullptr;
   const bool was_closed = PortMap::ClosePort(native_port_id, &handler);
   if (was_closed) {
-    handler->RequestDeletion();
+    NativeMessageHandler::RequestDeletion(
+        static_cast<NativeMessageHandler*>(handler));
   }
   return was_closed;
 }
diff --git a/runtime/vm/native_message_handler.cc b/runtime/vm/native_message_handler.cc
index a4906c5..a0160c3 100644
--- a/runtime/vm/native_message_handler.cc
+++ b/runtime/vm/native_message_handler.cc
@@ -5,6 +5,7 @@
 #include "vm/native_message_handler.h"
 
 #include <memory>
+#include <utility>
 
 #include "vm/dart_api_message.h"
 #include "vm/isolate.h"
@@ -15,12 +16,11 @@
 namespace dart {
 
 NativeMessageHandler::NativeMessageHandler(const char* name,
-                                           Dart_NativeMessageHandler func)
-    : name_(Utils::StrDup(name)), func_(func) {}
+                                           Dart_NativeMessageHandler func,
+                                           intptr_t max_concurrency)
+    : name_(Utils::StrDup(name)), func_(func), pool_(max_concurrency) {}
 
-NativeMessageHandler::~NativeMessageHandler() {
-  free(name_);
-}
+NativeMessageHandler::~NativeMessageHandler() {}
 
 #if defined(DEBUG)
 void NativeMessageHandler::CheckAccess() const {
@@ -28,19 +28,44 @@
 }
 #endif
 
-MessageHandler::MessageStatus NativeMessageHandler::HandleMessage(
-    std::unique_ptr<Message> message) {
+namespace {
+class HandleMessage : public ThreadPool::Task {
+ public:
+  HandleMessage(Dart_NativeMessageHandler handler,
+                std::unique_ptr<Message> message)
+      : handler_(handler), message_(std::move(message)) {
+    ASSERT(handler != nullptr);
+  }
+
+  virtual void Run() {
+    ApiNativeScope scope;
+    Dart_CObject* object = ReadApiMessage(scope.zone(), message_.get());
+    handler_(message_->dest_port(), object);
+  }
+
+ private:
+  Dart_NativeMessageHandler handler_;
+  std::unique_ptr<Message> message_;
+
+  DISALLOW_COPY_AND_ASSIGN(HandleMessage);
+};
+}  // namespace
+
+void NativeMessageHandler::PostMessage(std::unique_ptr<Message> message,
+                                       bool before_events /* = false */) {
   if (message->IsOOB()) {
-    // We currently do not use OOB messages for native ports.
     UNREACHABLE();
   }
-  // We create a native scope for handling the message.
-  // All allocation of objects for decoding the message is done in the
-  // zone associated with this scope.
-  ApiNativeScope scope;
-  Dart_CObject* object = ReadApiMessage(scope.zone(), message.get());
-  (*func())(message->dest_port(), object);
-  return kOK;
+
+  pool_.Run<HandleMessage>(func_, std::move(message));
+}
+
+void NativeMessageHandler::RequestDeletion(NativeMessageHandler* handler) {
+  ThreadPool::RequestShutdown(&handler->pool_, [handler]() { delete handler; });
+}
+
+void NativeMessageHandler::Shutdown() {
+  pool_.Shutdown();
 }
 
 }  // namespace dart
diff --git a/runtime/vm/native_message_handler.h b/runtime/vm/native_message_handler.h
index f2cd91a..4c27b1f 100644
--- a/runtime/vm/native_message_handler.h
+++ b/runtime/vm/native_message_handler.h
@@ -14,25 +14,54 @@
 namespace dart {
 
 // A NativeMessageHandler accepts messages and dispatches them to
-// native C handlers.
-class NativeMessageHandler : public MessageHandler {
+// native C handlers on worker threads. It will spawn up to
+// |max_concurrency| worker threads which will handle incomming messages
+// concurrently.
+class NativeMessageHandler final : public PortHandler {
  public:
-  NativeMessageHandler(const char* name, Dart_NativeMessageHandler func);
-  ~NativeMessageHandler();
+  NativeMessageHandler(const char* name,
+                       Dart_NativeMessageHandler func,
+                       intptr_t max_concurrency);
 
-  const char* name() const { return name_; }
+  ~NativeMessageHandler() override;
+
+  const char* name() const override { return name_.get(); }
   Dart_NativeMessageHandler func() const { return func_; }
 
-  MessageStatus HandleMessage(std::unique_ptr<Message> message);
-
 #if defined(DEBUG)
   // Check that it is safe to access this handler.
-  void CheckAccess() const;
+  void CheckAccess() const override;
 #endif
 
+  void OnPortClosed(Dart_Port port) override {}
+
+  Isolate* isolate() const override { return nullptr; }
+
+  // Posts a message on this handler's message queue.
+  // If before_events is true, then the message is enqueued before any pending
+  // events, but after any pending isolate library events.
+  void PostMessage(std::unique_ptr<Message> message,
+                   bool before_events = false) override;
+
+  // Request deletion of the given handler once it is down with the currently
+  // running Dart_NativeMessageHandler callbacks. No new callbacks will be
+  // scheduled after this call.
+  //
+  // Note: |handler| might be deleted synchronously if no callback is running,
+  // or it can be deleted later on a worker thread.
+  static void RequestDeletion(NativeMessageHandler* handler);
+
+  void Shutdown() override;
+
  private:
-  char* name_;
-  Dart_NativeMessageHandler func_;
+  PortSet<PortSetEntry>* ports(PortMap::Locker& locker) override {
+    return nullptr;
+  }
+
+  CStringUniquePtr name_;
+  const Dart_NativeMessageHandler func_;
+
+  ThreadPool pool_;
 };
 
 }  // namespace dart
diff --git a/runtime/vm/os_thread.cc b/runtime/vm/os_thread.cc
index d020c8d..d53ce55 100644
--- a/runtime/vm/os_thread.cc
+++ b/runtime/vm/os_thread.cc
@@ -240,6 +240,10 @@
   return false;
 }
 
+bool OSThread::CanCreateOSThreads() {
+  return creation_enabled_;
+}
+
 void OSThread::DisableOSThreadCreation() {
   MutexLocker ml(thread_list_lock_);
   creation_enabled_ = false;
diff --git a/runtime/vm/os_thread.h b/runtime/vm/os_thread.h
index 0fc7170..4f2aff3 100644
--- a/runtime/vm/os_thread.h
+++ b/runtime/vm/os_thread.h
@@ -200,6 +200,7 @@
   static void SetThreadLocal(ThreadLocalKey key, uword value);
   static intptr_t GetMaxStackSize();
   static void Join(ThreadJoinId id);
+  static void Detach(ThreadJoinId id);
   static intptr_t ThreadIdToIntPtr(ThreadId id);
   static ThreadId ThreadIdFromIntPtr(intptr_t id);
 
@@ -213,6 +214,7 @@
   static bool IsThreadInList(ThreadId id);
 
   static void DisableOSThreadCreation();
+  static bool CanCreateOSThreads();
   static void EnableOSThreadCreation();
 
   static constexpr intptr_t kStackSizeBufferMax = (16 * KB * kWordSize);
diff --git a/runtime/vm/os_thread_absl.cc b/runtime/vm/os_thread_absl.cc
index e951b1d..ea2b042 100644
--- a/runtime/vm/os_thread_absl.cc
+++ b/runtime/vm/os_thread_absl.cc
@@ -216,6 +216,11 @@
   ASSERT(result == 0);
 }
 
+void OSThread::Detach(ThreadJoinId id) {
+  int result = pthread_detach(id);
+  VALIDATE_PTHREAD_RESULT(result);
+}
+
 intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
   COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
 #if defined(DART_HOST_OS_ANDROID) || defined(DART_HOST_OS_LINUX)
diff --git a/runtime/vm/os_thread_android.cc b/runtime/vm/os_thread_android.cc
index 495eef6..d1e4b70 100644
--- a/runtime/vm/os_thread_android.cc
+++ b/runtime/vm/os_thread_android.cc
@@ -182,6 +182,11 @@
   VALIDATE_PTHREAD_RESULT(result);
 }
 
+void OSThread::Detach(ThreadJoinId id) {
+  int result = pthread_detach(id);
+  VALIDATE_PTHREAD_RESULT(result);
+}
+
 intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
   COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
   return static_cast<intptr_t>(id);
diff --git a/runtime/vm/os_thread_fuchsia.cc b/runtime/vm/os_thread_fuchsia.cc
index 9d335c5..c1594dc7 100644
--- a/runtime/vm/os_thread_fuchsia.cc
+++ b/runtime/vm/os_thread_fuchsia.cc
@@ -152,6 +152,11 @@
   VALIDATE_PTHREAD_RESULT(result);
 }
 
+void OSThread::Detach(ThreadJoinId id) {
+  int result = pthread_detach(id);
+  VALIDATE_PTHREAD_RESULT(result);
+}
+
 intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
   COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
   return static_cast<intptr_t>(id);
diff --git a/runtime/vm/os_thread_linux.cc b/runtime/vm/os_thread_linux.cc
index 793a6c3..e0189cc 100644
--- a/runtime/vm/os_thread_linux.cc
+++ b/runtime/vm/os_thread_linux.cc
@@ -181,6 +181,11 @@
   VALIDATE_PTHREAD_RESULT(result);
 }
 
+void OSThread::Detach(ThreadJoinId id) {
+  int result = pthread_detach(id);
+  VALIDATE_PTHREAD_RESULT(result);
+}
+
 intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
   COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
   return static_cast<intptr_t>(id);
diff --git a/runtime/vm/os_thread_macos.cc b/runtime/vm/os_thread_macos.cc
index 9ba11a8..8b887ba 100644
--- a/runtime/vm/os_thread_macos.cc
+++ b/runtime/vm/os_thread_macos.cc
@@ -178,6 +178,11 @@
   VALIDATE_PTHREAD_RESULT(result);
 }
 
+void OSThread::Detach(ThreadJoinId id) {
+  int result = pthread_detach(id);
+  VALIDATE_PTHREAD_RESULT(result);
+}
+
 intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
   COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
   return reinterpret_cast<intptr_t>(id);
diff --git a/runtime/vm/os_thread_win.cc b/runtime/vm/os_thread_win.cc
index 11766f7..5b2b090 100644
--- a/runtime/vm/os_thread_win.cc
+++ b/runtime/vm/os_thread_win.cc
@@ -159,6 +159,12 @@
   ASSERT(res == WAIT_OBJECT_0);
 }
 
+void OSThread::Detach(ThreadJoinId id) {
+  HANDLE handle = static_cast<HANDLE>(id);
+  ASSERT(handle != nullptr);
+  CloseHandle(handle);
+}
+
 intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
   COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
   return static_cast<intptr_t>(id);
diff --git a/runtime/vm/port.cc b/runtime/vm/port.cc
index 139f4dc..a6d0375 100644
--- a/runtime/vm/port.cc
+++ b/runtime/vm/port.cc
@@ -52,9 +52,9 @@
   return result;
 }
 
-Dart_Port PortMap::CreatePort(MessageHandler* handler) {
+Dart_Port PortMap::CreatePort(PortHandler* handler) {
   ASSERT(handler != nullptr);
-  MutexLocker ml(mutex_);
+  PortMap::Locker ml;
   if (ports_ == nullptr) {
     return ILLEGAL_PORT;
   }
@@ -64,35 +64,28 @@
 #endif
 
   const Dart_Port port = AllocatePort();
-
-  // The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
-  // by the [PortMap::mutex_] we already hold.
-  MessageHandler::PortSetEntry isolate_entry;
-  isolate_entry.port = port;
-  handler->ports_.Insert(isolate_entry);
-
-  Entry entry;
-  entry.port = port;
-  entry.handler = handler;
-  ports_->Insert(entry);
+  if (auto ports = handler->ports(ml)) {
+    ports->Insert(PortHandler::PortSetEntry{port});
+  }
+  ports_->Insert(Entry{port, handler});
 
   if (FLAG_trace_isolates) {
     OS::PrintErr(
         "[+] Opening port: \n"
         "\thandler:    %s\n"
         "\tport:       %" Pd64 "\n",
-        handler->name(), entry.port);
+        handler->name(), port);
   }
 
-  return entry.port;
+  return port;
 }
 
-bool PortMap::ClosePort(Dart_Port port, MessageHandler** message_handler) {
-  if (message_handler != nullptr) *message_handler = nullptr;
+bool PortMap::ClosePort(Dart_Port port, PortHandler** port_handler) {
+  if (port_handler != nullptr) *port_handler = nullptr;
 
-  MessageHandler* handler = nullptr;
+  PortHandler* handler = nullptr;
   {
-    MutexLocker ml(mutex_);
+    PortMap::Locker ml;
     if (ports_ == nullptr) {
       return false;
     }
@@ -108,33 +101,33 @@
     handler->CheckAccess();
 #endif
 
-    // Delete the port entry before releasing the lock to avoid holding the lock
-    // while flushing the messages below.
     it.Delete();
     ports_->Rebalance();
 
-    // The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
-    // by the [PortMap::mutex_] we already hold.
-    auto isolate_it = handler->ports_.TryLookup(port);
-    ASSERT(isolate_it != handler->ports_.end());
-    isolate_it.Delete();
-    handler->ports_.Rebalance();
+    if (auto ports = handler->ports(ml)) {
+      auto isolate_it = ports->TryLookup(port);
+      ASSERT(isolate_it != ports->end());
+      isolate_it.Delete();
+      ports->Rebalance();
+    }
   }
-  handler->ClosePort(port);
-  if (message_handler != nullptr) *message_handler = handler;
+  handler->OnPortClosed(port);
+  if (port_handler != nullptr) *port_handler = handler;
   return true;
 }
 
 void PortMap::ClosePorts(MessageHandler* handler) {
   {
-    MutexLocker ml(mutex_);
+    PortMap::Locker ml;
     if (ports_ == nullptr) {
       return;
     }
-    // The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
-    // by the [PortMap::mutex_] we already hold.
-    for (auto isolate_it = handler->ports_.begin();
-         isolate_it != handler->ports_.end(); ++isolate_it) {
+
+    auto ports = handler->ports(ml);
+    ASSERT(ports != nullptr);
+
+    for (auto isolate_it = ports->begin(); isolate_it != ports->end();
+         ++isolate_it) {
       auto it = ports_->TryLookup((*isolate_it).port);
       ASSERT(it != ports_->end());
       Entry entry = *it;
@@ -143,10 +136,10 @@
       it.Delete();
       isolate_it.Delete();
     }
-    ASSERT(handler->ports_.IsEmpty());
+    ASSERT(ports->IsEmpty());
     ports_->Rebalance();
   }
-  handler->CloseAllPorts();
+  handler->OnAllPortsClosed();
 }
 
 bool PortMap::PostMessage(std::unique_ptr<Message> message,
@@ -161,7 +154,7 @@
     message->DropFinalizers();
     return false;
   }
-  MessageHandler* handler = (*it).handler;
+  auto handler = (*it).handler;
   ASSERT(handler != nullptr);
   handler->PostMessage(std::move(message), before_events);
   return true;
@@ -189,7 +182,7 @@
     return nullptr;
   }
 
-  MessageHandler* handler = (*it).handler;
+  auto handler = (*it).handler;
   return handler->isolate();
 }
 
@@ -204,7 +197,7 @@
     return ILLEGAL_PORT;
   }
 
-  MessageHandler* handler = (*it).handler;
+  auto handler = (*it).handler;
   Isolate* isolate = handler->isolate();
   if (isolate == nullptr) {
     // Message handler is a native port instead of an isolate.
@@ -258,6 +251,13 @@
   }
 }
 
+void PortMap::Shutdown() {
+  // Tell all handlers which are running their own thread pools to shutdown.
+  for (auto& entry : *ports_) {
+    entry.handler->Shutdown();
+  }
+}
+
 void PortMap::Cleanup() {
   ASSERT(ports_ != nullptr);
   ASSERT(prng_ != nullptr);
@@ -317,4 +317,12 @@
   }
 }
 
+PortHandler::~PortHandler() {}
+
+#if defined(DEBUG)
+void PortHandler::CheckAccess() const {
+  // By default there is no checking.
+}
+#endif
+
 }  // namespace dart
diff --git a/runtime/vm/port.h b/runtime/vm/port.h
index e5afe8f..8e94e35 100644
--- a/runtime/vm/port.h
+++ b/runtime/vm/port.h
@@ -11,6 +11,7 @@
 #include "vm/allocation.h"
 #include "vm/globals.h"
 #include "vm/json_stream.h"
+#include "vm/lockers.h"
 #include "vm/port_set.h"
 #include "vm/random.h"
 
@@ -20,17 +21,17 @@
 class Message;
 class MessageHandler;
 class Mutex;
+class PortHandler;
 
 class PortMap : public AllStatic {
  public:
   // Allocate a port for the provided handler and return its VM-global id.
-  static Dart_Port CreatePort(MessageHandler* handler);
+  static Dart_Port CreatePort(PortHandler* handler);
 
   // Close the port with id. All pending messages will be dropped.
   //
   // Returns true if the port is successfully closed.
-  static bool ClosePort(Dart_Port id,
-                        MessageHandler** message_handler = nullptr);
+  static bool ClosePort(Dart_Port id, PortHandler** port_handler = nullptr);
 
   // Close all the ports for the provided handler.
   static void ClosePorts(MessageHandler* handler);
@@ -58,6 +59,7 @@
                                                    IsolateGroup* group);
 
   static void Init();
+  static void Shutdown();
   static void Cleanup();
 
   static void PrintPortsForMessageHandler(MessageHandler* handler,
@@ -65,11 +67,18 @@
 
   static void DebugDumpForMessageHandler(MessageHandler* handler);
 
+  class Locker : public MutexLocker {
+   public:
+    Locker() : MutexLocker(PortMap::mutex_) {}
+  };
+
  private:
   struct Entry : public PortSet<Entry>::Entry {
     Entry() : handler(nullptr) {}
+    Entry(Dart_Port port, PortHandler* handler)
+        : PortSet<Entry>::Entry(port), handler(handler) {}
 
-    MessageHandler* handler;
+    PortHandler* handler;
   };
 
   // Allocate a new unique port.
@@ -83,6 +92,54 @@
   static Random* prng_;
 };
 
+// An object handling messages dispatched to one or more ports in the |PortMap|.
+class PortHandler {
+ public:
+  virtual ~PortHandler();
+
+  virtual const char* name() const = 0;
+
+  // Notify the handler that a port previously associated with it is
+  // now closed.
+  virtual void OnPortClosed(Dart_Port port) = 0;
+
+#if defined(DEBUG)
+  // Check that it is safe to access this port handler.
+  //
+  // For example, if this |PortHandler| is an isolate, then it is
+  // only safe to access it when it is the current isolate.
+  virtual void CheckAccess() const;
+#endif
+
+  // Return Isolate to which this message handler corresponds to.
+  virtual Isolate* isolate() const = 0;
+
+  // Ask the handler to shutdown, e.g. stop associated thread pools if any.
+  virtual void Shutdown() = 0;
+
+  // Posts a message on this handler's message queue.
+  // If before_events is true, then the message is enqueued before any pending
+  // events, but after any pending isolate library events.
+  virtual void PostMessage(std::unique_ptr<Message> message,
+                           bool before_events = false) = 0;
+
+ protected:
+  struct PortSetEntry : public PortSet<PortSetEntry>::Entry {
+    PortSetEntry() : Entry() {}
+    explicit PortSetEntry(Dart_Port port) : Entry(port) {}
+  };
+
+ private:
+  friend class PortMap;
+
+  // Returns set of ports associate with this handler if
+  // handler supports multiple ports or |nullptr| otherwise.
+  //
+  // Only |PortMap| is expected to call this method under locked
+  // PortMap::mutex_.
+  virtual PortSet<PortSetEntry>* ports(PortMap::Locker& locker) = 0;
+};
+
 }  // namespace dart
 
 #endif  // RUNTIME_VM_PORT_H_
diff --git a/runtime/vm/port_set.h b/runtime/vm/port_set.h
index 5809f6a..48e66cc 100644
--- a/runtime/vm/port_set.h
+++ b/runtime/vm/port_set.h
@@ -21,6 +21,7 @@
 
   struct Entry : public MallocAllocated {
     Entry() : port(kFreePort) {}
+    explicit Entry(Dart_Port port) : port(port) {}
 
     // Free entries have set this to 0.
     Dart_Port port;
diff --git a/runtime/vm/thread_pool.cc b/runtime/vm/thread_pool.cc
index 28ae62f..3cf5fab 100644
--- a/runtime/vm/thread_pool.cc
+++ b/runtime/vm/thread_pool.cc
@@ -41,23 +41,59 @@
   Shutdown();
 }
 
-void ThreadPool::Shutdown() {
-  {
-    MutexLocker ml(&pool_mutex_);
+void ThreadPool::RequestWorkersToShutdown() {
+  MutexLocker ml(&pool_mutex_);
 
-    // Prevent scheduling of new tasks.
-    shutting_down_ = true;
+  // If we are just starting to shutdown threads then this should be done
+  // before OSThread::DisableOSThreadCreation is called. If |OSThread| creation
+  // is disabled after |Worker::StartThread| is called but before
+  // |ThreadPool::Worker::Main| is called then a worker will be stuck in the
+  // state idle but will never properly start and thus will never transition to
+  // dead - leading to a deadlock.
+  RELEASE_ASSERT(shutting_down_ || OSThread::CanCreateOSThreads());
 
-    if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
-      // All workers have already died.
-      all_workers_dead_ = true;
-    } else {
-      // Tell all idling workers to drain remaining work and then shut down.
-      for (auto worker : idle_workers_) {
-        worker->Wakeup();
-      }
+  // Prevent scheduling of new tasks.
+  shutting_down_ = true;
+
+  if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
+    // All workers have already died.
+    all_workers_dead_ = true;
+  } else {
+    // Tell all idling workers to drain remaining work and then shut down.
+    for (auto worker : idle_workers_) {
+      worker->Wakeup();
     }
   }
+}
+
+void ThreadPool::RequestShutdown(
+    ThreadPool* pool,
+    std::function<void(void)>&& shutdown_complete) {
+  pool->RequestWorkersToShutdown();
+
+  {
+    MonitorLocker eml(&pool->exit_monitor_);
+    if (!pool->all_workers_dead_) {
+      // Workers are still doing some work. Mark this pool for asynchronous
+      // deletion. When the last worker finishes it will delete itself and
+      // call shutdown_complete.
+      pool->shutdown_complete_callback_ = std::move(shutdown_complete);
+      return;
+    }
+
+    // Threads are in the process of exiting already and there is no way to ask
+    // them to do additional cleanup asynchronously. We will just join the
+    // last dead worker and delete it synchronously.
+  }
+  pool->DeleteLastDeadWorker();
+  shutdown_complete();
+}
+
+void ThreadPool::Shutdown() {
+  // Should not combine |Shutdown| and |RequestShutdown| on the same pool.
+  ASSERT(shutdown_complete_callback_ == nullptr);
+
+  RequestWorkersToShutdown();
 
   // Wait until all workers are dead. Any new death will notify the exit
   // monitor.
@@ -67,20 +103,18 @@
       eml.Wait();
     }
   }
+
+  DeleteLastDeadWorker();
+}
+
+void ThreadPool::DeleteLastDeadWorker() {
+  ASSERT(all_workers_dead_);
   ASSERT(count_idle_ == 0);
   ASSERT(count_running_ == 0);
   ASSERT(idle_workers_.IsEmpty());
   ASSERT(running_workers_.IsEmpty());
-
-  WorkerList dead_workers_to_join;
-  {
-    MutexLocker ml(&pool_mutex_);
-    ObtainDeadWorkersLocked(&dead_workers_to_join);
-  }
-  JoinDeadWorkersLocked(&dead_workers_to_join);
-
-  ASSERT(count_dead_ == 0);
-  ASSERT(dead_workers_.IsEmpty());
+  JoinDeadWorker(last_dead_worker_);
+  last_dead_worker_ = nullptr;
 }
 
 bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
@@ -156,7 +190,7 @@
 }
 
 void ThreadPool::WorkerLoop(Worker* worker) {
-  WorkerList dead_workers_to_join;
+  Worker* previous_dead_worker = nullptr;
 
   while (true) {
     MutexLocker ml(&pool_mutex_);
@@ -165,7 +199,6 @@
       IdleToRunningLocked(worker);
       while (!tasks_.IsEmpty()) {
         auto task = TakeNextAvailableTaskLocked();
-
         MutexUnlocker mls(&ml);
         task->Run();
         ASSERT(Isolate::Current() == nullptr);
@@ -183,8 +216,7 @@
     }
 
     if (shutting_down_) {
-      ObtainDeadWorkersLocked(&dead_workers_to_join);
-      IdleToDeadLocked(worker);
+      previous_dead_worker = IdleToDeadLocked(worker);
       break;
     }
 
@@ -203,17 +235,16 @@
       }
     }
     if (done) {
-      ObtainDeadWorkersLocked(&dead_workers_to_join);
-      IdleToDeadLocked(worker);
+      previous_dead_worker = IdleToDeadLocked(worker);
       break;
     }
   }
 
-  // Before we transitioned to dead we obtained the list of previously died dead
-  // workers, which we join here. Since every death of a worker will join
-  // previously died workers, we keep the pending non-joined [dead_workers_] to
-  // effectively 1.
-  JoinDeadWorkersLocked(&dead_workers_to_join);
+  // |IdleToDeadLocked| obtained the worker which died before us, which we will
+  // join here. Since every dead worker will join the previous one, all dead
+  // workers effectively form a chain and it is enough to join the worker which
+  // died last to join all workers which died before it.
+  JoinDeadWorker(previous_dead_worker);
 }
 
 void ThreadPool::IdleToRunningLocked(Worker* worker) {
@@ -234,14 +265,14 @@
   count_idle_++;
 }
 
-void ThreadPool::IdleToDeadLocked(Worker* worker) {
+ThreadPool::Worker* ThreadPool::IdleToDeadLocked(Worker* worker) {
   ASSERT(tasks_.IsEmpty());
+  Worker* previous_dead = last_dead_worker_;
 
   ASSERT(idle_workers_.ContainsForDebugging(worker));
   idle_workers_.Remove(worker);
-  dead_workers_.Append(worker);
+  last_dead_worker_ = worker;
   count_idle_--;
-  count_dead_++;
 
   // Notify shutdown thread that the worker thread is about to finish.
   if (shutting_down_) {
@@ -251,24 +282,15 @@
       eml.Notify();
     }
   }
+
+  return previous_dead;
 }
 
-void ThreadPool::ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join) {
-  dead_workers_to_join->AppendList(&dead_workers_);
-  ASSERT(dead_workers_.IsEmpty());
-  count_dead_ = 0;
-}
-
-void ThreadPool::JoinDeadWorkersLocked(WorkerList* dead_workers_to_join) {
-  auto it = dead_workers_to_join->begin();
-  while (it != dead_workers_to_join->end()) {
-    Worker* worker = *it;
-    it = dead_workers_to_join->Erase(it);
-
+void ThreadPool::JoinDeadWorker(Worker* worker) {
+  if (worker != nullptr) {
     OSThread::Join(worker->join_id_);
     delete worker;
   }
-  ASSERT(dead_workers_to_join->IsEmpty());
 }
 
 ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(std::unique_ptr<Task> task) {
@@ -353,6 +375,28 @@
   if (exit_cb != nullptr) {
     exit_cb();
   }
+
+  ThreadPool::WorkerThreadExit(pool, worker);
+}
+
+void ThreadPool::WorkerThreadExit(ThreadPool* pool, Worker* worker) {
+  if (pool->shutdown_complete_callback_ != nullptr && pool->all_workers_dead_ &&
+      pool->last_dead_worker_ == worker) {
+    // Asynchronous shutdown was requested and this is the last exiting worker.
+    // It needs to delete itself and notify the code which requested the
+    // shutdown that we are done.
+    // Start by detaching the thread (because nobody is going to join it) so
+    // that we don't keep any thread related data structures behind.
+    OSThread::Detach(worker->join_id_);
+    delete worker;
+    pool->last_dead_worker_ = nullptr;
+
+    // Run the callback. It might (and most likely will) delete |pool| so this
+    // should be the last time we touch the |pool| pointer.
+    auto callback = pool->shutdown_complete_callback_;
+    pool->shutdown_complete_callback_ = nullptr;
+    callback();
+  }
 }
 
 }  // namespace dart
diff --git a/runtime/vm/thread_pool.h b/runtime/vm/thread_pool.h
index 74479ac..99fee30 100644
--- a/runtime/vm/thread_pool.h
+++ b/runtime/vm/thread_pool.h
@@ -5,6 +5,7 @@
 #ifndef RUNTIME_VM_THREAD_POOL_H_
 #define RUNTIME_VM_THREAD_POOL_H_
 
+#include <functional>
 #include <memory>
 #include <utility>
 
@@ -59,13 +60,25 @@
   // to continue executing.
   void MarkCurrentWorkerAsUnBlocked();
 
-  // Triggers shutdown, prevents scheduling of new tasks.
+  // Triggers shutdown, prevents scheduling of new tasks and waits for all
+  // worker threads to exit.
+  //
+  // Existing tasks are executed to completion.
   void Shutdown();
 
+  // Prevent scheduling of new tasks on |pool| and request it to shutdown
+  // after all currently running tasks finish. |shutdown_complete| will be
+  // invoked when shutdown is complete. This might happen synchronously
+  // if all workers are already stopped or on one of the worker threads.
+  //
+  // It is safe to delete |pool| from |shutdown_complete|.
+  static void RequestShutdown(ThreadPool* pool,
+                              std::function<void(void)>&& shutdown_complete);
+
   // Exposed for unit test in thread_pool_test.cc
   uint64_t workers_started() const { return count_idle_ + count_running_; }
   // Exposed for unit test in thread_pool_test.cc
-  uint64_t workers_stopped() const { return count_dead_; }
+  bool has_pending_dead_worker() const { return last_dead_worker_ != nullptr; }
 
  protected:
   class Worker : public IntrusiveDListEntry<Worker> {
@@ -112,6 +125,8 @@
   bool TasksWaitingToRunLocked() { return !tasks_.IsEmpty(); }
 
  private:
+  static void WorkerThreadExit(ThreadPool* pool, ThreadPool::Worker* worker);
+
   using TaskList = IntrusiveDList<Task>;
   using WorkerList = IntrusiveDList<Worker>;
 
@@ -124,9 +139,14 @@
 
   void IdleToRunningLocked(Worker* worker);
   void RunningToIdleLocked(Worker* worker);
-  void IdleToDeadLocked(Worker* worker);
-  void ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join);
-  void JoinDeadWorkersLocked(WorkerList* dead_workers_to_join);
+  DART_WARN_UNUSED_RESULT Worker* IdleToDeadLocked(Worker* worker);
+  void JoinDeadWorker(Worker* worker);
+
+  Worker* TakeLastDeadWorker();
+
+  void RequestWorkersToShutdown();
+
+  void DeleteLastDeadWorker();
 
   Mutex pool_mutex_;
   bool shutting_down_ = false;
@@ -135,13 +155,19 @@
   uint64_t count_dead_ = 0;
   WorkerList running_workers_;
   WorkerList idle_workers_;
-  WorkerList dead_workers_;
+
+  Worker* last_dead_worker_ = nullptr;
+
   uint64_t pending_tasks_ = 0;
   TaskList tasks_;
 
   Monitor exit_monitor_;
   std::atomic<bool> all_workers_dead_;
 
+  // If asynchronous shutdown is requested then this callback will be
+  // invoked by the last exiting worker.
+  std::function<void(void)> shutdown_complete_callback_;
+
   uintptr_t max_pool_size_ = 0;
 
   DISALLOW_COPY_AND_ASSIGN(ThreadPool);
diff --git a/runtime/vm/thread_pool_test.cc b/runtime/vm/thread_pool_test.cc
index 7a6effd..4798d57 100644
--- a/runtime/vm/thread_pool_test.cc
+++ b/runtime/vm/thread_pool_test.cc
@@ -74,7 +74,7 @@
 
   // Do a sanity test on the worker stats.
   EXPECT_EQ(1U, thread_pool.workers_started());
-  EXPECT_EQ(0U, thread_pool.workers_stopped());
+  EXPECT(!thread_pool.has_pending_dead_worker());
 }
 
 THREAD_POOL_UNIT_TEST_CASE(ThreadPool_RunMany) {
@@ -175,14 +175,15 @@
   {
     ThreadPool thread_pool;
     EXPECT_EQ(0U, thread_pool.workers_started());
-    EXPECT_EQ(0U, thread_pool.workers_stopped());
+    EXPECT(!thread_pool.has_pending_dead_worker());
 
     // Run a worker.
     Monitor sync;
     bool done = true;
     thread_pool.Run<TestTask>(&sync, &done);
     EXPECT_EQ(1U, thread_pool.workers_started());
-    EXPECT_EQ(0U, thread_pool.workers_stopped());
+    EXPECT(!thread_pool.has_pending_dead_worker());
+
     {
       MonitorLocker ml(&sync);
       done = false;
@@ -196,11 +197,11 @@
     // Wait up to 5 seconds to see if a worker times out.
     const int kMaxWait = 5000;
     int waited = 0;
-    while (thread_pool.workers_stopped() == 0 && waited < kMaxWait) {
+    while (!thread_pool.has_pending_dead_worker() && waited < kMaxWait) {
       OS::Sleep(1);
       waited += 1;
     }
-    EXPECT_EQ(1U, thread_pool.workers_stopped());
+    EXPECT(thread_pool.has_pending_dead_worker());
   }
 
   FLAG_worker_timeout_millis = saved_timeout;
diff --git a/sdk/lib/_internal/vm/bin/io_service_patch.dart b/sdk/lib/_internal/vm/bin/io_service_patch.dart
index eae117a..7fd13b8 100644
--- a/sdk/lib/_internal/vm/bin/io_service_patch.dart
+++ b/sdk/lib/_internal/vm/bin/io_service_patch.dart
@@ -4,48 +4,13 @@
 
 part of "common_patch.dart";
 
-class _IOServicePorts {
-  // We limit the number of IO Service ports per isolate so that we don't
-  // spawn too many threads all at once, which can crash the VM on Windows.
-  static const int maxPorts = 32;
-  final List<SendPort> _ports = [];
-  final List<int> _useCounts = [];
-  final List<int> _freePorts = [];
-  final Map<int, int> _usedPorts = HashMap<int, int>();
-
-  _IOServicePorts();
-
-  SendPort _getPort(int forRequestId) {
-    assert(!_usedPorts.containsKey(forRequestId));
-    if (_freePorts.isEmpty && _ports.length < maxPorts) {
-      final SendPort port = _newServicePort();
-      _ports.add(port);
-      _useCounts.add(0);
-      _freePorts.add(_ports.length - 1);
-    }
-    // Use a free port if one exists.
-    final index = _freePorts.isNotEmpty
-        ? _freePorts.removeLast()
-        : forRequestId % maxPorts;
-    _usedPorts[forRequestId] = index;
-    _useCounts[index]++;
-    return _ports[index];
-  }
-
-  void _returnPort(int forRequestId) {
-    final index = _usedPorts.remove(forRequestId)!;
-    if (--_useCounts[index] == 0) {
-      _freePorts.add(index);
-    }
-  }
-
-  @pragma("vm:external-name", "IOService_NewServicePort")
-  external static SendPort _newServicePort();
-}
+@pragma("vm:external-name", "IOService_NewServicePort")
+external SendPort _newServicePort();
 
 @patch
 class _IOService {
-  static _IOServicePorts _servicePorts = new _IOServicePorts();
+  static final SendPort _port = _newServicePort();
+
   static RawReceivePort? _receivePort;
   static late SendPort _replyToPort;
   static HashMap<int, Completer> _messageMap = new HashMap<int, Completer>();
@@ -59,10 +24,9 @@
     } while (_messageMap.containsKey(id));
     final Completer completer = new Completer();
     try {
-      final SendPort servicePort = _servicePorts._getPort(id);
       _ensureInitialize();
       _messageMap[id] = completer;
-      servicePort.send(<dynamic>[id, _replyToPort, request, data]);
+      _port.send(<dynamic>[id, _replyToPort, request, data]);
     } catch (error) {
       _messageMap.remove(id)!.complete(error);
       if (_messageMap.length == 0) {
@@ -79,7 +43,6 @@
       _receivePort!.handler = (List<Object?> data) {
         assert(data.length == 2);
         _messageMap.remove(data[0])!.complete(data[1]);
-        _servicePorts._returnPort(data[0] as int);
         if (_messageMap.length == 0) {
           _finalize();
         }
diff --git a/tests/standalone/io/socket_finalizer_test.dart b/tests/standalone/io/socket_finalizer_test.dart
index 5dfca94..7662534 100644
--- a/tests/standalone/io/socket_finalizer_test.dart
+++ b/tests/standalone/io/socket_finalizer_test.dart
@@ -9,6 +9,7 @@
 import 'dart:async';
 import 'dart:io';
 import 'dart:isolate';
+import 'dart:_internal'; // ignore: import_internal_library, unused_import
 
 import "package:async_helper/async_helper.dart";
 import "package:expect/expect.dart";
@@ -32,19 +33,19 @@
     }, onError: (e) {
       Expect.fail("Socket error $e");
     });
-    isolate.kill();
 
-    // Cause a GC to collect the [socket] from [connectorIsolate].
-    for (int i = 0; i < 100000; ++i) {
-      produceGarbage();
-    }
+    final port = ReceivePort();
+    port.listen((_) {
+      print("Isolate exited - triggering GC");
+      // Cause a GC to collect the [socket] from [connectorIsolate].
+      VMInternalsForTesting.collectAllGarbage(); // ignore: undefined_identifier
+      port.close();
+    });
+    isolate.addOnExitListener(port.sendPort);
+
+    isolate.kill();
   });
   await completer.future;
   await server.close();
   asyncEnd();
 }
-
-@pragma('vm:never-inline')
-produceGarbage() => all.add(List.filled(1024, null));
-
-final all = [];