/*
 * Copyright © 2016 Advanced Micro Devices, Inc.
 * All Rights Reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sub license, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
 * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NON-INFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT HOLDERS, AUTHORS
 * AND/OR ITS SUPPLIERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 *
 * The above copyright notice and this permission notice (including the
 * next paragraph) shall be included in all copies or substantial portions
 * of the Software.
 */

#include "u_queue.h"

#include "c11/threads.h"
#include "util/u_cpu_detect.h"
#include "util/os_time.h"
#include "util/u_string.h"
#include "util/u_thread.h"
#include "u_process.h"

#if defined(__linux__)
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/syscall.h>
#endif


/* Define 256MB */
#define S_256MB (256 * 1024 * 1024)

static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool locked);

/****************************************************************************
 * Wait for all queues to assert idle when exit() is called.
 *
 * Otherwise, C++ static variable destructors can be called while threads
 * are using the static variables.
 */

static once_flag atexit_once_flag = ONCE_FLAG_INIT;
static struct list_head queue_list = {
   .next = &queue_list,
   .prev = &queue_list,
};
static mtx_t exit_mutex;

static void
atexit_handler(void)
{
   struct util_queue *iter;

   mtx_lock(&exit_mutex);
   /* Wait for all queues to assert idle. */
   LIST_FOR_EACH_ENTRY(iter, &queue_list, head) {
      util_queue_kill_threads(iter, 0, false);
   }
   mtx_unlock(&exit_mutex);
}

static void
global_init(void)
{
   mtx_init(&exit_mutex, mtx_plain);
   atexit(atexit_handler);
}

static void
add_to_atexit_list(struct util_queue *queue)
{
   call_once(&atexit_once_flag, global_init);

   mtx_lock(&exit_mutex);
   list_add(&queue->head, &queue_list);
   mtx_unlock(&exit_mutex);
}

static void
remove_from_atexit_list(struct util_queue *queue)
{
   struct util_queue *iter, *tmp;

   mtx_lock(&exit_mutex);
   LIST_FOR_EACH_ENTRY_SAFE(iter, tmp, &queue_list, head) {
      if (iter == queue) {
         list_del(&iter->head);
         break;
      }
   }
   mtx_unlock(&exit_mutex);
}

/****************************************************************************
 * util_queue_fence
 */

#ifdef UTIL_QUEUE_FENCE_FUTEX
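/* Futex-based fence path.
 *
 * Encoding of fence->val as used by the wait loop below (an inferred
 * summary, not an authoritative definition):
 *   0 - signalled
 *   1 - unsignalled, no waiters yet
 *   2 - unsignalled, at least one waiter; futex_wait only sleeps in this
 *       state, so a signaller only has to wake threads when it observes 2.
 */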
static bool
do_futex_fence_wait(struct util_queue_fence *fence,
                    bool timeout, int64_t abs_timeout)
{
   uint32_t v = p_atomic_read_relaxed(&fence->val);
   struct timespec ts;
   ts.tv_sec = abs_timeout / (1000*1000*1000);
   ts.tv_nsec = abs_timeout % (1000*1000*1000);

   while (v != 0) {
      if (v != 2) {
         v = p_atomic_cmpxchg(&fence->val, 1, 2);
         if (v == 0)
            return true;
      }

      int r = futex_wait(&fence->val, 2, timeout ? &ts : NULL);
      if (timeout && r < 0) {
         if (errno == ETIMEDOUT)
            return false;
      }

      v = p_atomic_read_relaxed(&fence->val);
   }

   return true;
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   do_futex_fence_wait(fence, false, 0);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   return do_futex_fence_wait(fence, true, abs_timeout);
}

#endif

#ifdef UTIL_QUEUE_FENCE_STANDARD
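/* Fallback fence path built on a mutex and a condition variable, used on
 * platforms where the futex-based path above is not available.
 */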
void
util_queue_fence_signal(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   fence->signalled = true;
   cnd_broadcast(&fence->cond);
   mtx_unlock(&fence->mutex);
}

void
_util_queue_fence_wait(struct util_queue_fence *fence)
{
   mtx_lock(&fence->mutex);
   while (!fence->signalled)
      cnd_wait(&fence->cond, &fence->mutex);
   mtx_unlock(&fence->mutex);
}

bool
_util_queue_fence_wait_timeout(struct util_queue_fence *fence,
                               int64_t abs_timeout)
{
   /* This terrible hack is made necessary by the fact that we really want an
    * internal interface consistent with os_time_*, but cnd_timedwait is spec'd
    * to be relative to the TIME_UTC clock.
    */
   int64_t rel = abs_timeout - os_time_get_nano();

   if (rel > 0) {
      struct timespec ts;

#ifdef HAVE_TIMESPEC_GET
      timespec_get(&ts, TIME_UTC);
#else
      clock_gettime(CLOCK_REALTIME, &ts);
#endif

      /* Convert the relative timeout into an absolute TIME_UTC deadline. */
      ts.tv_sec += rel / (1000*1000*1000);
      ts.tv_nsec += rel % (1000*1000*1000);
      if (ts.tv_nsec >= (1000*1000*1000)) {
         ts.tv_sec++;
         ts.tv_nsec -= (1000*1000*1000);
      }

      mtx_lock(&fence->mutex);
      while (!fence->signalled) {
         if (cnd_timedwait(&fence->cond, &fence->mutex, &ts) != thrd_success)
            break;
      }
      mtx_unlock(&fence->mutex);
   }

   return fence->signalled;
}

void
util_queue_fence_init(struct util_queue_fence *fence)
{
   memset(fence, 0, sizeof(*fence));
   (void) mtx_init(&fence->mutex, mtx_plain);
   cnd_init(&fence->cond);
   fence->signalled = true;
}

void
util_queue_fence_destroy(struct util_queue_fence *fence)
{
   assert(fence->signalled);

   /* Ensure that another thread is not in the middle of
    * util_queue_fence_signal (having set the fence to signalled but still
    * holding the fence mutex).
    *
    * A common contract between threads is that as soon as a fence is signalled
    * by thread A, thread B is allowed to destroy it. Since
    * util_queue_fence_is_signalled does not lock the fence mutex (for
    * performance reasons), we must do so here.
    */
   mtx_lock(&fence->mutex);
   mtx_unlock(&fence->mutex);

   cnd_destroy(&fence->cond);
   mtx_destroy(&fence->mutex);
}
#endif

/****************************************************************************
 * util_queue implementation
 */

struct thread_input {
   struct util_queue *queue;
   int thread_index;
};

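/* Main loop of a worker thread.
 *
 * Each worker pops the job at read_idx from the ring buffer, runs its
 * execute callback, signals its fence (if any) and then runs its cleanup
 * callback. A worker exits as soon as its thread_index is no longer below
 * queue->num_threads, which is how util_queue_kill_threads retires threads.
 */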
static int
util_queue_thread_func(void *input)
{
   struct util_queue *queue = ((struct thread_input*)input)->queue;
   int thread_index = ((struct thread_input*)input)->thread_index;

   free(input);

   if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) {
      /* Don't inherit the thread affinity from the parent thread.
       * Set the full mask.
       */
      uint32_t mask[UTIL_MAX_CPUS / 32];

      memset(mask, 0xff, sizeof(mask));

      util_set_current_thread_affinity(mask, NULL,
                                       util_get_cpu_caps()->num_cpu_mask_bits);
   }

#if defined(__linux__)
   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
      /* The nice() function can only set a maximum of 19. */
      setpriority(PRIO_PROCESS, syscall(SYS_gettid), 19);
   }
#endif

   if (strlen(queue->name) > 0) {
      char name[16];
      snprintf(name, sizeof(name), "%s%i", queue->name, thread_index);
      u_thread_setname(name);
   }

   while (1) {
      struct util_queue_job job;

      mtx_lock(&queue->lock);
      assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

      /* wait if the queue is empty */
      while (thread_index < queue->num_threads && queue->num_queued == 0)
         cnd_wait(&queue->has_queued_cond, &queue->lock);

      /* only kill threads that are above "num_threads" */
      if (thread_index >= queue->num_threads) {
         mtx_unlock(&queue->lock);
         break;
      }

      job = queue->jobs[queue->read_idx];
      memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job));
      queue->read_idx = (queue->read_idx + 1) % queue->max_jobs;

      queue->num_queued--;
      cnd_signal(&queue->has_space_cond);
      if (job.job)
         queue->total_jobs_size -= job.job_size;
      mtx_unlock(&queue->lock);

      if (job.job) {
         job.execute(job.job, job.global_data, thread_index);
         if (job.fence)
            util_queue_fence_signal(job.fence);
         if (job.cleanup)
            job.cleanup(job.job, job.global_data, thread_index);
      }
   }

   /* signal remaining jobs if all threads are being terminated */
   mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      for (unsigned i = queue->read_idx; i != queue->write_idx;
           i = (i + 1) % queue->max_jobs) {
         if (queue->jobs[i].job) {
            if (queue->jobs[i].fence)
               util_queue_fence_signal(queue->jobs[i].fence);
            queue->jobs[i].job = NULL;
         }
      }
      queue->read_idx = queue->write_idx;
      queue->num_queued = 0;
   }
   mtx_unlock(&queue->lock);
   return 0;
}

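/* Spawn the worker thread with the given index and, if requested, mark it
 * as low priority. Returns false if thread creation failed.
 */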
static bool
util_queue_create_thread(struct util_queue *queue, unsigned index)
{
   struct thread_input *input =
      (struct thread_input *) malloc(sizeof(struct thread_input));
   input->queue = queue;
   input->thread_index = index;

   if (thrd_success != u_thread_create(queue->threads + index, util_queue_thread_func, input)) {
      free(input);
      return false;
   }

   if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) {
#if defined(__linux__) && defined(SCHED_BATCH)
      struct sched_param sched_param = {0};

      /* The nice() function can only set a maximum of 19.
       * SCHED_BATCH gives the scheduler a hint that this is a latency
       * insensitive thread.
       *
       * Note that Linux only allows decreasing the priority. The original
       * priority can't be restored.
       */
      pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param);
#endif
   }
   return true;
}

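/* Resize the thread pool to num_threads, clamped to [1, queue->max_threads].
 * Shrinking joins the excess threads; growing spawns new ones. Pass
 * locked = true if the caller already holds queue->lock.
 */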
void
util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads,
                              bool locked)
{
   num_threads = MIN2(num_threads, queue->max_threads);
   num_threads = MAX2(num_threads, 1);

   if (!locked)
      mtx_lock(&queue->lock);

   unsigned old_num_threads = queue->num_threads;

   if (num_threads == old_num_threads) {
      if (!locked)
         mtx_unlock(&queue->lock);
      return;
   }

   if (num_threads < old_num_threads) {
      util_queue_kill_threads(queue, num_threads, true);
      if (!locked)
         mtx_unlock(&queue->lock);
      return;
   }

   /* Create threads.
    *
    * We need to update num_threads first, because threads terminate
    * when thread_index >= num_threads.
    */
   queue->num_threads = num_threads;
   for (unsigned i = old_num_threads; i < num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         queue->num_threads = i;
         break;
      }
   }

   if (!locked)
      mtx_unlock(&queue->lock);
}

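/* Initialize the queue and start its first worker thread; further threads
 * (up to num_threads) are created on demand as jobs accumulate.
 *
 * A minimal usage sketch (the job pointer and the callback below are
 * hypothetical; only the util_queue_* calls come from this file):
 *
 *    static void my_execute(void *job, void *gdata, int thread_index)
 *    {
 *       // process "job" on the worker thread
 *    }
 *
 *    struct util_queue queue;
 *    struct util_queue_fence fence;
 *
 *    if (util_queue_init(&queue, "myqueue", 32, 4, 0, NULL)) {
 *       util_queue_fence_init(&fence);
 *       util_queue_add_job(&queue, my_job, &fence, my_execute, NULL, 0);
 *       util_queue_fence_wait(&fence);
 *       util_queue_fence_destroy(&fence);
 *       util_queue_destroy(&queue);
 *    }
 */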
bool
util_queue_init(struct util_queue *queue,
                const char *name,
                unsigned max_jobs,
                unsigned num_threads,
                unsigned flags,
                void *global_data)
{
   unsigned i;

   /* Form the thread name from process_name and name, limited to 13
    * characters. Characters 14-15 are reserved for the thread number.
    * Character 16 should be 0. Final form: "process:name12"
    *
    * If name is too long, it's truncated. If any space is left, the process
    * name fills it.
    */
   const char *process_name = util_get_process_name();
   int process_len = process_name ? strlen(process_name) : 0;
   int name_len = strlen(name);
   const int max_chars = sizeof(queue->name) - 1;

   name_len = MIN2(name_len, max_chars);

   /* See if there is any space left for the process name, reserve 1 for
    * the colon. */
   process_len = MIN2(process_len, max_chars - name_len - 1);
   process_len = MAX2(process_len, 0);

   memset(queue, 0, sizeof(*queue));

   if (process_len) {
      snprintf(queue->name, sizeof(queue->name), "%.*s:%s",
               process_len, process_name, name);
   } else {
      snprintf(queue->name, sizeof(queue->name), "%s", name);
   }

   queue->create_threads_on_demand = true;
   queue->flags = flags;
   queue->max_threads = num_threads;
   queue->num_threads = 1;
   queue->max_jobs = max_jobs;
   queue->global_data = global_data;

   (void) mtx_init(&queue->lock, mtx_plain);

   queue->num_queued = 0;
   cnd_init(&queue->has_queued_cond);
   cnd_init(&queue->has_space_cond);

   queue->jobs = (struct util_queue_job*)
                 calloc(max_jobs, sizeof(struct util_queue_job));
   if (!queue->jobs)
      goto fail;

   queue->threads = (thrd_t*) calloc(queue->max_threads, sizeof(thrd_t));
   if (!queue->threads)
      goto fail;

   /* start threads */
   for (i = 0; i < queue->num_threads; i++) {
      if (!util_queue_create_thread(queue, i)) {
         if (i == 0) {
            /* no threads created, fail */
            goto fail;
         } else {
            /* at least one thread created, so use it */
            queue->num_threads = i;
            break;
         }
      }
   }

   add_to_atexit_list(queue);
   return true;

fail:
   free(queue->threads);

   if (queue->jobs) {
      cnd_destroy(&queue->has_space_cond);
      cnd_destroy(&queue->has_queued_cond);
      mtx_destroy(&queue->lock);
      free(queue->jobs);
   }
   /* Also, util_queue_is_initialized can be used to check for success. */
   memset(queue, 0, sizeof(*queue));
   return false;
}

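/* Terminate and join all worker threads above keep_num_threads. Lowering
 * queue->num_threads is what makes the workers exit; the broadcast wakes up
 * any that are sleeping on an empty queue.
 */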
static void
util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
                        bool locked)
{
   /* Signal all threads to terminate. */
   if (!locked)
      mtx_lock(&queue->lock);

   if (keep_num_threads >= queue->num_threads) {
      if (!locked)
         mtx_unlock(&queue->lock);
      return;
   }

   unsigned old_num_threads = queue->num_threads;
   /* Setting num_threads is what causes the threads to terminate.
    * Then cnd_broadcast wakes them up and they will exit their function.
    */
   queue->num_threads = keep_num_threads;
   cnd_broadcast(&queue->has_queued_cond);

   /* Wait for threads to terminate. */
   if (keep_num_threads < old_num_threads) {
      /* We need to unlock the mutex to allow threads to terminate. */
      mtx_unlock(&queue->lock);
      for (unsigned i = keep_num_threads; i < old_num_threads; i++)
         thrd_join(queue->threads[i], NULL);
      if (locked)
         mtx_lock(&queue->lock);
   } else {
      if (!locked)
         mtx_unlock(&queue->lock);
   }
}

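/* Job callback used by util_queue_finish: every worker waits on the shared
 * barrier, and the last worker through the barrier destroys it.
 */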
static void
util_queue_finish_execute(void *data, void *gdata, int num_thread)
{
   util_barrier *barrier = data;
   if (util_barrier_wait(barrier))
      util_barrier_destroy(barrier);
}

void
util_queue_destroy(struct util_queue *queue)
{
   util_queue_kill_threads(queue, 0, false);

   /* This makes it safe to call on a queue that failed util_queue_init. */
   if (queue->head.next != NULL)
      remove_from_atexit_list(queue);

   cnd_destroy(&queue->has_space_cond);
   cnd_destroy(&queue->has_queued_cond);
   mtx_destroy(&queue->lock);
   free(queue->jobs);
   free(queue->threads);
}

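/* Append a job to the ring buffer. If the queue is full, either grow the
 * buffer (with UTIL_QUEUE_INIT_RESIZE_IF_FULL, as long as the total job size
 * stays under 256 MB) or block until a worker frees a slot. May also spawn
 * an additional worker thread if jobs are already waiting.
 */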
static void
util_queue_add_job_locked(struct util_queue *queue,
                          void *job,
                          struct util_queue_fence *fence,
                          util_queue_execute_func execute,
                          util_queue_execute_func cleanup,
                          const size_t job_size,
                          bool locked)
{
   struct util_queue_job *ptr;

   if (!locked)
      mtx_lock(&queue->lock);
   if (queue->num_threads == 0) {
      if (!locked)
         mtx_unlock(&queue->lock);
      /* There is no good option here, but any leaks will be short-lived
       * because things are shutting down.
       */
      return;
   }

   if (fence)
      util_queue_fence_reset(fence);

   assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs);

   /* Scale the number of threads up if there's already one job waiting. */
   if (queue->num_queued > 0 &&
       queue->create_threads_on_demand &&
       execute != util_queue_finish_execute &&
       queue->num_threads < queue->max_threads) {
      util_queue_adjust_num_threads(queue, queue->num_threads + 1, true);
   }

   if (queue->num_queued == queue->max_jobs) {
      if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL &&
          queue->total_jobs_size + job_size < S_256MB) {
         /* If the queue is full, make it larger to avoid waiting for a free
          * slot.
          */
         unsigned new_max_jobs = queue->max_jobs + 8;
         struct util_queue_job *jobs =
            (struct util_queue_job*)calloc(new_max_jobs,
                                           sizeof(struct util_queue_job));
         assert(jobs);

         /* Copy all queued jobs into the new list. */
         unsigned num_jobs = 0;
         unsigned i = queue->read_idx;

         do {
            jobs[num_jobs++] = queue->jobs[i];
            i = (i + 1) % queue->max_jobs;
         } while (i != queue->write_idx);

         assert(num_jobs == queue->num_queued);

         free(queue->jobs);
         queue->jobs = jobs;
         queue->read_idx = 0;
         queue->write_idx = num_jobs;
         queue->max_jobs = new_max_jobs;
      } else {
         /* Wait until there is a free slot. */
         while (queue->num_queued == queue->max_jobs)
            cnd_wait(&queue->has_space_cond, &queue->lock);
      }
   }

   ptr = &queue->jobs[queue->write_idx];
   assert(ptr->job == NULL);
   ptr->job = job;
   ptr->global_data = queue->global_data;
   ptr->fence = fence;
   ptr->execute = execute;
   ptr->cleanup = cleanup;
   ptr->job_size = job_size;

   queue->write_idx = (queue->write_idx + 1) % queue->max_jobs;
   queue->total_jobs_size += ptr->job_size;

   queue->num_queued++;
   cnd_signal(&queue->has_queued_cond);
   if (!locked)
      mtx_unlock(&queue->lock);
}

void
util_queue_add_job(struct util_queue *queue,
                   void *job,
                   struct util_queue_fence *fence,
                   util_queue_execute_func execute,
                   util_queue_execute_func cleanup,
                   const size_t job_size)
{
   util_queue_add_job_locked(queue, job, fence, execute, cleanup, job_size,
                             false);
}

/**
 * Remove a queued job. If the job hasn't started execution, it's removed from
 * the queue. If the job has started execution, the function waits for it to
 * complete.
 *
 * In all cases, the fence is signalled when the function returns.
 *
 * The function can be used when destroying an object associated with the job
 * when you don't care about the job completion state.
 */
void
util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence)
{
   bool removed = false;

   if (util_queue_fence_is_signalled(fence))
      return;

   mtx_lock(&queue->lock);
   for (unsigned i = queue->read_idx; i != queue->write_idx;
        i = (i + 1) % queue->max_jobs) {
      if (queue->jobs[i].fence == fence) {
         if (queue->jobs[i].cleanup)
            queue->jobs[i].cleanup(queue->jobs[i].job, queue->global_data, -1);

         /* Just clear it. The threads will treat it as a no-op job. */
         memset(&queue->jobs[i], 0, sizeof(queue->jobs[i]));
         removed = true;
         break;
      }
   }
   mtx_unlock(&queue->lock);

   if (removed)
      util_queue_fence_signal(fence);
   else
      util_queue_fence_wait(fence);
}

/**
 * Wait until all previously added jobs have completed.
 */
void
util_queue_finish(struct util_queue *queue)
{
   util_barrier barrier;
   struct util_queue_fence *fences;

   /* If 2 threads were adding jobs for 2 different barriers at the same time,
    * a deadlock would happen, because 1 barrier requires that all threads
    * wait for it exclusively.
    */
   mtx_lock(&queue->lock);

   /* The number of threads can be changed to 0, e.g. by the atexit handler. */
   if (!queue->num_threads) {
      mtx_unlock(&queue->lock);
      return;
   }

   /* We need to disable adding new threads in util_queue_add_job because
    * the finish operation requires a fixed number of threads.
    *
    * Also note that util_queue_add_job can unlock the mutex if there is not
    * enough space in the queue and wait for space.
    */
   queue->create_threads_on_demand = false;

   fences = malloc(queue->num_threads * sizeof(*fences));
   util_barrier_init(&barrier, queue->num_threads);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_init(&fences[i]);
      util_queue_add_job_locked(queue, &barrier, &fences[i],
                                util_queue_finish_execute, NULL, 0, true);
   }
   queue->create_threads_on_demand = true;
   mtx_unlock(&queue->lock);

   for (unsigned i = 0; i < queue->num_threads; ++i) {
      util_queue_fence_wait(&fences[i]);
      util_queue_fence_destroy(&fences[i]);
   }

   free(fences);
}

int64_t
util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index)
{
   /* Allow some flexibility by not raising an error. */
   if (thread_index >= queue->num_threads)
      return 0;

   return util_thread_get_time_nano(queue->threads[thread_index]);
}