| // Copyright (c) 2016, the Dart project authors. Please see the AUTHORS file |
| // for details. All rights reserved. Use of this source code is governed by a |
| // BSD-style license that can be found in the LICENSE file. |
| |
| #include "platform/globals.h" |
| #if defined(DART_HOST_OS_FUCHSIA) |
| |
| #include "bin/eventhandler.h" |
| #include "bin/eventhandler_fuchsia.h" |
| |
| #include <errno.h> |
| #include <fcntl.h> |
| #include <poll.h> |
| #include <pthread.h> |
| #include <stdio.h> |
| #include <string.h> |
| #include <sys/socket.h> |
| #include <sys/stat.h> |
| #include <unistd.h> |
| #include <zircon/status.h> |
| #include <zircon/syscalls.h> |
| #include <zircon/syscalls/object.h> |
| #include <zircon/syscalls/port.h> |
| |
| #include "bin/fdutils.h" |
| #include "bin/lockers.h" |
| #include "bin/socket.h" |
| #include "bin/thread.h" |
| #include "bin/utils.h" |
| #include "platform/hashmap.h" |
| #include "platform/syslog.h" |
| #include "platform/utils.h" |
| |
| // The EventHandler for Fuchsia uses its "ports v2" API: |
| // https://fuchsia.googlesource.com/fuchsia/+/HEAD/zircon/docs/syscalls/port_create.md |
| // This API does not have epoll()-like edge triggering (EPOLLET). Since clients |
| // of the EventHandler expect edge-triggered notifications, we must simulate it. |
| // When a packet from zx_port_wait() indicates that a signal is asserted for a |
| // handle, we unsubscribe from that signal until the event that asserted the |
| // signal can be processed. For example: |
| // |
| // 1. We get ZX_SOCKET_WRITABLE from zx_port_wait() for a handle. |
| // 2. We send kOutEvent to the Dart thread. |
| // 3. We unsubscribe from further ZX_SOCKET_WRITABLE signals for the handle. |
| // 4. Some time later the Dart thread actually does a write(). |
| // 5. After writing, the Dart thread resubscribes to write events. |
| // |
| // We use the same procedure for ZX_SOCKET_READABLE, and read()/accept(). |
| |
| // define EVENTHANDLER_LOG_ERROR to get log messages only for errors. |
| // define EVENTHANDLER_LOG_INFO to get log messages for both information and |
| // errors. |
| // #define EVENTHANDLER_LOG_INFO 1 |
| #define EVENTHANDLER_LOG_ERROR 1 |
| #if defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR) |
| #define LOG_ERR(msg, ...) \ |
| { \ |
| int err = errno; \ |
| Syslog::PrintErr("Dart EventHandler ERROR: %s:%d: " msg, __FILE__, \ |
| __LINE__, ##__VA_ARGS__); \ |
| errno = err; \ |
| } |
| #if defined(EVENTHANDLER_LOG_INFO) |
| #define LOG_INFO(msg, ...) \ |
| Syslog::Print("Dart EventHandler INFO: %s:%d: " msg, __FILE__, __LINE__, \ |
| ##__VA_ARGS__) |
| #else |
| #define LOG_INFO(msg, ...) |
| #endif // defined(EVENTHANDLER_LOG_INFO) |
| #else |
| #define LOG_ERR(msg, ...) |
| #define LOG_INFO(msg, ...) |
| #endif // defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR) |
| |
| namespace dart { |
| namespace bin { |
| |
| intptr_t IOHandle::Read(void* buffer, intptr_t num_bytes) { |
| MutexLocker ml(&mutex_); |
| const ssize_t read_bytes = NO_RETRY_EXPECTED(read(fd_, buffer, num_bytes)); |
| const int err = errno; |
| LOG_INFO("IOHandle::Read: fd = %ld. read %ld bytes\n", fd_, read_bytes); |
| |
| // Track the number of bytes available to read. |
| if (read_bytes > 0) { |
| available_bytes_ -= |
| (available_bytes_ >= read_bytes) ? read_bytes : available_bytes_; |
| } |
| |
| // If we have read all available bytes, or if there was an error, then |
| // re-enable read events. We re-enable read events even if read() returns |
| // an error. The error might be, e.g. EWOULDBLOCK, in which case |
| // resubscription is necessary. Logic in the caller decides which errors |
| // are real, and which are ignore-and-continue. |
| if ((available_bytes_ == 0) || (read_bytes < 0)) { |
| // Resubscribe to read events. |
| read_events_enabled_ = true; |
| if (wait_key_ == 0) { |
| LOG_ERR("IOHandle::Read calling AsyncWaitLocked with wait_key_ == 0"); |
| } |
| if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLIN, wait_key_)) { |
| LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_); |
| } |
| } |
| |
| errno = err; |
| return read_bytes; |
| } |
| |
| intptr_t IOHandle::Write(const void* buffer, intptr_t num_bytes) { |
| MutexLocker ml(&mutex_); |
| const ssize_t written_bytes = |
| NO_RETRY_EXPECTED(write(fd_, buffer, num_bytes)); |
| const int err = errno; |
| LOG_INFO("IOHandle::Write: fd = %ld. wrote %ld bytes\n", fd_, written_bytes); |
| |
| // Resubscribe to write events. |
| write_events_enabled_ = true; |
| if (wait_key_ == 0) { |
| LOG_ERR("IOHandle::Write calling AsyncWaitLocked with wait_key_ == 0"); |
| } |
| if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLOUT, wait_key_)) { |
| LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_); |
| } |
| |
| errno = err; |
| return written_bytes; |
| } |
| |
| intptr_t IOHandle::Accept(struct sockaddr* addr, socklen_t* addrlen) { |
| MutexLocker ml(&mutex_); |
| const intptr_t socket = NO_RETRY_EXPECTED(accept(fd_, addr, addrlen)); |
| const int err = errno; |
| LOG_INFO("IOHandle::Accept: fd = %ld. socket = %ld\n", fd_, socket); |
| |
| // Re-subscribe to read events. |
| read_events_enabled_ = true; |
| if (wait_key_ == 0) { |
| LOG_ERR("IOHandle::Accept calling AsyncWaitLocked with wait_key_ == 0"); |
| } |
| if (!AsyncWaitLocked(ZX_HANDLE_INVALID, POLLIN, wait_key_)) { |
| LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_); |
| } |
| |
| errno = err; |
| return socket; |
| } |
| |
| intptr_t IOHandle::AvailableBytes() { |
| MutexLocker ml(&mutex_); |
| ASSERT(fd_ >= 0); |
| intptr_t available = FDUtils::AvailableBytes(fd_); |
| LOG_INFO("IOHandle::AvailableBytes(): fd = %ld, bytes = %ld\n", fd_, |
| available); |
| if (available < 0) { |
| // If there is an error, we set available to 1 to trigger a read event that |
| // then propagates the error. |
| available = 1; |
| } |
| available_bytes_ = available; |
| return available; |
| } |
| |
| void IOHandle::Close() { |
| MutexLocker ml(&mutex_); |
| VOID_NO_RETRY_EXPECTED(close(fd_)); |
| } |
| |
| uint32_t IOHandle::MaskToEpollEvents(intptr_t mask) { |
| MutexLocker ml(&mutex_); |
| // Do not ask for POLLERR and POLLHUP explicitly as they are |
| // triggered anyway. |
| uint32_t events = 0; |
| // Do not subscribe to read closed events when kCloseEvent has already been |
| // sent to the Dart thread. |
| if (close_events_enabled_) { |
| events |= POLLRDHUP; |
| } |
| if (read_events_enabled_ && ((mask & (1 << kInEvent)) != 0)) { |
| events |= POLLIN; |
| } |
| if (write_events_enabled_ && ((mask & (1 << kOutEvent)) != 0)) { |
| events |= POLLOUT; |
| } |
| return events; |
| } |
| |
| intptr_t IOHandle::EpollEventsToMask(intptr_t events) { |
| if ((events & POLLERR) != 0) { |
| // Return error only if POLLIN is present. |
| return ((events & POLLIN) != 0) ? (1 << kErrorEvent) : 0; |
| } |
| intptr_t event_mask = 0; |
| if ((events & POLLIN) != 0) { |
| event_mask |= (1 << kInEvent); |
| } |
| if ((events & POLLOUT) != 0) { |
| event_mask |= (1 << kOutEvent); |
| } |
| if ((events & (POLLHUP | POLLRDHUP)) != 0) { |
| event_mask |= (1 << kCloseEvent); |
| } |
| return event_mask; |
| } |
| |
| bool IOHandle::AsyncWaitLocked(zx_handle_t port, |
| uint32_t events, |
| uint64_t key) { |
| LOG_INFO("IOHandle::AsyncWaitLocked: fd = %ld\n", fd_); |
| if (key == 0) { |
| LOG_ERR("IOHandle::AsyncWaitLocked called with key == 0"); |
| } |
| // The call to fdio_unsafe_fd_to_io() in the DescriptorInfo constructor may |
| // have returned nullptr. If it did, propagate the problem up to Dart. |
| if (fdio_ == nullptr) { |
| LOG_ERR("fdio_unsafe_fd_to_io(%ld) returned nullptr\n", fd_); |
| return false; |
| } |
| |
| zx_handle_t handle; |
| zx_signals_t signals; |
| fdio_unsafe_wait_begin(fdio_, events, &handle, &signals); |
| if (handle == ZX_HANDLE_INVALID) { |
| LOG_ERR("fd = %ld fdio_unsafe_wait_begin returned an invalid handle\n", |
| fd_); |
| return false; |
| } |
| |
| // Remember the port. Use the remembered port if the argument "port" is |
| // ZX_HANDLE_INVALID. |
| ASSERT((port != ZX_HANDLE_INVALID) || (port_ != ZX_HANDLE_INVALID)); |
| if ((port_ == ZX_HANDLE_INVALID) || (port != ZX_HANDLE_INVALID)) { |
| port_ = port; |
| } |
| |
| handle_ = handle; |
| wait_key_ = key; |
| LOG_INFO("zx_object_wait_async(fd = %ld, signals = %x)\n", fd_, signals); |
| zx_status_t status = |
| zx_object_wait_async(handle_, port_, key, signals, ZX_WAIT_ASYNC_ONCE); |
| if (status != ZX_OK) { |
| LOG_ERR("zx_object_wait_async failed: %s\n", zx_status_get_string(status)); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| bool IOHandle::AsyncWait(zx_handle_t port, uint32_t events, uint64_t key) { |
| MutexLocker ml(&mutex_); |
| return AsyncWaitLocked(port, events, key); |
| } |
| |
| void IOHandle::CancelWait(zx_handle_t port, uint64_t key) { |
| MutexLocker ml(&mutex_); |
| LOG_INFO("IOHandle::CancelWait: fd = %ld\n", fd_); |
| ASSERT(port != ZX_HANDLE_INVALID); |
| ASSERT(handle_ != ZX_HANDLE_INVALID); |
| if (key == 0) { |
| LOG_ERR("IOHandle::CancelWait calling zx_port_cancel with key == 0"); |
| } |
| zx_status_t status = zx_port_cancel(port, handle_, key); |
| if ((status != ZX_OK) && (status != ZX_ERR_NOT_FOUND)) { |
| LOG_ERR("zx_port_cancel failed: %s\n", zx_status_get_string(status)); |
| } |
| } |
| |
| uint32_t IOHandle::WaitEnd(zx_signals_t observed) { |
| MutexLocker ml(&mutex_); |
| uint32_t events = 0; |
| fdio_unsafe_wait_end(fdio_, observed, &events); |
| LOG_INFO("IOHandle::WaitEnd: fd = %ld, events = %x\n", fd_, events); |
| return events; |
| } |
| |
| // This function controls the simulation of edge-triggering. It is responsible |
| // for removing events from the event mask when they should be suppressed, and |
| // for suppressing future events. Events are unsuppressed by their respective |
| // operations by the Dart thread on the socket---that is, where the |
| // *_events_enabled_ flags are set to true. |
| intptr_t IOHandle::ToggleEvents(intptr_t event_mask) { |
| MutexLocker ml(&mutex_); |
| // If write events are disabled, then remove the kOutEvent bit from the |
| // event mask. |
| if (!write_events_enabled_) { |
| LOG_INFO( |
| "IOHandle::ToggleEvents: fd = %ld " |
| "de-asserting kOutEvent\n", |
| fd_); |
| event_mask = event_mask & ~(1 << kOutEvent); |
| } |
| // If the kOutEvent bit is set, then suppress future write events until the |
| // Dart thread writes. |
| if ((event_mask & (1 << kOutEvent)) != 0) { |
| LOG_INFO( |
| "IOHandle::ToggleEvents: fd = %ld " |
| "asserting kOutEvent and disabling\n", |
| fd_); |
| write_events_enabled_ = false; |
| } |
| |
| // If read events are disabled, then remove the kInEvent bit from the event |
| // mask. |
| if (!read_events_enabled_) { |
| LOG_INFO( |
| "IOHandle::ToggleEvents: fd = %ld " |
| "de-asserting kInEvent\n", |
| fd_); |
| event_mask = event_mask & ~(1 << kInEvent); |
| } |
| // We may get In events without available bytes, so we must make sure there |
| // are actually bytes, or we will never resubscribe (due to a short-circuit |
| // on the Dart side). |
| // |
| // This happens due to how packets get enqueued on the port with all signals |
| // asserted at that time. Sometimes we enqueue a packet due to |
| // zx_object_wait_async e.g. for POLLOUT (writability) while the socket is |
| // readable and while we have a Read queued up on the Dart side. This packet |
| // will also have POLLIN (readable) asserted. We may then perform the Read |
| // and drain the socket before our zx_port_wait is serviced, at which point |
| // when we process the packet for POLLOUT with its stale POLLIN (readable) |
| // signal, the socket is no longer actually readable. |
| // |
| // As a detail, negative available bytes (errors) are handled specially; see |
| // IOHandle::AvailableBytes for more information. |
| if ((event_mask & (1 << kInEvent)) != 0) { |
| if (FDUtils::AvailableBytes(fd_) != 0) { |
| LOG_INFO( |
| "IOHandle::ToggleEvents: fd = %ld " |
| "asserting kInEvent and disabling with bytes available\n", |
| fd_); |
| read_events_enabled_ = false; |
| } |
| // Also suppress future read events if we get a kCloseEvent. This is to |
| // account for POLLIN being set by Fuchsia when the socket is read-closed. |
| if ((event_mask & (1 << kCloseEvent)) != 0) { |
| LOG_INFO( |
| "IOHandle::ToggleEvents: fd = %ld " |
| "asserting kInEvent and disabling due to a close event\n", |
| fd_); |
| read_events_enabled_ = false; |
| } |
| } |
| |
| // If the close events are disabled, then remove the kCloseEvent bit from the |
| // event mask. |
| if (!close_events_enabled_) { |
| LOG_INFO( |
| "IOHandle::ToggleEvents: fd = %ld " |
| "de-asserting kCloseEvent\n", |
| fd_); |
| event_mask = event_mask & ~(1 << kCloseEvent); |
| } |
| // If the kCloseEvent bit is set, then suppress future close events, they will |
| // be ignored by the Dart thread. See _NativeSocket.multiplex in |
| // socket_patch.dart. |
| if ((event_mask & (1 << kCloseEvent)) != 0) { |
| LOG_INFO( |
| "IOHandle::ToggleEvents: fd = %ld " |
| "asserting kCloseEvent and disabling\n", |
| fd_); |
| close_events_enabled_ = false; |
| } |
| return event_mask; |
| } |
| |
| void EventHandlerImplementation::AddToPort(zx_handle_t port_handle, |
| DescriptorInfo* di) { |
| const uint32_t events = di->io_handle()->MaskToEpollEvents(di->Mask()); |
| const uint64_t key = reinterpret_cast<uint64_t>(di); |
| if (key == 0) { |
| LOG_ERR( |
| "EventHandlerImplementation::AddToPort calling AsyncWait with key == " |
| "0"); |
| } |
| if (!di->io_handle()->AsyncWait(port_handle, events, key)) { |
| di->NotifyAllDartPorts(1 << kCloseEvent); |
| } |
| } |
| |
| void EventHandlerImplementation::RemoveFromPort(zx_handle_t port_handle, |
| DescriptorInfo* di) { |
| const uint64_t key = reinterpret_cast<uint64_t>(di); |
| if (key == 0) { |
| LOG_ERR( |
| "EventHandlerImplementation::RemoveFromPort calling CancelWait with " |
| "key == 0"); |
| } |
| di->io_handle()->CancelWait(port_handle, key); |
| } |
| |
| EventHandlerImplementation::EventHandlerImplementation() |
| : socket_map_(&SimpleHashMap::SamePointerValue, 16) { |
| shutdown_ = false; |
| // Create the port. |
| port_handle_ = ZX_HANDLE_INVALID; |
| zx_status_t status = zx_port_create(0, &port_handle_); |
| if (status != ZX_OK) { |
| // This is a FATAL because the VM won't work at all if we can't create this |
| // port. |
| FATAL("zx_port_create failed: %s\n", zx_status_get_string(status)); |
| } |
| ASSERT(port_handle_ != ZX_HANDLE_INVALID); |
| } |
| |
| static void DeleteDescriptorInfo(void* info) { |
| DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info); |
| LOG_INFO("Closed %ld\n", di->io_handle()->fd()); |
| di->Close(); |
| delete di; |
| } |
| |
| EventHandlerImplementation::~EventHandlerImplementation() { |
| socket_map_.Clear(DeleteDescriptorInfo); |
| zx_handle_close(port_handle_); |
| port_handle_ = ZX_HANDLE_INVALID; |
| } |
| |
| void EventHandlerImplementation::UpdatePort(intptr_t old_mask, |
| DescriptorInfo* di) { |
| const intptr_t new_mask = di->Mask(); |
| if ((old_mask != 0) && (new_mask == 0)) { |
| RemoveFromPort(port_handle_, di); |
| } else if ((old_mask == 0) && (new_mask != 0)) { |
| AddToPort(port_handle_, di); |
| } else if ((old_mask != 0) && (new_mask != 0)) { |
| ASSERT((old_mask == new_mask) || !di->IsListeningSocket()); |
| RemoveFromPort(port_handle_, di); |
| AddToPort(port_handle_, di); |
| } |
| } |
| |
| DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo( |
| intptr_t fd, |
| bool is_listening) { |
| IOHandle* handle = reinterpret_cast<IOHandle*>(fd); |
| ASSERT(handle->fd() >= 0); |
| SimpleHashMap::Entry* entry = |
| socket_map_.Lookup(GetHashmapKeyFromFd(handle->fd()), |
| GetHashmapHashFromFd(handle->fd()), true); |
| ASSERT(entry != nullptr); |
| DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value); |
| if (di == nullptr) { |
| // If there is no data in the hash map for this file descriptor a |
| // new DescriptorInfo for the file descriptor is inserted. |
| if (is_listening) { |
| di = new DescriptorInfoMultiple(fd); |
| } else { |
| di = new DescriptorInfoSingle(fd); |
| } |
| entry->value = di; |
| } |
| ASSERT(fd == di->fd()); |
| return di; |
| } |
| |
| void EventHandlerImplementation::WakeupHandler(intptr_t id, |
| Dart_Port dart_port, |
| int64_t data) { |
| COMPILE_ASSERT(sizeof(InterruptMessage) <= sizeof(zx_packet_user_t)); |
| zx_port_packet_t pkt; |
| InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt.user); |
| pkt.key = kInterruptPacketKey; |
| msg->id = id; |
| msg->dart_port = dart_port; |
| msg->data = data; |
| zx_status_t status = zx_port_queue(port_handle_, &pkt); |
| if (status != ZX_OK) { |
| // This is a FATAL because the VM won't work at all if we can't send any |
| // messages to the EventHandler thread. |
| FATAL("zx_port_queue failed: %s\n", zx_status_get_string(status)); |
| } |
| } |
| |
| void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) { |
| if (msg->id == kTimerId) { |
| LOG_INFO("HandleInterrupt read timer update\n"); |
| timeout_queue_.UpdateTimeout(msg->dart_port, msg->data); |
| return; |
| } else if (msg->id == kShutdownId) { |
| LOG_INFO("HandleInterrupt read shutdown\n"); |
| shutdown_ = true; |
| return; |
| } |
| ASSERT((msg->data & COMMAND_MASK) != 0); |
| LOG_INFO("HandleInterrupt command:\n"); |
| Socket* socket = reinterpret_cast<Socket*>(msg->id); |
| RefCntReleaseScope<Socket> rs(socket); |
| if (socket->fd() == -1) { |
| return; |
| } |
| IOHandle* io_handle = reinterpret_cast<IOHandle*>(socket->fd()); |
| const intptr_t fd = io_handle->fd(); |
| DescriptorInfo* di = |
| GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg->data)); |
| ASSERT(io_handle == di->io_handle()); |
| if (IS_COMMAND(msg->data, kShutdownReadCommand)) { |
| ASSERT(!di->IsListeningSocket()); |
| // Close the socket for reading. |
| LOG_INFO("\tSHUT_RD: %ld\n", fd); |
| VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_RD)); |
| } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) { |
| ASSERT(!di->IsListeningSocket()); |
| // Close the socket for writing. |
| LOG_INFO("\tSHUT_WR: %ld\n", fd); |
| VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_WR)); |
| } else if (IS_COMMAND(msg->data, kCloseCommand)) { |
| // Close the socket and free system resources and move on to next |
| // message. |
| const intptr_t old_mask = di->Mask(); |
| Dart_Port port = msg->dart_port; |
| if (port != ILLEGAL_PORT) { |
| di->RemovePort(port); |
| } |
| const intptr_t new_mask = di->Mask(); |
| UpdatePort(old_mask, di); |
| |
| LOG_INFO("\tCLOSE: %ld: %lx -> %lx\n", fd, old_mask, new_mask); |
| if (di->IsListeningSocket()) { |
| // We only close the socket file descriptor from the operating |
| // system if there are no other dart socket objects which |
| // are listening on the same (address, port) combination. |
| ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance(); |
| |
| MutexLocker locker(registry->mutex()); |
| |
| if (registry->CloseSafe(socket)) { |
| ASSERT(new_mask == 0); |
| socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
| di->Close(); |
| delete di; |
| socket->CloseFd(); |
| } |
| socket->SetClosedFd(); |
| } else { |
| ASSERT(new_mask == 0); |
| socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd)); |
| di->Close(); |
| delete di; |
| socket->CloseFd(); |
| } |
| if (port != 0) { |
| const bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent); |
| if (!success) { |
| LOG_INFO("Failed to post destroy event to port %ld\n", port); |
| } |
| } |
| } else if (IS_COMMAND(msg->data, kReturnTokenCommand)) { |
| const int count = TOKEN_COUNT(msg->data); |
| const intptr_t old_mask = di->Mask(); |
| LOG_INFO("\t Return Token: %ld: %lx\n", fd, old_mask); |
| di->ReturnTokens(msg->dart_port, count); |
| UpdatePort(old_mask, di); |
| } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) { |
| // `events` can only have kInEvent/kOutEvent flags set. |
| const intptr_t events = msg->data & EVENT_MASK; |
| ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent))); |
| |
| const intptr_t old_mask = di->Mask(); |
| LOG_INFO("\t Set Event Mask: %ld: %lx %lx\n", fd, old_mask, |
| msg->data & EVENT_MASK); |
| di->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK); |
| UpdatePort(old_mask, di); |
| } else { |
| UNREACHABLE(); |
| } |
| } |
| |
| void EventHandlerImplementation::HandlePacket(zx_port_packet_t* pkt) { |
| if (pkt->key == 0) { |
| LOG_ERR("HandlePacket called with pkt->key==0"); |
| return; |
| } |
| LOG_INFO("HandlePacket: Got event packet: key=%lx\n", pkt->key); |
| LOG_INFO("HandlePacket: Got event packet: type=%x\n", pkt->type); |
| LOG_INFO("HandlePacket: Got event packet: status=%d\n", pkt->status); |
| |
| if (pkt->type == ZX_PKT_TYPE_USER) { |
| ASSERT(pkt->key == kInterruptPacketKey); |
| InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt->user); |
| HandleInterrupt(msg); |
| return; |
| } |
| |
| if (pkt->type != ZX_PKT_TYPE_SIGNAL_ONE) { |
| LOG_ERR("HandlePacket: Got unexpected packet type: key=%x\n", pkt->type); |
| return; |
| } |
| |
| // Handle pkt->type == ZX_PKT_TYPE_SIGNAL_ONE |
| LOG_INFO("HandlePacket: Got event packet: observed = %x\n", |
| pkt->signal.observed); |
| LOG_INFO("HandlePacket: Got event packet: count = %ld\n", pkt->signal.count); |
| DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(pkt->key); |
| zx_signals_t observed = pkt->signal.observed; |
| const intptr_t old_mask = di->Mask(); |
| const uint32_t epoll_event = di->io_handle()->WaitEnd(observed); |
| intptr_t event_mask = IOHandle::EpollEventsToMask(epoll_event); |
| if ((event_mask & (1 << kErrorEvent)) != 0) { |
| di->NotifyAllDartPorts(event_mask); |
| } else if (event_mask != 0) { |
| event_mask = di->io_handle()->ToggleEvents(event_mask); |
| if (event_mask != 0) { |
| Dart_Port port = di->NextNotifyDartPort(event_mask); |
| ASSERT(port != 0); |
| bool success = DartUtils::PostInt32(port, event_mask); |
| if (!success) { |
| // This can happen if e.g. the isolate that owns the port has died |
| // for some reason. |
| LOG_INFO("Failed to post event to port %ld\n", port); |
| } |
| } |
| } |
| UpdatePort(old_mask, di); |
| } |
| |
| int64_t EventHandlerImplementation::GetTimeout() const { |
| if (!timeout_queue_.HasTimeout()) { |
| return kInfinityTimeout; |
| } |
| int64_t millis = |
| timeout_queue_.CurrentTimeout() - TimerUtils::GetCurrentMonotonicMillis(); |
| return (millis < 0) ? 0 : millis; |
| } |
| |
| void EventHandlerImplementation::HandleTimeout() { |
| if (timeout_queue_.HasTimeout()) { |
| int64_t millis = timeout_queue_.CurrentTimeout() - |
| TimerUtils::GetCurrentMonotonicMillis(); |
| if (millis <= 0) { |
| DartUtils::PostNull(timeout_queue_.CurrentPort()); |
| timeout_queue_.RemoveCurrent(); |
| } |
| } |
| } |
| |
| void EventHandlerImplementation::Poll(uword args) { |
| EventHandler* handler = reinterpret_cast<EventHandler*>(args); |
| EventHandlerImplementation* handler_impl = &handler->delegate_; |
| ASSERT(handler_impl != nullptr); |
| |
| zx_port_packet_t pkt; |
| while (!handler_impl->shutdown_) { |
| int64_t millis = handler_impl->GetTimeout(); |
| ASSERT((millis == kInfinityTimeout) || (millis >= 0)); |
| |
| LOG_INFO("zx_port_wait(millis = %ld)\n", millis); |
| zx_status_t status = zx_port_wait(handler_impl->port_handle_, |
| millis == kInfinityTimeout |
| ? ZX_TIME_INFINITE |
| : zx_deadline_after(ZX_MSEC(millis)), |
| &pkt); |
| if (status == ZX_ERR_TIMED_OUT) { |
| handler_impl->HandleTimeout(); |
| } else if (status != ZX_OK) { |
| FATAL("zx_port_wait failed: %s\n", zx_status_get_string(status)); |
| } else { |
| handler_impl->HandleTimeout(); |
| handler_impl->HandlePacket(&pkt); |
| } |
| } |
| DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0); |
| handler->NotifyShutdownDone(); |
| } |
| |
| void EventHandlerImplementation::Start(EventHandler* handler) { |
| Thread::Start("dart:io EventHandler", &EventHandlerImplementation::Poll, |
| reinterpret_cast<uword>(handler)); |
| } |
| |
| void EventHandlerImplementation::Shutdown() { |
| SendData(kShutdownId, 0, 0); |
| } |
| |
| void EventHandlerImplementation::SendData(intptr_t id, |
| Dart_Port dart_port, |
| int64_t data) { |
| WakeupHandler(id, dart_port, data); |
| } |
| |
| void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) { |
| // The hashmap does not support keys with value 0. |
| return reinterpret_cast<void*>(fd + 1); |
| } |
| |
| uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) { |
| // The hashmap does not support keys with value 0. |
| return dart::Utils::WordHash(fd + 1); |
| } |
| |
| } // namespace bin |
| } // namespace dart |
| |
| #endif // defined(DART_HOST_OS_FUCHSIA) |