50#define PTHREAD_CRITICAL_CALL(fn, ...) \ 
   52    int rc = fn(__VA_ARGS__); \ 
   55        err(rc, "%s()", #fn); \ 
   63    assert(queue != NULL);
 
   64    assert(compare != NULL);
 
   69    PTHREAD_CRITICAL_CALL(pthread_mutex_init, &queue->
mutex, NULL);
 
   70    PTHREAD_CRITICAL_CALL(pthread_cond_init, &queue->
cond, NULL);
 
 
   76    assert(queue != NULL);
 
 
   82#define PTHREAD_CALL(fn, ...) \ 
   84    int rc = fn(__VA_ARGS__); \ 
   87        err(rc, "%s()", #fn); \ 
   94    assert(queue != NULL);
 
   98    PTHREAD_CALL(pthread_mutex_destroy, &queue->
mutex);
 
   99    PTHREAD_CALL(pthread_cond_destroy, &queue->
cond);
 
 
  105    assert(queue != NULL);
 
  107    PTHREAD_CRITICAL_CALL(pthread_mutex_lock, &queue->
mutex);
 
  113        PTHREAD_CALL(pthread_cond_signal, &queue->
cond);
 
  116    PTHREAD_CRITICAL_CALL(pthread_mutex_unlock, &queue->
mutex);
 
 
  122    assert(queue != NULL);
 
  124    int rc = pthread_mutex_lock(&queue->
mutex);
 
  128        err(rc, 
"pthread_mutex_lock()");
 
  135_async_queue_cond_wait(
AsyncQueue *queue, uint32_t ms)
 
  137    assert(queue != NULL);
 
  139    bool success = 
false;
 
  150            gettimeofday(&now, NULL);
 
  152            val.tv_sec = now.tv_sec + (ms / 1000);
 
  155            success = !pthread_cond_timedwait(&queue->
cond, &queue->
mutex, &val);
 
  159            success = !pthread_cond_wait(&queue->
cond, &queue->
mutex);
 
  166        fprintf(stderr, 
"%s(): integer overflow.\n", __func__);
 
  173_async_queue_pop(
AsyncQueue *queue, 
void *data, uint32_t ms)
 
  175    assert(queue != NULL);
 
  177    bool success = 
false;
 
  179    if(_async_queue_try_lock(queue))
 
  182            || (_async_queue_cond_wait(queue, ms) && 
queue_pop(&queue->
queue, data));
 
  184        PTHREAD_CRITICAL_CALL(pthread_mutex_unlock, &queue->
mutex);
 
  193    assert(queue != NULL);
 
  195    return _async_queue_pop(queue, data, 0);
 
 
  201    return _async_queue_pop(queue, data, ms);
 
 
  207    assert(queue != NULL);
 
  209    PTHREAD_CRITICAL_CALL(pthread_mutex_lock, &queue->
mutex);
 
  213    PTHREAD_CRITICAL_CALL(pthread_mutex_unlock, &queue->
mutex);
 
 
  219    assert(queue != NULL);
 
  221    PTHREAD_CRITICAL_CALL(pthread_mutex_lock, &queue->
mutex);
 
  225    PTHREAD_CRITICAL_CALL(pthread_mutex_unlock, &queue->
mutex);
 
 
void async_queue_init(AsyncQueue *queue, CompareFunc compare, FreeFunc free, Pool *pool)
AsyncQueue * async_queue_new(CompareFunc compare, FreeFunc free, Pool *pool)
void async_queue_free(AsyncQueue *queue)
size_t async_queue_count(AsyncQueue *queue)
void async_queue_clear(AsyncQueue *queue)
void async_queue_destroy(AsyncQueue *queue)
bool async_queue_pop_timeout(AsyncQueue *queue, void *data, uint32_t ms)
void async_queue_push(AsyncQueue *queue, void *data)
bool async_queue_pop(AsyncQueue *queue, void *data)
Asynchronous communication between threads.
int32_t(* CompareFunc)(const void *a, const void *b)
void(* FreeFunc)(void *p)
#define queue_count(queue)
#define queue_clear(queue)
#define queue_push(queue, data)
#define queue_init(queue, compare, free, pool)
#define queue_pop(queue, data)
#define queue_free(queue)
Allocate groups of equal-sized chunks of memory.