1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * kernel/workqueue.c - generic async execution with shared worker pool
4 *
5 * Copyright (C) 2002 Ingo Molnar
6 *
7 * Derived from the taskqueue/keventd code by:
8 * David Woodhouse <dwmw2@infradead.org>
9 * Andrew Morton
10 * Kai Petzke <wpp@marie.physik.tu-berlin.de>
11 * Theodore Ts'o <tytso@mit.edu>
12 *
13 * Made to use alloc_percpu by Christoph Lameter.
14 *
15 * Copyright (C) 2010 SUSE Linux Products GmbH
16 * Copyright (C) 2010 Tejun Heo <tj@kernel.org>
17 *
18 * This is the generic async execution mechanism. Work items as are
19 * executed in process context. The worker pool is shared and
20 * automatically managed. There are two worker pools for each CPU (one for
21 * normal work items and the other for high priority ones) and some extra
22 * pools for workqueues which are not bound to any specific CPU - the
23 * number of these backing pools is dynamic.
24 *
25 * Please read Documentation/core-api/workqueue.rst for details.
26 */
27
28 #include <linux/export.h>
29 #include <linux/kernel.h>
30 #include <linux/sched.h>
31 #include <linux/init.h>
32 #include <linux/signal.h>
33 #include <linux/completion.h>
34 #include <linux/workqueue.h>
35 #include <linux/slab.h>
36 #include <linux/cpu.h>
37 #include <linux/notifier.h>
38 #include <linux/kthread.h>
39 #include <linux/hardirq.h>
40 #include <linux/mempolicy.h>
41 #include <linux/freezer.h>
42 #include <linux/debug_locks.h>
43 #include <linux/lockdep.h>
44 #include <linux/idr.h>
45 #include <linux/jhash.h>
46 #include <linux/hashtable.h>
47 #include <linux/rculist.h>
48 #include <linux/nodemask.h>
49 #include <linux/moduleparam.h>
50 #include <linux/uaccess.h>
51 #include <linux/sched/isolation.h>
52 #include <linux/nmi.h>
53 #include <linux/kvm_para.h>
54
55 #include "workqueue_internal.h"
56
57 #include <trace/hooks/wqlockup.h>
58 /* events/workqueue.h uses default TRACE_INCLUDE_PATH */
59 #undef TRACE_INCLUDE_PATH
60
61 enum {
62 /*
63 * worker_pool flags
64 *
65 * A bound pool is either associated or disassociated with its CPU.
66 * While associated (!DISASSOCIATED), all workers are bound to the
67 * CPU and none has %WORKER_UNBOUND set and concurrency management
68 * is in effect.
69 *
70 * While DISASSOCIATED, the cpu may be offline and all workers have
71 * %WORKER_UNBOUND set and concurrency management disabled, and may
72 * be executing on any CPU. The pool behaves as an unbound one.
73 *
74 * Note that DISASSOCIATED should be flipped only while holding
75 * wq_pool_attach_mutex to avoid changing binding state while
76 * worker_attach_to_pool() is in progress.
77 */
78 POOL_MANAGER_ACTIVE = 1 << 0, /* being managed */
79 POOL_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
80
81 /* worker flags */
82 WORKER_DIE = 1 << 1, /* die die die */
83 WORKER_IDLE = 1 << 2, /* is idle */
84 WORKER_PREP = 1 << 3, /* preparing to run works */
85 WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */
86 WORKER_UNBOUND = 1 << 7, /* worker is unbound */
87 WORKER_REBOUND = 1 << 8, /* worker was rebound */
88
89 WORKER_NOT_RUNNING = WORKER_PREP | WORKER_CPU_INTENSIVE |
90 WORKER_UNBOUND | WORKER_REBOUND,
91
92 NR_STD_WORKER_POOLS = 2, /* # standard pools per cpu */
93
94 UNBOUND_POOL_HASH_ORDER = 6, /* hashed by pool->attrs */
95 BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */
96
97 MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */
98 IDLE_WORKER_TIMEOUT = 300 * HZ, /* keep idle ones for 5 mins */
99
100 MAYDAY_INITIAL_TIMEOUT = HZ / 100 >= 2 ? HZ / 100 : 2,
101 /* call for help after 10ms
102 (min two ticks) */
103 MAYDAY_INTERVAL = HZ / 10, /* and then every 100ms */
104 CREATE_COOLDOWN = HZ, /* time to breath after fail */
105
106 /*
107 * Rescue workers are used only on emergencies and shared by
108 * all cpus. Give MIN_NICE.
109 */
110 RESCUER_NICE_LEVEL = MIN_NICE,
111 HIGHPRI_NICE_LEVEL = MIN_NICE,
112
113 WQ_NAME_LEN = 24,
114 };
115
116 /*
117 * Structure fields follow one of the following exclusion rules.
118 *
119 * I: Modifiable by initialization/destruction paths and read-only for
120 * everyone else.
121 *
122 * P: Preemption protected. Disabling preemption is enough and should
123 * only be modified and accessed from the local cpu.
124 *
125 * L: pool->lock protected. Access with pool->lock held.
126 *
127 * X: During normal operation, modification requires pool->lock and should
128 * be done only from local cpu. Either disabling preemption on local
129 * cpu or grabbing pool->lock is enough for read access. If
130 * POOL_DISASSOCIATED is set, it's identical to L.
131 *
132 * A: wq_pool_attach_mutex protected.
133 *
134 * PL: wq_pool_mutex protected.
135 *
136 * PR: wq_pool_mutex protected for writes. RCU protected for reads.
137 *
138 * PW: wq_pool_mutex and wq->mutex protected for writes. Either for reads.
139 *
140 * PWR: wq_pool_mutex and wq->mutex protected for writes. Either or
141 * RCU for reads.
142 *
143 * WQ: wq->mutex protected.
144 *
145 * WR: wq->mutex protected for writes. RCU protected for reads.
146 *
147 * MD: wq_mayday_lock protected.
148 */
149
150 /* struct worker is defined in workqueue_internal.h */
151
152 struct worker_pool {
153 raw_spinlock_t lock; /* the pool lock */
154 int cpu; /* I: the associated cpu */
155 int node; /* I: the associated node ID */
156 int id; /* I: pool ID */
157 unsigned int flags; /* X: flags */
158
159 unsigned long watchdog_ts; /* L: watchdog timestamp */
160
161 struct list_head worklist; /* L: list of pending works */
162
163 int nr_workers; /* L: total number of workers */
164 int nr_idle; /* L: currently idle workers */
165
166 struct list_head idle_list; /* X: list of idle workers */
167 struct timer_list idle_timer; /* L: worker idle timeout */
168 struct timer_list mayday_timer; /* L: SOS timer for workers */
169
170 /* a workers is either on busy_hash or idle_list, or the manager */
171 DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
172 /* L: hash of busy workers */
173
174 struct worker *manager; /* L: purely informational */
175 struct list_head workers; /* A: attached workers */
176 struct completion *detach_completion; /* all workers detached */
177
178 struct ida worker_ida; /* worker IDs for task name */
179
180 struct workqueue_attrs *attrs; /* I: worker attributes */
181 struct hlist_node hash_node; /* PL: unbound_pool_hash node */
182 int refcnt; /* PL: refcnt for unbound pools */
183
184 /*
185 * The current concurrency level. As it's likely to be accessed
186 * from other CPUs during try_to_wake_up(), put it in a separate
187 * cacheline.
188 */
189 atomic_t nr_running ____cacheline_aligned_in_smp;
190
191 /*
192 * Destruction of pool is RCU protected to allow dereferences
193 * from get_work_pool().
194 */
195 struct rcu_head rcu;
196 } ____cacheline_aligned_in_smp;
197
198 /*
199 * The per-pool workqueue. While queued, the lower WORK_STRUCT_FLAG_BITS
200 * of work_struct->data are used for flags and the remaining high bits
201 * point to the pwq; thus, pwqs need to be aligned at two's power of the
202 * number of flag bits.
203 */
204 struct pool_workqueue {
205 struct worker_pool *pool; /* I: the associated pool */
206 struct workqueue_struct *wq; /* I: the owning workqueue */
207 int work_color; /* L: current color */
208 int flush_color; /* L: flushing color */
209 int refcnt; /* L: reference count */
210 int nr_in_flight[WORK_NR_COLORS];
211 /* L: nr of in_flight works */
212 int nr_active; /* L: nr of active works */
213 int max_active; /* L: max active works */
214 struct list_head delayed_works; /* L: delayed works */
215 struct list_head pwqs_node; /* WR: node on wq->pwqs */
216 struct list_head mayday_node; /* MD: node on wq->maydays */
217
218 /*
219 * Release of unbound pwq is punted to system_wq. See put_pwq()
220 * and pwq_unbound_release_workfn() for details. pool_workqueue
221 * itself is also RCU protected so that the first pwq can be
222 * determined without grabbing wq->mutex.
223 */
224 struct work_struct unbound_release_work;
225 struct rcu_head rcu;
226 } __aligned(1 << WORK_STRUCT_FLAG_BITS);
227
228 /*
229 * Structure used to wait for workqueue flush.
230 */
231 struct wq_flusher {
232 struct list_head list; /* WQ: list of flushers */
233 int flush_color; /* WQ: flush color waiting for */
234 struct completion done; /* flush completion */
235 };
236
237 struct wq_device;
238
239 /*
240 * The externally visible workqueue. It relays the issued work items to
241 * the appropriate worker_pool through its pool_workqueues.
242 */
243 struct workqueue_struct {
244 struct list_head pwqs; /* WR: all pwqs of this wq */
245 struct list_head list; /* PR: list of all workqueues */
246
247 struct mutex mutex; /* protects this wq */
248 int work_color; /* WQ: current work color */
249 int flush_color; /* WQ: current flush color */
250 atomic_t nr_pwqs_to_flush; /* flush in progress */
251 struct wq_flusher *first_flusher; /* WQ: first flusher */
252 struct list_head flusher_queue; /* WQ: flush waiters */
253 struct list_head flusher_overflow; /* WQ: flush overflow list */
254
255 struct list_head maydays; /* MD: pwqs requesting rescue */
256 struct worker *rescuer; /* MD: rescue worker */
257
258 int nr_drainers; /* WQ: drain in progress */
259 int saved_max_active; /* WQ: saved pwq max_active */
260
261 struct workqueue_attrs *unbound_attrs; /* PW: only for unbound wqs */
262 struct pool_workqueue *dfl_pwq; /* PW: only for unbound wqs */
263
264 #ifdef CONFIG_SYSFS
265 struct wq_device *wq_dev; /* I: for sysfs interface */
266 #endif
267 #ifdef CONFIG_LOCKDEP
268 char *lock_name;
269 struct lock_class_key key;
270 struct lockdep_map lockdep_map;
271 #endif
272 char name[WQ_NAME_LEN]; /* I: workqueue name */
273
274 /*
275 * Destruction of workqueue_struct is RCU protected to allow walking
276 * the workqueues list without grabbing wq_pool_mutex.
277 * This is used to dump all workqueues from sysrq.
278 */
279 struct rcu_head rcu;
280
281 /* hot fields used during command issue, aligned to cacheline */
282 unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */
283 struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
284 struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */
285 };
286
287 static struct kmem_cache *pwq_cache;
288
289 static cpumask_var_t *wq_numa_possible_cpumask;
290 /* possible CPUs of each node */
291
292 static bool wq_disable_numa;
293 module_param_named(disable_numa, wq_disable_numa, bool, 0444);
294
295 /* see the comment above the definition of WQ_POWER_EFFICIENT */
296 static bool wq_power_efficient = IS_ENABLED(CONFIG_WQ_POWER_EFFICIENT_DEFAULT);
297 module_param_named(power_efficient, wq_power_efficient, bool, 0444);
298
299 static bool wq_online; /* can kworkers be created yet? */
300
301 static bool wq_numa_enabled; /* unbound NUMA affinity enabled */
302
303 /* buf for wq_update_unbound_numa_attrs(), protected by CPU hotplug exclusion */
304 static struct workqueue_attrs *wq_update_unbound_numa_attrs_buf;
305
306 static DEFINE_MUTEX(wq_pool_mutex); /* protects pools and workqueues list */
307 static DEFINE_MUTEX(wq_pool_attach_mutex); /* protects worker attach/detach */
308 static DEFINE_RAW_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
309 /* wait for manager to go away */
310 static struct rcuwait manager_wait = __RCUWAIT_INITIALIZER(manager_wait);
311
312 static LIST_HEAD(workqueues); /* PR: list of all workqueues */
313 static bool workqueue_freezing; /* PL: have wqs started freezing? */
314
315 /* PL: allowable cpus for unbound wqs and work items */
316 static cpumask_var_t wq_unbound_cpumask;
317
318 /* CPU where unbound work was last round robin scheduled from this CPU */
319 static DEFINE_PER_CPU(int, wq_rr_cpu_last);
320
321 /*
322 * Local execution of unbound work items is no longer guaranteed. The
323 * following always forces round-robin CPU selection on unbound work items
324 * to uncover usages which depend on it.
325 */
326 #ifdef CONFIG_DEBUG_WQ_FORCE_RR_CPU
327 static bool wq_debug_force_rr_cpu = true;
328 #else
329 static bool wq_debug_force_rr_cpu = false;
330 #endif
331 module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
332
333 /* the per-cpu worker pools */
334 static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
335
336 static DEFINE_IDR(worker_pool_idr); /* PR: idr of all pools */
337
338 /* PL: hash of all unbound pools keyed by pool->attrs */
339 static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);
340
341 /* I: attributes used when instantiating standard unbound pools on demand */
342 static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
343
344 /* I: attributes used when instantiating ordered pools on demand */
345 static struct workqueue_attrs *ordered_wq_attrs[NR_STD_WORKER_POOLS];
346
347 struct workqueue_struct *system_wq __read_mostly;
348 EXPORT_SYMBOL(system_wq);
349 struct workqueue_struct *system_highpri_wq __read_mostly;
350 EXPORT_SYMBOL_GPL(system_highpri_wq);
351 struct workqueue_struct *system_long_wq __read_mostly;
352 EXPORT_SYMBOL_GPL(system_long_wq);
353 struct workqueue_struct *system_unbound_wq __read_mostly;
354 EXPORT_SYMBOL_GPL(system_unbound_wq);
355 struct workqueue_struct *system_freezable_wq __read_mostly;
356 EXPORT_SYMBOL_GPL(system_freezable_wq);
357 struct workqueue_struct *system_power_efficient_wq __read_mostly;
358 EXPORT_SYMBOL_GPL(system_power_efficient_wq);
359 struct workqueue_struct *system_freezable_power_efficient_wq __read_mostly;
360 EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
361
362 static int worker_thread(void *__worker);
363 static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
364 static void show_pwq(struct pool_workqueue *pwq);
365
366 #define CREATE_TRACE_POINTS
367 #include <trace/events/workqueue.h>
368
369 EXPORT_TRACEPOINT_SYMBOL_GPL(workqueue_execute_start);
370 EXPORT_TRACEPOINT_SYMBOL_GPL(workqueue_execute_end);
371
372 #define assert_rcu_or_pool_mutex() \
373 RCU_LOCKDEP_WARN(!rcu_read_lock_held() && \
374 !lockdep_is_held(&wq_pool_mutex), \
375 "RCU or wq_pool_mutex should be held")
376
377 #define assert_rcu_or_wq_mutex_or_pool_mutex(wq) \
378 RCU_LOCKDEP_WARN(!rcu_read_lock_held() && \
379 !lockdep_is_held(&wq->mutex) && \
380 !lockdep_is_held(&wq_pool_mutex), \
381 "RCU, wq->mutex or wq_pool_mutex should be held")
382
383 #define for_each_cpu_worker_pool(pool, cpu) \
384 for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0]; \
385 (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
386 (pool)++)
387
388 /**
389 * for_each_pool - iterate through all worker_pools in the system
390 * @pool: iteration cursor
391 * @pi: integer used for iteration
392 *
393 * This must be called either with wq_pool_mutex held or RCU read
394 * locked. If the pool needs to be used beyond the locking in effect, the
395 * caller is responsible for guaranteeing that the pool stays online.
396 *
397 * The if/else clause exists only for the lockdep assertion and can be
398 * ignored.
399 */
400 #define for_each_pool(pool, pi) \
401 idr_for_each_entry(&worker_pool_idr, pool, pi) \
402 if (({ assert_rcu_or_pool_mutex(); false; })) { } \
403 else
404
405 /**
406 * for_each_pool_worker - iterate through all workers of a worker_pool
407 * @worker: iteration cursor
408 * @pool: worker_pool to iterate workers of
409 *
410 * This must be called with wq_pool_attach_mutex.
411 *
412 * The if/else clause exists only for the lockdep assertion and can be
413 * ignored.
414 */
415 #define for_each_pool_worker(worker, pool) \
416 list_for_each_entry((worker), &(pool)->workers, node) \
417 if (({ lockdep_assert_held(&wq_pool_attach_mutex); false; })) { } \
418 else
419
420 /**
421 * for_each_pwq - iterate through all pool_workqueues of the specified workqueue
422 * @pwq: iteration cursor
423 * @wq: the target workqueue
424 *
425 * This must be called either with wq->mutex held or RCU read locked.
426 * If the pwq needs to be used beyond the locking in effect, the caller is
427 * responsible for guaranteeing that the pwq stays online.
428 *
429 * The if/else clause exists only for the lockdep assertion and can be
430 * ignored.
431 */
432 #define for_each_pwq(pwq, wq) \
433 list_for_each_entry_rcu((pwq), &(wq)->pwqs, pwqs_node, \
434 lockdep_is_held(&(wq->mutex)))
435
436 #ifdef CONFIG_DEBUG_OBJECTS_WORK
437
438 static const struct debug_obj_descr work_debug_descr;
439
work_debug_hint(void * addr)440 static void *work_debug_hint(void *addr)
441 {
442 return ((struct work_struct *) addr)->func;
443 }
444
work_is_static_object(void * addr)445 static bool work_is_static_object(void *addr)
446 {
447 struct work_struct *work = addr;
448
449 return test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work));
450 }
451
452 /*
453 * fixup_init is called when:
454 * - an active object is initialized
455 */
work_fixup_init(void * addr,enum debug_obj_state state)456 static bool work_fixup_init(void *addr, enum debug_obj_state state)
457 {
458 struct work_struct *work = addr;
459
460 switch (state) {
461 case ODEBUG_STATE_ACTIVE:
462 cancel_work_sync(work);
463 debug_object_init(work, &work_debug_descr);
464 return true;
465 default:
466 return false;
467 }
468 }
469
470 /*
471 * fixup_free is called when:
472 * - an active object is freed
473 */
work_fixup_free(void * addr,enum debug_obj_state state)474 static bool work_fixup_free(void *addr, enum debug_obj_state state)
475 {
476 struct work_struct *work = addr;
477
478 switch (state) {
479 case ODEBUG_STATE_ACTIVE:
480 cancel_work_sync(work);
481 debug_object_free(work, &work_debug_descr);
482 return true;
483 default:
484 return false;
485 }
486 }
487
488 static const struct debug_obj_descr work_debug_descr = {
489 .name = "work_struct",
490 .debug_hint = work_debug_hint,
491 .is_static_object = work_is_static_object,
492 .fixup_init = work_fixup_init,
493 .fixup_free = work_fixup_free,
494 };
495
debug_work_activate(struct work_struct * work)496 static inline void debug_work_activate(struct work_struct *work)
497 {
498 debug_object_activate(work, &work_debug_descr);
499 }
500
debug_work_deactivate(struct work_struct * work)501 static inline void debug_work_deactivate(struct work_struct *work)
502 {
503 debug_object_deactivate(work, &work_debug_descr);
504 }
505
__init_work(struct work_struct * work,int onstack)506 void __init_work(struct work_struct *work, int onstack)
507 {
508 if (onstack)
509 debug_object_init_on_stack(work, &work_debug_descr);
510 else
511 debug_object_init(work, &work_debug_descr);
512 }
513 EXPORT_SYMBOL_GPL(__init_work);
514
destroy_work_on_stack(struct work_struct * work)515 void destroy_work_on_stack(struct work_struct *work)
516 {
517 debug_object_free(work, &work_debug_descr);
518 }
519 EXPORT_SYMBOL_GPL(destroy_work_on_stack);
520
destroy_delayed_work_on_stack(struct delayed_work * work)521 void destroy_delayed_work_on_stack(struct delayed_work *work)
522 {
523 destroy_timer_on_stack(&work->timer);
524 debug_object_free(&work->work, &work_debug_descr);
525 }
526 EXPORT_SYMBOL_GPL(destroy_delayed_work_on_stack);
527
528 #else
debug_work_activate(struct work_struct * work)529 static inline void debug_work_activate(struct work_struct *work) { }
debug_work_deactivate(struct work_struct * work)530 static inline void debug_work_deactivate(struct work_struct *work) { }
531 #endif
532
533 /**
534 * worker_pool_assign_id - allocate ID and assing it to @pool
535 * @pool: the pool pointer of interest
536 *
537 * Returns 0 if ID in [0, WORK_OFFQ_POOL_NONE) is allocated and assigned
538 * successfully, -errno on failure.
539 */
worker_pool_assign_id(struct worker_pool * pool)540 static int worker_pool_assign_id(struct worker_pool *pool)
541 {
542 int ret;
543
544 lockdep_assert_held(&wq_pool_mutex);
545
546 ret = idr_alloc(&worker_pool_idr, pool, 0, WORK_OFFQ_POOL_NONE,
547 GFP_KERNEL);
548 if (ret >= 0) {
549 pool->id = ret;
550 return 0;
551 }
552 return ret;
553 }
554
555 /**
556 * unbound_pwq_by_node - return the unbound pool_workqueue for the given node
557 * @wq: the target workqueue
558 * @node: the node ID
559 *
560 * This must be called with any of wq_pool_mutex, wq->mutex or RCU
561 * read locked.
562 * If the pwq needs to be used beyond the locking in effect, the caller is
563 * responsible for guaranteeing that the pwq stays online.
564 *
565 * Return: The unbound pool_workqueue for @node.
566 */
unbound_pwq_by_node(struct workqueue_struct * wq,int node)567 static struct pool_workqueue *unbound_pwq_by_node(struct workqueue_struct *wq,
568 int node)
569 {
570 assert_rcu_or_wq_mutex_or_pool_mutex(wq);
571
572 /*
573 * XXX: @node can be NUMA_NO_NODE if CPU goes offline while a
574 * delayed item is pending. The plan is to keep CPU -> NODE
575 * mapping valid and stable across CPU on/offlines. Once that
576 * happens, this workaround can be removed.
577 */
578 if (unlikely(node == NUMA_NO_NODE))
579 return wq->dfl_pwq;
580
581 return rcu_dereference_raw(wq->numa_pwq_tbl[node]);
582 }
583
work_color_to_flags(int color)584 static unsigned int work_color_to_flags(int color)
585 {
586 return color << WORK_STRUCT_COLOR_SHIFT;
587 }
588
get_work_color(struct work_struct * work)589 static int get_work_color(struct work_struct *work)
590 {
591 return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
592 ((1 << WORK_STRUCT_COLOR_BITS) - 1);
593 }
594
work_next_color(int color)595 static int work_next_color(int color)
596 {
597 return (color + 1) % WORK_NR_COLORS;
598 }
599
600 /*
601 * While queued, %WORK_STRUCT_PWQ is set and non flag bits of a work's data
602 * contain the pointer to the queued pwq. Once execution starts, the flag
603 * is cleared and the high bits contain OFFQ flags and pool ID.
604 *
605 * set_work_pwq(), set_work_pool_and_clear_pending(), mark_work_canceling()
606 * and clear_work_data() can be used to set the pwq, pool or clear
607 * work->data. These functions should only be called while the work is
608 * owned - ie. while the PENDING bit is set.
609 *
610 * get_work_pool() and get_work_pwq() can be used to obtain the pool or pwq
611 * corresponding to a work. Pool is available once the work has been
612 * queued anywhere after initialization until it is sync canceled. pwq is
613 * available only while the work item is queued.
614 *
615 * %WORK_OFFQ_CANCELING is used to mark a work item which is being
616 * canceled. While being canceled, a work item may have its PENDING set
617 * but stay off timer and worklist for arbitrarily long and nobody should
618 * try to steal the PENDING bit.
619 */
set_work_data(struct work_struct * work,unsigned long data,unsigned long flags)620 static inline void set_work_data(struct work_struct *work, unsigned long data,
621 unsigned long flags)
622 {
623 WARN_ON_ONCE(!work_pending(work));
624 atomic_long_set(&work->data, data | flags | work_static(work));
625 }
626
set_work_pwq(struct work_struct * work,struct pool_workqueue * pwq,unsigned long extra_flags)627 static void set_work_pwq(struct work_struct *work, struct pool_workqueue *pwq,
628 unsigned long extra_flags)
629 {
630 set_work_data(work, (unsigned long)pwq,
631 WORK_STRUCT_PENDING | WORK_STRUCT_PWQ | extra_flags);
632 }
633
set_work_pool_and_keep_pending(struct work_struct * work,int pool_id)634 static void set_work_pool_and_keep_pending(struct work_struct *work,
635 int pool_id)
636 {
637 set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT,
638 WORK_STRUCT_PENDING);
639 }
640
set_work_pool_and_clear_pending(struct work_struct * work,int pool_id)641 static void set_work_pool_and_clear_pending(struct work_struct *work,
642 int pool_id)
643 {
644 /*
645 * The following wmb is paired with the implied mb in
646 * test_and_set_bit(PENDING) and ensures all updates to @work made
647 * here are visible to and precede any updates by the next PENDING
648 * owner.
649 */
650 smp_wmb();
651 set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT, 0);
652 /*
653 * The following mb guarantees that previous clear of a PENDING bit
654 * will not be reordered with any speculative LOADS or STORES from
655 * work->current_func, which is executed afterwards. This possible
656 * reordering can lead to a missed execution on attempt to queue
657 * the same @work. E.g. consider this case:
658 *
659 * CPU#0 CPU#1
660 * ---------------------------- --------------------------------
661 *
662 * 1 STORE event_indicated
663 * 2 queue_work_on() {
664 * 3 test_and_set_bit(PENDING)
665 * 4 } set_..._and_clear_pending() {
666 * 5 set_work_data() # clear bit
667 * 6 smp_mb()
668 * 7 work->current_func() {
669 * 8 LOAD event_indicated
670 * }
671 *
672 * Without an explicit full barrier speculative LOAD on line 8 can
673 * be executed before CPU#0 does STORE on line 1. If that happens,
674 * CPU#0 observes the PENDING bit is still set and new execution of
675 * a @work is not queued in a hope, that CPU#1 will eventually
676 * finish the queued @work. Meanwhile CPU#1 does not see
677 * event_indicated is set, because speculative LOAD was executed
678 * before actual STORE.
679 */
680 smp_mb();
681 }
682
clear_work_data(struct work_struct * work)683 static void clear_work_data(struct work_struct *work)
684 {
685 smp_wmb(); /* see set_work_pool_and_clear_pending() */
686 set_work_data(work, WORK_STRUCT_NO_POOL, 0);
687 }
688
work_struct_pwq(unsigned long data)689 static inline struct pool_workqueue *work_struct_pwq(unsigned long data)
690 {
691 return (struct pool_workqueue *)(data & WORK_STRUCT_WQ_DATA_MASK);
692 }
693
get_work_pwq(struct work_struct * work)694 static struct pool_workqueue *get_work_pwq(struct work_struct *work)
695 {
696 unsigned long data = atomic_long_read(&work->data);
697
698 if (data & WORK_STRUCT_PWQ)
699 return work_struct_pwq(data);
700 else
701 return NULL;
702 }
703
704 /**
705 * get_work_pool - return the worker_pool a given work was associated with
706 * @work: the work item of interest
707 *
708 * Pools are created and destroyed under wq_pool_mutex, and allows read
709 * access under RCU read lock. As such, this function should be
710 * called under wq_pool_mutex or inside of a rcu_read_lock() region.
711 *
712 * All fields of the returned pool are accessible as long as the above
713 * mentioned locking is in effect. If the returned pool needs to be used
714 * beyond the critical section, the caller is responsible for ensuring the
715 * returned pool is and stays online.
716 *
717 * Return: The worker_pool @work was last associated with. %NULL if none.
718 */
get_work_pool(struct work_struct * work)719 static struct worker_pool *get_work_pool(struct work_struct *work)
720 {
721 unsigned long data = atomic_long_read(&work->data);
722 int pool_id;
723
724 assert_rcu_or_pool_mutex();
725
726 if (data & WORK_STRUCT_PWQ)
727 return work_struct_pwq(data)->pool;
728
729 pool_id = data >> WORK_OFFQ_POOL_SHIFT;
730 if (pool_id == WORK_OFFQ_POOL_NONE)
731 return NULL;
732
733 return idr_find(&worker_pool_idr, pool_id);
734 }
735
736 /**
737 * get_work_pool_id - return the worker pool ID a given work is associated with
738 * @work: the work item of interest
739 *
740 * Return: The worker_pool ID @work was last associated with.
741 * %WORK_OFFQ_POOL_NONE if none.
742 */
get_work_pool_id(struct work_struct * work)743 static int get_work_pool_id(struct work_struct *work)
744 {
745 unsigned long data = atomic_long_read(&work->data);
746
747 if (data & WORK_STRUCT_PWQ)
748 return work_struct_pwq(data)->pool->id;
749
750 return data >> WORK_OFFQ_POOL_SHIFT;
751 }
752
mark_work_canceling(struct work_struct * work)753 static void mark_work_canceling(struct work_struct *work)
754 {
755 unsigned long pool_id = get_work_pool_id(work);
756
757 pool_id <<= WORK_OFFQ_POOL_SHIFT;
758 set_work_data(work, pool_id | WORK_OFFQ_CANCELING, WORK_STRUCT_PENDING);
759 }
760
work_is_canceling(struct work_struct * work)761 static bool work_is_canceling(struct work_struct *work)
762 {
763 unsigned long data = atomic_long_read(&work->data);
764
765 return !(data & WORK_STRUCT_PWQ) && (data & WORK_OFFQ_CANCELING);
766 }
767
768 /*
769 * Policy functions. These define the policies on how the global worker
770 * pools are managed. Unless noted otherwise, these functions assume that
771 * they're being called with pool->lock held.
772 */
773
__need_more_worker(struct worker_pool * pool)774 static bool __need_more_worker(struct worker_pool *pool)
775 {
776 return !atomic_read(&pool->nr_running);
777 }
778
779 /*
780 * Need to wake up a worker? Called from anything but currently
781 * running workers.
782 *
783 * Note that, because unbound workers never contribute to nr_running, this
784 * function will always return %true for unbound pools as long as the
785 * worklist isn't empty.
786 */
need_more_worker(struct worker_pool * pool)787 static bool need_more_worker(struct worker_pool *pool)
788 {
789 return !list_empty(&pool->worklist) && __need_more_worker(pool);
790 }
791
792 /* Can I start working? Called from busy but !running workers. */
may_start_working(struct worker_pool * pool)793 static bool may_start_working(struct worker_pool *pool)
794 {
795 return pool->nr_idle;
796 }
797
798 /* Do I need to keep working? Called from currently running workers. */
keep_working(struct worker_pool * pool)799 static bool keep_working(struct worker_pool *pool)
800 {
801 return !list_empty(&pool->worklist) &&
802 atomic_read(&pool->nr_running) <= 1;
803 }
804
805 /* Do we need a new worker? Called from manager. */
need_to_create_worker(struct worker_pool * pool)806 static bool need_to_create_worker(struct worker_pool *pool)
807 {
808 return need_more_worker(pool) && !may_start_working(pool);
809 }
810
811 /* Do we have too many workers and should some go away? */
too_many_workers(struct worker_pool * pool)812 static bool too_many_workers(struct worker_pool *pool)
813 {
814 bool managing = pool->flags & POOL_MANAGER_ACTIVE;
815 int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
816 int nr_busy = pool->nr_workers - nr_idle;
817
818 return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
819 }
820
821 /*
822 * Wake up functions.
823 */
824
825 /* Return the first idle worker. Safe with preemption disabled */
first_idle_worker(struct worker_pool * pool)826 static struct worker *first_idle_worker(struct worker_pool *pool)
827 {
828 if (unlikely(list_empty(&pool->idle_list)))
829 return NULL;
830
831 return list_first_entry(&pool->idle_list, struct worker, entry);
832 }
833
834 /**
835 * wake_up_worker - wake up an idle worker
836 * @pool: worker pool to wake worker from
837 *
838 * Wake up the first idle worker of @pool.
839 *
840 * CONTEXT:
841 * raw_spin_lock_irq(pool->lock).
842 */
wake_up_worker(struct worker_pool * pool)843 static void wake_up_worker(struct worker_pool *pool)
844 {
845 struct worker *worker = first_idle_worker(pool);
846
847 if (likely(worker))
848 wake_up_process(worker->task);
849 }
850
851 /**
852 * wq_worker_running - a worker is running again
853 * @task: task waking up
854 *
855 * This function is called when a worker returns from schedule()
856 */
wq_worker_running(struct task_struct * task)857 void wq_worker_running(struct task_struct *task)
858 {
859 struct worker *worker = kthread_data(task);
860
861 if (!worker->sleeping)
862 return;
863
864 /*
865 * If preempted by unbind_workers() between the WORKER_NOT_RUNNING check
866 * and the nr_running increment below, we may ruin the nr_running reset
867 * and leave with an unexpected pool->nr_running == 1 on the newly unbound
868 * pool. Protect against such race.
869 */
870 preempt_disable();
871 if (!(worker->flags & WORKER_NOT_RUNNING))
872 atomic_inc(&worker->pool->nr_running);
873 preempt_enable();
874 worker->sleeping = 0;
875 }
876
877 /**
878 * wq_worker_sleeping - a worker is going to sleep
879 * @task: task going to sleep
880 *
881 * This function is called from schedule() when a busy worker is
882 * going to sleep. Preemption needs to be disabled to protect ->sleeping
883 * assignment.
884 */
wq_worker_sleeping(struct task_struct * task)885 void wq_worker_sleeping(struct task_struct *task)
886 {
887 struct worker *next, *worker = kthread_data(task);
888 struct worker_pool *pool;
889
890 /*
891 * Rescuers, which may not have all the fields set up like normal
892 * workers, also reach here, let's not access anything before
893 * checking NOT_RUNNING.
894 */
895 if (worker->flags & WORKER_NOT_RUNNING)
896 return;
897
898 pool = worker->pool;
899
900 /* Return if preempted before wq_worker_running() was reached */
901 if (worker->sleeping)
902 return;
903
904 worker->sleeping = 1;
905 raw_spin_lock_irq(&pool->lock);
906
907 /*
908 * The counterpart of the following dec_and_test, implied mb,
909 * worklist not empty test sequence is in insert_work().
910 * Please read comment there.
911 *
912 * NOT_RUNNING is clear. This means that we're bound to and
913 * running on the local cpu w/ rq lock held and preemption
914 * disabled, which in turn means that none else could be
915 * manipulating idle_list, so dereferencing idle_list without pool
916 * lock is safe.
917 */
918 if (atomic_dec_and_test(&pool->nr_running) &&
919 !list_empty(&pool->worklist)) {
920 next = first_idle_worker(pool);
921 if (next)
922 wake_up_process(next->task);
923 }
924 raw_spin_unlock_irq(&pool->lock);
925 }
926
927 /**
928 * wq_worker_last_func - retrieve worker's last work function
929 * @task: Task to retrieve last work function of.
930 *
931 * Determine the last function a worker executed. This is called from
932 * the scheduler to get a worker's last known identity.
933 *
934 * CONTEXT:
935 * raw_spin_lock_irq(rq->lock)
936 *
937 * This function is called during schedule() when a kworker is going
938 * to sleep. It's used by psi to identify aggregation workers during
939 * dequeuing, to allow periodic aggregation to shut-off when that
940 * worker is the last task in the system or cgroup to go to sleep.
941 *
942 * As this function doesn't involve any workqueue-related locking, it
943 * only returns stable values when called from inside the scheduler's
944 * queuing and dequeuing paths, when @task, which must be a kworker,
945 * is guaranteed to not be processing any works.
946 *
947 * Return:
948 * The last work function %current executed as a worker, NULL if it
949 * hasn't executed any work yet.
950 */
wq_worker_last_func(struct task_struct * task)951 work_func_t wq_worker_last_func(struct task_struct *task)
952 {
953 struct worker *worker = kthread_data(task);
954
955 return worker->last_func;
956 }
957
958 /**
959 * worker_set_flags - set worker flags and adjust nr_running accordingly
960 * @worker: self
961 * @flags: flags to set
962 *
963 * Set @flags in @worker->flags and adjust nr_running accordingly.
964 *
965 * CONTEXT:
966 * raw_spin_lock_irq(pool->lock)
967 */
worker_set_flags(struct worker * worker,unsigned int flags)968 static inline void worker_set_flags(struct worker *worker, unsigned int flags)
969 {
970 struct worker_pool *pool = worker->pool;
971
972 WARN_ON_ONCE(worker->task != current);
973
974 /* If transitioning into NOT_RUNNING, adjust nr_running. */
975 if ((flags & WORKER_NOT_RUNNING) &&
976 !(worker->flags & WORKER_NOT_RUNNING)) {
977 atomic_dec(&pool->nr_running);
978 }
979
980 worker->flags |= flags;
981 }
982
983 /**
984 * worker_clr_flags - clear worker flags and adjust nr_running accordingly
985 * @worker: self
986 * @flags: flags to clear
987 *
988 * Clear @flags in @worker->flags and adjust nr_running accordingly.
989 *
990 * CONTEXT:
991 * raw_spin_lock_irq(pool->lock)
992 */
worker_clr_flags(struct worker * worker,unsigned int flags)993 static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
994 {
995 struct worker_pool *pool = worker->pool;
996 unsigned int oflags = worker->flags;
997
998 WARN_ON_ONCE(worker->task != current);
999
1000 worker->flags &= ~flags;
1001
1002 /*
1003 * If transitioning out of NOT_RUNNING, increment nr_running. Note
1004 * that the nested NOT_RUNNING is not a noop. NOT_RUNNING is mask
1005 * of multiple flags, not a single flag.
1006 */
1007 if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
1008 if (!(worker->flags & WORKER_NOT_RUNNING))
1009 atomic_inc(&pool->nr_running);
1010 }
1011
1012 /**
1013 * find_worker_executing_work - find worker which is executing a work
1014 * @pool: pool of interest
1015 * @work: work to find worker for
1016 *
1017 * Find a worker which is executing @work on @pool by searching
1018 * @pool->busy_hash which is keyed by the address of @work. For a worker
1019 * to match, its current execution should match the address of @work and
1020 * its work function. This is to avoid unwanted dependency between
1021 * unrelated work executions through a work item being recycled while still
1022 * being executed.
1023 *
1024 * This is a bit tricky. A work item may be freed once its execution
1025 * starts and nothing prevents the freed area from being recycled for
1026 * another work item. If the same work item address ends up being reused
1027 * before the original execution finishes, workqueue will identify the
1028 * recycled work item as currently executing and make it wait until the
1029 * current execution finishes, introducing an unwanted dependency.
1030 *
1031 * This function checks the work item address and work function to avoid
1032 * false positives. Note that this isn't complete as one may construct a
1033 * work function which can introduce dependency onto itself through a
1034 * recycled work item. Well, if somebody wants to shoot oneself in the
1035 * foot that badly, there's only so much we can do, and if such deadlock
1036 * actually occurs, it should be easy to locate the culprit work function.
1037 *
1038 * CONTEXT:
1039 * raw_spin_lock_irq(pool->lock).
1040 *
1041 * Return:
1042 * Pointer to worker which is executing @work if found, %NULL
1043 * otherwise.
1044 */
find_worker_executing_work(struct worker_pool * pool,struct work_struct * work)1045 static struct worker *find_worker_executing_work(struct worker_pool *pool,
1046 struct work_struct *work)
1047 {
1048 struct worker *worker;
1049
1050 hash_for_each_possible(pool->busy_hash, worker, hentry,
1051 (unsigned long)work)
1052 if (worker->current_work == work &&
1053 worker->current_func == work->func)
1054 return worker;
1055
1056 return NULL;
1057 }
1058
1059 /**
1060 * move_linked_works - move linked works to a list
1061 * @work: start of series of works to be scheduled
1062 * @head: target list to append @work to
1063 * @nextp: out parameter for nested worklist walking
1064 *
1065 * Schedule linked works starting from @work to @head. Work series to
1066 * be scheduled starts at @work and includes any consecutive work with
1067 * WORK_STRUCT_LINKED set in its predecessor.
1068 *
1069 * If @nextp is not NULL, it's updated to point to the next work of
1070 * the last scheduled work. This allows move_linked_works() to be
1071 * nested inside outer list_for_each_entry_safe().
1072 *
1073 * CONTEXT:
1074 * raw_spin_lock_irq(pool->lock).
1075 */
move_linked_works(struct work_struct * work,struct list_head * head,struct work_struct ** nextp)1076 static void move_linked_works(struct work_struct *work, struct list_head *head,
1077 struct work_struct **nextp)
1078 {
1079 struct work_struct *n;
1080
1081 /*
1082 * Linked worklist will always end before the end of the list,
1083 * use NULL for list head.
1084 */
1085 list_for_each_entry_safe_from(work, n, NULL, entry) {
1086 list_move_tail(&work->entry, head);
1087 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
1088 break;
1089 }
1090
1091 /*
1092 * If we're already inside safe list traversal and have moved
1093 * multiple works to the scheduled queue, the next position
1094 * needs to be updated.
1095 */
1096 if (nextp)
1097 *nextp = n;
1098 }
1099
1100 /**
1101 * get_pwq - get an extra reference on the specified pool_workqueue
1102 * @pwq: pool_workqueue to get
1103 *
1104 * Obtain an extra reference on @pwq. The caller should guarantee that
1105 * @pwq has positive refcnt and be holding the matching pool->lock.
1106 */
get_pwq(struct pool_workqueue * pwq)1107 static void get_pwq(struct pool_workqueue *pwq)
1108 {
1109 lockdep_assert_held(&pwq->pool->lock);
1110 WARN_ON_ONCE(pwq->refcnt <= 0);
1111 pwq->refcnt++;
1112 }
1113
1114 /**
1115 * put_pwq - put a pool_workqueue reference
1116 * @pwq: pool_workqueue to put
1117 *
1118 * Drop a reference of @pwq. If its refcnt reaches zero, schedule its
1119 * destruction. The caller should be holding the matching pool->lock.
1120 */
put_pwq(struct pool_workqueue * pwq)1121 static void put_pwq(struct pool_workqueue *pwq)
1122 {
1123 lockdep_assert_held(&pwq->pool->lock);
1124 if (likely(--pwq->refcnt))
1125 return;
1126 if (WARN_ON_ONCE(!(pwq->wq->flags & WQ_UNBOUND)))
1127 return;
1128 /*
1129 * @pwq can't be released under pool->lock, bounce to
1130 * pwq_unbound_release_workfn(). This never recurses on the same
1131 * pool->lock as this path is taken only for unbound workqueues and
1132 * the release work item is scheduled on a per-cpu workqueue. To
1133 * avoid lockdep warning, unbound pool->locks are given lockdep
1134 * subclass of 1 in get_unbound_pool().
1135 */
1136 schedule_work(&pwq->unbound_release_work);
1137 }
1138
1139 /**
1140 * put_pwq_unlocked - put_pwq() with surrounding pool lock/unlock
1141 * @pwq: pool_workqueue to put (can be %NULL)
1142 *
1143 * put_pwq() with locking. This function also allows %NULL @pwq.
1144 */
put_pwq_unlocked(struct pool_workqueue * pwq)1145 static void put_pwq_unlocked(struct pool_workqueue *pwq)
1146 {
1147 if (pwq) {
1148 /*
1149 * As both pwqs and pools are RCU protected, the
1150 * following lock operations are safe.
1151 */
1152 raw_spin_lock_irq(&pwq->pool->lock);
1153 put_pwq(pwq);
1154 raw_spin_unlock_irq(&pwq->pool->lock);
1155 }
1156 }
1157
pwq_activate_delayed_work(struct work_struct * work)1158 static void pwq_activate_delayed_work(struct work_struct *work)
1159 {
1160 struct pool_workqueue *pwq = get_work_pwq(work);
1161
1162 trace_workqueue_activate_work(work);
1163 if (list_empty(&pwq->pool->worklist))
1164 pwq->pool->watchdog_ts = jiffies;
1165 move_linked_works(work, &pwq->pool->worklist, NULL);
1166 __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
1167 pwq->nr_active++;
1168 }
1169
pwq_activate_first_delayed(struct pool_workqueue * pwq)1170 static void pwq_activate_first_delayed(struct pool_workqueue *pwq)
1171 {
1172 struct work_struct *work = list_first_entry(&pwq->delayed_works,
1173 struct work_struct, entry);
1174
1175 pwq_activate_delayed_work(work);
1176 }
1177
1178 /**
1179 * pwq_dec_nr_in_flight - decrement pwq's nr_in_flight
1180 * @pwq: pwq of interest
1181 * @color: color of work which left the queue
1182 *
1183 * A work either has completed or is removed from pending queue,
1184 * decrement nr_in_flight of its pwq and handle workqueue flushing.
1185 *
1186 * CONTEXT:
1187 * raw_spin_lock_irq(pool->lock).
1188 */
pwq_dec_nr_in_flight(struct pool_workqueue * pwq,int color)1189 static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, int color)
1190 {
1191 /* uncolored work items don't participate in flushing or nr_active */
1192 if (color == WORK_NO_COLOR)
1193 goto out_put;
1194
1195 pwq->nr_in_flight[color]--;
1196
1197 pwq->nr_active--;
1198 if (!list_empty(&pwq->delayed_works)) {
1199 /* one down, submit a delayed one */
1200 if (pwq->nr_active < pwq->max_active)
1201 pwq_activate_first_delayed(pwq);
1202 }
1203
1204 /* is flush in progress and are we at the flushing tip? */
1205 if (likely(pwq->flush_color != color))
1206 goto out_put;
1207
1208 /* are there still in-flight works? */
1209 if (pwq->nr_in_flight[color])
1210 goto out_put;
1211
1212 /* this pwq is done, clear flush_color */
1213 pwq->flush_color = -1;
1214
1215 /*
1216 * If this was the last pwq, wake up the first flusher. It
1217 * will handle the rest.
1218 */
1219 if (atomic_dec_and_test(&pwq->wq->nr_pwqs_to_flush))
1220 complete(&pwq->wq->first_flusher->done);
1221 out_put:
1222 put_pwq(pwq);
1223 }
1224
1225 /**
1226 * try_to_grab_pending - steal work item from worklist and disable irq
1227 * @work: work item to steal
1228 * @is_dwork: @work is a delayed_work
1229 * @flags: place to store irq state
1230 *
1231 * Try to grab PENDING bit of @work. This function can handle @work in any
1232 * stable state - idle, on timer or on worklist.
1233 *
1234 * Return:
1235 *
1236 * ======== ================================================================
1237 * 1 if @work was pending and we successfully stole PENDING
1238 * 0 if @work was idle and we claimed PENDING
1239 * -EAGAIN if PENDING couldn't be grabbed at the moment, safe to busy-retry
1240 * -ENOENT if someone else is canceling @work, this state may persist
1241 * for arbitrarily long
1242 * ======== ================================================================
1243 *
1244 * Note:
1245 * On >= 0 return, the caller owns @work's PENDING bit. To avoid getting
1246 * interrupted while holding PENDING and @work off queue, irq must be
1247 * disabled on entry. This, combined with delayed_work->timer being
1248 * irqsafe, ensures that we return -EAGAIN for finite short period of time.
1249 *
1250 * On successful return, >= 0, irq is disabled and the caller is
1251 * responsible for releasing it using local_irq_restore(*@flags).
1252 *
1253 * This function is safe to call from any context including IRQ handler.
1254 */
try_to_grab_pending(struct work_struct * work,bool is_dwork,unsigned long * flags)1255 static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
1256 unsigned long *flags)
1257 {
1258 struct worker_pool *pool;
1259 struct pool_workqueue *pwq;
1260
1261 local_irq_save(*flags);
1262
1263 /* try to steal the timer if it exists */
1264 if (is_dwork) {
1265 struct delayed_work *dwork = to_delayed_work(work);
1266
1267 /*
1268 * dwork->timer is irqsafe. If del_timer() fails, it's
1269 * guaranteed that the timer is not queued anywhere and not
1270 * running on the local CPU.
1271 */
1272 if (likely(del_timer(&dwork->timer)))
1273 return 1;
1274 }
1275
1276 /* try to claim PENDING the normal way */
1277 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
1278 return 0;
1279
1280 rcu_read_lock();
1281 /*
1282 * The queueing is in progress, or it is already queued. Try to
1283 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
1284 */
1285 pool = get_work_pool(work);
1286 if (!pool)
1287 goto fail;
1288
1289 raw_spin_lock(&pool->lock);
1290 /*
1291 * work->data is guaranteed to point to pwq only while the work
1292 * item is queued on pwq->wq, and both updating work->data to point
1293 * to pwq on queueing and to pool on dequeueing are done under
1294 * pwq->pool->lock. This in turn guarantees that, if work->data
1295 * points to pwq which is associated with a locked pool, the work
1296 * item is currently queued on that pool.
1297 */
1298 pwq = get_work_pwq(work);
1299 if (pwq && pwq->pool == pool) {
1300 debug_work_deactivate(work);
1301
1302 /*
1303 * A delayed work item cannot be grabbed directly because
1304 * it might have linked NO_COLOR work items which, if left
1305 * on the delayed_list, will confuse pwq->nr_active
1306 * management later on and cause stall. Make sure the work
1307 * item is activated before grabbing.
1308 */
1309 if (*work_data_bits(work) & WORK_STRUCT_DELAYED)
1310 pwq_activate_delayed_work(work);
1311
1312 list_del_init(&work->entry);
1313 pwq_dec_nr_in_flight(pwq, get_work_color(work));
1314
1315 /* work->data points to pwq iff queued, point to pool */
1316 set_work_pool_and_keep_pending(work, pool->id);
1317
1318 raw_spin_unlock(&pool->lock);
1319 rcu_read_unlock();
1320 return 1;
1321 }
1322 raw_spin_unlock(&pool->lock);
1323 fail:
1324 rcu_read_unlock();
1325 local_irq_restore(*flags);
1326 if (work_is_canceling(work))
1327 return -ENOENT;
1328 cpu_relax();
1329 return -EAGAIN;
1330 }
1331
1332 /**
1333 * insert_work - insert a work into a pool
1334 * @pwq: pwq @work belongs to
1335 * @work: work to insert
1336 * @head: insertion point
1337 * @extra_flags: extra WORK_STRUCT_* flags to set
1338 *
1339 * Insert @work which belongs to @pwq after @head. @extra_flags is or'd to
1340 * work_struct flags.
1341 *
1342 * CONTEXT:
1343 * raw_spin_lock_irq(pool->lock).
1344 */
insert_work(struct pool_workqueue * pwq,struct work_struct * work,struct list_head * head,unsigned int extra_flags)1345 static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
1346 struct list_head *head, unsigned int extra_flags)
1347 {
1348 struct worker_pool *pool = pwq->pool;
1349
1350 /* record the work call stack in order to print it in KASAN reports */
1351 kasan_record_aux_stack(work);
1352
1353 /* we own @work, set data and link */
1354 set_work_pwq(work, pwq, extra_flags);
1355 list_add_tail(&work->entry, head);
1356 get_pwq(pwq);
1357
1358 /*
1359 * Ensure either wq_worker_sleeping() sees the above
1360 * list_add_tail() or we see zero nr_running to avoid workers lying
1361 * around lazily while there are works to be processed.
1362 */
1363 smp_mb();
1364
1365 if (__need_more_worker(pool))
1366 wake_up_worker(pool);
1367 }
1368
1369 /*
1370 * Test whether @work is being queued from another work executing on the
1371 * same workqueue.
1372 */
is_chained_work(struct workqueue_struct * wq)1373 static bool is_chained_work(struct workqueue_struct *wq)
1374 {
1375 struct worker *worker;
1376
1377 worker = current_wq_worker();
1378 /*
1379 * Return %true iff I'm a worker executing a work item on @wq. If
1380 * I'm @worker, it's safe to dereference it without locking.
1381 */
1382 return worker && worker->current_pwq->wq == wq;
1383 }
1384
1385 /*
1386 * When queueing an unbound work item to a wq, prefer local CPU if allowed
1387 * by wq_unbound_cpumask. Otherwise, round robin among the allowed ones to
1388 * avoid perturbing sensitive tasks.
1389 */
wq_select_unbound_cpu(int cpu)1390 static int wq_select_unbound_cpu(int cpu)
1391 {
1392 static bool printed_dbg_warning;
1393 int new_cpu;
1394
1395 if (likely(!wq_debug_force_rr_cpu)) {
1396 if (cpumask_test_cpu(cpu, wq_unbound_cpumask))
1397 return cpu;
1398 } else if (!printed_dbg_warning) {
1399 pr_warn("workqueue: round-robin CPU selection forced, expect performance impact\n");
1400 printed_dbg_warning = true;
1401 }
1402
1403 if (cpumask_empty(wq_unbound_cpumask))
1404 return cpu;
1405
1406 new_cpu = __this_cpu_read(wq_rr_cpu_last);
1407 new_cpu = cpumask_next_and(new_cpu, wq_unbound_cpumask, cpu_online_mask);
1408 if (unlikely(new_cpu >= nr_cpu_ids)) {
1409 new_cpu = cpumask_first_and(wq_unbound_cpumask, cpu_online_mask);
1410 if (unlikely(new_cpu >= nr_cpu_ids))
1411 return cpu;
1412 }
1413 __this_cpu_write(wq_rr_cpu_last, new_cpu);
1414
1415 return new_cpu;
1416 }
1417
__queue_work(int cpu,struct workqueue_struct * wq,struct work_struct * work)1418 static void __queue_work(int cpu, struct workqueue_struct *wq,
1419 struct work_struct *work)
1420 {
1421 struct pool_workqueue *pwq;
1422 struct worker_pool *last_pool;
1423 struct list_head *worklist;
1424 unsigned int work_flags;
1425 unsigned int req_cpu = cpu;
1426
1427 /*
1428 * While a work item is PENDING && off queue, a task trying to
1429 * steal the PENDING will busy-loop waiting for it to either get
1430 * queued or lose PENDING. Grabbing PENDING and queueing should
1431 * happen with IRQ disabled.
1432 */
1433 lockdep_assert_irqs_disabled();
1434
1435
1436 /* if draining, only works from the same workqueue are allowed */
1437 if (unlikely(wq->flags & __WQ_DRAINING) &&
1438 WARN_ON_ONCE(!is_chained_work(wq)))
1439 return;
1440 rcu_read_lock();
1441 retry:
1442 /* pwq which will be used unless @work is executing elsewhere */
1443 if (wq->flags & WQ_UNBOUND) {
1444 if (req_cpu == WORK_CPU_UNBOUND)
1445 cpu = wq_select_unbound_cpu(raw_smp_processor_id());
1446 pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
1447 } else {
1448 if (req_cpu == WORK_CPU_UNBOUND)
1449 cpu = raw_smp_processor_id();
1450 pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
1451 }
1452
1453 /*
1454 * If @work was previously on a different pool, it might still be
1455 * running there, in which case the work needs to be queued on that
1456 * pool to guarantee non-reentrancy.
1457 */
1458 last_pool = get_work_pool(work);
1459 if (last_pool && last_pool != pwq->pool) {
1460 struct worker *worker;
1461
1462 raw_spin_lock(&last_pool->lock);
1463
1464 worker = find_worker_executing_work(last_pool, work);
1465
1466 if (worker && worker->current_pwq->wq == wq) {
1467 pwq = worker->current_pwq;
1468 } else {
1469 /* meh... not running there, queue here */
1470 raw_spin_unlock(&last_pool->lock);
1471 raw_spin_lock(&pwq->pool->lock);
1472 }
1473 } else {
1474 raw_spin_lock(&pwq->pool->lock);
1475 }
1476
1477 /*
1478 * pwq is determined and locked. For unbound pools, we could have
1479 * raced with pwq release and it could already be dead. If its
1480 * refcnt is zero, repeat pwq selection. Note that pwqs never die
1481 * without another pwq replacing it in the numa_pwq_tbl or while
1482 * work items are executing on it, so the retrying is guaranteed to
1483 * make forward-progress.
1484 */
1485 if (unlikely(!pwq->refcnt)) {
1486 if (wq->flags & WQ_UNBOUND) {
1487 raw_spin_unlock(&pwq->pool->lock);
1488 cpu_relax();
1489 goto retry;
1490 }
1491 /* oops */
1492 WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
1493 wq->name, cpu);
1494 }
1495
1496 /* pwq determined, queue */
1497 trace_workqueue_queue_work(req_cpu, pwq, work);
1498
1499 if (WARN_ON(!list_empty(&work->entry)))
1500 goto out;
1501
1502 pwq->nr_in_flight[pwq->work_color]++;
1503 work_flags = work_color_to_flags(pwq->work_color);
1504
1505 if (likely(pwq->nr_active < pwq->max_active)) {
1506 trace_workqueue_activate_work(work);
1507 pwq->nr_active++;
1508 worklist = &pwq->pool->worklist;
1509 if (list_empty(worklist))
1510 pwq->pool->watchdog_ts = jiffies;
1511 } else {
1512 work_flags |= WORK_STRUCT_DELAYED;
1513 worklist = &pwq->delayed_works;
1514 }
1515
1516 debug_work_activate(work);
1517 insert_work(pwq, work, worklist, work_flags);
1518
1519 out:
1520 raw_spin_unlock(&pwq->pool->lock);
1521 rcu_read_unlock();
1522 }
1523
1524 /**
1525 * queue_work_on - queue work on specific cpu
1526 * @cpu: CPU number to execute work on
1527 * @wq: workqueue to use
1528 * @work: work to queue
1529 *
1530 * We queue the work to a specific CPU, the caller must ensure it
1531 * can't go away.
1532 *
1533 * Return: %false if @work was already on a queue, %true otherwise.
1534 */
queue_work_on(int cpu,struct workqueue_struct * wq,struct work_struct * work)1535 bool queue_work_on(int cpu, struct workqueue_struct *wq,
1536 struct work_struct *work)
1537 {
1538 bool ret = false;
1539 unsigned long flags;
1540
1541 local_irq_save(flags);
1542
1543 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1544 __queue_work(cpu, wq, work);
1545 ret = true;
1546 }
1547
1548 local_irq_restore(flags);
1549 return ret;
1550 }
1551 EXPORT_SYMBOL(queue_work_on);
1552
1553 /**
1554 * workqueue_select_cpu_near - Select a CPU based on NUMA node
1555 * @node: NUMA node ID that we want to select a CPU from
1556 *
1557 * This function will attempt to find a "random" cpu available on a given
1558 * node. If there are no CPUs available on the given node it will return
1559 * WORK_CPU_UNBOUND indicating that we should just schedule to any
1560 * available CPU if we need to schedule this work.
1561 */
workqueue_select_cpu_near(int node)1562 static int workqueue_select_cpu_near(int node)
1563 {
1564 int cpu;
1565
1566 /* No point in doing this if NUMA isn't enabled for workqueues */
1567 if (!wq_numa_enabled)
1568 return WORK_CPU_UNBOUND;
1569
1570 /* Delay binding to CPU if node is not valid or online */
1571 if (node < 0 || node >= MAX_NUMNODES || !node_online(node))
1572 return WORK_CPU_UNBOUND;
1573
1574 /* Use local node/cpu if we are already there */
1575 cpu = raw_smp_processor_id();
1576 if (node == cpu_to_node(cpu))
1577 return cpu;
1578
1579 /* Use "random" otherwise know as "first" online CPU of node */
1580 cpu = cpumask_any_and(cpumask_of_node(node), cpu_online_mask);
1581
1582 /* If CPU is valid return that, otherwise just defer */
1583 return cpu < nr_cpu_ids ? cpu : WORK_CPU_UNBOUND;
1584 }
1585
1586 /**
1587 * queue_work_node - queue work on a "random" cpu for a given NUMA node
1588 * @node: NUMA node that we are targeting the work for
1589 * @wq: workqueue to use
1590 * @work: work to queue
1591 *
1592 * We queue the work to a "random" CPU within a given NUMA node. The basic
1593 * idea here is to provide a way to somehow associate work with a given
1594 * NUMA node.
1595 *
1596 * This function will only make a best effort attempt at getting this onto
1597 * the right NUMA node. If no node is requested or the requested node is
1598 * offline then we just fall back to standard queue_work behavior.
1599 *
1600 * Currently the "random" CPU ends up being the first available CPU in the
1601 * intersection of cpu_online_mask and the cpumask of the node, unless we
1602 * are running on the node. In that case we just use the current CPU.
1603 *
1604 * Return: %false if @work was already on a queue, %true otherwise.
1605 */
1606 bool queue_work_node(int node, struct workqueue_struct *wq,
1607 struct work_struct *work)
1608 {
1609 unsigned long flags;
1610 bool ret = false;
1611
1612 /*
1613 * This current implementation is specific to unbound workqueues.
1614 * Specifically we only return the first available CPU for a given
1615 * node instead of cycling through individual CPUs within the node.
1616 *
1617 * If this is used with a per-cpu workqueue then the logic in
1618 * workqueue_select_cpu_near would need to be updated to allow for
1619 * some round robin type logic.
1620 */
1621 WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND));
1622
1623 local_irq_save(flags);
1624
1625 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1626 int cpu = workqueue_select_cpu_near(node);
1627
1628 __queue_work(cpu, wq, work);
1629 ret = true;
1630 }
1631
1632 local_irq_restore(flags);
1633 return ret;
1634 }
1635 EXPORT_SYMBOL_GPL(queue_work_node);
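
/*
 * Illustrative sketch (not part of this file's build): steering a work item
 * towards a NUMA node, e.g. the node of the device that produced the data.
 * The example_* name is an assumption; note that the workqueue must be an
 * unbound one such as system_unbound_wq.
 *
 *	static void example_node_kick(int node, struct work_struct *work)
 *	{
 *		if (!queue_work_node(node, system_unbound_wq, work))
 *			pr_debug("work was already pending\n");
 *	}
 */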
1636
1637 void delayed_work_timer_fn(struct timer_list *t)
1638 {
1639 struct delayed_work *dwork = from_timer(dwork, t, timer);
1640
1641 /* should have been called from irqsafe timer with irq already off */
1642 __queue_work(dwork->cpu, dwork->wq, &dwork->work);
1643 }
1644 EXPORT_SYMBOL(delayed_work_timer_fn);
1645
1646 static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
1647 struct delayed_work *dwork, unsigned long delay)
1648 {
1649 struct timer_list *timer = &dwork->timer;
1650 struct work_struct *work = &dwork->work;
1651
1652 WARN_ON_ONCE(!wq);
1653 /*
1654 * With CFI, timer->function can point to a jump table entry in a module,
1655 * which fails the comparison. Disable the warning if CFI and modules are
1656 * both enabled.
1657 */
1658 if (!IS_ENABLED(CONFIG_CFI_CLANG) || !IS_ENABLED(CONFIG_MODULES))
1659 WARN_ON_ONCE(timer->function != delayed_work_timer_fn);
1660
1661 WARN_ON_ONCE(timer_pending(timer));
1662 WARN_ON_ONCE(!list_empty(&work->entry));
1663
1664 /*
1665 * If @delay is 0, queue @dwork->work immediately. This is for
1666 * both optimization and correctness. The earliest @timer can
1667 * expire is on the closest next tick and delayed_work users depend
1668 	 * on there being no such delay when @delay is 0.
1669 */
1670 if (!delay) {
1671 __queue_work(cpu, wq, &dwork->work);
1672 return;
1673 }
1674
1675 dwork->wq = wq;
1676 dwork->cpu = cpu;
1677 timer->expires = jiffies + delay;
1678
1679 if (unlikely(cpu != WORK_CPU_UNBOUND))
1680 add_timer_on(timer, cpu);
1681 else
1682 add_timer(timer);
1683 }
1684
1685 /**
1686 * queue_delayed_work_on - queue work on specific CPU after delay
1687 * @cpu: CPU number to execute work on
1688 * @wq: workqueue to use
1689 * @dwork: work to queue
1690 * @delay: number of jiffies to wait before queueing
1691 *
1692 * Return: %false if @work was already on a queue, %true otherwise. If
1693 * @delay is zero and @dwork is idle, it will be scheduled for immediate
1694 * execution.
1695 */
1696 bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
1697 struct delayed_work *dwork, unsigned long delay)
1698 {
1699 struct work_struct *work = &dwork->work;
1700 bool ret = false;
1701 unsigned long flags;
1702
1703 /* read the comment in __queue_work() */
1704 local_irq_save(flags);
1705
1706 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1707 __queue_delayed_work(cpu, wq, dwork, delay);
1708 ret = true;
1709 }
1710
1711 local_irq_restore(flags);
1712 return ret;
1713 }
1714 EXPORT_SYMBOL(queue_delayed_work_on);
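
/*
 * Illustrative sketch (not part of this file's build): a self-rearming
 * delayed work item used as a slow poller. The example_* names are
 * assumptions; WORK_CPU_UNBOUND lets the workqueue pick the CPU when the
 * timer expires.
 *
 *	static void example_poll_fn(struct work_struct *work);
 *	static DECLARE_DELAYED_WORK(example_poll, example_poll_fn);
 *
 *	static void example_poll_fn(struct work_struct *work)
 *	{
 *		pr_info("periodic poll\n");
 *		queue_delayed_work_on(WORK_CPU_UNBOUND, system_wq,
 *				      &example_poll, msecs_to_jiffies(500));
 *	}
 *
 *	static void example_start_polling(void)
 *	{
 *		queue_delayed_work_on(WORK_CPU_UNBOUND, system_wq,
 *				      &example_poll, msecs_to_jiffies(500));
 *	}
 */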
1715
1716 /**
1717 * mod_delayed_work_on - modify delay of or queue a delayed work on specific CPU
1718 * @cpu: CPU number to execute work on
1719 * @wq: workqueue to use
1720 * @dwork: work to queue
1721 * @delay: number of jiffies to wait before queueing
1722 *
1723 * If @dwork is idle, equivalent to queue_delayed_work_on(); otherwise,
1724 * modify @dwork's timer so that it expires after @delay. If @delay is
1725 * zero, @work is guaranteed to be scheduled immediately regardless of its
1726 * current state.
1727 *
1728 * Return: %false if @dwork was idle and queued, %true if @dwork was
1729 * pending and its timer was modified.
1730 *
1731 * This function is safe to call from any context including IRQ handler.
1732 * See try_to_grab_pending() for details.
1733 */
1734 bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
1735 struct delayed_work *dwork, unsigned long delay)
1736 {
1737 unsigned long flags;
1738 int ret;
1739
1740 do {
1741 ret = try_to_grab_pending(&dwork->work, true, &flags);
1742 } while (unlikely(ret == -EAGAIN));
1743
1744 if (likely(ret >= 0)) {
1745 __queue_delayed_work(cpu, wq, dwork, delay);
1746 local_irq_restore(flags);
1747 }
1748
1749 /* -ENOENT from try_to_grab_pending() becomes %true */
1750 return ret;
1751 }
1752 EXPORT_SYMBOL_GPL(mod_delayed_work_on);
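
/*
 * Illustrative sketch (not part of this file's build): a debounce pattern.
 * Every call pushes the expiry further out, so the handler runs only once
 * the events have stopped for 100ms. The example_* names are assumptions.
 *
 *	static void example_flush_fn(struct work_struct *work)
 *	{
 *		pr_info("events settled, flushing\n");
 *	}
 *	static DECLARE_DELAYED_WORK(example_flush, example_flush_fn);
 *
 *	static void example_event(void)
 *	{
 *		mod_delayed_work_on(WORK_CPU_UNBOUND, system_wq,
 *				    &example_flush, msecs_to_jiffies(100));
 *	}
 */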
1753
1754 static void rcu_work_rcufn(struct rcu_head *rcu)
1755 {
1756 struct rcu_work *rwork = container_of(rcu, struct rcu_work, rcu);
1757
1758 /* read the comment in __queue_work() */
1759 local_irq_disable();
1760 __queue_work(WORK_CPU_UNBOUND, rwork->wq, &rwork->work);
1761 local_irq_enable();
1762 }
1763
1764 /**
1765 * queue_rcu_work - queue work after a RCU grace period
1766 * @wq: workqueue to use
1767 * @rwork: work to queue
1768 *
1769 * Return: %false if @rwork was already pending, %true otherwise. Note
1770 * that a full RCU grace period is guaranteed only after a %true return.
1771 * While @rwork is guaranteed to be executed after a %false return, the
1772 * execution may happen before a full RCU grace period has passed.
1773 */
1774 bool queue_rcu_work(struct workqueue_struct *wq, struct rcu_work *rwork)
1775 {
1776 struct work_struct *work = &rwork->work;
1777
1778 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1779 rwork->wq = wq;
1780 call_rcu(&rwork->rcu, rcu_work_rcufn);
1781 return true;
1782 }
1783
1784 return false;
1785 }
1786 EXPORT_SYMBOL(queue_rcu_work);
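
/*
 * Illustrative sketch (not part of this file's build): freeing an object
 * from process context only after an RCU grace period has elapsed. The
 * example_obj type and names are assumptions for illustration.
 *
 *	struct example_obj {
 *		struct rcu_work rwork;
 *	};
 *
 *	static void example_obj_free_fn(struct work_struct *work)
 *	{
 *		struct example_obj *obj =
 *			container_of(to_rcu_work(work), struct example_obj, rwork);
 *
 *		kfree(obj);
 *	}
 *
 *	static void example_obj_put(struct example_obj *obj)
 *	{
 *		INIT_RCU_WORK(&obj->rwork, example_obj_free_fn);
 *		queue_rcu_work(system_wq, &obj->rwork);
 *	}
 */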
1787
1788 /**
1789 * worker_enter_idle - enter idle state
1790 * @worker: worker which is entering idle state
1791 *
1792 * @worker is entering idle state. Update stats and idle timer if
1793 * necessary.
1794 *
1795 * LOCKING:
1796 * raw_spin_lock_irq(pool->lock).
1797 */
1798 static void worker_enter_idle(struct worker *worker)
1799 {
1800 struct worker_pool *pool = worker->pool;
1801
1802 if (WARN_ON_ONCE(worker->flags & WORKER_IDLE) ||
1803 WARN_ON_ONCE(!list_empty(&worker->entry) &&
1804 (worker->hentry.next || worker->hentry.pprev)))
1805 return;
1806
1807 /* can't use worker_set_flags(), also called from create_worker() */
1808 worker->flags |= WORKER_IDLE;
1809 pool->nr_idle++;
1810 worker->last_active = jiffies;
1811
1812 /* idle_list is LIFO */
1813 list_add(&worker->entry, &pool->idle_list);
1814
1815 if (too_many_workers(pool) && !timer_pending(&pool->idle_timer))
1816 mod_timer(&pool->idle_timer, jiffies + IDLE_WORKER_TIMEOUT);
1817
1818 /*
1819 * Sanity check nr_running. Because unbind_workers() releases
1820 * pool->lock between setting %WORKER_UNBOUND and zapping
1821 * nr_running, the warning may trigger spuriously. Check iff
1822 * unbind is not in progress.
1823 */
1824 WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
1825 pool->nr_workers == pool->nr_idle &&
1826 atomic_read(&pool->nr_running));
1827 }
1828
1829 /**
1830 * worker_leave_idle - leave idle state
1831 * @worker: worker which is leaving idle state
1832 *
1833 * @worker is leaving idle state. Update stats.
1834 *
1835 * LOCKING:
1836 * raw_spin_lock_irq(pool->lock).
1837 */
1838 static void worker_leave_idle(struct worker *worker)
1839 {
1840 struct worker_pool *pool = worker->pool;
1841
1842 if (WARN_ON_ONCE(!(worker->flags & WORKER_IDLE)))
1843 return;
1844 worker_clr_flags(worker, WORKER_IDLE);
1845 pool->nr_idle--;
1846 list_del_init(&worker->entry);
1847 }
1848
1849 static struct worker *alloc_worker(int node)
1850 {
1851 struct worker *worker;
1852
1853 worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, node);
1854 if (worker) {
1855 INIT_LIST_HEAD(&worker->entry);
1856 INIT_LIST_HEAD(&worker->scheduled);
1857 INIT_LIST_HEAD(&worker->node);
1858 /* on creation a worker is in !idle && prep state */
1859 worker->flags = WORKER_PREP;
1860 }
1861 return worker;
1862 }
1863
1864 /**
1865 * worker_attach_to_pool() - attach a worker to a pool
1866 * @worker: worker to be attached
1867 * @pool: the target pool
1868 *
1869 * Attach @worker to @pool. Once attached, the %WORKER_UNBOUND flag and
1870 * cpu-binding of @worker are kept coordinated with the pool across
1871 * cpu-[un]hotplugs.
1872 */
1873 static void worker_attach_to_pool(struct worker *worker,
1874 struct worker_pool *pool)
1875 {
1876 mutex_lock(&wq_pool_attach_mutex);
1877
1878 /*
1879 * The wq_pool_attach_mutex ensures %POOL_DISASSOCIATED remains
1880 * stable across this function. See the comments above the flag
1881 * definition for details.
1882 */
1883 if (pool->flags & POOL_DISASSOCIATED)
1884 worker->flags |= WORKER_UNBOUND;
1885
1886 if (worker->rescue_wq)
1887 set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
1888
1889 list_add_tail(&worker->node, &pool->workers);
1890 worker->pool = pool;
1891
1892 mutex_unlock(&wq_pool_attach_mutex);
1893 }
1894
1895 /**
1896 * worker_detach_from_pool() - detach a worker from its pool
1897 * @worker: worker which is attached to its pool
1898 *
1899 * Undo the attaching which had been done in worker_attach_to_pool(). The
1900 * caller worker shouldn't access the pool after detaching unless it
1901 * holds another reference to the pool.
1902 */
1903 static void worker_detach_from_pool(struct worker *worker)
1904 {
1905 struct worker_pool *pool = worker->pool;
1906 struct completion *detach_completion = NULL;
1907
1908 mutex_lock(&wq_pool_attach_mutex);
1909
1910 list_del(&worker->node);
1911 worker->pool = NULL;
1912
1913 if (list_empty(&pool->workers))
1914 detach_completion = pool->detach_completion;
1915 mutex_unlock(&wq_pool_attach_mutex);
1916
1917 /* clear leftover flags without pool->lock after it is detached */
1918 worker->flags &= ~(WORKER_UNBOUND | WORKER_REBOUND);
1919
1920 if (detach_completion)
1921 complete(detach_completion);
1922 }
1923
1924 /**
1925 * create_worker - create a new workqueue worker
1926 * @pool: pool the new worker will belong to
1927 *
1928 * Create and start a new worker which is attached to @pool.
1929 *
1930 * CONTEXT:
1931 * Might sleep. Does GFP_KERNEL allocations.
1932 *
1933 * Return:
1934 * Pointer to the newly created worker.
1935 */
1936 static struct worker *create_worker(struct worker_pool *pool)
1937 {
1938 struct worker *worker = NULL;
1939 int id = -1;
1940 char id_buf[16];
1941
1942 /* ID is needed to determine kthread name */
1943 id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);
1944 if (id < 0)
1945 goto fail;
1946
1947 worker = alloc_worker(pool->node);
1948 if (!worker)
1949 goto fail;
1950
1951 worker->id = id;
1952
1953 if (pool->cpu >= 0)
1954 snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
1955 pool->attrs->nice < 0 ? "H" : "");
1956 else
1957 snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
1958
1959 worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
1960 "kworker/%s", id_buf);
1961 if (IS_ERR(worker->task))
1962 goto fail;
1963
1964 set_user_nice(worker->task, pool->attrs->nice);
1965 kthread_bind_mask(worker->task, pool->attrs->cpumask);
1966
1967 /* successful, attach the worker to the pool */
1968 worker_attach_to_pool(worker, pool);
1969
1970 /* start the newly created worker */
1971 raw_spin_lock_irq(&pool->lock);
1972 worker->pool->nr_workers++;
1973 worker_enter_idle(worker);
1974 wake_up_process(worker->task);
1975 raw_spin_unlock_irq(&pool->lock);
1976
1977 return worker;
1978
1979 fail:
1980 if (id >= 0)
1981 ida_simple_remove(&pool->worker_ida, id);
1982 kfree(worker);
1983 return NULL;
1984 }
1985
1986 /**
1987 * destroy_worker - destroy a workqueue worker
1988 * @worker: worker to be destroyed
1989 *
1990 * Destroy @worker and adjust @pool stats accordingly. The worker should
1991 * be idle.
1992 *
1993 * CONTEXT:
1994 * raw_spin_lock_irq(pool->lock).
1995 */
1996 static void destroy_worker(struct worker *worker)
1997 {
1998 struct worker_pool *pool = worker->pool;
1999
2000 lockdep_assert_held(&pool->lock);
2001
2002 /* sanity check frenzy */
2003 if (WARN_ON(worker->current_work) ||
2004 WARN_ON(!list_empty(&worker->scheduled)) ||
2005 WARN_ON(!(worker->flags & WORKER_IDLE)))
2006 return;
2007
2008 pool->nr_workers--;
2009 pool->nr_idle--;
2010
2011 list_del_init(&worker->entry);
2012 worker->flags |= WORKER_DIE;
2013 wake_up_process(worker->task);
2014 }
2015
2016 static void idle_worker_timeout(struct timer_list *t)
2017 {
2018 struct worker_pool *pool = from_timer(pool, t, idle_timer);
2019
2020 raw_spin_lock_irq(&pool->lock);
2021
2022 while (too_many_workers(pool)) {
2023 struct worker *worker;
2024 unsigned long expires;
2025
2026 /* idle_list is kept in LIFO order, check the last one */
2027 worker = list_entry(pool->idle_list.prev, struct worker, entry);
2028 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
2029
2030 if (time_before(jiffies, expires)) {
2031 mod_timer(&pool->idle_timer, expires);
2032 break;
2033 }
2034
2035 destroy_worker(worker);
2036 }
2037
2038 raw_spin_unlock_irq(&pool->lock);
2039 }
2040
2041 static void send_mayday(struct work_struct *work)
2042 {
2043 struct pool_workqueue *pwq = get_work_pwq(work);
2044 struct workqueue_struct *wq = pwq->wq;
2045
2046 lockdep_assert_held(&wq_mayday_lock);
2047
2048 if (!wq->rescuer)
2049 return;
2050
2051 /* mayday mayday mayday */
2052 if (list_empty(&pwq->mayday_node)) {
2053 /*
2054 * If @pwq is for an unbound wq, its base ref may be put at
2055 * any time due to an attribute change. Pin @pwq until the
2056 * rescuer is done with it.
2057 */
2058 get_pwq(pwq);
2059 list_add_tail(&pwq->mayday_node, &wq->maydays);
2060 wake_up_process(wq->rescuer->task);
2061 }
2062 }
2063
2064 static void pool_mayday_timeout(struct timer_list *t)
2065 {
2066 struct worker_pool *pool = from_timer(pool, t, mayday_timer);
2067 struct work_struct *work;
2068
2069 raw_spin_lock_irq(&pool->lock);
2070 raw_spin_lock(&wq_mayday_lock); /* for wq->maydays */
2071
2072 if (need_to_create_worker(pool)) {
2073 /*
2074 * We've been trying to create a new worker but
2075 * haven't been successful. We might be hitting an
2076 * allocation deadlock. Send distress signals to
2077 * rescuers.
2078 */
2079 list_for_each_entry(work, &pool->worklist, entry)
2080 send_mayday(work);
2081 }
2082
2083 raw_spin_unlock(&wq_mayday_lock);
2084 raw_spin_unlock_irq(&pool->lock);
2085
2086 mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INTERVAL);
2087 }
2088
2089 /**
2090 * maybe_create_worker - create a new worker if necessary
2091 * @pool: pool to create a new worker for
2092 *
2093 * Create a new worker for @pool if necessary. @pool is guaranteed to
2094 * have at least one idle worker on return from this function. If
2095 * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
2096 * sent to all rescuers with works scheduled on @pool to resolve
2097 * possible allocation deadlock.
2098 *
2099 * On return, need_to_create_worker() is guaranteed to be %false and
2100 * may_start_working() %true.
2101 *
2102 * LOCKING:
2103 * raw_spin_lock_irq(pool->lock) which may be released and regrabbed
2104 * multiple times. Does GFP_KERNEL allocations. Called only from
2105 * manager.
2106 */
2107 static void maybe_create_worker(struct worker_pool *pool)
2108 __releases(&pool->lock)
2109 __acquires(&pool->lock)
2110 {
2111 restart:
2112 raw_spin_unlock_irq(&pool->lock);
2113
2114 /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
2115 mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
2116
2117 while (true) {
2118 if (create_worker(pool) || !need_to_create_worker(pool))
2119 break;
2120
2121 schedule_timeout_interruptible(CREATE_COOLDOWN);
2122
2123 if (!need_to_create_worker(pool))
2124 break;
2125 }
2126
2127 del_timer_sync(&pool->mayday_timer);
2128 raw_spin_lock_irq(&pool->lock);
2129 /*
2130 * This is necessary even after a new worker was just successfully
2131 * created as @pool->lock was dropped and the new worker might have
2132 * already become busy.
2133 */
2134 if (need_to_create_worker(pool))
2135 goto restart;
2136 }
2137
2138 /**
2139 * manage_workers - manage worker pool
2140 * @worker: self
2141 *
2142 * Assume the manager role and manage the worker pool @worker belongs
2143 * to. At any given time, there can be only zero or one manager per
2144 * pool. The exclusion is handled automatically by this function.
2145 *
2146 * The caller can safely start processing works on false return. On
2147 * true return, it's guaranteed that need_to_create_worker() is false
2148 * and may_start_working() is true.
2149 *
2150 * CONTEXT:
2151 * raw_spin_lock_irq(pool->lock) which may be released and regrabbed
2152 * multiple times. Does GFP_KERNEL allocations.
2153 *
2154 * Return:
2155 * %false if the pool doesn't need management and the caller can safely
2156 * start processing works, %true if management function was performed and
2157 * the conditions that the caller verified before calling the function may
2158 * no longer be true.
2159 */
2160 static bool manage_workers(struct worker *worker)
2161 {
2162 struct worker_pool *pool = worker->pool;
2163
2164 if (pool->flags & POOL_MANAGER_ACTIVE)
2165 return false;
2166
2167 pool->flags |= POOL_MANAGER_ACTIVE;
2168 pool->manager = worker;
2169
2170 maybe_create_worker(pool);
2171
2172 pool->manager = NULL;
2173 pool->flags &= ~POOL_MANAGER_ACTIVE;
2174 rcuwait_wake_up(&manager_wait);
2175 return true;
2176 }
2177
2178 /**
2179 * process_one_work - process single work
2180 * @worker: self
2181 * @work: work to process
2182 *
2183 * Process @work. This function contains all the logic necessary to
2184 * process a single work including synchronization against and
2185 * interaction with other workers on the same cpu, queueing and
2186 * flushing. As long as the context requirement is met, any worker can
2187 * call this function to process a work.
2188 *
2189 * CONTEXT:
2190 * raw_spin_lock_irq(pool->lock) which is released and regrabbed.
2191 */
2192 static void process_one_work(struct worker *worker, struct work_struct *work)
2193 __releases(&pool->lock)
2194 __acquires(&pool->lock)
2195 {
2196 struct pool_workqueue *pwq = get_work_pwq(work);
2197 struct worker_pool *pool = worker->pool;
2198 bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
2199 int work_color;
2200 struct worker *collision;
2201 #ifdef CONFIG_LOCKDEP
2202 /*
2203 * It is permissible to free the struct work_struct from
2204 	 * inside the function that is called from it; we need to take this
2205 	 * into account for lockdep too. To avoid bogus "held
2206 * lock freed" warnings as well as problems when looking into
2207 * work->lockdep_map, make a copy and use that here.
2208 */
2209 struct lockdep_map lockdep_map;
2210
2211 lockdep_copy_map(&lockdep_map, &work->lockdep_map);
2212 #endif
2213 /* ensure we're on the correct CPU */
2214 WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
2215 raw_smp_processor_id() != pool->cpu);
2216
2217 /*
2218 * A single work shouldn't be executed concurrently by
2219 * multiple workers on a single cpu. Check whether anyone is
2220 * already processing the work. If so, defer the work to the
2221 * currently executing one.
2222 */
2223 collision = find_worker_executing_work(pool, work);
2224 if (unlikely(collision)) {
2225 move_linked_works(work, &collision->scheduled, NULL);
2226 return;
2227 }
2228
2229 /* claim and dequeue */
2230 debug_work_deactivate(work);
2231 hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
2232 worker->current_work = work;
2233 worker->current_func = work->func;
2234 worker->current_pwq = pwq;
2235 work_color = get_work_color(work);
2236
2237 /*
2238 * Record wq name for cmdline and debug reporting, may get
2239 * overridden through set_worker_desc().
2240 */
2241 strscpy(worker->desc, pwq->wq->name, WORKER_DESC_LEN);
2242
2243 list_del_init(&work->entry);
2244
2245 /*
2246 * CPU intensive works don't participate in concurrency management.
2247 * They're the scheduler's responsibility. This takes @worker out
2248 * of concurrency management and the next code block will chain
2249 * execution of the pending work items.
2250 */
2251 if (unlikely(cpu_intensive))
2252 worker_set_flags(worker, WORKER_CPU_INTENSIVE);
2253
2254 /*
2255 * Wake up another worker if necessary. The condition is always
2256 * false for normal per-cpu workers since nr_running would always
2257 * be >= 1 at this point. This is used to chain execution of the
2258 * pending work items for WORKER_NOT_RUNNING workers such as the
2259 * UNBOUND and CPU_INTENSIVE ones.
2260 */
2261 if (need_more_worker(pool))
2262 wake_up_worker(pool);
2263
2264 /*
2265 * Record the last pool and clear PENDING which should be the last
2266 * update to @work. Also, do this inside @pool->lock so that
2267 * PENDING and queued state changes happen together while IRQ is
2268 * disabled.
2269 */
2270 set_work_pool_and_clear_pending(work, pool->id);
2271
2272 raw_spin_unlock_irq(&pool->lock);
2273
2274 lock_map_acquire(&pwq->wq->lockdep_map);
2275 lock_map_acquire(&lockdep_map);
2276 /*
2277 * Strictly speaking we should mark the invariant state without holding
2278 * any locks, that is, before these two lock_map_acquire()'s.
2279 *
2280 * However, that would result in:
2281 *
2282 * A(W1)
2283 * WFC(C)
2284 * A(W1)
2285 * C(C)
2286 *
2287 * Which would create W1->C->W1 dependencies, even though there is no
2288 * actual deadlock possible. There are two solutions, using a
2289 * read-recursive acquire on the work(queue) 'locks', but this will then
2290 * hit the lockdep limitation on recursive locks, or simply discard
2291 * these locks.
2292 *
2293 * AFAICT there is no possible deadlock scenario between the
2294 * flush_work() and complete() primitives (except for single-threaded
2295 * workqueues), so hiding them isn't a problem.
2296 */
2297 lockdep_invariant_state(true);
2298 trace_workqueue_execute_start(work);
2299 worker->current_func(work);
2300 /*
2301 * While we must be careful to not use "work" after this, the trace
2302 * point will only record its address.
2303 */
2304 trace_workqueue_execute_end(work, worker->current_func);
2305 lock_map_release(&lockdep_map);
2306 lock_map_release(&pwq->wq->lockdep_map);
2307
2308 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
2309 pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
2310 " last function: %ps\n",
2311 current->comm, preempt_count(), task_pid_nr(current),
2312 worker->current_func);
2313 debug_show_held_locks(current);
2314 dump_stack();
2315 }
2316
2317 /*
2318 * The following prevents a kworker from hogging CPU on !PREEMPTION
2319 * kernels, where a requeueing work item waiting for something to
2320 * happen could deadlock with stop_machine as such work item could
2321 * indefinitely requeue itself while all other CPUs are trapped in
2322 * stop_machine. At the same time, report a quiescent RCU state so
2323 * the same condition doesn't freeze RCU.
2324 */
2325 cond_resched();
2326
2327 raw_spin_lock_irq(&pool->lock);
2328
2329 /* clear cpu intensive status */
2330 if (unlikely(cpu_intensive))
2331 worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
2332
2333 /* tag the worker for identification in schedule() */
2334 worker->last_func = worker->current_func;
2335
2336 /* we're done with it, release */
2337 hash_del(&worker->hentry);
2338 worker->current_work = NULL;
2339 worker->current_func = NULL;
2340 worker->current_pwq = NULL;
2341 pwq_dec_nr_in_flight(pwq, work_color);
2342 }
2343
2344 /**
2345 * process_scheduled_works - process scheduled works
2346 * @worker: self
2347 *
2348 * Process all scheduled works. Please note that the scheduled list
2349 * may change while processing a work, so this function repeatedly
2350 * fetches a work from the top and executes it.
2351 *
2352 * CONTEXT:
2353 * raw_spin_lock_irq(pool->lock) which may be released and regrabbed
2354 * multiple times.
2355 */
2356 static void process_scheduled_works(struct worker *worker)
2357 {
2358 while (!list_empty(&worker->scheduled)) {
2359 struct work_struct *work = list_first_entry(&worker->scheduled,
2360 struct work_struct, entry);
2361 process_one_work(worker, work);
2362 }
2363 }
2364
2365 static void set_pf_worker(bool val)
2366 {
2367 mutex_lock(&wq_pool_attach_mutex);
2368 if (val)
2369 current->flags |= PF_WQ_WORKER;
2370 else
2371 current->flags &= ~PF_WQ_WORKER;
2372 mutex_unlock(&wq_pool_attach_mutex);
2373 }
2374
2375 /**
2376 * worker_thread - the worker thread function
2377 * @__worker: self
2378 *
2379 * The worker thread function. All workers belong to a worker_pool -
2380 * either a per-cpu one or dynamic unbound one. These workers process all
2381 * work items regardless of their specific target workqueue. The only
2382 * exception is work items which belong to workqueues with a rescuer which
2383 * will be explained in rescuer_thread().
2384 *
2385 * Return: 0
2386 */
2387 static int worker_thread(void *__worker)
2388 {
2389 struct worker *worker = __worker;
2390 struct worker_pool *pool = worker->pool;
2391
2392 /* tell the scheduler that this is a workqueue worker */
2393 set_pf_worker(true);
2394 woke_up:
2395 raw_spin_lock_irq(&pool->lock);
2396
2397 /* am I supposed to die? */
2398 if (unlikely(worker->flags & WORKER_DIE)) {
2399 raw_spin_unlock_irq(&pool->lock);
2400 WARN_ON_ONCE(!list_empty(&worker->entry));
2401 set_pf_worker(false);
2402
2403 set_task_comm(worker->task, "kworker/dying");
2404 ida_simple_remove(&pool->worker_ida, worker->id);
2405 worker_detach_from_pool(worker);
2406 kfree(worker);
2407 return 0;
2408 }
2409
2410 worker_leave_idle(worker);
2411 recheck:
2412 /* no more worker necessary? */
2413 if (!need_more_worker(pool))
2414 goto sleep;
2415
2416 /* do we need to manage? */
2417 if (unlikely(!may_start_working(pool)) && manage_workers(worker))
2418 goto recheck;
2419
2420 /*
2421 * ->scheduled list can only be filled while a worker is
2422 * preparing to process a work or actually processing it.
2423 * Make sure nobody diddled with it while I was sleeping.
2424 */
2425 WARN_ON_ONCE(!list_empty(&worker->scheduled));
2426
2427 /*
2428 * Finish PREP stage. We're guaranteed to have at least one idle
2429 * worker or that someone else has already assumed the manager
2430 * role. This is where @worker starts participating in concurrency
2431 * management if applicable and concurrency management is restored
2432 * after being rebound. See rebind_workers() for details.
2433 */
2434 worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
2435
2436 do {
2437 struct work_struct *work =
2438 list_first_entry(&pool->worklist,
2439 struct work_struct, entry);
2440
2441 pool->watchdog_ts = jiffies;
2442
2443 if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
2444 /* optimization path, not strictly necessary */
2445 process_one_work(worker, work);
2446 if (unlikely(!list_empty(&worker->scheduled)))
2447 process_scheduled_works(worker);
2448 } else {
2449 move_linked_works(work, &worker->scheduled, NULL);
2450 process_scheduled_works(worker);
2451 }
2452 } while (keep_working(pool));
2453
2454 worker_set_flags(worker, WORKER_PREP);
2455 sleep:
2456 /*
2457 * pool->lock is held and there's no work to process and no need to
2458 * manage, sleep. Workers are woken up only while holding
2459 * pool->lock or from local cpu, so setting the current state
2460 * before releasing pool->lock is enough to prevent losing any
2461 * event.
2462 */
2463 worker_enter_idle(worker);
2464 __set_current_state(TASK_IDLE);
2465 raw_spin_unlock_irq(&pool->lock);
2466 schedule();
2467 goto woke_up;
2468 }
2469
2470 /**
2471 * rescuer_thread - the rescuer thread function
2472 * @__rescuer: self
2473 *
2474 * Workqueue rescuer thread function. There's one rescuer for each
2475 * workqueue which has WQ_MEM_RECLAIM set.
2476 *
2477 * Regular work processing on a pool may block trying to create a new
2478 * worker, which uses a GFP_KERNEL allocation that has a slight chance
2479 * of developing into a deadlock if some works currently on the same queue
2480 * need to be processed to satisfy the GFP_KERNEL allocation. This is
2481 * the problem rescuer solves.
2482 *
2483 * When such condition is possible, the pool summons rescuers of all
2484 * workqueues which have works queued on the pool and let them process
2485 * those works so that forward progress can be guaranteed.
2486 *
2487 * This should happen rarely.
2488 *
2489 * Return: 0
2490 */
2491 static int rescuer_thread(void *__rescuer)
2492 {
2493 struct worker *rescuer = __rescuer;
2494 struct workqueue_struct *wq = rescuer->rescue_wq;
2495 struct list_head *scheduled = &rescuer->scheduled;
2496 bool should_stop;
2497
2498 set_user_nice(current, RESCUER_NICE_LEVEL);
2499
2500 /*
2501 * Mark rescuer as worker too. As WORKER_PREP is never cleared, it
2502 * doesn't participate in concurrency management.
2503 */
2504 set_pf_worker(true);
2505 repeat:
2506 set_current_state(TASK_IDLE);
2507
2508 /*
2509 * By the time the rescuer is requested to stop, the workqueue
2510 * shouldn't have any work pending, but @wq->maydays may still have
2511 * pwq(s) queued. This can happen by non-rescuer workers consuming
2512 * all the work items before the rescuer got to them. Go through
2513 * @wq->maydays processing before acting on should_stop so that the
2514 * list is always empty on exit.
2515 */
2516 should_stop = kthread_should_stop();
2517
2518 /* see whether any pwq is asking for help */
2519 raw_spin_lock_irq(&wq_mayday_lock);
2520
2521 while (!list_empty(&wq->maydays)) {
2522 struct pool_workqueue *pwq = list_first_entry(&wq->maydays,
2523 struct pool_workqueue, mayday_node);
2524 struct worker_pool *pool = pwq->pool;
2525 struct work_struct *work, *n;
2526 bool first = true;
2527
2528 __set_current_state(TASK_RUNNING);
2529 list_del_init(&pwq->mayday_node);
2530
2531 raw_spin_unlock_irq(&wq_mayday_lock);
2532
2533 worker_attach_to_pool(rescuer, pool);
2534
2535 raw_spin_lock_irq(&pool->lock);
2536
2537 /*
2538 * Slurp in all works issued via this workqueue and
2539 * process'em.
2540 */
2541 WARN_ON_ONCE(!list_empty(scheduled));
2542 list_for_each_entry_safe(work, n, &pool->worklist, entry) {
2543 if (get_work_pwq(work) == pwq) {
2544 if (first)
2545 pool->watchdog_ts = jiffies;
2546 move_linked_works(work, scheduled, &n);
2547 }
2548 first = false;
2549 }
2550
2551 if (!list_empty(scheduled)) {
2552 process_scheduled_works(rescuer);
2553
2554 /*
2555 * The above execution of rescued work items could
2556 * have created more to rescue through
2557 * pwq_activate_first_delayed() or chained
2558 * queueing. Let's put @pwq back on mayday list so
2559 * that such back-to-back work items, which may be
2560 * being used to relieve memory pressure, don't
2561 			 * incur MAYDAY_INTERVAL delay in between.
2562 */
2563 if (pwq->nr_active && need_to_create_worker(pool)) {
2564 raw_spin_lock(&wq_mayday_lock);
2565 /*
2566 * Queue iff we aren't racing destruction
2567 * and somebody else hasn't queued it already.
2568 */
2569 if (wq->rescuer && list_empty(&pwq->mayday_node)) {
2570 get_pwq(pwq);
2571 list_add_tail(&pwq->mayday_node, &wq->maydays);
2572 }
2573 raw_spin_unlock(&wq_mayday_lock);
2574 }
2575 }
2576
2577 /*
2578 * Put the reference grabbed by send_mayday(). @pool won't
2579 * go away while we're still attached to it.
2580 */
2581 put_pwq(pwq);
2582
2583 /*
2584 * Leave this pool. If need_more_worker() is %true, notify a
2585 * regular worker; otherwise, we end up with 0 concurrency
2586 * and stalling the execution.
2587 */
2588 if (need_more_worker(pool))
2589 wake_up_worker(pool);
2590
2591 raw_spin_unlock_irq(&pool->lock);
2592
2593 worker_detach_from_pool(rescuer);
2594
2595 raw_spin_lock_irq(&wq_mayday_lock);
2596 }
2597
2598 raw_spin_unlock_irq(&wq_mayday_lock);
2599
2600 if (should_stop) {
2601 __set_current_state(TASK_RUNNING);
2602 set_pf_worker(false);
2603 return 0;
2604 }
2605
2606 /* rescuers should never participate in concurrency management */
2607 WARN_ON_ONCE(!(rescuer->flags & WORKER_NOT_RUNNING));
2608 schedule();
2609 goto repeat;
2610 }
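
/*
 * Illustrative sketch (not part of this file's build): a workqueue that
 * sits on a memory-reclaim path gets a rescuer by passing WQ_MEM_RECLAIM
 * at allocation time. The example_* names are assumptions.
 *
 *	static struct workqueue_struct *example_reclaim_wq;
 *
 *	static int example_init(void)
 *	{
 *		example_reclaim_wq = alloc_workqueue("example_reclaim",
 *						     WQ_MEM_RECLAIM, 0);
 *		return example_reclaim_wq ? 0 : -ENOMEM;
 *	}
 */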
2611
2612 /**
2613 * check_flush_dependency - check for flush dependency sanity
2614 * @target_wq: workqueue being flushed
2615 * @target_work: work item being flushed (NULL for workqueue flushes)
2616 *
2617 * %current is trying to flush the whole @target_wq or @target_work on it.
2618 * If @target_wq doesn't have %WQ_MEM_RECLAIM, verify that %current is not
2619 * reclaiming memory or running on a workqueue which doesn't have
2620 * %WQ_MEM_RECLAIM as that can break forward-progress guarantee leading to
2621 * a deadlock.
2622 */
2623 static void check_flush_dependency(struct workqueue_struct *target_wq,
2624 struct work_struct *target_work)
2625 {
2626 work_func_t target_func = target_work ? target_work->func : NULL;
2627 struct worker *worker;
2628
2629 if (target_wq->flags & WQ_MEM_RECLAIM)
2630 return;
2631
2632 worker = current_wq_worker();
2633
2634 WARN_ONCE(current->flags & PF_MEMALLOC,
2635 "workqueue: PF_MEMALLOC task %d(%s) is flushing !WQ_MEM_RECLAIM %s:%ps",
2636 current->pid, current->comm, target_wq->name, target_func);
2637 WARN_ONCE(worker && ((worker->current_pwq->wq->flags &
2638 (WQ_MEM_RECLAIM | __WQ_LEGACY)) == WQ_MEM_RECLAIM),
2639 "workqueue: WQ_MEM_RECLAIM %s:%ps is flushing !WQ_MEM_RECLAIM %s:%ps",
2640 worker->current_pwq->wq->name, worker->current_func,
2641 target_wq->name, target_func);
2642 }
2643
2644 struct wq_barrier {
2645 struct work_struct work;
2646 struct completion done;
2647 struct task_struct *task; /* purely informational */
2648 };
2649
2650 static void wq_barrier_func(struct work_struct *work)
2651 {
2652 struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
2653 complete(&barr->done);
2654 }
2655
2656 /**
2657 * insert_wq_barrier - insert a barrier work
2658 * @pwq: pwq to insert barrier into
2659 * @barr: wq_barrier to insert
2660 * @target: target work to attach @barr to
2661 * @worker: worker currently executing @target, NULL if @target is not executing
2662 *
2663 * @barr is linked to @target such that @barr is completed only after
2664 * @target finishes execution. Please note that the ordering
2665 * guarantee is observed only with respect to @target and on the local
2666 * cpu.
2667 *
2668 * Currently, a queued barrier can't be canceled. This is because
2669 * try_to_grab_pending() can't determine whether the work to be
2670 * grabbed is at the head of the queue and thus can't clear LINKED
2671 * flag of the previous work while there must be a valid next work
2672 * after a work with LINKED flag set.
2673 *
2674 * Note that when @worker is non-NULL, @target may be modified
2675 * underneath us, so we can't reliably determine pwq from @target.
2676 *
2677 * CONTEXT:
2678 * raw_spin_lock_irq(pool->lock).
2679 */
2680 static void insert_wq_barrier(struct pool_workqueue *pwq,
2681 struct wq_barrier *barr,
2682 struct work_struct *target, struct worker *worker)
2683 {
2684 struct list_head *head;
2685 unsigned int linked = 0;
2686
2687 /*
2688 * debugobject calls are safe here even with pool->lock locked
2689 * as we know for sure that this will not trigger any of the
2690 * checks and call back into the fixup functions where we
2691 * might deadlock.
2692 */
2693 INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
2694 __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
2695
2696 init_completion_map(&barr->done, &target->lockdep_map);
2697
2698 barr->task = current;
2699
2700 /*
2701 * If @target is currently being executed, schedule the
2702 * barrier to the worker; otherwise, put it after @target.
2703 */
2704 if (worker)
2705 head = worker->scheduled.next;
2706 else {
2707 unsigned long *bits = work_data_bits(target);
2708
2709 head = target->entry.next;
2710 /* there can already be other linked works, inherit and set */
2711 linked = *bits & WORK_STRUCT_LINKED;
2712 __set_bit(WORK_STRUCT_LINKED_BIT, bits);
2713 }
2714
2715 debug_work_activate(&barr->work);
2716 insert_work(pwq, &barr->work, head,
2717 work_color_to_flags(WORK_NO_COLOR) | linked);
2718 }
2719
2720 /**
2721 * flush_workqueue_prep_pwqs - prepare pwqs for workqueue flushing
2722 * @wq: workqueue being flushed
2723 * @flush_color: new flush color, < 0 for no-op
2724 * @work_color: new work color, < 0 for no-op
2725 *
2726 * Prepare pwqs for workqueue flushing.
2727 *
2728 * If @flush_color is non-negative, flush_color on all pwqs should be
2729 * -1. If no pwq has in-flight commands at the specified color, all
2730 * pwq->flush_color's stay at -1 and %false is returned. If any pwq
2731 * has in flight commands, its pwq->flush_color is set to
2732 * @flush_color, @wq->nr_pwqs_to_flush is updated accordingly, pwq
2733 * wakeup logic is armed and %true is returned.
2734 *
2735 * The caller should have initialized @wq->first_flusher prior to
2736 * calling this function with non-negative @flush_color. If
2737 * @flush_color is negative, no flush color update is done and %false
2738 * is returned.
2739 *
2740 * If @work_color is non-negative, all pwqs should have the same
2741 * work_color which is previous to @work_color and all will be
2742 * advanced to @work_color.
2743 *
2744 * CONTEXT:
2745 * mutex_lock(wq->mutex).
2746 *
2747 * Return:
2748 * %true if @flush_color >= 0 and there's something to flush. %false
2749 * otherwise.
2750 */
2751 static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq,
2752 int flush_color, int work_color)
2753 {
2754 bool wait = false;
2755 struct pool_workqueue *pwq;
2756
2757 if (flush_color >= 0) {
2758 WARN_ON_ONCE(atomic_read(&wq->nr_pwqs_to_flush));
2759 atomic_set(&wq->nr_pwqs_to_flush, 1);
2760 }
2761
2762 for_each_pwq(pwq, wq) {
2763 struct worker_pool *pool = pwq->pool;
2764
2765 raw_spin_lock_irq(&pool->lock);
2766
2767 if (flush_color >= 0) {
2768 WARN_ON_ONCE(pwq->flush_color != -1);
2769
2770 if (pwq->nr_in_flight[flush_color]) {
2771 pwq->flush_color = flush_color;
2772 atomic_inc(&wq->nr_pwqs_to_flush);
2773 wait = true;
2774 }
2775 }
2776
2777 if (work_color >= 0) {
2778 WARN_ON_ONCE(work_color != work_next_color(pwq->work_color));
2779 pwq->work_color = work_color;
2780 }
2781
2782 raw_spin_unlock_irq(&pool->lock);
2783 }
2784
2785 if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_pwqs_to_flush))
2786 complete(&wq->first_flusher->done);
2787
2788 return wait;
2789 }
2790
2791 /**
2792 * flush_workqueue - ensure that any scheduled work has run to completion.
2793 * @wq: workqueue to flush
2794 *
2795 * This function sleeps until all work items which were queued on entry
2796 * have finished execution, but it is not livelocked by new incoming ones.
2797 */
2798 void flush_workqueue(struct workqueue_struct *wq)
2799 {
2800 struct wq_flusher this_flusher = {
2801 .list = LIST_HEAD_INIT(this_flusher.list),
2802 .flush_color = -1,
2803 .done = COMPLETION_INITIALIZER_ONSTACK_MAP(this_flusher.done, wq->lockdep_map),
2804 };
2805 int next_color;
2806
2807 if (WARN_ON(!wq_online))
2808 return;
2809
2810 lock_map_acquire(&wq->lockdep_map);
2811 lock_map_release(&wq->lockdep_map);
2812
2813 mutex_lock(&wq->mutex);
2814
2815 /*
2816 * Start-to-wait phase
2817 */
2818 next_color = work_next_color(wq->work_color);
2819
2820 if (next_color != wq->flush_color) {
2821 /*
2822 * Color space is not full. The current work_color
2823 * becomes our flush_color and work_color is advanced
2824 * by one.
2825 */
2826 WARN_ON_ONCE(!list_empty(&wq->flusher_overflow));
2827 this_flusher.flush_color = wq->work_color;
2828 wq->work_color = next_color;
2829
2830 if (!wq->first_flusher) {
2831 /* no flush in progress, become the first flusher */
2832 WARN_ON_ONCE(wq->flush_color != this_flusher.flush_color);
2833
2834 wq->first_flusher = &this_flusher;
2835
2836 if (!flush_workqueue_prep_pwqs(wq, wq->flush_color,
2837 wq->work_color)) {
2838 /* nothing to flush, done */
2839 wq->flush_color = next_color;
2840 wq->first_flusher = NULL;
2841 goto out_unlock;
2842 }
2843 } else {
2844 /* wait in queue */
2845 WARN_ON_ONCE(wq->flush_color == this_flusher.flush_color);
2846 list_add_tail(&this_flusher.list, &wq->flusher_queue);
2847 flush_workqueue_prep_pwqs(wq, -1, wq->work_color);
2848 }
2849 } else {
2850 /*
2851 * Oops, color space is full, wait on overflow queue.
2852 * The next flush completion will assign us
2853 * flush_color and transfer to flusher_queue.
2854 */
2855 list_add_tail(&this_flusher.list, &wq->flusher_overflow);
2856 }
2857
2858 check_flush_dependency(wq, NULL);
2859
2860 mutex_unlock(&wq->mutex);
2861
2862 wait_for_completion(&this_flusher.done);
2863
2864 /*
2865 * Wake-up-and-cascade phase
2866 *
2867 * First flushers are responsible for cascading flushes and
2868 * handling overflow. Non-first flushers can simply return.
2869 */
2870 if (READ_ONCE(wq->first_flusher) != &this_flusher)
2871 return;
2872
2873 mutex_lock(&wq->mutex);
2874
2875 /* we might have raced, check again with mutex held */
2876 if (wq->first_flusher != &this_flusher)
2877 goto out_unlock;
2878
2879 WRITE_ONCE(wq->first_flusher, NULL);
2880
2881 WARN_ON_ONCE(!list_empty(&this_flusher.list));
2882 WARN_ON_ONCE(wq->flush_color != this_flusher.flush_color);
2883
2884 while (true) {
2885 struct wq_flusher *next, *tmp;
2886
2887 /* complete all the flushers sharing the current flush color */
2888 list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
2889 if (next->flush_color != wq->flush_color)
2890 break;
2891 list_del_init(&next->list);
2892 complete(&next->done);
2893 }
2894
2895 WARN_ON_ONCE(!list_empty(&wq->flusher_overflow) &&
2896 wq->flush_color != work_next_color(wq->work_color));
2897
2898 /* this flush_color is finished, advance by one */
2899 wq->flush_color = work_next_color(wq->flush_color);
2900
2901 /* one color has been freed, handle overflow queue */
2902 if (!list_empty(&wq->flusher_overflow)) {
2903 /*
2904 * Assign the same color to all overflowed
2905 * flushers, advance work_color and append to
2906 * flusher_queue. This is the start-to-wait
2907 * phase for these overflowed flushers.
2908 */
2909 list_for_each_entry(tmp, &wq->flusher_overflow, list)
2910 tmp->flush_color = wq->work_color;
2911
2912 wq->work_color = work_next_color(wq->work_color);
2913
2914 list_splice_tail_init(&wq->flusher_overflow,
2915 &wq->flusher_queue);
2916 flush_workqueue_prep_pwqs(wq, -1, wq->work_color);
2917 }
2918
2919 if (list_empty(&wq->flusher_queue)) {
2920 WARN_ON_ONCE(wq->flush_color != wq->work_color);
2921 break;
2922 }
2923
2924 /*
2925 * Need to flush more colors. Make the next flusher
2926 * the new first flusher and arm pwqs.
2927 */
2928 WARN_ON_ONCE(wq->flush_color == wq->work_color);
2929 WARN_ON_ONCE(wq->flush_color != next->flush_color);
2930
2931 list_del_init(&next->list);
2932 wq->first_flusher = next;
2933
2934 if (flush_workqueue_prep_pwqs(wq, wq->flush_color, -1))
2935 break;
2936
2937 /*
2938 * Meh... this color is already done, clear first
2939 * flusher and repeat cascading.
2940 */
2941 wq->first_flusher = NULL;
2942 }
2943
2944 out_unlock:
2945 mutex_unlock(&wq->mutex);
2946 }
2947 EXPORT_SYMBOL(flush_workqueue);
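
/*
 * Illustrative sketch (not part of this file's build): a driver making
 * sure all work it has queued so far has finished before tearing down
 * shared state. The example_* names are assumptions.
 *
 *	static struct workqueue_struct *example_wq;
 *
 *	static void example_quiesce(void)
 *	{
 *		flush_workqueue(example_wq);
 *	}
 *
 * After example_quiesce() returns, every work item queued on example_wq
 * before the call has finished executing; items queued afterwards may
 * still be pending or running.
 */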
2948
2949 /**
2950 * drain_workqueue - drain a workqueue
2951 * @wq: workqueue to drain
2952 *
2953 * Wait until the workqueue becomes empty. While draining is in progress,
2954 * only chain queueing is allowed. IOW, only currently pending or running
2955 * work items on @wq can queue further work items on it. @wq is flushed
2956 * repeatedly until it becomes empty. The number of flushes is determined
2957 * by the depth of chaining and should be relatively small. Whine if it
2958 * takes too long.
2959 */
2960 void drain_workqueue(struct workqueue_struct *wq)
2961 {
2962 unsigned int flush_cnt = 0;
2963 struct pool_workqueue *pwq;
2964
2965 /*
2966 	 * __queue_work() needs to test whether there are drainers; it is much
2967 * hotter than drain_workqueue() and already looks at @wq->flags.
2968 * Use __WQ_DRAINING so that queue doesn't have to check nr_drainers.
2969 */
2970 mutex_lock(&wq->mutex);
2971 if (!wq->nr_drainers++)
2972 wq->flags |= __WQ_DRAINING;
2973 mutex_unlock(&wq->mutex);
2974 reflush:
2975 flush_workqueue(wq);
2976
2977 mutex_lock(&wq->mutex);
2978
2979 for_each_pwq(pwq, wq) {
2980 bool drained;
2981
2982 raw_spin_lock_irq(&pwq->pool->lock);
2983 drained = !pwq->nr_active && list_empty(&pwq->delayed_works);
2984 raw_spin_unlock_irq(&pwq->pool->lock);
2985
2986 if (drained)
2987 continue;
2988
2989 if (++flush_cnt == 10 ||
2990 (flush_cnt % 100 == 0 && flush_cnt <= 1000))
2991 pr_warn("workqueue %s: drain_workqueue() isn't complete after %u tries\n",
2992 wq->name, flush_cnt);
2993
2994 mutex_unlock(&wq->mutex);
2995 goto reflush;
2996 }
2997
2998 if (!--wq->nr_drainers)
2999 wq->flags &= ~__WQ_DRAINING;
3000 mutex_unlock(&wq->mutex);
3001 }
3002 EXPORT_SYMBOL_GPL(drain_workqueue);
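
/*
 * Illustrative sketch (not part of this file's build): draining a
 * workqueue whose work items may chain-queue follow-up items onto the same
 * workqueue before the caller releases the objects they operate on. The
 * example_* name is an assumption.
 *
 *	static void example_shutdown(struct workqueue_struct *wq)
 *	{
 *		drain_workqueue(wq);
 *	}
 *
 * Once example_shutdown() returns, @wq is empty; only chain-queued items
 * could have run during the drain, so the backing objects can now be
 * released.
 */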
3003
3004 static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
3005 bool from_cancel)
3006 {
3007 struct worker *worker = NULL;
3008 struct worker_pool *pool;
3009 struct pool_workqueue *pwq;
3010
3011 might_sleep();
3012
3013 rcu_read_lock();
3014 pool = get_work_pool(work);
3015 if (!pool) {
3016 rcu_read_unlock();
3017 return false;
3018 }
3019
3020 raw_spin_lock_irq(&pool->lock);
3021 /* see the comment in try_to_grab_pending() with the same code */
3022 pwq = get_work_pwq(work);
3023 if (pwq) {
3024 if (unlikely(pwq->pool != pool))
3025 goto already_gone;
3026 } else {
3027 worker = find_worker_executing_work(pool, work);
3028 if (!worker)
3029 goto already_gone;
3030 pwq = worker->current_pwq;
3031 }
3032
3033 check_flush_dependency(pwq->wq, work);
3034
3035 insert_wq_barrier(pwq, barr, work, worker);
3036 raw_spin_unlock_irq(&pool->lock);
3037
3038 /*
3039 * Force a lock recursion deadlock when using flush_work() inside a
3040 * single-threaded or rescuer equipped workqueue.
3041 *
3042 * For single threaded workqueues the deadlock happens when the work
3043 * is after the work issuing the flush_work(). For rescuer equipped
3044 * workqueues the deadlock happens when the rescuer stalls, blocking
3045 * forward progress.
3046 */
3047 if (!from_cancel &&
3048 (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)) {
3049 lock_map_acquire(&pwq->wq->lockdep_map);
3050 lock_map_release(&pwq->wq->lockdep_map);
3051 }
3052 rcu_read_unlock();
3053 return true;
3054 already_gone:
3055 raw_spin_unlock_irq(&pool->lock);
3056 rcu_read_unlock();
3057 return false;
3058 }
3059
3060 static bool __flush_work(struct work_struct *work, bool from_cancel)
3061 {
3062 struct wq_barrier barr;
3063
3064 if (WARN_ON(!wq_online))
3065 return false;
3066
3067 if (WARN_ON(!work->func))
3068 return false;
3069
3070 lock_map_acquire(&work->lockdep_map);
3071 lock_map_release(&work->lockdep_map);
3072
3073 if (start_flush_work(work, &barr, from_cancel)) {
3074 wait_for_completion(&barr.done);
3075 destroy_work_on_stack(&barr.work);
3076 return true;
3077 } else {
3078 return false;
3079 }
3080 }
3081
3082 /**
3083 * flush_work - wait for a work to finish executing the last queueing instance
3084 * @work: the work to flush
3085 *
3086 * Wait until @work has finished execution. @work is guaranteed to be idle
3087 * on return if it hasn't been requeued since flush started.
3088 *
3089 * Return:
3090 * %true if flush_work() waited for the work to finish execution,
3091 * %false if it was already idle.
3092 */
3093 bool flush_work(struct work_struct *work)
3094 {
3095 return __flush_work(work, false);
3096 }
3097 EXPORT_SYMBOL_GPL(flush_work);
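
/*
 * Illustrative sketch (not part of this file's build): waiting for one
 * specific work item, e.g. on the suspend path of a hypothetical driver,
 * without flushing the whole workqueue. The example_* names are
 * assumptions.
 *
 *	struct example_dev {
 *		struct work_struct refresh_work;
 *	};
 *
 *	static int example_suspend(struct example_dev *ed)
 *	{
 *		flush_work(&ed->refresh_work);
 *		return 0;
 *	}
 *
 * If refresh_work re-queues itself, flush_work() only waits for the
 * instance that was queued when the flush started; use cancel_work_sync()
 * to stop it altogether.
 */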
3098
3099 struct cwt_wait {
3100 wait_queue_entry_t wait;
3101 struct work_struct *work;
3102 };
3103
3104 static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
3105 {
3106 struct cwt_wait *cwait = container_of(wait, struct cwt_wait, wait);
3107
3108 if (cwait->work != key)
3109 return 0;
3110 return autoremove_wake_function(wait, mode, sync, key);
3111 }
3112
3113 static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)
3114 {
3115 static DECLARE_WAIT_QUEUE_HEAD(cancel_waitq);
3116 unsigned long flags;
3117 int ret;
3118
3119 do {
3120 ret = try_to_grab_pending(work, is_dwork, &flags);
3121 /*
3122 * If someone else is already canceling, wait for it to
3123 * finish. flush_work() doesn't work for PREEMPT_NONE
3124 * because we may get scheduled between @work's completion
3125 * and the other canceling task resuming and clearing
3126 * CANCELING - flush_work() will return false immediately
3127 * as @work is no longer busy, try_to_grab_pending() will
3128 * return -ENOENT as @work is still being canceled and the
3129 * other canceling task won't be able to clear CANCELING as
3130 * we're hogging the CPU.
3131 *
3132 * Let's wait for completion using a waitqueue. As this
3133 * may lead to the thundering herd problem, use a custom
3134 * wake function which matches @work along with exclusive
3135 * wait and wakeup.
3136 */
3137 if (unlikely(ret == -ENOENT)) {
3138 struct cwt_wait cwait;
3139
3140 init_wait(&cwait.wait);
3141 cwait.wait.func = cwt_wakefn;
3142 cwait.work = work;
3143
3144 prepare_to_wait_exclusive(&cancel_waitq, &cwait.wait,
3145 TASK_UNINTERRUPTIBLE);
3146 if (work_is_canceling(work))
3147 schedule();
3148 finish_wait(&cancel_waitq, &cwait.wait);
3149 }
3150 } while (unlikely(ret < 0));
3151
3152 /* tell other tasks trying to grab @work to back off */
3153 mark_work_canceling(work);
3154 local_irq_restore(flags);
3155
3156 /*
3157 * This allows canceling during early boot. We know that @work
3158 * isn't executing.
3159 */
3160 if (wq_online)
3161 __flush_work(work, true);
3162
3163 clear_work_data(work);
3164
3165 /*
3166 * Paired with prepare_to_wait() above so that either
3167 * waitqueue_active() is visible here or !work_is_canceling() is
3168 * visible there.
3169 */
3170 smp_mb();
3171 if (waitqueue_active(&cancel_waitq))
3172 __wake_up(&cancel_waitq, TASK_NORMAL, 1, work);
3173
3174 return ret;
3175 }
3176
3177 /**
3178 * cancel_work_sync - cancel a work and wait for it to finish
3179 * @work: the work to cancel
3180 *
3181 * Cancel @work and wait for its execution to finish. This function
3182 * can be used even if the work re-queues itself or migrates to
3183 * another workqueue. On return from this function, @work is
3184 * guaranteed to be not pending or executing on any CPU.
3185 *
3186 * cancel_work_sync(&delayed_work->work) must not be used for
3187 * delayed_work's. Use cancel_delayed_work_sync() instead.
3188 *
3189 * The caller must ensure that the workqueue on which @work was last
3190 * queued can't be destroyed before this function returns.
3191 *
3192 * Return:
3193 * %true if @work was pending, %false otherwise.
3194 */
3195 bool cancel_work_sync(struct work_struct *work)
3196 {
3197 return __cancel_work_timer(work, false);
3198 }
3199 EXPORT_SYMBOL_GPL(cancel_work_sync);
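
/*
 * Illustrative sketch (not part of this file's build): a typical teardown
 * order for a hypothetical driver - stop new queueing first, then
 * cancel_work_sync() so the handler can no longer touch state that is
 * about to be freed. The example_* names are assumptions.
 *
 *	struct example_dev {
 *		bool shutting_down;
 *		struct work_struct event_work;
 *	};
 *
 *	static void example_remove(struct example_dev *ed)
 *	{
 *		WRITE_ONCE(ed->shutting_down, true);
 *		cancel_work_sync(&ed->event_work);
 *		kfree(ed);
 *	}
 *
 * The work handler is assumed to check ed->shutting_down before
 * re-queueing itself, so nothing can slip back in after the cancel.
 */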
3200
3201 /**
3202 * flush_delayed_work - wait for a dwork to finish executing the last queueing
3203 * @dwork: the delayed work to flush
3204 *
3205 * Delayed timer is cancelled and the pending work is queued for
3206 * immediate execution. Like flush_work(), this function only
3207 * considers the last queueing instance of @dwork.
3208 *
3209 * Return:
3210 * %true if flush_work() waited for the work to finish execution,
3211 * %false if it was already idle.
3212 */
3213 bool flush_delayed_work(struct delayed_work *dwork)
3214 {
3215 local_irq_disable();
3216 if (del_timer_sync(&dwork->timer))
3217 __queue_work(dwork->cpu, dwork->wq, &dwork->work);
3218 local_irq_enable();
3219 return flush_work(&dwork->work);
3220 }
3221 EXPORT_SYMBOL(flush_delayed_work);
3222
3223 /**
3224 * flush_rcu_work - wait for a rwork to finish executing the last queueing
3225 * @rwork: the rcu work to flush
3226 *
3227 * Return:
3228 * %true if flush_rcu_work() waited for the work to finish execution,
3229 * %false if it was already idle.
3230 */
3231 bool flush_rcu_work(struct rcu_work *rwork)
3232 {
3233 if (test_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&rwork->work))) {
3234 rcu_barrier();
3235 flush_work(&rwork->work);
3236 return true;
3237 } else {
3238 return flush_work(&rwork->work);
3239 }
3240 }
3241 EXPORT_SYMBOL(flush_rcu_work);
3242
3243 static bool __cancel_work(struct work_struct *work, bool is_dwork)
3244 {
3245 unsigned long flags;
3246 int ret;
3247
3248 do {
3249 ret = try_to_grab_pending(work, is_dwork, &flags);
3250 } while (unlikely(ret == -EAGAIN));
3251
3252 if (unlikely(ret < 0))
3253 return false;
3254
3255 set_work_pool_and_clear_pending(work, get_work_pool_id(work));
3256 local_irq_restore(flags);
3257 return ret;
3258 }
3259
3260 /*
3261 * See cancel_delayed_work()
3262 */
3263 bool cancel_work(struct work_struct *work)
3264 {
3265 return __cancel_work(work, false);
3266 }
3267 EXPORT_SYMBOL(cancel_work);
3268
3269 /**
3270 * cancel_delayed_work - cancel a delayed work
3271 * @dwork: delayed_work to cancel
3272 *
3273 * Kill off a pending delayed_work.
3274 *
3275 * Return: %true if @dwork was pending and canceled; %false if it wasn't
3276 * pending.
3277 *
3278 * Note:
3279 * The work callback function may still be running on return, unless
3280 * it returns %true and the work doesn't re-arm itself. Explicitly flush or
3281 * use cancel_delayed_work_sync() to wait on it.
3282 *
3283 * This function is safe to call from any context including IRQ handler.
3284 */
3285 bool cancel_delayed_work(struct delayed_work *dwork)
3286 {
3287 return __cancel_work(&dwork->work, true);
3288 }
3289 EXPORT_SYMBOL(cancel_delayed_work);
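
/*
 * Illustrative sketch (not part of this file's build): cancelling a pending
 * timeout from an interrupt handler; this only disarms a not-yet-run
 * delayed work, so no sleeping is involved. The example_* names are
 * assumptions.
 *
 *	struct example_dev {
 *		struct delayed_work timeout_work;
 *	};
 *
 *	static irqreturn_t example_irq(int irq, void *data)
 *	{
 *		struct example_dev *ed = data;
 *
 *		cancel_delayed_work(&ed->timeout_work);
 *		return IRQ_HANDLED;
 *	}
 *
 * The handler may still be running when example_irq() returns; use
 * cancel_delayed_work_sync() from sleepable context when that matters.
 */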
3290
3291 /**
3292 * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish
3293 * @dwork: the delayed work to cancel
3294 *
3295 * This is cancel_work_sync() for delayed works.
3296 *
3297 * Return:
3298 * %true if @dwork was pending, %false otherwise.
3299 */
3300 bool cancel_delayed_work_sync(struct delayed_work *dwork)
3301 {
3302 return __cancel_work_timer(&dwork->work, true);
3303 }
3304 EXPORT_SYMBOL(cancel_delayed_work_sync);
3305
3306 /**
3307 * schedule_on_each_cpu - execute a function synchronously on each online CPU
3308 * @func: the function to call
3309 *
3310 * schedule_on_each_cpu() executes @func on each online CPU using the
3311 * system workqueue and blocks until all CPUs have completed.
3312 * schedule_on_each_cpu() is very slow.
3313 *
3314 * Return:
3315 * 0 on success, -errno on failure.
3316 */
3317 int schedule_on_each_cpu(work_func_t func)
3318 {
3319 int cpu;
3320 struct work_struct __percpu *works;
3321
3322 works = alloc_percpu(struct work_struct);
3323 if (!works)
3324 return -ENOMEM;
3325
3326 get_online_cpus();
3327
3328 for_each_online_cpu(cpu) {
3329 struct work_struct *work = per_cpu_ptr(works, cpu);
3330
3331 INIT_WORK(work, func);
3332 schedule_work_on(cpu, work);
3333 }
3334
3335 for_each_online_cpu(cpu)
3336 flush_work(per_cpu_ptr(works, cpu));
3337
3338 put_online_cpus();
3339 free_percpu(works);
3340 return 0;
3341 }
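
/*
 * Example (editor's sketch, not part of workqueue.c): running a function
 * once on every online CPU and waiting for all of them.  example_per_cpu_fn
 * is a hypothetical name.
 *
 *	static void example_per_cpu_fn(struct work_struct *work)
 *	{
 *		pr_info("ran on CPU %d\n", raw_smp_processor_id());
 *	}
 *
 *	int err = schedule_on_each_cpu(example_per_cpu_fn);
 *
 *	if (err)
 *		pr_err("schedule_on_each_cpu failed: %d\n", err);
 */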
3342
3343 /**
3344 * execute_in_process_context - reliably execute the routine with user context
3345 * @fn: the function to execute
3346 * @ew: guaranteed storage for the execute work structure (must
3347 * be available when the work executes)
3348 *
3349 * Executes the function immediately if process context is available,
3350 * otherwise schedules the function for delayed execution.
3351 *
3352 * Return: 0 - function was executed
3353 * 1 - function was scheduled for execution
3354 */
3355 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
3356 {
3357 if (!in_interrupt()) {
3358 fn(&ew->work);
3359 return 0;
3360 }
3361
3362 INIT_WORK(&ew->work, fn);
3363 schedule_work(&ew->work);
3364
3365 return 1;
3366 }
3367 EXPORT_SYMBOL_GPL(execute_in_process_context);
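
/*
 * Example (editor's sketch, not part of workqueue.c): a cleanup helper that
 * runs inline when process context is available and is deferred to a work
 * item otherwise.  The execute_work storage must outlive the deferred
 * execution, so it is typically embedded in the object being released.
 * example_obj and example_release are hypothetical.
 *
 *	struct example_obj {
 *		struct execute_work ew;
 *	};
 *
 *	static void example_release(struct work_struct *work)
 *	{
 *		struct example_obj *obj =
 *			container_of(work, struct example_obj, ew.work);
 *
 *		kfree(obj);
 *	}
 *
 *	execute_in_process_context(example_release, &obj->ew);
 */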
3368
3369 /**
3370 * free_workqueue_attrs - free a workqueue_attrs
3371 * @attrs: workqueue_attrs to free
3372 *
3373 * Undo alloc_workqueue_attrs().
3374 */
3375 void free_workqueue_attrs(struct workqueue_attrs *attrs)
3376 {
3377 if (attrs) {
3378 free_cpumask_var(attrs->cpumask);
3379 kfree(attrs);
3380 }
3381 }
3382
3383 /**
3384 * alloc_workqueue_attrs - allocate a workqueue_attrs
3385 *
3386 * Allocate a new workqueue_attrs, initialize with default settings and
3387 * return it.
3388 *
3389 * Return: The allocated new workqueue_attr on success. %NULL on failure.
3390 */
3391 struct workqueue_attrs *alloc_workqueue_attrs(void)
3392 {
3393 struct workqueue_attrs *attrs;
3394
3395 attrs = kzalloc(sizeof(*attrs), GFP_KERNEL);
3396 if (!attrs)
3397 goto fail;
3398 if (!alloc_cpumask_var(&attrs->cpumask, GFP_KERNEL))
3399 goto fail;
3400
3401 cpumask_copy(attrs->cpumask, cpu_possible_mask);
3402 return attrs;
3403 fail:
3404 free_workqueue_attrs(attrs);
3405 return NULL;
3406 }
3407
3408 static void copy_workqueue_attrs(struct workqueue_attrs *to,
3409 const struct workqueue_attrs *from)
3410 {
3411 to->nice = from->nice;
3412 cpumask_copy(to->cpumask, from->cpumask);
3413 /*
3414 * Unlike hash and equality test, this function doesn't ignore
3415 * ->no_numa as it is used for both pool and wq attrs. Instead,
3416 * get_unbound_pool() explicitly clears ->no_numa after copying.
3417 */
3418 to->no_numa = from->no_numa;
3419 }
3420
3421 /* hash value of the content of @attr */
3422 static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
3423 {
3424 u32 hash = 0;
3425
3426 hash = jhash_1word(attrs->nice, hash);
3427 hash = jhash(cpumask_bits(attrs->cpumask),
3428 BITS_TO_LONGS(nr_cpumask_bits) * sizeof(long), hash);
3429 return hash;
3430 }
3431
3432 /* content equality test */
3433 static bool wqattrs_equal(const struct workqueue_attrs *a,
3434 const struct workqueue_attrs *b)
3435 {
3436 if (a->nice != b->nice)
3437 return false;
3438 if (!cpumask_equal(a->cpumask, b->cpumask))
3439 return false;
3440 return true;
3441 }
3442
3443 /**
3444 * init_worker_pool - initialize a newly zalloc'd worker_pool
3445 * @pool: worker_pool to initialize
3446 *
3447 * Initialize a newly zalloc'd @pool. It also allocates @pool->attrs.
3448 *
3449 * Return: 0 on success, -errno on failure. Even on failure, all fields
3450 * inside @pool proper are initialized and put_unbound_pool() can be called
3451 * on @pool safely to release it.
3452 */
3453 static int init_worker_pool(struct worker_pool *pool)
3454 {
3455 raw_spin_lock_init(&pool->lock);
3456 pool->id = -1;
3457 pool->cpu = -1;
3458 pool->node = NUMA_NO_NODE;
3459 pool->flags |= POOL_DISASSOCIATED;
3460 pool->watchdog_ts = jiffies;
3461 INIT_LIST_HEAD(&pool->worklist);
3462 INIT_LIST_HEAD(&pool->idle_list);
3463 hash_init(pool->busy_hash);
3464
3465 timer_setup(&pool->idle_timer, idle_worker_timeout, TIMER_DEFERRABLE);
3466
3467 timer_setup(&pool->mayday_timer, pool_mayday_timeout, 0);
3468
3469 INIT_LIST_HEAD(&pool->workers);
3470
3471 ida_init(&pool->worker_ida);
3472 INIT_HLIST_NODE(&pool->hash_node);
3473 pool->refcnt = 1;
3474
3475 /* shouldn't fail above this point */
3476 pool->attrs = alloc_workqueue_attrs();
3477 if (!pool->attrs)
3478 return -ENOMEM;
3479 return 0;
3480 }
3481
3482 #ifdef CONFIG_LOCKDEP
3483 static void wq_init_lockdep(struct workqueue_struct *wq)
3484 {
3485 char *lock_name;
3486
3487 lockdep_register_key(&wq->key);
3488 lock_name = kasprintf(GFP_KERNEL, "%s%s", "(wq_completion)", wq->name);
3489 if (!lock_name)
3490 lock_name = wq->name;
3491
3492 wq->lock_name = lock_name;
3493 lockdep_init_map(&wq->lockdep_map, lock_name, &wq->key, 0);
3494 }
3495
3496 static void wq_unregister_lockdep(struct workqueue_struct *wq)
3497 {
3498 lockdep_unregister_key(&wq->key);
3499 }
3500
3501 static void wq_free_lockdep(struct workqueue_struct *wq)
3502 {
3503 if (wq->lock_name != wq->name)
3504 kfree(wq->lock_name);
3505 }
3506 #else
3507 static void wq_init_lockdep(struct workqueue_struct *wq)
3508 {
3509 }
3510
3511 static void wq_unregister_lockdep(struct workqueue_struct *wq)
3512 {
3513 }
3514
3515 static void wq_free_lockdep(struct workqueue_struct *wq)
3516 {
3517 }
3518 #endif
3519
3520 static void rcu_free_wq(struct rcu_head *rcu)
3521 {
3522 struct workqueue_struct *wq =
3523 container_of(rcu, struct workqueue_struct, rcu);
3524
3525 wq_free_lockdep(wq);
3526
3527 if (!(wq->flags & WQ_UNBOUND))
3528 free_percpu(wq->cpu_pwqs);
3529 else
3530 free_workqueue_attrs(wq->unbound_attrs);
3531
3532 kfree(wq);
3533 }
3534
3535 static void rcu_free_pool(struct rcu_head *rcu)
3536 {
3537 struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
3538
3539 ida_destroy(&pool->worker_ida);
3540 free_workqueue_attrs(pool->attrs);
3541 kfree(pool);
3542 }
3543
3544 /* This returns with the lock held on success (pool manager is inactive). */
3545 static bool wq_manager_inactive(struct worker_pool *pool)
3546 {
3547 raw_spin_lock_irq(&pool->lock);
3548
3549 if (pool->flags & POOL_MANAGER_ACTIVE) {
3550 raw_spin_unlock_irq(&pool->lock);
3551 return false;
3552 }
3553 return true;
3554 }
3555
3556 /**
3557 * put_unbound_pool - put a worker_pool
3558 * @pool: worker_pool to put
3559 *
3560 * Put @pool. If its refcnt reaches zero, it gets destroyed in RCU
3561 * safe manner. get_unbound_pool() calls this function on its failure path
3562 * and this function should be able to release pools which went through,
3563 * successfully or not, init_worker_pool().
3564 *
3565 * Should be called with wq_pool_mutex held.
3566 */
3567 static void put_unbound_pool(struct worker_pool *pool)
3568 {
3569 DECLARE_COMPLETION_ONSTACK(detach_completion);
3570 struct worker *worker;
3571
3572 lockdep_assert_held(&wq_pool_mutex);
3573
3574 if (--pool->refcnt)
3575 return;
3576
3577 /* sanity checks */
3578 if (WARN_ON(!(pool->cpu < 0)) ||
3579 WARN_ON(!list_empty(&pool->worklist)))
3580 return;
3581
3582 /* release id and unhash */
3583 if (pool->id >= 0)
3584 idr_remove(&worker_pool_idr, pool->id);
3585 hash_del(&pool->hash_node);
3586
3587 /*
3588 * Become the manager and destroy all workers. This prevents
3589 * @pool's workers from blocking on wq_pool_attach_mutex. We're the last
3590 * manager and @pool gets freed with the flag set.
3591 * Because of how wq_manager_inactive() works, we will hold the
3592 * spinlock after a successful wait.
3593 */
3594 rcuwait_wait_event(&manager_wait, wq_manager_inactive(pool),
3595 TASK_UNINTERRUPTIBLE);
3596 pool->flags |= POOL_MANAGER_ACTIVE;
3597
3598 while ((worker = first_idle_worker(pool)))
3599 destroy_worker(worker);
3600 WARN_ON(pool->nr_workers || pool->nr_idle);
3601 raw_spin_unlock_irq(&pool->lock);
3602
3603 mutex_lock(&wq_pool_attach_mutex);
3604 if (!list_empty(&pool->workers))
3605 pool->detach_completion = &detach_completion;
3606 mutex_unlock(&wq_pool_attach_mutex);
3607
3608 if (pool->detach_completion)
3609 wait_for_completion(pool->detach_completion);
3610
3611 /* shut down the timers */
3612 del_timer_sync(&pool->idle_timer);
3613 del_timer_sync(&pool->mayday_timer);
3614
3615 /* RCU protected to allow dereferences from get_work_pool() */
3616 call_rcu(&pool->rcu, rcu_free_pool);
3617 }
3618
3619 /**
3620 * get_unbound_pool - get a worker_pool with the specified attributes
3621 * @attrs: the attributes of the worker_pool to get
3622 *
3623 * Obtain a worker_pool which has the same attributes as @attrs, bump the
3624 * reference count and return it. If there already is a matching
3625 * worker_pool, it will be used; otherwise, this function attempts to
3626 * create a new one.
3627 *
3628 * Should be called with wq_pool_mutex held.
3629 *
3630 * Return: On success, a worker_pool with the same attributes as @attrs.
3631 * On failure, %NULL.
3632 */
3633 static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
3634 {
3635 u32 hash = wqattrs_hash(attrs);
3636 struct worker_pool *pool;
3637 int node;
3638 int target_node = NUMA_NO_NODE;
3639
3640 lockdep_assert_held(&wq_pool_mutex);
3641
3642 /* do we already have a matching pool? */
3643 hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
3644 if (wqattrs_equal(pool->attrs, attrs)) {
3645 pool->refcnt++;
3646 return pool;
3647 }
3648 }
3649
3650 /* if cpumask is contained inside a NUMA node, we belong to that node */
3651 if (wq_numa_enabled) {
3652 for_each_node(node) {
3653 if (cpumask_subset(attrs->cpumask,
3654 wq_numa_possible_cpumask[node])) {
3655 target_node = node;
3656 break;
3657 }
3658 }
3659 }
3660
3661 /* nope, create a new one */
3662 pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node);
3663 if (!pool || init_worker_pool(pool) < 0)
3664 goto fail;
3665
3666 lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */
3667 copy_workqueue_attrs(pool->attrs, attrs);
3668 pool->node = target_node;
3669
3670 /*
3671 * no_numa isn't a worker_pool attribute, always clear it. See
3672 * 'struct workqueue_attrs' comments for detail.
3673 */
3674 pool->attrs->no_numa = false;
3675
3676 if (worker_pool_assign_id(pool) < 0)
3677 goto fail;
3678
3679 /* create and start the initial worker */
3680 if (wq_online && !create_worker(pool))
3681 goto fail;
3682
3683 /* install */
3684 hash_add(unbound_pool_hash, &pool->hash_node, hash);
3685
3686 return pool;
3687 fail:
3688 if (pool)
3689 put_unbound_pool(pool);
3690 return NULL;
3691 }
3692
3693 static void rcu_free_pwq(struct rcu_head *rcu)
3694 {
3695 kmem_cache_free(pwq_cache,
3696 container_of(rcu, struct pool_workqueue, rcu));
3697 }
3698
3699 /*
3700 * Scheduled on system_wq by put_pwq() when an unbound pwq hits zero refcnt
3701 * and needs to be destroyed.
3702 */
3703 static void pwq_unbound_release_workfn(struct work_struct *work)
3704 {
3705 struct pool_workqueue *pwq = container_of(work, struct pool_workqueue,
3706 unbound_release_work);
3707 struct workqueue_struct *wq = pwq->wq;
3708 struct worker_pool *pool = pwq->pool;
3709 bool is_last = false;
3710
3711 /*
3712 * when @pwq is not linked, it doesn't hold any reference to the
3713 * @wq, and @wq is invalid to access.
3714 */
3715 if (!list_empty(&pwq->pwqs_node)) {
3716 if (WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND)))
3717 return;
3718
3719 mutex_lock(&wq->mutex);
3720 list_del_rcu(&pwq->pwqs_node);
3721 is_last = list_empty(&wq->pwqs);
3722 mutex_unlock(&wq->mutex);
3723 }
3724
3725 mutex_lock(&wq_pool_mutex);
3726 put_unbound_pool(pool);
3727 mutex_unlock(&wq_pool_mutex);
3728
3729 call_rcu(&pwq->rcu, rcu_free_pwq);
3730
3731 /*
3732 * If we're the last pwq going away, @wq is already dead and no one
3733 * is gonna access it anymore. Schedule RCU free.
3734 */
3735 if (is_last) {
3736 wq_unregister_lockdep(wq);
3737 call_rcu(&wq->rcu, rcu_free_wq);
3738 }
3739 }
3740
3741 /**
3742 * pwq_adjust_max_active - update a pwq's max_active to the current setting
3743 * @pwq: target pool_workqueue
3744 *
3745 * If @pwq isn't freezing, set @pwq->max_active to the associated
3746 * workqueue's saved_max_active and activate delayed work items
3747 * accordingly. If @pwq is freezing, clear @pwq->max_active to zero.
3748 */
3749 static void pwq_adjust_max_active(struct pool_workqueue *pwq)
3750 {
3751 struct workqueue_struct *wq = pwq->wq;
3752 bool freezable = wq->flags & WQ_FREEZABLE;
3753 unsigned long flags;
3754
3755 /* for @wq->saved_max_active */
3756 lockdep_assert_held(&wq->mutex);
3757
3758 /* fast exit for non-freezable wqs */
3759 if (!freezable && pwq->max_active == wq->saved_max_active)
3760 return;
3761
3762 /* this function can be called during early boot w/ irq disabled */
3763 raw_spin_lock_irqsave(&pwq->pool->lock, flags);
3764
3765 /*
3766 * During [un]freezing, the caller is responsible for ensuring that
3767 * this function is called at least once after @workqueue_freezing
3768 * is updated and visible.
3769 */
3770 if (!freezable || !workqueue_freezing) {
3771 bool kick = false;
3772
3773 pwq->max_active = wq->saved_max_active;
3774
3775 while (!list_empty(&pwq->delayed_works) &&
3776 pwq->nr_active < pwq->max_active) {
3777 pwq_activate_first_delayed(pwq);
3778 kick = true;
3779 }
3780
3781 /*
3782 * Need to kick a worker after thawed or an unbound wq's
3783 * max_active is bumped. In realtime scenarios, always kicking a
3784 * worker will cause interference on the isolated cpu cores, so
3785 * let's kick iff work items were activated.
3786 */
3787 if (kick)
3788 wake_up_worker(pwq->pool);
3789 } else {
3790 pwq->max_active = 0;
3791 }
3792
3793 raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
3794 }
3795
3796 /* initialize newly alloced @pwq which is associated with @wq and @pool */
3797 static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
3798 struct worker_pool *pool)
3799 {
3800 BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
3801
3802 memset(pwq, 0, sizeof(*pwq));
3803
3804 pwq->pool = pool;
3805 pwq->wq = wq;
3806 pwq->flush_color = -1;
3807 pwq->refcnt = 1;
3808 INIT_LIST_HEAD(&pwq->delayed_works);
3809 INIT_LIST_HEAD(&pwq->pwqs_node);
3810 INIT_LIST_HEAD(&pwq->mayday_node);
3811 INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn);
3812 }
3813
3814 /* sync @pwq with the current state of its associated wq and link it */
3815 static void link_pwq(struct pool_workqueue *pwq)
3816 {
3817 struct workqueue_struct *wq = pwq->wq;
3818
3819 lockdep_assert_held(&wq->mutex);
3820
3821 /* may be called multiple times, ignore if already linked */
3822 if (!list_empty(&pwq->pwqs_node))
3823 return;
3824
3825 /* set the matching work_color */
3826 pwq->work_color = wq->work_color;
3827
3828 /* sync max_active to the current setting */
3829 pwq_adjust_max_active(pwq);
3830
3831 /* link in @pwq */
3832 list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
3833 }
3834
3835 /* obtain a pool matching @attr and create a pwq associating the pool and @wq */
3836 static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
3837 const struct workqueue_attrs *attrs)
3838 {
3839 struct worker_pool *pool;
3840 struct pool_workqueue *pwq;
3841
3842 lockdep_assert_held(&wq_pool_mutex);
3843
3844 pool = get_unbound_pool(attrs);
3845 if (!pool)
3846 return NULL;
3847
3848 pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
3849 if (!pwq) {
3850 put_unbound_pool(pool);
3851 return NULL;
3852 }
3853
3854 init_pwq(pwq, wq, pool);
3855 return pwq;
3856 }
3857
3858 /**
3859 * wq_calc_node_cpumask - calculate a wq_attrs' cpumask for the specified node
3860 * @attrs: the wq_attrs of the default pwq of the target workqueue
3861 * @node: the target NUMA node
3862 * @cpu_going_down: if >= 0, the CPU to consider as offline
3863 * @cpumask: outarg, the resulting cpumask
3864 *
3865 * Calculate the cpumask a workqueue with @attrs should use on @node. If
3866 * @cpu_going_down is >= 0, that cpu is considered offline during
3867 * calculation. The result is stored in @cpumask.
3868 *
3869 * If NUMA affinity is not enabled, @attrs->cpumask is always used. If
3870 * enabled and @node has online CPUs requested by @attrs, the returned
3871 * cpumask is the intersection of the possible CPUs of @node and
3872 * @attrs->cpumask.
3873 *
3874 * The caller is responsible for ensuring that the cpumask of @node stays
3875 * stable.
3876 *
3877 * Return: %true if the resulting @cpumask is different from @attrs->cpumask,
3878 * %false if equal.
3879 */
3880 static bool wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
3881 int cpu_going_down, cpumask_t *cpumask)
3882 {
3883 if (!wq_numa_enabled || attrs->no_numa)
3884 goto use_dfl;
3885
3886 /* does @node have any online CPUs @attrs wants? */
3887 cpumask_and(cpumask, cpumask_of_node(node), attrs->cpumask);
3888 if (cpu_going_down >= 0)
3889 cpumask_clear_cpu(cpu_going_down, cpumask);
3890
3891 if (cpumask_empty(cpumask))
3892 goto use_dfl;
3893
3894 /* yeap, return possible CPUs in @node that @attrs wants */
3895 cpumask_and(cpumask, attrs->cpumask, wq_numa_possible_cpumask[node]);
3896
3897 if (cpumask_empty(cpumask)) {
3898 pr_warn_once("WARNING: workqueue cpumask: online intersect > "
3899 "possible intersect\n");
3900 return false;
3901 }
3902
3903 return !cpumask_equal(cpumask, attrs->cpumask);
3904
3905 use_dfl:
3906 cpumask_copy(cpumask, attrs->cpumask);
3907 return false;
3908 }
3909
3910 /* install @pwq into @wq's numa_pwq_tbl[] for @node and return the old pwq */
3911 static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
3912 int node,
3913 struct pool_workqueue *pwq)
3914 {
3915 struct pool_workqueue *old_pwq;
3916
3917 lockdep_assert_held(&wq_pool_mutex);
3918 lockdep_assert_held(&wq->mutex);
3919
3920 /* link_pwq() can handle duplicate calls */
3921 link_pwq(pwq);
3922
3923 old_pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
3924 rcu_assign_pointer(wq->numa_pwq_tbl[node], pwq);
3925 return old_pwq;
3926 }
3927
3928 /* context to store the prepared attrs & pwqs before applying */
3929 struct apply_wqattrs_ctx {
3930 struct workqueue_struct *wq; /* target workqueue */
3931 struct workqueue_attrs *attrs; /* attrs to apply */
3932 struct list_head list; /* queued for batching commit */
3933 struct pool_workqueue *dfl_pwq;
3934 struct pool_workqueue *pwq_tbl[];
3935 };
3936
3937 /* free the resources after success or abort */
3938 static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
3939 {
3940 if (ctx) {
3941 int node;
3942
3943 for_each_node(node)
3944 put_pwq_unlocked(ctx->pwq_tbl[node]);
3945 put_pwq_unlocked(ctx->dfl_pwq);
3946
3947 free_workqueue_attrs(ctx->attrs);
3948
3949 kfree(ctx);
3950 }
3951 }
3952
3953 /* allocate the attrs and pwqs for later installation */
3954 static struct apply_wqattrs_ctx *
3955 apply_wqattrs_prepare(struct workqueue_struct *wq,
3956 const struct workqueue_attrs *attrs)
3957 {
3958 struct apply_wqattrs_ctx *ctx;
3959 struct workqueue_attrs *new_attrs, *tmp_attrs;
3960 int node;
3961
3962 lockdep_assert_held(&wq_pool_mutex);
3963
3964 ctx = kzalloc(struct_size(ctx, pwq_tbl, nr_node_ids), GFP_KERNEL);
3965
3966 new_attrs = alloc_workqueue_attrs();
3967 tmp_attrs = alloc_workqueue_attrs();
3968 if (!ctx || !new_attrs || !tmp_attrs)
3969 goto out_free;
3970
3971 /*
3972 * Calculate the attrs of the default pwq.
3973 * If the user configured cpumask doesn't overlap with the
3974 * wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
3975 */
3976 copy_workqueue_attrs(new_attrs, attrs);
3977 cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
3978 if (unlikely(cpumask_empty(new_attrs->cpumask)))
3979 cpumask_copy(new_attrs->cpumask, wq_unbound_cpumask);
3980
3981 /*
3982 * We may create multiple pwqs with differing cpumasks. Make a
3983 * copy of @new_attrs which will be modified and used to obtain
3984 * pools.
3985 */
3986 copy_workqueue_attrs(tmp_attrs, new_attrs);
3987
3988 /*
3989 * If something goes wrong during CPU up/down, we'll fall back to
3990 * the default pwq covering whole @attrs->cpumask. Always create
3991 * it even if we don't use it immediately.
3992 */
3993 ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
3994 if (!ctx->dfl_pwq)
3995 goto out_free;
3996
3997 for_each_node(node) {
3998 if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
3999 ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
4000 if (!ctx->pwq_tbl[node])
4001 goto out_free;
4002 } else {
4003 ctx->dfl_pwq->refcnt++;
4004 ctx->pwq_tbl[node] = ctx->dfl_pwq;
4005 }
4006 }
4007
4008 /* save the user configured attrs and sanitize it. */
4009 copy_workqueue_attrs(new_attrs, attrs);
4010 cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
4011 ctx->attrs = new_attrs;
4012
4013 ctx->wq = wq;
4014 free_workqueue_attrs(tmp_attrs);
4015 return ctx;
4016
4017 out_free:
4018 free_workqueue_attrs(tmp_attrs);
4019 free_workqueue_attrs(new_attrs);
4020 apply_wqattrs_cleanup(ctx);
4021 return NULL;
4022 }
4023
4024 /* set attrs and install prepared pwqs, @ctx points to old pwqs on return */
4025 static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
4026 {
4027 int node;
4028
4029 /* all pwqs have been created successfully, let's install'em */
4030 mutex_lock(&ctx->wq->mutex);
4031
4032 copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
4033
4034 /* save the previous pwq and install the new one */
4035 for_each_node(node)
4036 ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
4037 ctx->pwq_tbl[node]);
4038
4039 /* @dfl_pwq might not have been used, ensure it's linked */
4040 link_pwq(ctx->dfl_pwq);
4041 swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);
4042
4043 mutex_unlock(&ctx->wq->mutex);
4044 }
4045
4046 static void apply_wqattrs_lock(void)
4047 {
4048 /* CPUs should stay stable across pwq creations and installations */
4049 get_online_cpus();
4050 mutex_lock(&wq_pool_mutex);
4051 }
4052
4053 static void apply_wqattrs_unlock(void)
4054 {
4055 mutex_unlock(&wq_pool_mutex);
4056 put_online_cpus();
4057 }
4058
4059 static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
4060 const struct workqueue_attrs *attrs)
4061 {
4062 struct apply_wqattrs_ctx *ctx;
4063
4064 /* only unbound workqueues can change attributes */
4065 if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
4066 return -EINVAL;
4067
4068 /* creating multiple pwqs breaks ordering guarantee */
4069 if (!list_empty(&wq->pwqs)) {
4070 if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
4071 return -EINVAL;
4072
4073 wq->flags &= ~__WQ_ORDERED;
4074 }
4075
4076 ctx = apply_wqattrs_prepare(wq, attrs);
4077 if (!ctx)
4078 return -ENOMEM;
4079
4080 /* the ctx has been prepared successfully, let's commit it */
4081 apply_wqattrs_commit(ctx);
4082 apply_wqattrs_cleanup(ctx);
4083
4084 return 0;
4085 }
4086
4087 /**
4088 * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
4089 * @wq: the target workqueue
4090 * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
4091 *
4092 * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
4093 * machines, this function maps a separate pwq to each NUMA node with
4094 * possible CPUs in @attrs->cpumask so that work items are affine to the
4095 * NUMA node it was issued on. Older pwqs are released as in-flight work
4096 * items finish. Note that a work item which repeatedly requeues itself
4097 * back-to-back will stay on its current pwq.
4098 *
4099 * Performs GFP_KERNEL allocations.
4100 *
4101 * Assumes caller has CPU hotplug read exclusion, i.e. get_online_cpus().
4102 *
4103 * Return: 0 on success and -errno on failure.
4104 */
4105 int apply_workqueue_attrs(struct workqueue_struct *wq,
4106 const struct workqueue_attrs *attrs)
4107 {
4108 int ret;
4109
4110 lockdep_assert_cpus_held();
4111
4112 mutex_lock(&wq_pool_mutex);
4113 ret = apply_workqueue_attrs_locked(wq, attrs);
4114 mutex_unlock(&wq_pool_mutex);
4115
4116 return ret;
4117 }
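
/*
 * Example (editor's sketch, for built-in code; these helpers are not
 * exported to modules in this tree): pinning an existing unbound workqueue
 * to a caller-chosen cpumask and niceness.  example_wq is hypothetical and
 * assumed to have been created with WQ_UNBOUND.
 *
 *	struct workqueue_attrs *attrs;
 *	int ret = -ENOMEM;
 *
 *	attrs = alloc_workqueue_attrs();
 *	if (attrs) {
 *		attrs->nice = -5;
 *		cpumask_copy(attrs->cpumask, cpumask_of(2));
 *		get_online_cpus();		CPU hotplug read exclusion
 *		ret = apply_workqueue_attrs(example_wq, attrs);
 *		put_online_cpus();
 *		free_workqueue_attrs(attrs);
 *	}
 */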
4118
4119 /**
4120 * wq_update_unbound_numa - update NUMA affinity of a wq for CPU hot[un]plug
4121 * @wq: the target workqueue
4122 * @cpu: the CPU coming up or going down
4123 * @online: whether @cpu is coming up or going down
4124 *
4125 * This function is to be called from %CPU_DOWN_PREPARE, %CPU_ONLINE and
4126 * %CPU_DOWN_FAILED. @cpu is being hot[un]plugged, update NUMA affinity of
4127 * @wq accordingly.
4128 *
4129 * If NUMA affinity can't be adjusted due to memory allocation failure, it
4130 * falls back to @wq->dfl_pwq which may not be optimal but is always
4131 * correct.
4132 *
4133 * Note that when the last allowed CPU of a NUMA node goes offline for a
4134 * workqueue with a cpumask spanning multiple nodes, the workers which were
4135 * already executing the work items for the workqueue will lose their CPU
4136 * affinity and may execute on any CPU. This is similar to how per-cpu
4137 * workqueues behave on CPU_DOWN. If a workqueue user wants strict
4138 * affinity, it's the user's responsibility to flush the work item from
4139 * CPU_DOWN_PREPARE.
4140 */
4141 static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
4142 bool online)
4143 {
4144 int node = cpu_to_node(cpu);
4145 int cpu_off = online ? -1 : cpu;
4146 struct pool_workqueue *old_pwq = NULL, *pwq;
4147 struct workqueue_attrs *target_attrs;
4148 cpumask_t *cpumask;
4149
4150 lockdep_assert_held(&wq_pool_mutex);
4151
4152 if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND) ||
4153 wq->unbound_attrs->no_numa)
4154 return;
4155
4156 /*
4157 * We don't wanna alloc/free wq_attrs for each wq for each CPU.
4158 * Let's use a preallocated one. The following buf is protected by
4159 * CPU hotplug exclusion.
4160 */
4161 target_attrs = wq_update_unbound_numa_attrs_buf;
4162 cpumask = target_attrs->cpumask;
4163
4164 copy_workqueue_attrs(target_attrs, wq->unbound_attrs);
4165 pwq = unbound_pwq_by_node(wq, node);
4166
4167 /*
4168 * Let's determine what needs to be done. If the target cpumask is
4169 * different from the default pwq's, we need to compare it to @pwq's
4170 * and create a new one if they don't match. If the target cpumask
4171 * equals the default pwq's, the default pwq should be used.
4172 */
4173 if (wq_calc_node_cpumask(wq->dfl_pwq->pool->attrs, node, cpu_off, cpumask)) {
4174 if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
4175 return;
4176 } else {
4177 goto use_dfl_pwq;
4178 }
4179
4180 /* create a new pwq */
4181 pwq = alloc_unbound_pwq(wq, target_attrs);
4182 if (!pwq) {
4183 pr_warn("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n",
4184 wq->name);
4185 goto use_dfl_pwq;
4186 }
4187
4188 /* Install the new pwq. */
4189 mutex_lock(&wq->mutex);
4190 old_pwq = numa_pwq_tbl_install(wq, node, pwq);
4191 goto out_unlock;
4192
4193 use_dfl_pwq:
4194 mutex_lock(&wq->mutex);
4195 raw_spin_lock_irq(&wq->dfl_pwq->pool->lock);
4196 get_pwq(wq->dfl_pwq);
4197 raw_spin_unlock_irq(&wq->dfl_pwq->pool->lock);
4198 old_pwq = numa_pwq_tbl_install(wq, node, wq->dfl_pwq);
4199 out_unlock:
4200 mutex_unlock(&wq->mutex);
4201 put_pwq_unlocked(old_pwq);
4202 }
4203
4204 static int alloc_and_link_pwqs(struct workqueue_struct *wq)
4205 {
4206 bool highpri = wq->flags & WQ_HIGHPRI;
4207 int cpu, ret;
4208
4209 if (!(wq->flags & WQ_UNBOUND)) {
4210 wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
4211 if (!wq->cpu_pwqs)
4212 return -ENOMEM;
4213
4214 for_each_possible_cpu(cpu) {
4215 struct pool_workqueue *pwq =
4216 per_cpu_ptr(wq->cpu_pwqs, cpu);
4217 struct worker_pool *cpu_pools =
4218 per_cpu(cpu_worker_pools, cpu);
4219
4220 init_pwq(pwq, wq, &cpu_pools[highpri]);
4221
4222 mutex_lock(&wq->mutex);
4223 link_pwq(pwq);
4224 mutex_unlock(&wq->mutex);
4225 }
4226 return 0;
4227 }
4228
4229 get_online_cpus();
4230 if (wq->flags & __WQ_ORDERED) {
4231 ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
4232 /* there should only be single pwq for ordering guarantee */
4233 WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
4234 wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
4235 "ordering guarantee broken for workqueue %s\n", wq->name);
4236 } else {
4237 ret = apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
4238 }
4239 put_online_cpus();
4240
4241 return ret;
4242 }
4243
4244 static int wq_clamp_max_active(int max_active, unsigned int flags,
4245 const char *name)
4246 {
4247 int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
4248
4249 if (max_active < 1 || max_active > lim)
4250 pr_warn("workqueue: max_active %d requested for %s is out of range, clamping between %d and %d\n",
4251 max_active, name, 1, lim);
4252
4253 return clamp_val(max_active, 1, lim);
4254 }
4255
4256 /*
4257 * Workqueues which may be used during memory reclaim should have a rescuer
4258 * to guarantee forward progress.
4259 */
4260 static int init_rescuer(struct workqueue_struct *wq)
4261 {
4262 struct worker *rescuer;
4263 int ret;
4264
4265 if (!(wq->flags & WQ_MEM_RECLAIM))
4266 return 0;
4267
4268 rescuer = alloc_worker(NUMA_NO_NODE);
4269 if (!rescuer)
4270 return -ENOMEM;
4271
4272 rescuer->rescue_wq = wq;
4273 rescuer->task = kthread_create(rescuer_thread, rescuer, "%s", wq->name);
4274 if (IS_ERR(rescuer->task)) {
4275 ret = PTR_ERR(rescuer->task);
4276 kfree(rescuer);
4277 return ret;
4278 }
4279
4280 wq->rescuer = rescuer;
4281 kthread_bind_mask(rescuer->task, cpu_possible_mask);
4282 wake_up_process(rescuer->task);
4283
4284 return 0;
4285 }
4286
4287 __printf(1, 4)
4288 struct workqueue_struct *alloc_workqueue(const char *fmt,
4289 unsigned int flags,
4290 int max_active, ...)
4291 {
4292 size_t tbl_size = 0;
4293 va_list args;
4294 struct workqueue_struct *wq;
4295 struct pool_workqueue *pwq;
4296
4297 /*
4298 * Unbound && max_active == 1 used to imply ordered, which is no
4299 * longer the case on NUMA machines due to per-node pools. While
4300 * alloc_ordered_workqueue() is the right way to create an ordered
4301 * workqueue, keep the previous behavior to avoid subtle breakages
4302 * on NUMA.
4303 */
4304 if ((flags & WQ_UNBOUND) && max_active == 1)
4305 flags |= __WQ_ORDERED;
4306
4307 /* see the comment above the definition of WQ_POWER_EFFICIENT */
4308 if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
4309 flags |= WQ_UNBOUND;
4310
4311 /* allocate wq and format name */
4312 if (flags & WQ_UNBOUND)
4313 tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
4314
4315 wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
4316 if (!wq)
4317 return NULL;
4318
4319 if (flags & WQ_UNBOUND) {
4320 wq->unbound_attrs = alloc_workqueue_attrs();
4321 if (!wq->unbound_attrs)
4322 goto err_free_wq;
4323 }
4324
4325 va_start(args, max_active);
4326 vsnprintf(wq->name, sizeof(wq->name), fmt, args);
4327 va_end(args);
4328
4329 max_active = max_active ?: WQ_DFL_ACTIVE;
4330 max_active = wq_clamp_max_active(max_active, flags, wq->name);
4331
4332 /* init wq */
4333 wq->flags = flags;
4334 wq->saved_max_active = max_active;
4335 mutex_init(&wq->mutex);
4336 atomic_set(&wq->nr_pwqs_to_flush, 0);
4337 INIT_LIST_HEAD(&wq->pwqs);
4338 INIT_LIST_HEAD(&wq->flusher_queue);
4339 INIT_LIST_HEAD(&wq->flusher_overflow);
4340 INIT_LIST_HEAD(&wq->maydays);
4341
4342 wq_init_lockdep(wq);
4343 INIT_LIST_HEAD(&wq->list);
4344
4345 if (alloc_and_link_pwqs(wq) < 0)
4346 goto err_unreg_lockdep;
4347
4348 if (wq_online && init_rescuer(wq) < 0)
4349 goto err_destroy;
4350
4351 if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
4352 goto err_destroy;
4353
4354 /*
4355 * wq_pool_mutex protects global freeze state and workqueues list.
4356 * Grab it, adjust max_active and add the new @wq to workqueues
4357 * list.
4358 */
4359 mutex_lock(&wq_pool_mutex);
4360
4361 mutex_lock(&wq->mutex);
4362 for_each_pwq(pwq, wq)
4363 pwq_adjust_max_active(pwq);
4364 mutex_unlock(&wq->mutex);
4365
4366 list_add_tail_rcu(&wq->list, &workqueues);
4367
4368 mutex_unlock(&wq_pool_mutex);
4369
4370 return wq;
4371
4372 err_unreg_lockdep:
4373 wq_unregister_lockdep(wq);
4374 wq_free_lockdep(wq);
4375 err_free_wq:
4376 free_workqueue_attrs(wq->unbound_attrs);
4377 kfree(wq);
4378 return NULL;
4379 err_destroy:
4380 destroy_workqueue(wq);
4381 return NULL;
4382 }
4383 EXPORT_SYMBOL_GPL(alloc_workqueue);
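
/*
 * Example (editor's sketch, not part of workqueue.c): creating a dedicated
 * workqueue, queueing work on it and tearing it down.  example_wq,
 * example_work and example_fn are hypothetical; max_active of 0 selects
 * WQ_DFL_ACTIVE.
 *
 *	static void example_fn(struct work_struct *work)
 *	{
 *		pr_info("example work ran\n");
 *	}
 *	static DECLARE_WORK(example_work, example_fn);
 *	static struct workqueue_struct *example_wq;
 *
 *	example_wq = alloc_workqueue("example_wq",
 *				     WQ_UNBOUND | WQ_MEM_RECLAIM, 0);
 *	if (!example_wq)
 *		return -ENOMEM;
 *	queue_work(example_wq, &example_work);
 *
 *	on teardown, pending work is drained before the wq is destroyed:
 *	destroy_workqueue(example_wq);
 */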
4384
4385 static bool pwq_busy(struct pool_workqueue *pwq)
4386 {
4387 int i;
4388
4389 for (i = 0; i < WORK_NR_COLORS; i++)
4390 if (pwq->nr_in_flight[i])
4391 return true;
4392
4393 if ((pwq != pwq->wq->dfl_pwq) && (pwq->refcnt > 1))
4394 return true;
4395 if (pwq->nr_active || !list_empty(&pwq->delayed_works))
4396 return true;
4397
4398 return false;
4399 }
4400
4401 /**
4402 * destroy_workqueue - safely terminate a workqueue
4403 * @wq: target workqueue
4404 *
4405 * Safely destroy a workqueue. All work currently pending will be done first.
4406 */
4407 void destroy_workqueue(struct workqueue_struct *wq)
4408 {
4409 struct pool_workqueue *pwq;
4410 int node;
4411
4412 /*
4413 * Remove it from sysfs first so that sanity check failure doesn't
4414 * lead to sysfs name conflicts.
4415 */
4416 workqueue_sysfs_unregister(wq);
4417
4418 /* drain it before proceeding with destruction */
4419 drain_workqueue(wq);
4420
4421 /* kill rescuer, if sanity checks fail, leave it w/o rescuer */
4422 if (wq->rescuer) {
4423 struct worker *rescuer = wq->rescuer;
4424
4425 /* this prevents new queueing */
4426 raw_spin_lock_irq(&wq_mayday_lock);
4427 wq->rescuer = NULL;
4428 raw_spin_unlock_irq(&wq_mayday_lock);
4429
4430 /* rescuer will empty maydays list before exiting */
4431 kthread_stop(rescuer->task);
4432 kfree(rescuer);
4433 }
4434
4435 /*
4436 * Sanity checks - grab all the locks so that we wait for all
4437 * in-flight operations which may do put_pwq().
4438 */
4439 mutex_lock(&wq_pool_mutex);
4440 mutex_lock(&wq->mutex);
4441 for_each_pwq(pwq, wq) {
4442 raw_spin_lock_irq(&pwq->pool->lock);
4443 if (WARN_ON(pwq_busy(pwq))) {
4444 pr_warn("%s: %s has the following busy pwq\n",
4445 __func__, wq->name);
4446 show_pwq(pwq);
4447 raw_spin_unlock_irq(&pwq->pool->lock);
4448 mutex_unlock(&wq->mutex);
4449 mutex_unlock(&wq_pool_mutex);
4450 show_workqueue_state();
4451 return;
4452 }
4453 raw_spin_unlock_irq(&pwq->pool->lock);
4454 }
4455 mutex_unlock(&wq->mutex);
4456
4457 /*
4458 * wq list is used to freeze wq, remove from list after
4459 * flushing is complete in case freeze races us.
4460 */
4461 list_del_rcu(&wq->list);
4462 mutex_unlock(&wq_pool_mutex);
4463
4464 if (!(wq->flags & WQ_UNBOUND)) {
4465 wq_unregister_lockdep(wq);
4466 /*
4467 * The base ref is never dropped on per-cpu pwqs. Directly
4468 * schedule RCU free.
4469 */
4470 call_rcu(&wq->rcu, rcu_free_wq);
4471 } else {
4472 /*
4473 * We're the sole accessor of @wq at this point. Directly
4474 * access numa_pwq_tbl[] and dfl_pwq to put the base refs.
4475 * @wq will be freed when the last pwq is released.
4476 */
4477 for_each_node(node) {
4478 pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
4479 RCU_INIT_POINTER(wq->numa_pwq_tbl[node], NULL);
4480 put_pwq_unlocked(pwq);
4481 }
4482
4483 /*
4484 * Put dfl_pwq. @wq may be freed any time after dfl_pwq is
4485 * put. Don't access it afterwards.
4486 */
4487 pwq = wq->dfl_pwq;
4488 wq->dfl_pwq = NULL;
4489 put_pwq_unlocked(pwq);
4490 }
4491 }
4492 EXPORT_SYMBOL_GPL(destroy_workqueue);
4493
4494 /**
4495 * workqueue_set_max_active - adjust max_active of a workqueue
4496 * @wq: target workqueue
4497 * @max_active: new max_active value.
4498 *
4499 * Set max_active of @wq to @max_active.
4500 *
4501 * CONTEXT:
4502 * Don't call from IRQ context.
4503 */
4504 void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
4505 {
4506 struct pool_workqueue *pwq;
4507
4508 /* disallow meddling with max_active for ordered workqueues */
4509 if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
4510 return;
4511
4512 max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
4513
4514 mutex_lock(&wq->mutex);
4515
4516 wq->flags &= ~__WQ_ORDERED;
4517 wq->saved_max_active = max_active;
4518
4519 for_each_pwq(pwq, wq)
4520 pwq_adjust_max_active(pwq);
4521
4522 mutex_unlock(&wq->mutex);
4523 }
4524 EXPORT_SYMBOL_GPL(workqueue_set_max_active);
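
/*
 * Example (editor's sketch, not part of workqueue.c): raising the
 * concurrency limit of a non-ordered workqueue at runtime.  example_wq is
 * hypothetical; the value is clamped to [1, WQ_MAX_ACTIVE] (or
 * WQ_UNBOUND_MAX_ACTIVE for unbound workqueues).
 *
 *	workqueue_set_max_active(example_wq, 16);
 */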
4525
4526 /**
4527 * current_work - retrieve %current task's work struct
4528 *
4529 * Determine if %current task is a workqueue worker and what it's working on.
4530 * Useful to find out the context that the %current task is running in.
4531 *
4532 * Return: work struct if %current task is a workqueue worker, %NULL otherwise.
4533 */
4534 struct work_struct *current_work(void)
4535 {
4536 struct worker *worker = current_wq_worker();
4537
4538 return worker ? worker->current_work : NULL;
4539 }
4540 EXPORT_SYMBOL(current_work);
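
/*
 * Example (editor's sketch, not part of workqueue.c): a function that must
 * not wait on a work item when invoked from that very work item can use
 * current_work() to detect the situation.  example_work is hypothetical.
 *
 *	if (current_work() == &example_work)
 *		return;			called from example_work itself
 *	flush_work(&example_work);
 */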
4541
4542 /**
4543 * current_is_workqueue_rescuer - is %current workqueue rescuer?
4544 *
4545 * Determine whether %current is a workqueue rescuer. Can be used from
4546 * work functions to determine whether it's being run off the rescuer task.
4547 *
4548 * Return: %true if %current is a workqueue rescuer. %false otherwise.
4549 */
4550 bool current_is_workqueue_rescuer(void)
4551 {
4552 struct worker *worker = current_wq_worker();
4553
4554 return worker && worker->rescue_wq;
4555 }
4556
4557 /**
4558 * workqueue_congested - test whether a workqueue is congested
4559 * @cpu: CPU in question
4560 * @wq: target workqueue
4561 *
4562 * Test whether @wq's cpu workqueue for @cpu is congested. There is
4563 * no synchronization around this function and the test result is
4564 * unreliable and only useful as advisory hints or for debugging.
4565 *
4566 * If @cpu is WORK_CPU_UNBOUND, the test is performed on the local CPU.
4567 * Note that both per-cpu and unbound workqueues may be associated with
4568 * multiple pool_workqueues which have separate congested states. A
4569 * workqueue being congested on one CPU doesn't mean the workqueue is also
4570 * congested on other CPUs / NUMA nodes.
4571 *
4572 * Return:
4573 * %true if congested, %false otherwise.
4574 */
4575 bool workqueue_congested(int cpu, struct workqueue_struct *wq)
4576 {
4577 struct pool_workqueue *pwq;
4578 bool ret;
4579
4580 rcu_read_lock();
4581 preempt_disable();
4582
4583 if (cpu == WORK_CPU_UNBOUND)
4584 cpu = smp_processor_id();
4585
4586 if (!(wq->flags & WQ_UNBOUND))
4587 pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
4588 else
4589 pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
4590
4591 ret = !list_empty(&pwq->delayed_works);
4592 preempt_enable();
4593 rcu_read_unlock();
4594
4595 return ret;
4596 }
4597 EXPORT_SYMBOL_GPL(workqueue_congested);
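
/*
 * Example (editor's sketch, not part of workqueue.c): using the advisory
 * congestion test to shed optional work.  example_wq, example_work and
 * drop_low_priority_request() are hypothetical, and the result may already
 * be stale by the time it is acted upon.
 *
 *	if (workqueue_congested(WORK_CPU_UNBOUND, example_wq))
 *		drop_low_priority_request();
 *	else
 *		queue_work(example_wq, &example_work);
 */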
4598
4599 /**
4600 * work_busy - test whether a work is currently pending or running
4601 * @work: the work to be tested
4602 *
4603 * Test whether @work is currently pending or running. There is no
4604 * synchronization around this function and the test result is
4605 * unreliable and only useful as advisory hints or for debugging.
4606 *
4607 * Return:
4608 * OR'd bitmask of WORK_BUSY_* bits.
4609 */
4610 unsigned int work_busy(struct work_struct *work)
4611 {
4612 struct worker_pool *pool;
4613 unsigned long flags;
4614 unsigned int ret = 0;
4615
4616 if (work_pending(work))
4617 ret |= WORK_BUSY_PENDING;
4618
4619 rcu_read_lock();
4620 pool = get_work_pool(work);
4621 if (pool) {
4622 raw_spin_lock_irqsave(&pool->lock, flags);
4623 if (find_worker_executing_work(pool, work))
4624 ret |= WORK_BUSY_RUNNING;
4625 raw_spin_unlock_irqrestore(&pool->lock, flags);
4626 }
4627 rcu_read_unlock();
4628
4629 return ret;
4630 }
4631 EXPORT_SYMBOL_GPL(work_busy);
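
/*
 * Example (editor's sketch, not part of workqueue.c): dumping the advisory
 * state of a work item for debugging.  example_work is hypothetical.
 *
 *	unsigned int busy = work_busy(&example_work);
 *
 *	pr_debug("example_work: %s%s\n",
 *		 (busy & WORK_BUSY_PENDING) ? "pending " : "",
 *		 (busy & WORK_BUSY_RUNNING) ? "running" : "");
 */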
4632
4633 /**
4634 * set_worker_desc - set description for the current work item
4635 * @fmt: printf-style format string
4636 * @...: arguments for the format string
4637 *
4638 * This function can be called by a running work function to describe what
4639 * the work item is about. If the worker task gets dumped, this
4640 * information will be printed out together to help debugging. The
4641 * description can be at most WORKER_DESC_LEN including the trailing '\0'.
4642 */
4643 void set_worker_desc(const char *fmt, ...)
4644 {
4645 struct worker *worker = current_wq_worker();
4646 va_list args;
4647
4648 if (worker) {
4649 va_start(args, fmt);
4650 vsnprintf(worker->desc, sizeof(worker->desc), fmt, args);
4651 va_end(args);
4652 }
4653 }
4654 EXPORT_SYMBOL_GPL(set_worker_desc);
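
/*
 * Example (editor's sketch, not part of workqueue.c): a work function
 * tagging its worker with the object it is processing so the information
 * shows up in task dumps via print_worker_info().  example_req and its
 * fields are hypothetical.
 *
 *	static void example_fn(struct work_struct *work)
 *	{
 *		struct example_req *req =
 *			container_of(work, struct example_req, work);
 *
 *		set_worker_desc("example %s/%d", req->name, req->id);
 *		process req here
 *	}
 */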
4655
4656 /**
4657 * print_worker_info - print out worker information and description
4658 * @log_lvl: the log level to use when printing
4659 * @task: target task
4660 *
4661 * If @task is a worker and currently executing a work item, print out the
4662 * name of the workqueue being serviced and worker description set with
4663 * set_worker_desc() by the currently executing work item.
4664 *
4665 * This function can be safely called on any task as long as the
4666 * task_struct itself is accessible. While safe, this function isn't
4667 * synchronized and may print out mixed-up or garbage data of limited length.
4668 */
4669 void print_worker_info(const char *log_lvl, struct task_struct *task)
4670 {
4671 work_func_t *fn = NULL;
4672 char name[WQ_NAME_LEN] = { };
4673 char desc[WORKER_DESC_LEN] = { };
4674 struct pool_workqueue *pwq = NULL;
4675 struct workqueue_struct *wq = NULL;
4676 struct worker *worker;
4677
4678 if (!(task->flags & PF_WQ_WORKER))
4679 return;
4680
4681 /*
4682 * This function is called without any synchronization and @task
4683 * could be in any state. Be careful with dereferences.
4684 */
4685 worker = kthread_probe_data(task);
4686
4687 /*
4688 * Carefully copy the associated workqueue's workfn, name and desc.
4689 * Keep the original last '\0' in case the original is garbage.
4690 */
4691 copy_from_kernel_nofault(&fn, &worker->current_func, sizeof(fn));
4692 copy_from_kernel_nofault(&pwq, &worker->current_pwq, sizeof(pwq));
4693 copy_from_kernel_nofault(&wq, &pwq->wq, sizeof(wq));
4694 copy_from_kernel_nofault(name, wq->name, sizeof(name) - 1);
4695 copy_from_kernel_nofault(desc, worker->desc, sizeof(desc) - 1);
4696
4697 if (fn || name[0] || desc[0]) {
4698 printk("%sWorkqueue: %s %ps", log_lvl, name, fn);
4699 if (strcmp(name, desc))
4700 pr_cont(" (%s)", desc);
4701 pr_cont("\n");
4702 }
4703 }
4704
4705 static void pr_cont_pool_info(struct worker_pool *pool)
4706 {
4707 pr_cont(" cpus=%*pbl", nr_cpumask_bits, pool->attrs->cpumask);
4708 if (pool->node != NUMA_NO_NODE)
4709 pr_cont(" node=%d", pool->node);
4710 pr_cont(" flags=0x%x nice=%d", pool->flags, pool->attrs->nice);
4711 }
4712
4713 static void pr_cont_work(bool comma, struct work_struct *work)
4714 {
4715 if (work->func == wq_barrier_func) {
4716 struct wq_barrier *barr;
4717
4718 barr = container_of(work, struct wq_barrier, work);
4719
4720 pr_cont("%s BAR(%d)", comma ? "," : "",
4721 task_pid_nr(barr->task));
4722 } else {
4723 pr_cont("%s %ps", comma ? "," : "", work->func);
4724 }
4725 }
4726
4727 static void show_pwq(struct pool_workqueue *pwq)
4728 {
4729 struct worker_pool *pool = pwq->pool;
4730 struct work_struct *work;
4731 struct worker *worker;
4732 bool has_in_flight = false, has_pending = false;
4733 int bkt;
4734
4735 pr_info(" pwq %d:", pool->id);
4736 pr_cont_pool_info(pool);
4737
4738 pr_cont(" active=%d/%d refcnt=%d%s\n",
4739 pwq->nr_active, pwq->max_active, pwq->refcnt,
4740 !list_empty(&pwq->mayday_node) ? " MAYDAY" : "");
4741
4742 hash_for_each(pool->busy_hash, bkt, worker, hentry) {
4743 if (worker->current_pwq == pwq) {
4744 has_in_flight = true;
4745 break;
4746 }
4747 }
4748 if (has_in_flight) {
4749 bool comma = false;
4750
4751 pr_info(" in-flight:");
4752 hash_for_each(pool->busy_hash, bkt, worker, hentry) {
4753 if (worker->current_pwq != pwq)
4754 continue;
4755
4756 pr_cont("%s %d%s:%ps", comma ? "," : "",
4757 task_pid_nr(worker->task),
4758 worker->rescue_wq ? "(RESCUER)" : "",
4759 worker->current_func);
4760 list_for_each_entry(work, &worker->scheduled, entry)
4761 pr_cont_work(false, work);
4762 comma = true;
4763 }
4764 pr_cont("\n");
4765 }
4766
4767 list_for_each_entry(work, &pool->worklist, entry) {
4768 if (get_work_pwq(work) == pwq) {
4769 has_pending = true;
4770 break;
4771 }
4772 }
4773 if (has_pending) {
4774 bool comma = false;
4775
4776 pr_info(" pending:");
4777 list_for_each_entry(work, &pool->worklist, entry) {
4778 if (get_work_pwq(work) != pwq)
4779 continue;
4780
4781 pr_cont_work(comma, work);
4782 comma = !(*work_data_bits(work) & WORK_STRUCT_LINKED);
4783 }
4784 pr_cont("\n");
4785 }
4786
4787 if (!list_empty(&pwq->delayed_works)) {
4788 bool comma = false;
4789
4790 pr_info(" delayed:");
4791 list_for_each_entry(work, &pwq->delayed_works, entry) {
4792 pr_cont_work(comma, work);
4793 comma = !(*work_data_bits(work) & WORK_STRUCT_LINKED);
4794 }
4795 pr_cont("\n");
4796 }
4797 }
4798
4799 /**
4800 * show_workqueue_state - dump workqueue state
4801 *
4802 * Called from a sysrq handler or try_to_freeze_tasks() and prints out
4803 * all busy workqueues and pools.
4804 */
4805 void show_workqueue_state(void)
4806 {
4807 struct workqueue_struct *wq;
4808 struct worker_pool *pool;
4809 unsigned long flags;
4810 int pi;
4811
4812 rcu_read_lock();
4813
4814 pr_info("Showing busy workqueues and worker pools:\n");
4815
4816 list_for_each_entry_rcu(wq, &workqueues, list) {
4817 struct pool_workqueue *pwq;
4818 bool idle = true;
4819
4820 for_each_pwq(pwq, wq) {
4821 if (pwq->nr_active || !list_empty(&pwq->delayed_works)) {
4822 idle = false;
4823 break;
4824 }
4825 }
4826 if (idle)
4827 continue;
4828
4829 pr_info("workqueue %s: flags=0x%x\n", wq->name, wq->flags);
4830
4831 for_each_pwq(pwq, wq) {
4832 raw_spin_lock_irqsave(&pwq->pool->lock, flags);
4833 if (pwq->nr_active || !list_empty(&pwq->delayed_works))
4834 show_pwq(pwq);
4835 raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
4836 /*
4837 * We could be printing a lot from atomic context, e.g.
4838 * sysrq-t -> show_workqueue_state(). Avoid triggering
4839 * hard lockup.
4840 */
4841 touch_nmi_watchdog();
4842 }
4843 }
4844
4845 for_each_pool(pool, pi) {
4846 struct worker *worker;
4847 bool first = true;
4848
4849 raw_spin_lock_irqsave(&pool->lock, flags);
4850 if (pool->nr_workers == pool->nr_idle)
4851 goto next_pool;
4852
4853 pr_info("pool %d:", pool->id);
4854 pr_cont_pool_info(pool);
4855 pr_cont(" hung=%us workers=%d",
4856 jiffies_to_msecs(jiffies - pool->watchdog_ts) / 1000,
4857 pool->nr_workers);
4858 if (pool->manager)
4859 pr_cont(" manager: %d",
4860 task_pid_nr(pool->manager->task));
4861 list_for_each_entry(worker, &pool->idle_list, entry) {
4862 pr_cont(" %s%d", first ? "idle: " : "",
4863 task_pid_nr(worker->task));
4864 first = false;
4865 }
4866 pr_cont("\n");
4867 next_pool:
4868 raw_spin_unlock_irqrestore(&pool->lock, flags);
4869 /*
4870 * We could be printing a lot from atomic context, e.g.
4871 * sysrq-t -> show_workqueue_state(). Avoid triggering
4872 * hard lockup.
4873 */
4874 touch_nmi_watchdog();
4875 }
4876
4877 rcu_read_unlock();
4878 }
4879
4880 /* used to show worker information through /proc/PID/{comm,stat,status} */
4881 void wq_worker_comm(char *buf, size_t size, struct task_struct *task)
4882 {
4883 int off;
4884
4885 /* always show the actual comm */
4886 off = strscpy(buf, task->comm, size);
4887 if (off < 0)
4888 return;
4889
4890 /* stabilize PF_WQ_WORKER and worker pool association */
4891 mutex_lock(&wq_pool_attach_mutex);
4892
4893 if (task->flags & PF_WQ_WORKER) {
4894 struct worker *worker = kthread_data(task);
4895 struct worker_pool *pool = worker->pool;
4896
4897 if (pool) {
4898 raw_spin_lock_irq(&pool->lock);
4899 /*
4900 * ->desc tracks information (wq name or
4901 * set_worker_desc()) for the latest execution. If
4902 * current, prepend '+', otherwise '-'.
4903 */
4904 if (worker->desc[0] != '\0') {
4905 if (worker->current_work)
4906 scnprintf(buf + off, size - off, "+%s",
4907 worker->desc);
4908 else
4909 scnprintf(buf + off, size - off, "-%s",
4910 worker->desc);
4911 }
4912 raw_spin_unlock_irq(&pool->lock);
4913 }
4914 }
4915
4916 mutex_unlock(&wq_pool_attach_mutex);
4917 }
4918 EXPORT_SYMBOL_GPL(wq_worker_comm);
4919
4920 #ifdef CONFIG_SMP
4921
4922 /*
4923 * CPU hotplug.
4924 *
4925 * There are two challenges in supporting CPU hotplug. Firstly, there
4926 * are a lot of assumptions on strong associations among work, pwq and
4927 * pool which make migrating pending and scheduled works very
4928 * difficult to implement without impacting hot paths. Secondly,
4929 * worker pools serve a mix of short, long and very long running works, making
4930 * blocked draining impractical.
4931 *
4932 * This is solved by allowing the pools to be disassociated from the CPU
4933 * and run as unbound ones, and by allowing them to be reattached later if
4934 * the CPU comes back online.
4935 */
4936
4937 static void unbind_workers(int cpu)
4938 {
4939 struct worker_pool *pool;
4940 struct worker *worker;
4941
4942 for_each_cpu_worker_pool(pool, cpu) {
4943 mutex_lock(&wq_pool_attach_mutex);
4944 raw_spin_lock_irq(&pool->lock);
4945
4946 /*
4947 * We've blocked all attach/detach operations. Make all workers
4948 * unbound and set DISASSOCIATED. Before this, all workers
4949 * except for the ones which are still executing works from
4950 * before the last CPU down must be on the cpu. After
4951 * this, they may become diasporas.
4952 */
4953 for_each_pool_worker(worker, pool)
4954 worker->flags |= WORKER_UNBOUND;
4955
4956 pool->flags |= POOL_DISASSOCIATED;
4957
4958 raw_spin_unlock_irq(&pool->lock);
4959 mutex_unlock(&wq_pool_attach_mutex);
4960
4961 /*
4962 * Call schedule() so that we cross rq->lock and thus can
4963 * guarantee sched callbacks see the %WORKER_UNBOUND flag.
4964 * This is necessary as scheduler callbacks may be invoked
4965 * from other cpus.
4966 */
4967 schedule();
4968
4969 /*
4970 * Sched callbacks are disabled now. Zap nr_running.
4971 * After this, nr_running stays zero and need_more_worker()
4972 * and keep_working() are always true as long as the
4973 * worklist is not empty. This pool now behaves as an
4974 * unbound (in terms of concurrency management) pool which
4975 * is served by workers tied to the pool.
4976 */
4977 atomic_set(&pool->nr_running, 0);
4978
4979 /*
4980 * With concurrency management just turned off, a busy
4981 * worker blocking could lead to lengthy stalls. Kick off
4982 * unbound chain execution of currently pending work items.
4983 */
4984 raw_spin_lock_irq(&pool->lock);
4985 wake_up_worker(pool);
4986 raw_spin_unlock_irq(&pool->lock);
4987 }
4988 }
4989
4990 /**
4991 * rebind_workers - rebind all workers of a pool to the associated CPU
4992 * @pool: pool of interest
4993 *
4994 * @pool->cpu is coming online. Rebind all workers to the CPU.
4995 */
4996 static void rebind_workers(struct worker_pool *pool)
4997 {
4998 struct worker *worker;
4999
5000 lockdep_assert_held(&wq_pool_attach_mutex);
5001
5002 /*
5003 * Restore CPU affinity of all workers. As all idle workers should
5004 * be on the run-queue of the associated CPU before any local
5005 * wake-ups for concurrency management happen, restore CPU affinity
5006 * of all workers first and then clear UNBOUND. As we're called
5007 * from CPU_ONLINE, the following shouldn't fail.
5008 */
5009 for_each_pool_worker(worker, pool)
5010 WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task,
5011 pool->attrs->cpumask) < 0);
5012
5013 raw_spin_lock_irq(&pool->lock);
5014
5015 pool->flags &= ~POOL_DISASSOCIATED;
5016
5017 for_each_pool_worker(worker, pool) {
5018 unsigned int worker_flags = worker->flags;
5019
5020 /*
5021 * A bound idle worker should actually be on the runqueue
5022 * of the associated CPU for local wake-ups targeting it to
5023 * work. Kick all idle workers so that they migrate to the
5024 * associated CPU. Doing this in the same loop as
5025 * replacing UNBOUND with REBOUND is safe as no worker will
5026 * be bound before @pool->lock is released.
5027 */
5028 if (worker_flags & WORKER_IDLE)
5029 wake_up_process(worker->task);
5030
5031 /*
5032 * We want to clear UNBOUND but can't directly call
5033 * worker_clr_flags() or adjust nr_running. Atomically
5034 * replace UNBOUND with another NOT_RUNNING flag REBOUND.
5035 * @worker will clear REBOUND using worker_clr_flags() when
5036 * it initiates the next execution cycle thus restoring
5037 * concurrency management. Note that when or whether
5038 * @worker clears REBOUND doesn't affect correctness.
5039 *
5040 * WRITE_ONCE() is necessary because @worker->flags may be
5041 * tested without holding any lock in
5042 * wq_worker_running(). Without it, NOT_RUNNING test may
5043 * fail incorrectly leading to premature concurrency
5044 * management operations.
5045 */
5046 WARN_ON_ONCE(!(worker_flags & WORKER_UNBOUND));
5047 worker_flags |= WORKER_REBOUND;
5048 worker_flags &= ~WORKER_UNBOUND;
5049 WRITE_ONCE(worker->flags, worker_flags);
5050 }
5051
5052 raw_spin_unlock_irq(&pool->lock);
5053 }
5054
5055 /**
5056 * restore_unbound_workers_cpumask - restore cpumask of unbound workers
5057 * @pool: unbound pool of interest
5058 * @cpu: the CPU which is coming up
5059 *
5060 * An unbound pool may end up with a cpumask which doesn't have any online
5061 * CPUs. When a worker of such a pool gets scheduled, the scheduler resets
5062 * its cpus_allowed. If @cpu is in @pool's cpumask which didn't have any
5063 * online CPU before, cpus_allowed of all its workers should be restored.
5064 */
5065 static void restore_unbound_workers_cpumask(struct worker_pool *pool, int cpu)
5066 {
5067 static cpumask_t cpumask;
5068 struct worker *worker;
5069
5070 lockdep_assert_held(&wq_pool_attach_mutex);
5071
5072 /* is @cpu allowed for @pool? */
5073 if (!cpumask_test_cpu(cpu, pool->attrs->cpumask))
5074 return;
5075
5076 cpumask_and(&cpumask, pool->attrs->cpumask, cpu_online_mask);
5077
5078 /* as we're called from CPU_ONLINE, the following shouldn't fail */
5079 for_each_pool_worker(worker, pool)
5080 WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task, &cpumask) < 0);
5081 }
5082
5083 int workqueue_prepare_cpu(unsigned int cpu)
5084 {
5085 struct worker_pool *pool;
5086
5087 for_each_cpu_worker_pool(pool, cpu) {
5088 if (pool->nr_workers)
5089 continue;
5090 if (!create_worker(pool))
5091 return -ENOMEM;
5092 }
5093 return 0;
5094 }
5095
5096 int workqueue_online_cpu(unsigned int cpu)
5097 {
5098 struct worker_pool *pool;
5099 struct workqueue_struct *wq;
5100 int pi;
5101
5102 mutex_lock(&wq_pool_mutex);
5103
5104 for_each_pool(pool, pi) {
5105 mutex_lock(&wq_pool_attach_mutex);
5106
5107 if (pool->cpu == cpu)
5108 rebind_workers(pool);
5109 else if (pool->cpu < 0)
5110 restore_unbound_workers_cpumask(pool, cpu);
5111
5112 mutex_unlock(&wq_pool_attach_mutex);
5113 }
5114
5115 /* update NUMA affinity of unbound workqueues */
5116 list_for_each_entry(wq, &workqueues, list)
5117 wq_update_unbound_numa(wq, cpu, true);
5118
5119 mutex_unlock(&wq_pool_mutex);
5120 return 0;
5121 }
5122
5123 int workqueue_offline_cpu(unsigned int cpu)
5124 {
5125 struct workqueue_struct *wq;
5126
5127 /* unbinding per-cpu workers should happen on the local CPU */
5128 if (WARN_ON(cpu != smp_processor_id()))
5129 return -1;
5130
5131 unbind_workers(cpu);
5132
5133 /* update NUMA affinity of unbound workqueues */
5134 mutex_lock(&wq_pool_mutex);
5135 list_for_each_entry(wq, &workqueues, list)
5136 wq_update_unbound_numa(wq, cpu, false);
5137 mutex_unlock(&wq_pool_mutex);
5138
5139 return 0;
5140 }
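/*
 * Illustrative sketch (not part of the original file): the prepare/online/
 * offline callbacks above are invoked by the CPU hotplug core.  A driver
 * with a comparable prepare/online/offline split would typically register
 * its callbacks through cpuhp_setup_state(); the names my_cpu_online,
 * my_cpu_offline and the "mydrv:online" state name below are hypothetical.
 *
 *	static int my_cpu_online(unsigned int cpu)	// (re)bind per-cpu state
 *	{
 *		return 0;
 *	}
 *
 *	static int my_cpu_offline(unsigned int cpu)	// quiesce per-cpu state
 *	{
 *		return 0;
 *	}
 *
 *	ret = cpuhp_setup_state(CPUHP_AP_ONLINE_DYN, "mydrv:online",
 *				my_cpu_online, my_cpu_offline);
 */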
5141
5142 struct work_for_cpu {
5143 struct work_struct work;
5144 long (*fn)(void *);
5145 void *arg;
5146 long ret;
5147 };
5148
5149 static void work_for_cpu_fn(struct work_struct *work)
5150 {
5151 struct work_for_cpu *wfc = container_of(work, struct work_for_cpu, work);
5152
5153 wfc->ret = wfc->fn(wfc->arg);
5154 }
5155
5156 /**
5157 * work_on_cpu - run a function in thread context on a particular cpu
5158 * @cpu: the cpu to run on
5159 * @fn: the function to run
5160 * @arg: the function arg
5161 *
5162 * It is up to the caller to ensure that the cpu doesn't go offline.
5163 * The caller must not hold any locks which would prevent @fn from completing.
5164 *
5165 * Return: The value @fn returns.
5166 */
5167 long work_on_cpu(int cpu, long (*fn)(void *), void *arg)
5168 {
5169 struct work_for_cpu wfc = { .fn = fn, .arg = arg };
5170
5171 INIT_WORK_ONSTACK(&wfc.work, work_for_cpu_fn);
5172 schedule_work_on(cpu, &wfc.work);
5173 flush_work(&wfc.work);
5174 destroy_work_on_stack(&wfc.work);
5175 return wfc.ret;
5176 }
5177 EXPORT_SYMBOL_GPL(work_on_cpu);
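/*
 * Minimal usage sketch (illustration only; read_node_state, struct my_state
 * and the variables are hypothetical).  The caller keeps @cpu online, e.g.
 * via get_online_cpus(), before calling:
 *
 *	static long read_node_state(void *arg)
 *	{
 *		struct my_state *st = arg;
 *
 *		return st->value;	// executes in a kworker on @cpu
 *	}
 *
 *	val = work_on_cpu(cpu, read_node_state, &state);
 */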
5178
5179 /**
5180 * work_on_cpu_safe - run a function in thread context on a particular cpu
5181 * @cpu: the cpu to run on
5182 * @fn: the function to run
5183 * @arg: the function argument
5184 *
5185 * Disables CPU hotplug and calls work_on_cpu(). The caller must not hold
5186 * any locks which would prevent @fn from completing.
5187 *
5188 * Return: The value @fn returns.
5189 */
5190 long work_on_cpu_safe(int cpu, long (*fn)(void *), void *arg)
5191 {
5192 long ret = -ENODEV;
5193
5194 get_online_cpus();
5195 if (cpu_online(cpu))
5196 ret = work_on_cpu(cpu, fn, arg);
5197 put_online_cpus();
5198 return ret;
5199 }
5200 EXPORT_SYMBOL_GPL(work_on_cpu_safe);
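/*
 * When the caller cannot exclude CPU hotplug itself, the _safe variant above
 * is the one to use (sketch, reusing the hypothetical helper from the
 * previous example):
 *
 *	ret = work_on_cpu_safe(cpu, read_node_state, &state);
 *	if (ret == -ENODEV)
 *		// @cpu was already offline when we got here
 */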
5201 #endif /* CONFIG_SMP */
5202
5203 #ifdef CONFIG_FREEZER
5204
5205 /**
5206 * freeze_workqueues_begin - begin freezing workqueues
5207 *
5208 * Start freezing workqueues. After this function returns, all freezable
5209 * workqueues will queue new works to their delayed_works list instead of
5210 * pool->worklist.
5211 *
5212 * CONTEXT:
5213 * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
5214 */
5215 void freeze_workqueues_begin(void)
5216 {
5217 struct workqueue_struct *wq;
5218 struct pool_workqueue *pwq;
5219
5220 mutex_lock(&wq_pool_mutex);
5221
5222 WARN_ON_ONCE(workqueue_freezing);
5223 workqueue_freezing = true;
5224
5225 list_for_each_entry(wq, &workqueues, list) {
5226 mutex_lock(&wq->mutex);
5227 for_each_pwq(pwq, wq)
5228 pwq_adjust_max_active(pwq);
5229 mutex_unlock(&wq->mutex);
5230 }
5231
5232 mutex_unlock(&wq_pool_mutex);
5233 }
5234
5235 /**
5236 * freeze_workqueues_busy - are freezable workqueues still busy?
5237 *
5238 * Check whether freezing is complete. This function must be called
5239 * between freeze_workqueues_begin() and thaw_workqueues().
5240 *
5241 * CONTEXT:
5242 * Grabs and releases wq_pool_mutex.
5243 *
5244 * Return:
5245 * %true if some freezable workqueues are still busy. %false if freezing
5246 * is complete.
5247 */
5248 bool freeze_workqueues_busy(void)
5249 {
5250 bool busy = false;
5251 struct workqueue_struct *wq;
5252 struct pool_workqueue *pwq;
5253
5254 mutex_lock(&wq_pool_mutex);
5255
5256 WARN_ON_ONCE(!workqueue_freezing);
5257
5258 list_for_each_entry(wq, &workqueues, list) {
5259 if (!(wq->flags & WQ_FREEZABLE))
5260 continue;
5261 /*
5262 * nr_active is monotonically decreasing. It's safe
5263 * to peek without lock.
5264 */
5265 rcu_read_lock();
5266 for_each_pwq(pwq, wq) {
5267 WARN_ON_ONCE(pwq->nr_active < 0);
5268 if (pwq->nr_active) {
5269 busy = true;
5270 rcu_read_unlock();
5271 goto out_unlock;
5272 }
5273 }
5274 rcu_read_unlock();
5275 }
5276 out_unlock:
5277 mutex_unlock(&wq_pool_mutex);
5278 return busy;
5279 }
5280
5281 /**
5282 * thaw_workqueues - thaw workqueues
5283 *
5284 * Thaw workqueues. Normal queueing is restored and all collected
5285 * frozen works are transferred to their respective pool worklists.
5286 *
5287 * CONTEXT:
5288 * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
5289 */
5290 void thaw_workqueues(void)
5291 {
5292 struct workqueue_struct *wq;
5293 struct pool_workqueue *pwq;
5294
5295 mutex_lock(&wq_pool_mutex);
5296
5297 if (!workqueue_freezing)
5298 goto out_unlock;
5299
5300 workqueue_freezing = false;
5301
5302 /* restore max_active and repopulate worklist */
5303 list_for_each_entry(wq, &workqueues, list) {
5304 mutex_lock(&wq->mutex);
5305 for_each_pwq(pwq, wq)
5306 pwq_adjust_max_active(pwq);
5307 mutex_unlock(&wq->mutex);
5308 }
5309
5310 out_unlock:
5311 mutex_unlock(&wq_pool_mutex);
5312 }
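/*
 * Hedged sketch of the calling sequence the three functions above implement;
 * the real user is the system freezer in kernel/power/, and the deadline
 * handling here is purely illustrative:
 *
 *	freeze_workqueues_begin();
 *	while (freeze_workqueues_busy()) {
 *		if (time_after(jiffies, deadline))
 *			goto report_and_thaw;
 *		msleep(10);
 *	}
 *	// freezable workqueues are quiescent, do the suspend/snapshot work
 *	thaw_workqueues();
 */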
5313 #endif /* CONFIG_FREEZER */
5314
5315 static int workqueue_apply_unbound_cpumask(void)
5316 {
5317 LIST_HEAD(ctxs);
5318 int ret = 0;
5319 struct workqueue_struct *wq;
5320 struct apply_wqattrs_ctx *ctx, *n;
5321
5322 lockdep_assert_held(&wq_pool_mutex);
5323
5324 list_for_each_entry(wq, &workqueues, list) {
5325 if (!(wq->flags & WQ_UNBOUND))
5326 continue;
5327
5328 /* creating multiple pwqs breaks ordering guarantee */
5329 if (!list_empty(&wq->pwqs)) {
5330 if (wq->flags & __WQ_ORDERED_EXPLICIT)
5331 continue;
5332 wq->flags &= ~__WQ_ORDERED;
5333 }
5334
5335 ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs);
5336 if (!ctx) {
5337 ret = -ENOMEM;
5338 break;
5339 }
5340
5341 list_add_tail(&ctx->list, &ctxs);
5342 }
5343
5344 list_for_each_entry_safe(ctx, n, &ctxs, list) {
5345 if (!ret)
5346 apply_wqattrs_commit(ctx);
5347 apply_wqattrs_cleanup(ctx);
5348 }
5349
5350 return ret;
5351 }
5352
5353 /**
5354 * workqueue_set_unbound_cpumask - Set the low-level unbound cpumask
5355 * @cpumask: the cpumask to set
5356 *
5357 * The low-level workqueues cpumask is a global cpumask that limits
5358 * the affinity of all unbound workqueues. This function checks @cpumask,
5359 * applies it to all unbound workqueues and updates all of their pwqs.
5360 *
5361 * Return: 0 - Success
5362 * -EINVAL - Invalid @cpumask
5363 * -ENOMEM - Failed to allocate memory for attrs or pwqs.
5364 */
5365 int workqueue_set_unbound_cpumask(cpumask_var_t cpumask)
5366 {
5367 int ret = -EINVAL;
5368 cpumask_var_t saved_cpumask;
5369
5370 /*
5371 * Not excluding isolated cpus on purpose.
5372 * If the user wishes to include them, we allow that.
5373 */
5374 cpumask_and(cpumask, cpumask, cpu_possible_mask);
5375 if (!cpumask_empty(cpumask)) {
5376 apply_wqattrs_lock();
5377 if (cpumask_equal(cpumask, wq_unbound_cpumask)) {
5378 ret = 0;
5379 goto out_unlock;
5380 }
5381
5382 if (!zalloc_cpumask_var(&saved_cpumask, GFP_KERNEL)) {
5383 ret = -ENOMEM;
5384 goto out_unlock;
5385 }
5386
5387 /* save the old wq_unbound_cpumask. */
5388 cpumask_copy(saved_cpumask, wq_unbound_cpumask);
5389
5390 /* update wq_unbound_cpumask at first and apply it to wqs. */
5391 cpumask_copy(wq_unbound_cpumask, cpumask);
5392 ret = workqueue_apply_unbound_cpumask();
5393
5394 /* restore the old wq_unbound_cpumask if applying failed. */
5395 if (ret < 0)
5396 cpumask_copy(wq_unbound_cpumask, saved_cpumask);
5397
5398 free_cpumask_var(saved_cpumask);
5399 out_unlock:
5400 apply_wqattrs_unlock();
5401 }
5402
5403 return ret;
5404 }
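/*
 * Illustrative sketch only: confining all unbound workqueues to the CPUs of
 * node 0.  In practice this mask is normally written from userspace through
 * the global "cpumask" sysfs attribute defined further below.
 *
 *	cpumask_var_t mask;
 *
 *	if (!zalloc_cpumask_var(&mask, GFP_KERNEL))
 *		return -ENOMEM;
 *	cpumask_copy(mask, cpumask_of_node(0));
 *	ret = workqueue_set_unbound_cpumask(mask);
 *	free_cpumask_var(mask);
 */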
5405
5406 #ifdef CONFIG_SYSFS
5407 /*
5408 * Workqueues with the WQ_SYSFS flag set are visible to userland via
5409 * /sys/bus/workqueue/devices/WQ_NAME. All visible workqueues have the
5410 * following attributes.
5411 *
5412 * per_cpu RO bool : whether the workqueue is per-cpu or unbound
5413 * max_active RW int : maximum number of in-flight work items
5414 *
5415 * Unbound workqueues have the following extra attributes.
5416 *
5417 * pool_ids RO int : the associated pool IDs for each node
5418 * nice RW int : nice value of the workers
5419 * cpumask RW mask : bitmask of allowed CPUs for the workers
5420 * numa RW bool : whether to enable NUMA affinity
5421 */
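/*
 * For reference, a sketch of how a workqueue ends up with the attributes
 * described above: pass WQ_SYSFS at creation time and it is registered
 * automatically (the name "my_unbound_wq" is hypothetical).
 *
 *	wq = alloc_workqueue("my_unbound_wq", WQ_UNBOUND | WQ_SYSFS, 0);
 */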
5422 struct wq_device {
5423 struct workqueue_struct *wq;
5424 struct device dev;
5425 };
5426
5427 static struct workqueue_struct *dev_to_wq(struct device *dev)
5428 {
5429 struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
5430
5431 return wq_dev->wq;
5432 }
5433
5434 static ssize_t per_cpu_show(struct device *dev, struct device_attribute *attr,
5435 char *buf)
5436 {
5437 struct workqueue_struct *wq = dev_to_wq(dev);
5438
5439 return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND));
5440 }
5441 static DEVICE_ATTR_RO(per_cpu);
5442
5443 static ssize_t max_active_show(struct device *dev,
5444 struct device_attribute *attr, char *buf)
5445 {
5446 struct workqueue_struct *wq = dev_to_wq(dev);
5447
5448 return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active);
5449 }
5450
5451 static ssize_t max_active_store(struct device *dev,
5452 struct device_attribute *attr, const char *buf,
5453 size_t count)
5454 {
5455 struct workqueue_struct *wq = dev_to_wq(dev);
5456 int val;
5457
5458 if (sscanf(buf, "%d", &val) != 1 || val <= 0)
5459 return -EINVAL;
5460
5461 workqueue_set_max_active(wq, val);
5462 return count;
5463 }
5464 static DEVICE_ATTR_RW(max_active);
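/*
 * The store above boils down to the exported helper, i.e. writing "16" to
 * the max_active attribute is equivalent to the in-kernel call
 *
 *	workqueue_set_max_active(wq, 16);
 */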
5465
5466 static struct attribute *wq_sysfs_attrs[] = {
5467 &dev_attr_per_cpu.attr,
5468 &dev_attr_max_active.attr,
5469 NULL,
5470 };
5471 ATTRIBUTE_GROUPS(wq_sysfs);
5472
5473 static ssize_t wq_pool_ids_show(struct device *dev,
5474 struct device_attribute *attr, char *buf)
5475 {
5476 struct workqueue_struct *wq = dev_to_wq(dev);
5477 const char *delim = "";
5478 int node, written = 0;
5479
5480 get_online_cpus();
5481 rcu_read_lock();
5482 for_each_node(node) {
5483 written += scnprintf(buf + written, PAGE_SIZE - written,
5484 "%s%d:%d", delim, node,
5485 unbound_pwq_by_node(wq, node)->pool->id);
5486 delim = " ";
5487 }
5488 written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
5489 rcu_read_unlock();
5490 put_online_cpus();
5491
5492 return written;
5493 }
5494
5495 static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
5496 char *buf)
5497 {
5498 struct workqueue_struct *wq = dev_to_wq(dev);
5499 int written;
5500
5501 mutex_lock(&wq->mutex);
5502 written = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice);
5503 mutex_unlock(&wq->mutex);
5504
5505 return written;
5506 }
5507
5508 /* prepare workqueue_attrs for sysfs store operations */
5509 static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
5510 {
5511 struct workqueue_attrs *attrs;
5512
5513 lockdep_assert_held(&wq_pool_mutex);
5514
5515 attrs = alloc_workqueue_attrs();
5516 if (!attrs)
5517 return NULL;
5518
5519 copy_workqueue_attrs(attrs, wq->unbound_attrs);
5520 return attrs;
5521 }
5522
5523 static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
5524 const char *buf, size_t count)
5525 {
5526 struct workqueue_struct *wq = dev_to_wq(dev);
5527 struct workqueue_attrs *attrs;
5528 int ret = -ENOMEM;
5529
5530 apply_wqattrs_lock();
5531
5532 attrs = wq_sysfs_prep_attrs(wq);
5533 if (!attrs)
5534 goto out_unlock;
5535
5536 if (sscanf(buf, "%d", &attrs->nice) == 1 &&
5537 attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE)
5538 ret = apply_workqueue_attrs_locked(wq, attrs);
5539 else
5540 ret = -EINVAL;
5541
5542 out_unlock:
5543 apply_wqattrs_unlock();
5544 free_workqueue_attrs(attrs);
5545 return ret ?: count;
5546 }
5547
5548 static ssize_t wq_cpumask_show(struct device *dev,
5549 struct device_attribute *attr, char *buf)
5550 {
5551 struct workqueue_struct *wq = dev_to_wq(dev);
5552 int written;
5553
5554 mutex_lock(&wq->mutex);
5555 written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
5556 cpumask_pr_args(wq->unbound_attrs->cpumask));
5557 mutex_unlock(&wq->mutex);
5558 return written;
5559 }
5560
5561 static ssize_t wq_cpumask_store(struct device *dev,
5562 struct device_attribute *attr,
5563 const char *buf, size_t count)
5564 {
5565 struct workqueue_struct *wq = dev_to_wq(dev);
5566 struct workqueue_attrs *attrs;
5567 int ret = -ENOMEM;
5568
5569 apply_wqattrs_lock();
5570
5571 attrs = wq_sysfs_prep_attrs(wq);
5572 if (!attrs)
5573 goto out_unlock;
5574
5575 ret = cpumask_parse(buf, attrs->cpumask);
5576 if (!ret)
5577 ret = apply_workqueue_attrs_locked(wq, attrs);
5578
5579 out_unlock:
5580 apply_wqattrs_unlock();
5581 free_workqueue_attrs(attrs);
5582 return ret ?: count;
5583 }
5584
5585 static ssize_t wq_numa_show(struct device *dev, struct device_attribute *attr,
5586 char *buf)
5587 {
5588 struct workqueue_struct *wq = dev_to_wq(dev);
5589 int written;
5590
5591 mutex_lock(&wq->mutex);
5592 written = scnprintf(buf, PAGE_SIZE, "%d\n",
5593 !wq->unbound_attrs->no_numa);
5594 mutex_unlock(&wq->mutex);
5595
5596 return written;
5597 }
5598
5599 static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
5600 const char *buf, size_t count)
5601 {
5602 struct workqueue_struct *wq = dev_to_wq(dev);
5603 struct workqueue_attrs *attrs;
5604 int v, ret = -ENOMEM;
5605
5606 apply_wqattrs_lock();
5607
5608 attrs = wq_sysfs_prep_attrs(wq);
5609 if (!attrs)
5610 goto out_unlock;
5611
5612 ret = -EINVAL;
5613 if (sscanf(buf, "%d", &v) == 1) {
5614 attrs->no_numa = !v;
5615 ret = apply_workqueue_attrs_locked(wq, attrs);
5616 }
5617
5618 out_unlock:
5619 apply_wqattrs_unlock();
5620 free_workqueue_attrs(attrs);
5621 return ret ?: count;
5622 }
5623
5624 static struct device_attribute wq_sysfs_unbound_attrs[] = {
5625 __ATTR(pool_ids, 0444, wq_pool_ids_show, NULL),
5626 __ATTR(nice, 0644, wq_nice_show, wq_nice_store),
5627 __ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store),
5628 __ATTR(numa, 0644, wq_numa_show, wq_numa_store),
5629 __ATTR_NULL,
5630 };
5631
5632 static struct bus_type wq_subsys = {
5633 .name = "workqueue",
5634 .dev_groups = wq_sysfs_groups,
5635 };
5636
5637 static ssize_t wq_unbound_cpumask_show(struct device *dev,
5638 struct device_attribute *attr, char *buf)
5639 {
5640 int written;
5641
5642 mutex_lock(&wq_pool_mutex);
5643 written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
5644 cpumask_pr_args(wq_unbound_cpumask));
5645 mutex_unlock(&wq_pool_mutex);
5646
5647 return written;
5648 }
5649
5650 static ssize_t wq_unbound_cpumask_store(struct device *dev,
5651 struct device_attribute *attr, const char *buf, size_t count)
5652 {
5653 cpumask_var_t cpumask;
5654 int ret;
5655
5656 if (!zalloc_cpumask_var(&cpumask, GFP_KERNEL))
5657 return -ENOMEM;
5658
5659 ret = cpumask_parse(buf, cpumask);
5660 if (!ret)
5661 ret = workqueue_set_unbound_cpumask(cpumask);
5662
5663 free_cpumask_var(cpumask);
5664 return ret ? ret : count;
5665 }
5666
5667 static struct device_attribute wq_sysfs_cpumask_attr =
5668 __ATTR(cpumask, 0644, wq_unbound_cpumask_show,
5669 wq_unbound_cpumask_store);
5670
5671 static int __init wq_sysfs_init(void)
5672 {
5673 int err;
5674
5675 err = subsys_virtual_register(&wq_subsys, NULL);
5676 if (err)
5677 return err;
5678
5679 return device_create_file(wq_subsys.dev_root, &wq_sysfs_cpumask_attr);
5680 }
5681 core_initcall(wq_sysfs_init);
5682
5683 static void wq_device_release(struct device *dev)
5684 {
5685 struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
5686
5687 kfree(wq_dev);
5688 }
5689
5690 /**
5691 * workqueue_sysfs_register - make a workqueue visible in sysfs
5692 * @wq: the workqueue to register
5693 *
5694 * Expose @wq in sysfs under /sys/bus/workqueue/devices.
5695 * alloc_workqueue*() automatically calls this function if WQ_SYSFS is set
5696 * which is the preferred method.
5697 *
5698 * A workqueue user should use this function directly iff it wants to apply
5699 * workqueue_attrs before making the workqueue visible in sysfs; otherwise,
5700 * apply_workqueue_attrs() may race against userland updating the
5701 * attributes.
5702 *
5703 * Return: 0 on success, -errno on failure.
5704 */
5705 int workqueue_sysfs_register(struct workqueue_struct *wq)
5706 {
5707 struct wq_device *wq_dev;
5708 int ret;
5709
5710 /*
5711 * Adjusting max_active or creating new pwqs by applying
5712 * attributes breaks ordering guarantee. Disallow exposing ordered
5713 * workqueues.
5714 */
5715 if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
5716 return -EINVAL;
5717
5718 wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
5719 if (!wq_dev)
5720 return -ENOMEM;
5721
5722 wq_dev->wq = wq;
5723 wq_dev->dev.bus = &wq_subsys;
5724 wq_dev->dev.release = wq_device_release;
5725 dev_set_name(&wq_dev->dev, "%s", wq->name);
5726
5727 /*
5728 * unbound_attrs are created separately. Suppress uevent until
5729 * everything is ready.
5730 */
5731 dev_set_uevent_suppress(&wq_dev->dev, true);
5732
5733 ret = device_register(&wq_dev->dev);
5734 if (ret) {
5735 put_device(&wq_dev->dev);
5736 wq->wq_dev = NULL;
5737 return ret;
5738 }
5739
5740 if (wq->flags & WQ_UNBOUND) {
5741 struct device_attribute *attr;
5742
5743 for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) {
5744 ret = device_create_file(&wq_dev->dev, attr);
5745 if (ret) {
5746 device_unregister(&wq_dev->dev);
5747 wq->wq_dev = NULL;
5748 return ret;
5749 }
5750 }
5751 }
5752
5753 dev_set_uevent_suppress(&wq_dev->dev, false);
5754 kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD);
5755 return 0;
5756 }
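/*
 * Hedged sketch of the direct-registration pattern described in the comment
 * above (error handling trimmed, names hypothetical): apply the desired
 * attributes first, then expose the workqueue.
 *
 *	wq = alloc_workqueue("my_wq", WQ_UNBOUND, 0);	// note: no WQ_SYSFS
 *	attrs = alloc_workqueue_attrs();
 *	attrs->nice = -5;
 *	apply_workqueue_attrs(wq, attrs);
 *	free_workqueue_attrs(attrs);
 *	workqueue_sysfs_register(wq);	// now visible to userland
 */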
5757
5758 /**
5759 * workqueue_sysfs_unregister - undo workqueue_sysfs_register()
5760 * @wq: the workqueue to unregister
5761 *
5762 * If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
5763 */
5764 static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
5765 {
5766 struct wq_device *wq_dev = wq->wq_dev;
5767
5768 if (!wq->wq_dev)
5769 return;
5770
5771 wq->wq_dev = NULL;
5772 device_unregister(&wq_dev->dev);
5773 }
5774 #else /* CONFIG_SYSFS */
5775 static void workqueue_sysfs_unregister(struct workqueue_struct *wq) { }
5776 #endif /* CONFIG_SYSFS */
5777
5778 /*
5779 * Workqueue watchdog.
5780 *
5781 * A stall may be caused by various bugs - a missing WQ_MEM_RECLAIM, an illegal
5782 * flush dependency, or a concurrency-managed work item which stays RUNNING
5783 * indefinitely. Workqueue stalls can be very difficult to debug as the
5784 * usual warning mechanisms don't trigger and internal workqueue state is
5785 * largely opaque.
5786 *
5787 * Workqueue watchdog monitors all worker pools periodically and dumps
5788 * state if some pools fail to make forward progress for a while, where
5789 * forward progress is defined as the first item on ->worklist changing.
5790 *
5791 * This mechanism is controlled through the kernel parameter
5792 * "workqueue.watchdog_thresh" which can be updated at runtime through the
5793 * corresponding sysfs parameter file.
5794 */
5795 #ifdef CONFIG_WQ_WATCHDOG
5796
5797 static unsigned long wq_watchdog_thresh = 30;
5798 static struct timer_list wq_watchdog_timer;
5799
5800 static unsigned long wq_watchdog_touched = INITIAL_JIFFIES;
5801 static DEFINE_PER_CPU(unsigned long, wq_watchdog_touched_cpu) = INITIAL_JIFFIES;
5802
5803 static void wq_watchdog_reset_touched(void)
5804 {
5805 int cpu;
5806
5807 wq_watchdog_touched = jiffies;
5808 for_each_possible_cpu(cpu)
5809 per_cpu(wq_watchdog_touched_cpu, cpu) = jiffies;
5810 }
5811
5812 static void wq_watchdog_timer_fn(struct timer_list *unused)
5813 {
5814 unsigned long thresh = READ_ONCE(wq_watchdog_thresh) * HZ;
5815 bool lockup_detected = false;
5816 unsigned long now = jiffies;
5817 struct worker_pool *pool;
5818 int pi;
5819
5820 if (!thresh)
5821 return;
5822
5823 rcu_read_lock();
5824
5825 for_each_pool(pool, pi) {
5826 unsigned long pool_ts, touched, ts;
5827
5828 if (list_empty(&pool->worklist))
5829 continue;
5830
5831 /*
5832 * If a virtual machine is stopped by the host it can look to
5833 * the watchdog like a stall.
5834 */
5835 kvm_check_and_clear_guest_paused();
5836
5837 /* get the latest of pool and touched timestamps */
5838 pool_ts = READ_ONCE(pool->watchdog_ts);
5839 touched = READ_ONCE(wq_watchdog_touched);
5840
5841 if (time_after(pool_ts, touched))
5842 ts = pool_ts;
5843 else
5844 ts = touched;
5845
5846 if (pool->cpu >= 0) {
5847 unsigned long cpu_touched =
5848 READ_ONCE(per_cpu(wq_watchdog_touched_cpu,
5849 pool->cpu));
5850 if (time_after(cpu_touched, ts))
5851 ts = cpu_touched;
5852 }
5853
5854 /* did we stall? */
5855 if (time_after(now, ts + thresh)) {
5856 lockup_detected = true;
5857 pr_emerg("BUG: workqueue lockup - pool");
5858 pr_cont_pool_info(pool);
5859 pr_cont(" stuck for %us!\n",
5860 jiffies_to_msecs(now - pool_ts) / 1000);
5861 trace_android_vh_wq_lockup_pool(pool->cpu, pool_ts);
5862 }
5863 }
5864
5865 rcu_read_unlock();
5866
5867 if (lockup_detected)
5868 show_workqueue_state();
5869
5870 wq_watchdog_reset_touched();
5871 mod_timer(&wq_watchdog_timer, jiffies + thresh);
5872 }
5873
5874 notrace void wq_watchdog_touch(int cpu)
5875 {
5876 if (cpu >= 0)
5877 per_cpu(wq_watchdog_touched_cpu, cpu) = jiffies;
5878 else
5879 wq_watchdog_touched = jiffies;
5880 }
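/*
 * For context, the typical caller looks roughly like the softlockup path in
 * kernel/watchdog.c (paraphrased sketch, not code from this file):
 *
 *	notrace void touch_softlockup_watchdog(void)
 *	{
 *		touch_softlockup_watchdog_sched();
 *		wq_watchdog_touch(raw_smp_processor_id());
 *	}
 *
 * Passing -1 instead of a CPU updates the global timestamp for all pools.
 */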
5881
5882 static void wq_watchdog_set_thresh(unsigned long thresh)
5883 {
5884 wq_watchdog_thresh = 0;
5885 del_timer_sync(&wq_watchdog_timer);
5886
5887 if (thresh) {
5888 wq_watchdog_thresh = thresh;
5889 wq_watchdog_reset_touched();
5890 mod_timer(&wq_watchdog_timer, jiffies + thresh * HZ);
5891 }
5892 }
5893
5894 static int wq_watchdog_param_set_thresh(const char *val,
5895 const struct kernel_param *kp)
5896 {
5897 unsigned long thresh;
5898 int ret;
5899
5900 ret = kstrtoul(val, 0, &thresh);
5901 if (ret)
5902 return ret;
5903
5904 if (system_wq)
5905 wq_watchdog_set_thresh(thresh);
5906 else
5907 wq_watchdog_thresh = thresh;
5908
5909 return 0;
5910 }
5911
5912 static const struct kernel_param_ops wq_watchdog_thresh_ops = {
5913 .set = wq_watchdog_param_set_thresh,
5914 .get = param_get_ulong,
5915 };
5916
5917 module_param_cb(watchdog_thresh, &wq_watchdog_thresh_ops, &wq_watchdog_thresh,
5918 0644);
5919
5920 static void wq_watchdog_init(void)
5921 {
5922 timer_setup(&wq_watchdog_timer, wq_watchdog_timer_fn, TIMER_DEFERRABLE);
5923 wq_watchdog_set_thresh(wq_watchdog_thresh);
5924 }
5925
5926 #else /* CONFIG_WQ_WATCHDOG */
5927
5928 static inline void wq_watchdog_init(void) { }
5929
5930 #endif /* CONFIG_WQ_WATCHDOG */
5931
5932 static void __init wq_numa_init(void)
5933 {
5934 cpumask_var_t *tbl;
5935 int node, cpu;
5936
5937 if (num_possible_nodes() <= 1)
5938 return;
5939
5940 if (wq_disable_numa) {
5941 pr_info("workqueue: NUMA affinity support disabled\n");
5942 return;
5943 }
5944
5945 for_each_possible_cpu(cpu) {
5946 if (WARN_ON(cpu_to_node(cpu) == NUMA_NO_NODE)) {
5947 pr_warn("workqueue: NUMA node mapping not available for cpu%d, disabling NUMA support\n", cpu);
5948 return;
5949 }
5950 }
5951
5952 wq_update_unbound_numa_attrs_buf = alloc_workqueue_attrs();
5953 BUG_ON(!wq_update_unbound_numa_attrs_buf);
5954
5955 /*
5956 * We want masks of possible CPUs of each node, which aren't readily
5957 * available. Build them from cpu_to_node(), which should have been
5958 * fully initialized by now.
5959 */
5960 tbl = kcalloc(nr_node_ids, sizeof(tbl[0]), GFP_KERNEL);
5961 BUG_ON(!tbl);
5962
5963 for_each_node(node)
5964 BUG_ON(!zalloc_cpumask_var_node(&tbl[node], GFP_KERNEL,
5965 node_online(node) ? node : NUMA_NO_NODE));
5966
5967 for_each_possible_cpu(cpu) {
5968 node = cpu_to_node(cpu);
5969 cpumask_set_cpu(cpu, tbl[node]);
5970 }
5971
5972 wq_numa_possible_cpumask = tbl;
5973 wq_numa_enabled = true;
5974 }
5975
5976 /**
5977 * workqueue_init_early - early init for workqueue subsystem
5978 *
5979 * This is the first half of two-staged workqueue subsystem initialization
5980 * and invoked as soon as the bare basics - memory allocation, cpumasks and
5981 * idr are up. It sets up all the data structures and system workqueues
5982 * and allows early boot code to create workqueues and queue/cancel work
5983 * items. Actual work item execution starts only after kthreads can be
5984 * created and scheduled right before early initcalls.
5985 */
5986 void __init workqueue_init_early(void)
5987 {
5988 int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
5989 int hk_flags = HK_FLAG_DOMAIN | HK_FLAG_WQ;
5990 int i, cpu;
5991
5992 BUILD_BUG_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
5993
5994 BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
5995 cpumask_copy(wq_unbound_cpumask, housekeeping_cpumask(hk_flags));
5996
5997 pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
5998
5999 /* initialize CPU pools */
6000 for_each_possible_cpu(cpu) {
6001 struct worker_pool *pool;
6002
6003 i = 0;
6004 for_each_cpu_worker_pool(pool, cpu) {
6005 BUG_ON(init_worker_pool(pool));
6006 pool->cpu = cpu;
6007 cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
6008 pool->attrs->nice = std_nice[i++];
6009 pool->node = cpu_to_node(cpu);
6010
6011 /* alloc pool ID */
6012 mutex_lock(&wq_pool_mutex);
6013 BUG_ON(worker_pool_assign_id(pool));
6014 mutex_unlock(&wq_pool_mutex);
6015 }
6016 }
6017
6018 /* create default unbound and ordered wq attrs */
6019 for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
6020 struct workqueue_attrs *attrs;
6021
6022 BUG_ON(!(attrs = alloc_workqueue_attrs()));
6023 attrs->nice = std_nice[i];
6024 unbound_std_wq_attrs[i] = attrs;
6025
6026 /*
6027 * An ordered wq should have only one pwq as ordering is
6028 * guaranteed by max_active which is enforced by pwqs.
6029 * Turn off NUMA so that dfl_pwq is used for all nodes.
6030 */
6031 BUG_ON(!(attrs = alloc_workqueue_attrs()));
6032 attrs->nice = std_nice[i];
6033 attrs->no_numa = true;
6034 ordered_wq_attrs[i] = attrs;
6035 }
6036
6037 system_wq = alloc_workqueue("events", 0, 0);
6038 system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
6039 system_long_wq = alloc_workqueue("events_long", 0, 0);
6040 system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
6041 WQ_UNBOUND_MAX_ACTIVE);
6042 system_freezable_wq = alloc_workqueue("events_freezable",
6043 WQ_FREEZABLE, 0);
6044 system_power_efficient_wq = alloc_workqueue("events_power_efficient",
6045 WQ_POWER_EFFICIENT, 0);
6046 system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
6047 WQ_FREEZABLE | WQ_POWER_EFFICIENT,
6048 0);
6049 BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
6050 !system_unbound_wq || !system_freezable_wq ||
6051 !system_power_efficient_wq ||
6052 !system_freezable_power_efficient_wq);
6053 }
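/*
 * Sketch of what "allows early boot code to queue work" above means in
 * practice (hypothetical names):
 *
 *	static void my_early_fn(struct work_struct *work) { ... }
 *	static DECLARE_WORK(my_early_work, my_early_fn);
 *
 *	schedule_work(&my_early_work);	// legal right after this point
 *
 * The item only starts executing once workqueue_init() below has created
 * the initial kworkers.
 */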
6054
6055 /**
6056 * workqueue_init - bring workqueue subsystem fully online
6057 *
6058 * This is the latter half of two-staged workqueue subsystem initialization
6059 * and invoked as soon as kthreads can be created and scheduled.
6060 * Workqueues have been created and work items queued on them, but there
6061 * are no kworkers executing the work items yet. Populate the worker pools
6062 * with the initial workers and enable future kworker creations.
6063 */
6064 void __init workqueue_init(void)
6065 {
6066 struct workqueue_struct *wq;
6067 struct worker_pool *pool;
6068 int cpu, bkt;
6069
6070 /*
6071 * It'd be simpler to initialize NUMA in workqueue_init_early() but
6072 * CPU to node mapping may not be available that early on some
6073 * archs such as power and arm64. As the per-cpu pools created
6074 * earlier could be missing their node hint, and unbound pools their NUMA
6075 * affinity, fix them up here.
6076 *
6077 * Also, while iterating workqueues, create rescuers if requested.
6078 */
6079 wq_numa_init();
6080
6081 mutex_lock(&wq_pool_mutex);
6082
6083 for_each_possible_cpu(cpu) {
6084 for_each_cpu_worker_pool(pool, cpu) {
6085 pool->node = cpu_to_node(cpu);
6086 }
6087 }
6088
6089 list_for_each_entry(wq, &workqueues, list) {
6090 wq_update_unbound_numa(wq, smp_processor_id(), true);
6091 WARN(init_rescuer(wq),
6092 "workqueue: failed to create early rescuer for %s",
6093 wq->name);
6094 }
6095
6096 mutex_unlock(&wq_pool_mutex);
6097
6098 /* create the initial workers */
6099 for_each_online_cpu(cpu) {
6100 for_each_cpu_worker_pool(pool, cpu) {
6101 pool->flags &= ~POOL_DISASSOCIATED;
6102 BUG_ON(!create_worker(pool));
6103 }
6104 }
6105
6106 hash_for_each(unbound_pool_hash, bkt, pool, hash_node)
6107 BUG_ON(!create_worker(pool));
6108
6109 wq_online = true;
6110 wq_watchdog_init();
6111 }
6112