Version 2.15.0-200.0.dev
Merge commit '5893a301a1ea28c44426477c5a437358831ed055' into 'dev'
diff --git a/DEPS b/DEPS
index 7c8f2da..29c1159 100644
--- a/DEPS
+++ b/DEPS
@@ -74,7 +74,7 @@
# Revisions of /third_party/* dependencies.
"args_rev": "3b3f55766af13d895d2020ec001a28e8dc147f91",
- "async_rev": "c64220396e0fa2f7b380590abfedbd25635cd7a0",
+ "async_rev": "80886150a5e6c58006c8ae5a6c2aa7108638e2a9",
"bazel_worker_rev": "0885637b037979afbf5bcd05fd748b309fd669c0",
"benchmark_harness_rev": "c546dbd9f639f75cd2f75de8df2eb9f8ea15e8e7",
"boolean_selector_rev": "665e6921ab246569420376f827bff4585dff0b14",
diff --git a/runtime/bin/dartutils.cc b/runtime/bin/dartutils.cc
index 560c7cd..565753d 100644
--- a/runtime/bin/dartutils.cc
+++ b/runtime/bin/dartutils.cc
@@ -358,7 +358,7 @@
return str;
}
-Dart_Handle DartUtils::MakeUint8Array(const uint8_t* buffer, intptr_t len) {
+Dart_Handle DartUtils::MakeUint8Array(const void* buffer, intptr_t len) {
Dart_Handle array = Dart_NewTypedData(Dart_TypedData_kUint8, len);
RETURN_IF_ERROR(array);
{
diff --git a/runtime/bin/dartutils.h b/runtime/bin/dartutils.h
index 6fc2d3f..bbe0a1f 100644
--- a/runtime/bin/dartutils.h
+++ b/runtime/bin/dartutils.h
@@ -161,7 +161,7 @@
static void CloseFile(void* stream);
static bool EntropySource(uint8_t* buffer, intptr_t length);
static Dart_Handle ReadStringFromFile(const char* filename);
- static Dart_Handle MakeUint8Array(const uint8_t* buffer, intptr_t length);
+ static Dart_Handle MakeUint8Array(const void* buffer, intptr_t length);
static Dart_Handle PrepareForScriptLoading(bool is_service_isolate,
bool trace_loading);
static Dart_Handle SetupPackageConfig(const char* packages_file);
diff --git a/runtime/bin/file.cc b/runtime/bin/file.cc
index fad12cf..d7eda14 100644
--- a/runtime/bin/file.cc
+++ b/runtime/bin/file.cc
@@ -68,6 +68,10 @@
Dart_SetIntegerReturnValue(args, file_pointer);
}
+void FUNCTION_NAME(File_GetFD)(Dart_NativeArguments args) {
+ Dart_SetIntegerReturnValue(args, GetFile(args)->GetFD());
+}
+
static void ReleaseFile(void* isolate_callback_data, void* peer) {
File* file = reinterpret_cast<File*>(peer);
file->Release();
@@ -135,7 +139,7 @@
#if !defined(PRODUCT)
if (!IsFile(dart_this)) {
Dart_PropagateError(DartUtils::NewInternalError(
- "File_Close expects the reciever to be a _RandomAccessFileOpsImpl."));
+ "File_Close expects the receiver to be a _RandomAccessFileOpsImpl."));
}
#endif
File* file;
diff --git a/runtime/bin/file.h b/runtime/bin/file.h
index edeb388..f2d45e9 100644
--- a/runtime/bin/file.h
+++ b/runtime/bin/file.h
@@ -221,7 +221,8 @@
// (stdin, stout or stderr).
static File* OpenStdio(int fd);
-#if defined(DART_HOST_OS_FUCHSIA) || defined(DART_HOST_OS_LINUX)
+#if defined(DART_HOST_OS_FUCHSIA) || defined(DART_HOST_OS_LINUX) || \
+ defined(DART_HOST_OS_ANDROID) || defined(DART_HOST_OS_MACOS)
static File* OpenFD(int fd);
#endif
diff --git a/runtime/bin/file_android.cc b/runtime/bin/file_android.cc
index b1e9bb0..d766192 100644
--- a/runtime/bin/file_android.cc
+++ b/runtime/bin/file_android.cc
@@ -210,6 +210,10 @@
return NULL;
}
+File* File::OpenFD(int fd) {
+ return new File(new FileHandle(fd));
+}
+
File* File::Open(Namespace* namespc, const char* name, FileOpenMode mode) {
NamespaceScope ns(namespc, name);
// Report errors for non-regular files.
diff --git a/runtime/bin/file_macos.cc b/runtime/bin/file_macos.cc
index 8fa3cd7..894884a 100644
--- a/runtime/bin/file_macos.cc
+++ b/runtime/bin/file_macos.cc
@@ -254,6 +254,10 @@
return NULL;
}
+File* File::OpenFD(int fd) {
+ return new File(new FileHandle(fd));
+}
+
File* File::Open(Namespace* namespc, const char* name, FileOpenMode mode) {
// Report errors for non-regular files.
struct stat st;
diff --git a/runtime/bin/io_natives.cc b/runtime/bin/io_natives.cc
index 83503b5..efa37c8 100644
--- a/runtime/bin/io_natives.cc
+++ b/runtime/bin/io_natives.cc
@@ -45,6 +45,7 @@
V(File_Exists, 2) \
V(File_Flush, 1) \
V(File_GetPointer, 1) \
+ V(File_GetFD, 1) \
V(File_GetStdioHandleType, 1) \
V(File_GetType, 3) \
V(File_LastAccessed, 2) \
@@ -81,6 +82,10 @@
V(Filter_CreateZLibInflate, 4) \
V(Filter_Process, 4) \
V(Filter_Processed, 3) \
+ V(ResourceHandleImpl_toFile, 1) \
+ V(ResourceHandleImpl_toSocket, 1) \
+ V(ResourceHandleImpl_toRawSocket, 1) \
+ V(ResourceHandleImpl_toRawDatagramSocket, 1) \
V(InternetAddress_Parse, 1) \
V(InternetAddress_ParseScopedLinkLocalAddress, 1) \
V(InternetAddress_RawAddrToString, 1) \
@@ -147,6 +152,7 @@
V(Socket_GetPort, 1) \
V(Socket_GetRemotePeer, 1) \
V(Socket_GetError, 1) \
+ V(Socket_GetFD, 1) \
V(Socket_GetOption, 3) \
V(Socket_GetRawOption, 4) \
V(Socket_GetSocketId, 1) \
@@ -156,11 +162,15 @@
V(Socket_LeaveMulticast, 4) \
V(Socket_Read, 2) \
V(Socket_RecvFrom, 1) \
+ V(Socket_ReceiveMessage, 2) \
+ V(Socket_SendMessage, 5) \
V(Socket_SendTo, 6) \
V(Socket_SetOption, 4) \
V(Socket_SetRawOption, 4) \
V(Socket_SetSocketId, 3) \
V(Socket_WriteList, 4) \
+ V(SocketControlMessage_fromHandles, 2) \
+ V(SocketControlMessageImpl_extractHandles, 1) \
V(Stdin_ReadByte, 1) \
V(Stdin_GetEchoMode, 1) \
V(Stdin_SetEchoMode, 2) \
diff --git a/runtime/bin/socket.cc b/runtime/bin/socket.cc
index a781353..ac9d95d 100644
--- a/runtime/bin/socket.cc
+++ b/runtime/bin/socket.cc
@@ -12,6 +12,7 @@
#include "bin/lockers.h"
#include "bin/process.h"
#include "bin/thread.h"
+#include "bin/typed_data_utils.h"
#include "bin/utils.h"
#include "include/dart_api.h"
@@ -649,6 +650,59 @@
Dart_SetReturnValue(args, result);
}
+void FUNCTION_NAME(Socket_ReceiveMessage)(Dart_NativeArguments args) {
+ Socket* socket = Socket::GetSocketIdNativeField(
+ ThrowIfError(Dart_GetNativeArgument(args, 0)));
+ ASSERT(socket != nullptr);
+
+ int64_t buffer_num_bytes = 0;
+ DartUtils::GetInt64Value(ThrowIfError(Dart_GetNativeArgument(args, 1)),
+ &buffer_num_bytes);
+ uint8_t* buffer = nullptr;
+ Dart_Handle data = IOBuffer::Allocate(buffer_num_bytes, &buffer);
+ if (Dart_IsNull(data)) {
+ Dart_ThrowException(DartUtils::NewDartOSError());
+ }
+ ASSERT(buffer != nullptr);
+
+ OSError os_error;
+ SocketControlMessage* control_messages;
+ const intptr_t messages_read = SocketBase::ReceiveMessage(
+ socket->fd(), buffer, &buffer_num_bytes, &control_messages,
+ SocketBase::kAsync, &os_error);
+ if (messages_read < 0) {
+ ASSERT(messages_read == -1);
+ Dart_ThrowException(DartUtils::NewDartOSError(&os_error));
+ }
+ if (buffer_num_bytes > 0) {
+ uint8_t* new_buffer = nullptr;
+ Dart_Handle new_data = IOBuffer::Allocate(buffer_num_bytes, &new_buffer);
+ if (Dart_IsNull(new_data)) {
+ Dart_ThrowException(DartUtils::NewDartOSError());
+ }
+ ASSERT(new_buffer != nullptr);
+ memmove(new_buffer, buffer, buffer_num_bytes);
+ data = new_data;
+ }
+
+ // returned list has a (level, type, message bytes) triple for every message,
+ // plus last element is raw data uint8list.
+ Dart_Handle list = ThrowIfError(Dart_NewList(messages_read * 3 + 1));
+ int j = 0;
+ for (intptr_t i = 0; i < messages_read; i++) {
+ SocketControlMessage* message = control_messages + i;
+ Dart_Handle uint8list_message_data = ThrowIfError(
+ DartUtils::MakeUint8Array(message->data(), message->data_length()));
+ ThrowIfError(Dart_ListSetAt(
+ list, j++, ThrowIfError(Dart_NewInteger(message->level()))));
+ ThrowIfError(Dart_ListSetAt(
+ list, j++, ThrowIfError(Dart_NewInteger(message->type()))));
+ ThrowIfError(Dart_ListSetAt(list, j++, uint8list_message_data));
+ }
+ ThrowIfError(Dart_ListSetAt(list, j, data));
+ Dart_SetReturnValue(args, list);
+}
+
void FUNCTION_NAME(Socket_WriteList)(Dart_NativeArguments args) {
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
@@ -696,6 +750,66 @@
}
}
+void FUNCTION_NAME(Socket_SendMessage)(Dart_NativeArguments args) {
+ Socket* socket =
+ Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
+ intptr_t offset = DartUtils::GetNativeIntptrArgument(args, 2);
+ intptr_t length = DartUtils::GetNativeIntptrArgument(args, 3);
+
+ // List of triples <level, type, data> aranged to minimize dart api use in
+ // native methods.
+ Dart_Handle control_message_list_dart =
+ ThrowIfError(Dart_GetNativeArgument(args, 4));
+ ASSERT(Dart_IsList(control_message_list_dart));
+ intptr_t num_control_messages_pieces;
+ ThrowIfError(
+ Dart_ListLength(control_message_list_dart, &num_control_messages_pieces));
+ intptr_t num_control_messages = num_control_messages_pieces / 3;
+ ASSERT((num_control_messages * 3) == num_control_messages_pieces);
+ SocketControlMessage* control_messages =
+ reinterpret_cast<SocketControlMessage*>(Dart_ScopeAllocate(
+ sizeof(SocketControlMessage) * num_control_messages));
+ ASSERT(control_messages != nullptr);
+
+ SocketControlMessage* control_message = control_messages;
+ intptr_t j = 0;
+ for (intptr_t i = 0; i < num_control_messages; i++, control_message++) {
+ int level = DartUtils::GetIntegerValue(
+ ThrowIfError(Dart_ListGetAt(control_message_list_dart, j++)));
+ int type = DartUtils::GetIntegerValue(
+ ThrowIfError(Dart_ListGetAt(control_message_list_dart, j++)));
+ Dart_Handle uint8list_dart =
+ ThrowIfError(Dart_ListGetAt(control_message_list_dart, j++));
+
+ TypedDataScope data(uint8list_dart);
+ void* copied_data = Dart_ScopeAllocate(data.size_in_bytes());
+ ASSERT(copied_data != nullptr);
+ memmove(copied_data, data.data(), data.size_in_bytes());
+ new (control_message)
+ SocketControlMessage(level, type, copied_data, data.size_in_bytes());
+ }
+
+ OSError os_error;
+ intptr_t bytes_written;
+ {
+ Dart_Handle buffer_dart = Dart_GetNativeArgument(args, 1);
+ TypedDataScope data(buffer_dart);
+
+ ASSERT((offset + length) <= data.size_in_bytes());
+ uint8_t* buffer_at_offset =
+ reinterpret_cast<uint8_t*>(data.data()) + offset;
+ bytes_written = SocketBase::SendMessage(
+ socket->fd(), buffer_at_offset, length, control_messages,
+ num_control_messages, SocketBase::kAsync, &os_error);
+ }
+
+ if (bytes_written < 0) {
+ Dart_ThrowException(DartUtils::NewDartOSError(&os_error));
+ }
+
+ Dart_SetIntegerReturnValue(args, bytes_written);
+}
+
void FUNCTION_NAME(Socket_SendTo)(Dart_NativeArguments args) {
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
@@ -783,6 +897,12 @@
Dart_SetReturnValue(args, DartUtils::NewDartOSError(&os_error));
}
+void FUNCTION_NAME(Socket_GetFD)(Dart_NativeArguments args) {
+ Socket* socket =
+ Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
+ Dart_SetIntegerReturnValue(args, socket->fd());
+}
+
void FUNCTION_NAME(Socket_GetType)(Dart_NativeArguments args) {
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
@@ -1341,5 +1461,191 @@
return socket;
}
+void FUNCTION_NAME(SocketControlMessage_fromHandles)(
+ Dart_NativeArguments args) {
+#if defined(DART_HOST_OS_WINDOWS) || defined(DART_HOST_OS_FUCHSIA)
+ Dart_SetReturnValue(args,
+ DartUtils::NewDartUnsupportedError(
+ "This is not supported on this operating system"));
+#else
+ ASSERT(Dart_IsNull(Dart_GetNativeArgument(args, 0)));
+ Dart_Handle handles_dart = Dart_GetNativeArgument(args, 1);
+ if (Dart_IsNull(handles_dart)) {
+ Dart_ThrowException(
+ DartUtils::NewDartArgumentError("handles list can't be null"));
+ }
+ ASSERT(Dart_IsList(handles_dart));
+ intptr_t num_handles;
+ ThrowIfError(Dart_ListLength(handles_dart, &num_handles));
+ intptr_t num_bytes = num_handles * sizeof(int);
+ int* handles = reinterpret_cast<int*>(Dart_ScopeAllocate(num_bytes));
+ Dart_Handle handle_dart_string =
+ ThrowIfError(DartUtils::NewString("_handle"));
+ for (intptr_t i = 0; i < num_handles; i++) {
+ Dart_Handle handle_dart = ThrowIfError(Dart_ListGetAt(handles_dart, i));
+ Dart_Handle handle_int_dart =
+ ThrowIfError(Dart_GetField(handle_dart, handle_dart_string));
+ handles[i] = DartUtils::GetIntegerValue(handle_int_dart);
+ }
+
+ Dart_Handle uint8list_dart =
+ ThrowIfError(Dart_NewTypedData(Dart_TypedData_kUint8, num_bytes));
+ ThrowIfError(Dart_ListSetAsBytes(uint8list_dart, /*offset=*/0,
+ reinterpret_cast<const uint8_t*>(handles),
+ num_bytes));
+ Dart_Handle dart_new_args[] = {Dart_NewInteger(SOL_SOCKET),
+ Dart_NewInteger(SCM_RIGHTS), uint8list_dart};
+
+ Dart_Handle socket_control_message_impl = ThrowIfError(DartUtils::GetDartType(
+ DartUtils::kIOLibURL, "_SocketControlMessageImpl"));
+ Dart_SetReturnValue(
+ args,
+ Dart_New(socket_control_message_impl,
+ /*constructor_name=*/Dart_Null(),
+ sizeof(dart_new_args) / sizeof(Dart_Handle), dart_new_args));
+#endif // defined(DART_HOST_OS_WINDOWS) || defined(DART_HOST_OS_FUCHSIA)
+}
+
+void FUNCTION_NAME(SocketControlMessageImpl_extractHandles)(
+ Dart_NativeArguments args) {
+#if defined(DART_HOST_OS_WINDOWS) || defined(DART_HOST_OS_FUCHSIA)
+ Dart_SetReturnValue(args,
+ DartUtils::NewDartUnsupportedError(
+ "This is not supported on this operating system"));
+#else
+ Dart_Handle handle_type = ThrowIfError(
+ DartUtils::GetDartType(DartUtils::kIOLibURL, "ResourceHandle"));
+
+ Dart_Handle message_dart = Dart_GetNativeArgument(args, 0);
+ intptr_t level = DartUtils::GetIntegerValue(
+ ThrowIfError(Dart_GetField(message_dart, DartUtils::NewString("level"))));
+ intptr_t type = DartUtils::GetIntegerValue(
+ ThrowIfError(Dart_GetField(message_dart, DartUtils::NewString("type"))));
+ if (level != SOL_SOCKET || type != SCM_RIGHTS) {
+ Dart_SetReturnValue(args, ThrowIfError(Dart_NewListOfTypeFilled(
+ handle_type, Dart_Null(), 0)));
+ return;
+ }
+
+ Dart_Handle data_dart =
+ ThrowIfError(Dart_GetField(message_dart, DartUtils::NewString("data")));
+ ASSERT(Dart_IsTypedData(data_dart));
+
+ void* data;
+ intptr_t bytes_count;
+ Dart_TypedData_Type data_type;
+ ThrowIfError(
+ Dart_TypedDataAcquireData(data_dart, &data_type, &data, &bytes_count));
+ ASSERT(data_type == Dart_TypedData_kUint8);
+ int* ints_data = reinterpret_cast<int*>(Dart_ScopeAllocate(bytes_count));
+ ASSERT(ints_data != nullptr);
+ memmove(ints_data, data, bytes_count);
+ ThrowIfError(Dart_TypedDataReleaseData(data_dart));
+ intptr_t ints_count = bytes_count / sizeof(int);
+
+ Dart_Handle handle_impl_type =
+ DartUtils::GetDartType(DartUtils::kIOLibURL, "_ResourceHandleImpl");
+ Dart_Handle sentinel = ThrowIfError(
+ Dart_GetField(handle_impl_type, DartUtils::NewString("_sentinel")));
+ Dart_Handle handle_list =
+ ThrowIfError(Dart_NewListOfTypeFilled(handle_type, sentinel, ints_count));
+ for (intptr_t i = 0; i < ints_count; i++) {
+ Dart_Handle constructor_args[] = {
+ ThrowIfError(Dart_NewInteger(*(ints_data + i)))};
+ Dart_Handle handle_impl = ThrowIfError(Dart_New(
+ handle_impl_type,
+ /*constructor_name=*/Dart_Null(),
+ sizeof(constructor_args) / sizeof(Dart_Handle), constructor_args));
+ ThrowIfError(Dart_ListSetAt(handle_list, i, handle_impl));
+ }
+
+ Dart_SetReturnValue(args, handle_list);
+#endif // defined(DART_HOST_OS_WINDOWS) || defined(DART_HOST_OS_FUCHSIA)
+}
+
+void FUNCTION_NAME(ResourceHandleImpl_toFile)(Dart_NativeArguments args) {
+#if defined(DART_HOST_OS_WINDOWS) || defined(DART_HOST_OS_FUCHSIA)
+ Dart_SetReturnValue(args,
+ DartUtils::NewDartUnsupportedError(
+ "This is not supported on this operating system"));
+#else
+ Dart_Handle handle_object = ThrowIfError(Dart_GetNativeArgument(args, 0));
+ Dart_Handle handle_field = ThrowIfError(
+ Dart_GetField(handle_object, DartUtils::NewString("_handle")));
+ intptr_t fd = DartUtils::GetIntegerValue(handle_field);
+
+ Dart_Handle random_access_file_type = ThrowIfError(
+ DartUtils::GetDartType(DartUtils::kIOLibURL, "_RandomAccessFile"));
+
+ Dart_Handle dart_new_args[2];
+ dart_new_args[1] = ThrowIfError(Dart_NewStringFromCString("<handle>"));
+
+ File* file = File::OpenFD(fd);
+
+ Dart_Handle result = Dart_NewInteger(reinterpret_cast<intptr_t>(file));
+ if (Dart_IsError(result)) {
+ file->Release();
+ Dart_PropagateError(result);
+ }
+ dart_new_args[0] = result;
+
+ Dart_Handle new_random_access_file =
+ Dart_New(random_access_file_type,
+ /*constructor_name=*/Dart_Null(),
+ /*number_of_arguments=*/2, dart_new_args);
+ if (Dart_IsError(new_random_access_file)) {
+ file->Release();
+ Dart_PropagateError(new_random_access_file);
+ }
+
+ Dart_SetReturnValue(args, new_random_access_file);
+#endif // defined(DART_HOST_OS_WINDOWS) || defined(DART_HOST_OS_FUCHSIA)
+}
+
+void FUNCTION_NAME(ResourceHandleImpl_toSocket)(Dart_NativeArguments args) {
+ Dart_SetReturnValue(args,
+ DartUtils::NewDartUnsupportedError(
+ "This is not supported on this operating system"));
+}
+
+void FUNCTION_NAME(ResourceHandleImpl_toRawSocket)(Dart_NativeArguments args) {
+#if defined(DART_HOST_OS_WINDOWS) || defined(DART_HOST_OS_FUCHSIA)
+ Dart_SetReturnValue(args,
+ DartUtils::NewDartUnsupportedError(
+ "This is not supported on this operating system"));
+#else
+ Dart_Handle handle_object = ThrowIfError(Dart_GetNativeArgument(args, 0));
+ Dart_Handle handle_field = ThrowIfError(
+ Dart_GetField(handle_object, DartUtils::NewString("_handle")));
+ intptr_t fd = DartUtils::GetIntegerValue(handle_field);
+
+ SocketAddress* socket_address = reinterpret_cast<SocketAddress*>(
+ Dart_ScopeAllocate(sizeof(SocketAddress)));
+ ASSERT(socket_address != nullptr);
+ SocketBase::GetSocketName(fd, socket_address);
+
+ // return a list describing socket_address: (type, hostname, typed_data_addr,
+ // fd)
+ Dart_Handle list = ThrowIfError(Dart_NewList(4));
+ ThrowIfError(Dart_ListSetAt(
+ list, 0, ThrowIfError(Dart_NewInteger(socket_address->GetType()))));
+ ThrowIfError(Dart_ListSetAt(
+ list, 1,
+ ThrowIfError(Dart_NewStringFromCString(socket_address->as_string()))));
+ ThrowIfError(Dart_ListSetAt(
+ list, 2, SocketAddress::ToTypedData(socket_address->addr())));
+ ThrowIfError(Dart_ListSetAt(list, 3, ThrowIfError(Dart_NewInteger(fd))));
+
+ Dart_SetReturnValue(args, list);
+#endif // defined(DART_HOST_OS_WINDOWS) || defined(DART_HOST_OS_FUCHSIA)
+}
+
+void FUNCTION_NAME(ResourceHandleImpl_toRawDatagramSocket)(
+ Dart_NativeArguments args) {
+ Dart_SetReturnValue(args,
+ DartUtils::NewDartUnsupportedError(
+ "This is not supported on this operating system"));
+}
+
} // namespace bin
} // namespace dart
diff --git a/runtime/bin/socket_base.h b/runtime/bin/socket_base.h
index 0442e5b..32016532 100644
--- a/runtime/bin/socket_base.h
+++ b/runtime/bin/socket_base.h
@@ -150,6 +150,30 @@
DISALLOW_COPY_AND_ASSIGN(AddressList);
};
+class SocketControlMessage {
+ public:
+ SocketControlMessage(intptr_t level,
+ intptr_t type,
+ void* data,
+ size_t data_length)
+ : level_(level), type_(type), data_(data), data_length_(data_length) {}
+
+ intptr_t level() const { return level_; }
+ intptr_t type() const { return type_; }
+ void* data() const { return data_; }
+ size_t data_length() const { return data_length_; }
+
+ inline bool is_file_descriptors_control_message();
+
+ private:
+ const intptr_t level_;
+ const intptr_t type_;
+ void* data_;
+ const size_t data_length_;
+
+ DISALLOW_COPY_AND_ASSIGN(SocketControlMessage);
+};
+
class SocketBase : public AllStatic {
public:
enum SocketRequest {
@@ -182,16 +206,30 @@
intptr_t num_bytes,
const RawAddr& addr,
SocketOpKind sync);
+ static intptr_t SendMessage(intptr_t fd,
+ void* buffer,
+ size_t buffer_num_bytes,
+ SocketControlMessage* messages,
+ intptr_t num_messages,
+ SocketOpKind sync,
+ OSError* p_oserror);
static intptr_t RecvFrom(intptr_t fd,
void* buffer,
intptr_t num_bytes,
RawAddr* addr,
SocketOpKind sync);
+ static intptr_t ReceiveMessage(intptr_t fd,
+ void* buffer,
+ int64_t* p_buffer_num_bytes,
+ SocketControlMessage** p_messages,
+ SocketOpKind sync,
+ OSError* p_oserror);
static bool AvailableDatagram(intptr_t fd, void* buffer, intptr_t num_bytes);
// Returns true if the given error-number is because the system was not able
// to bind the socket to a specific IP.
static bool IsBindError(intptr_t error_number);
static intptr_t GetPort(intptr_t fd);
+ static bool GetSocketName(intptr_t fd, SocketAddress* p_sa);
static SocketAddress* GetRemotePeer(intptr_t fd, intptr_t* port);
static void GetError(intptr_t fd, OSError* os_error);
static int GetType(intptr_t fd);
diff --git a/runtime/bin/socket_base_android.cc b/runtime/bin/socket_base_android.cc
index f916aee..61e30ab 100644
--- a/runtime/bin/socket_base_android.cc
+++ b/runtime/bin/socket_base_android.cc
@@ -99,6 +99,20 @@
return read_bytes;
}
+bool SocketControlMessage::is_file_descriptors_control_message() {
+ return false;
+}
+
+intptr_t SocketBase::ReceiveMessage(intptr_t fd,
+ void* buffer,
+ int64_t* p_buffer_num_bytes,
+ SocketControlMessage** p_messages,
+ SocketOpKind sync,
+ OSError* p_oserror) {
+ errno = ENOSYS;
+ return -1;
+}
+
bool SocketBase::AvailableDatagram(intptr_t fd,
void* buffer,
intptr_t num_bytes) {
@@ -141,6 +155,34 @@
return written_bytes;
}
+intptr_t SocketBase::SendMessage(intptr_t fd,
+ void* buffer,
+ size_t num_bytes,
+ SocketControlMessage* messages,
+ intptr_t num_messages,
+ SocketOpKind sync,
+ OSError* p_oserror) {
+ errno = ENOSYS;
+ return -1;
+}
+
+bool SocketBase::GetSocketName(intptr_t fd, SocketAddress* p_sa) {
+ ASSERT(fd >= 0);
+ ASSERT(p_sa != nullptr);
+ RawAddr raw;
+ socklen_t size = sizeof(raw);
+ if (NO_RETRY_EXPECTED(getsockname(fd, &raw.addr, &size))) {
+ return false;
+ }
+
+ // sockaddr_un contains sa_family_t sun_family and char[] sun_path.
+ // If size is the size of sa_family_t, this is an unnamed socket and
+ // sun_path contains garbage.
+ new (p_sa) SocketAddress(&raw.addr,
+ /*unnamed_unix_socket=*/size == sizeof(sa_family_t));
+ return true;
+}
+
intptr_t SocketBase::GetPort(intptr_t fd) {
ASSERT(fd >= 0);
RawAddr raw;
diff --git a/runtime/bin/socket_base_fuchsia.cc b/runtime/bin/socket_base_fuchsia.cc
index 5f36731..670a240 100644
--- a/runtime/bin/socket_base_fuchsia.cc
+++ b/runtime/bin/socket_base_fuchsia.cc
@@ -124,6 +124,20 @@
return -1;
}
+bool SocketControlMessage::is_file_descriptors_control_message() {
+ return false;
+}
+
+intptr_t SocketBase::ReceiveMessage(intptr_t fd,
+ void* buffer,
+ int64_t* p_buffer_num_bytes,
+ SocketControlMessage** p_messages,
+ SocketOpKind sync,
+ OSError* p_oserror) {
+ errno = ENOSYS;
+ return -1;
+}
+
bool SocketBase::AvailableDatagram(intptr_t fd,
void* buffer,
intptr_t num_bytes) {
@@ -163,6 +177,34 @@
return -1;
}
+intptr_t SocketBase::SendMessage(intptr_t fd,
+ void* buffer,
+ size_t num_bytes,
+ SocketControlMessage* messages,
+ intptr_t num_messages,
+ SocketOpKind sync,
+ OSError* p_oserror) {
+ errno = ENOSYS;
+ return -1;
+}
+
+bool SocketBase::GetSocketName(intptr_t fd, SocketAddress* p_sa) {
+ ASSERT(fd >= 0);
+ ASSERT(p_sa != nullptr);
+ RawAddr raw;
+ socklen_t size = sizeof(raw);
+ if (NO_RETRY_EXPECTED(getsockname(fd, &raw.addr, &size))) {
+ return false;
+ }
+
+ // sockaddr_un contains sa_family_t sun_family and char[] sun_path.
+ // If size is the size of sa_family_t, this is an unnamed socket and
+ // sun_path contains garbage.
+ new (p_sa) SocketAddress(&raw.addr,
+ /*unnamed_unix_socket=*/size == sizeof(sa_family_t));
+ return true;
+}
+
intptr_t SocketBase::GetPort(intptr_t fd) {
IOHandle* handle = reinterpret_cast<IOHandle*>(fd);
ASSERT(handle->fd() >= 0);
diff --git a/runtime/bin/socket_base_linux.cc b/runtime/bin/socket_base_linux.cc
index 01d9d22..f23c23a 100644
--- a/runtime/bin/socket_base_linux.cc
+++ b/runtime/bin/socket_base_linux.cc
@@ -99,6 +99,75 @@
return read_bytes;
}
+bool SocketControlMessage::is_file_descriptors_control_message() {
+ return level_ == SOL_SOCKET && type_ == SCM_RIGHTS;
+}
+
+// /proc/sys/net/core/optmem_max is corresponding kernel setting.
+const size_t kMaxSocketMessageControlLength = 2048;
+
+// if return value is positive or zero - it's number of messages read
+// if it's negative - it's error code
+intptr_t SocketBase::ReceiveMessage(intptr_t fd,
+ void* buffer,
+ int64_t* p_buffer_num_bytes,
+ SocketControlMessage** p_messages,
+ SocketOpKind sync,
+ OSError* p_oserror) {
+ ASSERT(fd >= 0);
+ ASSERT(p_messages != nullptr);
+ ASSERT(p_buffer_num_bytes != nullptr);
+
+ struct iovec iov[1];
+ memset(iov, 0, sizeof(iov));
+ iov[0].iov_base = buffer;
+ iov[0].iov_len = *p_buffer_num_bytes;
+
+ struct msghdr msg;
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_iov = iov;
+ msg.msg_iovlen = 1; // number of elements in iov
+ uint8_t control_buffer[kMaxSocketMessageControlLength];
+ msg.msg_control = control_buffer;
+ msg.msg_controllen = sizeof(control_buffer);
+
+ ssize_t read_bytes = TEMP_FAILURE_RETRY(recvmsg(fd, &msg, MSG_CMSG_CLOEXEC));
+ if ((sync == kAsync) && (read_bytes == -1) && (errno == EWOULDBLOCK)) {
+ // If the read would block we need to retry and therefore return 0
+ // as the number of bytes read.
+ return 0;
+ }
+ if (read_bytes < 0) {
+ p_oserror->Reload();
+ return read_bytes;
+ }
+ *p_buffer_num_bytes = read_bytes;
+
+ struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
+ size_t num_messages = 0;
+ while (cmsg != nullptr) {
+ num_messages++;
+ cmsg = CMSG_NXTHDR(&msg, cmsg);
+ }
+ (*p_messages) = reinterpret_cast<SocketControlMessage*>(
+ Dart_ScopeAllocate(sizeof(SocketControlMessage) * num_messages));
+ SocketControlMessage* control_message = *p_messages;
+ for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
+ cmsg = CMSG_NXTHDR(&msg, cmsg), control_message++) {
+ void* data = CMSG_DATA(cmsg);
+ size_t data_length = cmsg->cmsg_len - (reinterpret_cast<uint8_t*>(data) -
+ reinterpret_cast<uint8_t*>(cmsg));
+ void* copied_data = Dart_ScopeAllocate(data_length);
+ ASSERT(copied_data != nullptr);
+ memmove(copied_data, data, data_length);
+ ASSERT(cmsg->cmsg_level == SOL_SOCKET);
+ ASSERT(cmsg->cmsg_type == SCM_RIGHTS);
+ new (control_message) SocketControlMessage(
+ cmsg->cmsg_level, cmsg->cmsg_type, copied_data, data_length);
+ }
+ return num_messages;
+}
+
bool SocketBase::AvailableDatagram(intptr_t fd,
void* buffer,
intptr_t num_bytes) {
@@ -141,6 +210,84 @@
return written_bytes;
}
+intptr_t SocketBase::SendMessage(intptr_t fd,
+ void* buffer,
+ size_t num_bytes,
+ SocketControlMessage* messages,
+ intptr_t num_messages,
+ SocketOpKind sync,
+ OSError* p_oserror) {
+ ASSERT(fd >= 0);
+
+ struct iovec iov = {
+ .iov_base = buffer,
+ .iov_len = num_bytes,
+ };
+
+ struct msghdr msg;
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_iov = &iov;
+ msg.msg_iovlen = 1;
+
+ if (messages != nullptr && num_messages > 0) {
+ SocketControlMessage* message = messages;
+ size_t total_length = 0;
+ for (intptr_t i = 0; i < num_messages; i++, message++) {
+ total_length += CMSG_SPACE(message->data_length());
+ }
+
+ uint8_t* control_buffer =
+ reinterpret_cast<uint8_t*>(Dart_ScopeAllocate(total_length));
+ memset(control_buffer, 0, total_length);
+ msg.msg_control = control_buffer;
+ msg.msg_controllen = total_length;
+
+ struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
+ message = messages;
+ for (intptr_t i = 0; i < num_messages;
+ i++, message++, cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ ASSERT(message->is_file_descriptors_control_message());
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+
+ intptr_t data_length = message->data_length();
+ cmsg->cmsg_len = CMSG_LEN(data_length);
+ memmove(CMSG_DATA(cmsg), message->data(), data_length);
+ }
+ msg.msg_controllen = total_length;
+ }
+
+ ssize_t written_bytes = TEMP_FAILURE_RETRY(sendmsg(fd, &msg, 0));
+ ASSERT(EAGAIN == EWOULDBLOCK);
+ if ((sync == kAsync) && (written_bytes == -1) && (errno == EWOULDBLOCK)) {
+ // If the would block we need to retry and therefore return 0 as
+ // the number of bytes written.
+ written_bytes = 0;
+ }
+ if (written_bytes < 0) {
+ p_oserror->Reload();
+ }
+
+ return written_bytes;
+}
+
+bool SocketBase::GetSocketName(intptr_t fd, SocketAddress* p_sa) {
+ ASSERT(fd >= 0);
+ ASSERT(p_sa != nullptr);
+ RawAddr raw;
+ socklen_t size = sizeof(raw);
+ if (NO_RETRY_EXPECTED(getsockname(fd, &raw.addr, &size))) {
+ return false;
+ }
+
+ // sockaddr_un contains sa_family_t sun_family and char[] sun_path.
+ // If size is the size of sa_family_t, this is an unnamed socket and
+ // sun_path contains garbage.
+ new (p_sa) SocketAddress(&raw.addr,
+ /*unnamed_unix_socket=*/size == sizeof(sa_family_t));
+ return true;
+}
+
intptr_t SocketBase::GetPort(intptr_t fd) {
ASSERT(fd >= 0);
RawAddr raw;
@@ -158,12 +305,12 @@
if (NO_RETRY_EXPECTED(getpeername(fd, &raw.addr, &size))) {
return NULL;
}
- // sockaddr_un contains sa_family_t sun_familty and char[] sun_path.
- // If size is the size of sa_familty_t, this is an unnamed socket and
+ // sockaddr_un contains sa_family_t sun_family and char[] sun_path.
+ // If size is the size of sa_family_t, this is an unnamed socket and
// sun_path contains garbage.
if (size == sizeof(sa_family_t)) {
*port = 0;
- return new SocketAddress(&raw.addr, true);
+ return new SocketAddress(&raw.addr, /*unnamed_unix_socket=*/true);
}
*port = SocketAddress::GetAddrPort(raw);
return new SocketAddress(&raw.addr);
diff --git a/runtime/bin/socket_base_macos.cc b/runtime/bin/socket_base_macos.cc
index cb26ed4..1b4cbcc 100644
--- a/runtime/bin/socket_base_macos.cc
+++ b/runtime/bin/socket_base_macos.cc
@@ -98,6 +98,20 @@
return read_bytes;
}
+bool SocketControlMessage::is_file_descriptors_control_message() {
+ return false;
+}
+
+intptr_t SocketBase::ReceiveMessage(intptr_t fd,
+ void* buffer,
+ int64_t* p_buffer_num_bytes,
+ SocketControlMessage** p_messages,
+ SocketOpKind sync,
+ OSError* p_oserror) {
+ errno = ENOSYS;
+ return -1;
+}
+
bool SocketBase::AvailableDatagram(intptr_t fd,
void* buffer,
intptr_t num_bytes) {
@@ -140,6 +154,34 @@
return written_bytes;
}
+intptr_t SocketBase::SendMessage(intptr_t fd,
+ void* buffer,
+ size_t num_bytes,
+ SocketControlMessage* messages,
+ intptr_t num_messages,
+ SocketOpKind sync,
+ OSError* p_oserror) {
+ errno = ENOSYS;
+ return -1;
+}
+
+bool SocketBase::GetSocketName(intptr_t fd, SocketAddress* p_sa) {
+ ASSERT(fd >= 0);
+ ASSERT(p_sa != nullptr);
+ RawAddr raw;
+ socklen_t size = sizeof(raw);
+ if (NO_RETRY_EXPECTED(getsockname(fd, &raw.addr, &size))) {
+ return false;
+ }
+
+ // sockaddr_un contains sa_family_t sun_family and char[] sun_path.
+ // If size is the size of sa_family_t, this is an unnamed socket and
+ // sun_path contains garbage.
+ new (p_sa) SocketAddress(&raw.addr,
+ /*unnamed_unix_socket=*/size == sizeof(sa_family_t));
+ return true;
+}
+
intptr_t SocketBase::GetPort(intptr_t fd) {
ASSERT(fd >= 0);
RawAddr raw;
diff --git a/runtime/bin/socket_base_win.cc b/runtime/bin/socket_base_win.cc
index 120d42f..4d03305 100644
--- a/runtime/bin/socket_base_win.cc
+++ b/runtime/bin/socket_base_win.cc
@@ -99,6 +99,20 @@
return handle->RecvFrom(buffer, num_bytes, &addr->addr, addr_len);
}
+bool SocketControlMessage::is_file_descriptors_control_message() {
+ return false;
+}
+
+intptr_t SocketBase::ReceiveMessage(intptr_t fd,
+ void* buffer,
+ int64_t* p_buffer_num_bytes,
+ SocketControlMessage** p_messages,
+ SocketOpKind sync,
+ OSError* p_oserror) {
+ errno = ENOSYS;
+ return -1;
+}
+
bool SocketBase::AvailableDatagram(intptr_t fd,
void* buffer,
intptr_t num_bytes) {
@@ -125,6 +139,34 @@
SocketAddress::GetAddrLength(addr));
}
+intptr_t SocketBase::SendMessage(intptr_t fd,
+ void* buffer,
+ size_t num_bytes,
+ SocketControlMessage* messages,
+ intptr_t num_messages,
+ SocketOpKind sync,
+ OSError* p_oserror) {
+ errno = ENOSYS;
+ return -1;
+}
+
+bool SocketBase::GetSocketName(intptr_t fd, SocketAddress* p_sa) {
+ ASSERT(fd >= 0);
+ ASSERT(p_sa != nullptr);
+ RawAddr raw;
+ socklen_t size = sizeof(raw);
+ if (getsockname(fd, &raw.addr, &size) == SOCKET_ERROR) {
+ return false;
+ }
+
+ // sockaddr_un contains sa_family_t sun_family and char[] sun_path.
+ // If size is the size of sa_family_t, this is an unnamed socket and
+ // sun_path contains garbage.
+ new (p_sa) SocketAddress(&raw.addr,
+ /*unnamed_unix_socket=*/size == sizeof(u_short));
+ return true;
+}
+
intptr_t SocketBase::GetPort(intptr_t fd) {
ASSERT(reinterpret_cast<Handle*>(fd)->is_socket());
SocketHandle* socket_handle = reinterpret_cast<SocketHandle*>(fd);
diff --git a/runtime/vm/compiler/aot/precompiler.cc b/runtime/vm/compiler/aot/precompiler.cc
index 82b4c61..c987ed3 100644
--- a/runtime/vm/compiler/aot/precompiler.cc
+++ b/runtime/vm/compiler/aot/precompiler.cc
@@ -3524,7 +3524,11 @@
PreventRenaming("html");
// Looked up by name via "DartUtils::GetDartType".
+ PreventRenaming("_RandomAccessFile");
PreventRenaming("_RandomAccessFileOpsImpl");
+ PreventRenaming("ResourceHandle");
+ PreventRenaming("_ResourceHandleImpl");
+ PreventRenaming("_SocketControlMessageImpl");
PreventRenaming("_NamespaceImpl");
}
diff --git a/sdk/lib/_internal/js_dev_runtime/patch/io_patch.dart b/sdk/lib/_internal/js_dev_runtime/patch/io_patch.dart
index 882f10c..4960213 100644
--- a/sdk/lib/_internal/js_dev_runtime/patch/io_patch.dart
+++ b/sdk/lib/_internal/js_dev_runtime/patch/io_patch.dart
@@ -502,6 +502,48 @@
}
@patch
+class SocketControlMessage {
+ @patch
+ factory SocketControlMessage.fromHandles(List<ResourceHandle> handles) {
+ throw UnsupportedError("SocketControlMessage constructor");
+ }
+}
+
+@patch
+class ResourceHandle {
+ @patch
+ factory ResourceHandle.fromFile(RandomAccessFile file) {
+ throw UnsupportedError("ResourceHandle.fromFile constructor");
+ }
+
+ @patch
+ factory ResourceHandle.fromSocket(Socket socket) {
+ throw UnsupportedError("ResourceHandle.fromSocket constructor");
+ }
+
+ @patch
+ factory ResourceHandle.fromRawSocket(RawSocket rawSocket) {
+ throw UnsupportedError("ResourceHandle.fromRawSocket constructor");
+ }
+
+ @patch
+ factory ResourceHandle.fromRawDatagramSocket(
+ RawDatagramSocket rawDatagramSocket) {
+ throw UnsupportedError("ResourceHandle.fromRawDatagramSocket constructor");
+ }
+
+ @patch
+ factory ResourceHandle.fromStdin(Stdin stdin) {
+ throw UnsupportedError("ResourceHandle.fromStdin constructor");
+ }
+
+ @patch
+ factory ResourceHandle.fromStdout(Stdout stdout) {
+ throw UnsupportedError("ResourceHandle.fromStdout constructor");
+ }
+}
+
+@patch
class SecureSocket {
@patch
factory SecureSocket._(RawSecureSocket rawSocket) {
diff --git a/sdk/lib/_internal/js_runtime/lib/io_patch.dart b/sdk/lib/_internal/js_runtime/lib/io_patch.dart
index 2ee566b..6a88d0f 100644
--- a/sdk/lib/_internal/js_runtime/lib/io_patch.dart
+++ b/sdk/lib/_internal/js_runtime/lib/io_patch.dart
@@ -502,6 +502,48 @@
}
@patch
+class SocketControlMessage {
+ @patch
+ factory SocketControlMessage.fromHandles(List<ResourceHandle> handles) {
+ throw UnsupportedError("SocketControlMessage constructor");
+ }
+}
+
+@patch
+class ResourceHandle {
+ @patch
+ factory ResourceHandle.fromFile(RandomAccessFile file) {
+ throw UnsupportedError("ResourceHandle.fromFile constructor");
+ }
+
+ @patch
+ factory ResourceHandle.fromSocket(Socket socket) {
+ throw UnsupportedError("ResourceHandle.fromSocket constructor");
+ }
+
+ @patch
+ factory ResourceHandle.fromRawSocket(RawSocket rawSocket) {
+ throw UnsupportedError("ResourceHandle.fromRawSocket constructor");
+ }
+
+ @patch
+ factory ResourceHandle.fromRawDatagramSocket(
+ RawDatagramSocket rawDatagramSocket) {
+ throw UnsupportedError("ResourceHandle.fromRawDatagramSocket constructor");
+ }
+
+ @patch
+ factory ResourceHandle.fromStdin(Stdin stdin) {
+ throw UnsupportedError("ResourceHandle.fromStdin constructor");
+ }
+
+ @patch
+ factory ResourceHandle.fromStdout(Stdout stdout) {
+ throw UnsupportedError("ResourceHandle.fromStdout constructor");
+ }
+}
+
+@patch
class SecureSocket {
@patch
factory SecureSocket._(RawSecureSocket rawSocket) {
diff --git a/sdk/lib/_internal/vm/bin/file_patch.dart b/sdk/lib/_internal/vm/bin/file_patch.dart
index b95e71a..8768d7a 100644
--- a/sdk/lib/_internal/vm/bin/file_patch.dart
+++ b/sdk/lib/_internal/vm/bin/file_patch.dart
@@ -79,9 +79,10 @@
@pragma("vm:external-name", "File_SetPointer")
external void _setPointer(int pointer);
-
@pragma("vm:external-name", "File_GetPointer")
external int getPointer();
+ @pragma("vm:external-name", "File_GetFD")
+ external int get fd;
@pragma("vm:external-name", "File_Close")
external int close();
@pragma("vm:external-name", "File_ReadByte")
diff --git a/sdk/lib/_internal/vm/bin/socket_patch.dart b/sdk/lib/_internal/vm/bin/socket_patch.dart
index c330ed8..9a68d6f 100644
--- a/sdk/lib/_internal/vm/bin/socket_patch.dart
+++ b/sdk/lib/_internal/vm/bin/socket_patch.dart
@@ -1081,6 +1081,43 @@
}
}
+ SocketMessage? readMessage([int? count]) {
+ if (count != null && count <= 0) {
+ throw ArgumentError("Illegal length $count");
+ }
+ if (isClosing || isClosed) return null;
+ try {
+ final bytesCount = count ?? nativeAvailable();
+ // Returned messagesData is a list of triples (level, type, uint8list)
+ // followed by uint8list with raw data.
+ // This is kept at this level to minimize dart api use in native method.
+ final List<dynamic> messagesData = nativeReceiveMessage(bytesCount);
+ final messages = <SocketControlMessage>[];
+ if (messagesData.isNotEmpty) {
+ final triplesCount = (messagesData.length - 1) / 3;
+ assert((triplesCount * 3) == (messagesData.length - 1));
+ for (int i = 0; i < triplesCount; i++) {
+ final message = _SocketControlMessageImpl(
+ messagesData[i * 3] as int,
+ messagesData[i * 3 + 1] as int,
+ messagesData[i * 3 + 2] as Uint8List);
+ messages.add(message);
+ }
+ }
+ final socketMessage = SocketMessage(
+ messagesData[messagesData.length - 1] as Uint8List, messages);
+ available = nativeAvailable();
+ if (!const bool.fromEnvironment("dart.vm.product")) {
+ _SocketProfile.collectStatistic(
+ nativeGetSocketId(), _SocketProfileType.readBytes, bytesCount);
+ }
+ return socketMessage;
+ } catch (e, st) {
+ reportError(e, st, "Read failed");
+ return null;
+ }
+ }
+
static int _fixOffset(int? offset) => offset ?? 0;
int write(List<int> buffer, int offset, int? bytes) {
@@ -1149,6 +1186,44 @@
}
}
+ int sendMessage(List<int> buffer, int offset, int? bytes,
+ List<SocketControlMessage> controlMessages) {
+ if (offset < 0) throw new RangeError.value(offset);
+ if (bytes != null) {
+ if (bytes < 0) throw new RangeError.value(bytes);
+ } else {
+ bytes = buffer.length - offset;
+ }
+ if ((offset + bytes) > buffer.length) {
+ throw new RangeError.value(offset + bytes);
+ }
+ if (isClosing || isClosed) return 0;
+ try {
+ _BufferAndStart bufferAndStart =
+ _ensureFastAndSerializableByteData(buffer, offset, bytes);
+ if (!const bool.fromEnvironment("dart.vm.product")) {
+ _SocketProfile.collectStatistic(
+ nativeGetSocketId(),
+ _SocketProfileType.writeBytes,
+ bufferAndStart.buffer.length - bufferAndStart.start);
+ }
+ // list of triples <level, type, data> arranged to minimize dart api
+ // use in native method.
+ List<dynamic> messages = <dynamic>[];
+ for (SocketControlMessage controlMessage in controlMessages) {
+ messages.add(controlMessage.level);
+ messages.add(controlMessage.type);
+ messages.add(controlMessage.data);
+ }
+
+ return nativeSendMessage(
+ bufferAndStart.buffer, bufferAndStart.start, bytes, messages);
+ } catch (e, st) {
+ scheduleMicrotask(() => reportError(e, st, "SendMessage failed"));
+ return 0;
+ }
+ }
+
_NativeSocket? accept() {
// Don't issue accept if we're closing.
if (isClosing || isClosed) return null;
@@ -1540,11 +1615,16 @@
external Uint8List? nativeRead(int len);
@pragma("vm:external-name", "Socket_RecvFrom")
external Datagram? nativeRecvFrom();
+ @pragma("vm:external-name", "Socket_ReceiveMessage")
+ external List<dynamic> nativeReceiveMessage(int len);
@pragma("vm:external-name", "Socket_WriteList")
external int nativeWrite(List<int> buffer, int offset, int bytes);
@pragma("vm:external-name", "Socket_SendTo")
external int nativeSendTo(
List<int> buffer, int offset, int bytes, Uint8List address, int port);
+ @pragma("vm:external-name", "Socket_SendMessage")
+ external nativeSendMessage(
+ List<int> buffer, int offset, int bytes, List<dynamic> controlMessages);
@pragma("vm:external-name", "Socket_CreateConnect")
external nativeCreateConnect(Uint8List addr, int port, int scope_id);
@pragma("vm:external-name", "Socket_CreateUnixDomainConnect")
@@ -1574,6 +1654,8 @@
external List nativeGetRemotePeer();
@pragma("vm:external-name", "Socket_GetSocketId")
external int nativeGetSocketId();
+ @pragma("vm:external-name", "Socket_GetFD")
+ external int get fd;
@pragma("vm:external-name", "Socket_GetError")
external OSError nativeGetError();
@pragma("vm:external-name", "Socket_GetOption")
@@ -1785,9 +1867,17 @@
}
}
+ SocketMessage? readMessage([int? count]) {
+ return _socket.readMessage(count);
+ }
+
int write(List<int> buffer, [int offset = 0, int? count]) =>
_socket.write(buffer, offset, count);
+ int sendMessage(List<SocketControlMessage> controlMessages, List<int> data,
+ [int offset = 0, int? count]) =>
+ _socket.sendMessage(data, offset, count, controlMessages);
+
Future<RawSocket> close() => _socket.close().then<RawSocket>((_) {
if (!const bool.fromEnvironment("dart.vm.product")) {
_SocketProfile.collectStatistic(
@@ -2411,3 +2501,95 @@
_InternetAddress(InternetAddressType._from(type), address, null, in_addr),
port);
}
+
+@patch
+class ResourceHandle {
+ factory ResourceHandle.fromFile(RandomAccessFile file) {
+ int fd = (file as _RandomAccessFile).fd;
+ return _ResourceHandleImpl(fd);
+ }
+
+ factory ResourceHandle.fromSocket(Socket socket) {
+ final _socket = socket as _Socket;
+ if (_socket._raw == null) {
+ throw ArgumentError("Socket is closed");
+ }
+ final _RawSocket raw = _socket._raw! as _RawSocket;
+ final _NativeSocket nativeSocket = raw._socket;
+ int fd = nativeSocket.fd;
+ return _ResourceHandleImpl(fd);
+ }
+
+ factory ResourceHandle.fromRawSocket(RawSocket socket) {
+ final _RawSocket raw = socket as _RawSocket;
+ final _NativeSocket nativeSocket = raw._socket;
+ int fd = nativeSocket.fd;
+ return _ResourceHandleImpl(fd);
+ }
+
+ factory ResourceHandle.fromRawDatagramSocket(RawDatagramSocket socket) {
+ final _RawDatagramSocket raw = socket as _RawDatagramSocket;
+ final _NativeSocket nativeSocket = socket._socket;
+ int fd = nativeSocket.fd;
+ return _ResourceHandleImpl(fd);
+ }
+
+ factory ResourceHandle.fromStdin(Stdin stdin) {
+ return _ResourceHandleImpl(stdin._fd);
+ }
+
+ factory ResourceHandle.fromStdout(Stdout stdout) {
+ return _ResourceHandleImpl(stdout._fd);
+ }
+}
+
+@pragma("vm:entry-point")
+class _ResourceHandleImpl implements ResourceHandle {
+ int _handle; // file descriptor on linux
+ _ResourceHandleImpl(this._handle);
+
+ @pragma("vm:external-name", "ResourceHandleImpl_toFile")
+ external RandomAccessFile toFile();
+ @pragma("vm:external-name", "ResourceHandleImpl_toSocket")
+ external Socket toSocket();
+ @pragma("vm:external-name", "ResourceHandleImpl_toRawSocket")
+ external List<dynamic> _toRawSocket();
+
+ RawSocket toRawSocket() {
+ List<dynamic> list = _toRawSocket();
+ InternetAddressType type = InternetAddressType._from(list[0] as int);
+ String hostname = list[1] as String;
+ Uint8List rawAddr = list[2] as Uint8List;
+ int fd = list[3] as int;
+ InternetAddress internetAddress = type == InternetAddressType.unix
+ ? _InternetAddress.fromString(hostname, type: InternetAddressType.unix)
+ : _InternetAddress(type, hostname, null, rawAddr);
+ final nativeSocket = _NativeSocket.normal(internetAddress);
+ nativeSocket.nativeSetSocketId(fd, _NativeSocket.typeInternalSocket);
+ return _RawSocket(nativeSocket);
+ }
+
+ @pragma("vm:external-name", "ResourceHandleImpl_toRawDatagramSocket")
+ external RawDatagramSocket toRawDatagramSocket();
+
+ static final _ResourceHandleImpl _sentinel = _ResourceHandleImpl(-1);
+}
+
+@patch
+class SocketControlMessage {
+ factory SocketControlMessage.fromHandles(List<ResourceHandle> handles)
+ native "SocketControlMessage_fromHandles";
+}
+
+class _SocketControlMessageImpl implements SocketControlMessage {
+ final int level;
+ final int type;
+ final Uint8List data;
+
+ _SocketControlMessageImpl(this.level, this.type, this.data);
+
+ @pragma("vm:external-name", "SocketControlMessageImpl_extractHandles")
+ external List<ResourceHandle> extractHandles();
+
+ static final _sentinel = _SocketControlMessageImpl(0, 0, Uint8List(0));
+}
diff --git a/sdk/lib/io/file_impl.dart b/sdk/lib/io/file_impl.dart
index 696273b..1311444 100644
--- a/sdk/lib/io/file_impl.dart
+++ b/sdk/lib/io/file_impl.dart
@@ -647,6 +647,7 @@
external factory _RandomAccessFileOps(int pointer);
int getPointer();
+ int get fd;
int close();
readByte();
read(int bytes);
@@ -1051,6 +1052,8 @@
bool closed = false;
+ int get fd => _ops.fd;
+
// WARNING:
// Calling this function will increase the reference count on the native
// object that implements the file operations. It should only be called to
diff --git a/sdk/lib/io/secure_socket.dart b/sdk/lib/io/secure_socket.dart
index 32222b5..6150653 100644
--- a/sdk/lib/io/secure_socket.dart
+++ b/sdk/lib/io/secure_socket.dart
@@ -669,6 +669,10 @@
return result;
}
+ SocketMessage? readMessage([int? count]) {
+ throw UnsupportedError("Message-passing not supported by secure sockets");
+ }
+
static int _fixOffset(int? offset) => offset ?? 0;
// Write the data to the socket, and schedule the filter to encrypt it.
@@ -699,6 +703,11 @@
return written;
}
+ int sendMessage(List<SocketControlMessage> controlMessages, List<int> data,
+ [int offset = 0, int? count]) {
+ throw UnsupportedError("Message-passing not supported by secure sockets");
+ }
+
X509Certificate? get peerCertificate => _secureFilter!.peerCertificate;
String? get selectedProtocol => _selectedProtocol;
diff --git a/sdk/lib/io/socket.dart b/sdk/lib/io/socket.dart
index ab2ecf5..26cb3ef 100644
--- a/sdk/lib/io/socket.dart
+++ b/sdk/lib/io/socket.dart
@@ -615,6 +615,23 @@
/// is returned.
Uint8List? read([int? len]);
+ /// Reads a message containing up to [count] bytes from the socket.
+ ///
+ /// This function differs from [read] in that it will also return any
+ /// [SocketControlMessage] that have been sent.
+ ///
+ /// This function is non-blocking and will only return data
+ /// if data is available.
+ /// The number of bytes read can be less then [count] if fewer bytes are
+ /// available for immediate reading.
+ /// Length of data buffer in [SocketMessage] indicates number of bytes read.
+ ///
+ /// Returns `null` if no data is available.
+ ///
+ /// Unsupported by [RawSecureSocket].
+ @Since("2.15")
+ SocketMessage? readMessage([int? count]);
+
/// Writes up to [count] bytes of the buffer from [offset] buffer offset to
/// the socket.
///
@@ -626,6 +643,35 @@
/// `buffer.length - offset`.
int write(List<int> buffer, [int offset = 0, int? count]);
+ /// Writes socket control messages and data bytes to the socket.
+ ///
+ /// Writes [controlMessages] and up to [count] bytes of [data],
+ /// starting at [offset], to the socket. If [count] is not provided,
+ /// as many bytes as possible are written. Use [write] instead if no control
+ /// messages are required to be sent.
+ ///
+ /// When sent control messages are received, they are retained until the
+ /// next call to [readMessage], where all currently available control messages
+ /// are provided as part of the returned [SocketMessage].
+ /// Calling [read] will read only data bytes, and will not affect control
+ /// messages.
+ ///
+ /// The [count] must be positive (greater than zero).
+ ///
+ /// Returns the number of bytes written, which cannot be greater than
+ /// [count], nor greater than `data.length - offset`.
+ /// Return value of zero indicates that control messages were not sent.
+ ///
+ /// This function is non-blocking and will only write data
+ /// if buffer space is available in the socket.
+ ///
+ /// Throws an [OSError] if message could not be sent out.
+ ///
+ /// Unsupported by [RawSecureSocket].
+ @Since("2.15")
+ int sendMessage(List<SocketControlMessage> controlMessages, List<int> data,
+ [int offset = 0, int? count]);
+
/// The port used by this socket.
///
/// Throws a [SocketException] if the socket is closed.
@@ -825,6 +871,125 @@
Datagram(this.data, this.address, this.port);
}
+/// A wrappper around OS resource handle so it can be passed via Socket
+/// as part of [SocketMessage].
+abstract class ResourceHandle {
+ /// Creates wrapper around opened file.
+ external factory ResourceHandle.fromFile(RandomAccessFile file);
+
+ /// Creates wrapper around opened socket.
+ external factory ResourceHandle.fromSocket(Socket socket);
+
+ /// Creates wrapper around opened raw socket.
+ external factory ResourceHandle.fromRawSocket(RawSocket socket);
+
+ /// Creates wrapper around opened raw datagram socket.
+ external factory ResourceHandle.fromRawDatagramSocket(
+ RawDatagramSocket socket);
+
+ /// Creates wrapper around current stdin.
+ external factory ResourceHandle.fromStdin(Stdin stdin);
+
+ /// Creates wrapper around current stdout.
+ external factory ResourceHandle.fromStdout(Stdout stdout);
+
+ /// Extracts opened file from resource handle.
+ ///
+ /// This can also be used when receiving stdin and stdout handles.
+ ///
+ /// If this resource handle is not a file or stdio handle, the behavior of the
+ /// returned [RandomAccessFile] is completely unspecified.
+ /// Be very careful to avoid using a handle incorrectly.
+ RandomAccessFile toFile();
+
+ /// Extracts opened socket from resource handle.
+ ///
+ /// If this resource handle is not a socket handle, the behavior of the
+ /// returned [Socket] is completely unspecified.
+ /// Be very careful to avoid using a handle incorrectly.
+ Socket toSocket();
+
+ /// Extracts opened raw socket from resource handle.
+ ///
+ /// If this resource handle is not a socket handle, the behavior of the
+ /// returned [RawSocket] is completely unspecified.
+ /// Be very careful to avoid using a handle incorrectly.
+ RawSocket toRawSocket();
+
+ /// Extracts opened raw datagram socket from resource handle.
+ ///
+ /// If this resource handle is not a datagram socket handle, the behavior of
+ /// the returned [RawDatagramSocket] is completely unspecified.
+ /// Be very careful to avoid using a handle incorrectly.
+ RawDatagramSocket toRawDatagramSocket();
+}
+
+/// Control message part of the [SocketMessage] received by a call to
+/// [RawSocket.readMessage].
+///
+/// Control messages could carry different information including
+/// [ResourceHandle]. If [ResourceHandle]s are availabe as part of this message,
+/// they can be extracted via [extractHandles].
+abstract class SocketControlMessage {
+ /// Creates a control message containing the provided [handles].
+ ///
+ /// This is used by the sender when it sends handles across the socket.
+ /// Receiver can extract the handles from the message using [extractHandles].
+ external factory SocketControlMessage.fromHandles(
+ List<ResourceHandle> handles);
+
+ /// Extracts the list of handles embedded in this message.
+ ///
+ /// This method must only be used to extract handles from messages
+ /// received on a socket. It must not be used on a socket control
+ /// message that is created locally, and has not been sent using
+ /// [RawSocket.sendMessage].
+ ///
+ /// This method must only be called once.
+ /// Calling it multiple times may cause duplicated handles with unspecified
+ /// behavior.
+ List<ResourceHandle> extractHandles();
+
+ /// A platform specific value used to determine the kind of control message.
+ ///
+ /// Together with [type], these two integers identify the kind of control
+ /// message in a platform specific way.
+ /// For example, on Linux certain combinations of these values indicate
+ /// that this is a control message that carries [ResourceHandle]s.
+ int get level;
+
+ /// A platform specific value used to determine the kind of control message.
+ ///
+ /// Together with [level], these two integers identify the kind of control
+ /// message in a platform specific way.
+ /// For example, on Linux certain combinations of these values indicate
+ /// that this is a control message that carries [ResourceHandle]s.
+ int get type;
+
+ /// Actual bytes that were passed as part of the control message by the
+ /// underlying platform.
+ ///
+ /// The bytes are interpreted differently depending on the [level] and
+ /// [type]. These actual bytes can be used to inspect and interpret
+ /// non-handle-carrying messages.
+ Uint8List get data;
+}
+
+/// A socket message received by a [RawDatagramSocket].
+///
+/// A socket message consists of [data] bytes and [controlMessages].
+class SocketMessage {
+ /// The actual bytes of the message.
+ final Uint8List data;
+
+ /// The control messages sent as part of this socket message.
+ ///
+ /// This list can be empty.
+ final List<SocketControlMessage> controlMessages;
+
+ SocketMessage(this.data, this.controlMessages);
+}
+
/// An unbuffered interface to a UDP socket.
///
/// The raw datagram socket delivers the datagrams in the same chunks as the
diff --git a/tests/standalone/io/unix_socket_test.dart b/tests/standalone/io/unix_socket_test.dart
index d510fb6..f87a4e5 100644
--- a/tests/standalone/io/unix_socket_test.dart
+++ b/tests/standalone/io/unix_socket_test.dart
@@ -435,18 +435,421 @@
await httpServer.close();
}
+Future testFileMessage(String tempDirPath) async {
+ if (!Platform.isLinux && !Platform.isAndroid) {
+ return;
+ }
+
+ final completer = Completer<bool>();
+
+ final address =
+ InternetAddress('$tempDirPath/sock', type: InternetAddressType.unix);
+ final server = await RawServerSocket.bind(address, 0, shared: false);
+
+ server.listen((RawSocket socket) async {
+ print('server started a socket $socket');
+ socket.listen((e) {
+ if (e == RawSocketEvent.read) {
+ final SocketMessage? message = socket.readMessage();
+ if (message == null) {
+ return;
+ }
+ print('server received message $message');
+ final String messageData = String.fromCharCodes(message.data);
+ print('server received messageData $messageData');
+ if (messageData == 'EmptyMessage') {
+ Expect.equals('EmptyMessage'.length, message.data.length);
+ Expect.isTrue(message.controlMessages.isEmpty);
+ return;
+ }
+ Expect.equals('Hello', messageData);
+ Expect.equals('Hello'.length, message.data.length);
+ Expect.equals(1, message.controlMessages.length);
+ final SocketControlMessage controlMessage = message.controlMessages[0];
+ final handles = controlMessage.extractHandles();
+ Expect.isNotNull(handles);
+ Expect.equals(1, handles.length);
+ final receivedFile = handles[0].toFile();
+ receivedFile.writeStringSync('Hello, server!\n');
+ print("server has written to the $receivedFile file");
+ socket.write('abc'.codeUnits);
+ } else if (e == RawSocketEvent.readClosed) {
+ print('server socket got readClosed');
+ socket.close();
+ server.close();
+ }
+ });
+ });
+
+ final file = File('$tempDirPath/myfile.txt');
+ final randomAccessFile = file.openSync(mode: FileMode.write);
+ // Send a message with sample file.
+ final socket = await RawSocket.connect(address, 0);
+ socket.listen((e) {
+ if (e == RawSocketEvent.write) {
+ randomAccessFile.writeStringSync('Hello, client!\n');
+ socket.sendMessage(<SocketControlMessage>[
+ SocketControlMessage.fromHandles(
+ <ResourceHandle>[ResourceHandle.fromFile(randomAccessFile)])
+ ], 'Hello'.codeUnits);
+ print('client sent a message');
+ socket.sendMessage(<SocketControlMessage>[], 'EmptyMessage'.codeUnits);
+ print('client sent a message without control data');
+ } else if (e == RawSocketEvent.read) {
+ final data = socket.read();
+ if (data == null) {
+ return;
+ }
+ Expect.equals('abc', String.fromCharCodes(data));
+ Expect.equals(
+ 'Hello, client!\nHello, server!\n', file.readAsStringSync());
+ socket.close();
+ completer.complete(true);
+ }
+ });
+
+ return completer.future;
+}
+
+Future testTooLargeControlMessage(String tempDirPath) async {
+ if (!Platform.isLinux && !Platform.isAndroid) {
+ return;
+ }
+ final completer = Completer<bool>();
+ final address =
+ InternetAddress('$tempDirPath/sock', type: InternetAddressType.unix);
+ final server = await RawServerSocket.bind(address, 0, shared: false);
+
+ server.listen((RawSocket socket) async {
+ print('server started a socket $socket');
+ socket.listen((e) {
+ if (e == RawSocketEvent.read) {
+ throw "Server should not receive request from the client";
+ } else if (e == RawSocketEvent.readClosed) {
+ socket.close();
+ server.close();
+ }
+ });
+ });
+
+ final file = File('$tempDirPath/myfile.txt');
+ final randomAccessFile = file.openSync(mode: FileMode.write);
+ // Send a message with sample file.
+ final socket = await RawSocket.connect(address, 0);
+
+ runZonedGuarded(
+ () => socket.listen((e) {
+ if (e == RawSocketEvent.write) {
+ randomAccessFile.writeStringSync('Hello, client!\n');
+ const int largeHandleCount = 1024;
+ final manyResourceHandles = List<ResourceHandle>.filled(
+ largeHandleCount, ResourceHandle.fromFile(randomAccessFile));
+ socket.sendMessage(<SocketControlMessage>[
+ SocketControlMessage.fromHandles(manyResourceHandles)
+ ], 'Hello'.codeUnits);
+ server.close();
+ socket.close();
+ }
+ }), (e, st) {
+ print('Got expected unhandled exception $e $st');
+ Expect.equals(true, e is SocketException);
+ completer.complete(true);
+ });
+
+ return completer.future;
+}
+
+Future testFileMessageWithShortRead(String tempDirPath) async {
+ if (!Platform.isLinux && !Platform.isAndroid) {
+ return;
+ }
+
+ final completer = Completer<bool>();
+
+ final address =
+ InternetAddress('$tempDirPath/sock', type: InternetAddressType.unix);
+ final server = await RawServerSocket.bind(address, 0, shared: false);
+
+ server.listen((RawSocket socket) async {
+ print('server started a socket $socket');
+ socket.listen((e) {
+ if (e == RawSocketEvent.read) {
+ Expect.throws(
+ () => socket.readMessage(0),
+ (e) =>
+ e is ArgumentError &&
+ e.toString().contains('Illegal length 0'));
+ final SocketMessage? message = socket.readMessage(/*count=*/ 1);
+ if (message == null) {
+ return;
+ }
+ print('server received message $message');
+ final String messageData = String.fromCharCodes(message.data);
+ print('messageData: $messageData');
+ if (messageData[0] == 'H') {
+ Expect.equals(1, message.controlMessages.length);
+ final SocketControlMessage controlMessage =
+ message.controlMessages[0];
+ final handles = controlMessage.extractHandles();
+ Expect.isNotNull(handles);
+ Expect.equals(1, handles.length);
+ final handlesAgain = controlMessage.extractHandles();
+ Expect.isNotNull(handlesAgain);
+ Expect.equals(1, handlesAgain.length);
+ handles[0].toFile().writeStringSync('Hello, server!\n');
+ socket.write('abc'.codeUnits);
+ } else {
+ Expect.equals('i', messageData[0]);
+ Expect.equals(0, message.controlMessages.length);
+ }
+ } else if (e == RawSocketEvent.readClosed) {
+ print('server socket got readClosed');
+ socket.close();
+ server.close();
+ }
+ });
+ });
+
+ final file = File('$tempDirPath/myfile.txt');
+ final randomAccessFile = file.openSync(mode: FileMode.write);
+ // Send a message with sample file.
+ final socket = await RawSocket.connect(address, 0);
+ socket.listen((e) {
+ if (e == RawSocketEvent.write) {
+ randomAccessFile.writeStringSync('Hello, client!\n');
+ socket.sendMessage(<SocketControlMessage>[
+ SocketControlMessage.fromHandles(
+ <ResourceHandle>[ResourceHandle.fromFile(randomAccessFile)])
+ ], 'Hi'.codeUnits);
+ print('client sent a message');
+ } else if (e == RawSocketEvent.read) {
+ final data = socket.read();
+ if (data == null) {
+ return;
+ }
+ Expect.equals('abc', String.fromCharCodes(data));
+ Expect.equals(
+ 'Hello, client!\nHello, server!\n', file.readAsStringSync());
+ socket.close();
+ completer.complete(true);
+ }
+ });
+
+ return completer.future;
+}
+
+Future<RawServerSocket> createTestServer() async {
+ final server = await RawServerSocket.bind(InternetAddress.LOOPBACK_IP_V4, 0);
+ return server
+ ..listen((client) {
+ String receivedData = "";
+
+ client.writeEventsEnabled = false;
+ client.listen((event) {
+ switch (event) {
+ case RawSocketEvent.READ:
+ assert(client.available() > 0);
+ final buffer = client.read(200)!;
+ receivedData += String.fromCharCodes(buffer);
+ break;
+ case RawSocketEvent.READ_CLOSED:
+ client.close();
+ server.close();
+ break;
+ case RawSocketEvent.CLOSED:
+ Expect.equals(
+ "Hello, client 1!\nHello, client 2!\nHello, server!\n",
+ receivedData);
+ break;
+ default:
+ throw "Unexpected event $event";
+ }
+ }, onError: (e) {
+ print("client ERROR $e");
+ });
+ });
+}
+
+Future testSocketMessage(String uniqueName) async {
+ if (!Platform.isLinux && !Platform.isAndroid) {
+ return;
+ }
+
+ final address =
+ InternetAddress('$uniqueName/sock', type: InternetAddressType.unix);
+ final server = await RawServerSocket.bind(address, 0, shared: false);
+
+ server.listen((RawSocket socket) async {
+ socket.listen((e) {
+ switch (e) {
+ case RawSocketEvent.read:
+ final SocketMessage? message = socket.readMessage();
+ if (message == null) {
+ return;
+ }
+ Expect.equals('Hello', String.fromCharCodes(message.data));
+ Expect.equals(1, message.controlMessages.length);
+ final SocketControlMessage controlMessage =
+ message.controlMessages[0];
+ final handles = controlMessage.extractHandles();
+ Expect.isNotNull(handles);
+ Expect.equals(1, handles.length);
+ final receivedSocket = handles[0].toRawSocket();
+ receivedSocket.write('Hello, server!\n'.codeUnits);
+ socket.write('server replied'.codeUnits);
+ break;
+ case RawSocketEvent.readClosed:
+ socket.close();
+ server.close();
+ break;
+ }
+ });
+ });
+
+ final RawServerSocket testServer = await createTestServer();
+ final testSocket = await RawSocket.connect("127.0.0.1", testServer.port);
+
+ // Send a message with opened [testSocket] socket.
+ final socket = await RawSocket.connect(address, 0);
+ socket.listen((e) {
+ switch (e) {
+ case RawSocketEvent.write:
+ testSocket.write('Hello, client 1!\n'.codeUnits);
+ socket.sendMessage(<SocketControlMessage>[
+ SocketControlMessage.fromHandles(
+ <ResourceHandle>[ResourceHandle.fromRawSocket(testSocket)])
+ ], 'Hello'.codeUnits);
+ testSocket.write('Hello, client 2!\n'.codeUnits);
+ break;
+ case RawSocketEvent.read:
+ final data = socket.read();
+ if (data == null) {
+ return;
+ }
+
+ final dataString = String.fromCharCodes(data);
+ Expect.equals('server replied', dataString);
+ socket.close();
+ testSocket.close();
+ testServer.close();
+ }
+ });
+}
+
+Future testStdioMessage(String tempDirPath, {bool caller: false}) async {
+ if (!Platform.isLinux && !Platform.isAndroid) {
+ return;
+ }
+ if (caller) {
+ final process = await Process.start(Platform.resolvedExecutable,
+ <String>[Platform.script.toFilePath(), '--start-stdio-message-test']);
+ String processStdout = "";
+ String processStderr = "";
+ process.stdout.transform(utf8.decoder).listen((line) {
+ processStdout += line;
+ print('stdout:>$line<');
+ });
+ process.stderr.transform(utf8.decoder).listen((line) {
+ processStderr += line;
+ print('stderr:>$line<');
+ });
+ process.stdin.writeln('Caller wrote to stdin');
+
+ Expect.equals(0, await process.exitCode);
+ Expect.equals("client sent a message\nHello, server!\n", processStdout);
+ Expect.equals(
+ "client wrote to stderr\nHello, server too!\n", processStderr);
+ return;
+ }
+
+ final address =
+ InternetAddress('$tempDirPath/sock', type: InternetAddressType.unix);
+ final server = await RawServerSocket.bind(address, 0, shared: false);
+
+ server.listen((RawSocket socket) async {
+ socket.listen((e) {
+ if (e == RawSocketEvent.read) {
+ final SocketMessage? message = socket.readMessage();
+ if (message == null) {
+ return;
+ }
+ Expect.equals('Hello', String.fromCharCodes(message.data));
+ Expect.equals(message.controlMessages.length, 1);
+ final SocketControlMessage controlMessage = message.controlMessages[0];
+ final handles = controlMessage.extractHandles();
+ Expect.isNotNull(handles);
+ Expect.equals(3, handles.length);
+ final receivedStdin = handles[0].toFile();
+ final receivedString = String.fromCharCodes(receivedStdin.readSync(32));
+ Expect.equals('Caller wrote to stdin\n', receivedString);
+ final receivedStdout = handles[1].toFile();
+ receivedStdout.writeStringSync('Hello, server!\n');
+ final receivedStderr = handles[2].toFile();
+ receivedStderr.writeStringSync('Hello, server too!\n');
+ socket.write('abc'.codeUnits);
+ } else if (e == RawSocketEvent.readClosed) {
+ socket.close();
+ server.close();
+ }
+ });
+ });
+
+ final file = File('$tempDirPath/myfile.txt');
+ final randomAccessFile = file.openSync(mode: FileMode.write);
+ // Send a message with sample file.
+ var socket = await RawSocket.connect(address, 0);
+ socket.listen((e) {
+ if (e == RawSocketEvent.write) {
+ socket.sendMessage(<SocketControlMessage>[
+ SocketControlMessage.fromHandles(<ResourceHandle>[
+ ResourceHandle.fromStdin(stdin),
+ ResourceHandle.fromStdout(stdout),
+ ResourceHandle.fromStdout(stderr)
+ ])
+ ], 'Hello'.codeUnits);
+ stdout.writeln('client sent a message');
+ stderr.writeln('client wrote to stderr');
+ } else if (e == RawSocketEvent.read) {
+ final data = socket.read();
+ if (data == null) {
+ return;
+ }
+ Expect.equals('abc', String.fromCharCodes(data));
+ socket.close();
+ }
+ });
+}
+
// Create socket in temp directory
Future withTempDir(String prefix, Future<void> test(Directory dir)) async {
- var tempDir = Directory.systemTemp.createTempSync(prefix);
+ final tempDir = Directory.systemTemp.createTempSync(prefix);
try {
- await test(tempDir);
+ await runZonedGuarded(() => test(tempDir), (e, st) {
+ try {
+ tempDir.deleteSync(recursive: true);
+ } catch (_) {
+ // ignore errors
+ }
+ throw e;
+ });
} finally {
- tempDir.deleteSync(recursive: true);
+ try {
+ tempDir.deleteSync(recursive: true);
+ } catch (_) {
+ // ignore errors
+ }
}
}
-void main() async {
- try {
+void main(List<String> args) async {
+ runZonedGuarded(() async {
+ if (args.length > 0 && args[0] == '--start-stdio-message-test') {
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testStdioMessage('${dir.path}', caller: false);
+ });
+ return;
+ }
+
await withTempDir('unix_socket_test', (Directory dir) async {
await testAddress('${dir.path}');
});
@@ -477,12 +880,27 @@
await withTempDir('unix_socket_test', (Directory dir) async {
await testShortAbstractAddress(dir.uri.pathSegments.last);
});
- } catch (e) {
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testFileMessage('${dir.path}');
+ });
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testFileMessageWithShortRead('${dir.path}');
+ });
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testTooLargeControlMessage('${dir.path}');
+ });
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testSocketMessage('${dir.path}');
+ });
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testStdioMessage('${dir.path}', caller: true);
+ });
+ }, (e, st) {
if (Platform.isMacOS || Platform.isLinux || Platform.isAndroid) {
Expect.fail("Unexpected exception $e is thrown");
} else {
Expect.isTrue(e is SocketException);
Expect.isTrue(e.toString().contains('not available'));
}
- }
+ });
}
diff --git a/tests/standalone_2/io/unix_socket_test.dart b/tests/standalone_2/io/unix_socket_test.dart
index a82a155..98b27a5 100644
--- a/tests/standalone_2/io/unix_socket_test.dart
+++ b/tests/standalone_2/io/unix_socket_test.dart
@@ -437,18 +437,421 @@
await httpServer.close();
}
+Future testFileMessage(String tempDirPath) async {
+ if (!Platform.isLinux && !Platform.isAndroid) {
+ return;
+ }
+
+ final completer = Completer<bool>();
+
+ final address =
+ InternetAddress('$tempDirPath/sock', type: InternetAddressType.unix);
+ final server = await RawServerSocket.bind(address, 0, shared: false);
+
+ server.listen((RawSocket socket) async {
+ print('server started a socket $socket');
+ socket.listen((e) {
+ if (e == RawSocketEvent.read) {
+ final SocketMessage message = socket.readMessage();
+ if (message == null) {
+ return;
+ }
+ print('server received message $message');
+ final String messageData = String.fromCharCodes(message.data);
+ print('server received messageData $messageData');
+ if (messageData == 'EmptyMessage') {
+ Expect.equals('EmptyMessage'.length, message.data.length);
+ Expect.isTrue(message.controlMessages.isEmpty);
+ return;
+ }
+ Expect.equals('Hello', messageData);
+ Expect.equals('Hello'.length, message.data.length);
+ Expect.equals(1, message.controlMessages.length);
+ final SocketControlMessage controlMessage = message.controlMessages[0];
+ final handles = controlMessage.extractHandles();
+ Expect.isNotNull(handles);
+ Expect.equals(1, handles.length);
+ final receivedFile = handles[0].toFile();
+ receivedFile.writeStringSync('Hello, server!\n');
+ print("server has written to the $receivedFile file");
+ socket.write('abc'.codeUnits);
+ } else if (e == RawSocketEvent.readClosed) {
+ print('server socket got readClosed');
+ socket.close();
+ server.close();
+ }
+ });
+ });
+
+ final file = File('$tempDirPath/myfile.txt');
+ final randomAccessFile = file.openSync(mode: FileMode.write);
+ // Send a message with sample file.
+ final socket = await RawSocket.connect(address, 0);
+ socket.listen((e) {
+ if (e == RawSocketEvent.write) {
+ randomAccessFile.writeStringSync('Hello, client!\n');
+ socket.sendMessage(<SocketControlMessage>[
+ SocketControlMessage.fromHandles(
+ <ResourceHandle>[ResourceHandle.fromFile(randomAccessFile)])
+ ], 'Hello'.codeUnits);
+ print('client sent a message');
+ socket.sendMessage(<SocketControlMessage>[], 'EmptyMessage'.codeUnits);
+ print('client sent a message without control data');
+ } else if (e == RawSocketEvent.read) {
+ final data = socket.read();
+ Expect.equals('abc', String.fromCharCodes(data));
+ Expect.equals(
+ 'Hello, client!\nHello, server!\n', file.readAsStringSync());
+ socket.close();
+ completer.complete(true);
+ }
+ });
+
+ return completer.future;
+}
+
+Future testTooLargeControlMessage(String tempDirPath) async {
+ if (!Platform.isLinux && !Platform.isAndroid) {
+ return;
+ }
+ final completer = Completer<bool>();
+ final address =
+ InternetAddress('$tempDirPath/sock', type: InternetAddressType.unix);
+ final server = await RawServerSocket.bind(address, 0, shared: false);
+
+ server.listen((RawSocket socket) async {
+ print('server started a socket $socket');
+ socket.listen((e) {
+ if (e == RawSocketEvent.read) {
+ throw "Server should not receive request from the client";
+ } else if (e == RawSocketEvent.readClosed) {
+ socket.close();
+ server.close();
+ }
+ });
+ });
+
+ final file = File('$tempDirPath/myfile.txt');
+ final randomAccessFile = file.openSync(mode: FileMode.write);
+ // Send a message with sample file.
+ final socket = await RawSocket.connect(address, 0);
+
+ runZonedGuarded(
+ () => socket.listen((e) {
+ if (e == RawSocketEvent.write) {
+ randomAccessFile.writeStringSync('Hello, client!\n');
+ const int largeHandleCount = 1024;
+ final manyResourceHandles = List<ResourceHandle>.filled(
+ largeHandleCount, ResourceHandle.fromFile(randomAccessFile));
+ socket.sendMessage(<SocketControlMessage>[
+ SocketControlMessage.fromHandles(manyResourceHandles)
+ ], 'Hello'.codeUnits);
+ server.close();
+ socket.close();
+ }
+ }), (e, st) {
+ print('Got expected unhandled exception $e $st');
+ Expect.equals(true, e is SocketException);
+ completer.complete(true);
+ });
+
+ return completer.future;
+}
+
+Future testFileMessageWithShortRead(String tempDirPath) async {
+ if (!Platform.isLinux && !Platform.isAndroid) {
+ return;
+ }
+
+ final completer = Completer<bool>();
+
+ final address =
+ InternetAddress('$tempDirPath/sock', type: InternetAddressType.unix);
+ final server = await RawServerSocket.bind(address, 0, shared: false);
+
+ server.listen((RawSocket socket) async {
+ print('server started a socket $socket');
+ socket.listen((e) {
+ if (e == RawSocketEvent.read) {
+ Expect.throws(
+ () => socket.readMessage(0),
+ (e) =>
+ e is ArgumentError &&
+ e.toString().contains('Illegal length 0'));
+ final SocketMessage message = socket.readMessage(/*count=*/ 1);
+ if (message == null) {
+ return;
+ }
+ print('server received message $message');
+ final String messageData = String.fromCharCodes(message.data);
+ print('messageData: $messageData');
+ if (messageData[0] == 'H') {
+ Expect.equals(1, message.controlMessages.length);
+ final SocketControlMessage controlMessage =
+ message.controlMessages[0];
+ final handles = controlMessage.extractHandles();
+ Expect.isNotNull(handles);
+ Expect.equals(1, handles.length);
+ final handlesAgain = controlMessage.extractHandles();
+ Expect.isNotNull(handlesAgain);
+ Expect.equals(1, handlesAgain.length);
+ handles[0].toFile().writeStringSync('Hello, server!\n');
+ socket.write('abc'.codeUnits);
+ } else {
+ Expect.equals('i', messageData[0]);
+ Expect.equals(0, message.controlMessages.length);
+ }
+ } else if (e == RawSocketEvent.readClosed) {
+ print('server socket got readClosed');
+ socket.close();
+ server.close();
+ }
+ });
+ });
+
+ final file = File('$tempDirPath/myfile.txt');
+ final randomAccessFile = file.openSync(mode: FileMode.write);
+ // Send a message with sample file.
+ final socket = await RawSocket.connect(address, 0);
+ socket.listen((e) {
+ if (e == RawSocketEvent.write) {
+ randomAccessFile.writeStringSync('Hello, client!\n');
+ socket.sendMessage(<SocketControlMessage>[
+ SocketControlMessage.fromHandles(
+ <ResourceHandle>[ResourceHandle.fromFile(randomAccessFile)])
+ ], 'Hi'.codeUnits);
+ print('client sent a message');
+ } else if (e == RawSocketEvent.read) {
+ final data = socket.read();
+ Expect.equals('abc', String.fromCharCodes(data));
+ Expect.equals(
+ 'Hello, client!\nHello, server!\n', file.readAsStringSync());
+ socket.close();
+ completer.complete(true);
+ }
+ });
+
+ return completer.future;
+}
+
+Future<RawServerSocket> createTestServer() async {
+ final server = await RawServerSocket.bind(InternetAddress.LOOPBACK_IP_V4, 0);
+ return server
+ ..listen((client) {
+ String receivedData = "";
+
+ client.writeEventsEnabled = false;
+ client.listen((event) {
+ switch (event) {
+ case RawSocketEvent.READ:
+ assert(client.available() > 0);
+ final buffer = client.read(200);
+ receivedData += String.fromCharCodes(buffer);
+ break;
+ case RawSocketEvent.READ_CLOSED:
+ client.close();
+ server.close();
+ break;
+ case RawSocketEvent.CLOSED:
+ Expect.equals(
+ "Hello, client 1!\nHello, client 2!\nHello, server!\n",
+ receivedData);
+ break;
+ default:
+ throw "Unexpected event $event";
+ }
+ }, onError: (e) {
+ print("client ERROR $e");
+ });
+ });
+}
+
+Future testSocketMessage(String uniqueName) async {
+ if (!Platform.isLinux && !Platform.isAndroid) {
+ return;
+ }
+
+ final address =
+ InternetAddress('$uniqueName/sock', type: InternetAddressType.unix);
+ final server = await RawServerSocket.bind(address, 0, shared: false);
+
+ server.listen((RawSocket socket) async {
+ socket.listen((e) {
+ switch (e) {
+ case RawSocketEvent.read:
+ final SocketMessage message = socket.readMessage();
+ if (message == null) {
+ return;
+ }
+ Expect.equals('Hello', String.fromCharCodes(message.data));
+ Expect.equals(1, message.controlMessages.length);
+ final SocketControlMessage controlMessage =
+ message.controlMessages[0];
+ final handles = controlMessage.extractHandles();
+ Expect.isNotNull(handles);
+ Expect.equals(1, handles.length);
+ final receivedSocket = handles[0].toRawSocket();
+ receivedSocket.write('Hello, server!\n'.codeUnits);
+ socket.write('server replied'.codeUnits);
+ break;
+ case RawSocketEvent.readClosed:
+ socket.close();
+ server.close();
+ break;
+ }
+ });
+ });
+
+ final RawServerSocket testServer = await createTestServer();
+ final testSocket = await RawSocket.connect("127.0.0.1", testServer.port);
+
+ // Send a message with opened [testSocket] socket.
+ final socket = await RawSocket.connect(address, 0);
+ socket.listen((e) {
+ switch (e) {
+ case RawSocketEvent.write:
+ testSocket.write('Hello, client 1!\n'.codeUnits);
+ socket.sendMessage(<SocketControlMessage>[
+ SocketControlMessage.fromHandles(
+ <ResourceHandle>[ResourceHandle.fromRawSocket(testSocket)])
+ ], 'Hello'.codeUnits);
+ testSocket.write('Hello, client 2!\n'.codeUnits);
+ break;
+ case RawSocketEvent.read:
+ final data = socket.read();
+ if (data == null) {
+ return;
+ }
+
+ final dataString = String.fromCharCodes(data);
+ Expect.equals('server replied', dataString);
+ socket.close();
+ testSocket.close();
+ testServer.close();
+ }
+ });
+}
+
+Future testStdioMessage(String tempDirPath, {bool caller: false}) async {
+ if (!Platform.isLinux && !Platform.isAndroid) {
+ return;
+ }
+
+ final completer = Completer<bool>();
+
+ if (caller) {
+ final process = await Process.start(Platform.resolvedExecutable,
+ <String>[Platform.script.toFilePath(), '--start-stdio-message-test']);
+ String processStdout = "";
+ String processStderr = "";
+ process.stdout.transform(utf8.decoder).listen((line) {
+ processStdout += line;
+ print('stdout:>$line<');
+ });
+ process.stderr.transform(utf8.decoder).listen((line) {
+ processStderr += line;
+ print('stderr:>$line<');
+ });
+ process.stdin.writeln('Caller wrote to stdin');
+
+ Expect.equals(0, await process.exitCode);
+ Expect.equals("client sent a message\nHello, server!\n", processStdout);
+ Expect.equals(
+ "client wrote to stderr\nHello, server too!\n", processStderr);
+ return;
+ }
+
+ final address =
+ InternetAddress('$tempDirPath/sock', type: InternetAddressType.unix);
+ final server = await RawServerSocket.bind(address, 0, shared: false);
+
+ server.listen((RawSocket socket) async {
+ socket.listen((e) {
+ if (e == RawSocketEvent.read) {
+ final SocketMessage message = socket.readMessage();
+ if (message == null) {
+ return;
+ }
+ Expect.equals('Hello', String.fromCharCodes(message.data));
+ Expect.equals(message.controlMessages.length, 1);
+ final SocketControlMessage controlMessage = message.controlMessages[0];
+ final handles = controlMessage.extractHandles();
+ Expect.isNotNull(handles);
+ Expect.equals(3, handles.length);
+ final receivedStdin = handles[0].toFile();
+ final receivedString = String.fromCharCodes(receivedStdin.readSync(32));
+ Expect.equals('Caller wrote to stdin\n', receivedString);
+ final receivedStdout = handles[1].toFile();
+ receivedStdout.writeStringSync('Hello, server!\n');
+ final receivedStderr = handles[2].toFile();
+ receivedStderr.writeStringSync('Hello, server too!\n');
+ socket.write('abc'.codeUnits);
+ } else if (e == RawSocketEvent.readClosed) {
+ socket.close();
+ server.close();
+ }
+ });
+ });
+
+ final file = File('$tempDirPath/myfile.txt');
+ final randomAccessFile = file.openSync(mode: FileMode.write);
+ // Send a message with sample file.
+ var socket = await RawSocket.connect(address, 0);
+ socket.listen((e) {
+ if (e == RawSocketEvent.write) {
+ socket.sendMessage(<SocketControlMessage>[
+ SocketControlMessage.fromHandles(<ResourceHandle>[
+ ResourceHandle.fromStdin(stdin),
+ ResourceHandle.fromStdout(stdout),
+ ResourceHandle.fromStdout(stderr)
+ ])
+ ], 'Hello'.codeUnits);
+ stdout.writeln('client sent a message');
+ stderr.writeln('client wrote to stderr');
+ } else if (e == RawSocketEvent.read) {
+ final data = socket.read();
+ if (data == null) {
+ return;
+ }
+ Expect.equals('abc', String.fromCharCodes(data));
+ socket.close();
+ completer.complete(true);
+ }
+ });
+
+ return completer.future;
+}
+
// Create socket in temp directory
Future withTempDir(String prefix, Future<void> test(Directory dir)) async {
- var tempDir = Directory.systemTemp.createTempSync(prefix);
+ final tempDir = Directory.systemTemp.createTempSync(prefix);
try {
- await test(tempDir);
+ await runZonedGuarded(() => test(tempDir), (e, st) {
+ try {
+ tempDir.deleteSync(recursive: true);
+ } catch (_) {
+ // ignore errors
+ }
+ throw e;
+ });
} finally {
- tempDir.deleteSync(recursive: true);
+ try {
+ tempDir.deleteSync(recursive: true);
+ } catch (_) {
+ // ignore errors
+ }
}
}
-void main() async {
- try {
+void main(List<String> args) async {
+ runZonedGuarded(() async {
+ if (args.length > 0 && args[0] == '--start-stdio-message-test') {
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testStdioMessage('${dir.path}', caller: false);
+ });
+ return;
+ }
+
await withTempDir('unix_socket_test', (Directory dir) async {
await testAddress('${dir.path}');
});
@@ -479,12 +882,27 @@
await withTempDir('unix_socket_test', (Directory dir) async {
await testShortAbstractAddress(dir.uri.pathSegments.last);
});
- } catch (e) {
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testFileMessage('${dir.path}');
+ });
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testFileMessageWithShortRead('${dir.path}');
+ });
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testTooLargeControlMessage('${dir.path}');
+ });
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testSocketMessage('${dir.path}');
+ });
+ await withTempDir('unix_socket_test', (Directory dir) async {
+ await testStdioMessage('${dir.path}', caller: true);
+ });
+ }, (e, st) {
if (Platform.isMacOS || Platform.isLinux || Platform.isAndroid) {
Expect.fail("Unexpected exception $e is thrown");
} else {
Expect.isTrue(e is SocketException);
Expect.isTrue(e.toString().contains('not available'));
}
- }
+ });
}
diff --git a/tools/VERSION b/tools/VERSION
index 6e5e310..0510da6 100644
--- a/tools/VERSION
+++ b/tools/VERSION
@@ -27,5 +27,5 @@
MAJOR 2
MINOR 15
PATCH 0
-PRERELEASE 199
+PRERELEASE 200
PRERELEASE_PATCH 0
\ No newline at end of file