1 #include "iwstw.h"
2 #include "iwth.h"
3 #include "iwlog.h"
4 #include <stdlib.h>
5 #include <errno.h>
6 #include <assert.h>
7
8 struct _TASK {
9 iwstw_task_f fn;
10 void *arg;
11 struct _TASK *next;
12 };
13
14 struct _IWSTW {
15 struct _TASK *head;
16 struct _TASK *tail;
17 pthread_mutex_t mtx;
18 pthread_barrier_t brr;
19 pthread_cond_t cond;
20 pthread_t thr;
21 int cnt;
22 int queue_limit;
23 volatile bool shutdown;
24 };
25
worker_fn(void * op)26 void *worker_fn(void *op) {
27 struct _IWSTW *stw = op;
28 assert(stw);
29 pthread_barrier_wait(&stw->brr);
30
31 while (true) {
32 void *arg;
33 iwstw_task_f fn = 0;
34
35 pthread_mutex_lock(&stw->mtx);
36 if (stw->head) {
37 struct _TASK *h = stw->head;
38 fn = h->fn;
39 arg = h->arg;
40 stw->head = h->next;
41 if (stw->tail == h) {
42 stw->tail = stw->head;
43 }
44 --stw->cnt;
45 free(h);
46 }
47 pthread_mutex_unlock(&stw->mtx);
48
49 if (fn) {
50 fn(arg);
51 }
52
53 pthread_mutex_lock(&stw->mtx);
54 if (stw->head) {
55 pthread_mutex_unlock(&stw->mtx);
56 continue;
57 } else if (stw->shutdown) {
58 pthread_mutex_unlock(&stw->mtx);
59 break;
60 }
61 pthread_cond_wait(&stw->cond, &stw->mtx);
62 pthread_mutex_unlock(&stw->mtx);
63 }
64 return 0;
65 }
66
iwstw_shutdown(IWSTW * stwp,bool wait_for_all)67 void iwstw_shutdown(IWSTW *stwp, bool wait_for_all) {
68 if (!stwp || !*stwp) {
69 return;
70 }
71 IWSTW stw = *stwp;
72 pthread_mutex_lock(&stw->mtx);
73 if (stw->shutdown) {
74 pthread_mutex_unlock(&stw->mtx);
75 return;
76 }
77 if (!wait_for_all) {
78 struct _TASK *t = stw->head;
79 while (t) {
80 struct _TASK *o = t;
81 t = t->next;
82 free(o);
83 }
84 stw->head = 0;
85 stw->tail = 0;
86 stw->cnt = 0;
87 }
88 stw->shutdown = true;
89 pthread_cond_broadcast(&stw->cond);
90 pthread_mutex_unlock(&stw->mtx);
91 int rci = pthread_join(stw->thr, 0);
92 if (rci) {
93 iwrc rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
94 iwlog_ecode_error3(rc);
95 }
96 pthread_barrier_destroy(&stw->brr);
97 pthread_cond_destroy(&stw->cond);
98 pthread_mutex_destroy(&stw->mtx);
99 free(stw);
100 *stwp = 0;
101 }
102
iwstw_schedule(IWSTW stw,iwstw_task_f fn,void * arg)103 iwrc iwstw_schedule(IWSTW stw, iwstw_task_f fn, void *arg) {
104 if (!stw || !fn) {
105 return IW_ERROR_INVALID_ARGS;
106 }
107 iwrc rc = 0;
108 struct _TASK *task = malloc(sizeof(*task));
109 RCA(task, finish);
110 *task = (struct _TASK) {
111 .fn = fn,
112 .arg = arg
113 };
114 int rci = pthread_mutex_lock(&stw->mtx);
115 if (rci) {
116 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
117 goto finish;
118 }
119 if (stw->shutdown) {
120 rc = IW_ERROR_INVALID_STATE;
121 pthread_mutex_unlock(&stw->mtx);
122 goto finish;
123 }
124 if (stw->queue_limit && stw->cnt + 1 > stw->queue_limit) {
125 rc = IW_ERROR_OVERFLOW;
126 pthread_mutex_unlock(&stw->mtx);
127 goto finish;
128 }
129 if (stw->tail) {
130 stw->tail->next = task;
131 stw->tail = task;
132 } else {
133 stw->head = task;
134 stw->tail = task;
135 }
136 ++stw->cnt;
137 pthread_cond_broadcast(&stw->cond);
138 pthread_mutex_unlock(&stw->mtx);
139
140 finish:
141 if (rc) {
142 free(task);
143 }
144 return rc; // NOLINT (clang-analyzer-unix.Malloc)
145 }
146
iwstw_start(int queue_limit,IWSTW * stwp_out)147 iwrc iwstw_start(int queue_limit, IWSTW *stwp_out) {
148 struct _IWSTW *stw = malloc(sizeof(*stw));
149 if (!stw) {
150 *stwp_out = 0;
151 return iwrc_set_errno(IW_ERROR_ALLOC, errno);
152 }
153 int rci;
154 iwrc rc = 0;
155 *stw = (struct _IWSTW) {
156 .queue_limit = queue_limit,
157 .mtx = PTHREAD_MUTEX_INITIALIZER,
158 .cond = PTHREAD_COND_INITIALIZER
159 };
160 rci = pthread_barrier_init(&stw->brr, 0, 2);
161 if (rci) {
162 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
163 goto finish;
164 }
165 rci = pthread_create(&stw->thr, 0, worker_fn, stw);
166 if (rci) {
167 rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
168 pthread_barrier_destroy(&stw->brr);
169 goto finish;
170 }
171 pthread_barrier_wait(&stw->brr);
172
173 finish:
174 if (rc) {
175 *stwp_out = 0;
176 free(stw);
177 } else {
178 *stwp_out = stw;
179 }
180 return 0;
181
182 }
183