• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "iwstw.h"
2 #include "iwth.h"
3 #include "iwlog.h"
4 #include "iwp.h"
5 
6 #include <stdlib.h>
7 #include <errno.h>
8 #include <assert.h>
9 #include <string.h>
10 
11 struct task {
12   iwstw_task_f fn;
13   void *arg;
14   struct task *next;
15 };
16 
17 struct iwstw {
18   struct task *head;
19   struct task *tail;
20   char *thread_name;
21   iwstw_on_task_discard_f on_task_discard;
22   pthread_mutex_t mtx;
23   pthread_cond_t  cond;
24   pthread_cond_t  cond_queue;
25   pthread_t       thr;
26   int  cnt;
27   int  queue_limit;
28   bool queue_blocking;
29   bool queue_blocked;
30   volatile bool shutdown;
31 };
32 
_worker_fn(void * op)33 static void* _worker_fn(void *op) {
34   struct iwstw *stw = op;
35   assert(stw);
36 
37   if (stw->thread_name) {
38     iwp_set_current_thread_name(stw->thread_name);
39   }
40 
41   while (true) {
42     void *arg;
43     iwstw_task_f fn = 0;
44 
45     pthread_mutex_lock(&stw->mtx);
46     if (stw->head) {
47       struct task *h = stw->head;
48       fn = h->fn;
49       arg = h->arg;
50       stw->head = h->next;
51       if (stw->head == 0) {
52         stw->tail = 0;
53       }
54       --stw->cnt;
55       free(h);
56     }
57     pthread_mutex_unlock(&stw->mtx);
58 
59     if (fn) {
60       fn(arg);
61     }
62 
63     pthread_mutex_lock(&stw->mtx);
64     if (stw->head) {
65       if (stw->queue_blocked && stw->cnt < stw->queue_limit) {
66         stw->queue_blocked = false;
67         pthread_cond_broadcast(&stw->cond_queue);
68       }
69       pthread_mutex_unlock(&stw->mtx);
70       continue;
71     } else if (stw->shutdown) {
72       pthread_mutex_unlock(&stw->mtx);
73       break;
74     } else if (stw->queue_blocked && stw->cnt < stw->queue_limit) {
75       stw->queue_blocked = false;
76       pthread_cond_broadcast(&stw->cond_queue);
77     }
78     pthread_cond_wait(&stw->cond, &stw->mtx);
79     pthread_mutex_unlock(&stw->mtx);
80   }
81   return 0;
82 }
83 
iwstw_shutdown(IWSTW * stwp,bool wait_for_all)84 iwrc iwstw_shutdown(IWSTW *stwp, bool wait_for_all) {
85   if (!stwp || !*stwp) {
86     return 0;
87   }
88   IWSTW stw = *stwp;
89   pthread_mutex_lock(&stw->mtx);
90   if (stw->shutdown) {
91     pthread_mutex_unlock(&stw->mtx);
92     return 0;
93   }
94   pthread_t st = pthread_self();
95   if (stw->thr == pthread_self()) {
96     iwlog_error("iwstw | Thread iwstw_shutdown() from self thread: %lu", (unsigned long) st);
97     return IW_ERROR_ASSERTION;
98   }
99   if (!wait_for_all) {
100     struct task *t = stw->head;
101     while (t) {
102       struct task *o = t;
103       t = t->next;
104       if (stw->on_task_discard) {
105         stw->on_task_discard(t->fn, t->arg);
106       }
107       free(o);
108     }
109     stw->head = 0;
110     stw->tail = 0;
111     stw->cnt = 0;
112   }
113   stw->shutdown = true;
114   pthread_cond_broadcast(&stw->cond);
115   if (stw->queue_blocking) {
116     pthread_cond_broadcast(&stw->cond_queue);
117   }
118   pthread_mutex_unlock(&stw->mtx);
119   pthread_join(stw->thr, 0);
120   pthread_cond_destroy(&stw->cond);
121   pthread_mutex_destroy(&stw->mtx);
122 
123   free(stw->thread_name);
124   free(stw);
125   *stwp = 0;
126   return 0;
127 }
128 
iwstw_queue_size(IWSTW stw)129 int iwstw_queue_size(IWSTW stw) {
130   int res = 0;
131   pthread_mutex_lock(&stw->mtx);
132   res = stw->cnt;
133   pthread_mutex_unlock(&stw->mtx);
134   return res;
135 }
136 
iwstw_schedule(IWSTW stw,iwstw_task_f fn,void * arg)137 iwrc iwstw_schedule(IWSTW stw, iwstw_task_f fn, void *arg) {
138   if (!stw || !fn) {
139     return IW_ERROR_INVALID_ARGS;
140   }
141   iwrc rc = 0;
142   struct task *task = malloc(sizeof(*task));
143   RCA(task, finish);
144   *task = (struct task) {
145     .fn = fn,
146     .arg = arg
147   };
148   int rci = pthread_mutex_lock(&stw->mtx);
149   if (rci) {
150     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
151     goto finish;
152   }
153   if (stw->shutdown) {
154     rc = IW_ERROR_INVALID_STATE;
155     pthread_mutex_unlock(&stw->mtx);
156     goto finish;
157   }
158 
159   while (stw->queue_limit && (stw->cnt + 1 > stw->queue_limit)) {
160     if (stw->queue_blocking) {
161       if (stw->shutdown) {
162         rc = IW_ERROR_INVALID_STATE;
163         pthread_mutex_unlock(&stw->mtx);
164         goto finish;
165       }
166       stw->queue_blocked = true;
167       pthread_cond_wait(&stw->cond_queue, &stw->mtx);
168     } else {
169       rc = IW_ERROR_OVERFLOW;
170       pthread_mutex_unlock(&stw->mtx);
171       goto finish;
172     }
173   }
174 
175   if (stw->tail) {
176     stw->tail->next = task;
177     stw->tail = task;
178   } else {
179     stw->head = task;
180     stw->tail = task;
181   }
182   ++stw->cnt;
183   pthread_cond_broadcast(&stw->cond);
184   pthread_mutex_unlock(&stw->mtx);
185 
186 finish:
187   if (rc) {
188     free(task);
189   }
190   return rc; // NOLINT (clang-analyzer-unix.Malloc)
191 }
192 
iwstw_schedule_only(IWSTW stw,iwstw_task_f fn,void * arg)193 iwrc iwstw_schedule_only(IWSTW stw, iwstw_task_f fn, void *arg) {
194   if (!stw || !fn) {
195     return IW_ERROR_INVALID_ARGS;
196   }
197   iwrc rc = 0;
198   struct task *task = malloc(sizeof(*task));
199   RCA(task, finish);
200   *task = (struct task) {
201     .fn = fn,
202     .arg = arg
203   };
204   int rci = pthread_mutex_lock(&stw->mtx);
205   if (rci) {
206     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
207     goto finish;
208   }
209   if (stw->shutdown) {
210     rc = IW_ERROR_INVALID_STATE;
211     pthread_mutex_unlock(&stw->mtx);
212     goto finish;
213   }
214 
215   struct task *t = stw->head;
216   while (t) {
217     struct task *o = t;
218     t = t->next;
219     if (stw->on_task_discard) {
220       stw->on_task_discard(t->fn, t->arg);
221     }
222     free(o);
223   }
224 
225   stw->head = task;
226   stw->tail = task;
227   stw->cnt = 1;
228 
229   pthread_cond_broadcast(&stw->cond);
230   pthread_mutex_unlock(&stw->mtx);
231 
232 finish:
233   if (rc) {
234     free(task);
235   }
236   return rc; // NOLINT (clang-analyzer-unix.Malloc)
237 }
238 
iwstw_schedule_empty_only(IWSTW stw,iwstw_task_f fn,void * arg,bool * out_scheduled)239 iwrc iwstw_schedule_empty_only(IWSTW stw, iwstw_task_f fn, void *arg, bool *out_scheduled) {
240   if (!stw || !fn || !out_scheduled) {
241     return IW_ERROR_INVALID_ARGS;
242   }
243   *out_scheduled = false;
244   iwrc rc = 0;
245   struct task *task = malloc(sizeof(*task));
246   RCA(task, finish);
247   *task = (struct task) {
248     .fn = fn,
249     .arg = arg
250   };
251   int rci = pthread_mutex_lock(&stw->mtx);
252   if (rci) {
253     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
254     goto finish;
255   }
256   if (stw->shutdown) {
257     rc = IW_ERROR_INVALID_STATE;
258     pthread_mutex_unlock(&stw->mtx);
259     goto finish;
260   }
261   if (stw->head) {
262     pthread_mutex_unlock(&stw->mtx);
263     goto finish;
264   }
265   *out_scheduled = true;
266   stw->head = task;
267   stw->tail = task;
268   ++stw->cnt;
269   pthread_cond_broadcast(&stw->cond);
270   pthread_mutex_unlock(&stw->mtx);
271 
272 finish:
273   if (rc) {
274     free(task);
275   }
276   return rc; // NOLINT (clang-analyzer-unix.Malloc)
277 }
278 
iwstw_set_on_task_discard(IWSTW stw,iwstw_on_task_discard_f on_task_discard)279 void iwstw_set_on_task_discard(IWSTW stw, iwstw_on_task_discard_f on_task_discard) {
280   stw->on_task_discard = on_task_discard;
281 }
282 
iwstw_start(const char * thread_name,int queue_limit,bool queue_blocking,IWSTW * out_stw)283 iwrc iwstw_start(const char *thread_name, int queue_limit, bool queue_blocking, IWSTW *out_stw) {
284   if (queue_limit < 0 || !out_stw) {
285     return IW_ERROR_INVALID_ARGS;
286   }
287   if (thread_name && strlen(thread_name) > 15) {
288     return IW_ERROR_INVALID_ARGS;
289   }
290   struct iwstw *stw = malloc(sizeof(*stw));
291   if (!stw) {
292     *out_stw = 0;
293     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
294   }
295   int rci;
296   iwrc rc = 0;
297   *stw = (struct iwstw) {
298     .queue_limit = queue_limit,
299     .mtx = PTHREAD_MUTEX_INITIALIZER,
300     .cond = PTHREAD_COND_INITIALIZER,
301     .cond_queue = PTHREAD_COND_INITIALIZER,
302     .queue_blocking = queue_blocking
303   };
304   if (thread_name) {
305     stw->thread_name = strdup(thread_name);
306   }
307 
308   rci = pthread_create(&stw->thr, 0, _worker_fn, stw);
309   if (rci) {
310     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
311     goto finish;
312   }
313 
314 finish:
315   if (rc) {
316     *out_stw = 0;
317     free(stw->thread_name);
318     free(stw);
319   } else {
320     *out_stw = stw;
321   }
322   return 0;
323 }
324