diff --git a/rpa_queue.c b/rpa_queue.c index 3f217d5..92d2ed1 100644 --- a/rpa_queue.c +++ b/rpa_queue.c @@ -20,22 +20,31 @@ #include #include #include +#include // uncomment to print debug messages //#define QUEUE_DEBUG +/** + * @struct rpa_queue_t + * @brief Opaque structure representing a thread-safe circular queue. + * + * This structure defines a thread-safe circular queue that supports concurrent access + * by multiple threads. The queue uses a fixed-size array to store elements and is designed + * to handle both producer and consumer threads efficiently. + */ struct rpa_queue_t { - void **data; - volatile uint32_t nelts; /**< # elements */ - uint32_t in; /**< next empty location */ - uint32_t out; /**< next filled location */ - uint32_t bounds;/**< max size of queue */ - uint32_t full_waiters; - uint32_t empty_waiters; - pthread_mutex_t *one_big_mutex; - pthread_cond_t *not_empty; - pthread_cond_t *not_full; - int terminated; + void **data; /**< Array to store elements */ + volatile uint32_t nelts; /**< Number of elements in the queue */ + uint32_t in; /**< Next empty location in the queue */ + uint32_t out; /**< Next filled location in the queue */ + uint32_t bounds; /**< Maximum size of the queue */ + uint32_t full_waiters; /**< Number of threads waiting on a full queue */ + uint32_t empty_waiters; /**< Number of threads waiting on an empty queue */ + pthread_mutex_t *one_big_mutex;/**< Mutex for controlling access to the queue */ + pthread_cond_t *not_empty; /**< Condition variable for signaling non-empty queue */ + pthread_cond_t *not_full; /**< Condition variable for signaling non-full queue */ + int terminated; /**< Flag indicating whether the queue is terminated */ }; #ifdef QUEUE_DEBUG @@ -50,16 +59,92 @@ static void Q_DBG(const char*msg, rpa_queue_t *q) { #endif /** - * Detects when the rpa_queue_t is full. This utility function is expected - * to be called from within critical sections, and is not threadsafe. + * @brief Macro to check if the rpa_queue_t is full. + * + * This macro checks if the number of elements in the queue is equal to the maximum + * size of the queue, indicating that the queue is full. + * + * @param queue Pointer to the rpa_queue_t instance. + * @return 1 if the queue is full, 0 otherwise. + */ +#define MC_rpa_queue_full(queue) ((queue)->nelts == (queue)->bounds) + +/** + * @brief Macro to get the number of free slots in the rpa_queue_t. + * + * This macro calculates the number of free slots in the queue by subtracting + * the current number of elements from the maximum size of the queue. + * + * @param queue Pointer to the rpa_queue_t instance. + * @return The number of free slots in the queue. + */ +#define MC_rpa_queue_get_free(queue) (((queue)->bounds) - ((queue)->nelts)) + +/** + * @brief Macro to get the number of taken slots in the rpa_queue_t. + * + * This macro retrieves the current number of elements in the queue, indicating + * the number of slots that have been taken. + * + * @param queue Pointer to the rpa_queue_t instance. + * @return The number of taken slots in the queue. + */ +#define MC_rpa_queue_get_taken(queue) ((queue)->nelts) + +/** + * @brief Macro to check if the rpa_queue_t is empty. + * + * This macro checks if the number of elements in the queue is zero, indicating + * that the queue is empty. + * + * @param queue Pointer to the rpa_queue_t instance. + * @return 1 if the queue is empty, 0 otherwise. + */ +#define MC_rpa_queue_empty(queue) ((queue)->nelts == 0) + +/** + * @brief Macro to check if the rpa_queue_t is full. + * + * This macro checks if the number of elements in the queue is equal to the maximum + * size of the queue, indicating that the queue is full. + * + * @param queue Pointer to the rpa_queue_t instance. + * @return 1 if the queue is full, 0 otherwise. */ -#define rpa_queue_full(queue) ((queue)->nelts == (queue)->bounds) + /** - * Detects when the rpa_queue_t is empty. This utility function is expected - * to be called from within critical sections, and is not threadsafe. + * @brief Macro to check if the rpa_queue_t is empty. + * + * This macro checks if the number of elements in the queue is zero, indicating + * that the queue is empty. + * + * @param queue Pointer to the rpa_queue_t instance. + * @return 1 if the queue is empty, 0 otherwise. */ -#define rpa_queue_empty(queue) ((queue)->nelts == 0) + +bool rpa_queue_empty(const rpa_queue_t *queue) +{ + return MC_rpa_queue_empty(queue); +} + +bool rpa_queue_full(const rpa_queue_t *queue) +{ + return MC_rpa_queue_full(queue); +} + +unsigned rpa_queue_get_free(const rpa_queue_t *queue) +{ + return MC_rpa_queue_get_free(queue); +} + +unsigned rpa_queue_get_taken(const rpa_queue_t *queue) +{ + return MC_rpa_queue_get_taken(queue); +} + + +static bool _rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms, bool remove_from_queue); static void set_timeout(struct timespec * abstime, int wait_ms) { @@ -178,7 +263,7 @@ bool rpa_queue_timedpush(rpa_queue_t *queue, void *data, int wait_ms) return false; } - if (rpa_queue_full(queue)) { + if (MC_rpa_queue_full(queue)) { if (!queue->terminated) { queue->full_waiters++; if (wait_ms == RPA_WAIT_FOREVER) { @@ -196,7 +281,7 @@ bool rpa_queue_timedpush(rpa_queue_t *queue, void *data, int wait_ms) } } /* If we wake up and it's still empty, then we were interrupted */ - if (rpa_queue_full(queue)) { + if (MC_rpa_queue_full(queue)) { Q_DBG("queue full (intr)", queue); rv = pthread_mutex_unlock(queue->one_big_mutex); if (rv != 0) { @@ -248,7 +333,7 @@ bool rpa_queue_trypush(rpa_queue_t *queue, void *data) return false; } - if (rpa_queue_full(queue)) { + if (MC_rpa_queue_full(queue)) { rv = pthread_mutex_unlock(queue->one_big_mutex); return false; //EAGAIN; } @@ -280,6 +365,12 @@ uint32_t rpa_queue_size(rpa_queue_t *queue) { return queue->nelts; } +bool rpa_queue_timedpeek(rpa_queue_t *queue, void **data, int wait_ms) +{ + return _rpa_queue_timedpop(queue, data, wait_ms, false); +} + + /** * Retrieves the next item from the queue. If there are no * items available, it will block until one becomes available. @@ -292,6 +383,11 @@ bool rpa_queue_pop(rpa_queue_t *queue, void **data) } bool rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms) +{ + return _rpa_queue_timedpop(queue, data, wait_ms, true); +} + +static bool _rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms, bool remove_from_queue) { bool rv; @@ -307,7 +403,7 @@ bool rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms) } /* Keep waiting until we wake up and find that the queue is not empty. */ - if (rpa_queue_empty(queue)) { + if (MC_rpa_queue_empty(queue)) { if (!queue->terminated) { queue->empty_waiters++; if (wait_ms == RPA_WAIT_FOREVER) { @@ -325,7 +421,7 @@ bool rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms) } } /* If we wake up and it's still empty, then we were interrupted */ - if (rpa_queue_empty(queue)) { + if (MC_rpa_queue_empty(queue)) { Q_DBG("queue empty (intr)", queue); rv = pthread_mutex_unlock(queue->one_big_mutex); if (rv != 0) { @@ -340,12 +436,17 @@ bool rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms) } *data = queue->data[queue->out]; - queue->nelts--; - - queue->out++; - if (queue->out >= queue->bounds) { - queue->out -= queue->bounds; + if(remove_from_queue) + { + queue->nelts--; + + queue->out++; + if (queue->out >= queue->bounds) + { + queue->out -= queue->bounds; + } } + if (queue->full_waiters) { Q_DBG("signal !full", queue); rv = pthread_cond_signal(queue->not_full); @@ -355,6 +456,8 @@ bool rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms) } } + //function_return: + pthread_mutex_unlock(queue->one_big_mutex); return true; } @@ -377,7 +480,7 @@ bool rpa_queue_trypop(rpa_queue_t *queue, void **data) return false; } - if (rpa_queue_empty(queue)) { + if (MC_rpa_queue_empty(queue)) { rv = pthread_mutex_unlock(queue->one_big_mutex); return false; //EAGAIN; } diff --git a/rpa_queue.h b/rpa_queue.h index 38b90f0..18336bf 100644 --- a/rpa_queue.h +++ b/rpa_queue.h @@ -20,6 +20,7 @@ #include #include #include +#include #define RPA_WAIT_NONE 0 #define RPA_WAIT_FOREVER -1 @@ -39,10 +40,11 @@ */ /** - * opaque structure + * @brief Opaque structure representing a thread-safe circular queue. */ typedef struct rpa_queue_t rpa_queue_t; + /** * create a FIFO queue * @param queue The new queue @@ -74,6 +76,18 @@ bool rpa_queue_push(rpa_queue_t *queue, void *data); */ bool rpa_queue_timedpush(rpa_queue_t *queue, void *data, int wait_ms); +/** + * peek an object from the queue without removing it from the queue, blocking if the queue is already empty + * + * @param queue the queue + * @param data the data + * @param wait_ms milliseconds to wait + * @returns RPA_EINTR the blocking was interrupted (try again) + * @returns RPA_EOF if the queue has been terminated + * @returns RPA_SUCCESS on a successful pop + */ +bool rpa_queue_timedpeek(rpa_queue_t *queue, void **data, int wait_ms); + /** * pop/get an object from the queue, blocking if the queue is already empty * @@ -158,4 +172,49 @@ void rpa_queue_destroy(rpa_queue_t * queue); */ void rpa_queue_free(rpa_queue_t * queue); + +/** + * @brief Check if the rpa_queue_t is empty. + * + * This function checks if the number of elements in the queue is zero, indicating + * that the queue is empty. + * + * @param queue Pointer to the rpa_queue_t instance. + * @return true if the queue is empty, false otherwise. + */ +bool rpa_queue_empty(const rpa_queue_t *queue); + +/** + * @brief Check if the rpa_queue_t is full. + * + * This function checks if the number of elements in the queue is equal to the maximum + * size of the queue, indicating that the queue is full. + * + * @param queue Pointer to the rpa_queue_t instance. + * @return true if the queue is full, false otherwise. + */ +bool rpa_queue_full(const rpa_queue_t *queue); + +/** + * @brief Get the number of free slots in the rpa_queue_t. + * + * This function calculates the number of free slots in the queue by subtracting + * the current number of elements from the maximum size of the queue. + * + * @param queue Pointer to the rpa_queue_t instance. + * @return The number of free slots in the queue. + */ +unsigned rpa_queue_get_free(const rpa_queue_t *queue); + +/** + * @brief Get the number of taken slots in the rpa_queue_t. + * + * This function retrieves the current number of elements in the queue, indicating + * the number of slots that have been taken. + * + * @param queue Pointer to the rpa_queue_t instance. + * @return The number of taken slots in the queue. + */ +unsigned rpa_queue_get_taken(const rpa_queue_t *queue); + #endif /* RPAQUEUE_H */