Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,24 @@ elseif (CMAKE_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_SYSTEM_NAME STREQUAL "NetB

set(EVENT_LOOP_DEFINE "KQUEUE")
set(USE_S2N ON)
elseif (CMAKE_SYSTEM_NAME STREQUAL "Emscripten")
option(USE_VSOCK
"Build in support for VSOCK sockets"
OFF)

file(GLOB AWS_IO_OS_HEADERS
)

file(GLOB AWS_IO_OS_SRC
"source/linux/*.c"
"source/posix/*.c"
)
set(PLATFORM_LIBS "")

set(EVENT_LOOP_DEFINE "POLL")
set(USE_S2N ON)
else ()
message(FATAL_ERROR "Unknown platform!")
endif()

if (BYO_CRYPTO)
Expand Down
104 changes: 95 additions & 9 deletions source/linux/epoll_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

#include <aws/io/logging.h>

#include <sys/epoll.h>
#if AWS_USE_EPOLL
# include <sys/epoll.h>
#else
# include <poll.h>
#endif

#include <errno.h>
#include <limits.h>
Expand Down Expand Up @@ -62,6 +66,12 @@ static bool s_is_on_callers_thread(struct aws_event_loop *event_loop);

static void aws_event_loop_thread(void *args);

/* default timeout is 100 seconds */
enum {
DEFAULT_TIMEOUT = 100 * 1000,
MAX_EVENTS = 100,
};

static struct aws_event_loop_vtable s_vtable = {
.destroy = s_destroy,
.run = s_run,
Expand All @@ -88,7 +98,13 @@ struct epoll_loop {
struct aws_linked_list task_pre_queue;
struct aws_task stop_task;
struct aws_atomic_var stop_task_ptr;
#if AWS_USE_EPOLL
int epoll_fd;
#else
struct pollfd fds[MAX_EVENTS];
int fds_cnt;
struct aws_hash_table events;
#endif
bool should_process_task_pre_queue;
bool should_continue;
};
Expand All @@ -102,12 +118,6 @@ struct epoll_event_data {
bool is_subscribed; /* false when handle is unsubscribed, but this struct hasn't been cleaned up yet */
};

/* default timeout is 100 seconds */
enum {
DEFAULT_TIMEOUT = 100 * 1000,
MAX_EVENTS = 100,
};

int aws_open_nonblocking_posix_pipe(int pipe_fds[2]);

/* Setup edge triggered epoll with a scheduler. */
Expand Down Expand Up @@ -145,12 +155,18 @@ struct aws_event_loop *aws_event_loop_new_default_with_options(
epoll_loop->task_pre_queue_mutex = (struct aws_mutex)AWS_MUTEX_INIT;
aws_atomic_init_ptr(&epoll_loop->stop_task_ptr, NULL);

#if AWS_USE_EPOLL
epoll_loop->epoll_fd = epoll_create(100);

if (epoll_loop->epoll_fd < 0) {
AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open epoll handle.", (void *)loop);
aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
goto clean_up_epoll;
}
#else
epoll_loop->fds_cnt = 0;
aws_hash_table_init(&epoll_loop->events, alloc, 20, aws_hash_ptr, aws_ptr_eq, NULL, NULL);
#endif

if (aws_thread_init(&epoll_loop->thread_created_on, alloc)) {
goto clean_up_epoll;
Expand Down Expand Up @@ -213,9 +229,11 @@ struct aws_event_loop *aws_event_loop_new_default_with_options(
aws_thread_clean_up(&epoll_loop->thread_created_on);

clean_up_epoll:
#if AWS_USE_EPOLL
if (epoll_loop->epoll_fd >= 0) {
close(epoll_loop->epoll_fd);
}
#endif

aws_mem_release(alloc, epoll_loop);

Expand Down Expand Up @@ -259,7 +277,10 @@ static void s_destroy(struct aws_event_loop *event_loop) {
close(epoll_loop->write_task_handle.data.fd);
#endif

#if AWS_USE_EPOLL
close(epoll_loop->epoll_fd);
#endif

aws_mem_release(event_loop->alloc, epoll_loop);
aws_event_loop_clean_up_base(event_loop);
aws_mem_release(event_loop->alloc, event_loop);
Expand Down Expand Up @@ -392,7 +413,6 @@ static int s_subscribe_to_io_events(
int events,
aws_event_loop_on_event_fn *on_event,
void *user_data) {

AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: subscribing to events on fd %d", (void *)event_loop, handle->data.fd);
struct epoll_event_data *epoll_event_data = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct epoll_event_data));
handle->additional_data = epoll_event_data;
Expand All @@ -407,6 +427,7 @@ static int s_subscribe_to_io_events(
epoll_event_data->on_event = on_event;
epoll_event_data->is_subscribed = true;

#if AWS_USE_EPOLL
/*everyone is always registered for edge-triggered, hang up, remote hang up, errors. */
uint32_t event_mask = EPOLLET | EPOLLHUP | EPOLLRDHUP | EPOLLERR;

Expand All @@ -431,6 +452,23 @@ static int s_subscribe_to_io_events(
aws_mem_release(event_loop->alloc, epoll_event_data);
return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
}
#else
/*everyone is always registered for hang up, remote hang up, errors. */
short event_mask = POLLHUP | POLLRDHUP | POLLERR;

if (events & AWS_IO_EVENT_TYPE_READABLE) {
event_mask |= POLLIN;
}

if (events & AWS_IO_EVENT_TYPE_WRITABLE) {
event_mask |= POLLOUT;
}

epoll_loop->fds[epoll_loop->fds_cnt].fd = handle->data.fd;
epoll_loop->fds[epoll_loop->fds_cnt++].events = event_mask;

aws_hash_table_put(&epoll_loop->events, &handle->data.fd, &epoll_event_data, NULL);
#endif

return AWS_OP_SUCCESS;
}
Expand All @@ -455,6 +493,7 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
AWS_ASSERT(handle->additional_data);
struct epoll_event_data *additional_handle_data = handle->additional_data;

#if AWS_USE_EPOLL
struct epoll_event dummy_event;

if (AWS_UNLIKELY(epoll_ctl(epoll_loop->epoll_fd, EPOLL_CTL_DEL, handle->data.fd, &dummy_event /*ignored*/))) {
Expand All @@ -465,6 +504,9 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
handle->data.fd);
return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
}
#else
aws_hash_table_remove(&epoll_loop->events, &handle->data.fd, NULL, NULL);
#endif

/* We can't clean up yet, because we have schedule tasks and more events to process,
* mark it as unsubscribed and schedule a cleanup task. */
Expand All @@ -478,6 +520,7 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
s_schedule_task_now(event_loop, &additional_handle_data->cleanup_task);

handle->additional_data = NULL;

return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -558,8 +601,13 @@ static void s_process_task_pre_queue(struct aws_event_loop *event_loop) {
* that it's doing nothing on purpose. It's waiting for events to happen...
*/
AWS_NO_INLINE
#if AWS_USE_EPOLL
static int aws_event_loop_listen_for_io_events(int epoll_fd, struct epoll_event events[MAX_EVENTS], int timeout) {
return epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
#else
static int aws_event_loop_listen_for_io_events(struct pollfd *fds, nfds_t nfds, int timeout) {
return poll(fds, nfds, timeout);
#endif
}

static void aws_event_loop_thread(void *args) {
Expand All @@ -578,6 +626,7 @@ static void aws_event_loop_thread(void *args) {

int timeout = DEFAULT_TIMEOUT;

#if AWS_USE_EPOLL
struct epoll_event events[MAX_EVENTS];

AWS_LOGF_INFO(
Expand All @@ -586,6 +635,7 @@ static void aws_event_loop_thread(void *args) {
(void *)event_loop,
timeout,
MAX_EVENTS);
#endif

/*
* until stop is called,
Expand All @@ -601,32 +651,68 @@ static void aws_event_loop_thread(void *args) {
while (epoll_loop->should_continue) {

AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: waiting for a maximum of %d ms", (void *)event_loop, timeout);
int event_count = aws_event_loop_listen_for_io_events(epoll_loop->epoll_fd, events, timeout);
int event_count =
#if AWS_USE_EPOLL
aws_event_loop_listen_for_io_events(epoll_loop->epoll_fd, events, timeout);
#else
aws_event_loop_listen_for_io_events(epoll_loop->fds, epoll_loop->fds_cnt, timeout);
#endif
aws_event_loop_register_tick_start(event_loop);

AWS_LOGF_TRACE(
AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, event_count);
#if AWS_USE_EPOLL
for (int i = 0; i < event_count; ++i) {
struct epoll_event_data *event_data = (struct epoll_event_data *)events[i].data.ptr;
#else
for (int i = 0; i < epoll_loop->fds_cnt; ++i) {
if (!epoll_loop->fds[i].revents) {
continue;
}

struct aws_hash_element *object = NULL;
aws_hash_table_find(&epoll_loop->events, &epoll_loop->fds[i].fd, &object);
struct epoll_event_data *event_data = object->value;
#endif

int event_mask = 0;
#if AWS_USE_EPOLL
if (events[i].events & EPOLLIN) {
#else
if (epoll_loop->fds[i].revents & POLLIN) {
#endif
event_mask |= AWS_IO_EVENT_TYPE_READABLE;
}

#if AWS_USE_EPOLL
if (events[i].events & EPOLLOUT) {
#else
if (epoll_loop->fds[i].revents & POLLOUT) {
#endif
event_mask |= AWS_IO_EVENT_TYPE_WRITABLE;
}

#if AWS_USE_EPOLL
if (events[i].events & EPOLLRDHUP) {
#else
if (epoll_loop->fds[i].revents & POLLRDHUP) {
#endif
event_mask |= AWS_IO_EVENT_TYPE_REMOTE_HANG_UP;
}

#if AWS_USE_EPOLL
if (events[i].events & EPOLLHUP) {
#else
if (epoll_loop->fds[i].revents & POLLHUP) {
#endif
event_mask |= AWS_IO_EVENT_TYPE_CLOSED;
}

#if AWS_USE_EPOLL
if (events[i].events & EPOLLERR) {
#else
if (epoll_loop->fds[i].revents & POLLERR) {
#endif
event_mask |= AWS_IO_EVENT_TYPE_ERROR;
}

Expand Down