Lines Matching refs:queue
47 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads,
82 add_to_atexit_list(struct util_queue *queue) in add_to_atexit_list() argument
87 list_add(&queue->head, &queue_list); in add_to_atexit_list()
92 remove_from_atexit_list(struct util_queue *queue) in remove_from_atexit_list() argument
98 if (iter == queue) { in remove_from_atexit_list()
246 struct util_queue *queue; member
253 struct util_queue *queue = ((struct thread_input*)input)->queue; in util_queue_thread_func() local
258 if (queue->flags & UTIL_QUEUE_INIT_SET_FULL_THREAD_AFFINITY) { in util_queue_thread_func()
269 if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) { in util_queue_thread_func()
275 if (strlen(queue->name) > 0) { in util_queue_thread_func()
277 snprintf(name, sizeof(name), "%s%i", queue->name, thread_index); in util_queue_thread_func()
284 mtx_lock(&queue->lock); in util_queue_thread_func()
285 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); in util_queue_thread_func()
288 while (thread_index < queue->num_threads && queue->num_queued == 0) in util_queue_thread_func()
289 cnd_wait(&queue->has_queued_cond, &queue->lock); in util_queue_thread_func()
292 if (thread_index >= queue->num_threads) { in util_queue_thread_func()
293 mtx_unlock(&queue->lock); in util_queue_thread_func()
297 job = queue->jobs[queue->read_idx]; in util_queue_thread_func()
298 memset(&queue->jobs[queue->read_idx], 0, sizeof(struct util_queue_job)); in util_queue_thread_func()
299 queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; in util_queue_thread_func()
301 queue->num_queued--; in util_queue_thread_func()
302 cnd_signal(&queue->has_space_cond); in util_queue_thread_func()
304 queue->total_jobs_size -= job.job_size; in util_queue_thread_func()
305 mtx_unlock(&queue->lock); in util_queue_thread_func()
316 mtx_lock(&queue->lock); in util_queue_thread_func()
317 if (queue->num_threads == 0) { in util_queue_thread_func()
318 for (unsigned i = queue->read_idx; i != queue->write_idx; in util_queue_thread_func()
319 i = (i + 1) % queue->max_jobs) { in util_queue_thread_func()
320 if (queue->jobs[i].job) { in util_queue_thread_func()
321 util_queue_fence_signal(queue->jobs[i].fence); in util_queue_thread_func()
322 queue->jobs[i].job = NULL; in util_queue_thread_func()
325 queue->read_idx = queue->write_idx; in util_queue_thread_func()
326 queue->num_queued = 0; in util_queue_thread_func()
328 mtx_unlock(&queue->lock); in util_queue_thread_func()
333 util_queue_create_thread(struct util_queue *queue, unsigned index) in util_queue_create_thread() argument
337 input->queue = queue; in util_queue_create_thread()
340 queue->threads[index] = u_thread_create(util_queue_thread_func, input); in util_queue_create_thread()
342 if (!queue->threads[index]) { in util_queue_create_thread()
347 if (queue->flags & UTIL_QUEUE_INIT_USE_MINIMUM_PRIORITY) { in util_queue_create_thread()
358 pthread_setschedparam(queue->threads[index], SCHED_BATCH, &sched_param); in util_queue_create_thread()
365 util_queue_adjust_num_threads(struct util_queue *queue, unsigned num_threads) in util_queue_adjust_num_threads() argument
367 num_threads = MIN2(num_threads, queue->max_threads); in util_queue_adjust_num_threads()
370 mtx_lock(&queue->finish_lock); in util_queue_adjust_num_threads()
371 unsigned old_num_threads = queue->num_threads; in util_queue_adjust_num_threads()
374 mtx_unlock(&queue->finish_lock); in util_queue_adjust_num_threads()
379 util_queue_kill_threads(queue, num_threads, true); in util_queue_adjust_num_threads()
380 mtx_unlock(&queue->finish_lock); in util_queue_adjust_num_threads()
389 queue->num_threads = num_threads; in util_queue_adjust_num_threads()
391 if (!util_queue_create_thread(queue, i)) in util_queue_adjust_num_threads()
394 mtx_unlock(&queue->finish_lock); in util_queue_adjust_num_threads()
398 util_queue_init(struct util_queue *queue, in util_queue_init() argument
416 const int max_chars = sizeof(queue->name) - 1; in util_queue_init()
425 memset(queue, 0, sizeof(*queue)); in util_queue_init()
428 snprintf(queue->name, sizeof(queue->name), "%.*s:%s", in util_queue_init()
431 snprintf(queue->name, sizeof(queue->name), "%s", name); in util_queue_init()
434 queue->flags = flags; in util_queue_init()
435 queue->max_threads = num_threads; in util_queue_init()
436 queue->num_threads = num_threads; in util_queue_init()
437 queue->max_jobs = max_jobs; in util_queue_init()
439 queue->jobs = (struct util_queue_job*) in util_queue_init()
441 if (!queue->jobs) in util_queue_init()
444 (void) mtx_init(&queue->lock, mtx_plain); in util_queue_init()
445 (void) mtx_init(&queue->finish_lock, mtx_plain); in util_queue_init()
447 queue->num_queued = 0; in util_queue_init()
448 cnd_init(&queue->has_queued_cond); in util_queue_init()
449 cnd_init(&queue->has_space_cond); in util_queue_init()
451 queue->threads = (thrd_t*) calloc(num_threads, sizeof(thrd_t)); in util_queue_init()
452 if (!queue->threads) in util_queue_init()
457 if (!util_queue_create_thread(queue, i)) { in util_queue_init()
463 queue->num_threads = i; in util_queue_init()
469 add_to_atexit_list(queue); in util_queue_init()
473 free(queue->threads); in util_queue_init()
475 if (queue->jobs) { in util_queue_init()
476 cnd_destroy(&queue->has_space_cond); in util_queue_init()
477 cnd_destroy(&queue->has_queued_cond); in util_queue_init()
478 mtx_destroy(&queue->lock); in util_queue_init()
479 free(queue->jobs); in util_queue_init()
482 memset(queue, 0, sizeof(*queue)); in util_queue_init()
487 util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads, in util_queue_kill_threads() argument
494 mtx_lock(&queue->finish_lock); in util_queue_kill_threads()
496 if (keep_num_threads >= queue->num_threads) { in util_queue_kill_threads()
497 mtx_unlock(&queue->finish_lock); in util_queue_kill_threads()
501 mtx_lock(&queue->lock); in util_queue_kill_threads()
502 unsigned old_num_threads = queue->num_threads; in util_queue_kill_threads()
506 queue->num_threads = keep_num_threads; in util_queue_kill_threads()
507 cnd_broadcast(&queue->has_queued_cond); in util_queue_kill_threads()
508 mtx_unlock(&queue->lock); in util_queue_kill_threads()
511 thrd_join(queue->threads[i], NULL); in util_queue_kill_threads()
514 mtx_unlock(&queue->finish_lock); in util_queue_kill_threads()
518 util_queue_destroy(struct util_queue *queue) in util_queue_destroy() argument
520 util_queue_kill_threads(queue, 0, false); in util_queue_destroy()
521 remove_from_atexit_list(queue); in util_queue_destroy()
523 cnd_destroy(&queue->has_space_cond); in util_queue_destroy()
524 cnd_destroy(&queue->has_queued_cond); in util_queue_destroy()
525 mtx_destroy(&queue->finish_lock); in util_queue_destroy()
526 mtx_destroy(&queue->lock); in util_queue_destroy()
527 free(queue->jobs); in util_queue_destroy()
528 free(queue->threads); in util_queue_destroy()
532 util_queue_add_job(struct util_queue *queue, in util_queue_add_job() argument
541 mtx_lock(&queue->lock); in util_queue_add_job()
542 if (queue->num_threads == 0) { in util_queue_add_job()
543 mtx_unlock(&queue->lock); in util_queue_add_job()
552 assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); in util_queue_add_job()
554 if (queue->num_queued == queue->max_jobs) { in util_queue_add_job()
555 if (queue->flags & UTIL_QUEUE_INIT_RESIZE_IF_FULL && in util_queue_add_job()
556 queue->total_jobs_size + job_size < S_256MB) { in util_queue_add_job()
560 unsigned new_max_jobs = queue->max_jobs + 8; in util_queue_add_job()
568 unsigned i = queue->read_idx; in util_queue_add_job()
571 jobs[num_jobs++] = queue->jobs[i]; in util_queue_add_job()
572 i = (i + 1) % queue->max_jobs; in util_queue_add_job()
573 } while (i != queue->write_idx); in util_queue_add_job()
575 assert(num_jobs == queue->num_queued); in util_queue_add_job()
577 free(queue->jobs); in util_queue_add_job()
578 queue->jobs = jobs; in util_queue_add_job()
579 queue->read_idx = 0; in util_queue_add_job()
580 queue->write_idx = num_jobs; in util_queue_add_job()
581 queue->max_jobs = new_max_jobs; in util_queue_add_job()
584 while (queue->num_queued == queue->max_jobs) in util_queue_add_job()
585 cnd_wait(&queue->has_space_cond, &queue->lock); in util_queue_add_job()
589 ptr = &queue->jobs[queue->write_idx]; in util_queue_add_job()
597 queue->write_idx = (queue->write_idx + 1) % queue->max_jobs; in util_queue_add_job()
598 queue->total_jobs_size += ptr->job_size; in util_queue_add_job()
600 queue->num_queued++; in util_queue_add_job()
601 cnd_signal(&queue->has_queued_cond); in util_queue_add_job()
602 mtx_unlock(&queue->lock); in util_queue_add_job()
616 util_queue_drop_job(struct util_queue *queue, struct util_queue_fence *fence) in util_queue_drop_job() argument
623 mtx_lock(&queue->lock); in util_queue_drop_job()
624 for (unsigned i = queue->read_idx; i != queue->write_idx; in util_queue_drop_job()
625 i = (i + 1) % queue->max_jobs) { in util_queue_drop_job()
626 if (queue->jobs[i].fence == fence) { in util_queue_drop_job()
627 if (queue->jobs[i].cleanup) in util_queue_drop_job()
628 queue->jobs[i].cleanup(queue->jobs[i].job, -1); in util_queue_drop_job()
631 memset(&queue->jobs[i], 0, sizeof(queue->jobs[i])); in util_queue_drop_job()
636 mtx_unlock(&queue->lock); in util_queue_drop_job()
655 util_queue_finish(struct util_queue *queue) in util_queue_finish() argument
664 mtx_lock(&queue->finish_lock); in util_queue_finish()
667 if (!queue->num_threads) { in util_queue_finish()
668 mtx_unlock(&queue->finish_lock); in util_queue_finish()
672 fences = malloc(queue->num_threads * sizeof(*fences)); in util_queue_finish()
673 util_barrier_init(&barrier, queue->num_threads); in util_queue_finish()
675 for (unsigned i = 0; i < queue->num_threads; ++i) { in util_queue_finish()
677 util_queue_add_job(queue, &barrier, &fences[i], in util_queue_finish()
681 for (unsigned i = 0; i < queue->num_threads; ++i) { in util_queue_finish()
685 mtx_unlock(&queue->finish_lock); in util_queue_finish()
693 util_queue_get_thread_time_nano(struct util_queue *queue, unsigned thread_index) in util_queue_get_thread_time_nano() argument
696 if (thread_index >= queue->num_threads) in util_queue_get_thread_time_nano()
699 return util_thread_get_time_nano(queue->threads[thread_index]); in util_queue_get_thread_time_nano()