1 /* Standard C headers */
2 #include <assert.h>
3 #include <limits.h>
4 #include <stdbool.h>
5 #include <stdint.h>
6 #include <stdlib.h>
7 #include <string.h>
8
9 /* Configuration header */
10 #include "threadpool-common.h"
11
12 /* POSIX headers */
13 #include <pthread.h>
14 #include <unistd.h>
15
16 /* Futex-specific headers */
17 #if PTHREADPOOL_USE_FUTEX
18 #if defined(__linux__)
19 #include <sys/syscall.h>
20 #include <linux/futex.h>
21
22 /* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */
23 #ifndef SYS_futex
24 #define SYS_futex __NR_futex
25 #endif
26 #ifndef FUTEX_PRIVATE_FLAG
27 #define FUTEX_PRIVATE_FLAG 128
28 #endif
29 #elif defined(__EMSCRIPTEN__)
30 /* math.h for INFINITY constant */
31 #include <math.h>
32
33 #include <emscripten/threading.h>
34 #else
35 #error "Platform-specific implementation of futex_wait and futex_wake_all required"
36 #endif
37 #endif
38
39 /* Windows-specific headers */
40 #ifdef _WIN32
41 #include <sysinfoapi.h>
42 #endif
43
44 /* Dependencies */
45 #if PTHREADPOOL_USE_CPUINFO
46 #include <cpuinfo.h>
47 #endif
48
49 /* Public library header */
50 #include <pthreadpool.h>
51
52 /* Internal library headers */
53 #include "threadpool-atomics.h"
54 #include "threadpool-object.h"
55 #include "threadpool-utils.h"
56
57
#if PTHREADPOOL_USE_FUTEX
	#if defined(__linux__)
		/*
		 * Put the calling thread to sleep on `address` while it still holds `value`.
		 * Uses FUTEX_WAIT with FUTEX_PRIVATE_FLAG (process-private futex) and a NULL
		 * timeout (no time limit). The raw syscall result is returned unchanged; callers
		 * in this file re-check the watched value in a loop after every wake-up.
		 */
		static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
			const int operation = FUTEX_WAIT | FUTEX_PRIVATE_FLAG;
			return syscall(SYS_futex, address, operation, value, NULL);
		}

		/* Wake every thread sleeping on `address` (up to INT_MAX waiters). */
		static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
			const int operation = FUTEX_WAKE | FUTEX_PRIVATE_FLAG;
			return syscall(SYS_futex, address, operation, INT_MAX);
		}
	#elif defined(__EMSCRIPTEN__)
		/* Emscripten equivalent of FUTEX_WAIT with an infinite timeout. */
		static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
			volatile void* const futex_address = (volatile void*) address;
			return emscripten_futex_wait(futex_address, value, INFINITY);
		}

		/* Emscripten equivalent of FUTEX_WAKE for all (up to INT_MAX) waiters. */
		static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
			volatile void* const futex_address = (volatile void*) address;
			return emscripten_futex_wake(futex_address, INT_MAX);
		}
	#else
		#error "Platform-specific implementation of futex_wait and futex_wake_all required"
	#endif
#endif
79
checkin_worker_thread(struct pthreadpool * threadpool)80 static void checkin_worker_thread(struct pthreadpool* threadpool) {
81 #if PTHREADPOOL_USE_FUTEX
82 if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) {
83 pthreadpool_store_release_uint32_t(&threadpool->has_active_threads, 0);
84 futex_wake_all(&threadpool->has_active_threads);
85 }
86 #else
87 pthread_mutex_lock(&threadpool->completion_mutex);
88 if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) {
89 pthread_cond_signal(&threadpool->completion_condvar);
90 }
91 pthread_mutex_unlock(&threadpool->completion_mutex);
92 #endif
93 }
94
wait_worker_threads(struct pthreadpool * threadpool)95 static void wait_worker_threads(struct pthreadpool* threadpool) {
96 /* Initial check */
97 #if PTHREADPOOL_USE_FUTEX
98 uint32_t has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
99 if (has_active_threads == 0) {
100 return;
101 }
102 #else
103 size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
104 if (active_threads == 0) {
105 return;
106 }
107 #endif
108
109 /* Spin-wait */
110 for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
111 pthreadpool_yield();
112
113 #if PTHREADPOOL_USE_FUTEX
114 has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
115 if (has_active_threads == 0) {
116 return;
117 }
118 #else
119 active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
120 if (active_threads == 0) {
121 return;
122 }
123 #endif
124 }
125
126 /* Fall-back to mutex/futex wait */
127 #if PTHREADPOOL_USE_FUTEX
128 while ((has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads)) != 0) {
129 futex_wait(&threadpool->has_active_threads, 1);
130 }
131 #else
132 pthread_mutex_lock(&threadpool->completion_mutex);
133 while (pthreadpool_load_acquire_size_t(&threadpool->active_threads) != 0) {
134 pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex);
135 };
136 pthread_mutex_unlock(&threadpool->completion_mutex);
137 #endif
138 }
139
wait_for_new_command(struct pthreadpool * threadpool,uint32_t last_command,uint32_t last_flags)140 static uint32_t wait_for_new_command(
141 struct pthreadpool* threadpool,
142 uint32_t last_command,
143 uint32_t last_flags)
144 {
145 uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
146 if (command != last_command) {
147 return command;
148 }
149
150 if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
151 /* Spin-wait loop */
152 for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
153 pthreadpool_yield();
154
155 command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
156 if (command != last_command) {
157 return command;
158 }
159 }
160 }
161
162 /* Spin-wait disabled or timed out, fall back to mutex/futex wait */
163 #if PTHREADPOOL_USE_FUTEX
164 do {
165 futex_wait(&threadpool->command, last_command);
166 command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
167 } while (command == last_command);
168 #else
169 /* Lock the command mutex */
170 pthread_mutex_lock(&threadpool->command_mutex);
171 /* Read the command */
172 while ((command = pthreadpool_load_acquire_uint32_t(&threadpool->command)) == last_command) {
173 /* Wait for new command */
174 pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex);
175 }
176 /* Read a new command */
177 pthread_mutex_unlock(&threadpool->command_mutex);
178 #endif
179 return command;
180 }
181
thread_main(void * arg)182 static void* thread_main(void* arg) {
183 struct thread_info* thread = (struct thread_info*) arg;
184 struct pthreadpool* threadpool = thread->threadpool;
185 uint32_t last_command = threadpool_command_init;
186 struct fpu_state saved_fpu_state = { 0 };
187 uint32_t flags = 0;
188
189 /* Check in */
190 checkin_worker_thread(threadpool);
191
192 /* Monitor new commands and act accordingly */
193 for (;;) {
194 uint32_t command = wait_for_new_command(threadpool, last_command, flags);
195 pthreadpool_fence_acquire();
196
197 flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
198
199 /* Process command */
200 switch (command & THREADPOOL_COMMAND_MASK) {
201 case threadpool_command_parallelize:
202 {
203 const thread_function_t thread_function =
204 (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
205 if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
206 saved_fpu_state = get_fpu_state();
207 disable_fpu_denormals();
208 }
209
210 thread_function(threadpool, thread);
211 if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
212 set_fpu_state(saved_fpu_state);
213 }
214 break;
215 }
216 case threadpool_command_shutdown:
217 /* Exit immediately: the master thread is waiting on pthread_join */
218 return NULL;
219 case threadpool_command_init:
220 /* To inhibit compiler warning */
221 break;
222 }
223 /* Notify the master thread that we finished processing */
224 checkin_worker_thread(threadpool);
225 /* Update last command */
226 last_command = command;
227 };
228 }
229
pthreadpool_create(size_t threads_count)230 struct pthreadpool* pthreadpool_create(size_t threads_count) {
231 #if PTHREADPOOL_USE_CPUINFO
232 if (!cpuinfo_initialize()) {
233 return NULL;
234 }
235 #endif
236
237 if (threads_count == 0) {
238 #if PTHREADPOOL_USE_CPUINFO
239 threads_count = cpuinfo_get_processors_count();
240 #elif defined(_SC_NPROCESSORS_ONLN)
241 threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN);
242 #if defined(__EMSCRIPTEN_PTHREADS__)
243 /* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE option */
244 if (threads_count >= 8) {
245 threads_count = 8;
246 }
247 #endif
248 #elif defined(_WIN32)
249 SYSTEM_INFO system_info;
250 ZeroMemory(&system_info, sizeof(system_info));
251 GetSystemInfo(&system_info);
252 threads_count = (size_t) system_info.dwNumberOfProcessors;
253 #else
254 #error "Platform-specific implementation of sysconf(_SC_NPROCESSORS_ONLN) required"
255 #endif
256 }
257
258 struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
259 if (threadpool == NULL) {
260 return NULL;
261 }
262 threadpool->threads_count = fxdiv_init_size_t(threads_count);
263 for (size_t tid = 0; tid < threads_count; tid++) {
264 threadpool->threads[tid].thread_number = tid;
265 threadpool->threads[tid].threadpool = threadpool;
266 }
267
268 /* Thread pool with a single thread computes everything on the caller thread. */
269 if (threads_count > 1) {
270 pthread_mutex_init(&threadpool->execution_mutex, NULL);
271 #if !PTHREADPOOL_USE_FUTEX
272 pthread_mutex_init(&threadpool->completion_mutex, NULL);
273 pthread_cond_init(&threadpool->completion_condvar, NULL);
274 pthread_mutex_init(&threadpool->command_mutex, NULL);
275 pthread_cond_init(&threadpool->command_condvar, NULL);
276 #endif
277
278 #if PTHREADPOOL_USE_FUTEX
279 pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
280 #endif
281 pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
282
283 /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
284 for (size_t tid = 1; tid < threads_count; tid++) {
285 pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]);
286 }
287
288 /* Wait until all threads initialize */
289 wait_worker_threads(threadpool);
290 }
291 return threadpool;
292 }
293
pthreadpool_parallelize(struct pthreadpool * threadpool,thread_function_t thread_function,const void * params,size_t params_size,void * task,void * context,size_t linear_range,uint32_t flags)294 PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
295 struct pthreadpool* threadpool,
296 thread_function_t thread_function,
297 const void* params,
298 size_t params_size,
299 void* task,
300 void* context,
301 size_t linear_range,
302 uint32_t flags)
303 {
304 assert(threadpool != NULL);
305 assert(thread_function != NULL);
306 assert(task != NULL);
307 assert(linear_range > 1);
308
309 /* Protect the global threadpool structures */
310 pthread_mutex_lock(&threadpool->execution_mutex);
311
312 #if !PTHREADPOOL_USE_FUTEX
313 /* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */
314 pthread_mutex_lock(&threadpool->command_mutex);
315 #endif
316
317 /* Setup global arguments */
318 pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
319 pthreadpool_store_relaxed_void_p(&threadpool->task, task);
320 pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
321 pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
322
323 /* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
324 const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
325 pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */);
326 #if PTHREADPOOL_USE_FUTEX
327 pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
328 #endif
329
330 if (params_size != 0) {
331 memcpy(&threadpool->params, params, params_size);
332 pthreadpool_fence_release();
333 }
334
335 /* Spread the work between threads */
336 const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
337 size_t range_start = 0;
338 for (size_t tid = 0; tid < threads_count.value; tid++) {
339 struct thread_info* thread = &threadpool->threads[tid];
340 const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
341 const size_t range_end = range_start + range_length;
342 pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
343 pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
344 pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);
345
346 /* The next subrange starts where the previous ended */
347 range_start = range_end;
348 }
349
350 /*
351 * Update the threadpool command.
352 * Imporantly, do it after initializing command parameters (range, task, argument, flags)
353 * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
354 * to ensure the unmasked command is different then the last command, because worker threads
355 * monitor for change in the unmasked command.
356 */
357 const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
358 const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;
359
360 /*
361 * Store the command with release semantics to guarantee that if a worker thread observes
362 * the new command value, it also observes the updated command parameters.
363 *
364 * Note: release semantics is necessary even with a conditional variable, because the workers might
365 * be waiting in a spin-loop rather than the conditional variable.
366 */
367 pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
368 #if PTHREADPOOL_USE_FUTEX
369 /* Wake up the threads */
370 futex_wake_all(&threadpool->command);
371 #else
372 /* Unlock the command variables before waking up the threads for better performance */
373 pthread_mutex_unlock(&threadpool->command_mutex);
374
375 /* Wake up the threads */
376 pthread_cond_broadcast(&threadpool->command_condvar);
377 #endif
378
379 /* Save and modify FPU denormals control, if needed */
380 struct fpu_state saved_fpu_state = { 0 };
381 if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
382 saved_fpu_state = get_fpu_state();
383 disable_fpu_denormals();
384 }
385
386 /* Do computations as worker #0 */
387 thread_function(threadpool, &threadpool->threads[0]);
388
389 /* Restore FPU denormals control, if needed */
390 if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
391 set_fpu_state(saved_fpu_state);
392 }
393
394 /* Wait until the threads finish computation */
395 wait_worker_threads(threadpool);
396
397 /* Make changes by other threads visible to this thread */
398 pthreadpool_fence_acquire();
399
400 /* Unprotect the global threadpool structures */
401 pthread_mutex_unlock(&threadpool->execution_mutex);
402 }
403
pthreadpool_destroy(struct pthreadpool * threadpool)404 void pthreadpool_destroy(struct pthreadpool* threadpool) {
405 if (threadpool != NULL) {
406 const size_t threads_count = threadpool->threads_count.value;
407 if (threads_count > 1) {
408 #if PTHREADPOOL_USE_FUTEX
409 pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
410 pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
411
412 /*
413 * Store the command with release semantics to guarantee that if a worker thread observes
414 * the new command value, it also observes the updated active_threads/has_active_threads values.
415 */
416 pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
417
418 /* Wake up worker threads */
419 futex_wake_all(&threadpool->command);
420 #else
421 /* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */
422 pthread_mutex_lock(&threadpool->command_mutex);
423
424 pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
425
426 /*
427 * Store the command with release semantics to guarantee that if a worker thread observes
428 * the new command value, it also observes the updated active_threads value.
429 *
430 * Note: the release fence inside pthread_mutex_unlock is insufficient,
431 * because the workers might be waiting in a spin-loop rather than the conditional variable.
432 */
433 pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
434
435 /* Wake up worker threads */
436 pthread_cond_broadcast(&threadpool->command_condvar);
437
438 /* Commit the state changes and let workers start processing */
439 pthread_mutex_unlock(&threadpool->command_mutex);
440 #endif
441
442 /* Wait until all threads return */
443 for (size_t thread = 1; thread < threads_count; thread++) {
444 pthread_join(threadpool->threads[thread].thread_object, NULL);
445 }
446
447 /* Release resources */
448 pthread_mutex_destroy(&threadpool->execution_mutex);
449 #if !PTHREADPOOL_USE_FUTEX
450 pthread_mutex_destroy(&threadpool->completion_mutex);
451 pthread_cond_destroy(&threadpool->completion_condvar);
452 pthread_mutex_destroy(&threadpool->command_mutex);
453 pthread_cond_destroy(&threadpool->command_condvar);
454 #endif
455 }
456 #if PTHREADPOOL_USE_CPUINFO
457 cpuinfo_deinitialize();
458 #endif
459 pthreadpool_deallocate(threadpool);
460 }
461 }
462