diff --git a/CMakeLists.txt b/CMakeLists.txt index 4f42c3a..8a4a7ee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,10 @@ # cmake-format: on cmake_minimum_required(VERSION 3.23) -project(beman_net VERSION 0.0.0 LANGUAGES CXX) +project( + beman_net + VERSION 0.0.0 + LANGUAGES CXX) set(TARGET_NAME net) set(TARGET_PREFIX beman.${TARGET_NAME}) set(TARGET_LIBRARY beman_${TARGET_NAME}) @@ -13,13 +16,32 @@ set(TARGETS_EXPORT_NAME ${CMAKE_PROJECT_NAME}) set(CMAKE_CXX_STANDARD 23) +include(CheckIncludeFileCXX) +include(CheckCXXSourceCompiles) +check_include_file_cxx(sys/event.h NET_HAS_KQUEUE) +if(NET_HAS_KQUEUE) + add_compile_definitions(NET_HAS_KQUEUE) + check_cxx_source_compiles( + " + #include + #include + int main() { + struct kevent ev{::uintptr_t(), ::int16_t(), EV_DELETE, ::uint32_t(), ::intptr_t(), nullptr, {}}; + (void)ev; + } + " + NET_KEVENT_HAS_EXT) + if(NET_KEVENT_HAS_EXT) + add_compile_definitions(NET_KEVENT_HAS_EXT) + endif() +endif() + include(FetchContent) FetchContent_Declare( - execution - # for local development, use SOURCE_DIR /execution - GIT_REPOSITORY https://github.com/bemanproject/execution - GIT_TAG e9c3032 -) + execution + # for local development, use SOURCE_DIR /execution + GIT_REPOSITORY https://github.com/bemanproject/execution + GIT_TAG 9cd2e66ba312c4396ac222868cc0486891cfa30b) FetchContent_MakeAvailable(execution) include(CTest) diff --git a/include/beman/net/detail/basic_socket_acceptor.hpp b/include/beman/net/detail/basic_socket_acceptor.hpp index 2a6fb1c..191c5aa 100644 --- a/include/beman/net/detail/basic_socket_acceptor.hpp +++ b/include/beman/net/detail/basic_socket_acceptor.hpp @@ -4,6 +4,7 @@ #ifndef INCLUDED_BEMAN_NET_DETAIL_BASIC_SOCKET_ACCEPTOR #define INCLUDED_BEMAN_NET_DETAIL_BASIC_SOCKET_ACCEPTOR +#include #include #include #include diff --git a/include/beman/net/detail/container.hpp b/include/beman/net/detail/container.hpp index 9585fdb..7c7cb14 100644 --- a/include/beman/net/detail/container.hpp +++ b/include/beman/net/detail/container.hpp @@ -4,8 +4,11 @@ #ifndef INCLUDED_BEMAN_NET_DETAIL_CONTAINER #define INCLUDED_BEMAN_NET_DETAIL_CONTAINER +#include #include #include +#include +#include #include #include @@ -28,6 +31,10 @@ class beman::net::detail::container { auto insert(Record r) -> ::beman::net::detail::socket_id; auto erase(::beman::net::detail::socket_id id) -> void; auto operator[](::beman::net::detail::socket_id id) -> Record&; + // using iterator = decltype(records)::iterator; + // auto begin() -> iterator; + // auto end() -> iterator; + auto find(const Record& r) -> ::std::optional<::beman::net::detail::socket_id>; }; // ---------------------------------------------------------------------------- @@ -55,6 +62,19 @@ inline auto beman::net::detail::container::operator[](::beman::net::deta return ::std::get<1>(this->records[::std::size_t(id)]); } +template +inline auto ::beman::net::detail::container::find(const Record& r) + -> ::std::optional<::beman::net::detail::socket_id> { + auto it = ::std::find_if(records.begin(), records.end(), [&](auto& l) { + return ::std::holds_alternative(l) && ::std::get(l) == r; + }); + + if (it == records.end()) { + return {}; + } + return {::beman::net::detail::socket_id(::std::distance(records.begin(), it))}; +} + // ---------------------------------------------------------------------------- -#endif \ No newline at end of file +#endif diff --git a/include/beman/net/detail/io_context.hpp b/include/beman/net/detail/io_context.hpp index 811f0fe..e9cd54c 100644 --- a/include/beman/net/detail/io_context.hpp +++ b/include/beman/net/detail/io_context.hpp @@ -6,16 +6,17 @@ // ---------------------------------------------------------------------------- +#if defined(NET_HAS_KQUEUE) +#include +#endif #include #include #include #include #include -#include #include #include #include -#include #include #include @@ -29,8 +30,12 @@ class io_context; class beman::net::io_context { private: +#if defined(NET_HAS_KQUEUE) + ::std::unique_ptr<::beman::net::detail::context_base> d_owned{new ::beman::net::detail::kqueue_context()}; +#else ::std::unique_ptr<::beman::net::detail::context_base> d_owned{new ::beman::net::detail::poll_context()}; - ::beman::net::detail::context_base& d_context{*this->d_owned}; +#endif + ::beman::net::detail::context_base& d_context{*this->d_owned}; public: using scheduler_type = ::beman::net::detail::io_context_scheduler; diff --git a/include/beman/net/detail/kqueue_context.hpp b/include/beman/net/detail/kqueue_context.hpp new file mode 100644 index 0000000..99cc0a3 --- /dev/null +++ b/include/beman/net/detail/kqueue_context.hpp @@ -0,0 +1,408 @@ +// include/beman/net/detail/kqueue_context.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_NET_DETAIL_QUEUE_CONTEXT +#define INCLUDED_BEMAN_NET_DETAIL_QUEUE_CONTEXT + +#if defined(NET_HAS_KQUEUE) +// ---------------------------------------------------------------------------- + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::net::detail { +struct kqueue_record; +struct kqueue_context; +} // namespace beman::net::detail + +// ---------------------------------------------------------------------------- + +struct beman::net::detail::kqueue_record final { + kqueue_record(::beman::net::detail::native_handle_type h) : handle(h) {} + ::beman::net::detail::native_handle_type handle; + bool blocking{true}; +}; + +// ---------------------------------------------------------------------------- + +struct beman::net::detail::kqueue_context final : ::beman::net::detail::context_base { + static constexpr size_t event_buffer_size = 10; + using time_t = ::std::chrono::system_clock::time_point; + using timer_node_t = ::beman::net::detail::context_base::resume_at_operation; + using event_key_t = ::std::tuple<::std::uintptr_t, ::std::int16_t>; + struct get_time { + auto operator()(auto* t) const -> time_t { return ::std::get<0>(*t); } + }; + using timer_priority_t = ::beman::net::detail::sorted_list, get_time>; + using kevent_t = struct ::kevent; + + struct io_event { + struct ::kevent event; + ::beman::net::detail::io_base* operation; + }; + + static constexpr ::std::array read_filter = {EVFILT_READ}; + static constexpr ::std::array write_filter = {EVFILT_WRITE}; + static constexpr ::std::array readwrite_filter = {EVFILT_READ, EVFILT_WRITE}; + + constexpr auto to_native_filter(::beman::net::event_type event_type) -> ::std::span { + + switch (event_type) { + case ::beman::net::event_type::in: + return {read_filter}; + case ::beman::net::event_type::out: + return {write_filter}; + case ::beman::net::event_type::in_out: + return {readwrite_filter}; + case ::beman::net::event_type::none: + return {}; + } + } + + ::beman::net::detail::container<::beman::net::detail::kqueue_record> d_sockets; + ::std::map> d_event; + ::beman::net::detail::container<::beman::net::detail::io_base*> d_outstanding; + timer_priority_t d_timeouts; + ::beman::net::detail::context_base::task* d_tasks{}; + const int d_queue = ::kqueue(); // TODO: is this a good practise to put it here? + + auto make_socket(int fd) -> ::beman::net::detail::socket_id override final { return this->d_sockets.insert(fd); } + auto make_socket(int d, int t, int p, ::std::error_code& error) -> ::beman::net::detail::socket_id override final { + int fd(::socket(d, t, p)); + if (fd < 0) { + error = ::std::error_code(errno, ::std::system_category()); + return ::beman::net::detail::socket_id::invalid; + } + return this->make_socket(fd); + } + auto release(::beman::net::detail::socket_id id, ::std::error_code& error) -> void override final { + ::beman::net::detail::native_handle_type handle(this->d_sockets[id].handle); + this->d_sockets.erase(id); + if (::close(handle) < 0) { + error = ::std::error_code(errno, ::std::system_category()); + } + } + auto native_handle(::beman::net::detail::socket_id id) -> ::beman::net::detail::native_handle_type override final { + return this->d_sockets[id].handle; + } + auto set_option(::beman::net::detail::socket_id id, + int level, + int name, + const void* data, + ::socklen_t size, + ::std::error_code& error) -> void override final { + if (::setsockopt(this->native_handle(id), level, name, data, size) < 0) { + error = ::std::error_code(errno, ::std::system_category()); + } + } + auto bind(::beman::net::detail::socket_id id, + const ::beman::net::detail::endpoint& endpoint, + ::std::error_code& error) -> void override final { + if (::bind(this->native_handle(id), endpoint.data(), endpoint.size()) < 0) { + error = ::std::error_code(errno, ::std::system_category()); + } + } + auto listen(::beman::net::detail::socket_id id, int no, ::std::error_code& error) -> void override final { + if (::listen(this->native_handle(id), no) < 0) { + error = ::std::error_code(errno, ::std::system_category()); + } + } + + auto process_task() -> ::std::size_t { + if (this->d_tasks) { + auto* tsk{this->d_tasks}; + this->d_tasks = tsk->next; + tsk->complete(); + return 1u; + } + return 0u; + } + auto process_timeout(const auto& now) -> ::std::size_t { + if (!this->d_timeouts.empty() && ::std::get<0>(*this->d_timeouts.front()) <= now) { + this->d_timeouts.pop_front()->complete(); + return 1u; + } + return 0u; + } + auto remove_outstanding(::beman::net::detail::socket_id outstanding_id) { + auto& completion = d_outstanding[outstanding_id]; + const auto native_handle = this->native_handle(completion->id); + const auto filters = this->to_native_filter(completion->event); + for (const auto f : filters) { + const event_key_t key{native_handle, f}; + auto event_it = d_event.find(key); + if (event_it == d_event.end()) { + continue; + } + auto& event_completions = event_it->second; + auto socket_it = ::std::find(event_completions.begin(), event_completions.end(), outstanding_id); + if (socket_it == event_completions.end()) { + continue; + } + event_completions.erase(socket_it); + + if (1 > event_completions.size()) { + kevent_t evt; + EV_SET(&evt, native_handle, f, EV_DELETE, 0, 0, nullptr); + kevent(d_queue, &evt, 1, nullptr, 0, NULL); + } + } + } + auto to_milliseconds(auto duration) -> int { + return int(::std::chrono::duration_cast<::std::chrono::milliseconds>(duration).count()); + } + auto run_one() -> ::std::size_t override final { + auto now{::std::chrono::system_clock::now()}; + if (0u < this->process_timeout(now) || 0 < this->process_task()) { + return 1u; + } + if (this->d_event.empty() && this->d_timeouts.empty()) { + return ::std::size_t{}; + } + auto next_time{this->d_timeouts.value_or(now)}; + ::std::array evt_buffer; + timespec timeout; + if (now != next_time) { + auto milliseconds = this->to_milliseconds(next_time - now); + timeout.tv_sec = milliseconds / 1000; + timeout.tv_nsec = 1000000 * (milliseconds % 1000); + } + + auto n = ::kevent( + this->d_queue, nullptr, 0, evt_buffer.data(), evt_buffer.size(), now == next_time ? nullptr : &timeout); + + ::std::size_t ncompleted{0}; + if (n < 0) { + // TODO: kevent error handling + return 0; + } + + while (0 < n) { + --n; + + auto buffer_idx = size_t(n); + + event_key_t evt_key{evt_buffer[buffer_idx].ident, evt_buffer[buffer_idx].filter}; + auto outstanding_evt = d_event.find(evt_key); + if (d_event.end() == outstanding_evt || outstanding_evt->second.size() == 0) { + kevent_t evt; + EV_SET(&evt, ::std::get<0>(evt_key), ::std::get<1>(evt_key), EV_DELETE, 0, 0, nullptr); + kevent(d_queue, &evt, 1, nullptr, 0, nullptr); + } + + auto completion = d_outstanding[outstanding_evt->second.back()]; + + this->remove_outstanding(outstanding_evt->second.back()); + completion->work(*this, completion); + ++ncompleted; + } + + return ncompleted; + } + auto wakeup() -> void { + //-dk:TODO wake-up polling thread + } + + auto add_outstanding(::beman::net::detail::io_base* completion) -> ::beman::net::detail::submit_result { + auto id{completion->id}; + if (this->d_sockets[id].blocking || + completion->work(*this, completion) == ::beman::net::detail::submit_result::submit) { + const auto native_handle = this->native_handle(completion->id); + auto filters = to_native_filter(completion->event); + auto outstanding_id = d_outstanding.insert(completion); + for (const auto f : filters) { + const event_key_t key{native_handle, f}; + d_event[key].emplace_back(outstanding_id); + kevent_t evt; + EV_SET(&evt, ::std::get<0>(key), ::std::get<1>(key), EV_ADD, 0, 0, nullptr); + kevent(d_queue, &evt, 1, nullptr, 0, NULL); + } + this->wakeup(); + return ::beman::net::detail::submit_result::submit; + } + return ::beman::net::detail::submit_result::ready; + } + + auto cancel(::beman::net::detail::io_base* cancel_op, ::beman::net::detail::io_base* op) -> void override final { + auto f = this->d_outstanding.find(op); + + if (f) { + this->remove_outstanding(*f); + op->cancel(); + cancel_op->cancel(); + } else if (this->d_timeouts.erase(op)) { + op->cancel(); + cancel_op->cancel(); + } else { + ::std::cerr << "ERROR: kqueue_context::cancel(): entity not cancelled!\n"; + } + } + auto schedule(::beman::net::detail::context_base::task* tsk) -> void override { + tsk->next = this->d_tasks; + this->d_tasks = tsk; + } + auto accept(::beman::net::detail::context_base::accept_operation* completion) + -> ::beman::net::detail::submit_result override final { + completion->work = [](::beman::net::detail::context_base& ctxt, ::beman::net::detail::io_base* comp) { + auto id{comp->id}; + auto& cmp(*static_cast(comp)); + + while (true) { + int rc = ::accept(ctxt.native_handle(id), ::std::get<0>(cmp).data(), &::std::get<1>(cmp)); + if (0 <= rc) { + ::std::get<2>(cmp) = ctxt.make_socket(rc); + cmp.complete(); + return ::beman::net::detail::submit_result::ready; + } else { + switch (errno) { + default: + cmp.error(::std::error_code(errno, ::std::system_category())); + return ::beman::net::detail::submit_result::error; + case EINTR: + break; + case EWOULDBLOCK: + return ::beman::net::detail::submit_result::submit; + } + } + } + }; + return this->add_outstanding(completion); + } + auto connect(::beman::net::detail::context_base::connect_operation* op) + -> ::beman::net::detail::submit_result override { + auto handle{this->native_handle(op->id)}; + const auto& endpoint(::std::get<0>(*op)); + if (-1 == ::fcntl(handle, F_SETFL, O_NONBLOCK)) { + op->error(::std::error_code(errno, ::std::system_category())); + return ::beman::net::detail::submit_result::error; + } + if (0 == ::connect(handle, endpoint.data(), endpoint.size())) { + op->complete(); + return ::beman::net::detail::submit_result::ready; + } + switch (errno) { + default: + op->error(::std::error_code(errno, ::std::system_category())); + return ::beman::net::detail::submit_result::error; + case EINPROGRESS: + case EINTR: + break; + } + + op->context = this; + op->work = [](::beman::net::detail::context_base& ctxt, ::beman::net::detail::io_base* o) { + auto hndl{ctxt.native_handle(o->id)}; + + int error{}; + ::socklen_t len{sizeof(error)}; + if (-1 == ::getsockopt(hndl, SOL_SOCKET, SO_ERROR, &error, &len)) { + o->error(::std::error_code(errno, ::std::system_category())); + return ::beman::net::detail::submit_result::error; + } + if (0 == error) { + o->complete(); + return ::beman::net::detail::submit_result::ready; + } else { + o->error(::std::error_code(error, ::std::system_category())); + return ::beman::net::detail::submit_result::error; + } + }; + + return this->add_outstanding(op); + } + auto receive(::beman::net::detail::context_base::receive_operation* op) + -> ::beman::net::detail::submit_result override { + op->context = this; + op->work = [](::beman::net::detail::context_base& ctxt, ::beman::net::detail::io_base* o) { + auto& completion(*static_cast(o)); + while (true) { + auto rc{::recvmsg(ctxt.native_handle(o->id), &::std::get<0>(completion), ::std::get<1>(completion))}; + if (0 <= rc) { + ::std::get<2>(completion) = ::std::size_t(rc); + completion.complete(); + return ::beman::net::detail::submit_result::ready; + } else + switch (errno) { + default: + completion.error(::std::error_code(errno, ::std::system_category())); + return ::beman::net::detail::submit_result::error; + case ECONNRESET: + case EPIPE: + ::std::get<2>(completion) = 0u; + completion.complete(); + return ::beman::net::detail::submit_result::ready; + case EINTR: + break; + case EWOULDBLOCK: + return ::beman::net::detail::submit_result::submit; + } + } + }; + return this->add_outstanding(op); + } + auto send(::beman::net::detail::context_base::send_operation* op) -> ::beman::net::detail::submit_result override { + op->context = this; + op->work = [](::beman::net::detail::context_base& ctxt, ::beman::net::detail::io_base* o) { + auto& completion(*static_cast(o)); + + while (true) { + auto rc{::sendmsg(ctxt.native_handle(o->id), &::std::get<0>(completion), ::std::get<1>(completion))}; + if (0 <= rc) { + ::std::get<2>(completion) = ::std::size_t(rc); + completion.complete(); + return ::beman::net::detail::submit_result::ready; + } else + switch (errno) { + default: + completion.error(::std::error_code(errno, ::std::system_category())); + return ::beman::net::detail::submit_result::error; + case ECONNRESET: + case EPIPE: + ::std::get<2>(completion) = 0u; + completion.complete(); + return ::beman::net::detail::submit_result::ready; + case EINTR: + break; + case EWOULDBLOCK: + return ::beman::net::detail::submit_result::submit; + } + } + }; + return this->add_outstanding(op); + } + auto resume_at(::beman::net::detail::context_base::resume_at_operation* op) + -> ::beman::net::detail::submit_result override { + if (::std::chrono::system_clock::now() < ::std::get<0>(*op)) { + this->d_timeouts.insert(op); + return ::beman::net::detail::submit_result::submit; + } else { + op->complete(); + return ::beman::net::detail::submit_result::ready; + } + } +}; + +// ---------------------------------------------------------------------------- + +#endif +#endif diff --git a/include/beman/net/detail/operations.hpp b/include/beman/net/detail/operations.hpp index a6ac80b..b64cced 100644 --- a/include/beman/net/detail/operations.hpp +++ b/include/beman/net/detail/operations.hpp @@ -9,6 +9,7 @@ #include #include #include +#include // TODO: hard dependency with poll // ---------------------------------------------------------------------------- diff --git a/include/beman/net/net.hpp b/include/beman/net/net.hpp index 0566f62..ad79b47 100644 --- a/include/beman/net/net.hpp +++ b/include/beman/net/net.hpp @@ -20,6 +20,9 @@ #include #include #include +#if defined(NET_HAS_KQUEUE) +#include +#endif #include #include #include diff --git a/src/beman/net/CMakeLists.txt b/src/beman/net/CMakeLists.txt index f67869a..bb7b63e 100644 --- a/src/beman/net/CMakeLists.txt +++ b/src/beman/net/CMakeLists.txt @@ -33,6 +33,7 @@ target_sources(${TARGET_LIBRARY} ${PROJECT_SOURCE_DIR}/include/beman/${TARGET_NAME}/detail/netfwd.hpp ${PROJECT_SOURCE_DIR}/include/beman/${TARGET_NAME}/detail/operations.hpp ${PROJECT_SOURCE_DIR}/include/beman/${TARGET_NAME}/detail/poll_context.hpp + ${PROJECT_SOURCE_DIR}/include/beman/${TARGET_NAME}/detail/kqueue_context.hpp ${PROJECT_SOURCE_DIR}/include/beman/${TARGET_NAME}/detail/sender.hpp ${PROJECT_SOURCE_DIR}/include/beman/${TARGET_NAME}/detail/socket_base.hpp ${PROJECT_SOURCE_DIR}/include/beman/${TARGET_NAME}/detail/socket_category.hpp diff --git a/src/beman/net/net.cpp b/src/beman/net/net.cpp index 80f25c5..3a6009d 100644 --- a/src/beman/net/net.cpp +++ b/src/beman/net/net.cpp @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception // ---------------------------------------------------------------------------- +#include namespace beman::net { int version{000};