• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* Standard C headers */
2 #include <assert.h>
3 #include <stdbool.h>
4 #include <stdint.h>
5 #include <stdlib.h>
6 #include <string.h>
7 
8 /* Configuration header */
9 #include "threadpool-common.h"
10 
11 /* Windows headers */
12 #ifndef WIN32_LEAN_AND_MEAN
13 #define WIN32_LEAN_AND_MEAN
14 #endif
15 #include <windows.h>
16 
17 /* Public library header */
18 #include <pthreadpool.h>
19 
20 /* Internal library headers */
21 #include "threadpool-atomics.h"
22 #include "threadpool-object.h"
23 #include "threadpool-utils.h"
24 
25 
checkin_worker_thread(struct pthreadpool * threadpool,uint32_t event_index)26 static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) {
27 	if (pthreadpool_decrement_fetch_acquire_release_size_t(&threadpool->active_threads) == 0) {
28 		SetEvent(threadpool->completion_event[event_index]);
29 	}
30 }
31 
wait_worker_threads(struct pthreadpool * threadpool,uint32_t event_index)32 static void wait_worker_threads(struct pthreadpool* threadpool, uint32_t event_index) {
33 	/* Initial check */
34 	size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
35 	if (active_threads == 0) {
36 		return;
37 	}
38 
39 	/* Spin-wait */
40 	for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
41 		pthreadpool_yield();
42 
43 		active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
44 		if (active_threads == 0) {
45 			return;
46 		}
47 	}
48 
49 	/* Fall-back to event wait */
50 	const DWORD wait_status = WaitForSingleObject(threadpool->completion_event[event_index], INFINITE);
51 	assert(wait_status == WAIT_OBJECT_0);
52 	assert(pthreadpool_load_relaxed_size_t(&threadpool->active_threads) == 0);
53 }
54 
/*
 * Block a worker thread until the threadpool command word changes.
 *
 * Workers detect new work by watching for ANY change in the full (unmasked)
 * command word relative to the last command they processed.  The wait is a
 * bounded spin-loop first (skipped when the previous command requested
 * PTHREADPOOL_FLAG_YIELD_WORKERS), then a blocking wait on a command event.
 *
 * @param threadpool    pool whose command word is monitored.
 * @param last_command  the command value this worker most recently processed.
 * @param last_flags    flags of the previous command; controls spin vs. yield.
 * @return the new command word (guaranteed != last_command).
 */
static uint32_t wait_for_new_command(
	struct pthreadpool* threadpool,
	uint32_t last_command,
	uint32_t last_flags)
{
	uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
	if (command != last_command) {
		return command;
	}

	if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
		/* Spin-wait loop */
		for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
			pthreadpool_yield();

			command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
			if (command != last_command) {
				return command;
			}
		}
	}

	/* Spin-wait disabled or timed out, fall back to event wait */
	/*
	 * Two command events are used in alternation; the high bit of the command
	 * word flips on every submitted command and selects which event the master
	 * will signal for the NEXT command after last_command.
	 */
	const uint32_t event_index = (last_command >> 31);
	const DWORD wait_status = WaitForSingleObject(threadpool->command_event[event_index], INFINITE);
	assert(wait_status == WAIT_OBJECT_0);

	/*
	 * Relaxed load: the event is only signalled after the master's release
	 * store of the new command, and the event wait itself orders this load
	 * after the signal, so no extra acquire is needed here.
	 */
	command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
	assert(command != last_command);
	return command;
}
86 
/*
 * Entry point of every system worker thread created by pthreadpool_create.
 *
 * After an initial check-in (so pthreadpool_create knows the thread started),
 * the thread loops forever: wait for a new command word, execute the command,
 * check in on the completion event matching this command's generation, and
 * remember the command so the next change can be detected.  The loop exits
 * only on threadpool_command_shutdown.
 *
 * @param arg  pointer to this worker's struct thread_info, which links back
 *             to the owning threadpool.
 * @return always 0 (thread exit code).
 */
static DWORD WINAPI thread_main(LPVOID arg) {
	struct thread_info* thread = (struct thread_info*) arg;
	struct pthreadpool* threadpool = thread->threadpool;
	uint32_t last_command = threadpool_command_init;
	struct fpu_state saved_fpu_state = { 0 };
	uint32_t flags = 0;

	/* Check in: signal pthreadpool_create that this thread has initialized */
	checkin_worker_thread(threadpool, 0);

	/* Monitor new commands and act accordingly */
	for (;;) {
		uint32_t command = wait_for_new_command(threadpool, last_command, flags);
		pthreadpool_fence_acquire();

		/* Flags were published by the master before the command word changed */
		flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);

		/* Process command */
		switch (command & THREADPOOL_COMMAND_MASK) {
			case threadpool_command_parallelize:
			{
				const thread_function_t thread_function =
					(thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
					/* Save FPU state so it can be restored after the task */
					saved_fpu_state = get_fpu_state();
					disable_fpu_denormals();
				}

				thread_function(threadpool, thread);
				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
					set_fpu_state(saved_fpu_state);
				}
				break;
			}
			case threadpool_command_shutdown:
				/* Exit immediately: the master thread is waiting on the thread handle (WaitForSingleObject in pthreadpool_destroy) */
				return 0;
			case threadpool_command_init:
				/* To inhibit compiler warning */
				break;
		}
		/* Notify the master thread that we finished processing.
		 * The command's high bit selects which of the two completion events
		 * corresponds to this command generation. */
		const uint32_t event_index = command >> 31;
		checkin_worker_thread(threadpool, event_index);
		/* Update last command */
		last_command = command;
	};
	return 0;
}
136 
/*
 * Create a thread pool with the given number of threads.
 *
 * @param threads_count  number of threads, or 0 to use one thread per logical
 *                       processor reported by GetSystemInfo.
 * @return pointer to the new pool, or NULL if allocation failed.
 *
 * The caller's thread always acts as worker #0, so only threads_count - 1
 * system threads are created.  A pool of one thread creates no system threads
 * and runs everything on the caller.
 *
 * NOTE(review): failures of CreateMutexW/CreateEventW/CreateThread are not
 * checked here; if CreateThread fails, active_threads can never reach zero
 * and wait_worker_threads below would block forever — TODO confirm whether
 * callers rely on this best-effort behavior.
 */
struct pthreadpool* pthreadpool_create(size_t threads_count) {
	if (threads_count == 0) {
		/* threads_count == 0 means "one thread per logical processor" */
		SYSTEM_INFO system_info;
		ZeroMemory(&system_info, sizeof(system_info));
		GetSystemInfo(&system_info);
		threads_count = (size_t) system_info.dwNumberOfProcessors;
	}

	struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
	if (threadpool == NULL) {
		return NULL;
	}
	threadpool->threads_count = fxdiv_init_size_t(threads_count);
	for (size_t tid = 0; tid < threads_count; tid++) {
		threadpool->threads[tid].thread_number = tid;
		threadpool->threads[tid].threadpool = threadpool;
	}

	/* Thread pool with a single thread computes everything on the caller thread. */
	if (threads_count > 1) {
		threadpool->execution_mutex = CreateMutexW(
			NULL /* mutex attributes */,
			FALSE /* initially owned */,
			NULL /* name */);
		/* Two command/completion event pairs alternate between successive
		 * commands to avoid races when one command immediately follows another. */
		for (size_t i = 0; i < 2; i++) {
			threadpool->completion_event[i] = CreateEventW(
				NULL /* event attributes */,
				TRUE /* manual-reset event: yes */,
				FALSE /* initial state: nonsignaled */,
				NULL /* name */);
			threadpool->command_event[i] = CreateEventW(
				NULL /* event attributes */,
				TRUE /* manual-reset event: yes */,
				FALSE /* initial state: nonsignaled */,
				NULL /* name */);
		}

		/* Each new worker decrements this counter once it has started */
		pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);

		/* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
		for (size_t tid = 1; tid < threads_count; tid++) {
			threadpool->threads[tid].thread_handle = CreateThread(
				NULL /* thread attributes */,
				0 /* stack size: default */,
				&thread_main,
				&threadpool->threads[tid],
				0 /* creation flags */,
				NULL /* thread id */);
		}

		/* Wait until all threads initialize */
		wait_worker_threads(threadpool, 0);
	}
	return threadpool;
}
192 
/*
 * Run thread_function on all pool threads, splitting linear_range between them.
 *
 * The caller's thread participates as worker #0.  The whole operation is
 * serialized on execution_mutex, so only one parallelize call runs at a time.
 * The statement order below is part of the synchronization protocol: command
 * parameters are published before the release-store of the command word, and
 * the spare command event is reset before the command word changes.
 *
 * @param threadpool       pool to run on (must be non-NULL).
 * @param thread_function  per-thread dispatcher invoked on every worker.
 * @param params           optional command parameters copied into the pool.
 * @param params_size      size of params in bytes; 0 means no parameters.
 * @param task             user task pointer stored for the dispatcher.
 * @param context          user context pointer stored for the dispatcher.
 * @param linear_range     total number of work items (must be > 1).
 * @param flags            PTHREADPOOL_FLAG_* bits for this command.
 */
PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
	struct pthreadpool* threadpool,
	thread_function_t thread_function,
	const void* params,
	size_t params_size,
	void* task,
	void* context,
	size_t linear_range,
	uint32_t flags)
{
	assert(threadpool != NULL);
	assert(thread_function != NULL);
	assert(task != NULL);
	assert(linear_range > 1);

	/* Protect the global threadpool structures */
	const DWORD wait_status = WaitForSingleObject(threadpool->execution_mutex, INFINITE);
	assert(wait_status == WAIT_OBJECT_0);

	/* Setup global arguments */
	pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
	pthreadpool_store_relaxed_void_p(&threadpool->task, task);
	pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
	pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);

	const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
	pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */);

	if (params_size != 0) {
		CopyMemory(&threadpool->params, params, params_size);
		/* Ensure the params copy is visible before the command release-store */
		pthreadpool_fence_release();
	}

	/* Spread the work between threads */
	const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
	size_t range_start = 0;
	for (size_t tid = 0; tid < threads_count.value; tid++) {
		struct thread_info* thread = &threadpool->threads[tid];
		/* The first `remainder` threads get one extra item each */
		const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
		const size_t range_end = range_start + range_length;
		pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
		pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
		pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);

		/* The next subrange starts where the previous ended */
		range_start = range_end;
	}

	/*
	 * Update the threadpool command.
	 * Importantly, do it after initializing command parameters (range, task, argument, flags)
	 * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in command mask
	 * to ensure the unmasked command is different than the last command, because worker threads
	 * monitor for change in the unmasked command.
	 */
	const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
	const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;

	/*
	 * Reset the command event for the next command.
	 * It is important to reset the event before writing out the new command, because as soon as the worker threads
	 * observe the new command, they may process it and switch to waiting on the next command event.
	 *
	 * Note: the event is different from the command event signalled in this update.
	 */
	const uint32_t event_index = (old_command >> 31);
	BOOL reset_event_status = ResetEvent(threadpool->command_event[event_index ^ 1]);
	assert(reset_event_status != FALSE);

	/*
	 * Store the command with release semantics to guarantee that if a worker thread observes
	 * the new command value, it also observes the updated command parameters.
	 *
	 * Note: release semantics is necessary, because the workers might be waiting in a spin-loop
	 * rather than on the event object.
	 */
	pthreadpool_store_release_uint32_t(&threadpool->command, new_command);

	/*
	 * Signal the event to wake up the threads.
	 * Event in use must be switched after every submitted command to avoid race conditions.
	 * Choose the event based on the high bit of the command, which is flipped on every update.
	 */
	const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
	assert(set_event_status != FALSE);

	/* Save and modify FPU denormals control, if needed */
	struct fpu_state saved_fpu_state = { 0 };
	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
		saved_fpu_state = get_fpu_state();
		disable_fpu_denormals();
	}

	/* Do computations as worker #0 */
	thread_function(threadpool, &threadpool->threads[0]);

	/* Restore FPU denormals control, if needed */
	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
		set_fpu_state(saved_fpu_state);
	}

	/*
	 * Wait until the threads finish computation
	 * Use the complementary event because it corresponds to the new command.
	 */
	wait_worker_threads(threadpool, event_index ^ 1);

	/*
	 * Reset the completion event for the next command.
	 * Note: the event is different from the one used for waiting in this update.
	 */
	reset_event_status = ResetEvent(threadpool->completion_event[event_index]);
	assert(reset_event_status != FALSE);

	/* Make changes by other threads visible to this thread */
	pthreadpool_fence_acquire();

	/* Unprotect the global threadpool structures */
	const BOOL release_mutex_status = ReleaseMutex(threadpool->execution_mutex);
	assert(release_mutex_status != FALSE);
}
314 
/*
 * Shut down and deallocate a thread pool.
 *
 * For multi-threaded pools, broadcasts the shutdown command, joins every
 * system worker thread via its handle, closes all Win32 handles, and finally
 * frees the pool memory.  Safe to call with NULL (no-op).
 *
 * @param threadpool  pool to destroy, or NULL.
 */
void pthreadpool_destroy(struct pthreadpool* threadpool) {
	if (threadpool != NULL) {
		const size_t threads_count = threadpool->threads_count.value;
		if (threads_count > 1) {
			/* Workers will check in once they observe the shutdown command */
			pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);

			/*
			 * Store the command with release semantics to guarantee that if a worker thread observes
			 * the new command value, it also observes the updated active_threads values.
			 */
			const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
			pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);

			/*
			 * Signal the event to wake up the threads.
			 * Event in use must be switched after every submitted command to avoid race conditions.
			 * Choose the event based on the high bit of the command, which is flipped on every update.
			 */
			const uint32_t event_index = (old_command >> 31);
			const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
			assert(set_event_status != FALSE);

			/* Wait until all threads return */
			for (size_t tid = 1; tid < threads_count; tid++) {
				const HANDLE thread_handle = threadpool->threads[tid].thread_handle;
				/* NULL handle means CreateThread failed for this slot */
				if (thread_handle != NULL) {
					const DWORD wait_status = WaitForSingleObject(thread_handle, INFINITE);
					assert(wait_status == WAIT_OBJECT_0);

					const BOOL close_status = CloseHandle(thread_handle);
					assert(close_status != FALSE);
				}
			}

			/* Release resources */
			if (threadpool->execution_mutex != NULL) {
				const BOOL close_status = CloseHandle(threadpool->execution_mutex);
				assert(close_status != FALSE);
			}
			for (size_t i = 0; i < 2; i++) {
				if (threadpool->command_event[i] != NULL) {
					const BOOL close_status = CloseHandle(threadpool->command_event[i]);
					assert(close_status != FALSE);
				}
				if (threadpool->completion_event[i] != NULL) {
					const BOOL close_status = CloseHandle(threadpool->completion_event[i]);
					assert(close_status != FALSE);
				}
			}
		}
		pthreadpool_deallocate(threadpool);
	}
}
368