diff --git a/src/fd.cpp b/src/fd.cpp index 0a74ba4b..42119e33 100644 --- a/src/fd.cpp +++ b/src/fd.cpp @@ -4,8 +4,8 @@ #include #include #include -#include "tinycthread.h" #include "later.h" +#include "threadutils.h" #include "callback_registry_table.h" class ThreadArgs { @@ -81,6 +81,57 @@ class ThreadArgs { }; +static Mutex mtx(tct_mtx_plain); +static ConditionVariable cv(mtx); +static int busy = 0; +static std::unique_ptr> threadargs = nullptr; + +static int wait_thread_persistent(void *arg); + +class PersistentThread { + tct_thrd_t thr = 0; + +public: + PersistentThread() { + if (tct_thrd_create(&thr, &wait_thread_persistent, NULL) != tct_thrd_success) + throw std::runtime_error("Thread creation failed."); + } + ~PersistentThread() { + Guard guard(&mtx); + if (threadargs != nullptr) { + (*threadargs)->active->store(false); + } + busy = -1; + cv.broadcast(); + } + +}; + +static int wait_for_signal() { + Guard guard(&mtx); + while (!busy) + cv.wait(); + return busy; +} + +static int submit_wait(std::shared_ptr args) { + Guard guard(&mtx); + if (busy) + return busy; + threadargs.reset(new std::shared_ptr(args)); + busy = 1; + cv.broadcast(); + return 0; +} + +static int wait_done() { + Guard guard(&mtx); + threadargs.reset(); + int ret = busy; + busy = 0; + return ret; +} + static void later_callback(void *arg) { ASSERT_MAIN_THREAD() @@ -104,15 +155,7 @@ static void later_callback(void *arg) { } // CONSIDER: if necessary to add method for HANDLES on Windows. Would be different code to SOCKETs. -// TODO: implement re-usable background thread. -static int wait_thread(void *arg) { - - tct_thrd_detach(tct_thrd_current()); - - std::unique_ptr> argsptr(static_cast*>(arg)); - std::shared_ptr args = *argsptr; - - // poll() whilst checking for cancellation at intervals +static int wait_on_fds(std::shared_ptr args) { int ready; double waitFor = std::fmax(args->timeout.diff_secs(Timestamp()), 0); @@ -124,8 +167,6 @@ static int wait_thread(void *arg) { if (ready) break; } while ((waitFor = args->timeout.diff_secs(Timestamp())) > 0); - // store pollfd revents in args->results for use by callback - if (ready > 0) { for (std::size_t i = 0; i < args->fds.size(); i++) { (args->results)[i] = (args->fds)[i].revents == 0 ? 0 : (args->fds)[i].revents & (POLLIN | POLLOUT) ? 1: NA_INTEGER; @@ -134,7 +175,43 @@ static int wait_thread(void *arg) { std::fill(args->results.begin(), args->results.end(), NA_INTEGER); } - callbackRegistryTable.scheduleCallback(later_callback, static_cast(argsptr.release()), 0, args->loop); + return 0; + +} + +static int wait_thread_single(void *arg) { + + tct_thrd_detach(tct_thrd_current()); + + std::unique_ptr> argsptr(static_cast*>(arg)); + std::shared_ptr args = *argsptr; + + if (wait_on_fds(args) == 0) { + callbackRegistryTable.scheduleCallback(later_callback, static_cast(argsptr.release()), 0, args->loop); + } + + return 0; + +} + +static int wait_thread_persistent(void *arg) { + + tct_thrd_detach(tct_thrd_current()); + + while (1) { + + if (wait_for_signal() < 0) + break; + + const int loop = (*threadargs)->loop; + if (wait_on_fds(*threadargs) == 0) { + callbackRegistryTable.scheduleCallback(later_callback, static_cast(threadargs.release()), 0, loop); + } + + if (wait_done() < 0) + break; + + } return 0; @@ -144,9 +221,17 @@ static int execLater_launch_thread(std::shared_ptr args) { std::unique_ptr> argsptr(new std::shared_ptr(args)); - tct_thrd_t thr; + // static initialization ensures finalizer runs before those for the condition variable / mutex + static PersistentThread persistentthread; + + int ret; + if ((ret = submit_wait(args))) { + // create single wait thread if persistent thread is busy + tct_thrd_t thr; + ret = tct_thrd_create(&thr, &wait_thread_single, static_cast(argsptr.release())) != tct_thrd_success; + } - return tct_thrd_create(&thr, &wait_thread, static_cast(argsptr.release())) != tct_thrd_success; + return ret; } diff --git a/tests/testthat/test-later-fd.R b/tests/testthat/test-later-fd.R index a182d3c4..ddd0710b 100644 --- a/tests/testthat/test-later-fd.R +++ b/tests/testthat/test-later-fd.R @@ -3,8 +3,9 @@ context("test-later-fd.R") test_that("later_fd", { skip_if_not_installed("nanonext") - result <- NULL + result2 <- result <- NULL callback <- function(x) result <<- x + callback2 <- function(x) result2 <<- x s1 <- nanonext::socket(listen = "inproc://nanonext") on.exit(close(s1)) s2 <- nanonext::socket(dial = "inproc://nanonext") @@ -17,9 +18,16 @@ test_that("later_fd", { Sys.sleep(0.2) run_now() expect_equal(result, c(FALSE, FALSE)) - later_fd(callback, c(fd1, fd2), exceptfds = c(fd1, fd2), timeout = 0) + + # concurrent waits + later_fd(callback, c(fd1, fd2), exceptfds = c(fd1, fd2), timeout = 0.4) + later_fd(callback2, c(fd1, fd2), exceptfds = c(fd1, fd2), timeout = 0) Sys.sleep(0.2) run_now() + expect_equal(result2, c(FALSE, FALSE, FALSE, FALSE)) + expect_equal(result, c(FALSE, FALSE)) + Sys.sleep(0.4) + run_now() expect_equal(result, c(FALSE, FALSE, FALSE, FALSE)) # cancellation