blob: 90f94b41be3fd8c69a3a82ffa70e3beafa0daf9e [file] [log] [blame]
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#include "platform/globals.h"
#if defined(TARGET_OS_ANDROID)
#include "bin/eventhandler.h"
#include <errno.h> // NOLINT
#include <pthread.h> // NOLINT
#include <stdio.h> // NOLINT
#include <string.h> // NOLINT
#include <sys/epoll.h> // NOLINT
#include <sys/stat.h> // NOLINT
#include <unistd.h> // NOLINT
#include <fcntl.h> // NOLINT
#include "bin/dartutils.h"
#include "bin/fdutils.h"
#include "bin/log.h"
#include "bin/utils.h"
#include "platform/hashmap.h"
#include "platform/thread.h"
#include "platform/utils.h"
namespace dart {
namespace bin {
static const int kInterruptMessageSize = sizeof(InterruptMessage);
static const int kInfinityTimeout = -1;
static const int kTimerId = -1;
static const int kShutdownId = -2;
intptr_t SocketData::GetPollEvents() {
// Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
// triggered anyway.
intptr_t events = 0;
if (!IsClosedRead()) {
if ((mask_ & (1 << kInEvent)) != 0) {
events |= EPOLLIN;
}
}
if (!IsClosedWrite()) {
if ((mask_ & (1 << kOutEvent)) != 0) {
events |= EPOLLOUT;
}
}
return events;
}
// Unregister the file descriptor for a SocketData structure with epoll.
static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
if (sd->tracked_by_epoll()) {
int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_DEL,
sd->fd(),
NULL));
if (status == -1) {
FATAL("Failed unregistering events for file descriptor");
}
sd->set_tracked_by_epoll(false);
}
}
// Register the file descriptor for a SocketData structure with epoll
// if events are requested.
static void UpdateEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
struct epoll_event event;
event.events = sd->GetPollEvents();
event.data.ptr = sd;
if (sd->port() != 0 && event.events != 0) {
int status = 0;
if (sd->tracked_by_epoll()) {
status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_MOD,
sd->fd(),
&event));
} else {
status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_ADD,
sd->fd(),
&event));
sd->set_tracked_by_epoll(true);
}
if (status == -1) {
const int kBufferSize = 1024;
char error_message[kBufferSize];
strerror_r(errno, error_message, kBufferSize);
FATAL1("Failed updating epoll instance: %s", error_message);
}
}
}
EventHandlerImplementation::EventHandlerImplementation()
: socket_map_(&HashMap::SamePointerValue, 16) {
intptr_t result;
result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_));
if (result != 0) {
FATAL("Pipe creation failed");
}
FDUtils::SetNonBlocking(interrupt_fds_[0]);
FDUtils::SetCloseOnExec(interrupt_fds_[0]);
FDUtils::SetCloseOnExec(interrupt_fds_[1]);
shutdown_ = false;
// The initial size passed to epoll_create is ignore on newer (>=
// 2.6.8) Linux versions
static const int kEpollInitialSize = 64;
epoll_fd_ = TEMP_FAILURE_RETRY(epoll_create(kEpollInitialSize));
if (epoll_fd_ == -1) {
FATAL("Failed creating epoll file descriptor");
}
FDUtils::SetCloseOnExec(epoll_fd_);
// Register the interrupt_fd with the epoll instance.
struct epoll_event event;
event.events = EPOLLIN;
event.data.ptr = NULL;
int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_ADD,
interrupt_fds_[0],
&event));
if (status == -1) {
FATAL("Failed adding interrupt fd to epoll instance");
}
}
EventHandlerImplementation::~EventHandlerImplementation() {
TEMP_FAILURE_RETRY(close(interrupt_fds_[0]));
TEMP_FAILURE_RETRY(close(interrupt_fds_[1]));
}
SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
ASSERT(fd >= 0);
HashMap::Entry* entry = socket_map_.Lookup(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
ASSERT(entry != NULL);
SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
if (sd == NULL) {
// If there is no data in the hash map for this file descriptor a
// new SocketData for the file descriptor is inserted.
sd = new SocketData(fd);
entry->value = sd;
}
ASSERT(fd == sd->fd());
return sd;
}
void EventHandlerImplementation::WakeupHandler(intptr_t id,
Dart_Port dart_port,
int64_t data) {
InterruptMessage msg;
msg.id = id;
msg.dart_port = dart_port;
msg.data = data;
intptr_t result =
FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
if (result != kInterruptMessageSize) {
if (result == -1) {
perror("Interrupt message failure:");
}
FATAL1("Interrupt message failure. Wrote %d bytes.", result);
}
}
bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) {
char* dst = reinterpret_cast<char*>(msg);
int total_read = 0;
int bytes_read =
TEMP_FAILURE_RETRY(read(interrupt_fds_[0], dst, kInterruptMessageSize));
if (bytes_read < 0) {
return false;
}
total_read = bytes_read;
while (total_read < kInterruptMessageSize) {
bytes_read = TEMP_FAILURE_RETRY(read(interrupt_fds_[0],
dst + total_read,
kInterruptMessageSize - total_read));
if (bytes_read > 0) {
total_read = total_read + bytes_read;
}
}
return (total_read == kInterruptMessageSize) ? true : false;
}
void EventHandlerImplementation::HandleInterruptFd() {
InterruptMessage msg;
while (GetInterruptMessage(&msg)) {
if (msg.id == kTimerId) {
timeout_queue_.UpdateTimeout(msg.dart_port, msg.data);
} else if (msg.id == kShutdownId) {
shutdown_ = true;
} else {
SocketData* sd = GetSocketData(msg.id);
if ((msg.data & (1 << kShutdownReadCommand)) != 0) {
ASSERT(msg.data == (1 << kShutdownReadCommand));
// Close the socket for reading.
sd->ShutdownRead();
UpdateEpollInstance(epoll_fd_, sd);
} else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) {
ASSERT(msg.data == (1 << kShutdownWriteCommand));
// Close the socket for writing.
sd->ShutdownWrite();
UpdateEpollInstance(epoll_fd_, sd);
} else if ((msg.data & (1 << kCloseCommand)) != 0) {
ASSERT(msg.data == (1 << kCloseCommand));
// Close the socket and free system resources and move on to
// next message.
RemoveFromEpollInstance(epoll_fd_, sd);
intptr_t fd = sd->fd();
if (fd == STDOUT_FILENO) {
// If stdout, redirect fd to /dev/null.
int null_fd = TEMP_FAILURE_RETRY(open("/dev/null", O_WRONLY));
ASSERT(null_fd >= 0);
VOID_TEMP_FAILURE_RETRY(dup2(null_fd, STDOUT_FILENO));
VOID_TEMP_FAILURE_RETRY(close(null_fd));
} else {
sd->Close();
}
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
delete sd;
DartUtils::PostInt32(msg.dart_port, 1 << kDestroyedEvent);
} else {
// Setup events to wait for.
sd->SetPortAndMask(msg.dart_port, msg.data);
UpdateEpollInstance(epoll_fd_, sd);
}
}
}
}
#ifdef DEBUG_POLL
static void PrintEventMask(intptr_t fd, intptr_t events) {
Log::Print("%d ", fd);
if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN ");
if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI ");
if ((events & EPOLLOUT) != 0) Log::Print("EPOLLOUT ");
if ((events & EPOLLERR) != 0) Log::Print("EPOLLERR ");
if ((events & EPOLLHUP) != 0) Log::Print("EPOLLHUP ");
if ((events & EPOLLRDHUP) != 0) Log::Print("EPOLLRDHUP ");
int all_events = EPOLLIN | EPOLLPRI | EPOLLOUT |
EPOLLERR | EPOLLHUP | EPOLLRDHUP;
if ((events & ~all_events) != 0) {
Log::Print("(and %08x) ", events & ~all_events);
}
Log::Print("(available %d) ", FDUtils::AvailableBytes(fd));
Log::Print("\n");
}
#endif
intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
SocketData* sd) {
#ifdef DEBUG_POLL
PrintEventMask(sd->fd(), events);
#endif
intptr_t event_mask = 0;
if (sd->IsListeningSocket()) {
// For listening sockets the EPOLLIN event indicate that there are
// connections ready for accept unless accompanied with one of the
// other flags.
if ((events & EPOLLIN) != 0) {
if ((events & EPOLLHUP) != 0) event_mask |= (1 << kCloseEvent);
if ((events & EPOLLERR) != 0) event_mask |= (1 << kErrorEvent);
if (event_mask == 0) event_mask |= (1 << kInEvent);
}
} else {
// Prioritize data events over close and error events.
if ((events & EPOLLIN) != 0) {
if (FDUtils::AvailableBytes(sd->fd()) != 0) {
event_mask = (1 << kInEvent);
} else if ((events & EPOLLHUP) != 0) {
// If both EPOLLHUP and EPOLLERR are reported treat it as an
// error.
if ((events & EPOLLERR) != 0) {
event_mask = (1 << kErrorEvent);
} else {
event_mask = (1 << kCloseEvent);
}
sd->MarkClosedRead();
} else if ((events & EPOLLERR) != 0) {
event_mask = (1 << kErrorEvent);
} else {
if (sd->IsPipe()) {
// When reading from stdin (either from a terminal or piped
// input) treat EPOLLIN with 0 available bytes as
// end-of-file.
if (sd->fd() == STDIN_FILENO) {
event_mask = (1 << kCloseEvent);
sd->MarkClosedRead();
}
} else {
// If EPOLLIN is set with no available data and no EPOLLHUP use
// recv to peek for whether the other end of the socket
// actually closed.
char buffer;
ssize_t bytesPeeked =
TEMP_FAILURE_RETRY(recv(sd->fd(), &buffer, 1, MSG_PEEK));
ASSERT(EAGAIN == EWOULDBLOCK);
if (bytesPeeked == 0) {
event_mask = (1 << kCloseEvent);
sd->MarkClosedRead();
} else if (errno != EWOULDBLOCK) {
const int kBufferSize = 1024;
char error_message[kBufferSize];
strerror_r(errno, error_message, kBufferSize);
Log::PrintErr("Error recv: %s\n", error_message);
}
}
}
}
// On pipes EPOLLHUP is reported without EPOLLIN when there is no
// more data to read.
if (sd->IsPipe()) {
if (((events & EPOLLIN) == 0) &&
((events & EPOLLHUP) != 0)) {
event_mask = (1 << kCloseEvent);
sd->MarkClosedRead();
}
}
if ((events & EPOLLOUT) != 0) {
if ((events & EPOLLERR) != 0) {
event_mask = (1 << kErrorEvent);
sd->MarkClosedWrite();
} else {
event_mask |= (1 << kOutEvent);
}
}
}
return event_mask;
}
void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
int size) {
for (int i = 0; i < size; i++) {
if (events[i].data.ptr != NULL) {
SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr);
intptr_t event_mask = GetPollEvents(events[i].events, sd);
if (event_mask != 0) {
// Unregister events for the file descriptor. Events will be
// registered again when the current event has been handled in
// Dart code.
RemoveFromEpollInstance(epoll_fd_, sd);
Dart_Port port = sd->port();
ASSERT(port != 0);
DartUtils::PostInt32(port, event_mask);
}
}
}
HandleInterruptFd();
}
int64_t EventHandlerImplementation::GetTimeout() {
if (!timeout_queue_.HasTimeout()) {
return kInfinityTimeout;
}
int64_t millis = timeout_queue_.CurrentTimeout() -
TimerUtils::GetCurrentTimeMilliseconds();
return (millis < 0) ? 0 : millis;
}
void EventHandlerImplementation::HandleTimeout() {
if (timeout_queue_.HasTimeout()) {
int64_t millis = timeout_queue_.CurrentTimeout() -
TimerUtils::GetCurrentTimeMilliseconds();
if (millis <= 0) {
DartUtils::PostNull(timeout_queue_.CurrentPort());
timeout_queue_.RemoveCurrent();
}
}
}
void EventHandlerImplementation::Poll(uword args) {
static const intptr_t kMaxEvents = 16;
struct epoll_event events[kMaxEvents];
EventHandlerImplementation* handler =
reinterpret_cast<EventHandlerImplementation*>(args);
ASSERT(handler != NULL);
while (!handler->shutdown_) {
int64_t millis = handler->GetTimeout();
ASSERT(millis == kInfinityTimeout || millis >= 0);
if (millis > kMaxInt32) millis = kMaxInt32;
intptr_t result = TEMP_FAILURE_RETRY(epoll_wait(handler->epoll_fd_,
events,
kMaxEvents,
millis));
ASSERT(EAGAIN == EWOULDBLOCK);
if (result == -1) {
if (errno != EWOULDBLOCK) {
perror("Poll failed");
}
} else {
handler->HandleTimeout();
handler->HandleEvents(events, result);
}
}
}
void EventHandlerImplementation::Start(EventHandler* handler) {
int result = dart::Thread::Start(&EventHandlerImplementation::Poll,
reinterpret_cast<uword>(handler));
if (result != 0) {
FATAL1("Failed to start event handler thread %d", result);
}
}
void EventHandlerImplementation::Shutdown() {
SendData(kShutdownId, 0, 0);
}
void EventHandlerImplementation::SendData(intptr_t id,
Dart_Port dart_port,
intptr_t data) {
WakeupHandler(id, dart_port, data);
}
void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
// The hashmap does not support keys with value 0.
return reinterpret_cast<void*>(fd + 1);
}
uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
// The hashmap does not support keys with value 0.
return dart::Utils::WordHash(fd + 1);
}
} // namespace bin
} // namespace dart
#endif // defined(TARGET_OS_ANDROID)