libdatatypes 0.3.2
Abstract datatypes for C.
Loading...
Searching...
No Matches
asyncqueue.c
Go to the documentation of this file.
1/***************************************************************************
2 begin........: June 2012
3 copyright....: Sebastian Fedrau
4 email........: sebastian.fedrau@gmail.com
5 ***************************************************************************/
6
7/***************************************************************************
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License v3 as published by
10 the Free Software Foundation.
11
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 General Public License v3 for more details.
16 ***************************************************************************/
22#ifdef WITH_PTHREAD
23
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sys/time.h>
#include <assert.h>
#include <err.h>

#include "asyncqueue.h"
31
34{
35 assert(compare);
36
37 AsyncQueue *queue = (AsyncQueue *)malloc(sizeof(AsyncQueue));
38
39 if(!queue)
40 {
41 perror("malloc()");
42 abort();
43 }
44
45 async_queue_init(queue, compare, free, pool);
46
47 return queue;
48}
49
/*
 * Invokes the function `fn` with the given arguments and aborts the process
 * when it returns a non-zero error code.
 *
 * pthread functions report failures through their return value and leave
 * errno alone, so the code is copied into errno before warn() formats the
 * diagnostic. warn() is used instead of err() because err() exits the
 * process itself, which made the abort() unreachable.
 *
 * The body is wrapped in do { } while(0) so the macro expands to a single
 * statement and is safe inside unbraced if/else branches.
 */
#define PTHREAD_CRITICAL_CALL(fn, ...) \
do \
{ \
	int pthread_rc_ = fn(__VA_ARGS__); \
	if(pthread_rc_) \
	{ \
		errno = pthread_rc_; \
		warn("%s()", #fn); \
		abort(); \
	} \
} while(0)
59
60void
62{
63 assert(queue != NULL);
64 assert(compare != NULL);
65
66 queue_init(&queue->queue, compare, free, pool);
67 queue->waiting = 0;
68
69 PTHREAD_CRITICAL_CALL(pthread_mutex_init, &queue->mutex, NULL);
70 PTHREAD_CRITICAL_CALL(pthread_cond_init, &queue->cond, NULL);
71}
72
73void
75{
76 assert(queue != NULL);
77
78 async_queue_free(queue);
79 free(queue);
80}
81
/*
 * Invokes the function `fn` with the given arguments and prints a warning
 * when it returns a non-zero error code. Execution continues afterwards.
 *
 * pthread functions report failures through their return value and leave
 * errno alone, so the code is copied into errno before warn() formats the
 * diagnostic. warn() is used instead of err() because err() terminates the
 * process, which defeats the purpose of a non-critical call.
 *
 * The body is wrapped in do { } while(0) so the macro expands to a single
 * statement and is safe inside unbraced if/else branches.
 */
#define PTHREAD_CALL(fn, ...) \
do \
{ \
	int pthread_rc_ = fn(__VA_ARGS__); \
	if(pthread_rc_) \
	{ \
		errno = pthread_rc_; \
		warn("%s()", #fn); \
	} \
} while(0)
90
91void
93{
94 assert(queue != NULL);
95
96 queue_free(&queue->queue);
97
98 PTHREAD_CALL(pthread_mutex_destroy, &queue->mutex);
99 PTHREAD_CALL(pthread_cond_destroy, &queue->cond);
100}
101
102void
103async_queue_push(AsyncQueue *queue, void *data)
104{
105 assert(queue != NULL);
106
107 PTHREAD_CRITICAL_CALL(pthread_mutex_lock, &queue->mutex);
108
109 queue_push(&queue->queue, data);
110
111 if(queue->waiting)
112 {
113 PTHREAD_CALL(pthread_cond_signal, &queue->cond);
114 }
115
116 PTHREAD_CRITICAL_CALL(pthread_mutex_unlock, &queue->mutex);
117}
118
119static bool
120_async_queue_try_lock(AsyncQueue *queue)
121{
122 assert(queue != NULL);
123
124 int rc = pthread_mutex_lock(&queue->mutex);
125
126 if(rc)
127 {
128 err(rc, "pthread_mutex_lock()");
129 }
130
131 return !rc;
132}
133
134static bool
135_async_queue_cond_wait(AsyncQueue *queue, uint32_t ms)
136{
137 assert(queue != NULL);
138
139 bool success = false;
140
141 if(queue->waiting < SIZE_MAX)
142 {
143 ++queue->waiting;
144
145 if(ms)
146 {
147 struct timespec val;
148 struct timeval now;
149
150 gettimeofday(&now, NULL);
151
152 val.tv_sec = now.tv_sec + (ms / 1000);
153 val.tv_nsec = 0;
154
155 success = !pthread_cond_timedwait(&queue->cond, &queue->mutex, &val);
156 }
157 else
158 {
159 success = !pthread_cond_wait(&queue->cond, &queue->mutex);
160 }
161
162 --queue->waiting;
163 }
164 else
165 {
166 fprintf(stderr, "%s(): integer overflow.\n", __func__);
167 }
168
169 return success;
170}
171
172static bool
173_async_queue_pop(AsyncQueue *queue, void *data, uint32_t ms)
174{
175 assert(queue != NULL);
176
177 bool success = false;
178
179 if(_async_queue_try_lock(queue))
180 {
181 success = queue_pop(&queue->queue, data)
182 || (_async_queue_cond_wait(queue, ms) && queue_pop(&queue->queue, data));
183
184 PTHREAD_CRITICAL_CALL(pthread_mutex_unlock, &queue->mutex);
185 }
186
187 return success;
188}
189
190bool
191async_queue_pop(AsyncQueue *queue, void *data)
192{
193 assert(queue != NULL);
194
195 return _async_queue_pop(queue, data, 0);
196}
197
198bool
199async_queue_pop_timeout(AsyncQueue *queue, void *data, uint32_t ms)
200{
201 return _async_queue_pop(queue, data, ms);
202}
203
204void
206{
207 assert(queue != NULL);
208
209 PTHREAD_CRITICAL_CALL(pthread_mutex_lock, &queue->mutex);
210
211 queue_clear(&queue->queue);
212
213 PTHREAD_CRITICAL_CALL(pthread_mutex_unlock, &queue->mutex);
214}
215
216size_t
218{
219 assert(queue != NULL);
220
221 PTHREAD_CRITICAL_CALL(pthread_mutex_lock, &queue->mutex);
222
223 size_t count = queue_count(&queue->queue);
224
225 PTHREAD_CRITICAL_CALL(pthread_mutex_unlock, &queue->mutex);
226
227 return count;
228}
229
230#endif /* WITH_PTHREAD */
231
void async_queue_init(AsyncQueue *queue, CompareFunc compare, FreeFunc free, Pool *pool)
Definition asyncqueue.c:61
AsyncQueue * async_queue_new(CompareFunc compare, FreeFunc free, Pool *pool)
Definition asyncqueue.c:33
void async_queue_free(AsyncQueue *queue)
Definition asyncqueue.c:92
size_t async_queue_count(AsyncQueue *queue)
Definition asyncqueue.c:217
void async_queue_clear(AsyncQueue *queue)
Definition asyncqueue.c:205
void async_queue_destroy(AsyncQueue *queue)
Definition asyncqueue.c:74
bool async_queue_pop_timeout(AsyncQueue *queue, void *data, uint32_t ms)
Definition asyncqueue.c:199
void async_queue_push(AsyncQueue *queue, void *data)
Definition asyncqueue.c:103
bool async_queue_pop(AsyncQueue *queue, void *data)
Definition asyncqueue.c:191
Asynchronous queue.
size_t waiting
Definition asyncqueue.h:44
pthread_cond_t cond
Definition asyncqueue.h:40
pthread_mutex_t mutex
Definition asyncqueue.h:38
Queue queue
Definition asyncqueue.h:42
Asynchronous communication between threads.
Definition asyncqueue.h:36
int32_t(* CompareFunc)(const void *a, const void *b)
Definition compare.h:30
void(* FreeFunc)(void *p)
Definition datatypes.h:33
#define queue_count(queue)
Definition queue.h:112
#define queue_clear(queue)
Definition queue.h:104
#define queue_push(queue, data)
Definition queue.h:79
#define queue_init(queue, compare, free, pool)
Definition queue.h:57
#define queue_pop(queue, data)
Definition queue.h:88
#define queue_free(queue)
Definition queue.h:71
Allocate groups of equal-sized chunks of memory.
Definition pool.h:33