Lines Matching refs:worker
139 static void io_wqe_dec_running(struct io_worker *worker);
146 static bool io_worker_get(struct io_worker *worker) in io_worker_get() argument
148 return refcount_inc_not_zero(&worker->ref); in io_worker_get()
151 static void io_worker_release(struct io_worker *worker) in io_worker_release() argument
153 if (refcount_dec_and_test(&worker->ref)) in io_worker_release()
154 complete(&worker->ref_done); in io_worker_release()
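
The two functions above implement the lookup-side reference pattern (this listing appears to come from the kernel's io-wq worker pool, fs/io-wq.c in 5.15-era trees, judging by tsk->pf_io_worker): a lookup only succeeds while the count is nonzero, and the final put fires a completion that the exit path waits on. Below is a minimal userspace sketch of that pattern, with C11 atomics and a POSIX semaphore standing in for refcount_t and struct completion; all names are illustrative, not the kernel's.

#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct worker {
    atomic_uint ref;      /* stands in for refcount_t worker->ref */
    sem_t ref_done;       /* stands in for struct completion ref_done */
};

/* Like refcount_inc_not_zero(): take a reference unless it already hit zero. */
static bool worker_get(struct worker *w)
{
    unsigned int old = atomic_load(&w->ref);
    while (old != 0)
        if (atomic_compare_exchange_weak(&w->ref, &old, old + 1))
            return true;
    return false;
}

/* Like refcount_dec_and_test() + complete(): the last put wakes the waiter. */
static void worker_release(struct worker *w)
{
    if (atomic_fetch_sub(&w->ref, 1) == 1)
        sem_post(&w->ref_done);
}

int main(void)
{
    struct worker w;
    atomic_init(&w.ref, 1);
    sem_init(&w.ref_done, 0, 0);

    if (worker_get(&w))          /* a concurrent lookup... */
        worker_release(&w);      /* ...and its matching put */

    worker_release(&w);          /* drop the initial self-reference */
    sem_wait(&w.ref_done);       /* exit path: wait for lookups to drain */
    puts("all references gone");
    return 0;
}

The inc-not-zero guard is what lets io_worker_exit() treat "count reached zero" as "no lookup can still be holding this worker".
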
168 static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker) in io_wqe_get_acct() argument
170 return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND); in io_wqe_get_acct()
181 struct io_worker *worker = current->pf_io_worker; in io_wq_worker_stopped() local
186 return test_bit(IO_WQ_BIT_EXIT, &worker->wqe->wq->state); in io_wq_worker_stopped()
189 static void io_worker_cancel_cb(struct io_worker *worker) in io_worker_cancel_cb() argument
191 struct io_wqe_acct *acct = io_wqe_get_acct(worker); in io_worker_cancel_cb()
192 struct io_wqe *wqe = worker->wqe; in io_worker_cancel_cb()
196 raw_spin_lock(&worker->wqe->lock); in io_worker_cancel_cb()
198 raw_spin_unlock(&worker->wqe->lock); in io_worker_cancel_cb()
200 clear_bit_unlock(0, &worker->create_state); in io_worker_cancel_cb()
201 io_worker_release(worker); in io_worker_cancel_cb()
206 struct io_worker *worker; in io_task_worker_match() local
210 worker = container_of(cb, struct io_worker, create_work); in io_task_worker_match()
211 return worker == data; in io_task_worker_match()
214 static void io_worker_exit(struct io_worker *worker) in io_worker_exit() argument
216 struct io_wqe *wqe = worker->wqe; in io_worker_exit()
221 io_task_worker_match, worker); in io_worker_exit()
225 io_worker_cancel_cb(worker); in io_worker_exit()
228 if (refcount_dec_and_test(&worker->ref)) in io_worker_exit()
229 complete(&worker->ref_done); in io_worker_exit()
230 wait_for_completion(&worker->ref_done); in io_worker_exit()
233 if (worker->flags & IO_WORKER_F_FREE) in io_worker_exit()
234 hlist_nulls_del_rcu(&worker->nulls_node); in io_worker_exit()
235 list_del_rcu(&worker->all_list); in io_worker_exit()
237 io_wqe_dec_running(worker); in io_worker_exit()
238 worker->flags = 0; in io_worker_exit()
243 kfree_rcu(worker, rcu); in io_worker_exit()
265 struct io_worker *worker; in io_wqe_activate_free_worker() local
272 hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) { in io_wqe_activate_free_worker()
273 if (!io_worker_get(worker)) in io_wqe_activate_free_worker()
275 if (io_wqe_get_acct(worker) != acct) { in io_wqe_activate_free_worker()
276 io_worker_release(worker); in io_wqe_activate_free_worker()
279 if (wake_up_process(worker->task)) { in io_wqe_activate_free_worker()
280 io_worker_release(worker); in io_wqe_activate_free_worker()
283 io_worker_release(worker); in io_wqe_activate_free_worker()
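
io_wqe_activate_free_worker() walks the free list under RCU, takes a temporary reference before inspecting each candidate, skips workers accounted to the other bucket, and stops at the first one it actually wakes. A userspace sketch of that walk, with a plain array standing in for the RCU hlist_nulls and a boolean standing in for wake_up_process():

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct worker {
    atomic_uint ref;
    int acct;                       /* bound vs unbound bucket */
    bool sleeping;
};

static bool worker_get(struct worker *w)
{
    unsigned int old = atomic_load(&w->ref);
    while (old != 0)
        if (atomic_compare_exchange_weak(&w->ref, &old, old + 1))
            return true;
    return false;
}

static void worker_release(struct worker *w)
{
    atomic_fetch_sub(&w->ref, 1);
}

/* wake_up_process() analogue: succeeds only if the task was asleep */
static bool wake(struct worker *w)
{
    bool was_sleeping = w->sleeping;
    w->sleeping = false;
    return was_sleeping;
}

static bool activate_free_worker(struct worker *list, int n, int acct)
{
    for (int i = 0; i < n; i++) {
        struct worker *w = &list[i];

        if (!worker_get(w))
            continue;               /* worker is already exiting */
        if (w->acct != acct) {
            worker_release(w);      /* wrong bucket, keep looking */
            continue;
        }
        if (wake(w)) {
            worker_release(w);
            return true;            /* one woken worker is enough */
        }
        worker_release(w);
    }
    return false;
}

int main(void)
{
    struct worker pool[2] = {
        { .acct = 1, .sleeping = true },
        { .acct = 0, .sleeping = true },
    };
    atomic_init(&pool[0].ref, 1);
    atomic_init(&pool[1].ref, 1);
    printf("woke an acct-0 worker: %d\n", activate_free_worker(pool, 2, 0));
    return 0;
}
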
314 static void io_wqe_inc_running(struct io_worker *worker) in io_wqe_inc_running() argument
316 struct io_wqe_acct *acct = io_wqe_get_acct(worker); in io_wqe_inc_running()
323 struct io_worker *worker; in create_worker_cb() local
329 worker = container_of(cb, struct io_worker, create_work); in create_worker_cb()
330 wqe = worker->wqe; in create_worker_cb()
332 acct = &wqe->acct[worker->create_index]; in create_worker_cb()
340 create_io_worker(wq, wqe, worker->create_index); in create_worker_cb()
345 clear_bit_unlock(0, &worker->create_state); in create_worker_cb()
346 io_worker_release(worker); in create_worker_cb()
349 static bool io_queue_worker_create(struct io_worker *worker, in io_queue_worker_create() argument
353 struct io_wqe *wqe = worker->wqe; in io_queue_worker_create()
359 if (!io_worker_get(worker)) in io_queue_worker_create()
367 if (test_bit(0, &worker->create_state) || in io_queue_worker_create()
368 test_and_set_bit_lock(0, &worker->create_state)) in io_queue_worker_create()
372 init_task_work(&worker->create_work, func); in io_queue_worker_create()
373 worker->create_index = acct->index; in io_queue_worker_create()
374 if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) { in io_queue_worker_create()
387 clear_bit_unlock(0, &worker->create_state); in io_queue_worker_create()
389 io_worker_release(worker); in io_queue_worker_create()
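
io_queue_worker_create() serializes pending creations with bit 0 of worker->create_state: a cheap test_bit() peek, then test_and_set_bit_lock(), with clear_bit_unlock() once the queued task_work has run. A sketch of the same guard using C11 atomics (exchange-acquire / store-release standing in for the bit-lock primitives):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_bool create_state;    /* bit 0 of worker->create_state */

static bool try_queue_create(void)
{
    /* cheap unlocked peek first, as in test_bit()... */
    if (atomic_load_explicit(&create_state, memory_order_relaxed))
        return false;
    /* ...then the locked test-and-set, as in test_and_set_bit_lock() */
    if (atomic_exchange_explicit(&create_state, true, memory_order_acquire))
        return false;
    /* init_task_work() + task_work_add() would go here */
    return true;
}

/* clear_bit_unlock() analogue, run once the create has been handled */
static void create_done(void)
{
    atomic_store_explicit(&create_state, false, memory_order_release);
}

int main(void)
{
    printf("first: %d, second: %d\n", try_queue_create(), try_queue_create());
    create_done();
    printf("after clear: %d\n", try_queue_create());
    return 0;
}
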
396 static void io_wqe_dec_running(struct io_worker *worker) in io_wqe_dec_running() argument
399 struct io_wqe_acct *acct = io_wqe_get_acct(worker); in io_wqe_dec_running()
400 struct io_wqe *wqe = worker->wqe; in io_wqe_dec_running()
402 if (!(worker->flags & IO_WORKER_F_UP)) in io_wqe_dec_running()
409 io_queue_worker_create(worker, acct, create_worker_cb); in io_wqe_dec_running()
418 static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, in __io_worker_busy() argument
422 if (worker->flags & IO_WORKER_F_FREE) { in __io_worker_busy()
423 worker->flags &= ~IO_WORKER_F_FREE; in __io_worker_busy()
424 hlist_nulls_del_init_rcu(&worker->nulls_node); in __io_worker_busy()
435 static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker) in __io_worker_idle() argument
438 if (!(worker->flags & IO_WORKER_F_FREE)) { in __io_worker_idle()
439 worker->flags |= IO_WORKER_F_FREE; in __io_worker_idle()
440 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); in __io_worker_idle()
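
__io_worker_busy() and __io_worker_idle() keep the IO_WORKER_F_FREE flag and membership on wqe->free_list in sync, so "is this worker parked?" is a flag test rather than a list walk. A sketch with an ordinary doubly linked list in place of the kernel's RCU hlist_nulls:

#include <stdbool.h>
#include <stdio.h>

#define F_FREE (1 << 0)

struct worker {
    unsigned flags;
    struct worker *next, *prev;     /* nulls_node analogue */
};

struct list { struct worker *head; };

static void list_add(struct list *l, struct worker *w)
{
    w->next = l->head;
    w->prev = NULL;
    if (l->head)
        l->head->prev = w;
    l->head = w;
}

static void list_del(struct list *l, struct worker *w)
{
    if (w->prev)
        w->prev->next = w->next;
    else
        l->head = w->next;
    if (w->next)
        w->next->prev = w->prev;
}

static void worker_busy(struct list *free_list, struct worker *w)
{
    if (w->flags & F_FREE) {        /* picked up work: leave the list */
        w->flags &= ~F_FREE;
        list_del(free_list, w);
    }
}

static void worker_idle(struct list *free_list, struct worker *w)
{
    if (!(w->flags & F_FREE)) {     /* out of work: park for wakeups */
        w->flags |= F_FREE;
        list_add(free_list, w);
    }
}

int main(void)
{
    struct list free_list = { NULL };
    struct worker w = { 0 };

    worker_idle(&free_list, &w);
    printf("on free list: %d\n", !!(w.flags & F_FREE));
    worker_busy(&free_list, &w);
    printf("on free list: %d\n", !!(w.flags & F_FREE));
    return 0;
}
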
468 struct io_worker *worker) in io_get_next_work() argument
474 struct io_wqe *wqe = worker->wqe; in io_get_next_work()
534 static void io_assign_current_work(struct io_worker *worker, in io_assign_current_work() argument
542 spin_lock(&worker->lock); in io_assign_current_work()
543 worker->cur_work = work; in io_assign_current_work()
544 spin_unlock(&worker->lock); in io_assign_current_work()
549 static void io_worker_handle_work(struct io_worker *worker) in io_worker_handle_work() argument
552 struct io_wqe_acct *acct = io_wqe_get_acct(worker); in io_worker_handle_work()
553 struct io_wqe *wqe = worker->wqe; in io_worker_handle_work()
567 work = io_get_next_work(acct, worker); in io_worker_handle_work()
569 __io_worker_busy(wqe, worker, work); in io_worker_handle_work()
574 io_assign_current_work(worker, work); in io_worker_handle_work()
587 io_assign_current_work(worker, NULL); in io_worker_handle_work()
595 io_assign_current_work(worker, work); in io_worker_handle_work()
621 struct io_worker *worker = data; in io_wqe_worker() local
622 struct io_wqe_acct *acct = io_wqe_get_acct(worker); in io_wqe_worker()
623 struct io_wqe *wqe = worker->wqe; in io_wqe_worker()
628 worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); in io_wqe_worker()
640 io_worker_handle_work(worker); in io_wqe_worker()
651 __io_worker_idle(wqe, worker); in io_wqe_worker()
668 io_worker_handle_work(worker); in io_wqe_worker()
671 io_worker_exit(worker); in io_wqe_worker()
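
io_wqe_worker() is the worker's main loop: mark itself up and running, handle work while any is queued, park as idle otherwise, and drain once more before exiting. The sketch below reduces that shape to a pthread with a condition variable; the kernel's version also handles signals, idle timeouts, and the free-list bookkeeping shown above.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  more = PTHREAD_COND_INITIALIZER;
static int pending;                 /* stands in for acct->work_list */
static bool exiting;                /* stands in for IO_WQ_BIT_EXIT */

static void handle_work(void) { puts("handled one item"); }

static void *worker_fn(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&lock);
    for (;;) {
        while (pending) {           /* io_worker_handle_work() */
            pending--;
            pthread_mutex_unlock(&lock);
            handle_work();
            pthread_mutex_lock(&lock);
        }
        if (exiting)
            break;
        /* __io_worker_idle(): park until woken or told to exit */
        pthread_cond_wait(&more, &lock);
    }
    pthread_mutex_unlock(&lock);
    return NULL;                    /* io_worker_exit() would run here */
}

int main(void)
{
    pthread_t t;
    pthread_create(&t, NULL, worker_fn, NULL);

    pthread_mutex_lock(&lock);
    pending = 2;                    /* queue two items */
    pthread_cond_signal(&more);
    pthread_mutex_unlock(&lock);

    pthread_mutex_lock(&lock);
    exiting = true;                 /* ask the worker to exit */
    pthread_cond_signal(&more);
    pthread_mutex_unlock(&lock);

    pthread_join(t, NULL);
    return 0;
}
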
680 struct io_worker *worker = tsk->pf_io_worker; in io_wq_worker_running() local
682 if (!worker) in io_wq_worker_running()
684 if (!(worker->flags & IO_WORKER_F_UP)) in io_wq_worker_running()
686 if (worker->flags & IO_WORKER_F_RUNNING) in io_wq_worker_running()
688 worker->flags |= IO_WORKER_F_RUNNING; in io_wq_worker_running()
689 io_wqe_inc_running(worker); in io_wq_worker_running()
698 struct io_worker *worker = tsk->pf_io_worker; in io_wq_worker_sleeping() local
700 if (!worker) in io_wq_worker_sleeping()
702 if (!(worker->flags & IO_WORKER_F_UP)) in io_wq_worker_sleeping()
704 if (!(worker->flags & IO_WORKER_F_RUNNING)) in io_wq_worker_sleeping()
707 worker->flags &= ~IO_WORKER_F_RUNNING; in io_wq_worker_sleeping()
709 raw_spin_lock(&worker->wqe->lock); in io_wq_worker_sleeping()
710 io_wqe_dec_running(worker); in io_wq_worker_sleeping()
711 raw_spin_unlock(&worker->wqe->lock); in io_wq_worker_sleeping()
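
io_wq_worker_running() and io_wq_worker_sleeping() are called from the scheduler when an io-worker task resumes or blocks; they toggle IO_WORKER_F_RUNNING and adjust the per-acct count of runnable workers, and a drop to zero is what triggers io_queue_worker_create(). A single-threaded sketch of that bookkeeping (the kernel additionally checks that work is actually queued before creating, and keeps the count atomic):

#include <stdbool.h>
#include <stdio.h>

#define F_UP       (1 << 0)
#define F_RUNNING  (1 << 1)

struct worker { unsigned flags; };
static int nr_running;              /* stands in for acct->nr_running */

static void worker_running(struct worker *w)
{
    if (!(w->flags & F_UP) || (w->flags & F_RUNNING))
        return;                     /* not started, or already counted */
    w->flags |= F_RUNNING;
    nr_running++;                   /* io_wqe_inc_running() */
}

static void worker_sleeping(struct worker *w)
{
    if (!(w->flags & F_UP) || !(w->flags & F_RUNNING))
        return;
    w->flags &= ~F_RUNNING;
    if (--nr_running == 0) {
        /* io_wqe_dec_running(): the last runnable worker just blocked;
         * this is where io_queue_worker_create() would be kicked */
        puts("no runnable workers left, queue a create");
    }
}

int main(void)
{
    struct worker w = { .flags = F_UP | F_RUNNING };

    nr_running = 1;
    worker_sleeping(&w);            /* worker blocks on I/O */
    worker_running(&w);             /* and later resumes */
    return 0;
}
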
714 static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker, in io_init_new_worker() argument
717 tsk->pf_io_worker = worker; in io_init_new_worker()
718 worker->task = tsk; in io_init_new_worker()
723 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list); in io_init_new_worker()
724 list_add_tail_rcu(&worker->all_list, &wqe->all_list); in io_init_new_worker()
725 worker->flags |= IO_WORKER_F_FREE; in io_init_new_worker()
757 struct io_worker *worker; in create_worker_cont() local
761 worker = container_of(cb, struct io_worker, create_work); in create_worker_cont()
762 clear_bit_unlock(0, &worker->create_state); in create_worker_cont()
763 wqe = worker->wqe; in create_worker_cont()
764 tsk = create_io_thread(io_wqe_worker, worker, wqe->node); in create_worker_cont()
766 io_init_new_worker(wqe, worker, tsk); in create_worker_cont()
767 io_worker_release(worker); in create_worker_cont()
770 struct io_wqe_acct *acct = io_wqe_get_acct(worker); in create_worker_cont()
786 kfree(worker); in create_worker_cont()
791 io_worker_release(worker); in create_worker_cont()
792 schedule_work(&worker->work); in create_worker_cont()
797 struct io_worker *worker = container_of(work, struct io_worker, work); in io_workqueue_create() local
798 struct io_wqe_acct *acct = io_wqe_get_acct(worker); in io_workqueue_create()
800 if (!io_queue_worker_create(worker, acct, create_worker_cont)) in io_workqueue_create()
801 kfree(worker); in io_workqueue_create()
807 struct io_worker *worker; in create_io_worker() local
812 worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); in create_io_worker()
813 if (!worker) { in create_io_worker()
823 refcount_set(&worker->ref, 1); in create_io_worker()
824 worker->wqe = wqe; in create_io_worker()
825 spin_lock_init(&worker->lock); in create_io_worker()
826 init_completion(&worker->ref_done); in create_io_worker()
829 worker->flags |= IO_WORKER_F_BOUND; in create_io_worker()
831 tsk = create_io_thread(io_wqe_worker, worker, wqe->node); in create_io_worker()
833 io_init_new_worker(wqe, worker, tsk); in create_io_worker()
835 kfree(worker); in create_io_worker()
838 INIT_WORK(&worker->work, io_workqueue_create); in create_io_worker()
839 schedule_work(&worker->work); in create_io_worker()
853 struct io_worker *worker; in io_wq_for_each_worker() local
856 list_for_each_entry_rcu(worker, &wqe->all_list, all_list) { in io_wq_for_each_worker()
857 if (io_worker_get(worker)) { in io_wq_for_each_worker()
859 if (worker->task) in io_wq_for_each_worker()
860 ret = func(worker, data); in io_wq_for_each_worker()
861 io_worker_release(worker); in io_wq_for_each_worker()
870 static bool io_wq_worker_wake(struct io_worker *worker, void *data) in io_wq_worker_wake() argument
872 set_notify_signal(worker->task); in io_wq_worker_wake()
873 wake_up_process(worker->task); in io_wq_worker_wake()
983 static bool io_wq_worker_cancel(struct io_worker *worker, void *data) in io_wq_worker_cancel() argument
991 spin_lock(&worker->lock); in io_wq_worker_cancel()
992 if (worker->cur_work && in io_wq_worker_cancel()
993 match->fn(worker->cur_work, match->data)) { in io_wq_worker_cancel()
994 set_notify_signal(worker->task); in io_wq_worker_cancel()
997 spin_unlock(&worker->lock); in io_wq_worker_cancel()
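
io_assign_current_work() publishes the item a worker is processing under worker->lock, and io_wq_worker_cancel() compares against it under the same lock before signalling the task. A sketch of that publish/match pair with a pthread mutex; the match callback is reduced to an id comparison here:

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct work { int id; };

struct worker {
    pthread_mutex_t lock;           /* stands in for worker->lock */
    struct work *cur_work;
};

static void assign_current_work(struct worker *w, struct work *work)
{
    pthread_mutex_lock(&w->lock);
    w->cur_work = work;             /* NULL when between items */
    pthread_mutex_unlock(&w->lock);
}

/* match->fn analogue: cancel whichever worker is running a given id */
static bool worker_cancel(struct worker *w, int id)
{
    bool match = false;

    pthread_mutex_lock(&w->lock);
    if (w->cur_work && w->cur_work->id == id) {
        /* set_notify_signal(worker->task) would go here */
        match = true;
    }
    pthread_mutex_unlock(&w->lock);
    return match;
}

int main(void)
{
    struct worker w = { .lock = PTHREAD_MUTEX_INITIALIZER, .cur_work = NULL };
    struct work item = { .id = 7 };

    assign_current_work(&w, &item);
    printf("cancel id 7: %d\n", worker_cancel(&w, 7));
    assign_current_work(&w, NULL);
    printf("cancel id 7 after done: %d\n", worker_cancel(&w, 7));
    return 0;
}

Holding the same lock on both sides is the whole point: the canceler can never observe a half-updated cur_work pointer.
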
1208 struct io_worker *worker; in io_task_work_match() local
1212 worker = container_of(cb, struct io_worker, create_work); in io_task_work_match()
1213 return worker->wqe->wq == data; in io_task_work_match()
1226 struct io_worker *worker; in io_wq_cancel_tw_create() local
1228 worker = container_of(cb, struct io_worker, create_work); in io_wq_cancel_tw_create()
1229 io_worker_cancel_cb(worker); in io_wq_cancel_tw_create()
1235 kfree(worker); in io_wq_cancel_tw_create()
1300 static bool io_wq_worker_affinity(struct io_worker *worker, void *data) in io_wq_worker_affinity() argument
1305 cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask); in io_wq_worker_affinity()
1307 cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask); in io_wq_worker_affinity()
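
io_wq_worker_affinity() is the CPU-hotplug callback: it sets or clears one CPU in the wqe's mask depending on whether that CPU came online or went offline. A userspace sketch using glibc's cpu_set_t macros in place of the kernel cpumask API:

#define _GNU_SOURCE
#include <sched.h>
#include <stdbool.h>
#include <stdio.h>

/* hotplug-callback shape: flip one CPU in the pool's allowed mask */
static void update_mask(cpu_set_t *mask, int cpu, bool online)
{
    if (online)
        CPU_SET(cpu, mask);         /* cpumask_set_cpu() analogue */
    else
        CPU_CLR(cpu, mask);         /* cpumask_clear_cpu() analogue */
}

int main(void)
{
    cpu_set_t mask;

    CPU_ZERO(&mask);
    update_mask(&mask, 2, true);
    printf("cpu2 in mask: %d\n", CPU_ISSET(2, &mask));
    update_mask(&mask, 2, false);
    printf("cpu2 in mask: %d\n", CPU_ISSET(2, &mask));
    return 0;
}
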