/* Standard C headers */
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Configuration header */
#include "threadpool-common.h"

/* Windows headers */
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <windows.h>

/* Public library header */
#include <pthreadpool.h>

/* Internal library headers */
#include "threadpool-atomics.h"
#include "threadpool-object.h"
#include "threadpool-utils.h"

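/*
 * Synchronization overview: the pool keeps two command events and two
 * completion events and alternates between the pairs on every dispatched
 * command. The active pair is selected by the high bit of the command word,
 * which flips on each update; this double-buffering lets one event be reset
 * for the next command while workers may still be waking up on the other.
 * checkin_worker_thread() below decrements active_threads and signals the
 * completion event once the last worker has checked in.
 */
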
static void checkin_worker_thread(struct pthreadpool* threadpool, uint32_t event_index) {
  if (pthreadpool_decrement_fetch_acquire_release_size_t(&threadpool->active_threads) == 0) {
    SetEvent(threadpool->completion_event[event_index]);
  }
}

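/*
 * Wait until active_threads drops to zero: spin with yields first to catch
 * fast completions cheaply, then block on the completion event signaled by
 * the last worker to check in.
 */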
static void wait_worker_threads(struct pthreadpool* threadpool, uint32_t event_index) {
  /* Initial check */
  size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
  if (active_threads == 0) {
    return;
  }

  /* Spin-wait */
  for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
    pthreadpool_yield();

    active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
    if (active_threads == 0) {
      return;
    }
  }

  /* Fall-back to event wait */
  const DWORD wait_status = WaitForSingleObject(threadpool->completion_event[event_index], INFINITE);
  assert(wait_status == WAIT_OBJECT_0);
  assert(pthreadpool_load_relaxed_size_t(&threadpool->active_threads) == 0);
}

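/*
 * Wait until threadpool->command changes from last_command and return its new
 * value. Workers spin first, unless the previous command requested yielding
 * via PTHREADPOOL_FLAG_YIELD_WORKERS, and then block on the command event
 * selected by the high bit of last_command.
 */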
static uint32_t wait_for_new_command(
  struct pthreadpool* threadpool,
  uint32_t last_command,
  uint32_t last_flags)
{
  uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
  if (command != last_command) {
    return command;
  }

  if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
    /* Spin-wait loop */
    for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
      pthreadpool_yield();

      command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
      if (command != last_command) {
        return command;
      }
    }
  }

  /* Spin-wait disabled or timed out, fall back to event wait */
  const uint32_t event_index = (last_command >> 31);
  const DWORD wait_status = WaitForSingleObject(threadpool->command_event[event_index], INFINITE);
  assert(wait_status == WAIT_OBJECT_0);

  command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
  assert(command != last_command);
  return command;
}

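/*
 * Worker thread entry point: check in once at startup, then loop waiting for
 * commands. A parallelize command runs the stored thread function over this
 * thread's subrange; a shutdown command makes the thread return. After each
 * processed command, the worker checks in on the completion event selected by
 * the command's high bit.
 */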
static DWORD WINAPI thread_main(LPVOID arg) {
  struct thread_info* thread = (struct thread_info*) arg;
  struct pthreadpool* threadpool = thread->threadpool;
  uint32_t last_command = threadpool_command_init;
  struct fpu_state saved_fpu_state = { 0 };
  uint32_t flags = 0;

  /* Check in */
  checkin_worker_thread(threadpool, 0);

  /* Monitor new commands and act accordingly */
  for (;;) {
    uint32_t command = wait_for_new_command(threadpool, last_command, flags);
    pthreadpool_fence_acquire();

    flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);

    /* Process command */
    switch (command & THREADPOOL_COMMAND_MASK) {
      case threadpool_command_parallelize:
      {
        const thread_function_t thread_function =
          (thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
        if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
          saved_fpu_state = get_fpu_state();
          disable_fpu_denormals();
        }

        thread_function(threadpool, thread);
        if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
          set_fpu_state(saved_fpu_state);
        }
        break;
      }
      case threadpool_command_shutdown:
        /* Exit immediately: the master thread is waiting on WaitForSingleObject */
        return 0;
      case threadpool_command_init:
        /* To inhibit compiler warning */
        break;
    }
    /* Notify the master thread that we finished processing */
    const uint32_t event_index = command >> 31;
    checkin_worker_thread(threadpool, event_index);
    /* Update last command */
    last_command = command;
  }
  return 0;
}

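/*
 * Create a thread pool with the given number of threads, or one thread per
 * logical processor when threads_count is 0. Illustrative usage sketch,
 * assuming the public pthreadpool_t typedef and 1D parallelization API
 * declared in <pthreadpool.h> (my_task, my_context, and range are
 * placeholders):
 *
 *   pthreadpool_t pool = pthreadpool_create(0);  // 0 = one thread per processor
 *   pthreadpool_parallelize_1d(pool, my_task, my_context, range, 0);  // flags = 0
 *   pthreadpool_destroy(pool);
 */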
struct pthreadpool* pthreadpool_create(size_t threads_count) {
  if (threads_count == 0) {
    SYSTEM_INFO system_info;
    ZeroMemory(&system_info, sizeof(system_info));
    GetSystemInfo(&system_info);
    threads_count = (size_t) system_info.dwNumberOfProcessors;
  }

  struct pthreadpool* threadpool = pthreadpool_allocate(threads_count);
  if (threadpool == NULL) {
    return NULL;
  }
  threadpool->threads_count = fxdiv_init_size_t(threads_count);
  for (size_t tid = 0; tid < threads_count; tid++) {
    threadpool->threads[tid].thread_number = tid;
    threadpool->threads[tid].threadpool = threadpool;
  }

  /* Thread pool with a single thread computes everything on the caller thread. */
  if (threads_count > 1) {
    threadpool->execution_mutex = CreateMutexW(
      NULL /* mutex attributes */,
      FALSE /* initially owned */,
      NULL /* name */);
    for (size_t i = 0; i < 2; i++) {
      threadpool->completion_event[i] = CreateEventW(
        NULL /* event attributes */,
        TRUE /* manual-reset event: yes */,
        FALSE /* initial state: nonsignaled */,
        NULL /* name */);
      threadpool->command_event[i] = CreateEventW(
        NULL /* event attributes */,
        TRUE /* manual-reset event: yes */,
        FALSE /* initial state: nonsignaled */,
        NULL /* name */);
    }

    pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);

    /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */
    for (size_t tid = 1; tid < threads_count; tid++) {
      threadpool->threads[tid].thread_handle = CreateThread(
        NULL /* thread attributes */,
        0 /* stack size: default */,
        &thread_main,
        &threadpool->threads[tid],
        0 /* creation flags */,
        NULL /* thread id */);
    }

    /* Wait until all threads initialize */
    wait_worker_threads(threadpool, 0);
  }
  return threadpool;
}

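/*
 * Dispatch a parallel operation across all threads of the pool. The sequence
 * is: take the execution mutex, publish the task parameters and per-thread
 * subranges, flip the command word and signal the matching command event,
 * compute worker #0's share on the caller thread, then wait for the remaining
 * workers on the completion event and release the mutex.
 */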
PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
  struct pthreadpool* threadpool,
  thread_function_t thread_function,
  const void* params,
  size_t params_size,
  void* task,
  void* context,
  size_t linear_range,
  uint32_t flags)
{
  assert(threadpool != NULL);
  assert(thread_function != NULL);
  assert(task != NULL);
  assert(linear_range > 1);

  /* Protect the global threadpool structures */
  const DWORD wait_status = WaitForSingleObject(threadpool->execution_mutex, INFINITE);
  assert(wait_status == WAIT_OBJECT_0);

  /* Setup global arguments */
  pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
  pthreadpool_store_relaxed_void_p(&threadpool->task, task);
  pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
  pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);

  const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
  pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */);

  if (params_size != 0) {
    CopyMemory(&threadpool->params, params, params_size);
    pthreadpool_fence_release();
  }

  /* Spread the work between threads */
  const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
  size_t range_start = 0;
  for (size_t tid = 0; tid < threads_count.value; tid++) {
    struct thread_info* thread = &threadpool->threads[tid];
    const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
    const size_t range_end = range_start + range_length;
    pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
    pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
    pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);

    /* The next subrange starts where the previous ended */
    range_start = range_end;
  }
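
  /*
   * Worked example of the partitioning above: with linear_range = 10 and
   * threads_count.value = 3, fxdiv_divide_size_t yields quotient 3 and
   * remainder 1, so the subranges are [0,4), [4,7), and [7,10): the first
   * `remainder` threads each take one extra element.
   */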

  /*
   * Update the threadpool command.
   * Importantly, do it after initializing the command parameters (range, task, argument, flags).
   * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits that are not in the command mask,
   * ensuring that the unmasked command differs from the last command, because worker threads
   * monitor for a change in the unmasked command.
   */
  const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
  const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;

  /*
   * Reset the command event for the next command.
   * It is important to reset the event before writing out the new command, because as soon as the worker threads
   * observe the new command, they may process it and switch to waiting on the next command event.
   *
   * Note: the event is different from the command event signalled in this update.
   */
  const uint32_t event_index = (old_command >> 31);
  BOOL reset_event_status = ResetEvent(threadpool->command_event[event_index ^ 1]);
  assert(reset_event_status != FALSE);

  /*
   * Store the command with release semantics to guarantee that if a worker thread observes
   * the new command value, it also observes the updated command parameters.
   *
   * Note: release semantics is necessary, because the workers might be waiting in a spin-loop
   * rather than on the event object.
   */
  pthreadpool_store_release_uint32_t(&threadpool->command, new_command);

  /*
   * Signal the event to wake up the threads.
   * Event in use must be switched after every submitted command to avoid race conditions.
   * Choose the event based on the high bit of the command, which is flipped on every update.
   */
  const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
  assert(set_event_status != FALSE);

  /* Save and modify FPU denormals control, if needed */
  struct fpu_state saved_fpu_state = { 0 };
  if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
    saved_fpu_state = get_fpu_state();
    disable_fpu_denormals();
  }

  /* Do computations as worker #0 */
  thread_function(threadpool, &threadpool->threads[0]);

  /* Restore FPU denormals control, if needed */
  if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
    set_fpu_state(saved_fpu_state);
  }

  /*
   * Wait until the threads finish computation.
   * Use the complementary event because it corresponds to the new command.
   */
  wait_worker_threads(threadpool, event_index ^ 1);

  /*
   * Reset the completion event for the next command.
   * Note: the event is different from the one used for waiting in this update.
   */
  reset_event_status = ResetEvent(threadpool->completion_event[event_index]);
  assert(reset_event_status != FALSE);

  /* Make changes by other threads visible to this thread */
  pthreadpool_fence_acquire();

  /* Unprotect the global threadpool structures */
  const BOOL release_mutex_status = ReleaseMutex(threadpool->execution_mutex);
  assert(release_mutex_status != FALSE);
}

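/*
 * Shut the pool down and free its resources. With more than one thread, this
 * publishes a shutdown command, signals the active command event, and joins
 * each worker via WaitForSingleObject on its thread handle before closing the
 * mutex and event handles. No completion event is involved here: on shutdown
 * the workers return instead of checking in.
 */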
void pthreadpool_destroy(struct pthreadpool* threadpool) {
  if (threadpool != NULL) {
    const size_t threads_count = threadpool->threads_count.value;
    if (threads_count > 1) {
      pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);

      /*
       * Store the command with release semantics to guarantee that if a worker thread observes
       * the new command value, it also observes the updated active_threads value.
       */
      const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
      pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);

      /*
       * Signal the event to wake up the threads.
       * Event in use must be switched after every submitted command to avoid race conditions.
       * Choose the event based on the high bit of the command, which is flipped on every update.
       */
      const uint32_t event_index = (old_command >> 31);
      const BOOL set_event_status = SetEvent(threadpool->command_event[event_index]);
      assert(set_event_status != FALSE);

      /* Wait until all threads return */
      for (size_t tid = 1; tid < threads_count; tid++) {
        const HANDLE thread_handle = threadpool->threads[tid].thread_handle;
        if (thread_handle != NULL) {
          const DWORD wait_status = WaitForSingleObject(thread_handle, INFINITE);
          assert(wait_status == WAIT_OBJECT_0);

          const BOOL close_status = CloseHandle(thread_handle);
          assert(close_status != FALSE);
        }
      }

      /* Release resources */
      if (threadpool->execution_mutex != NULL) {
        const BOOL close_status = CloseHandle(threadpool->execution_mutex);
        assert(close_status != FALSE);
      }
      for (size_t i = 0; i < 2; i++) {
        if (threadpool->command_event[i] != NULL) {
          const BOOL close_status = CloseHandle(threadpool->command_event[i]);
          assert(close_status != FALSE);
        }
        if (threadpool->completion_event[i] != NULL) {
          const BOOL close_status = CloseHandle(threadpool->completion_event[i]);
          assert(close_status != FALSE);
        }
      }
    }
    pthreadpool_deallocate(threadpool);
  }
}