• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "iwtp.h"
2 #include "iwth.h"
3 #include "iwp.h"
4 #include "iwlog.h"
5 #include "iwarr.h"
6 #include "iwp.h"
7 
8 #include <stdlib.h>
9 #include <errno.h>
10 #include <assert.h>
11 #include <string.h>
12 
13 struct task {
14   iwtp_task_f fn;
15   void       *arg;
16   struct task *next;
17 };
18 
19 struct iwtp {
20   struct task    *head;
21   struct task    *tail;
22   pthread_mutex_t mtx;
23   pthread_cond_t  cond;
24   IWULIST threads;
25 
26   char *thread_name_prefix;
27   int   num_threads;
28   int   num_threads_busy;
29   int   overflow_threads_factor;
30   int   queue_limit;
31   int   queue_size;
32 
33   bool warn_on_overflow_thread_spawn;
34   bool shutdown;
35 };
36 
// Worker thread entry point; `op` is the owning `struct iwtp *`. Defined below.
static void *_worker_fn(void *op);
38 
iwtp_schedule(IWTP tp,iwtp_task_f fn,void * arg)39 iwrc iwtp_schedule(IWTP tp, iwtp_task_f fn, void *arg) {
40   if (!tp || !fn) {
41     return IW_ERROR_INVALID_ARGS;
42   }
43 
44   iwrc rc = 0;
45   struct task *task = malloc(sizeof(*task));
46   RCA(task, finish);
47 
48   *task = (struct task) {
49     .fn = fn,
50     .arg = arg
51   };
52 
53   pthread_mutex_lock(&tp->mtx);
54   if (tp->queue_limit && (tp->queue_size + 1 > tp->queue_limit)) {
55     rc = IW_ERROR_OVERFLOW;
56     pthread_mutex_unlock(&tp->mtx);
57     iwlog_error("iwtp | Reached  thread pool queue size limit: %d", tp->queue_limit);
58     goto finish;
59   }
60   if (tp->tail) {
61     tp->tail->next = task;
62     tp->tail = task;
63   } else {
64     tp->head = task;
65     tp->tail = task;
66   }
67   ++tp->queue_size;
68 
69   if (  tp->queue_size > 1
70      && tp->num_threads_busy >= tp->num_threads
71      && iwulist_length(&tp->threads) < tp->num_threads * (1 + tp->overflow_threads_factor)) {
72     pthread_t th;
73     int rci = pthread_create(&th, 0, _worker_fn, tp);
74     if (rci) {
75       iwlog_ecode_error2(iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci), "iwtp | Failed to create and overflow thread");
76     }
77   }
78 
79   pthread_cond_signal(&tp->cond);
80   pthread_mutex_unlock(&tp->mtx);
81 
82 finish:
83   if (rc) {
84     free(task);
85   }
86   return rc;
87 }
88 
_worker_fn(void * op)89 static void* _worker_fn(void *op) {
90   struct iwtp *tp = op;
91   assert(tp);
92 
93   pthread_t st = pthread_self();
94 
95   pthread_mutex_lock(&tp->mtx);
96   size_t idx = iwulist_length(&tp->threads);
97   if (iwulist_push(&tp->threads, &st)) {
98     pthread_mutex_unlock(&tp->mtx);
99     return 0;
100   }
101   pthread_mutex_unlock(&tp->mtx);
102 
103   if (tp->thread_name_prefix) {
104     char nbuf[64];
105     if (idx >= tp->num_threads) {
106       snprintf(nbuf, sizeof(nbuf), "%s%zd+", tp->thread_name_prefix, idx);
107       if (tp->warn_on_overflow_thread_spawn) {
108         iwlog_warn("iwtp | Overflow thread spawned: %s%zd+",
109                    tp->thread_name_prefix ? tp->thread_name_prefix : "", idx);
110       }
111     } else {
112       snprintf(nbuf, sizeof(nbuf), "%s%zd", tp->thread_name_prefix, idx);
113     }
114     iwp_set_current_thread_name(nbuf);
115   }
116 
117   while (true) {
118     void *arg;
119     iwtp_task_f fn = 0;
120 
121     pthread_mutex_lock(&tp->mtx);
122     ++tp->num_threads_busy;
123     if (tp->head) {
124       struct task *h = tp->head;
125       fn = h->fn;
126       arg = h->arg;
127       tp->head = h->next;
128       if (tp->head == 0) {
129         tp->tail = 0;
130       }
131       --tp->queue_size;
132       free(h);
133     }
134     pthread_mutex_unlock(&tp->mtx);
135 
136     if (fn) {
137       fn(arg);
138     }
139 
140     pthread_mutex_lock(&tp->mtx);
141     --tp->num_threads_busy;
142 
143     if (idx >= tp->num_threads) {
144       // Overflow thread will be terminated immediately.
145       if (!tp->shutdown) {
146         iwulist_remove_first_by(&tp->threads, &st);
147         pthread_detach(st);
148       }
149       pthread_mutex_unlock(&tp->mtx);
150       break;
151     }
152 
153     if (tp->head) {
154       pthread_mutex_unlock(&tp->mtx);
155       continue;
156     } else if (tp->shutdown) {
157       pthread_mutex_unlock(&tp->mtx);
158       break;
159     }
160 
161     pthread_cond_wait(&tp->cond, &tp->mtx);
162     pthread_mutex_unlock(&tp->mtx);
163   }
164 
165   return 0;
166 }
167 
iwtp_start_by_spec(const struct iwtp_spec * spec,IWTP * out_tp)168 iwrc iwtp_start_by_spec(const struct iwtp_spec *spec, IWTP *out_tp) {
169   iwrc rc = 0;
170   if (!spec || !out_tp) {
171     return IW_ERROR_INVALID_ARGS;
172   }
173   if (spec->thread_name_prefix && strlen(spec->thread_name_prefix) > 15) {
174     return IW_ERROR_INVALID_ARGS;
175   }
176 
177   int num_threads = spec->num_threads;
178   if (num_threads < 1) {
179     num_threads = iwp_num_cpu_cores();
180   } else if (num_threads > 1023) {
181     num_threads = 1024;
182   }
183 
184   int queue_limit = spec->queue_limit;
185   if (queue_limit < 1) {
186     queue_limit = 0;
187   }
188 
189   int overflow_threads_factor = spec->overflow_threads_factor;
190   if (overflow_threads_factor > 2) {
191     overflow_threads_factor = 2;
192   }
193 
194   struct iwtp *tp = malloc(sizeof(*tp));
195   if (!tp) {
196     rc = iwrc_set_errno(IW_ERROR_ALLOC, errno);
197     goto finish;
198   }
199 
200   *tp = (struct iwtp) {
201     .warn_on_overflow_thread_spawn = spec->warn_on_overflow_thread_spawn,
202     .overflow_threads_factor = overflow_threads_factor,
203     .num_threads = num_threads,
204     .queue_limit = queue_limit,
205     .mtx = PTHREAD_MUTEX_INITIALIZER,
206     .cond = PTHREAD_COND_INITIALIZER
207   };
208 
209   if (spec->thread_name_prefix) {
210     tp->thread_name_prefix = strdup(spec->thread_name_prefix);
211   }
212 
213   RCC(rc, finish, iwulist_init(&tp->threads, num_threads, sizeof(pthread_t)));
214 
215   for (size_t i = 0; i < num_threads; ++i) {
216     pthread_t th;
217     int rci = pthread_create(&th, 0, _worker_fn, tp);
218     if (rci) {
219       rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
220       iwlog_ecode_error3(rc);
221       goto finish;
222     }
223   }
224 
225 finish:
226   if (IW_UNLIKELY(rc)) {
227     *out_tp = 0;
228     iwtp_shutdown(&tp, false);
229   } else {
230     *out_tp = tp;
231   }
232   return rc;
233 }
234 
iwtp_start(const char * thread_name_prefix,int num_threads,int queue_limit,IWTP * out_tp)235 iwrc iwtp_start(const char *thread_name_prefix, int num_threads, int queue_limit, IWTP *out_tp) {
236   return iwtp_start_by_spec(&(struct iwtp_spec) {
237     .thread_name_prefix = thread_name_prefix,
238     .num_threads = num_threads,
239     .queue_limit = queue_limit
240   }, out_tp);
241 }
242 
iwtp_shutdown(IWTP * tpp,bool wait_for_all)243 iwrc iwtp_shutdown(IWTP *tpp, bool wait_for_all) {
244   if (!tpp || !*tpp) {
245     return 0;
246   }
247   IWTP tp = *tpp;
248   IWULIST *joinlist = 0;
249 
250   pthread_mutex_lock(&tp->mtx);
251   pthread_t st = pthread_self();
252   if (iwulist_find_first(&tp->threads, &st) != -1) {
253     pthread_mutex_unlock(&tp->mtx);
254     iwlog_error("iwtp | Calling iwtp_shutdown() from one of managed thread: %lu", (unsigned long) st);
255     return IW_ERROR_ASSERTION;
256   }
257 
258   if (tp->shutdown) {
259     pthread_mutex_unlock(&tp->mtx);
260     return 0;
261   }
262   *tpp = 0;
263   tp->shutdown = true;
264 
265   if (!wait_for_all) {
266     struct task *t = tp->head;
267     while (t) {
268       struct task *o = t;
269       t = t->next;
270       free(o);
271     }
272     tp->head = 0;
273     tp->tail = 0;
274     tp->queue_size = 0;
275   }
276   joinlist = iwulist_clone(&tp->threads);
277   pthread_cond_broadcast(&tp->cond);
278   pthread_mutex_unlock(&tp->mtx);
279 
280   for (size_t i = 0, l = iwulist_length(joinlist); i < l; ++i) {
281     pthread_t t = *(pthread_t*) iwulist_at2(joinlist, i);
282     pthread_join(t, 0);
283   }
284 
285   pthread_cond_destroy(&tp->cond);
286   pthread_mutex_destroy(&tp->mtx);
287   iwulist_destroy_keep(&tp->threads);
288   iwulist_destroy(&joinlist);
289   free(tp->thread_name_prefix);
290   free(tp);
291   return 0;
292 }
293 
/**
 * Returns the number of tasks currently waiting in the pool queue.
 *
 * @param tp Thread pool. A zero handle is treated as an empty queue.
 * @return Queue size read under the pool mutex, or 0 if `tp` is zero.
 */
int iwtp_queue_size(IWTP tp) {
  if (!tp) {
    return 0;
  }
  pthread_mutex_lock(&tp->mtx);
  int res = tp->queue_size;
  pthread_mutex_unlock(&tp->mtx);
  return res;
}
301