• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Standard C headers */
2 #include <assert.h>
3 #include <limits.h>
4 #include <stdbool.h>
5 #include <stdint.h>
6 #include <stdlib.h>
7 #include <string.h>
8 
9 /* Configuration header */
10 #include "threadpool-common.h"
11 
12 /* POSIX headers */
13 #include <pthread.h>
14 #include <unistd.h>
15 
16 /* Futex-specific headers */
17 #if PTHREADPOOL_USE_FUTEX
18 	#if defined(__linux__)
19 		#include <sys/syscall.h>
20 		#include <linux/futex.h>
21 
22 		/* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */
23 		#ifndef SYS_futex
24 			#define SYS_futex __NR_futex
25 		#endif
26 		#ifndef FUTEX_PRIVATE_FLAG
27 			#define FUTEX_PRIVATE_FLAG 128
28 		#endif
29 	#elif defined(__EMSCRIPTEN__)
30 		/* math.h for INFINITY constant */
31 		#include <math.h>
32 
33 		#include <emscripten/threading.h>
34 	#else
35 		#error "Platform-specific implementation of futex_wait and futex_wake_all required"
36 	#endif
37 #endif
38 
39 /* Windows-specific headers */
40 #ifdef _WIN32
41 	#include <sysinfoapi.h>
42 #endif
43 
44 /* Dependencies */
45 #if PTHREADPOOL_USE_CPUINFO
46 	#include <cpuinfo.h>
47 #endif
48 
49 /* Public library header */
50 #include <pthreadpool.h>
51 
52 /* Internal library headers */
53 #include "threadpool-atomics.h"
54 #include "threadpool-object.h"
55 #include "threadpool-utils.h"
56 
57 
58 #if PTHREADPOOL_USE_FUTEX
59 	#if defined(__linux__)
futex_wait(pthreadpool_atomic_uint32_t * address,uint32_t value)60 		static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
61 			return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL);
62 		}
63 
futex_wake_all(pthreadpool_atomic_uint32_t * address)64 		static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
65 			return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX);
66 		}
67 	#elif defined(__EMSCRIPTEN__)
futex_wait(pthreadpool_atomic_uint32_t * address,uint32_t value)68 		static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
69 			return emscripten_futex_wait((volatile void*) address, value, INFINITY);
70 		}
71 
futex_wake_all(pthreadpool_atomic_uint32_t * address)72 		static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
73 			return emscripten_futex_wake((volatile void*) address, INT_MAX);
74 		}
75 	#else
76 		#error "Platform-specific implementation of futex_wait and futex_wake_all required"
77 	#endif
78 #endif
79 
checkin_worker_thread(struct pthreadpool * threadpool)80 static void checkin_worker_thread(struct pthreadpool* threadpool) {
81 	#if PTHREADPOOL_USE_FUTEX
82 		if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) {
83 			pthreadpool_store_release_uint32_t(&threadpool->has_active_threads, 0);
84 			futex_wake_all(&threadpool->has_active_threads);
85 		}
86 	#else
87 		pthread_mutex_lock(&threadpool->completion_mutex);
88 		if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) {
89 			pthread_cond_signal(&threadpool->completion_condvar);
90 		}
91 		pthread_mutex_unlock(&threadpool->completion_mutex);
92 	#endif
93 }
94 
wait_worker_threads(struct pthreadpool * threadpool)95 static void wait_worker_threads(struct pthreadpool* threadpool) {
96 	/* Initial check */
97 	#if PTHREADPOOL_USE_FUTEX
98 		uint32_t has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
99 		if (has_active_threads == 0) {
100 			return;
101 		}
102 	#else
103 		size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
104 		if (active_threads == 0) {
105 			return;
106 		}
107 	#endif
108 
109 	/* Spin-wait */
110 	for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
111 		pthreadpool_yield();
112 
113 		#if PTHREADPOOL_USE_FUTEX
114 			has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
115 			if (has_active_threads == 0) {
116 				return;
117 			}
118 		#else
119 			active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
120 			if (active_threads == 0) {
121 				return;
122 			}
123 		#endif
124 	}
125 
126 	/* Fall-back to mutex/futex wait */
127 	#if PTHREADPOOL_USE_FUTEX
128 		while ((has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads)) != 0) {
129 			futex_wait(&threadpool->has_active_threads, 1);
130 		}
131 	#else
132 		pthread_mutex_lock(&threadpool->completion_mutex);
133 		while (pthreadpool_load_acquire_size_t(&threadpool->active_threads) != 0) {
134 			pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex);
135 		};
136 		pthread_mutex_unlock(&threadpool->completion_mutex);
137 	#endif
138 }
139 
wait_for_new_command(struct pthreadpool * threadpool,uint32_t last_command,uint32_t last_flags)140 static uint32_t wait_for_new_command(
141 	struct pthreadpool* threadpool,
142 	uint32_t last_command,
143 	uint32_t last_flags)
144 {
145 	uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
146 	if (command != last_command) {
147 		return command;
148 	}
149 
150 	if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
151 		/* Spin-wait loop */
152 		for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
153 			pthreadpool_yield();
154 
155 			command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
156 			if (command != last_command) {
157 				return command;
158 			}
159 		}
160 	}
161 
162 	/* Spin-wait disabled or timed out, fall back to mutex/futex wait */
163 	#if PTHREADPOOL_USE_FUTEX
164 		do {
165 			futex_wait(&threadpool->command, last_command);
166 			command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
167 		} while (command == last_command);
168 	#else
169 		/* Lock the command mutex */
170 		pthread_mutex_lock(&threadpool->command_mutex);
171 		/* Read the command */
172 		while ((command = pthreadpool_load_acquire_uint32_t(&threadpool->command)) == last_command) {
173 			/* Wait for new command */
174 			pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex);
175 		}
176 		/* Read a new command */
177 		pthread_mutex_unlock(&threadpool->command_mutex);
178 	#endif
179 	return command;
180 }
181 
thread_main(void * arg)182 static void* thread_main(void* arg) {
183 	struct thread_info* thread = (struct thread_info*) arg;
184 	struct pthreadpool* threadpool = thread->threadpool;
185 	uint32_t last_command = threadpool_command_init;
186 	struct fpu_state saved_fpu_state = { 0 };
187 	uint32_t flags = 0;
188 
189 	/* Check in */
190 	checkin_worker_thread(threadpool);
191 
192 	/* Monitor new commands and act accordingly */
193 	for (;;) {
194 		uint32_t command = wait_for_new_command(threadpool, last_command, flags);
195 		pthreadpool_fence_acquire();
196 
197 		flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);
198 
199 		/* Process command */
200 		switch (command & THREADPOOL_COMMAND_MASK) {
201 			case threadpool_command_parallelize:
202 			{
203 				const thread_function_t thread_function =
204 					(thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
205 				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
206 					saved_fpu_state = get_fpu_state();
207 					disable_fpu_denormals();
208 				}
209 
210 				thread_function(threadpool, thread);
211 				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
212 					set_fpu_state(saved_fpu_state);
213 				}
214 				break;
215 			}
216 			case threadpool_command_shutdown:
217 				/* Exit immediately: the master thread is waiting on pthread_join */
218 				return NULL;
219 			case threadpool_command_init:
220 				/* To inhibit compiler warning */
221 				break;
222 		}
223 		/* Notify the master thread that we finished processing */
224 		checkin_worker_thread(threadpool);
225 		/* Update last command */
226 		last_command = command;
227 	};
228 }
229 
pthreadpool_create(size_t threads_count)230 struct pthreadpool* pthreadpool_create(size_t threads_count) {
231 	#if PTHREADPOOL_USE_CPUINFO
232 		if (!cpuinfo_initialize()) {
233 			return NULL;
234 		}
235 	#endif
236 
237 	if (threads_count == 0) {
238 		#if PTHREADPOOL_USE_CPUINFO
239 			threads_count = cpuinfo_get_processors_count();
240 		#elif defined(_SC_NPROCESSORS_ONLN)
241 			threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN);
242 			#if defined(__EMSCRIPTEN_PTHREADS__)
243 				/* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE option */
244 				if (threads_count >= 8) {
245 					threads_count = 8;
246 				}
247 			#endif
248 		#elif defined(_WIN32)
249 			SYSTEM_INFO system_info;
250 			ZeroMemory(&system_info, sizeof(system_info));
251 			GetSystemInfo(&system_info);
252 			threads_count = (size_t) system_info.dwNumberOfProcessors;
253 		#else
254 			#error "Platform-specific implementation of sysconf(_SC_NPROCESSORS_ONLN) required"
255 		#endif
256 	}
257 
258 	struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
259 	if (threadpool == NULL) {
260 		return NULL;
261 	}
262 	threadpool->threads_count = fxdiv_init_size_t(threads_count);
263 	for (size_t tid = 0; tid < threads_count; tid++) {
264 		threadpool->threads[tid].thread_number = tid;
265 		threadpool->threads[tid].threadpool = threadpool;
266 	}
267 
268 	/* Thread pool with a single thread computes everything on the caller thread. */
269 	if (threads_count > 1) {
270 		pthread_mutex_init(&threadpool->execution_mutex, NULL);
271 		#if !PTHREADPOOL_USE_FUTEX
272 			pthread_mutex_init(&threadpool->completion_mutex, NULL);
273 			pthread_cond_init(&threadpool->completion_condvar, NULL);
274 			pthread_mutex_init(&threadpool->command_mutex, NULL);
275 			pthread_cond_init(&threadpool->command_condvar, NULL);
276 		#endif
277 
278 		#if PTHREADPOOL_USE_FUTEX
279 			pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
280 		#endif
281 		pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
282 
283 		/* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
284 		for (size_t tid = 1; tid < threads_count; tid++) {
285 			pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]);
286 		}
287 
288 		/* Wait until all threads initialize */
289 		wait_worker_threads(threadpool);
290 	}
291 	return threadpool;
292 }
293 
pthreadpool_parallelize(struct pthreadpool * threadpool,thread_function_t thread_function,const void * params,size_t params_size,void * task,void * context,size_t linear_range,uint32_t flags)294 PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
295 	struct pthreadpool* threadpool,
296 	thread_function_t thread_function,
297 	const void* params,
298 	size_t params_size,
299 	void* task,
300 	void* context,
301 	size_t linear_range,
302 	uint32_t flags)
303 {
304 	assert(threadpool != NULL);
305 	assert(thread_function != NULL);
306 	assert(task != NULL);
307 	assert(linear_range > 1);
308 
309 	/* Protect the global threadpool structures */
310 	pthread_mutex_lock(&threadpool->execution_mutex);
311 
312 	#if !PTHREADPOOL_USE_FUTEX
313 		/* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */
314 		pthread_mutex_lock(&threadpool->command_mutex);
315 	#endif
316 
317 	/* Setup global arguments */
318 	pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
319 	pthreadpool_store_relaxed_void_p(&threadpool->task, task);
320 	pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
321 	pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);
322 
323 	/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
324 	const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
325 	pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */);
326 	#if PTHREADPOOL_USE_FUTEX
327 		pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
328 	#endif
329 
330 	if (params_size != 0) {
331 		memcpy(&threadpool->params, params, params_size);
332 		pthreadpool_fence_release();
333 	}
334 
335 	/* Spread the work between threads */
336 	const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
337 	size_t range_start = 0;
338 	for (size_t tid = 0; tid < threads_count.value; tid++) {
339 		struct thread_info* thread = &threadpool->threads[tid];
340 		const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
341 		const size_t range_end = range_start + range_length;
342 		pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
343 		pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
344 		pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);
345 
346 		/* The next subrange starts where the previous ended */
347 		range_start = range_end;
348 	}
349 
350 	/*
351 	 * Update the threadpool command.
352 	 * Imporantly, do it after initializing command parameters (range, task, argument, flags)
353 	 * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
354 	 * to ensure the unmasked command is different then the last command, because worker threads
355 	 * monitor for change in the unmasked command.
356 	 */
357 	const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
358 	const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;
359 
360 	/*
361 	 * Store the command with release semantics to guarantee that if a worker thread observes
362 	 * the new command value, it also observes the updated command parameters.
363 	 *
364 	 * Note: release semantics is necessary even with a conditional variable, because the workers might
365 	 * be waiting in a spin-loop rather than the conditional variable.
366 	 */
367 	pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
368 	#if PTHREADPOOL_USE_FUTEX
369 		/* Wake up the threads */
370 		futex_wake_all(&threadpool->command);
371 	#else
372 		/* Unlock the command variables before waking up the threads for better performance */
373 		pthread_mutex_unlock(&threadpool->command_mutex);
374 
375 		/* Wake up the threads */
376 		pthread_cond_broadcast(&threadpool->command_condvar);
377 	#endif
378 
379 	/* Save and modify FPU denormals control, if needed */
380 	struct fpu_state saved_fpu_state = { 0 };
381 	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
382 		saved_fpu_state = get_fpu_state();
383 		disable_fpu_denormals();
384 	}
385 
386 	/* Do computations as worker #0 */
387 	thread_function(threadpool, &threadpool->threads[0]);
388 
389 	/* Restore FPU denormals control, if needed */
390 	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
391 		set_fpu_state(saved_fpu_state);
392 	}
393 
394 	/* Wait until the threads finish computation */
395 	wait_worker_threads(threadpool);
396 
397 	/* Make changes by other threads visible to this thread */
398 	pthreadpool_fence_acquire();
399 
400 	/* Unprotect the global threadpool structures */
401 	pthread_mutex_unlock(&threadpool->execution_mutex);
402 }
403 
pthreadpool_destroy(struct pthreadpool * threadpool)404 void pthreadpool_destroy(struct pthreadpool* threadpool) {
405 	if (threadpool != NULL) {
406 		const size_t threads_count = threadpool->threads_count.value;
407 		if (threads_count > 1) {
408 			#if PTHREADPOOL_USE_FUTEX
409 				pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
410 				pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
411 
412 				/*
413 				 * Store the command with release semantics to guarantee that if a worker thread observes
414 				 * the new command value, it also observes the updated active_threads/has_active_threads values.
415 				 */
416 				pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
417 
418 				/* Wake up worker threads */
419 				futex_wake_all(&threadpool->command);
420 			#else
421 				/* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */
422 				pthread_mutex_lock(&threadpool->command_mutex);
423 
424 				pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
425 
426 				/*
427 				 * Store the command with release semantics to guarantee that if a worker thread observes
428 				 * the new command value, it also observes the updated active_threads value.
429 				 *
430 				 * Note: the release fence inside pthread_mutex_unlock is insufficient,
431 				 * because the workers might be waiting in a spin-loop rather than the conditional variable.
432 				 */
433 				pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);
434 
435 				/* Wake up worker threads */
436 				pthread_cond_broadcast(&threadpool->command_condvar);
437 
438 				/* Commit the state changes and let workers start processing */
439 				pthread_mutex_unlock(&threadpool->command_mutex);
440 			#endif
441 
442 			/* Wait until all threads return */
443 			for (size_t thread = 1; thread < threads_count; thread++) {
444 				pthread_join(threadpool->threads[thread].thread_object, NULL);
445 			}
446 
447 			/* Release resources */
448 			pthread_mutex_destroy(&threadpool->execution_mutex);
449 			#if !PTHREADPOOL_USE_FUTEX
450 				pthread_mutex_destroy(&threadpool->completion_mutex);
451 				pthread_cond_destroy(&threadpool->completion_condvar);
452 				pthread_mutex_destroy(&threadpool->command_mutex);
453 				pthread_cond_destroy(&threadpool->command_condvar);
454 			#endif
455 		}
456 		#if PTHREADPOOL_USE_CPUINFO
457 			cpuinfo_deinitialize();
458 		#endif
459 		pthreadpool_deallocate(threadpool);
460 	}
461 }
462