Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 100 additions & 15 deletions src/fd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
#include <cstdlib>
#include <atomic>
#include <memory>
#include "tinycthread.h"
#include "later.h"
#include "threadutils.h"
#include "callback_registry_table.h"

class ThreadArgs {
Expand Down Expand Up @@ -81,6 +81,57 @@ class ThreadArgs {

};

static Mutex mtx(tct_mtx_plain);
static ConditionVariable cv(mtx);
static int busy = 0;
static std::unique_ptr<std::shared_ptr<ThreadArgs>> threadargs = nullptr;

static int wait_thread_persistent(void *arg);

class PersistentThread {
tct_thrd_t thr = 0;

public:
PersistentThread() {
if (tct_thrd_create(&thr, &wait_thread_persistent, NULL) != tct_thrd_success)
throw std::runtime_error("Thread creation failed.");
}
~PersistentThread() {
Guard guard(&mtx);
if (threadargs != nullptr) {
(*threadargs)->active->store(false);
}
busy = -1;
cv.broadcast();
}

};

static int wait_for_signal() {
Guard guard(&mtx);
while (!busy)
cv.wait();
return busy;
}

static int submit_wait(std::shared_ptr<ThreadArgs> args) {
Guard guard(&mtx);
if (busy)
return busy;
threadargs.reset(new std::shared_ptr<ThreadArgs>(args));
busy = 1;
cv.broadcast();
return 0;
}

static int wait_done() {
Guard guard(&mtx);
threadargs.reset();
int ret = busy;
busy = 0;
return ret;
}

static void later_callback(void *arg) {

ASSERT_MAIN_THREAD()
Expand All @@ -104,15 +155,7 @@ static void later_callback(void *arg) {
}

// CONSIDER: if necessary to add method for HANDLES on Windows. Would be different code to SOCKETs.
// TODO: implement re-usable background thread.
static int wait_thread(void *arg) {

tct_thrd_detach(tct_thrd_current());

std::unique_ptr<std::shared_ptr<ThreadArgs>> argsptr(static_cast<std::shared_ptr<ThreadArgs>*>(arg));
std::shared_ptr<ThreadArgs> args = *argsptr;

// poll() whilst checking for cancellation at intervals
static int wait_on_fds(std::shared_ptr<ThreadArgs> args) {

int ready;
double waitFor = std::fmax(args->timeout.diff_secs(Timestamp()), 0);
Expand All @@ -124,8 +167,6 @@ static int wait_thread(void *arg) {
if (ready) break;
} while ((waitFor = args->timeout.diff_secs(Timestamp())) > 0);

// store pollfd revents in args->results for use by callback

if (ready > 0) {
for (std::size_t i = 0; i < args->fds.size(); i++) {
(args->results)[i] = (args->fds)[i].revents == 0 ? 0 : (args->fds)[i].revents & (POLLIN | POLLOUT) ? 1: NA_INTEGER;
Expand All @@ -134,7 +175,43 @@ static int wait_thread(void *arg) {
std::fill(args->results.begin(), args->results.end(), NA_INTEGER);
}

callbackRegistryTable.scheduleCallback(later_callback, static_cast<void *>(argsptr.release()), 0, args->loop);
return 0;

}

static int wait_thread_single(void *arg) {

tct_thrd_detach(tct_thrd_current());

std::unique_ptr<std::shared_ptr<ThreadArgs>> argsptr(static_cast<std::shared_ptr<ThreadArgs>*>(arg));
std::shared_ptr<ThreadArgs> args = *argsptr;

if (wait_on_fds(args) == 0) {
callbackRegistryTable.scheduleCallback(later_callback, static_cast<void *>(argsptr.release()), 0, args->loop);
}

return 0;

}

static int wait_thread_persistent(void *arg) {

tct_thrd_detach(tct_thrd_current());

while (1) {

if (wait_for_signal() < 0)
break;

const int loop = (*threadargs)->loop;
if (wait_on_fds(*threadargs) == 0) {
callbackRegistryTable.scheduleCallback(later_callback, static_cast<void *>(threadargs.release()), 0, loop);
}

if (wait_done() < 0)
break;

}

return 0;

Expand All @@ -144,9 +221,17 @@ static int execLater_launch_thread(std::shared_ptr<ThreadArgs> args) {

std::unique_ptr<std::shared_ptr<ThreadArgs>> argsptr(new std::shared_ptr<ThreadArgs>(args));

tct_thrd_t thr;
// static initialization ensures finalizer runs before those for the condition variable / mutex
static PersistentThread persistentthread;

int ret;
if ((ret = submit_wait(args))) {
// create single wait thread if persistent thread is busy
tct_thrd_t thr;
ret = tct_thrd_create(&thr, &wait_thread_single, static_cast<void *>(argsptr.release())) != tct_thrd_success;
}

return tct_thrd_create(&thr, &wait_thread, static_cast<void *>(argsptr.release())) != tct_thrd_success;
return ret;

}

Expand Down
12 changes: 10 additions & 2 deletions tests/testthat/test-later-fd.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ context("test-later-fd.R")
test_that("later_fd", {
skip_if_not_installed("nanonext")

result <- NULL
result2 <- result <- NULL
callback <- function(x) result <<- x
callback2 <- function(x) result2 <<- x
s1 <- nanonext::socket(listen = "inproc://nanonext")
on.exit(close(s1))
s2 <- nanonext::socket(dial = "inproc://nanonext")
Expand All @@ -17,9 +18,16 @@ test_that("later_fd", {
Sys.sleep(0.2)
run_now()
expect_equal(result, c(FALSE, FALSE))
later_fd(callback, c(fd1, fd2), exceptfds = c(fd1, fd2), timeout = 0)

# concurrent waits
later_fd(callback, c(fd1, fd2), exceptfds = c(fd1, fd2), timeout = 0.4)
later_fd(callback2, c(fd1, fd2), exceptfds = c(fd1, fd2), timeout = 0)
Sys.sleep(0.2)
run_now()
expect_equal(result2, c(FALSE, FALSE, FALSE, FALSE))
expect_equal(result, c(FALSE, FALSE))
Sys.sleep(0.4)
run_now()
expect_equal(result, c(FALSE, FALSE, FALSE, FALSE))

# cancellation
Expand Down
Loading