#include "iwtp.h"
#include "iwth.h"
#include "iwp.h"
#include "iwlog.h"
#include "iwarr.h"

#include <stdlib.h>
#include <errno.h>
#include <assert.h>
#include <string.h>

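// Single unit of work: a task function plus its argument,
// linked into a singly linked FIFO queue.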
struct task {
  iwtp_task_f fn;
  void *arg;
  struct task *next;
};

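// Thread pool state. All mutable fields are guarded by `mtx`, and workers
// are woken through `cond`. `threads` holds the pthread handles of both
// regular and overflow worker threads.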
struct iwtp {
  struct task *head;
  struct task *tail;
  pthread_mutex_t mtx;
  pthread_cond_t  cond;
  IWULIST threads;

  char *thread_name_prefix;
  int   num_threads;
  int   num_threads_busy;
  int   overflow_threads_factor;
  int   queue_limit;
  int   queue_size;

  bool warn_on_overflow_thread_spawn;
  bool shutdown;
};

static void* _worker_fn(void *op);

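// Enqueues a task for execution. Fails with IW_ERROR_OVERFLOW when the queue
// limit is reached. If all regular workers are busy and the overflow cap
// (num_threads * (1 + overflow_threads_factor)) has not been reached yet,
// an extra short-lived worker thread is spawned.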
iwrc iwtp_schedule(IWTP tp, iwtp_task_f fn, void *arg) {
  if (!tp || !fn) {
    return IW_ERROR_INVALID_ARGS;
  }

  iwrc rc = 0;
  struct task *task = malloc(sizeof(*task));
  RCA(task, finish);

  *task = (struct task) {
    .fn = fn,
    .arg = arg
  };

  pthread_mutex_lock(&tp->mtx);
  if (tp->queue_limit && (tp->queue_size + 1 > tp->queue_limit)) {
    rc = IW_ERROR_OVERFLOW;
    pthread_mutex_unlock(&tp->mtx);
    iwlog_error("iwtp | Reached thread pool queue size limit: %d", tp->queue_limit);
    goto finish;
  }
  if (tp->tail) {
    tp->tail->next = task;
    tp->tail = task;
  } else {
    tp->head = task;
    tp->tail = task;
  }
  ++tp->queue_size;

  if (  tp->queue_size > 1
     && tp->num_threads_busy >= tp->num_threads
     && iwulist_length(&tp->threads) < tp->num_threads * (1 + tp->overflow_threads_factor)) {
    pthread_t th;
    int rci = pthread_create(&th, 0, _worker_fn, tp);
    if (rci) {
      iwlog_ecode_error2(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), "iwtp | Failed to create an overflow thread");
    }
  }

  pthread_cond_signal(&tp->cond);
  pthread_mutex_unlock(&tp->mtx);

finish:
  if (rc) {
    free(task);
  }
  return rc;
}

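// Worker thread entry point. Registers the thread in `tp->threads`,
// optionally sets its name, then processes tasks until shutdown.
// Threads registered beyond `num_threads` are overflow workers and
// exit as soon as they have finished a single task.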
static void* _worker_fn(void *op) {
  struct iwtp *tp = op;
  assert(tp);

  pthread_t st = pthread_self();

  pthread_mutex_lock(&tp->mtx);
  size_t idx = iwulist_length(&tp->threads);
  if (iwulist_push(&tp->threads, &st)) {
    pthread_mutex_unlock(&tp->mtx);
    return 0;
  }
  pthread_mutex_unlock(&tp->mtx);

  if (tp->thread_name_prefix) {
    char nbuf[64];
    if (idx >= (size_t) tp->num_threads) {
      snprintf(nbuf, sizeof(nbuf), "%s%zu+", tp->thread_name_prefix, idx);
      if (tp->warn_on_overflow_thread_spawn) {
        iwlog_warn("iwtp | Overflow thread spawned: %s%zu+", tp->thread_name_prefix, idx);
      }
    } else {
      snprintf(nbuf, sizeof(nbuf), "%s%zu", tp->thread_name_prefix, idx);
    }
    iwp_set_current_thread_name(nbuf);
  }

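  // Take one task at a time from the head of the queue and run it with the
  // mutex released; wait on the condition variable when the queue is empty
  // and the pool is not shutting down.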
  while (true) {
    void *arg;
    iwtp_task_f fn = 0;

    pthread_mutex_lock(&tp->mtx);
    ++tp->num_threads_busy;
    if (tp->head) {
      struct task *h = tp->head;
      fn = h->fn;
      arg = h->arg;
      tp->head = h->next;
      if (tp->head == 0) {
        tp->tail = 0;
      }
      --tp->queue_size;
      free(h);
    }
    pthread_mutex_unlock(&tp->mtx);

    if (fn) {
      fn(arg);
    }

    pthread_mutex_lock(&tp->mtx);
    --tp->num_threads_busy;

    if (idx >= (size_t) tp->num_threads) {
      // Overflow thread: exits right after finishing its task.
      if (!tp->shutdown) {
        iwulist_remove_first_by(&tp->threads, &st);
        pthread_detach(st);
      }
      pthread_mutex_unlock(&tp->mtx);
      break;
    }

    if (tp->head) {
      pthread_mutex_unlock(&tp->mtx);
      continue;
    } else if (tp->shutdown) {
      pthread_mutex_unlock(&tp->mtx);
      break;
    }

    pthread_cond_wait(&tp->cond, &tp->mtx);
    pthread_mutex_unlock(&tp->mtx);
  }

  return 0;
}

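// Creates a thread pool according to `spec`: clamps the number of threads to
// at most 1024 (defaulting to the number of CPU cores), limits the overflow
// factor to at most 2, rejects thread name prefixes longer than 15 characters,
// and starts `num_threads` regular worker threads.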
iwrc iwtp_start_by_spec(const struct iwtp_spec *spec, IWTP *out_tp) {
  iwrc rc = 0;
  if (!spec || !out_tp) {
    return IW_ERROR_INVALID_ARGS;
  }
  if (spec->thread_name_prefix && strlen(spec->thread_name_prefix) > 15) {
    return IW_ERROR_INVALID_ARGS;
  }

  int num_threads = spec->num_threads;
  if (num_threads < 1) {
    num_threads = iwp_num_cpu_cores();
  } else if (num_threads > 1023) {
    num_threads = 1024;
  }

  int queue_limit = spec->queue_limit;
  if (queue_limit < 1) {
    queue_limit = 0;
  }

  int overflow_threads_factor = spec->overflow_threads_factor;
  if (overflow_threads_factor > 2) {
    overflow_threads_factor = 2;
  }

  struct iwtp *tp = malloc(sizeof(*tp));
  if (!tp) {
    rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
    goto finish;
  }

  *tp = (struct iwtp) {
    .warn_on_overflow_thread_spawn = spec->warn_on_overflow_thread_spawn,
    .overflow_threads_factor = overflow_threads_factor,
    .num_threads = num_threads,
    .queue_limit = queue_limit,
    .mtx = PTHREAD_MUTEX_INITIALIZER,
    .cond = PTHREAD_COND_INITIALIZER
  };

  if (spec->thread_name_prefix) {
    tp->thread_name_prefix = strdup(spec->thread_name_prefix);
  }

  RCC(rc, finish, iwulist_init(&tp->threads, num_threads, sizeof(pthread_t)));

  for (int i = 0; i < num_threads; ++i) {
    pthread_t th;
    int rci = pthread_create(&th, 0, _worker_fn, tp);
    if (rci) {
      rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
      iwlog_ecode_error3(rc);
      goto finish;
    }
  }

finish:
  if (IW_UNLIKELY(rc)) {
    *out_tp = 0;
    iwtp_shutdown(&tp, false);
  } else {
    *out_tp = tp;
  }
  return rc;
}

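// Convenience wrapper over iwtp_start_by_spec() for the most common options.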
iwrc iwtp_start(const char *thread_name_prefix, int num_threads, int queue_limit, IWTP *out_tp) {
  return iwtp_start_by_spec(&(struct iwtp_spec) {
    .thread_name_prefix = thread_name_prefix,
    .num_threads = num_threads,
    .queue_limit = queue_limit
  }, out_tp);
}

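// Shuts the pool down and releases all of its resources. When `wait_for_all`
// is false, pending (not yet started) tasks are discarded; otherwise workers
// drain the queue before exiting. Must not be called from a pool thread.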
iwrc iwtp_shutdown(IWTP *tpp, bool wait_for_all) {
  if (!tpp || !*tpp) {
    return 0;
  }
  IWTP tp = *tpp;
  IWULIST *joinlist = 0;

  pthread_mutex_lock(&tp->mtx);
  pthread_t st = pthread_self();
  if (iwulist_find_first(&tp->threads, &st) != -1) {
    pthread_mutex_unlock(&tp->mtx);
    iwlog_error("iwtp | iwtp_shutdown() called from one of the managed threads: %lu", (unsigned long) st);
    return IW_ERROR_ASSERTION;
  }

  if (tp->shutdown) {
    pthread_mutex_unlock(&tp->mtx);
    return 0;
  }
  *tpp = 0;
  tp->shutdown = true;

  if (!wait_for_all) {
    struct task *t = tp->head;
    while (t) {
      struct task *o = t;
      t = t->next;
      free(o);
    }
    tp->head = 0;
    tp->tail = 0;
    tp->queue_size = 0;
  }
  joinlist = iwulist_clone(&tp->threads);
  pthread_cond_broadcast(&tp->cond);
  pthread_mutex_unlock(&tp->mtx);

  for (size_t i = 0, l = iwulist_length(joinlist); i < l; ++i) {
    pthread_t t = *(pthread_t*) iwulist_at2(joinlist, i);
    pthread_join(t, 0);
  }

  pthread_cond_destroy(&tp->cond);
  pthread_mutex_destroy(&tp->mtx);
  iwulist_destroy_keep(&tp->threads);
  iwulist_destroy(&joinlist);
  free(tp->thread_name_prefix);
  free(tp);
  return 0;
}

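// Returns the number of tasks currently waiting in the queue.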
int iwtp_queue_size(IWTP tp) {
  int res = 0;
  pthread_mutex_lock(&tp->mtx);
  res = tp->queue_size;
  pthread_mutex_unlock(&tp->mtx);
  return res;
}