[vm] Simplify implementation of native ports
This reland commit 5a32d8bc7c5569efde3aec844b7f26d33e1bd86e with a fix
for thread leak (Issue #56717): when `ThreadPool` is shutting down
asynchronously the last worker should detach itself to prevent
leaking associated low-level data structures, because no thread will
join it.
A hang in service isolate shutdown (caused by an existing bug) was fixed by commit 157a0dc7f98be66a32329d097e6fef096ea62d84.
This CL turns native ports into a thin abstraction over underlying
thread pool instead of building them as full fledged MessageHandler.
This allows to easily implement a variation of native ports which can
handle messages concurrently with the given degree of concurrency.
This type of port can be used to greatly simplify implementation of
IOService - which previously had to do its own concurrency management
on top of "single threaded" native ports. This capability is exposed
as `Dart_NewConcurrentNativePort` API.
The new implementation is in general much cleaner then the old one
with one exception: `Dart_CloseNativePort` API has unfortunate design
where underlying message handler is destroyed asynchronously and
`Dart_CloseNativePort` returns immediately without waiting for pending
tasks to complete. Implementing this on top of `ThreadPool` requires
some changes to thread pool implementation.
Issue https://github.com/dart-lang/sdk/issues/55844
Closes https://github.com/dart-lang/sdk/issues/56717
TEST=ci
Change-Id: Ic68bfb60757685afd75c80a70cdec66cc13c149b
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/385000
Reviewed-by: Martin Kustermann <kustermann@google.com>
Commit-Queue: Slava Egorov <vegorov@google.com>
diff --git a/runtime/bin/io_service.cc b/runtime/bin/io_service.cc
index b651f6d..d16dda2 100644
--- a/runtime/bin/io_service.cc
+++ b/runtime/bin/io_service.cc
@@ -55,7 +55,8 @@
}
Dart_Port IOService::GetServicePort() {
- return Dart_NewNativePort("IOService", IOServiceCallback, true);
+ return Dart_NewConcurrentNativePort("IOService", IOServiceCallback,
+ /*max_concurrency=*/32);
}
void FUNCTION_NAME(IOService_NewServicePort)(Dart_NativeArguments args) {
diff --git a/runtime/bin/io_service_no_ssl.cc b/runtime/bin/io_service_no_ssl.cc
index ecf4428..7e6d83c 100644
--- a/runtime/bin/io_service_no_ssl.cc
+++ b/runtime/bin/io_service_no_ssl.cc
@@ -53,7 +53,8 @@
}
Dart_Port IOService::GetServicePort() {
- return Dart_NewNativePort("IOService", IOServiceCallback, true);
+ return Dart_NewConcurrentNativePort("IOService", IOServiceCallback,
+ /*max_concurrency=*/32);
}
void FUNCTION_NAME(IOService_NewServicePort)(Dart_NativeArguments args) {
diff --git a/runtime/include/dart_api.h b/runtime/include/dart_api.h
index 75e5edb..035c909 100644
--- a/runtime/include/dart_api.h
+++ b/runtime/include/dart_api.h
@@ -57,14 +57,14 @@
#endif
#if __GNUC__
-#define DART_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
-#define DART_DEPRECATED(msg) __attribute__((deprecated(msg)))
+#define DART_API_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
+#define DART_API_DEPRECATED(msg) __attribute__((deprecated(msg)))
#elif _MSC_VER
-#define DART_WARN_UNUSED_RESULT _Check_return_
-#define DART_DEPRECATED(msg) __declspec(deprecated(msg))
+#define DART_API_WARN_UNUSED_RESULT _Check_return_
+#define DART_API_DEPRECATED(msg) __declspec(deprecated(msg))
#else
-#define DART_WARN_UNUSED_RESULT
-#define DART_DEPRECATED(msg)
+#define DART_API_WARN_UNUSED_RESULT
+#define DART_API_DEPRECATED(msg)
#endif
/*
@@ -1000,7 +1000,7 @@
* \return NULL if initialization is successful. Returns an error message
* otherwise. The caller is responsible for freeing the error message.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT char* Dart_Initialize(
+DART_EXPORT DART_API_WARN_UNUSED_RESULT char* Dart_Initialize(
Dart_InitializeParams* params);
/**
@@ -1012,7 +1012,7 @@
* NOTE: This function must not be called on a thread that was created by the VM
* itself.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT char* Dart_Cleanup(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT char* Dart_Cleanup(void);
/**
* Sets command line flags. Should be called before Dart_Initialize.
@@ -1025,8 +1025,9 @@
*
* NOTE: This call does not store references to the passed in c-strings.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT char* Dart_SetVMFlags(int argc,
- const char** argv);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT char* Dart_SetVMFlags(
+ int argc,
+ const char** argv);
/**
* Returns true if the named VM flag is of boolean type, specified, and set to
@@ -1480,7 +1481,7 @@
*
* \return A valid handle if no error occurs during the operation.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_CreateSnapshot(uint8_t** vm_snapshot_data_buffer,
intptr_t* vm_snapshot_data_size,
uint8_t** isolate_snapshot_data_buffer,
@@ -1510,7 +1511,7 @@
* \return NULL if successful. Returns an error message otherwise. The caller
* is responsible for freeing the error message.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT char* Dart_IsolateMakeRunnable(
+DART_EXPORT DART_API_WARN_UNUSED_RESULT char* Dart_IsolateMakeRunnable(
Dart_Isolate isolate);
/*
@@ -1680,7 +1681,7 @@
*
* \return A valid handle if no error occurs during the operation.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_HandleMessage(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle Dart_HandleMessage(void);
/**
* Handles any pending messages for the vm service for the current
@@ -1720,7 +1721,7 @@
* exception or other error occurs while processing messages, an
* error handle is returned.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_RunLoop(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle Dart_RunLoop(void);
/**
* Lets the VM run message processing for the isolate.
@@ -1740,7 +1741,7 @@
* of its message loop. If not successful the caller retains ownership of the
* isolate.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT bool Dart_RunLoopAsync(
+DART_EXPORT DART_API_WARN_UNUSED_RESULT bool Dart_RunLoopAsync(
bool errors_are_fatal,
Dart_Port on_error_port,
Dart_Port on_exit_port,
@@ -2785,7 +2786,7 @@
* then the new object. If an error occurs during execution, then an
* error handle is returned.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_New(Dart_Handle type,
Dart_Handle constructor_name,
int number_of_arguments,
@@ -2799,7 +2800,8 @@
* \return The new object. If an error occurs during execution, then an
* error handle is returned.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_Allocate(Dart_Handle type);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
+Dart_Allocate(Dart_Handle type);
/**
* Allocate a new object without invoking a constructor, and sets specified
@@ -2840,7 +2842,7 @@
* successfully, then the return value is returned. If an error
* occurs during execution, then an error handle is returned.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_Invoke(Dart_Handle target,
Dart_Handle name,
int number_of_arguments,
@@ -2856,7 +2858,7 @@
* invoking the closure is returned. If an error occurs during
* execution, then an error handle is returned.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_InvokeClosure(Dart_Handle closure,
int number_of_arguments,
Dart_Handle* arguments);
@@ -2881,7 +2883,7 @@
* successfully, then the object is returned. If an error
* occurs during execution, then an error handle is returned.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_InvokeConstructor(Dart_Handle object,
Dart_Handle name,
int number_of_arguments,
@@ -2907,7 +2909,7 @@
* \return If no error occurs, then the value of the field is
* returned. Otherwise an error handle is returned.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_GetField(Dart_Handle container, Dart_Handle name);
/**
@@ -2930,7 +2932,7 @@
*
* \return A valid handle if no error occurs.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_SetField(Dart_Handle container, Dart_Handle name, Dart_Handle value);
/*
@@ -3505,7 +3507,7 @@
* Requires the current isolate to be the same current isolate during the
* invocation of the Dart_DeferredLoadHandler.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_DeferredLoadComplete(intptr_t loading_unit_id,
const uint8_t* snapshot_data,
const uint8_t* snapshot_instructions);
@@ -3522,7 +3524,7 @@
* Requires the current isolate to be the same current isolate during the
* invocation of the Dart_DeferredLoadHandler.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_DeferredLoadCompleteError(intptr_t loading_unit_id,
const char* error_message,
bool transient);
@@ -3538,7 +3540,7 @@
*
* \return A handle to the root library, or an error.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_LoadScriptFromKernel(const uint8_t* kernel_buffer, intptr_t kernel_size);
/**
@@ -3706,10 +3708,10 @@
*
* \return A handle to the main library of the compilation unit, or an error.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_LoadLibraryFromKernel(const uint8_t* kernel_buffer,
intptr_t kernel_buffer_size);
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_LoadLibrary(Dart_Handle kernel_buffer);
/**
@@ -3725,7 +3727,7 @@
* \return Success if all classes have been finalized and deferred library
* futures are completed. Otherwise, returns an error.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_FinalizeLoading(bool complete_futures);
/*
@@ -3994,12 +3996,12 @@
*
* \return A valid handle if no error occurs during the operation.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_CreateAppAOTSnapshotAsAssembly(Dart_StreamingWriteCallback callback,
void* callback_data,
bool stripped,
void* debug_callback_data);
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_CreateAppAOTSnapshotAsAssemblies(
Dart_CreateLoadingUnitCallback next_callback,
void* next_callback_data,
@@ -4034,12 +4036,12 @@
*
* \return A valid handle if no error occurs during the operation.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_CreateAppAOTSnapshotAsElf(Dart_StreamingWriteCallback callback,
void* callback_data,
bool stripped,
void* debug_callback_data);
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_CreateAppAOTSnapshotAsElfs(Dart_CreateLoadingUnitCallback next_callback,
void* next_callback_data,
bool stripped,
@@ -4052,7 +4054,7 @@
* not strip DWARF information from the generated assembly or allow for
* separate debug information.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_CreateVMAOTSnapshotAsAssembly(Dart_StreamingWriteCallback callback,
void* callback_data);
@@ -4063,7 +4065,7 @@
*
* \return A valid handle if no error occurs during the operation.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_SortClasses(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle Dart_SortClasses(void);
/**
* Creates a snapshot that caches compiled code and type feedback for faster
@@ -4086,7 +4088,7 @@
*
* \return A valid handle if no error occurs during the operation.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_CreateAppJITSnapshotAsBlobs(uint8_t** isolate_snapshot_data_buffer,
intptr_t* isolate_snapshot_data_size,
uint8_t** isolate_snapshot_instructions_buffer,
@@ -4101,7 +4103,7 @@
* \return Returns an error handler if the VM was built in a mode that does not
* support obfuscation.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
Dart_GetObfuscationMap(uint8_t** buffer, intptr_t* buffer_length);
/**
diff --git a/runtime/include/dart_embedder_api.h b/runtime/include/dart_embedder_api.h
index bc03773..8ccf9e5 100644
--- a/runtime/include/dart_embedder_api.h
+++ b/runtime/include/dart_embedder_api.h
@@ -18,7 +18,7 @@
//
// Returns true on success and false otherwise, in which case error would
// contain error message.
-DART_WARN_UNUSED_RESULT bool InitOnce(char** error);
+DART_API_WARN_UNUSED_RESULT bool InitOnce(char** error);
// Cleans up all subsystems of the embedder.
//
@@ -51,7 +51,7 @@
// script_uri.
// The isolate is created from the given snapshot (might be kernel data or
// app-jit snapshot).
-DART_WARN_UNUSED_RESULT Dart_Isolate
+DART_API_WARN_UNUSED_RESULT Dart_Isolate
CreateKernelServiceIsolate(const IsolateCreationData& data,
const uint8_t* buffer,
intptr_t buffer_size,
@@ -81,7 +81,7 @@
// is expected to contain all necessary 'vm-service' libraries.
// This method should be used when VM invokes isolate creation callback with
// DART_VM_SERVICE_ISOLATE_NAME as script_uri.
-DART_WARN_UNUSED_RESULT Dart_Isolate
+DART_API_WARN_UNUSED_RESULT Dart_Isolate
CreateVmServiceIsolate(const IsolateCreationData& data,
const VmServiceConfiguration& config,
const uint8_t* isolate_data,
@@ -92,7 +92,7 @@
// is expected to contain all necessary 'vm-service' libraries.
// This method should be used when VM invokes isolate creation callback with
// DART_VM_SERVICE_ISOLATE_NAME as script_uri.
-DART_WARN_UNUSED_RESULT Dart_Isolate
+DART_API_WARN_UNUSED_RESULT Dart_Isolate
CreateVmServiceIsolateFromKernel(const IsolateCreationData& data,
const VmServiceConfiguration& config,
const uint8_t* kernel_buffer,
diff --git a/runtime/include/dart_native_api.h b/runtime/include/dart_native_api.h
index 79194e0..dbdaeb3 100644
--- a/runtime/include/dart_native_api.h
+++ b/runtime/include/dart_native_api.h
@@ -166,7 +166,23 @@
DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
Dart_NativeMessageHandler handler,
bool handle_concurrently);
-/* TODO(turnidge): Currently handle_concurrently is ignored. */
+
+/**
+ * Creates a new native port. When messages are received on this
+ * native port, then they will be dispatched to the provided native
+ * message handler using up to |max_concurrency| concurrent threads.
+ *
+ * \param name The name of this port in debugging messages.
+ * \param handler The C handler to run when messages arrive on the port.
+ * \param max_concurrency Size of the thread pool used by the native port.
+ *
+ * \return If successful, returns the port id for the native port. In
+ * case of error, returns ILLEGAL_PORT.
+ */
+DART_EXPORT Dart_Port
+Dart_NewConcurrentNativePort(const char* name,
+ Dart_NativeMessageHandler handler,
+ intptr_t max_concurrency);
/**
* Closes the native port with the given id.
@@ -191,12 +207,13 @@
*
* TODO(turnidge): Document.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_CompileAll(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle Dart_CompileAll(void);
/**
* Finalizes all classes.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT Dart_Handle Dart_FinalizeAllClasses(void);
+DART_EXPORT DART_API_WARN_UNUSED_RESULT Dart_Handle
+Dart_FinalizeAllClasses(void);
/* This function is intentionally undocumented.
*
diff --git a/runtime/include/dart_tools_api.h b/runtime/include/dart_tools_api.h
index c118bc4..367e1eb 100644
--- a/runtime/include/dart_tools_api.h
+++ b/runtime/include/dart_tools_api.h
@@ -573,7 +573,7 @@
* \return The UserTag's label. NULL if the user_tag is invalid. The caller is
* responsible for freeing the returned label.
*/
-DART_EXPORT DART_WARN_UNUSED_RESULT char* Dart_GetUserTagLabel(
+DART_EXPORT DART_API_WARN_UNUSED_RESULT char* Dart_GetUserTagLabel(
Dart_Handle user_tag);
/*
diff --git a/runtime/platform/globals.h b/runtime/platform/globals.h
index 29bbb96..34bda32 100644
--- a/runtime/platform/globals.h
+++ b/runtime/platform/globals.h
@@ -797,6 +797,14 @@
#error Target operating system detection failed.
#endif
+#if __GNUC__
+#define DART_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
+#elif _MSC_VER
+#define DART_WARN_UNUSED_RESULT _Check_return_
+#else
+#define DART_WARN_UNUSED_RESULT
+#endif
+
} // namespace dart
#endif // RUNTIME_PLATFORM_GLOBALS_H_
diff --git a/runtime/tests/vm/dart/exported_symbols_test.dart b/runtime/tests/vm/dart/exported_symbols_test.dart
index c2cddbb..290c1e4 100644
--- a/runtime/tests/vm/dart/exported_symbols_test.dart
+++ b/runtime/tests/vm/dart/exported_symbols_test.dart
@@ -247,6 +247,7 @@
"Dart_NewListOfType",
"Dart_NewListOfTypeFilled",
"Dart_NewNativePort",
+ "Dart_NewConcurrentNativePort",
"Dart_NewPersistentHandle",
"Dart_NewSendPort",
"Dart_NewSendPortEx",
diff --git a/runtime/vm/dart.cc b/runtime/vm/dart.cc
index 3c4502b..a71313d 100644
--- a/runtime/vm/dart.cc
+++ b/runtime/vm/dart.cc
@@ -702,6 +702,7 @@
UptimeMillis());
}
DartInitializationState::SetUnInitialized();
+ PortMap::Shutdown();
thread_pool_->Shutdown();
delete thread_pool_;
thread_pool_ = nullptr;
diff --git a/runtime/vm/isolate.cc b/runtime/vm/isolate.cc
index 0f04c1a..4be42f8 100644
--- a/runtime/vm/isolate.cc
+++ b/runtime/vm/isolate.cc
@@ -1069,25 +1069,23 @@
explicit IsolateMessageHandler(Isolate* isolate);
~IsolateMessageHandler();
- const char* name() const;
- void MessageNotify(Message::Priority priority);
- MessageStatus HandleMessage(std::unique_ptr<Message> message);
+ const char* name() const override;
+ void MessageNotify(Message::Priority priority) override;
+ MessageStatus HandleMessage(std::unique_ptr<Message> message) override;
#ifndef PRODUCT
- void NotifyPauseOnStart();
- void NotifyPauseOnExit();
+ void NotifyPauseOnStart() override;
+ void NotifyPauseOnExit() override;
#endif // !PRODUCT
#if defined(DEBUG)
// Check that it is safe to access this handler.
- void CheckAccess() const;
+ void CheckAccess() const override;
#endif
- bool IsCurrentIsolate() const;
- virtual Isolate* isolate() const { return isolate_; }
- virtual IsolateGroup* isolate_group() const { return isolate_->group(); }
- virtual bool KeepAliveLocked() {
- // If the message handler was asked to shutdown we shut down.
- if (!MessageHandler::KeepAliveLocked()) return false;
+ Isolate* isolate() const override { return isolate_; }
+ IsolateGroup* isolate_group() const { return isolate_->group(); }
+
+ bool KeepAliveLocked() override {
// Otherwise we only stay alive as long as there's active receive ports, or
// there are FFI callbacks keeping the isolate alive.
return isolate_->HasLivePorts() || isolate_->HasOpenNativeCallables();
@@ -1350,7 +1348,9 @@
MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(
std::unique_ptr<Message> message) {
- ASSERT(IsCurrentIsolate());
+#ifdef DEBUG
+ CheckAccess();
+#endif
Thread* thread = Thread::Current();
StackZone stack_zone(thread);
Zone* zone = stack_zone.GetZone();
@@ -1498,14 +1498,10 @@
#if defined(DEBUG)
void IsolateMessageHandler::CheckAccess() const {
- ASSERT(IsCurrentIsolate());
+ ASSERT(isolate() == Isolate::Current());
}
#endif
-bool IsolateMessageHandler::IsCurrentIsolate() const {
- return (I == Isolate::Current());
-}
-
static MessageHandler::MessageStatus StoreError(Thread* thread,
const Error& error) {
thread->set_sticky_error(error);
diff --git a/runtime/vm/message_handler.cc b/runtime/vm/message_handler.cc
index 10e0d45..9be103c 100644
--- a/runtime/vm/message_handler.cc
+++ b/runtime/vm/message_handler.cc
@@ -67,7 +67,6 @@
paused_timestamp_(-1),
#endif
task_running_(false),
- delete_me_(false),
pool_(nullptr),
start_callback_(nullptr),
end_callback_(nullptr),
@@ -88,12 +87,6 @@
return "<unnamed>";
}
-#if defined(DEBUG)
-void MessageHandler::CheckAccess() const {
- // By default there is no checking.
-}
-#endif
-
void MessageHandler::MessageNotify(Message::Priority priority) {
// By default, there is no custom message notification.
}
@@ -110,7 +103,6 @@
name());
}
ASSERT(pool_ == nullptr);
- ASSERT(!delete_me_);
pool_ = pool;
start_callback_ = start_callback;
end_callback_ = end_callback;
@@ -165,7 +157,6 @@
}
if (pool_ != nullptr && !task_running_) {
- ASSERT(!delete_me_);
task_running_ = true;
const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
ASSERT(launched_successfully);
@@ -292,7 +283,6 @@
// assigned to a thread pool.
MonitorLocker ml(&monitor_);
ASSERT(pool_ == nullptr);
- ASSERT(!delete_me_);
#if defined(DEBUG)
CheckAccess();
#endif
@@ -303,7 +293,6 @@
int64_t timeout_millis) {
MonitorLocker ml(&monitor_, /*no_safepoint_scope=*/false);
ASSERT(task_running_);
- ASSERT(!delete_me_);
#if defined(DEBUG)
CheckAccess();
#endif
@@ -317,7 +306,6 @@
wr = ml.Wait(timeout_millis);
}
ASSERT(task_running_);
- ASSERT(!delete_me_);
if (wr == Monitor::kTimedOut) {
break;
}
@@ -340,7 +328,6 @@
return kOK;
}
MonitorLocker ml(&monitor_);
- ASSERT(!delete_me_);
#if defined(DEBUG)
CheckAccess();
#endif
@@ -391,7 +378,6 @@
ASSERT(Isolate::Current() == nullptr);
MessageStatus status = kOK;
bool run_end_callback = false;
- bool delete_me = false;
EndCallback end_callback = nullptr;
CallbackData callback_data = 0;
{
@@ -504,7 +490,6 @@
end_callback = end_callback_;
callback_data = callback_data_;
run_end_callback = end_callback_ != nullptr;
- delete_me = delete_me_;
}
// Clear task_running_ last. This allows other tasks to potentially start
@@ -516,20 +501,14 @@
// The handler may have been deleted by another thread here if it is a native
// message handler.
- // Message handlers either use delete_me or end_callback but not both.
- ASSERT(!delete_me || !run_end_callback);
-
if (run_end_callback) {
ASSERT(end_callback != nullptr);
end_callback(callback_data);
// The handler may have been deleted after this point.
}
- if (delete_me) {
- delete this;
- }
}
-void MessageHandler::ClosePort(Dart_Port port) {
+void MessageHandler::OnPortClosed(Dart_Port port) {
if (FLAG_trace_isolates) {
MonitorLocker ml(&monitor_);
OS::PrintErr(
@@ -540,7 +519,7 @@
}
}
-void MessageHandler::CloseAllPorts() {
+void MessageHandler::OnAllPortsClosed() {
MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
OS::PrintErr(
@@ -552,20 +531,6 @@
oob_queue_->Clear();
}
-void MessageHandler::RequestDeletion() {
- {
- MonitorLocker ml(&monitor_);
- if (task_running_) {
- // This message handler currently has a task running on the thread pool.
- delete_me_ = true;
- return;
- }
- }
-
- // This message handler has no current task. Delete it.
- delete this;
-}
-
#if !defined(PRODUCT)
void MessageHandler::DebugDump() {
PortMap::DebugDumpForMessageHandler(this);
diff --git a/runtime/vm/message_handler.h b/runtime/vm/message_handler.h
index 229ba93..b0f5061 100644
--- a/runtime/vm/message_handler.h
+++ b/runtime/vm/message_handler.h
@@ -11,13 +11,14 @@
#include "vm/lockers.h"
#include "vm/message.h"
#include "vm/os_thread.h"
+#include "vm/port.h"
#include "vm/port_set.h"
#include "vm/thread_pool.h"
namespace dart {
// A MessageHandler is an entity capable of accepting messages.
-class MessageHandler {
+class MessageHandler : public PortHandler {
protected:
MessageHandler();
@@ -31,9 +32,6 @@
virtual ~MessageHandler();
- // Allow subclasses to provide a handler name.
- virtual const char* name() const;
-
typedef uword CallbackData;
typedef MessageStatus (*StartCallback)(CallbackData data);
typedef void (*EndCallback)(CallbackData data);
@@ -85,15 +83,7 @@
bool HasMessages();
// Whether to keep this message handler alive or whether it should shutdown.
- virtual bool KeepAliveLocked() {
- // By default we keep alive until the message handler was asked to shutdown
- // via [RequestDeletion].
- return !delete_me_;
- }
-
- // Requests deletion of this message handler when the next task
- // completes.
- void RequestDeletion();
+ virtual bool KeepAliveLocked() { return true; }
bool paused() const { return paused_ > 0; }
@@ -161,39 +151,7 @@
friend class MessageHandler;
};
-#if defined(DEBUG)
- // Check that it is safe to access this message handler.
- //
- // For example, if this MessageHandler is an isolate, then it is
- // only safe to access it when the MessageHandler is the current
- // isolate.
- virtual void CheckAccess() const;
-#endif
-
protected:
- // ------------ START PortMap API ------------
- // These functions should only be called from the PortMap.
-
- // Does this message handler correspond to the current isolate?
- virtual bool IsCurrentIsolate() const { return false; }
-
- // Return Isolate to which this message handler corresponds to.
- virtual Isolate* isolate() const { return nullptr; }
-
- // Posts a message on this handler's message queue.
- // If before_events is true, then the message is enqueued before any pending
- // events, but after any pending isolate library events.
- void PostMessage(std::unique_ptr<Message> message,
- bool before_events = false);
-
- // Notifies this handler that a port is being closed.
- void ClosePort(Dart_Port port);
-
- // Notifies this handler that all ports are being closed.
- void CloseAllPorts();
-
- // ------------ END PortMap API ------------
-
// Custom message notification. Optionally provided by subclass.
virtual void MessageNotify(Message::Priority priority);
@@ -208,6 +166,12 @@
// TODO(iposva): Set a local field before entering MessageHandler methods.
Thread* thread() const { return Thread::Current(); }
+ // Posts a message on this handler's message queue.
+ // If before_events is true, then the message is enqueued before any pending
+ // events, but after any pending isolate library events.
+ void PostMessage(std::unique_ptr<Message> message,
+ bool before_events = false) override;
+
private:
template <typename GCVisitorType>
friend void MournFinalizerEntry(GCVisitorType*, FinalizerEntryPtr);
@@ -215,7 +179,28 @@
friend class MessageHandlerTestPeer;
friend class MessageHandlerTask;
- struct PortSetEntry : public PortSet<PortSetEntry>::Entry {};
+ // ------------ START PortMap API ------------
+ // These functions should only be called from the PortMap.
+ // Implementaion of PortHandler API.
+
+ const char* name() const override;
+
+ void OnPortClosed(Dart_Port port) override;
+
+ void Shutdown() override {
+ // Nothing to do.
+ }
+
+ // Return Isolate to which this message handler corresponds to.
+ Isolate* isolate() const override { return nullptr; }
+
+ PortSet<PortSetEntry>* ports(PortMap::Locker& locker) override {
+ return &ports_;
+ }
+
+ // Notifies this handler that all ports are being closed.
+ void OnAllPortsClosed();
+ // ------------ END PortMap API ------------
// Called by MessageHandlerTask to process our task queue.
void TaskCallback();
@@ -254,8 +239,11 @@
// thread.
bool oob_message_handling_allowed_;
bool paused_for_messages_;
- PortSet<PortSetEntry>
- ports_; // Only accessed by [PortMap], protected by [PortMap]s lock.
+
+ // Only accessed by [PortMap], protected by [PortMap]s lock. See ports()
+ // getter.
+ PortSet<PortSetEntry> ports_;
+
intptr_t paused_; // The number of pause messages received.
#if !defined(PRODUCT)
bool should_pause_on_start_;
@@ -268,7 +256,6 @@
int64_t paused_timestamp_;
#endif
bool task_running_;
- bool delete_me_;
ThreadPool* pool_;
StartCallback start_callback_;
EndCallback end_callback_;
diff --git a/runtime/vm/message_handler_test.cc b/runtime/vm/message_handler_test.cc
index a6ebe56..7369dbd 100644
--- a/runtime/vm/message_handler_test.cc
+++ b/runtime/vm/message_handler_test.cc
@@ -18,8 +18,8 @@
void PostMessage(std::unique_ptr<Message> message) {
handler_->PostMessage(std::move(message));
}
- void ClosePort(Dart_Port port) { handler_->ClosePort(port); }
- void CloseAllPorts() { handler_->CloseAllPorts(); }
+ void OnPortClosed(Dart_Port port) { handler_->OnPortClosed(port); }
+ void OnAllPortsClosed() { handler_->OnAllPortsClosed(); }
MessageQueue* queue() const { return handler_->queue_; }
MessageQueue* oob_queue() const { return handler_->oob_queue_; }
@@ -193,7 +193,7 @@
}
// Delete all pending messages.
- handler_peer.CloseAllPorts();
+ handler_peer.OnAllPortsClosed();
}
VM_UNIT_TEST_CASE(MessageHandler_ClosePort) {
@@ -207,20 +207,20 @@
Message* raw_message2 = message.get();
handler_peer.PostMessage(std::move(message));
- handler_peer.ClosePort(1);
+ handler_peer.OnPortClosed(1);
// Closing the port does not drop the messages from the queue.
EXPECT(raw_message1 == handler_peer.queue()->Dequeue().get());
EXPECT(raw_message2 == handler_peer.queue()->Dequeue().get());
}
-VM_UNIT_TEST_CASE(MessageHandler_CloseAllPorts) {
+VM_UNIT_TEST_CASE(MessageHandler_OnAllPortsClosed) {
TestMessageHandler handler;
MessageHandlerTestPeer handler_peer(&handler);
handler_peer.PostMessage(BlankMessage(1, Message::kNormalPriority));
handler_peer.PostMessage(BlankMessage(2, Message::kNormalPriority));
- handler_peer.CloseAllPorts();
+ handler_peer.OnAllPortsClosed();
// All messages are dropped from the queue.
EXPECT(nullptr == handler_peer.queue()->Dequeue());
@@ -269,7 +269,7 @@
Dart_Port* ports = handler.port_buffer();
EXPECT_EQ(port2, ports[0]); // oob_message1, error
EXPECT_EQ(port3, ports[1]); // oob_message2, ok
- handler_peer.CloseAllPorts();
+ handler_peer.OnAllPortsClosed();
}
VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_Shutdown) {
@@ -302,7 +302,7 @@
MessageHandler::AcquiredQueues aq(&handler);
EXPECT(aq.oob_queue()->Length() == 0);
}
- handler_peer.CloseAllPorts();
+ handler_peer.OnAllPortsClosed();
}
VM_UNIT_TEST_CASE(MessageHandler_HandleOOBMessages) {
@@ -323,7 +323,7 @@
Dart_Port* ports = handler.port_buffer();
EXPECT_EQ(port3, ports[0]);
EXPECT_EQ(port4, ports[1]);
- handler_peer.CloseAllPorts();
+ handler_peer.OnAllPortsClosed();
}
struct ThreadStartInfo {
diff --git a/runtime/vm/native_api_impl.cc b/runtime/vm/native_api_impl.cc
index 2e7602a..f42bac4 100644
--- a/runtime/vm/native_api_impl.cc
+++ b/runtime/vm/native_api_impl.cc
@@ -74,6 +74,13 @@
DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
Dart_NativeMessageHandler handler,
bool handle_concurrently) {
+ return Dart_NewConcurrentNativePort(name, handler, /*max_concurrency=*/1);
+}
+
+DART_EXPORT Dart_Port
+Dart_NewConcurrentNativePort(const char* name,
+ Dart_NativeMessageHandler handler,
+ intptr_t max_concurrency) {
if (name == nullptr) {
name = "<UnnamedNativePort>";
}
@@ -88,15 +95,9 @@
// Start the native port without a current isolate.
IsolateLeaveScope saver(Isolate::Current());
- NativeMessageHandler* nmh = new NativeMessageHandler(name, handler);
+ NativeMessageHandler* nmh =
+ new NativeMessageHandler(name, handler, max_concurrency);
Dart_Port port_id = PortMap::CreatePort(nmh);
- if (port_id != ILLEGAL_PORT) {
- if (!nmh->Run(Dart::thread_pool(), nullptr, nullptr, 0)) {
- PortMap::ClosePort(port_id);
- nmh->RequestDeletion();
- port_id = ILLEGAL_PORT;
- }
- }
Dart::ResetActiveApiCall();
return port_id;
}
@@ -105,10 +106,11 @@
// Close the native port without a current isolate.
IsolateLeaveScope saver(Isolate::Current());
- MessageHandler* handler = nullptr;
+ PortHandler* handler = nullptr;
const bool was_closed = PortMap::ClosePort(native_port_id, &handler);
if (was_closed) {
- handler->RequestDeletion();
+ NativeMessageHandler::RequestDeletion(
+ static_cast<NativeMessageHandler*>(handler));
}
return was_closed;
}
diff --git a/runtime/vm/native_message_handler.cc b/runtime/vm/native_message_handler.cc
index a4906c5..a0160c3 100644
--- a/runtime/vm/native_message_handler.cc
+++ b/runtime/vm/native_message_handler.cc
@@ -5,6 +5,7 @@
#include "vm/native_message_handler.h"
#include <memory>
+#include <utility>
#include "vm/dart_api_message.h"
#include "vm/isolate.h"
@@ -15,12 +16,11 @@
namespace dart {
NativeMessageHandler::NativeMessageHandler(const char* name,
- Dart_NativeMessageHandler func)
- : name_(Utils::StrDup(name)), func_(func) {}
+ Dart_NativeMessageHandler func,
+ intptr_t max_concurrency)
+ : name_(Utils::StrDup(name)), func_(func), pool_(max_concurrency) {}
-NativeMessageHandler::~NativeMessageHandler() {
- free(name_);
-}
+NativeMessageHandler::~NativeMessageHandler() {}
#if defined(DEBUG)
void NativeMessageHandler::CheckAccess() const {
@@ -28,19 +28,44 @@
}
#endif
-MessageHandler::MessageStatus NativeMessageHandler::HandleMessage(
- std::unique_ptr<Message> message) {
+namespace {
+class HandleMessage : public ThreadPool::Task {
+ public:
+ HandleMessage(Dart_NativeMessageHandler handler,
+ std::unique_ptr<Message> message)
+ : handler_(handler), message_(std::move(message)) {
+ ASSERT(handler != nullptr);
+ }
+
+ virtual void Run() {
+ ApiNativeScope scope;
+ Dart_CObject* object = ReadApiMessage(scope.zone(), message_.get());
+ handler_(message_->dest_port(), object);
+ }
+
+ private:
+ Dart_NativeMessageHandler handler_;
+ std::unique_ptr<Message> message_;
+
+ DISALLOW_COPY_AND_ASSIGN(HandleMessage);
+};
+} // namespace
+
+void NativeMessageHandler::PostMessage(std::unique_ptr<Message> message,
+ bool before_events /* = false */) {
if (message->IsOOB()) {
- // We currently do not use OOB messages for native ports.
UNREACHABLE();
}
- // We create a native scope for handling the message.
- // All allocation of objects for decoding the message is done in the
- // zone associated with this scope.
- ApiNativeScope scope;
- Dart_CObject* object = ReadApiMessage(scope.zone(), message.get());
- (*func())(message->dest_port(), object);
- return kOK;
+
+ pool_.Run<HandleMessage>(func_, std::move(message));
+}
+
+void NativeMessageHandler::RequestDeletion(NativeMessageHandler* handler) {
+ ThreadPool::RequestShutdown(&handler->pool_, [handler]() { delete handler; });
+}
+
+void NativeMessageHandler::Shutdown() {
+ pool_.Shutdown();
}
} // namespace dart
diff --git a/runtime/vm/native_message_handler.h b/runtime/vm/native_message_handler.h
index f2cd91a..4c27b1f 100644
--- a/runtime/vm/native_message_handler.h
+++ b/runtime/vm/native_message_handler.h
@@ -14,25 +14,54 @@
namespace dart {
// A NativeMessageHandler accepts messages and dispatches them to
-// native C handlers.
-class NativeMessageHandler : public MessageHandler {
+// native C handlers on worker threads. It will spawn up to
+// |max_concurrency| worker threads which will handle incomming messages
+// concurrently.
+class NativeMessageHandler final : public PortHandler {
public:
- NativeMessageHandler(const char* name, Dart_NativeMessageHandler func);
- ~NativeMessageHandler();
+ NativeMessageHandler(const char* name,
+ Dart_NativeMessageHandler func,
+ intptr_t max_concurrency);
- const char* name() const { return name_; }
+ ~NativeMessageHandler() override;
+
+ const char* name() const override { return name_.get(); }
Dart_NativeMessageHandler func() const { return func_; }
- MessageStatus HandleMessage(std::unique_ptr<Message> message);
-
#if defined(DEBUG)
// Check that it is safe to access this handler.
- void CheckAccess() const;
+ void CheckAccess() const override;
#endif
+ void OnPortClosed(Dart_Port port) override {}
+
+ Isolate* isolate() const override { return nullptr; }
+
+ // Posts a message on this handler's message queue.
+ // If before_events is true, then the message is enqueued before any pending
+ // events, but after any pending isolate library events.
+ void PostMessage(std::unique_ptr<Message> message,
+ bool before_events = false) override;
+
+ // Request deletion of the given handler once it is down with the currently
+ // running Dart_NativeMessageHandler callbacks. No new callbacks will be
+ // scheduled after this call.
+ //
+ // Note: |handler| might be deleted synchronously if no callback is running,
+ // or it can be deleted later on a worker thread.
+ static void RequestDeletion(NativeMessageHandler* handler);
+
+ void Shutdown() override;
+
private:
- char* name_;
- Dart_NativeMessageHandler func_;
+ PortSet<PortSetEntry>* ports(PortMap::Locker& locker) override {
+ return nullptr;
+ }
+
+ CStringUniquePtr name_;
+ const Dart_NativeMessageHandler func_;
+
+ ThreadPool pool_;
};
} // namespace dart
diff --git a/runtime/vm/os_thread.cc b/runtime/vm/os_thread.cc
index d020c8d..d53ce55 100644
--- a/runtime/vm/os_thread.cc
+++ b/runtime/vm/os_thread.cc
@@ -240,6 +240,10 @@
return false;
}
+bool OSThread::CanCreateOSThreads() {
+ return creation_enabled_;
+}
+
void OSThread::DisableOSThreadCreation() {
MutexLocker ml(thread_list_lock_);
creation_enabled_ = false;
diff --git a/runtime/vm/os_thread.h b/runtime/vm/os_thread.h
index 0fc7170..4f2aff3 100644
--- a/runtime/vm/os_thread.h
+++ b/runtime/vm/os_thread.h
@@ -200,6 +200,7 @@
static void SetThreadLocal(ThreadLocalKey key, uword value);
static intptr_t GetMaxStackSize();
static void Join(ThreadJoinId id);
+ static void Detach(ThreadJoinId id);
static intptr_t ThreadIdToIntPtr(ThreadId id);
static ThreadId ThreadIdFromIntPtr(intptr_t id);
@@ -213,6 +214,7 @@
static bool IsThreadInList(ThreadId id);
static void DisableOSThreadCreation();
+ static bool CanCreateOSThreads();
static void EnableOSThreadCreation();
static constexpr intptr_t kStackSizeBufferMax = (16 * KB * kWordSize);
diff --git a/runtime/vm/os_thread_absl.cc b/runtime/vm/os_thread_absl.cc
index e951b1d..ea2b042 100644
--- a/runtime/vm/os_thread_absl.cc
+++ b/runtime/vm/os_thread_absl.cc
@@ -216,6 +216,11 @@
ASSERT(result == 0);
}
+void OSThread::Detach(ThreadJoinId id) {
+ int result = pthread_detach(id);
+ VALIDATE_PTHREAD_RESULT(result);
+}
+
intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
#if defined(DART_HOST_OS_ANDROID) || defined(DART_HOST_OS_LINUX)
diff --git a/runtime/vm/os_thread_android.cc b/runtime/vm/os_thread_android.cc
index 495eef6..d1e4b70 100644
--- a/runtime/vm/os_thread_android.cc
+++ b/runtime/vm/os_thread_android.cc
@@ -182,6 +182,11 @@
VALIDATE_PTHREAD_RESULT(result);
}
+void OSThread::Detach(ThreadJoinId id) {
+ int result = pthread_detach(id);
+ VALIDATE_PTHREAD_RESULT(result);
+}
+
intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
return static_cast<intptr_t>(id);
diff --git a/runtime/vm/os_thread_fuchsia.cc b/runtime/vm/os_thread_fuchsia.cc
index 9d335c5..c1594dc7 100644
--- a/runtime/vm/os_thread_fuchsia.cc
+++ b/runtime/vm/os_thread_fuchsia.cc
@@ -152,6 +152,11 @@
VALIDATE_PTHREAD_RESULT(result);
}
+void OSThread::Detach(ThreadJoinId id) {
+ int result = pthread_detach(id);
+ VALIDATE_PTHREAD_RESULT(result);
+}
+
intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
return static_cast<intptr_t>(id);
diff --git a/runtime/vm/os_thread_linux.cc b/runtime/vm/os_thread_linux.cc
index 793a6c3..e0189cc 100644
--- a/runtime/vm/os_thread_linux.cc
+++ b/runtime/vm/os_thread_linux.cc
@@ -181,6 +181,11 @@
VALIDATE_PTHREAD_RESULT(result);
}
+void OSThread::Detach(ThreadJoinId id) {
+ int result = pthread_detach(id);
+ VALIDATE_PTHREAD_RESULT(result);
+}
+
intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
return static_cast<intptr_t>(id);
diff --git a/runtime/vm/os_thread_macos.cc b/runtime/vm/os_thread_macos.cc
index 9ba11a8..8b887ba 100644
--- a/runtime/vm/os_thread_macos.cc
+++ b/runtime/vm/os_thread_macos.cc
@@ -178,6 +178,11 @@
VALIDATE_PTHREAD_RESULT(result);
}
+void OSThread::Detach(ThreadJoinId id) {
+ int result = pthread_detach(id);
+ VALIDATE_PTHREAD_RESULT(result);
+}
+
intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
return reinterpret_cast<intptr_t>(id);
diff --git a/runtime/vm/os_thread_win.cc b/runtime/vm/os_thread_win.cc
index 11766f7..5b2b090 100644
--- a/runtime/vm/os_thread_win.cc
+++ b/runtime/vm/os_thread_win.cc
@@ -159,6 +159,12 @@
ASSERT(res == WAIT_OBJECT_0);
}
+void OSThread::Detach(ThreadJoinId id) {
+ HANDLE handle = static_cast<HANDLE>(id);
+ ASSERT(handle != nullptr);
+ CloseHandle(handle);
+}
+
intptr_t OSThread::ThreadIdToIntPtr(ThreadId id) {
COMPILE_ASSERT(sizeof(id) <= sizeof(intptr_t));
return static_cast<intptr_t>(id);
diff --git a/runtime/vm/port.cc b/runtime/vm/port.cc
index 139f4dc..a6d0375 100644
--- a/runtime/vm/port.cc
+++ b/runtime/vm/port.cc
@@ -52,9 +52,9 @@
return result;
}
-Dart_Port PortMap::CreatePort(MessageHandler* handler) {
+Dart_Port PortMap::CreatePort(PortHandler* handler) {
ASSERT(handler != nullptr);
- MutexLocker ml(mutex_);
+ PortMap::Locker ml;
if (ports_ == nullptr) {
return ILLEGAL_PORT;
}
@@ -64,35 +64,28 @@
#endif
const Dart_Port port = AllocatePort();
-
- // The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
- // by the [PortMap::mutex_] we already hold.
- MessageHandler::PortSetEntry isolate_entry;
- isolate_entry.port = port;
- handler->ports_.Insert(isolate_entry);
-
- Entry entry;
- entry.port = port;
- entry.handler = handler;
- ports_->Insert(entry);
+ if (auto ports = handler->ports(ml)) {
+ ports->Insert(PortHandler::PortSetEntry{port});
+ }
+ ports_->Insert(Entry{port, handler});
if (FLAG_trace_isolates) {
OS::PrintErr(
"[+] Opening port: \n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
- handler->name(), entry.port);
+ handler->name(), port);
}
- return entry.port;
+ return port;
}
-bool PortMap::ClosePort(Dart_Port port, MessageHandler** message_handler) {
- if (message_handler != nullptr) *message_handler = nullptr;
+bool PortMap::ClosePort(Dart_Port port, PortHandler** port_handler) {
+ if (port_handler != nullptr) *port_handler = nullptr;
- MessageHandler* handler = nullptr;
+ PortHandler* handler = nullptr;
{
- MutexLocker ml(mutex_);
+ PortMap::Locker ml;
if (ports_ == nullptr) {
return false;
}
@@ -108,33 +101,33 @@
handler->CheckAccess();
#endif
- // Delete the port entry before releasing the lock to avoid holding the lock
- // while flushing the messages below.
it.Delete();
ports_->Rebalance();
- // The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
- // by the [PortMap::mutex_] we already hold.
- auto isolate_it = handler->ports_.TryLookup(port);
- ASSERT(isolate_it != handler->ports_.end());
- isolate_it.Delete();
- handler->ports_.Rebalance();
+ if (auto ports = handler->ports(ml)) {
+ auto isolate_it = ports->TryLookup(port);
+ ASSERT(isolate_it != ports->end());
+ isolate_it.Delete();
+ ports->Rebalance();
+ }
}
- handler->ClosePort(port);
- if (message_handler != nullptr) *message_handler = handler;
+ handler->OnPortClosed(port);
+ if (port_handler != nullptr) *port_handler = handler;
return true;
}
void PortMap::ClosePorts(MessageHandler* handler) {
{
- MutexLocker ml(mutex_);
+ PortMap::Locker ml;
if (ports_ == nullptr) {
return;
}
- // The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
- // by the [PortMap::mutex_] we already hold.
- for (auto isolate_it = handler->ports_.begin();
- isolate_it != handler->ports_.end(); ++isolate_it) {
+
+ auto ports = handler->ports(ml);
+ ASSERT(ports != nullptr);
+
+ for (auto isolate_it = ports->begin(); isolate_it != ports->end();
+ ++isolate_it) {
auto it = ports_->TryLookup((*isolate_it).port);
ASSERT(it != ports_->end());
Entry entry = *it;
@@ -143,10 +136,10 @@
it.Delete();
isolate_it.Delete();
}
- ASSERT(handler->ports_.IsEmpty());
+ ASSERT(ports->IsEmpty());
ports_->Rebalance();
}
- handler->CloseAllPorts();
+ handler->OnAllPortsClosed();
}
bool PortMap::PostMessage(std::unique_ptr<Message> message,
@@ -161,7 +154,7 @@
message->DropFinalizers();
return false;
}
- MessageHandler* handler = (*it).handler;
+ auto handler = (*it).handler;
ASSERT(handler != nullptr);
handler->PostMessage(std::move(message), before_events);
return true;
@@ -189,7 +182,7 @@
return nullptr;
}
- MessageHandler* handler = (*it).handler;
+ auto handler = (*it).handler;
return handler->isolate();
}
@@ -204,7 +197,7 @@
return ILLEGAL_PORT;
}
- MessageHandler* handler = (*it).handler;
+ auto handler = (*it).handler;
Isolate* isolate = handler->isolate();
if (isolate == nullptr) {
// Message handler is a native port instead of an isolate.
@@ -258,6 +251,13 @@
}
}
+void PortMap::Shutdown() {
+ // Tell all handlers which are running their own thread pools to shutdown.
+ for (auto& entry : *ports_) {
+ entry.handler->Shutdown();
+ }
+}
+
void PortMap::Cleanup() {
ASSERT(ports_ != nullptr);
ASSERT(prng_ != nullptr);
@@ -317,4 +317,12 @@
}
}
+PortHandler::~PortHandler() {}
+
+#if defined(DEBUG)
+void PortHandler::CheckAccess() const {
+ // By default there is no checking.
+}
+#endif
+
} // namespace dart
diff --git a/runtime/vm/port.h b/runtime/vm/port.h
index e5afe8f..8e94e35 100644
--- a/runtime/vm/port.h
+++ b/runtime/vm/port.h
@@ -11,6 +11,7 @@
#include "vm/allocation.h"
#include "vm/globals.h"
#include "vm/json_stream.h"
+#include "vm/lockers.h"
#include "vm/port_set.h"
#include "vm/random.h"
@@ -20,17 +21,17 @@
class Message;
class MessageHandler;
class Mutex;
+class PortHandler;
class PortMap : public AllStatic {
public:
// Allocate a port for the provided handler and return its VM-global id.
- static Dart_Port CreatePort(MessageHandler* handler);
+ static Dart_Port CreatePort(PortHandler* handler);
// Close the port with id. All pending messages will be dropped.
//
// Returns true if the port is successfully closed.
- static bool ClosePort(Dart_Port id,
- MessageHandler** message_handler = nullptr);
+ static bool ClosePort(Dart_Port id, PortHandler** port_handler = nullptr);
// Close all the ports for the provided handler.
static void ClosePorts(MessageHandler* handler);
@@ -58,6 +59,7 @@
IsolateGroup* group);
static void Init();
+ static void Shutdown();
static void Cleanup();
static void PrintPortsForMessageHandler(MessageHandler* handler,
@@ -65,11 +67,18 @@
static void DebugDumpForMessageHandler(MessageHandler* handler);
+ class Locker : public MutexLocker {
+ public:
+ Locker() : MutexLocker(PortMap::mutex_) {}
+ };
+
private:
struct Entry : public PortSet<Entry>::Entry {
Entry() : handler(nullptr) {}
+ Entry(Dart_Port port, PortHandler* handler)
+ : PortSet<Entry>::Entry(port), handler(handler) {}
- MessageHandler* handler;
+ PortHandler* handler;
};
// Allocate a new unique port.
@@ -83,6 +92,54 @@
static Random* prng_;
};
+// An object handling messages dispatched to one or more ports in the |PortMap|.
+class PortHandler {
+ public:
+ virtual ~PortHandler();
+
+ virtual const char* name() const = 0;
+
+ // Notify the handler that a port previously associated with it is
+ // now closed.
+ virtual void OnPortClosed(Dart_Port port) = 0;
+
+#if defined(DEBUG)
+ // Check that it is safe to access this port handler.
+ //
+ // For example, if this |PortHandler| is an isolate, then it is
+ // only safe to access it when it is the current isolate.
+ virtual void CheckAccess() const;
+#endif
+
+ // Return Isolate to which this message handler corresponds to.
+ virtual Isolate* isolate() const = 0;
+
+ // Ask the handler to shutdown, e.g. stop associated thread pools if any.
+ virtual void Shutdown() = 0;
+
+ // Posts a message on this handler's message queue.
+ // If before_events is true, then the message is enqueued before any pending
+ // events, but after any pending isolate library events.
+ virtual void PostMessage(std::unique_ptr<Message> message,
+ bool before_events = false) = 0;
+
+ protected:
+ struct PortSetEntry : public PortSet<PortSetEntry>::Entry {
+ PortSetEntry() : Entry() {}
+ explicit PortSetEntry(Dart_Port port) : Entry(port) {}
+ };
+
+ private:
+ friend class PortMap;
+
+ // Returns set of ports associate with this handler if
+ // handler supports multiple ports or |nullptr| otherwise.
+ //
+ // Only |PortMap| is expected to call this method under locked
+ // PortMap::mutex_.
+ virtual PortSet<PortSetEntry>* ports(PortMap::Locker& locker) = 0;
+};
+
} // namespace dart
#endif // RUNTIME_VM_PORT_H_
diff --git a/runtime/vm/port_set.h b/runtime/vm/port_set.h
index 5809f6a..48e66cc 100644
--- a/runtime/vm/port_set.h
+++ b/runtime/vm/port_set.h
@@ -21,6 +21,7 @@
struct Entry : public MallocAllocated {
Entry() : port(kFreePort) {}
+ explicit Entry(Dart_Port port) : port(port) {}
// Free entries have set this to 0.
Dart_Port port;
diff --git a/runtime/vm/thread_pool.cc b/runtime/vm/thread_pool.cc
index 28ae62f..3cf5fab 100644
--- a/runtime/vm/thread_pool.cc
+++ b/runtime/vm/thread_pool.cc
@@ -41,23 +41,59 @@
Shutdown();
}
-void ThreadPool::Shutdown() {
- {
- MutexLocker ml(&pool_mutex_);
+void ThreadPool::RequestWorkersToShutdown() {
+ MutexLocker ml(&pool_mutex_);
- // Prevent scheduling of new tasks.
- shutting_down_ = true;
+ // If we are just starting to shutdown threads then this should be done
+ // before OSThread::DisableOSThreadCreation is called. If |OSThread| creation
+ // is disabled after |Worker::StartThread| is called but before
+ // |ThreadPool::Worker::Main| is called then a worker will be stuck in the
+ // state idle but will never properly start and thus will never transition to
+ // dead - leading to a deadlock.
+ RELEASE_ASSERT(shutting_down_ || OSThread::CanCreateOSThreads());
- if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
- // All workers have already died.
- all_workers_dead_ = true;
- } else {
- // Tell all idling workers to drain remaining work and then shut down.
- for (auto worker : idle_workers_) {
- worker->Wakeup();
- }
+ // Prevent scheduling of new tasks.
+ shutting_down_ = true;
+
+ if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
+ // All workers have already died.
+ all_workers_dead_ = true;
+ } else {
+ // Tell all idling workers to drain remaining work and then shut down.
+ for (auto worker : idle_workers_) {
+ worker->Wakeup();
}
}
+}
+
+void ThreadPool::RequestShutdown(
+ ThreadPool* pool,
+ std::function<void(void)>&& shutdown_complete) {
+ pool->RequestWorkersToShutdown();
+
+ {
+ MonitorLocker eml(&pool->exit_monitor_);
+ if (!pool->all_workers_dead_) {
+ // Workers are still doing some work. Mark this pool for asynchronous
+ // deletion. When the last worker finishes it will delete itself and
+ // call shutdown_complete.
+ pool->shutdown_complete_callback_ = std::move(shutdown_complete);
+ return;
+ }
+
+ // Threads are in the process of exiting already and there is no way to ask
+ // them to do additional cleanup asynchronously. We will just join the
+ // last dead worker and delete it synchronously.
+ }
+ pool->DeleteLastDeadWorker();
+ shutdown_complete();
+}
+
+void ThreadPool::Shutdown() {
+ // Should not combine |Shutdown| and |RequestShutdown| on the same pool.
+ ASSERT(shutdown_complete_callback_ == nullptr);
+
+ RequestWorkersToShutdown();
// Wait until all workers are dead. Any new death will notify the exit
// monitor.
@@ -67,20 +103,18 @@
eml.Wait();
}
}
+
+ DeleteLastDeadWorker();
+}
+
+void ThreadPool::DeleteLastDeadWorker() {
+ ASSERT(all_workers_dead_);
ASSERT(count_idle_ == 0);
ASSERT(count_running_ == 0);
ASSERT(idle_workers_.IsEmpty());
ASSERT(running_workers_.IsEmpty());
-
- WorkerList dead_workers_to_join;
- {
- MutexLocker ml(&pool_mutex_);
- ObtainDeadWorkersLocked(&dead_workers_to_join);
- }
- JoinDeadWorkersLocked(&dead_workers_to_join);
-
- ASSERT(count_dead_ == 0);
- ASSERT(dead_workers_.IsEmpty());
+ JoinDeadWorker(last_dead_worker_);
+ last_dead_worker_ = nullptr;
}
bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
@@ -156,7 +190,7 @@
}
void ThreadPool::WorkerLoop(Worker* worker) {
- WorkerList dead_workers_to_join;
+ Worker* previous_dead_worker = nullptr;
while (true) {
MutexLocker ml(&pool_mutex_);
@@ -165,7 +199,6 @@
IdleToRunningLocked(worker);
while (!tasks_.IsEmpty()) {
auto task = TakeNextAvailableTaskLocked();
-
MutexUnlocker mls(&ml);
task->Run();
ASSERT(Isolate::Current() == nullptr);
@@ -183,8 +216,7 @@
}
if (shutting_down_) {
- ObtainDeadWorkersLocked(&dead_workers_to_join);
- IdleToDeadLocked(worker);
+ previous_dead_worker = IdleToDeadLocked(worker);
break;
}
@@ -203,17 +235,16 @@
}
}
if (done) {
- ObtainDeadWorkersLocked(&dead_workers_to_join);
- IdleToDeadLocked(worker);
+ previous_dead_worker = IdleToDeadLocked(worker);
break;
}
}
- // Before we transitioned to dead we obtained the list of previously died dead
- // workers, which we join here. Since every death of a worker will join
- // previously died workers, we keep the pending non-joined [dead_workers_] to
- // effectively 1.
- JoinDeadWorkersLocked(&dead_workers_to_join);
+ // |IdleToDeadLocked| obtained the worker which died before us, which we will
+ // join here. Since every dead worker will join the previous one, all dead
+ // workers effectively form a chain and it is enough to join the worker which
+ // died last to join all workers which died before it.
+ JoinDeadWorker(previous_dead_worker);
}
void ThreadPool::IdleToRunningLocked(Worker* worker) {
@@ -234,14 +265,14 @@
count_idle_++;
}
-void ThreadPool::IdleToDeadLocked(Worker* worker) {
+ThreadPool::Worker* ThreadPool::IdleToDeadLocked(Worker* worker) {
ASSERT(tasks_.IsEmpty());
+ Worker* previous_dead = last_dead_worker_;
ASSERT(idle_workers_.ContainsForDebugging(worker));
idle_workers_.Remove(worker);
- dead_workers_.Append(worker);
+ last_dead_worker_ = worker;
count_idle_--;
- count_dead_++;
// Notify shutdown thread that the worker thread is about to finish.
if (shutting_down_) {
@@ -251,24 +282,15 @@
eml.Notify();
}
}
+
+ return previous_dead;
}
-void ThreadPool::ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join) {
- dead_workers_to_join->AppendList(&dead_workers_);
- ASSERT(dead_workers_.IsEmpty());
- count_dead_ = 0;
-}
-
-void ThreadPool::JoinDeadWorkersLocked(WorkerList* dead_workers_to_join) {
- auto it = dead_workers_to_join->begin();
- while (it != dead_workers_to_join->end()) {
- Worker* worker = *it;
- it = dead_workers_to_join->Erase(it);
-
+void ThreadPool::JoinDeadWorker(Worker* worker) {
+ if (worker != nullptr) {
OSThread::Join(worker->join_id_);
delete worker;
}
- ASSERT(dead_workers_to_join->IsEmpty());
}
ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(std::unique_ptr<Task> task) {
@@ -353,6 +375,28 @@
if (exit_cb != nullptr) {
exit_cb();
}
+
+ ThreadPool::WorkerThreadExit(pool, worker);
+}
+
+void ThreadPool::WorkerThreadExit(ThreadPool* pool, Worker* worker) {
+ if (pool->shutdown_complete_callback_ != nullptr && pool->all_workers_dead_ &&
+ pool->last_dead_worker_ == worker) {
+ // Asynchronous shutdown was requested and this is the last exiting worker.
+ // It needs to delete itself and notify the code which requested the
+ // shutdown that we are done.
+ // Start by detaching the thread (because nobody is going to join it) so
+ // that we don't keep any thread related data structures behind.
+ OSThread::Detach(worker->join_id_);
+ delete worker;
+ pool->last_dead_worker_ = nullptr;
+
+ // Run the callback. It might (and most likely will) delete |pool| so this
+ // should be the last time we touch the |pool| pointer.
+ auto callback = pool->shutdown_complete_callback_;
+ pool->shutdown_complete_callback_ = nullptr;
+ callback();
+ }
}
} // namespace dart
diff --git a/runtime/vm/thread_pool.h b/runtime/vm/thread_pool.h
index 74479ac..99fee30 100644
--- a/runtime/vm/thread_pool.h
+++ b/runtime/vm/thread_pool.h
@@ -5,6 +5,7 @@
#ifndef RUNTIME_VM_THREAD_POOL_H_
#define RUNTIME_VM_THREAD_POOL_H_
+#include <functional>
#include <memory>
#include <utility>
@@ -59,13 +60,25 @@
// to continue executing.
void MarkCurrentWorkerAsUnBlocked();
- // Triggers shutdown, prevents scheduling of new tasks.
+ // Triggers shutdown, prevents scheduling of new tasks and waits for all
+ // worker threads to exit.
+ //
+ // Existing tasks are executed to completion.
void Shutdown();
+ // Prevent scheduling of new tasks on |pool| and request it to shutdown
+ // after all currently running tasks finish. |shutdown_complete| will be
+ // invoked when shutdown is complete. This might happen synchronously
+ // if all workers are already stopped or on one of the worker threads.
+ //
+ // It is safe to delete |pool| from |shutdown_complete|.
+ static void RequestShutdown(ThreadPool* pool,
+ std::function<void(void)>&& shutdown_complete);
+
// Exposed for unit test in thread_pool_test.cc
uint64_t workers_started() const { return count_idle_ + count_running_; }
// Exposed for unit test in thread_pool_test.cc
- uint64_t workers_stopped() const { return count_dead_; }
+ bool has_pending_dead_worker() const { return last_dead_worker_ != nullptr; }
protected:
class Worker : public IntrusiveDListEntry<Worker> {
@@ -112,6 +125,8 @@
bool TasksWaitingToRunLocked() { return !tasks_.IsEmpty(); }
private:
+ static void WorkerThreadExit(ThreadPool* pool, ThreadPool::Worker* worker);
+
using TaskList = IntrusiveDList<Task>;
using WorkerList = IntrusiveDList<Worker>;
@@ -124,9 +139,14 @@
void IdleToRunningLocked(Worker* worker);
void RunningToIdleLocked(Worker* worker);
- void IdleToDeadLocked(Worker* worker);
- void ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join);
- void JoinDeadWorkersLocked(WorkerList* dead_workers_to_join);
+ DART_WARN_UNUSED_RESULT Worker* IdleToDeadLocked(Worker* worker);
+ void JoinDeadWorker(Worker* worker);
+
+ Worker* TakeLastDeadWorker();
+
+ void RequestWorkersToShutdown();
+
+ void DeleteLastDeadWorker();
Mutex pool_mutex_;
bool shutting_down_ = false;
@@ -135,13 +155,19 @@
uint64_t count_dead_ = 0;
WorkerList running_workers_;
WorkerList idle_workers_;
- WorkerList dead_workers_;
+
+ Worker* last_dead_worker_ = nullptr;
+
uint64_t pending_tasks_ = 0;
TaskList tasks_;
Monitor exit_monitor_;
std::atomic<bool> all_workers_dead_;
+ // If asynchronous shutdown is requested then this callback will be
+ // invoked by the last exiting worker.
+ std::function<void(void)> shutdown_complete_callback_;
+
uintptr_t max_pool_size_ = 0;
DISALLOW_COPY_AND_ASSIGN(ThreadPool);
diff --git a/runtime/vm/thread_pool_test.cc b/runtime/vm/thread_pool_test.cc
index 7a6effd..4798d57 100644
--- a/runtime/vm/thread_pool_test.cc
+++ b/runtime/vm/thread_pool_test.cc
@@ -74,7 +74,7 @@
// Do a sanity test on the worker stats.
EXPECT_EQ(1U, thread_pool.workers_started());
- EXPECT_EQ(0U, thread_pool.workers_stopped());
+ EXPECT(!thread_pool.has_pending_dead_worker());
}
THREAD_POOL_UNIT_TEST_CASE(ThreadPool_RunMany) {
@@ -175,14 +175,15 @@
{
ThreadPool thread_pool;
EXPECT_EQ(0U, thread_pool.workers_started());
- EXPECT_EQ(0U, thread_pool.workers_stopped());
+ EXPECT(!thread_pool.has_pending_dead_worker());
// Run a worker.
Monitor sync;
bool done = true;
thread_pool.Run<TestTask>(&sync, &done);
EXPECT_EQ(1U, thread_pool.workers_started());
- EXPECT_EQ(0U, thread_pool.workers_stopped());
+ EXPECT(!thread_pool.has_pending_dead_worker());
+
{
MonitorLocker ml(&sync);
done = false;
@@ -196,11 +197,11 @@
// Wait up to 5 seconds to see if a worker times out.
const int kMaxWait = 5000;
int waited = 0;
- while (thread_pool.workers_stopped() == 0 && waited < kMaxWait) {
+ while (!thread_pool.has_pending_dead_worker() && waited < kMaxWait) {
OS::Sleep(1);
waited += 1;
}
- EXPECT_EQ(1U, thread_pool.workers_stopped());
+ EXPECT(thread_pool.has_pending_dead_worker());
}
FLAG_worker_timeout_millis = saved_timeout;
diff --git a/sdk/lib/_internal/vm/bin/io_service_patch.dart b/sdk/lib/_internal/vm/bin/io_service_patch.dart
index eae117a..7fd13b8 100644
--- a/sdk/lib/_internal/vm/bin/io_service_patch.dart
+++ b/sdk/lib/_internal/vm/bin/io_service_patch.dart
@@ -4,48 +4,13 @@
part of "common_patch.dart";
-class _IOServicePorts {
- // We limit the number of IO Service ports per isolate so that we don't
- // spawn too many threads all at once, which can crash the VM on Windows.
- static const int maxPorts = 32;
- final List<SendPort> _ports = [];
- final List<int> _useCounts = [];
- final List<int> _freePorts = [];
- final Map<int, int> _usedPorts = HashMap<int, int>();
-
- _IOServicePorts();
-
- SendPort _getPort(int forRequestId) {
- assert(!_usedPorts.containsKey(forRequestId));
- if (_freePorts.isEmpty && _ports.length < maxPorts) {
- final SendPort port = _newServicePort();
- _ports.add(port);
- _useCounts.add(0);
- _freePorts.add(_ports.length - 1);
- }
- // Use a free port if one exists.
- final index = _freePorts.isNotEmpty
- ? _freePorts.removeLast()
- : forRequestId % maxPorts;
- _usedPorts[forRequestId] = index;
- _useCounts[index]++;
- return _ports[index];
- }
-
- void _returnPort(int forRequestId) {
- final index = _usedPorts.remove(forRequestId)!;
- if (--_useCounts[index] == 0) {
- _freePorts.add(index);
- }
- }
-
- @pragma("vm:external-name", "IOService_NewServicePort")
- external static SendPort _newServicePort();
-}
+@pragma("vm:external-name", "IOService_NewServicePort")
+external SendPort _newServicePort();
@patch
class _IOService {
- static _IOServicePorts _servicePorts = new _IOServicePorts();
+ static final SendPort _port = _newServicePort();
+
static RawReceivePort? _receivePort;
static late SendPort _replyToPort;
static HashMap<int, Completer> _messageMap = new HashMap<int, Completer>();
@@ -59,10 +24,9 @@
} while (_messageMap.containsKey(id));
final Completer completer = new Completer();
try {
- final SendPort servicePort = _servicePorts._getPort(id);
_ensureInitialize();
_messageMap[id] = completer;
- servicePort.send(<dynamic>[id, _replyToPort, request, data]);
+ _port.send(<dynamic>[id, _replyToPort, request, data]);
} catch (error) {
_messageMap.remove(id)!.complete(error);
if (_messageMap.length == 0) {
@@ -79,7 +43,6 @@
_receivePort!.handler = (List<Object?> data) {
assert(data.length == 2);
_messageMap.remove(data[0])!.complete(data[1]);
- _servicePorts._returnPort(data[0] as int);
if (_messageMap.length == 0) {
_finalize();
}
diff --git a/tests/standalone/io/socket_finalizer_test.dart b/tests/standalone/io/socket_finalizer_test.dart
index 5dfca94..7662534 100644
--- a/tests/standalone/io/socket_finalizer_test.dart
+++ b/tests/standalone/io/socket_finalizer_test.dart
@@ -9,6 +9,7 @@
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
+import 'dart:_internal'; // ignore: import_internal_library, unused_import
import "package:async_helper/async_helper.dart";
import "package:expect/expect.dart";
@@ -32,19 +33,19 @@
}, onError: (e) {
Expect.fail("Socket error $e");
});
- isolate.kill();
- // Cause a GC to collect the [socket] from [connectorIsolate].
- for (int i = 0; i < 100000; ++i) {
- produceGarbage();
- }
+ final port = ReceivePort();
+ port.listen((_) {
+ print("Isolate exited - triggering GC");
+ // Cause a GC to collect the [socket] from [connectorIsolate].
+ VMInternalsForTesting.collectAllGarbage(); // ignore: undefined_identifier
+ port.close();
+ });
+ isolate.addOnExitListener(port.sendPort);
+
+ isolate.kill();
});
await completer.future;
await server.close();
asyncEnd();
}
-
-@pragma('vm:never-inline')
-produceGarbage() => all.add(List.filled(1024, null));
-
-final all = [];