• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #include "iwstw.h"
2 #include "iwth.h"
3 #include "iwlog.h"
4 #include <stdlib.h>
5 #include <errno.h>
6 #include <assert.h>
7 
8 struct _TASK {
9   iwstw_task_f fn;
10   void *arg;
11   struct _TASK *next;
12 };
13 
14 struct _IWSTW {
15   struct _TASK *head;
16   struct _TASK *tail;
17   pthread_mutex_t mtx;
18   pthread_barrier_t brr;
19   pthread_cond_t cond;
20   pthread_t thr;
21   int cnt;
22   int queue_limit;
23   volatile bool shutdown;
24 };
25 
worker_fn(void * op)26 void *worker_fn(void *op) {
27   struct _IWSTW *stw = op;
28   assert(stw);
29   pthread_barrier_wait(&stw->brr);
30 
31   while (true) {
32     void *arg;
33     iwstw_task_f fn = 0;
34 
35     pthread_mutex_lock(&stw->mtx);
36     if (stw->head) {
37       struct _TASK *h = stw->head;
38       fn = h->fn;
39       arg = h->arg;
40       stw->head = h->next;
41       if (stw->tail == h) {
42         stw->tail = stw->head;
43       }
44       --stw->cnt;
45       free(h);
46     }
47     pthread_mutex_unlock(&stw->mtx);
48 
49     if (fn) {
50       fn(arg);
51     }
52 
53     pthread_mutex_lock(&stw->mtx);
54     if (stw->head) {
55       pthread_mutex_unlock(&stw->mtx);
56       continue;
57     } else if (stw->shutdown) {
58       pthread_mutex_unlock(&stw->mtx);
59       break;
60     }
61     pthread_cond_wait(&stw->cond, &stw->mtx);
62     pthread_mutex_unlock(&stw->mtx);
63   }
64   return 0;
65 }
66 
iwstw_shutdown(IWSTW * stwp,bool wait_for_all)67 void iwstw_shutdown(IWSTW *stwp, bool wait_for_all) {
68   if (!stwp || !*stwp) {
69     return;
70   }
71   IWSTW stw = *stwp;
72   pthread_mutex_lock(&stw->mtx);
73   if (stw->shutdown) {
74     pthread_mutex_unlock(&stw->mtx);
75     return;
76   }
77   if (!wait_for_all) {
78     struct _TASK *t = stw->head;
79     while (t) {
80       struct _TASK *o = t;
81       t = t->next;
82       free(o);
83     }
84     stw->head = 0;
85     stw->tail = 0;
86     stw->cnt = 0;
87   }
88   stw->shutdown = true;
89   pthread_cond_broadcast(&stw->cond);
90   pthread_mutex_unlock(&stw->mtx);
91   int rci = pthread_join(stw->thr, 0);
92   if (rci) {
93     iwrc rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, rci);
94     iwlog_ecode_error3(rc);
95   }
96   pthread_barrier_destroy(&stw->brr);
97   pthread_cond_destroy(&stw->cond);
98   pthread_mutex_destroy(&stw->mtx);
99   free(stw);
100   *stwp = 0;
101 }
102 
iwstw_schedule(IWSTW stw,iwstw_task_f fn,void * arg)103 iwrc iwstw_schedule(IWSTW stw, iwstw_task_f fn, void *arg) {
104   if (!stw || !fn) {
105     return IW_ERROR_INVALID_ARGS;
106   }
107   iwrc rc = 0;
108   struct _TASK *task = malloc(sizeof(*task));
109   RCA(task, finish);
110   *task = (struct _TASK) {
111     .fn = fn,
112     .arg = arg
113   };
114   int rci = pthread_mutex_lock(&stw->mtx);
115   if (rci) {
116     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
117     goto finish;
118   }
119   if (stw->shutdown) {
120     rc = IW_ERROR_INVALID_STATE;
121     pthread_mutex_unlock(&stw->mtx);
122     goto finish;
123   }
124   if (stw->queue_limit && stw->cnt + 1 > stw->queue_limit) {
125     rc = IW_ERROR_OVERFLOW;
126     pthread_mutex_unlock(&stw->mtx);
127     goto finish;
128   }
129   if (stw->tail) {
130     stw->tail->next = task;
131     stw->tail = task;
132   } else {
133     stw->head = task;
134     stw->tail = task;
135   }
136   ++stw->cnt;
137   pthread_cond_broadcast(&stw->cond);
138   pthread_mutex_unlock(&stw->mtx);
139 
140 finish:
141   if (rc) {
142     free(task);
143   }
144   return rc; // NOLINT (clang-analyzer-unix.Malloc)
145 }
146 
iwstw_start(int queue_limit,IWSTW * stwp_out)147 iwrc iwstw_start(int queue_limit, IWSTW *stwp_out) {
148   struct _IWSTW *stw = malloc(sizeof(*stw));
149   if (!stw) {
150     *stwp_out = 0;
151     return iwrc_set_errno(IW_ERROR_ALLOC, errno);
152   }
153   int rci;
154   iwrc rc = 0;
155   *stw = (struct _IWSTW) {
156     .queue_limit = queue_limit,
157     .mtx = PTHREAD_MUTEX_INITIALIZER,
158     .cond = PTHREAD_COND_INITIALIZER
159   };
160   rci = pthread_barrier_init(&stw->brr, 0, 2);
161   if (rci) {
162     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
163     goto finish;
164   }
165   rci = pthread_create(&stw->thr, 0, worker_fn, stw);
166   if (rci) {
167     rc = iwrc_set_errno(IW_ERROR_THREADING_ERRNO, errno);
168     pthread_barrier_destroy(&stw->brr);
169     goto finish;
170   }
171   pthread_barrier_wait(&stw->brr);
172 
173 finish:
174   if (rc) {
175     *stwp_out = 0;
176     free(stw);
177   } else {
178     *stwp_out = stw;
179   }
180   return 0;
181 
182 }
183