1 #include "iwstw.h"
2 #include "iwth.h"
3 #include "iwlog.h"
4 #include "iwp.h"
5
6 #include <stdlib.h>
7 #include <errno.h>
8 #include <assert.h>
9 #include <string.h>
10
11 struct task {
12 iwstw_task_f fn;
13 void *arg;
14 struct task *next;
15 };
16
17 struct iwstw {
18 struct task *head;
19 struct task *tail;
20 char *thread_name;
21 iwstw_on_task_discard_f on_task_discard;
22 pthread_mutex_t mtx;
23 pthread_cond_t cond;
24 pthread_cond_t cond_queue;
25 pthread_t thr;
26 int cnt;
27 int queue_limit;
28 bool queue_blocking;
29 bool queue_blocked;
30 volatile bool shutdown;
31 };
32
_worker_fn(void * op)33 static void* _worker_fn(void *op) {
34 struct iwstw *stw = op;
35 assert(stw);
36
37 if (stw->thread_name) {
38 iwp_set_current_thread_name(stw->thread_name);
39 }
40
41 while (true) {
42 void *arg;
43 iwstw_task_f fn = 0;
44
45 pthread_mutex_lock(&stw->mtx);
46 if (stw->head) {
47 struct task *h = stw->head;
48 fn = h->fn;
49 arg = h->arg;
50 stw->head = h->next;
51 if (stw->head == 0) {
52 stw->tail = 0;
53 }
54 --stw->cnt;
55 free(h);
56 }
57 pthread_mutex_unlock(&stw->mtx);
58
59 if (fn) {
60 fn(arg);
61 }
62
63 pthread_mutex_lock(&stw->mtx);
64 if (stw->head) {
65 if (stw->queue_blocked && stw->cnt < stw->queue_limit) {
66 stw->queue_blocked = false;
67 pthread_cond_broadcast(&stw->cond_queue);
68 }
69 pthread_mutex_unlock(&stw->mtx);
70 continue;
71 } else if (stw->shutdown) {
72 pthread_mutex_unlock(&stw->mtx);
73 break;
74 } else if (stw->queue_blocked && stw->cnt < stw->queue_limit) {
75 stw->queue_blocked = false;
76 pthread_cond_broadcast(&stw->cond_queue);
77 }
78 pthread_cond_wait(&stw->cond, &stw->mtx);
79 pthread_mutex_unlock(&stw->mtx);
80 }
81 return 0;
82 }
83
iwstw_shutdown(IWSTW * stwp,bool wait_for_all)84 iwrc iwstw_shutdown(IWSTW *stwp, bool wait_for_all) {
85 if (!stwp || !*stwp) {
86 return 0;
87 }
88 IWSTW stw = *stwp;
89 pthread_mutex_lock(&stw->mtx);
90 if (stw->shutdown) {
91 pthread_mutex_unlock(&stw->mtx);
92 return 0;
93 }
94 pthread_t st = pthread_self();
95 if (stw->thr == pthread_self()) {
96 iwlog_error("iwstw | Thread iwstw_shutdown() from self thread: %lu", (unsigned long) st);
97 return IW_ERROR_ASSERTION;
98 }
99 if (!wait_for_all) {
100 struct task *t = stw->head;
101 while (t) {
102 struct task *o = t;
103 t = t->next;
104 if (stw->on_task_discard) {
105 stw->on_task_discard(t->fn, t->arg);
106 }
107 free(o);
108 }
109 stw->head = 0;
110 stw->tail = 0;
111 stw->cnt = 0;
112 }
113 stw->shutdown = true;
114 pthread_cond_broadcast(&stw->cond);
115 if (stw->queue_blocking) {
116 pthread_cond_broadcast(&stw->cond_queue);
117 }
118 pthread_mutex_unlock(&stw->mtx);
119 pthread_join(stw->thr, 0);
120 pthread_cond_destroy(&stw->cond);
121 pthread_mutex_destroy(&stw->mtx);
122
123 free(stw->thread_name);
124 free(stw);
125 *stwp = 0;
126 return 0;
127 }
128
/**
 * Returns the number of tasks currently waiting in the queue.
 *
 * The counter is sampled while holding the worker mutex.
 *
 * @param stw Worker handle.
 * @return Queued task count at the time of the call.
 */
int iwstw_queue_size(IWSTW stw) {
  pthread_mutex_lock(&stw->mtx);
  const int queued = stw->cnt;
  pthread_mutex_unlock(&stw->mtx);
  return queued;
}
136
iwstw_schedule(IWSTW stw,iwstw_task_f fn,void * arg)137 iwrc iwstw_schedule(IWSTW stw, iwstw_task_f fn, void *arg) {
138 if (!stw || !fn) {
139 return IW_ERROR_INVALID_ARGS;
140 }
141 iwrc rc = 0;
142 struct task *task = malloc(sizeof(*task));
143 RCA(task, finish);
144 *task = (struct task) {
145 .fn = fn,
146 .arg = arg
147 };
148 int rci = pthread_mutex_lock(&stw->mtx);
149 if (rci) {
150 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
151 goto finish;
152 }
153 if (stw->shutdown) {
154 rc = IW_ERROR_INVALID_STATE;
155 pthread_mutex_unlock(&stw->mtx);
156 goto finish;
157 }
158
159 while (stw->queue_limit && (stw->cnt + 1 > stw->queue_limit)) {
160 if (stw->queue_blocking) {
161 if (stw->shutdown) {
162 rc = IW_ERROR_INVALID_STATE;
163 pthread_mutex_unlock(&stw->mtx);
164 goto finish;
165 }
166 stw->queue_blocked = true;
167 pthread_cond_wait(&stw->cond_queue, &stw->mtx);
168 } else {
169 rc = IW_ERROR_OVERFLOW;
170 pthread_mutex_unlock(&stw->mtx);
171 goto finish;
172 }
173 }
174
175 if (stw->tail) {
176 stw->tail->next = task;
177 stw->tail = task;
178 } else {
179 stw->head = task;
180 stw->tail = task;
181 }
182 ++stw->cnt;
183 pthread_cond_broadcast(&stw->cond);
184 pthread_mutex_unlock(&stw->mtx);
185
186 finish:
187 if (rc) {
188 free(task);
189 }
190 return rc; // NOLINT (clang-analyzer-unix.Malloc)
191 }
192
iwstw_schedule_only(IWSTW stw,iwstw_task_f fn,void * arg)193 iwrc iwstw_schedule_only(IWSTW stw, iwstw_task_f fn, void *arg) {
194 if (!stw || !fn) {
195 return IW_ERROR_INVALID_ARGS;
196 }
197 iwrc rc = 0;
198 struct task *task = malloc(sizeof(*task));
199 RCA(task, finish);
200 *task = (struct task) {
201 .fn = fn,
202 .arg = arg
203 };
204 int rci = pthread_mutex_lock(&stw->mtx);
205 if (rci) {
206 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
207 goto finish;
208 }
209 if (stw->shutdown) {
210 rc = IW_ERROR_INVALID_STATE;
211 pthread_mutex_unlock(&stw->mtx);
212 goto finish;
213 }
214
215 struct task *t = stw->head;
216 while (t) {
217 struct task *o = t;
218 t = t->next;
219 if (stw->on_task_discard) {
220 stw->on_task_discard(t->fn, t->arg);
221 }
222 free(o);
223 }
224
225 stw->head = task;
226 stw->tail = task;
227 stw->cnt = 1;
228
229 pthread_cond_broadcast(&stw->cond);
230 pthread_mutex_unlock(&stw->mtx);
231
232 finish:
233 if (rc) {
234 free(task);
235 }
236 return rc; // NOLINT (clang-analyzer-unix.Malloc)
237 }
238
iwstw_schedule_empty_only(IWSTW stw,iwstw_task_f fn,void * arg,bool * out_scheduled)239 iwrc iwstw_schedule_empty_only(IWSTW stw, iwstw_task_f fn, void *arg, bool *out_scheduled) {
240 if (!stw || !fn || !out_scheduled) {
241 return IW_ERROR_INVALID_ARGS;
242 }
243 *out_scheduled = false;
244 iwrc rc = 0;
245 struct task *task = malloc(sizeof(*task));
246 RCA(task, finish);
247 *task = (struct task) {
248 .fn = fn,
249 .arg = arg
250 };
251 int rci = pthread_mutex_lock(&stw->mtx);
252 if (rci) {
253 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
254 goto finish;
255 }
256 if (stw->shutdown) {
257 rc = IW_ERROR_INVALID_STATE;
258 pthread_mutex_unlock(&stw->mtx);
259 goto finish;
260 }
261 if (stw->head) {
262 pthread_mutex_unlock(&stw->mtx);
263 goto finish;
264 }
265 *out_scheduled = true;
266 stw->head = task;
267 stw->tail = task;
268 ++stw->cnt;
269 pthread_cond_broadcast(&stw->cond);
270 pthread_mutex_unlock(&stw->mtx);
271
272 finish:
273 if (rc) {
274 free(task);
275 }
276 return rc; // NOLINT (clang-analyzer-unix.Malloc)
277 }
278
/**
 * Installs the hook that is called for every task removed from the queue
 * without being executed.
 *
 * The pointer is stored without taking the worker mutex.
 *
 * @param stw             Worker handle.
 * @param on_task_discard Hook receiving the discarded task function and its
 *                        argument; 0 disables the notification.
 */
void iwstw_set_on_task_discard(IWSTW stw, iwstw_on_task_discard_f on_task_discard) {
  struct iwstw *self = stw;
  self->on_task_discard = on_task_discard;
}
282
iwstw_start(const char * thread_name,int queue_limit,bool queue_blocking,IWSTW * out_stw)283 iwrc iwstw_start(const char *thread_name, int queue_limit, bool queue_blocking, IWSTW *out_stw) {
284 if (queue_limit < 0 || !out_stw) {
285 return IW_ERROR_INVALID_ARGS;
286 }
287 if (thread_name && strlen(thread_name) > 15) {
288 return IW_ERROR_INVALID_ARGS;
289 }
290 struct iwstw *stw = malloc(sizeof(*stw));
291 if (!stw) {
292 *out_stw = 0;
293 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
294 }
295 int rci;
296 iwrc rc = 0;
297 *stw = (struct iwstw) {
298 .queue_limit = queue_limit,
299 .mtx = PTHREAD_MUTEX_INITIALIZER,
300 .cond = PTHREAD_COND_INITIALIZER,
301 .cond_queue = PTHREAD_COND_INITIALIZER,
302 .queue_blocking = queue_blocking
303 };
304 if (thread_name) {
305 stw->thread_name = strdup(thread_name);
306 }
307
308 rci = pthread_create(&stw->thr, 0, _worker_fn, stw);
309 if (rci) {
310 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
311 goto finish;
312 }
313
314 finish:
315 if (rc) {
316 *out_stw = 0;
317 free(stw->thread_name);
318 free(stw);
319 } else {
320 *out_stw = stw;
321 }
322 return 0;
323 }
324