0001 /*
0002  * kernel/workqueue.c - generic async execution with shared worker pool
0003  *
0004  * Copyright (C) 2002       Ingo Molnar
0005  *
0006  *   Derived from the taskqueue/keventd code by:
0007  *     David Woodhouse <dwmw2@infradead.org>
0008  *     Andrew Morton
0009  *     Kai Petzke <wpp@marie.physik.tu-berlin.de>
0010  *     Theodore Ts'o <tytso@mit.edu>
0011  *
0012  * Made to use alloc_percpu by Christoph Lameter.
0013  *
0014  * Copyright (C) 2010       SUSE Linux Products GmbH
0015  * Copyright (C) 2010       Tejun Heo <tj@kernel.org>
0016  *
0017  * This is the generic async execution mechanism.  Work items are
0018  * executed in process context.  The worker pool is shared and
0019  * automatically managed.  There are two worker pools for each CPU (one for
0020  * normal work items and the other for high priority ones) and some extra
0021  * pools for workqueues which are not bound to any specific CPU - the
0022  * number of these backing pools is dynamic.
0023  *
0024  * Please read Documentation/workqueue.txt for details.
0025  */
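/*
 * A minimal usage sketch of the API this file implements, for orientation;
 * the names my_data and my_work_fn are hypothetical and not part of the
 * kernel:
 *
 *	struct my_data {
 *		struct work_struct work;
 *		int value;
 *	};
 *
 *	static void my_work_fn(struct work_struct *work)
 *	{
 *		struct my_data *d = container_of(work, struct my_data, work);
 *
 *		pr_info("handled %d in process context\n", d->value);
 *	}
 *
 *	INIT_WORK(&d->work, my_work_fn);
 *	queue_work(system_wq, &d->work);	(or simply schedule_work())
 */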
0026 
0027 #include <linux/export.h>
0028 #include <linux/kernel.h>
0029 #include <linux/sched.h>
0030 #include <linux/init.h>
0031 #include <linux/signal.h>
0032 #include <linux/completion.h>
0033 #include <linux/workqueue.h>
0034 #include <linux/slab.h>
0035 #include <linux/cpu.h>
0036 #include <linux/notifier.h>
0037 #include <linux/kthread.h>
0038 #include <linux/hardirq.h>
0039 #include <linux/mempolicy.h>
0040 #include <linux/freezer.h>
0041 #include <linux/kallsyms.h>
0042 #include <linux/debug_locks.h>
0043 #include <linux/lockdep.h>
0044 #include <linux/idr.h>
0045 #include <linux/jhash.h>
0046 #include <linux/hashtable.h>
0047 #include <linux/rculist.h>
0048 #include <linux/nodemask.h>
0049 #include <linux/moduleparam.h>
0050 #include <linux/uaccess.h>
0051 
0052 #include "workqueue_internal.h"
0053 
0054 enum {
0055     /*
0056      * worker_pool flags
0057      *
0058      * A bound pool is either associated with or disassociated from its CPU.
0059      * While associated (!DISASSOCIATED), all workers are bound to the
0060      * CPU and none has %WORKER_UNBOUND set and concurrency management
0061      * is in effect.
0062      *
0063      * While DISASSOCIATED, the cpu may be offline and all workers have
0064      * %WORKER_UNBOUND set and concurrency management disabled, and may
0065      * be executing on any CPU.  The pool behaves as an unbound one.
0066      *
0067      * Note that DISASSOCIATED should be flipped only while holding
0068      * attach_mutex to avoid changing binding state while
0069      * worker_attach_to_pool() is in progress.
0070      */
0071     POOL_DISASSOCIATED  = 1 << 2,   /* cpu can't serve workers */
0072 
0073     /* worker flags */
0074     WORKER_DIE      = 1 << 1,   /* die die die */
0075     WORKER_IDLE     = 1 << 2,   /* is idle */
0076     WORKER_PREP     = 1 << 3,   /* preparing to run works */
0077     WORKER_CPU_INTENSIVE    = 1 << 6,   /* cpu intensive */
0078     WORKER_UNBOUND      = 1 << 7,   /* worker is unbound */
0079     WORKER_REBOUND      = 1 << 8,   /* worker was rebound */
0080 
0081     WORKER_NOT_RUNNING  = WORKER_PREP | WORKER_CPU_INTENSIVE |
0082                   WORKER_UNBOUND | WORKER_REBOUND,
0083 
0084     NR_STD_WORKER_POOLS = 2,        /* # standard pools per cpu */
0085 
0086     UNBOUND_POOL_HASH_ORDER = 6,        /* hashed by pool->attrs */
0087     BUSY_WORKER_HASH_ORDER  = 6,        /* 64 pointers */
0088 
0089     MAX_IDLE_WORKERS_RATIO  = 4,        /* 1/4 of busy can be idle */
0090     IDLE_WORKER_TIMEOUT = 300 * HZ, /* keep idle ones for 5 mins */
0091 
0092     MAYDAY_INITIAL_TIMEOUT  = HZ / 100 >= 2 ? HZ / 100 : 2,
0093                         /* call for help after 10ms
0094                            (min two ticks) */
0095     MAYDAY_INTERVAL     = HZ / 10,  /* and then every 100ms */
0096     CREATE_COOLDOWN     = HZ,       /* time to breathe after fail */
0097 
0098     /*
0099      * Rescue workers are used only on emergencies and shared by
0100      * all cpus.  Give MIN_NICE.
0101      */
0102     RESCUER_NICE_LEVEL  = MIN_NICE,
0103     HIGHPRI_NICE_LEVEL  = MIN_NICE,
0104 
0105     WQ_NAME_LEN     = 24,
0106 };
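/*
 * A quick reading of the timeout arithmetic above, assuming the usual HZ
 * values: with HZ=1000, MAYDAY_INITIAL_TIMEOUT evaluates to HZ/100 = 10
 * ticks (the advertised 10ms); with HZ=100, HZ/100 is a single tick, so
 * the ?: clamps it to the two-tick minimum.  MAYDAY_INTERVAL is HZ/10,
 * i.e. 100ms at any HZ, and IDLE_WORKER_TIMEOUT is 300 * HZ = 5 minutes.
 */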
0107 
0108 /*
0109  * Structure fields follow one of the following exclusion rules.
0110  *
0111  * I: Modifiable by initialization/destruction paths and read-only for
0112  *    everyone else.
0113  *
0114  * P: Preemption protected.  Disabling preemption is enough and should
0115  *    only be modified and accessed from the local cpu.
0116  *
0117  * L: pool->lock protected.  Access with pool->lock held.
0118  *
0119  * X: During normal operation, modification requires pool->lock and should
0120  *    be done only from local cpu.  Either disabling preemption on local
0121  *    cpu or grabbing pool->lock is enough for read access.  If
0122  *    POOL_DISASSOCIATED is set, it's identical to L.
0123  *
0124  * A: pool->attach_mutex protected.
0125  *
0126  * PL: wq_pool_mutex protected.
0127  *
0128  * PR: wq_pool_mutex protected for writes.  Sched-RCU protected for reads.
0129  *
0130  * PW: wq_pool_mutex and wq->mutex protected for writes.  Either for reads.
0131  *
0132  * PWR: wq_pool_mutex and wq->mutex protected for writes.  Either or
0133  *      sched-RCU for reads.
0134  *
0135  * WQ: wq->mutex protected.
0136  *
0137  * WR: wq->mutex protected for writes.  Sched-RCU protected for reads.
0138  *
0139  * MD: wq_mayday_lock protected.
0140  */
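/*
 * A concrete reading of the annotations above, using fields defined later
 * in this file: an "L:" field such as pool->worklist is only touched under
 * the pool lock, roughly
 *
 *	spin_lock_irq(&pool->lock);
 *	list_add_tail(&work->entry, &pool->worklist);
 *	spin_unlock_irq(&pool->lock);
 *
 * while a "PR:" object such as worker_pool_idr may be traversed under
 * rcu_read_lock_sched() alone but requires wq_pool_mutex for modification.
 */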
0141 
0142 /* struct worker is defined in workqueue_internal.h */
0143 
0144 struct worker_pool {
0145     spinlock_t      lock;       /* the pool lock */
0146     int         cpu;        /* I: the associated cpu */
0147     int         node;       /* I: the associated node ID */
0148     int         id;     /* I: pool ID */
0149     unsigned int        flags;      /* X: flags */
0150 
0151     unsigned long       watchdog_ts;    /* L: watchdog timestamp */
0152 
0153     struct list_head    worklist;   /* L: list of pending works */
0154     int         nr_workers; /* L: total number of workers */
0155 
0156     /* nr_idle includes the ones off idle_list for rebinding */
0157     int         nr_idle;    /* L: currently idle ones */
0158 
0159     struct list_head    idle_list;  /* X: list of idle workers */
0160     struct timer_list   idle_timer; /* L: worker idle timeout */
0161     struct timer_list   mayday_timer;   /* L: SOS timer for workers */
0162 
0163     /* a worker is either on busy_hash or idle_list, or the manager */
0164     DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
0165                         /* L: hash of busy workers */
0166 
0167     /* see manage_workers() for details on the two manager mutexes */
0168     struct mutex        manager_arb;    /* manager arbitration */
0169     struct worker       *manager;   /* L: purely informational */
0170     struct mutex        attach_mutex;   /* attach/detach exclusion */
0171     struct list_head    workers;    /* A: attached workers */
0172     struct completion   *detach_completion; /* all workers detached */
0173 
0174     struct ida      worker_ida; /* worker IDs for task name */
0175 
0176     struct workqueue_attrs  *attrs;     /* I: worker attributes */
0177     struct hlist_node   hash_node;  /* PL: unbound_pool_hash node */
0178     int         refcnt;     /* PL: refcnt for unbound pools */
0179 
0180     /*
0181      * The current concurrency level.  As it's likely to be accessed
0182      * from other CPUs during try_to_wake_up(), put it in a separate
0183      * cacheline.
0184      */
0185     atomic_t        nr_running ____cacheline_aligned_in_smp;
0186 
0187     /*
0188      * Destruction of pool is sched-RCU protected to allow dereferences
0189      * from get_work_pool().
0190      */
0191     struct rcu_head     rcu;
0192 } ____cacheline_aligned_in_smp;
0193 
0194 /*
0195  * The per-pool workqueue.  While queued, the lower WORK_STRUCT_FLAG_BITS
0196  * of work_struct->data are used for flags and the remaining high bits
0197  * point to the pwq; thus, pwqs need to be aligned at two's power of the
0198  * number of flag bits.
0199  */
0200 struct pool_workqueue {
0201     struct worker_pool  *pool;      /* I: the associated pool */
0202     struct workqueue_struct *wq;        /* I: the owning workqueue */
0203     int         work_color; /* L: current color */
0204     int         flush_color;    /* L: flushing color */
0205     int         refcnt;     /* L: reference count */
0206     int         nr_in_flight[WORK_NR_COLORS];
0207                         /* L: nr of in_flight works */
0208     int         nr_active;  /* L: nr of active works */
0209     int         max_active; /* L: max active works */
0210     struct list_head    delayed_works;  /* L: delayed works */
0211     struct list_head    pwqs_node;  /* WR: node on wq->pwqs */
0212     struct list_head    mayday_node;    /* MD: node on wq->maydays */
0213 
0214     /*
0215      * Release of unbound pwq is punted to system_wq.  See put_pwq()
0216      * and pwq_unbound_release_workfn() for details.  pool_workqueue
0217      * itself is also sched-RCU protected so that the first pwq can be
0218      * determined without grabbing wq->mutex.
0219      */
0220     struct work_struct  unbound_release_work;
0221     struct rcu_head     rcu;
0222 } __aligned(1 << WORK_STRUCT_FLAG_BITS);
0223 
0224 /*
0225  * Structure used to wait for workqueue flush.
0226  */
0227 struct wq_flusher {
0228     struct list_head    list;       /* WQ: list of flushers */
0229     int         flush_color;    /* WQ: flush color waiting for */
0230     struct completion   done;       /* flush completion */
0231 };
0232 
0233 struct wq_device;
0234 
0235 /*
0236  * The externally visible workqueue.  It relays the issued work items to
0237  * the appropriate worker_pool through its pool_workqueues.
0238  */
0239 struct workqueue_struct {
0240     struct list_head    pwqs;       /* WR: all pwqs of this wq */
0241     struct list_head    list;       /* PR: list of all workqueues */
0242 
0243     struct mutex        mutex;      /* protects this wq */
0244     int         work_color; /* WQ: current work color */
0245     int         flush_color;    /* WQ: current flush color */
0246     atomic_t        nr_pwqs_to_flush; /* flush in progress */
0247     struct wq_flusher   *first_flusher; /* WQ: first flusher */
0248     struct list_head    flusher_queue;  /* WQ: flush waiters */
0249     struct list_head    flusher_overflow; /* WQ: flush overflow list */
0250 
0251     struct list_head    maydays;    /* MD: pwqs requesting rescue */
0252     struct worker       *rescuer;   /* I: rescue worker */
0253 
0254     int         nr_drainers;    /* WQ: drain in progress */
0255     int         saved_max_active; /* WQ: saved pwq max_active */
0256 
0257     struct workqueue_attrs  *unbound_attrs; /* PW: only for unbound wqs */
0258     struct pool_workqueue   *dfl_pwq;   /* PW: only for unbound wqs */
0259 
0260 #ifdef CONFIG_SYSFS
0261     struct wq_device    *wq_dev;    /* I: for sysfs interface */
0262 #endif
0263 #ifdef CONFIG_LOCKDEP
0264     struct lockdep_map  lockdep_map;
0265 #endif
0266     char            name[WQ_NAME_LEN]; /* I: workqueue name */
0267 
0268     /*
0269      * Destruction of workqueue_struct is sched-RCU protected to allow
0270      * walking the workqueues list without grabbing wq_pool_mutex.
0271      * This is used to dump all workqueues from sysrq.
0272      */
0273     struct rcu_head     rcu;
0274 
0275     /* hot fields used during command issue, aligned to cacheline */
0276     unsigned int        flags ____cacheline_aligned; /* WQ: WQ_* flags */
0277     struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
0278     struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */
0279 };
0280 
0281 static struct kmem_cache *pwq_cache;
0282 
0283 static cpumask_var_t *wq_numa_possible_cpumask;
0284                     /* possible CPUs of each node */
0285 
0286 static bool wq_disable_numa;
0287 module_param_named(disable_numa, wq_disable_numa, bool, 0444);
0288 
0289 /* see the comment above the definition of WQ_POWER_EFFICIENT */
0290 static bool wq_power_efficient = IS_ENABLED(CONFIG_WQ_POWER_EFFICIENT_DEFAULT);
0291 module_param_named(power_efficient, wq_power_efficient, bool, 0444);
0292 
0293 static bool wq_online;          /* can kworkers be created yet? */
0294 
0295 static bool wq_numa_enabled;        /* unbound NUMA affinity enabled */
0296 
0297 /* buf for wq_update_unbound_numa_attrs(), protected by CPU hotplug exclusion */
0298 static struct workqueue_attrs *wq_update_unbound_numa_attrs_buf;
0299 
0300 static DEFINE_MUTEX(wq_pool_mutex); /* protects pools and workqueues list */
0301 static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
0302 
0303 static LIST_HEAD(workqueues);       /* PR: list of all workqueues */
0304 static bool workqueue_freezing;     /* PL: have wqs started freezing? */
0305 
0306 /* PL: allowable cpus for unbound wqs and work items */
0307 static cpumask_var_t wq_unbound_cpumask;
0308 
0309 /* CPU where unbound work was last round robin scheduled from this CPU */
0310 static DEFINE_PER_CPU(int, wq_rr_cpu_last);
0311 
0312 /*
0313  * Local execution of unbound work items is no longer guaranteed.  The
0314  * following always forces round-robin CPU selection on unbound work items
0315  * to uncover usages which depend on it.
0316  */
0317 #ifdef CONFIG_DEBUG_WQ_FORCE_RR_CPU
0318 static bool wq_debug_force_rr_cpu = true;
0319 #else
0320 static bool wq_debug_force_rr_cpu = false;
0321 #endif
0322 module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
0323 
0324 /* the per-cpu worker pools */
0325 static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
0326 
0327 static DEFINE_IDR(worker_pool_idr); /* PR: idr of all pools */
0328 
0329 /* PL: hash of all unbound pools keyed by pool->attrs */
0330 static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);
0331 
0332 /* I: attributes used when instantiating standard unbound pools on demand */
0333 static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
0334 
0335 /* I: attributes used when instantiating ordered pools on demand */
0336 static struct workqueue_attrs *ordered_wq_attrs[NR_STD_WORKER_POOLS];
0337 
0338 struct workqueue_struct *system_wq __read_mostly;
0339 EXPORT_SYMBOL(system_wq);
0340 struct workqueue_struct *system_highpri_wq __read_mostly;
0341 EXPORT_SYMBOL_GPL(system_highpri_wq);
0342 struct workqueue_struct *system_long_wq __read_mostly;
0343 EXPORT_SYMBOL_GPL(system_long_wq);
0344 struct workqueue_struct *system_unbound_wq __read_mostly;
0345 EXPORT_SYMBOL_GPL(system_unbound_wq);
0346 struct workqueue_struct *system_freezable_wq __read_mostly;
0347 EXPORT_SYMBOL_GPL(system_freezable_wq);
0348 struct workqueue_struct *system_power_efficient_wq __read_mostly;
0349 EXPORT_SYMBOL_GPL(system_power_efficient_wq);
0350 struct workqueue_struct *system_freezable_power_efficient_wq __read_mostly;
0351 EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
0352 
0353 static int worker_thread(void *__worker);
0354 static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
0355 
0356 #define CREATE_TRACE_POINTS
0357 #include <trace/events/workqueue.h>
0358 
0359 #define assert_rcu_or_pool_mutex()                  \
0360     RCU_LOCKDEP_WARN(!rcu_read_lock_sched_held() &&         \
0361              !lockdep_is_held(&wq_pool_mutex),      \
0362              "sched RCU or wq_pool_mutex should be held")
0363 
0364 #define assert_rcu_or_wq_mutex(wq)                  \
0365     RCU_LOCKDEP_WARN(!rcu_read_lock_sched_held() &&         \
0366              !lockdep_is_held(&wq->mutex),          \
0367              "sched RCU or wq->mutex should be held")
0368 
0369 #define assert_rcu_or_wq_mutex_or_pool_mutex(wq)            \
0370     RCU_LOCKDEP_WARN(!rcu_read_lock_sched_held() &&         \
0371              !lockdep_is_held(&wq->mutex) &&        \
0372              !lockdep_is_held(&wq_pool_mutex),      \
0373              "sched RCU, wq->mutex or wq_pool_mutex should be held")
0374 
0375 #define for_each_cpu_worker_pool(pool, cpu)             \
0376     for ((pool) = &per_cpu(cpu_worker_pools, cpu)[0];       \
0377          (pool) < &per_cpu(cpu_worker_pools, cpu)[NR_STD_WORKER_POOLS]; \
0378          (pool)++)
0379 
0380 /**
0381  * for_each_pool - iterate through all worker_pools in the system
0382  * @pool: iteration cursor
0383  * @pi: integer used for iteration
0384  *
0385  * This must be called either with wq_pool_mutex held or sched RCU read
0386  * locked.  If the pool needs to be used beyond the locking in effect, the
0387  * caller is responsible for guaranteeing that the pool stays online.
0388  *
0389  * The if/else clause exists only for the lockdep assertion and can be
0390  * ignored.
0391  */
0392 #define for_each_pool(pool, pi)                     \
0393     idr_for_each_entry(&worker_pool_idr, pool, pi)          \
0394         if (({ assert_rcu_or_pool_mutex(); false; })) { }   \
0395         else
0396 
0397 /**
0398  * for_each_pool_worker - iterate through all workers of a worker_pool
0399  * @worker: iteration cursor
0400  * @pool: worker_pool to iterate workers of
0401  *
0402  * This must be called with @pool->attach_mutex held.
0403  *
0404  * The if/else clause exists only for the lockdep assertion and can be
0405  * ignored.
0406  */
0407 #define for_each_pool_worker(worker, pool)              \
0408     list_for_each_entry((worker), &(pool)->workers, node)       \
0409         if (({ lockdep_assert_held(&pool->attach_mutex); false; })) { } \
0410         else
0411 
0412 /**
0413  * for_each_pwq - iterate through all pool_workqueues of the specified workqueue
0414  * @pwq: iteration cursor
0415  * @wq: the target workqueue
0416  *
0417  * This must be called either with wq->mutex held or sched RCU read locked.
0418  * If the pwq needs to be used beyond the locking in effect, the caller is
0419  * responsible for guaranteeing that the pwq stays online.
0420  *
0421  * The if/else clause exists only for the lockdep assertion and can be
0422  * ignored.
0423  */
0424 #define for_each_pwq(pwq, wq)                       \
0425     list_for_each_entry_rcu((pwq), &(wq)->pwqs, pwqs_node)      \
0426         if (({ assert_rcu_or_wq_mutex(wq); false; })) { }   \
0427         else
0428 
0429 #ifdef CONFIG_DEBUG_OBJECTS_WORK
0430 
0431 static struct debug_obj_descr work_debug_descr;
0432 
0433 static void *work_debug_hint(void *addr)
0434 {
0435     return ((struct work_struct *) addr)->func;
0436 }
0437 
0438 static bool work_is_static_object(void *addr)
0439 {
0440     struct work_struct *work = addr;
0441 
0442     return test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work));
0443 }
0444 
0445 /*
0446  * fixup_init is called when:
0447  * - an active object is initialized
0448  */
0449 static bool work_fixup_init(void *addr, enum debug_obj_state state)
0450 {
0451     struct work_struct *work = addr;
0452 
0453     switch (state) {
0454     case ODEBUG_STATE_ACTIVE:
0455         cancel_work_sync(work);
0456         debug_object_init(work, &work_debug_descr);
0457         return true;
0458     default:
0459         return false;
0460     }
0461 }
0462 
0463 /*
0464  * fixup_free is called when:
0465  * - an active object is freed
0466  */
0467 static bool work_fixup_free(void *addr, enum debug_obj_state state)
0468 {
0469     struct work_struct *work = addr;
0470 
0471     switch (state) {
0472     case ODEBUG_STATE_ACTIVE:
0473         cancel_work_sync(work);
0474         debug_object_free(work, &work_debug_descr);
0475         return true;
0476     default:
0477         return false;
0478     }
0479 }
0480 
0481 static struct debug_obj_descr work_debug_descr = {
0482     .name       = "work_struct",
0483     .debug_hint = work_debug_hint,
0484     .is_static_object = work_is_static_object,
0485     .fixup_init = work_fixup_init,
0486     .fixup_free = work_fixup_free,
0487 };
0488 
0489 static inline void debug_work_activate(struct work_struct *work)
0490 {
0491     debug_object_activate(work, &work_debug_descr);
0492 }
0493 
0494 static inline void debug_work_deactivate(struct work_struct *work)
0495 {
0496     debug_object_deactivate(work, &work_debug_descr);
0497 }
0498 
0499 void __init_work(struct work_struct *work, int onstack)
0500 {
0501     if (onstack)
0502         debug_object_init_on_stack(work, &work_debug_descr);
0503     else
0504         debug_object_init(work, &work_debug_descr);
0505 }
0506 EXPORT_SYMBOL_GPL(__init_work);
0507 
0508 void destroy_work_on_stack(struct work_struct *work)
0509 {
0510     debug_object_free(work, &work_debug_descr);
0511 }
0512 EXPORT_SYMBOL_GPL(destroy_work_on_stack);
0513 
0514 void destroy_delayed_work_on_stack(struct delayed_work *work)
0515 {
0516     destroy_timer_on_stack(&work->timer);
0517     debug_object_free(&work->work, &work_debug_descr);
0518 }
0519 EXPORT_SYMBOL_GPL(destroy_delayed_work_on_stack);
0520 
0521 #else
0522 static inline void debug_work_activate(struct work_struct *work) { }
0523 static inline void debug_work_deactivate(struct work_struct *work) { }
0524 #endif
0525 
0526 /**
0527  * worker_pool_assign_id - allocate ID and assign it to @pool
0528  * @pool: the pool pointer of interest
0529  *
0530  * Returns 0 if an ID in [0, WORK_OFFQ_POOL_NONE) is allocated and assigned
0531  * successfully, -errno on failure.
0532  */
0533 static int worker_pool_assign_id(struct worker_pool *pool)
0534 {
0535     int ret;
0536 
0537     lockdep_assert_held(&wq_pool_mutex);
0538 
0539     ret = idr_alloc(&worker_pool_idr, pool, 0, WORK_OFFQ_POOL_NONE,
0540             GFP_KERNEL);
0541     if (ret >= 0) {
0542         pool->id = ret;
0543         return 0;
0544     }
0545     return ret;
0546 }
0547 
0548 /**
0549  * unbound_pwq_by_node - return the unbound pool_workqueue for the given node
0550  * @wq: the target workqueue
0551  * @node: the node ID
0552  *
0553  * This must be called with any of wq_pool_mutex, wq->mutex or sched RCU
0554  * read locked.
0555  * If the pwq needs to be used beyond the locking in effect, the caller is
0556  * responsible for guaranteeing that the pwq stays online.
0557  *
0558  * Return: The unbound pool_workqueue for @node.
0559  */
0560 static struct pool_workqueue *unbound_pwq_by_node(struct workqueue_struct *wq,
0561                           int node)
0562 {
0563     assert_rcu_or_wq_mutex_or_pool_mutex(wq);
0564 
0565     /*
0566      * XXX: @node can be NUMA_NO_NODE if CPU goes offline while a
0567      * delayed item is pending.  The plan is to keep CPU -> NODE
0568      * mapping valid and stable across CPU on/offlines.  Once that
0569      * happens, this workaround can be removed.
0570      */
0571     if (unlikely(node == NUMA_NO_NODE))
0572         return wq->dfl_pwq;
0573 
0574     return rcu_dereference_raw(wq->numa_pwq_tbl[node]);
0575 }
0576 
0577 static unsigned int work_color_to_flags(int color)
0578 {
0579     return color << WORK_STRUCT_COLOR_SHIFT;
0580 }
0581 
0582 static int get_work_color(struct work_struct *work)
0583 {
0584     return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
0585         ((1 << WORK_STRUCT_COLOR_BITS) - 1);
0586 }
0587 
0588 static int work_next_color(int color)
0589 {
0590     return (color + 1) % WORK_NR_COLORS;
0591 }
0592 
0593 /*
0594  * While queued, %WORK_STRUCT_PWQ is set and non flag bits of a work's data
0595  * contain the pointer to the queued pwq.  Once execution starts, the flag
0596  * is cleared and the high bits contain OFFQ flags and pool ID.
0597  *
0598  * set_work_pwq(), set_work_pool_and_clear_pending(), mark_work_canceling()
0599  * and clear_work_data() can be used to set the pwq, pool or clear
0600  * work->data.  These functions should only be called while the work is
0601  * owned - ie. while the PENDING bit is set.
0602  *
0603  * get_work_pool() and get_work_pwq() can be used to obtain the pool or pwq
0604  * corresponding to a work.  Pool is available once the work has been
0605  * queued anywhere after initialization until it is sync canceled.  pwq is
0606  * available only while the work item is queued.
0607  *
0608  * %WORK_OFFQ_CANCELING is used to mark a work item which is being
0609  * canceled.  While being canceled, a work item may have its PENDING set
0610  * but stay off timer and worklist for arbitrarily long and nobody should
0611  * try to steal the PENDING bit.
0612  */
0613 static inline void set_work_data(struct work_struct *work, unsigned long data,
0614                  unsigned long flags)
0615 {
0616     WARN_ON_ONCE(!work_pending(work));
0617     atomic_long_set(&work->data, data | flags | work_static(work));
0618 }
0619 
0620 static void set_work_pwq(struct work_struct *work, struct pool_workqueue *pwq,
0621              unsigned long extra_flags)
0622 {
0623     set_work_data(work, (unsigned long)pwq,
0624               WORK_STRUCT_PENDING | WORK_STRUCT_PWQ | extra_flags);
0625 }
0626 
0627 static void set_work_pool_and_keep_pending(struct work_struct *work,
0628                        int pool_id)
0629 {
0630     set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT,
0631               WORK_STRUCT_PENDING);
0632 }
0633 
0634 static void set_work_pool_and_clear_pending(struct work_struct *work,
0635                         int pool_id)
0636 {
0637     /*
0638      * The following wmb is paired with the implied mb in
0639      * test_and_set_bit(PENDING) and ensures all updates to @work made
0640      * here are visible to and precede any updates by the next PENDING
0641      * owner.
0642      */
0643     smp_wmb();
0644     set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT, 0);
0645     /*
0646      * The following mb guarantees that previous clear of a PENDING bit
0647      * will not be reordered with any speculative LOADS or STORES from
0648      * work->current_func, which is executed afterwards.  This possible
0649      * reordering can lead to a missed execution on attempt to queue
0650      * the same @work.  E.g. consider this case:
0651      *
0652      *   CPU#0                         CPU#1
0653      *   ----------------------------  --------------------------------
0654      *
0655      * 1  STORE event_indicated
0656      * 2  queue_work_on() {
0657      * 3    test_and_set_bit(PENDING)
0658      * 4 }                             set_..._and_clear_pending() {
0659      * 5                                 set_work_data() # clear bit
0660      * 6                                 smp_mb()
0661      * 7                               work->current_func() {
0662      * 8                      LOAD event_indicated
0663      *                 }
0664      *
0665      * Without an explicit full barrier speculative LOAD on line 8 can
0666      * be executed before CPU#0 does STORE on line 1.  If that happens,
0667      * CPU#0 observes the PENDING bit is still set and new execution of
0668      * a @work is not queued in the hope that CPU#1 will eventually
0669      * finish the queued @work.  Meanwhile CPU#1 does not see
0670      * event_indicated is set, because speculative LOAD was executed
0671      * before actual STORE.
0672      */
0673     smp_mb();
0674 }
0675 
0676 static void clear_work_data(struct work_struct *work)
0677 {
0678     smp_wmb();  /* see set_work_pool_and_clear_pending() */
0679     set_work_data(work, WORK_STRUCT_NO_POOL, 0);
0680 }
0681 
0682 static struct pool_workqueue *get_work_pwq(struct work_struct *work)
0683 {
0684     unsigned long data = atomic_long_read(&work->data);
0685 
0686     if (data & WORK_STRUCT_PWQ)
0687         return (void *)(data & WORK_STRUCT_WQ_DATA_MASK);
0688     else
0689         return NULL;
0690 }
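/*
 * A short sketch of the round trip handled by the helpers above: because
 * pool_workqueues are __aligned(1 << WORK_STRUCT_FLAG_BITS), the low
 * WORK_STRUCT_FLAG_BITS of a pwq pointer are zero and can double as the
 * WORK_STRUCT_* flag bits, e.g.
 *
 *	set_work_pwq(work, pwq, 0);
 *	WARN_ON(get_work_pwq(work) != pwq);	masking recovers the pointer
 *
 * Once execution starts, WORK_STRUCT_PWQ is cleared and the high bits are
 * reused for the pool ID and OFFQ flags (see get_work_pool() below).
 */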
0691 
0692 /**
0693  * get_work_pool - return the worker_pool a given work was associated with
0694  * @work: the work item of interest
0695  *
0696  * Pools are created and destroyed under wq_pool_mutex, and read access
0697  * is allowed under the sched-RCU read lock.  As such, this function should be
0698  * called under wq_pool_mutex or with preemption disabled.
0699  *
0700  * All fields of the returned pool are accessible as long as the above
0701  * mentioned locking is in effect.  If the returned pool needs to be used
0702  * beyond the critical section, the caller is responsible for ensuring the
0703  * returned pool is and stays online.
0704  *
0705  * Return: The worker_pool @work was last associated with.  %NULL if none.
0706  */
0707 static struct worker_pool *get_work_pool(struct work_struct *work)
0708 {
0709     unsigned long data = atomic_long_read(&work->data);
0710     int pool_id;
0711 
0712     assert_rcu_or_pool_mutex();
0713 
0714     if (data & WORK_STRUCT_PWQ)
0715         return ((struct pool_workqueue *)
0716             (data & WORK_STRUCT_WQ_DATA_MASK))->pool;
0717 
0718     pool_id = data >> WORK_OFFQ_POOL_SHIFT;
0719     if (pool_id == WORK_OFFQ_POOL_NONE)
0720         return NULL;
0721 
0722     return idr_find(&worker_pool_idr, pool_id);
0723 }
0724 
0725 /**
0726  * get_work_pool_id - return the worker pool ID a given work is associated with
0727  * @work: the work item of interest
0728  *
0729  * Return: The worker_pool ID @work was last associated with.
0730  * %WORK_OFFQ_POOL_NONE if none.
0731  */
0732 static int get_work_pool_id(struct work_struct *work)
0733 {
0734     unsigned long data = atomic_long_read(&work->data);
0735 
0736     if (data & WORK_STRUCT_PWQ)
0737         return ((struct pool_workqueue *)
0738             (data & WORK_STRUCT_WQ_DATA_MASK))->pool->id;
0739 
0740     return data >> WORK_OFFQ_POOL_SHIFT;
0741 }
0742 
0743 static void mark_work_canceling(struct work_struct *work)
0744 {
0745     unsigned long pool_id = get_work_pool_id(work);
0746 
0747     pool_id <<= WORK_OFFQ_POOL_SHIFT;
0748     set_work_data(work, pool_id | WORK_OFFQ_CANCELING, WORK_STRUCT_PENDING);
0749 }
0750 
0751 static bool work_is_canceling(struct work_struct *work)
0752 {
0753     unsigned long data = atomic_long_read(&work->data);
0754 
0755     return !(data & WORK_STRUCT_PWQ) && (data & WORK_OFFQ_CANCELING);
0756 }
0757 
0758 /*
0759  * Policy functions.  These define the policies on how the global worker
0760  * pools are managed.  Unless noted otherwise, these functions assume that
0761  * they're being called with pool->lock held.
0762  */
0763 
0764 static bool __need_more_worker(struct worker_pool *pool)
0765 {
0766     return !atomic_read(&pool->nr_running);
0767 }
0768 
0769 /*
0770  * Need to wake up a worker?  Called from anything but currently
0771  * running workers.
0772  *
0773  * Note that, because unbound workers never contribute to nr_running, this
0774  * function will always return %true for unbound pools as long as the
0775  * worklist isn't empty.
0776  */
0777 static bool need_more_worker(struct worker_pool *pool)
0778 {
0779     return !list_empty(&pool->worklist) && __need_more_worker(pool);
0780 }
0781 
0782 /* Can I start working?  Called from busy but !running workers. */
0783 static bool may_start_working(struct worker_pool *pool)
0784 {
0785     return pool->nr_idle;
0786 }
0787 
0788 /* Do I need to keep working?  Called from currently running workers. */
0789 static bool keep_working(struct worker_pool *pool)
0790 {
0791     return !list_empty(&pool->worklist) &&
0792         atomic_read(&pool->nr_running) <= 1;
0793 }
0794 
0795 /* Do we need a new worker?  Called from manager. */
0796 static bool need_to_create_worker(struct worker_pool *pool)
0797 {
0798     return need_more_worker(pool) && !may_start_working(pool);
0799 }
0800 
0801 /* Do we have too many workers and should some go away? */
0802 static bool too_many_workers(struct worker_pool *pool)
0803 {
0804     bool managing = mutex_is_locked(&pool->manager_arb);
0805     int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
0806     int nr_busy = pool->nr_workers - nr_idle;
0807 
0808     return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
0809 }
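/*
 * Worked example for the check above: with MAX_IDLE_WORKERS_RATIO == 4 and
 * the two-worker idle reserve, a pool with 20 busy workers tolerates up to
 * six idle ones ((6 - 2) * 4 = 16 < 20); a seventh makes
 * (7 - 2) * 4 = 20 >= 20 and too_many_workers() returns %true, after which
 * the idle timer may start trimming workers that stay idle longer than
 * IDLE_WORKER_TIMEOUT.
 */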
0810 
0811 /*
0812  * Wake up functions.
0813  */
0814 
0815 /* Return the first idle worker.  Safe with preemption disabled */
0816 static struct worker *first_idle_worker(struct worker_pool *pool)
0817 {
0818     if (unlikely(list_empty(&pool->idle_list)))
0819         return NULL;
0820 
0821     return list_first_entry(&pool->idle_list, struct worker, entry);
0822 }
0823 
0824 /**
0825  * wake_up_worker - wake up an idle worker
0826  * @pool: worker pool to wake worker from
0827  *
0828  * Wake up the first idle worker of @pool.
0829  *
0830  * CONTEXT:
0831  * spin_lock_irq(pool->lock).
0832  */
0833 static void wake_up_worker(struct worker_pool *pool)
0834 {
0835     struct worker *worker = first_idle_worker(pool);
0836 
0837     if (likely(worker))
0838         wake_up_process(worker->task);
0839 }
0840 
0841 /**
0842  * wq_worker_waking_up - a worker is waking up
0843  * @task: task waking up
0844  * @cpu: CPU @task is waking up to
0845  *
0846  * This function is called during try_to_wake_up() when a worker is
0847  * being awoken.
0848  *
0849  * CONTEXT:
0850  * spin_lock_irq(rq->lock)
0851  */
0852 void wq_worker_waking_up(struct task_struct *task, int cpu)
0853 {
0854     struct worker *worker = kthread_data(task);
0855 
0856     if (!(worker->flags & WORKER_NOT_RUNNING)) {
0857         WARN_ON_ONCE(worker->pool->cpu != cpu);
0858         atomic_inc(&worker->pool->nr_running);
0859     }
0860 }
0861 
0862 /**
0863  * wq_worker_sleeping - a worker is going to sleep
0864  * @task: task going to sleep
0865  *
0866  * This function is called during schedule() when a busy worker is
0867  * going to sleep.  A worker on the same cpu can be woken up by
0868  * returning a pointer to its task.
0869  *
0870  * CONTEXT:
0871  * spin_lock_irq(rq->lock)
0872  *
0873  * Return:
0874  * Worker task on @cpu to wake up, %NULL if none.
0875  */
0876 struct task_struct *wq_worker_sleeping(struct task_struct *task)
0877 {
0878     struct worker *worker = kthread_data(task), *to_wakeup = NULL;
0879     struct worker_pool *pool;
0880 
0881     /*
0882      * Rescuers, which may not have all the fields set up like normal
0883      * workers, also reach here; let's not access anything before
0884      * checking NOT_RUNNING.
0885      */
0886     if (worker->flags & WORKER_NOT_RUNNING)
0887         return NULL;
0888 
0889     pool = worker->pool;
0890 
0891     /* this can only happen on the local cpu */
0892     if (WARN_ON_ONCE(pool->cpu != raw_smp_processor_id()))
0893         return NULL;
0894 
0895     /*
0896      * The counterpart of the following dec_and_test, implied mb,
0897      * worklist not empty test sequence is in insert_work().
0898      * Please read comment there.
0899      *
0900      * NOT_RUNNING is clear.  This means that we're bound to and
0901      * running on the local cpu w/ rq lock held and preemption
0902      * disabled, which in turn means that nobody else could be
0903      * manipulating idle_list, so dereferencing idle_list without pool
0904      * lock is safe.
0905      */
0906     if (atomic_dec_and_test(&pool->nr_running) &&
0907         !list_empty(&pool->worklist))
0908         to_wakeup = first_idle_worker(pool);
0909     return to_wakeup ? to_wakeup->task : NULL;
0910 }
0911 
0912 /**
0913  * worker_set_flags - set worker flags and adjust nr_running accordingly
0914  * @worker: self
0915  * @flags: flags to set
0916  *
0917  * Set @flags in @worker->flags and adjust nr_running accordingly.
0918  *
0919  * CONTEXT:
0920  * spin_lock_irq(pool->lock)
0921  */
0922 static inline void worker_set_flags(struct worker *worker, unsigned int flags)
0923 {
0924     struct worker_pool *pool = worker->pool;
0925 
0926     WARN_ON_ONCE(worker->task != current);
0927 
0928     /* If transitioning into NOT_RUNNING, adjust nr_running. */
0929     if ((flags & WORKER_NOT_RUNNING) &&
0930         !(worker->flags & WORKER_NOT_RUNNING)) {
0931         atomic_dec(&pool->nr_running);
0932     }
0933 
0934     worker->flags |= flags;
0935 }
0936 
0937 /**
0938  * worker_clr_flags - clear worker flags and adjust nr_running accordingly
0939  * @worker: self
0940  * @flags: flags to clear
0941  *
0942  * Clear @flags in @worker->flags and adjust nr_running accordingly.
0943  *
0944  * CONTEXT:
0945  * spin_lock_irq(pool->lock)
0946  */
0947 static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
0948 {
0949     struct worker_pool *pool = worker->pool;
0950     unsigned int oflags = worker->flags;
0951 
0952     WARN_ON_ONCE(worker->task != current);
0953 
0954     worker->flags &= ~flags;
0955 
0956     /*
0957      * If transitioning out of NOT_RUNNING, increment nr_running.  Note
0958      * that the nested NOT_RUNNING is not a noop.  NOT_RUNNING is a mask
0959      * of multiple flags, not a single flag.
0960      */
0961     if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
0962         if (!(worker->flags & WORKER_NOT_RUNNING))
0963             atomic_inc(&pool->nr_running);
0964 }
0965 
0966 /**
0967  * find_worker_executing_work - find worker which is executing a work
0968  * @pool: pool of interest
0969  * @work: work to find worker for
0970  *
0971  * Find a worker which is executing @work on @pool by searching
0972  * @pool->busy_hash which is keyed by the address of @work.  For a worker
0973  * to match, its current execution should match the address of @work and
0974  * its work function.  This is to avoid unwanted dependency between
0975  * unrelated work executions through a work item being recycled while still
0976  * being executed.
0977  *
0978  * This is a bit tricky.  A work item may be freed once its execution
0979  * starts and nothing prevents the freed area from being recycled for
0980  * another work item.  If the same work item address ends up being reused
0981  * before the original execution finishes, workqueue will identify the
0982  * recycled work item as currently executing and make it wait until the
0983  * current execution finishes, introducing an unwanted dependency.
0984  *
0985  * This function checks the work item address and work function to avoid
0986  * false positives.  Note that this isn't complete as one may construct a
0987  * work function which can introduce dependency onto itself through a
0988  * recycled work item.  Well, if somebody wants to shoot oneself in the
0989  * foot that badly, there's only so much we can do, and if such deadlock
0990  * actually occurs, it should be easy to locate the culprit work function.
0991  *
0992  * CONTEXT:
0993  * spin_lock_irq(pool->lock).
0994  *
0995  * Return:
0996  * Pointer to worker which is executing @work if found, %NULL
0997  * otherwise.
0998  */
0999 static struct worker *find_worker_executing_work(struct worker_pool *pool,
1000                          struct work_struct *work)
1001 {
1002     struct worker *worker;
1003 
1004     hash_for_each_possible(pool->busy_hash, worker, hentry,
1005                    (unsigned long)work)
1006         if (worker->current_work == work &&
1007             worker->current_func == work->func)
1008             return worker;
1009 
1010     return NULL;
1011 }
1012 
1013 /**
1014  * move_linked_works - move linked works to a list
1015  * @work: start of series of works to be scheduled
1016  * @head: target list to append @work to
1017  * @nextp: out parameter for nested worklist walking
1018  *
1019  * Schedule linked works starting from @work to @head.  Work series to
1020  * be scheduled starts at @work and includes any consecutive work with
1021  * WORK_STRUCT_LINKED set in its predecessor.
1022  *
1023  * If @nextp is not NULL, it's updated to point to the next work of
1024  * the last scheduled work.  This allows move_linked_works() to be
1025  * nested inside outer list_for_each_entry_safe().
1026  *
1027  * CONTEXT:
1028  * spin_lock_irq(pool->lock).
1029  */
1030 static void move_linked_works(struct work_struct *work, struct list_head *head,
1031                   struct work_struct **nextp)
1032 {
1033     struct work_struct *n;
1034 
1035     /*
1036      * A linked worklist always ends before the end of the list, so
1037      * use NULL for the list head.
1038      */
1039     list_for_each_entry_safe_from(work, n, NULL, entry) {
1040         list_move_tail(&work->entry, head);
1041         if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
1042             break;
1043     }
1044 
1045     /*
1046      * If we're already inside safe list traversal and have moved
1047      * multiple works to the scheduled queue, the next position
1048      * needs to be updated.
1049      */
1050     if (nextp)
1051         *nextp = n;
1052 }
1053 
1054 /**
1055  * get_pwq - get an extra reference on the specified pool_workqueue
1056  * @pwq: pool_workqueue to get
1057  *
1058  * Obtain an extra reference on @pwq.  The caller should guarantee that
1059  * @pwq has positive refcnt and be holding the matching pool->lock.
1060  */
1061 static void get_pwq(struct pool_workqueue *pwq)
1062 {
1063     lockdep_assert_held(&pwq->pool->lock);
1064     WARN_ON_ONCE(pwq->refcnt <= 0);
1065     pwq->refcnt++;
1066 }
1067 
1068 /**
1069  * put_pwq - put a pool_workqueue reference
1070  * @pwq: pool_workqueue to put
1071  *
1072  * Drop a reference of @pwq.  If its refcnt reaches zero, schedule its
1073  * destruction.  The caller should be holding the matching pool->lock.
1074  */
1075 static void put_pwq(struct pool_workqueue *pwq)
1076 {
1077     lockdep_assert_held(&pwq->pool->lock);
1078     if (likely(--pwq->refcnt))
1079         return;
1080     if (WARN_ON_ONCE(!(pwq->wq->flags & WQ_UNBOUND)))
1081         return;
1082     /*
1083      * @pwq can't be released under pool->lock, bounce to
1084      * pwq_unbound_release_workfn().  This never recurses on the same
1085      * pool->lock as this path is taken only for unbound workqueues and
1086      * the release work item is scheduled on a per-cpu workqueue.  To
1087      * avoid lockdep warning, unbound pool->locks are given lockdep
1088      * subclass of 1 in get_unbound_pool().
1089      */
1090     schedule_work(&pwq->unbound_release_work);
1091 }
1092 
1093 /**
1094  * put_pwq_unlocked - put_pwq() with surrounding pool lock/unlock
1095  * @pwq: pool_workqueue to put (can be %NULL)
1096  *
1097  * put_pwq() with locking.  This function also allows %NULL @pwq.
1098  */
1099 static void put_pwq_unlocked(struct pool_workqueue *pwq)
1100 {
1101     if (pwq) {
1102         /*
1103          * As both pwqs and pools are sched-RCU protected, the
1104          * following lock operations are safe.
1105          */
1106         spin_lock_irq(&pwq->pool->lock);
1107         put_pwq(pwq);
1108         spin_unlock_irq(&pwq->pool->lock);
1109     }
1110 }
1111 
1112 static void pwq_activate_delayed_work(struct work_struct *work)
1113 {
1114     struct pool_workqueue *pwq = get_work_pwq(work);
1115 
1116     trace_workqueue_activate_work(work);
1117     if (list_empty(&pwq->pool->worklist))
1118         pwq->pool->watchdog_ts = jiffies;
1119     move_linked_works(work, &pwq->pool->worklist, NULL);
1120     __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
1121     pwq->nr_active++;
1122 }
1123 
1124 static void pwq_activate_first_delayed(struct pool_workqueue *pwq)
1125 {
1126     struct work_struct *work = list_first_entry(&pwq->delayed_works,
1127                             struct work_struct, entry);
1128 
1129     pwq_activate_delayed_work(work);
1130 }
1131 
1132 /**
1133  * pwq_dec_nr_in_flight - decrement pwq's nr_in_flight
1134  * @pwq: pwq of interest
1135  * @color: color of work which left the queue
1136  *
1137  * A work either has completed or is removed from pending queue,
1138  * decrement nr_in_flight of its pwq and handle workqueue flushing.
1139  *
1140  * CONTEXT:
1141  * spin_lock_irq(pool->lock).
1142  */
1143 static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, int color)
1144 {
1145     /* uncolored work items don't participate in flushing or nr_active */
1146     if (color == WORK_NO_COLOR)
1147         goto out_put;
1148 
1149     pwq->nr_in_flight[color]--;
1150 
1151     pwq->nr_active--;
1152     if (!list_empty(&pwq->delayed_works)) {
1153         /* one down, submit a delayed one */
1154         if (pwq->nr_active < pwq->max_active)
1155             pwq_activate_first_delayed(pwq);
1156     }
1157 
1158     /* is flush in progress and are we at the flushing tip? */
1159     if (likely(pwq->flush_color != color))
1160         goto out_put;
1161 
1162     /* are there still in-flight works? */
1163     if (pwq->nr_in_flight[color])
1164         goto out_put;
1165 
1166     /* this pwq is done, clear flush_color */
1167     pwq->flush_color = -1;
1168 
1169     /*
1170      * If this was the last pwq, wake up the first flusher.  It
1171      * will handle the rest.
1172      */
1173     if (atomic_dec_and_test(&pwq->wq->nr_pwqs_to_flush))
1174         complete(&pwq->wq->first_flusher->done);
1175 out_put:
1176     put_pwq(pwq);
1177 }
1178 
1179 /**
1180  * try_to_grab_pending - steal work item from worklist and disable irq
1181  * @work: work item to steal
1182  * @is_dwork: @work is a delayed_work
1183  * @flags: place to store irq state
1184  *
1185  * Try to grab PENDING bit of @work.  This function can handle @work in any
1186  * stable state - idle, on timer or on worklist.
1187  *
1188  * Return:
1189  *  1       if @work was pending and we successfully stole PENDING
1190  *  0       if @work was idle and we claimed PENDING
1191  *  -EAGAIN if PENDING couldn't be grabbed at the moment, safe to busy-retry
1192  *  -ENOENT if someone else is canceling @work, this state may persist
1193  *      for arbitrarily long
1194  *
1195  * Note:
1196  * On >= 0 return, the caller owns @work's PENDING bit.  To avoid getting
1197  * interrupted while holding PENDING and @work off queue, irq must be
1198  * disabled on entry.  This, combined with delayed_work->timer being
1199  * irqsafe, ensures that we return -EAGAIN for a finite, short period of time.
1200  *
1201  * On successful return, >= 0, irq is disabled and the caller is
1202  * responsible for releasing it using local_irq_restore(*@flags).
1203  *
1204  * This function is safe to call from any context including IRQ handler.
1205  */
1206 static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
1207                    unsigned long *flags)
1208 {
1209     struct worker_pool *pool;
1210     struct pool_workqueue *pwq;
1211 
1212     local_irq_save(*flags);
1213 
1214     /* try to steal the timer if it exists */
1215     if (is_dwork) {
1216         struct delayed_work *dwork = to_delayed_work(work);
1217 
1218         /*
1219          * dwork->timer is irqsafe.  If del_timer() fails, it's
1220          * guaranteed that the timer is not queued anywhere and not
1221          * running on the local CPU.
1222          */
1223         if (likely(del_timer(&dwork->timer)))
1224             return 1;
1225     }
1226 
1227     /* try to claim PENDING the normal way */
1228     if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
1229         return 0;
1230 
1231     /*
1232      * The queueing is in progress, or it is already queued. Try to
1233      * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
1234      */
1235     pool = get_work_pool(work);
1236     if (!pool)
1237         goto fail;
1238 
1239     spin_lock(&pool->lock);
1240     /*
1241      * work->data is guaranteed to point to pwq only while the work
1242      * item is queued on pwq->wq, and both updating work->data to point
1243      * to pwq on queueing and to pool on dequeueing are done under
1244      * pwq->pool->lock.  This in turn guarantees that, if work->data
1245      * points to pwq which is associated with a locked pool, the work
1246      * item is currently queued on that pool.
1247      */
1248     pwq = get_work_pwq(work);
1249     if (pwq && pwq->pool == pool) {
1250         debug_work_deactivate(work);
1251 
1252         /*
1253          * A delayed work item cannot be grabbed directly because
1254          * it might have linked NO_COLOR work items which, if left
1255          * on the delayed_list, will confuse pwq->nr_active
1256          * management later on and cause a stall.  Make sure the work
1257          * item is activated before grabbing.
1258          */
1259         if (*work_data_bits(work) & WORK_STRUCT_DELAYED)
1260             pwq_activate_delayed_work(work);
1261 
1262         list_del_init(&work->entry);
1263         pwq_dec_nr_in_flight(pwq, get_work_color(work));
1264 
1265         /* work->data points to pwq iff queued, point to pool */
1266         set_work_pool_and_keep_pending(work, pool->id);
1267 
1268         spin_unlock(&pool->lock);
1269         return 1;
1270     }
1271     spin_unlock(&pool->lock);
1272 fail:
1273     local_irq_restore(*flags);
1274     if (work_is_canceling(work))
1275         return -ENOENT;
1276     cpu_relax();
1277     return -EAGAIN;
1278 }
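/*
 * Sketch of the intended calling pattern, mirroring mod_delayed_work_on()
 * at the bottom of this file: busy-retry on -EAGAIN, give up on -ENOENT,
 * and on >= 0 the caller owns PENDING with irqs disabled and must restore
 * them once done:
 *
 *	unsigned long flags;
 *	int ret;
 *
 *	do {
 *		ret = try_to_grab_pending(work, is_dwork, &flags);
 *	} while (unlikely(ret == -EAGAIN));
 *
 *	if (ret >= 0) {
 *		... requeue or otherwise dispose of @work ...
 *		local_irq_restore(flags);
 *	}
 */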
1279 
1280 /**
1281  * insert_work - insert a work into a pool
1282  * @pwq: pwq @work belongs to
1283  * @work: work to insert
1284  * @head: insertion point
1285  * @extra_flags: extra WORK_STRUCT_* flags to set
1286  *
1287  * Insert @work which belongs to @pwq after @head.  @extra_flags is or'd to
1288  * work_struct flags.
1289  *
1290  * CONTEXT:
1291  * spin_lock_irq(pool->lock).
1292  */
1293 static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
1294             struct list_head *head, unsigned int extra_flags)
1295 {
1296     struct worker_pool *pool = pwq->pool;
1297 
1298     /* we own @work, set data and link */
1299     set_work_pwq(work, pwq, extra_flags);
1300     list_add_tail(&work->entry, head);
1301     get_pwq(pwq);
1302 
1303     /*
1304      * Ensure either wq_worker_sleeping() sees the above
1305      * list_add_tail() or we see zero nr_running to avoid workers lying
1306      * around lazily while there are works to be processed.
1307      */
1308     smp_mb();
1309 
1310     if (__need_more_worker(pool))
1311         wake_up_worker(pool);
1312 }
1313 
1314 /*
1315  * Test whether @work is being queued from another work executing on the
1316  * same workqueue.
1317  */
1318 static bool is_chained_work(struct workqueue_struct *wq)
1319 {
1320     struct worker *worker;
1321 
1322     worker = current_wq_worker();
1323     /*
1324      * Return %true iff I'm a worker executing a work item on @wq.  If
1325      * I'm @worker, it's safe to dereference it without locking.
1326      */
1327     return worker && worker->current_pwq->wq == wq;
1328 }
1329 
1330 /*
1331  * When queueing an unbound work item to a wq, prefer local CPU if allowed
1332  * by wq_unbound_cpumask.  Otherwise, round robin among the allowed ones to
1333  * avoid perturbing sensitive tasks.
1334  */
1335 static int wq_select_unbound_cpu(int cpu)
1336 {
1337     static bool printed_dbg_warning;
1338     int new_cpu;
1339 
1340     if (likely(!wq_debug_force_rr_cpu)) {
1341         if (cpumask_test_cpu(cpu, wq_unbound_cpumask))
1342             return cpu;
1343     } else if (!printed_dbg_warning) {
1344         pr_warn("workqueue: round-robin CPU selection forced, expect performance impact\n");
1345         printed_dbg_warning = true;
1346     }
1347 
1348     if (cpumask_empty(wq_unbound_cpumask))
1349         return cpu;
1350 
1351     new_cpu = __this_cpu_read(wq_rr_cpu_last);
1352     new_cpu = cpumask_next_and(new_cpu, wq_unbound_cpumask, cpu_online_mask);
1353     if (unlikely(new_cpu >= nr_cpu_ids)) {
1354         new_cpu = cpumask_first_and(wq_unbound_cpumask, cpu_online_mask);
1355         if (unlikely(new_cpu >= nr_cpu_ids))
1356             return cpu;
1357     }
1358     __this_cpu_write(wq_rr_cpu_last, new_cpu);
1359 
1360     return new_cpu;
1361 }
1362 
1363 static void __queue_work(int cpu, struct workqueue_struct *wq,
1364              struct work_struct *work)
1365 {
1366     struct pool_workqueue *pwq;
1367     struct worker_pool *last_pool;
1368     struct list_head *worklist;
1369     unsigned int work_flags;
1370     unsigned int req_cpu = cpu;
1371 
1372     /*
1373      * While a work item is PENDING && off queue, a task trying to
1374      * steal the PENDING will busy-loop waiting for it to either get
1375      * queued or lose PENDING.  Grabbing PENDING and queueing should
1376      * happen with IRQ disabled.
1377      */
1378     WARN_ON_ONCE(!irqs_disabled());
1379 
1380     debug_work_activate(work);
1381 
1382     /* if draining, only works from the same workqueue are allowed */
1383     if (unlikely(wq->flags & __WQ_DRAINING) &&
1384         WARN_ON_ONCE(!is_chained_work(wq)))
1385         return;
1386 retry:
1387     if (req_cpu == WORK_CPU_UNBOUND)
1388         cpu = wq_select_unbound_cpu(raw_smp_processor_id());
1389 
1390     /* pwq which will be used unless @work is executing elsewhere */
1391     if (!(wq->flags & WQ_UNBOUND))
1392         pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
1393     else
1394         pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
1395 
1396     /*
1397      * If @work was previously on a different pool, it might still be
1398      * running there, in which case the work needs to be queued on that
1399      * pool to guarantee non-reentrancy.
1400      */
1401     last_pool = get_work_pool(work);
1402     if (last_pool && last_pool != pwq->pool) {
1403         struct worker *worker;
1404 
1405         spin_lock(&last_pool->lock);
1406 
1407         worker = find_worker_executing_work(last_pool, work);
1408 
1409         if (worker && worker->current_pwq->wq == wq) {
1410             pwq = worker->current_pwq;
1411         } else {
1412             /* meh... not running there, queue here */
1413             spin_unlock(&last_pool->lock);
1414             spin_lock(&pwq->pool->lock);
1415         }
1416     } else {
1417         spin_lock(&pwq->pool->lock);
1418     }
1419 
1420     /*
1421      * pwq is determined and locked.  For unbound pools, we could have
1422      * raced with pwq release and it could already be dead.  If its
1423      * refcnt is zero, repeat pwq selection.  Note that pwqs never die
1424      * without another pwq replacing it in the numa_pwq_tbl or while
1425      * work items are executing on it, so the retrying is guaranteed to
1426      * make forward-progress.
1427      */
1428     if (unlikely(!pwq->refcnt)) {
1429         if (wq->flags & WQ_UNBOUND) {
1430             spin_unlock(&pwq->pool->lock);
1431             cpu_relax();
1432             goto retry;
1433         }
1434         /* oops */
1435         WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
1436               wq->name, cpu);
1437     }
1438 
1439     /* pwq determined, queue */
1440     trace_workqueue_queue_work(req_cpu, pwq, work);
1441 
1442     if (WARN_ON(!list_empty(&work->entry))) {
1443         spin_unlock(&pwq->pool->lock);
1444         return;
1445     }
1446 
1447     pwq->nr_in_flight[pwq->work_color]++;
1448     work_flags = work_color_to_flags(pwq->work_color);
1449 
1450     if (likely(pwq->nr_active < pwq->max_active)) {
1451         trace_workqueue_activate_work(work);
1452         pwq->nr_active++;
1453         worklist = &pwq->pool->worklist;
1454         if (list_empty(worklist))
1455             pwq->pool->watchdog_ts = jiffies;
1456     } else {
1457         work_flags |= WORK_STRUCT_DELAYED;
1458         worklist = &pwq->delayed_works;
1459     }
1460 
1461     insert_work(pwq, work, worklist, work_flags);
1462 
1463     spin_unlock(&pwq->pool->lock);
1464 }
1465 
1466 /**
1467  * queue_work_on - queue work on specific cpu
1468  * @cpu: CPU number to execute work on
1469  * @wq: workqueue to use
1470  * @work: work to queue
1471  *
1472  * We queue the work to a specific CPU; the caller must ensure it
1473  * can't go away.
1474  *
1475  * Return: %false if @work was already on a queue, %true otherwise.
1476  */
1477 bool queue_work_on(int cpu, struct workqueue_struct *wq,
1478            struct work_struct *work)
1479 {
1480     bool ret = false;
1481     unsigned long flags;
1482 
1483     local_irq_save(flags);
1484 
1485     if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1486         __queue_work(cpu, wq, work);
1487         ret = true;
1488     }
1489 
1490     local_irq_restore(flags);
1491     return ret;
1492 }
1493 EXPORT_SYMBOL(queue_work_on);
1494 
1495 void delayed_work_timer_fn(unsigned long __data)
1496 {
1497     struct delayed_work *dwork = (struct delayed_work *)__data;
1498 
1499     /* should have been called from irqsafe timer with irq already off */
1500     __queue_work(dwork->cpu, dwork->wq, &dwork->work);
1501 }
1502 EXPORT_SYMBOL(delayed_work_timer_fn);
1503 
1504 static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
1505                 struct delayed_work *dwork, unsigned long delay)
1506 {
1507     struct timer_list *timer = &dwork->timer;
1508     struct work_struct *work = &dwork->work;
1509 
1510     WARN_ON_ONCE(timer->function != delayed_work_timer_fn ||
1511              timer->data != (unsigned long)dwork);
1512     WARN_ON_ONCE(timer_pending(timer));
1513     WARN_ON_ONCE(!list_empty(&work->entry));
1514 
1515     /*
1516      * If @delay is 0, queue @dwork->work immediately.  This is for
1517      * both optimization and correctness.  The earliest @timer can
1518      * expire is on the closest next tick, and delayed_work users depend
1519      * on there being no such delay when @delay is 0.
1520      */
1521     if (!delay) {
1522         __queue_work(cpu, wq, &dwork->work);
1523         return;
1524     }
1525 
1526     timer_stats_timer_set_start_info(&dwork->timer);
1527 
1528     dwork->wq = wq;
1529     dwork->cpu = cpu;
1530     timer->expires = jiffies + delay;
1531 
1532     if (unlikely(cpu != WORK_CPU_UNBOUND))
1533         add_timer_on(timer, cpu);
1534     else
1535         add_timer(timer);
1536 }
1537 
1538 /**
1539  * queue_delayed_work_on - queue work on specific CPU after delay
1540  * @cpu: CPU number to execute work on
1541  * @wq: workqueue to use
1542  * @dwork: work to queue
1543  * @delay: number of jiffies to wait before queueing
1544  *
1545  * Return: %false if @dwork was already on a queue, %true otherwise.  If
1546  * @delay is zero and @dwork is idle, it will be scheduled for immediate
1547  * execution.
1548  */
1549 bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
1550                struct delayed_work *dwork, unsigned long delay)
1551 {
1552     struct work_struct *work = &dwork->work;
1553     bool ret = false;
1554     unsigned long flags;
1555 
1556     /* read the comment in __queue_work() */
1557     local_irq_save(flags);
1558 
1559     if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1560         __queue_delayed_work(cpu, wq, dwork, delay);
1561         ret = true;
1562     }
1563 
1564     local_irq_restore(flags);
1565     return ret;
1566 }
1567 EXPORT_SYMBOL(queue_delayed_work_on);
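/*
 * Usage sketch for queue_delayed_work_on(); my_poll_fn() and
 * my_poll_dwork are hypothetical:
 *
 *	static void my_poll_fn(struct work_struct *work)
 *	{
 *		struct delayed_work *dwork = to_delayed_work(work);
 *
 *		... poll the hardware, possibly requeue dwork ...
 *	}
 *
 *	static DECLARE_DELAYED_WORK(my_poll_dwork, my_poll_fn);
 *
 *	queue_delayed_work_on(raw_smp_processor_id(), system_wq,
 *			      &my_poll_dwork, msecs_to_jiffies(100));
 *
 * Passing WORK_CPU_UNBOUND as @cpu, which is what queue_delayed_work()
 * does, lets the work run on whichever CPU is convenient.
 */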
1568 
1569 /**
1570  * mod_delayed_work_on - modify delay of or queue a delayed work on specific CPU
1571  * @cpu: CPU number to execute work on
1572  * @wq: workqueue to use
1573  * @dwork: work to queue
1574  * @delay: number of jiffies to wait before queueing
1575  *
1576  * If @dwork is idle, equivalent to queue_delayed_work_on(); otherwise,
1577  * modify @dwork's timer so that it expires after @delay.  If @delay is
1578  * zero, @dwork is guaranteed to be scheduled immediately regardless of its
1579  * current state.
1580  *
1581  * Return: %false if @dwork was idle and queued, %true if @dwork was
1582  * pending and its timer was modified.
1583  *
1584  * This function is safe to call from any context including IRQ handler.
1585  * See try_to_grab_pending() for details.
1586  */
1587 bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
1588              struct delayed_work *dwork, unsigned long delay)
1589 {
1590     unsigned long flags;
1591     int ret;
1592 
1593     do {
1594         ret = try_to_grab_pending(&dwork->work, true, &flags);
1595     } while (unlikely(ret == -EAGAIN));
1596 
1597     if (likely(ret >= 0)) {
1598         __queue_delayed_work(cpu, wq, dwork, delay);
1599         local_irq_restore(flags);
1600     }
1601 
1602     /* -ENOENT from try_to_grab_pending() becomes %true */
1603     return ret;
1604 }
1605 EXPORT_SYMBOL_GPL(mod_delayed_work_on);
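/*
 * mod_delayed_work_on() suits "debounce" style users where the most
 * recent request should win: each call below restarts the 5 second
 * countdown whether or not the work was already pending.  The names are
 * hypothetical and mod_delayed_work() is the WORK_CPU_UNBOUND wrapper:
 *
 *	static DECLARE_DELAYED_WORK(my_flush_dwork, my_flush_fn);
 *
 *	static void my_note_dirty(void)
 *	{
 *		mod_delayed_work(system_wq, &my_flush_dwork, 5 * HZ);
 *	}
 */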
1606 
1607 /**
1608  * worker_enter_idle - enter idle state
1609  * @worker: worker which is entering idle state
1610  *
1611  * @worker is entering idle state.  Update stats and idle timer if
1612  * necessary.
1613  *
1614  * LOCKING:
1615  * spin_lock_irq(pool->lock).
1616  */
1617 static void worker_enter_idle(struct worker *worker)
1618 {
1619     struct worker_pool *pool = worker->pool;
1620 
1621     if (WARN_ON_ONCE(worker->flags & WORKER_IDLE) ||
1622         WARN_ON_ONCE(!list_empty(&worker->entry) &&
1623              (worker->hentry.next || worker->hentry.pprev)))
1624         return;
1625 
1626     /* can't use worker_set_flags(), also called from create_worker() */
1627     worker->flags |= WORKER_IDLE;
1628     pool->nr_idle++;
1629     worker->last_active = jiffies;
1630 
1631     /* idle_list is LIFO */
1632     list_add(&worker->entry, &pool->idle_list);
1633 
1634     if (too_many_workers(pool) && !timer_pending(&pool->idle_timer))
1635         mod_timer(&pool->idle_timer, jiffies + IDLE_WORKER_TIMEOUT);
1636 
1637     /*
1638      * Sanity check nr_running.  Because wq_unbind_fn() releases
1639      * pool->lock between setting %WORKER_UNBOUND and zapping
1640      * nr_running, the warning may trigger spuriously.  Check iff
1641      * unbind is not in progress.
1642      */
1643     WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
1644              pool->nr_workers == pool->nr_idle &&
1645              atomic_read(&pool->nr_running));
1646 }
1647 
1648 /**
1649  * worker_leave_idle - leave idle state
1650  * @worker: worker which is leaving idle state
1651  *
1652  * @worker is leaving idle state.  Update stats.
1653  *
1654  * LOCKING:
1655  * spin_lock_irq(pool->lock).
1656  */
1657 static void worker_leave_idle(struct worker *worker)
1658 {
1659     struct worker_pool *pool = worker->pool;
1660 
1661     if (WARN_ON_ONCE(!(worker->flags & WORKER_IDLE)))
1662         return;
1663     worker_clr_flags(worker, WORKER_IDLE);
1664     pool->nr_idle--;
1665     list_del_init(&worker->entry);
1666 }
1667 
1668 static struct worker *alloc_worker(int node)
1669 {
1670     struct worker *worker;
1671 
1672     worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, node);
1673     if (worker) {
1674         INIT_LIST_HEAD(&worker->entry);
1675         INIT_LIST_HEAD(&worker->scheduled);
1676         INIT_LIST_HEAD(&worker->node);
1677         /* on creation a worker is in !idle && prep state */
1678         worker->flags = WORKER_PREP;
1679     }
1680     return worker;
1681 }
1682 
1683 /**
1684  * worker_attach_to_pool() - attach a worker to a pool
1685  * @worker: worker to be attached
1686  * @pool: the target pool
1687  *
1688  * Attach @worker to @pool.  Once attached, the %WORKER_UNBOUND flag and
1689  * cpu-binding of @worker are kept coordinated with the pool across
1690  * cpu-[un]hotplugs.
1691  */
1692 static void worker_attach_to_pool(struct worker *worker,
1693                    struct worker_pool *pool)
1694 {
1695     mutex_lock(&pool->attach_mutex);
1696 
1697     /*
1698      * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
1699      * online CPUs.  It'll be re-applied when any of the CPUs come up.
1700      */
1701     set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
1702 
1703     /*
1704      * The pool->attach_mutex ensures %POOL_DISASSOCIATED remains
1705      * stable across this function.  See the comments above the
1706      * flag definition for details.
1707      */
1708     if (pool->flags & POOL_DISASSOCIATED)
1709         worker->flags |= WORKER_UNBOUND;
1710 
1711     list_add_tail(&worker->node, &pool->workers);
1712 
1713     mutex_unlock(&pool->attach_mutex);
1714 }
1715 
1716 /**
1717  * worker_detach_from_pool() - detach a worker from its pool
1718  * @worker: worker which is attached to its pool
1719  * @pool: the pool @worker is attached to
1720  *
1721  * Undo the attaching which had been done in worker_attach_to_pool().  The
1722  * caller worker shouldn't access the pool after detaching unless it holds
1723  * another reference to the pool.
1724  */
1725 static void worker_detach_from_pool(struct worker *worker,
1726                     struct worker_pool *pool)
1727 {
1728     struct completion *detach_completion = NULL;
1729 
1730     mutex_lock(&pool->attach_mutex);
1731     list_del(&worker->node);
1732     if (list_empty(&pool->workers))
1733         detach_completion = pool->detach_completion;
1734     mutex_unlock(&pool->attach_mutex);
1735 
1736     /* clear leftover flags without pool->lock after it is detached */
1737     worker->flags &= ~(WORKER_UNBOUND | WORKER_REBOUND);
1738 
1739     if (detach_completion)
1740         complete(detach_completion);
1741 }
1742 
1743 /**
1744  * create_worker - create a new workqueue worker
1745  * @pool: pool the new worker will belong to
1746  *
1747  * Create and start a new worker which is attached to @pool.
1748  *
1749  * CONTEXT:
1750  * Might sleep.  Does GFP_KERNEL allocations.
1751  *
1752  * Return:
1753  * Pointer to the newly created worker.
1754  */
1755 static struct worker *create_worker(struct worker_pool *pool)
1756 {
1757     struct worker *worker = NULL;
1758     int id = -1;
1759     char id_buf[16];
1760 
1761     /* ID is needed to determine kthread name */
1762     id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);
1763     if (id < 0)
1764         goto fail;
1765 
1766     worker = alloc_worker(pool->node);
1767     if (!worker)
1768         goto fail;
1769 
1770     worker->pool = pool;
1771     worker->id = id;
1772 
1773     if (pool->cpu >= 0)
1774         snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
1775              pool->attrs->nice < 0  ? "H" : "");
1776     else
1777         snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
1778 
1779     worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
1780                           "kworker/%s", id_buf);
1781     if (IS_ERR(worker->task))
1782         goto fail;
1783 
1784     set_user_nice(worker->task, pool->attrs->nice);
1785     kthread_bind_mask(worker->task, pool->attrs->cpumask);
1786 
1787     /* successful, attach the worker to the pool */
1788     worker_attach_to_pool(worker, pool);
1789 
1790     /* start the newly created worker */
1791     spin_lock_irq(&pool->lock);
1792     worker->pool->nr_workers++;
1793     worker_enter_idle(worker);
1794     wake_up_process(worker->task);
1795     spin_unlock_irq(&pool->lock);
1796 
1797     return worker;
1798 
1799 fail:
1800     if (id >= 0)
1801         ida_simple_remove(&pool->worker_ida, id);
1802     kfree(worker);
1803     return NULL;
1804 }
1805 
1806 /**
1807  * destroy_worker - destroy a workqueue worker
1808  * @worker: worker to be destroyed
1809  *
1810  * Destroy @worker and adjust @pool stats accordingly.  The worker should
1811  * be idle.
1812  *
1813  * CONTEXT:
1814  * spin_lock_irq(pool->lock).
1815  */
1816 static void destroy_worker(struct worker *worker)
1817 {
1818     struct worker_pool *pool = worker->pool;
1819 
1820     lockdep_assert_held(&pool->lock);
1821 
1822     /* sanity check frenzy */
1823     if (WARN_ON(worker->current_work) ||
1824         WARN_ON(!list_empty(&worker->scheduled)) ||
1825         WARN_ON(!(worker->flags & WORKER_IDLE)))
1826         return;
1827 
1828     pool->nr_workers--;
1829     pool->nr_idle--;
1830 
1831     list_del_init(&worker->entry);
1832     worker->flags |= WORKER_DIE;
1833     wake_up_process(worker->task);
1834 }
1835 
1836 static void idle_worker_timeout(unsigned long __pool)
1837 {
1838     struct worker_pool *pool = (void *)__pool;
1839 
1840     spin_lock_irq(&pool->lock);
1841 
1842     while (too_many_workers(pool)) {
1843         struct worker *worker;
1844         unsigned long expires;
1845 
1846         /* idle_list is kept in LIFO order, check the last one */
1847         worker = list_entry(pool->idle_list.prev, struct worker, entry);
1848         expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1849 
1850         if (time_before(jiffies, expires)) {
1851             mod_timer(&pool->idle_timer, expires);
1852             break;
1853         }
1854 
1855         destroy_worker(worker);
1856     }
1857 
1858     spin_unlock_irq(&pool->lock);
1859 }
1860 
1861 static void send_mayday(struct work_struct *work)
1862 {
1863     struct pool_workqueue *pwq = get_work_pwq(work);
1864     struct workqueue_struct *wq = pwq->wq;
1865 
1866     lockdep_assert_held(&wq_mayday_lock);
1867 
1868     if (!wq->rescuer)
1869         return;
1870 
1871     /* mayday mayday mayday */
1872     if (list_empty(&pwq->mayday_node)) {
1873         /*
1874          * If @pwq is for an unbound wq, its base ref may be put at
1875          * any time due to an attribute change.  Pin @pwq until the
1876          * rescuer is done with it.
1877          */
1878         get_pwq(pwq);
1879         list_add_tail(&pwq->mayday_node, &wq->maydays);
1880         wake_up_process(wq->rescuer->task);
1881     }
1882 }
1883 
1884 static void pool_mayday_timeout(unsigned long __pool)
1885 {
1886     struct worker_pool *pool = (void *)__pool;
1887     struct work_struct *work;
1888 
1889     spin_lock_irq(&pool->lock);
1890     spin_lock(&wq_mayday_lock);     /* for wq->maydays */
1891 
1892     if (need_to_create_worker(pool)) {
1893         /*
1894          * We've been trying to create a new worker but
1895          * haven't been successful.  We might be hitting an
1896          * allocation deadlock.  Send distress signals to
1897          * rescuers.
1898          */
1899         list_for_each_entry(work, &pool->worklist, entry)
1900             send_mayday(work);
1901     }
1902 
1903     spin_unlock(&wq_mayday_lock);
1904     spin_unlock_irq(&pool->lock);
1905 
1906     mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INTERVAL);
1907 }
1908 
1909 /**
1910  * maybe_create_worker - create a new worker if necessary
1911  * @pool: pool to create a new worker for
1912  *
1913  * Create a new worker for @pool if necessary.  @pool is guaranteed to
1914  * have at least one idle worker on return from this function.  If
1915  * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
1916  * sent to all rescuers with works scheduled on @pool to resolve
1917  * possible allocation deadlock.
1918  *
1919  * On return, need_to_create_worker() is guaranteed to be %false and
1920  * may_start_working() %true.
1921  *
1922  * LOCKING:
1923  * spin_lock_irq(pool->lock) which may be released and regrabbed
1924  * multiple times.  Does GFP_KERNEL allocations.  Called only from
1925  * manager.
1926  */
1927 static void maybe_create_worker(struct worker_pool *pool)
1928 __releases(&pool->lock)
1929 __acquires(&pool->lock)
1930 {
1931 restart:
1932     spin_unlock_irq(&pool->lock);
1933 
1934     /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
1935     mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
1936 
1937     while (true) {
1938         if (create_worker(pool) || !need_to_create_worker(pool))
1939             break;
1940 
1941         schedule_timeout_interruptible(CREATE_COOLDOWN);
1942 
1943         if (!need_to_create_worker(pool))
1944             break;
1945     }
1946 
1947     del_timer_sync(&pool->mayday_timer);
1948     spin_lock_irq(&pool->lock);
1949     /*
1950      * This is necessary even after a new worker was just successfully
1951      * created as @pool->lock was dropped and the new worker might have
1952      * already become busy.
1953      */
1954     if (need_to_create_worker(pool))
1955         goto restart;
1956 }
1957 
1958 /**
1959  * manage_workers - manage worker pool
1960  * @worker: self
1961  *
1962  * Assume the manager role and manage the worker pool @worker belongs
1963  * to.  At any given time, there can be only zero or one manager per
1964  * pool.  The exclusion is handled automatically by this function.
1965  *
1966  * The caller can safely start processing works on false return.  On
1967  * true return, it's guaranteed that need_to_create_worker() is false
1968  * and may_start_working() is true.
1969  *
1970  * CONTEXT:
1971  * spin_lock_irq(pool->lock) which may be released and regrabbed
1972  * multiple times.  Does GFP_KERNEL allocations.
1973  *
1974  * Return:
1975  * %false if the pool doesn't need management and the caller can safely
1976  * start processing works, %true if management function was performed and
1977  * the conditions that the caller verified before calling the function may
1978  * no longer be true.
1979  */
1980 static bool manage_workers(struct worker *worker)
1981 {
1982     struct worker_pool *pool = worker->pool;
1983 
1984     /*
1985      * Anyone who successfully grabs manager_arb wins the arbitration
1986      * and becomes the manager.  mutex_trylock() on pool->manager_arb
1987      * failure while holding pool->lock reliably indicates that someone
1988      * else is managing the pool and the worker which failed trylock
1989      * can proceed to executing work items.  This means that anyone
1990      * grabbing manager_arb is responsible for actually performing
1991      * manager duties.  If manager_arb is grabbed and released without
1992      * actual management, the pool may stall indefinitely.
1993      */
1994     if (!mutex_trylock(&pool->manager_arb))
1995         return false;
1996     pool->manager = worker;
1997 
1998     maybe_create_worker(pool);
1999 
2000     pool->manager = NULL;
2001     mutex_unlock(&pool->manager_arb);
2002     return true;
2003 }
2004 
2005 /**
2006  * process_one_work - process single work
2007  * @worker: self
2008  * @work: work to process
2009  *
2010  * Process @work.  This function contains all the logic necessary to
2011  * process a single work item, including synchronization against and
2012  * interaction with other workers on the same cpu, queueing and
2013  * flushing.  As long as the context requirement is met, any worker can
2014  * call this function to process a work item.
2015  *
2016  * CONTEXT:
2017  * spin_lock_irq(pool->lock) which is released and regrabbed.
2018  */
2019 static void process_one_work(struct worker *worker, struct work_struct *work)
2020 __releases(&pool->lock)
2021 __acquires(&pool->lock)
2022 {
2023     struct pool_workqueue *pwq = get_work_pwq(work);
2024     struct worker_pool *pool = worker->pool;
2025     bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
2026     int work_color;
2027     struct worker *collision;
2028 #ifdef CONFIG_LOCKDEP
2029     /*
2030      * It is permissible to free the struct work_struct from
2031      * inside the function that is called from it; we need to take
2032      * this into account for lockdep too.  To avoid bogus "held
2033      * lock freed" warnings as well as problems when looking into
2034      * work->lockdep_map, make a copy and use that here.
2035      */
2036     struct lockdep_map lockdep_map;
2037 
2038     lockdep_copy_map(&lockdep_map, &work->lockdep_map);
2039 #endif
2040     /* ensure we're on the correct CPU */
2041     WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
2042              raw_smp_processor_id() != pool->cpu);
2043 
2044     /*
2045      * A single work shouldn't be executed concurrently by
2046      * multiple workers on a single cpu.  Check whether anyone is
2047      * already processing the work.  If so, defer the work to the
2048      * currently executing one.
2049      */
2050     collision = find_worker_executing_work(pool, work);
2051     if (unlikely(collision)) {
2052         move_linked_works(work, &collision->scheduled, NULL);
2053         return;
2054     }
2055 
2056     /* claim and dequeue */
2057     debug_work_deactivate(work);
2058     hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
2059     worker->current_work = work;
2060     worker->current_func = work->func;
2061     worker->current_pwq = pwq;
2062     work_color = get_work_color(work);
2063 
2064     list_del_init(&work->entry);
2065 
2066     /*
2067      * CPU intensive works don't participate in concurrency management.
2068      * They're the scheduler's responsibility.  This takes @worker out
2069      * of concurrency management and the next code block will chain
2070      * execution of the pending work items.
2071      */
2072     if (unlikely(cpu_intensive))
2073         worker_set_flags(worker, WORKER_CPU_INTENSIVE);
2074 
2075     /*
2076      * Wake up another worker if necessary.  The condition is always
2077      * false for normal per-cpu workers since nr_running would always
2078      * be >= 1 at this point.  This is used to chain execution of the
2079      * pending work items for WORKER_NOT_RUNNING workers such as the
2080      * UNBOUND and CPU_INTENSIVE ones.
2081      */
2082     if (need_more_worker(pool))
2083         wake_up_worker(pool);
2084 
2085     /*
2086      * Record the last pool and clear PENDING which should be the last
2087      * update to @work.  Also, do this inside @pool->lock so that
2088      * PENDING and queued state changes happen together while IRQ is
2089      * disabled.
2090      */
2091     set_work_pool_and_clear_pending(work, pool->id);
2092 
2093     spin_unlock_irq(&pool->lock);
2094 
2095     lock_map_acquire_read(&pwq->wq->lockdep_map);
2096     lock_map_acquire(&lockdep_map);
2097     trace_workqueue_execute_start(work);
2098     worker->current_func(work);
2099     /*
2100      * While we must be careful to not use "work" after this, the trace
2101      * point will only record its address.
2102      */
2103     trace_workqueue_execute_end(work);
2104     lock_map_release(&lockdep_map);
2105     lock_map_release(&pwq->wq->lockdep_map);
2106 
2107     if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
2108         pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
2109                "     last function: %pf\n",
2110                current->comm, preempt_count(), task_pid_nr(current),
2111                worker->current_func);
2112         debug_show_held_locks(current);
2113         dump_stack();
2114     }
2115 
2116     /*
2117      * The following prevents a kworker from hogging CPU on !PREEMPT
2118      * kernels, where a requeueing work item waiting for something to
2119      * happen could deadlock with stop_machine as such work item could
2120      * indefinitely requeue itself while all other CPUs are trapped in
2121      * stop_machine. At the same time, report a quiescent RCU state so
2122      * the same condition doesn't freeze RCU.
2123      */
2124     cond_resched_rcu_qs();
2125 
2126     spin_lock_irq(&pool->lock);
2127 
2128     /* clear cpu intensive status */
2129     if (unlikely(cpu_intensive))
2130         worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
2131 
2132     /* we're done with it, release */
2133     hash_del(&worker->hentry);
2134     worker->current_work = NULL;
2135     worker->current_func = NULL;
2136     worker->current_pwq = NULL;
2137     worker->desc_valid = false;
2138     pwq_dec_nr_in_flight(pwq, work_color);
2139 }
2140 
2141 /**
2142  * process_scheduled_works - process scheduled works
2143  * @worker: self
2144  *
2145  * Process all scheduled works.  Please note that the scheduled list
2146  * may change while processing a work, so this function repeatedly
2147  * fetches a work from the top and executes it.
2148  *
2149  * CONTEXT:
2150  * spin_lock_irq(pool->lock) which may be released and regrabbed
2151  * multiple times.
2152  */
2153 static void process_scheduled_works(struct worker *worker)
2154 {
2155     while (!list_empty(&worker->scheduled)) {
2156         struct work_struct *work = list_first_entry(&worker->scheduled,
2157                         struct work_struct, entry);
2158         process_one_work(worker, work);
2159     }
2160 }
2161 
2162 /**
2163  * worker_thread - the worker thread function
2164  * @__worker: self
2165  *
2166  * The worker thread function.  All workers belong to a worker_pool -
2167  * either a per-cpu one or dynamic unbound one.  These workers process all
2168  * work items regardless of their specific target workqueue.  The only
2169  * exception is work items which belong to workqueues with a rescuer, as
2170  * will be explained in rescuer_thread().
2171  *
2172  * Return: 0
2173  */
2174 static int worker_thread(void *__worker)
2175 {
2176     struct worker *worker = __worker;
2177     struct worker_pool *pool = worker->pool;
2178 
2179     /* tell the scheduler that this is a workqueue worker */
2180     worker->task->flags |= PF_WQ_WORKER;
2181 woke_up:
2182     spin_lock_irq(&pool->lock);
2183 
2184     /* am I supposed to die? */
2185     if (unlikely(worker->flags & WORKER_DIE)) {
2186         spin_unlock_irq(&pool->lock);
2187         WARN_ON_ONCE(!list_empty(&worker->entry));
2188         worker->task->flags &= ~PF_WQ_WORKER;
2189 
2190         set_task_comm(worker->task, "kworker/dying");
2191         ida_simple_remove(&pool->worker_ida, worker->id);
2192         worker_detach_from_pool(worker, pool);
2193         kfree(worker);
2194         return 0;
2195     }
2196 
2197     worker_leave_idle(worker);
2198 recheck:
2199     /* no more worker necessary? */
2200     if (!need_more_worker(pool))
2201         goto sleep;
2202 
2203     /* do we need to manage? */
2204     if (unlikely(!may_start_working(pool)) && manage_workers(worker))
2205         goto recheck;
2206 
2207     /*
2208      * ->scheduled list can only be filled while a worker is
2209      * preparing to process a work or actually processing it.
2210      * Make sure nobody diddled with it while I was sleeping.
2211      */
2212     WARN_ON_ONCE(!list_empty(&worker->scheduled));
2213 
2214     /*
2215      * Finish PREP stage.  We're guaranteed to have at least one idle
2216      * worker or that someone else has already assumed the manager
2217      * role.  This is where @worker starts participating in concurrency
2218      * management if applicable and concurrency management is restored
2219      * after being rebound.  See rebind_workers() for details.
2220      */
2221     worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
2222 
2223     do {
2224         struct work_struct *work =
2225             list_first_entry(&pool->worklist,
2226                      struct work_struct, entry);
2227 
2228         pool->watchdog_ts = jiffies;
2229 
2230         if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
2231             /* optimization path, not strictly necessary */
2232             process_one_work(worker, work);
2233             if (unlikely(!list_empty(&worker->scheduled)))
2234                 process_scheduled_works(worker);
2235         } else {
2236             move_linked_works(work, &worker->scheduled, NULL);
2237             process_scheduled_works(worker);
2238         }
2239     } while (keep_working(pool));
2240 
2241     worker_set_flags(worker, WORKER_PREP);
2242 sleep:
2243     /*
2244      * pool->lock is held and there's no work to process and no need to
2245      * manage, sleep.  Workers are woken up only while holding
2246      * pool->lock or from local cpu, so setting the current state
2247      * before releasing pool->lock is enough to prevent losing any
2248      * event.
2249      */
2250     worker_enter_idle(worker);
2251     __set_current_state(TASK_INTERRUPTIBLE);
2252     spin_unlock_irq(&pool->lock);
2253     schedule();
2254     goto woke_up;
2255 }
2256 
2257 /**
2258  * rescuer_thread - the rescuer thread function
2259  * @__rescuer: self
2260  *
2261  * Workqueue rescuer thread function.  There's one rescuer for each
2262  * workqueue which has WQ_MEM_RECLAIM set.
2263  *
2264  * Regular work processing on a pool may block trying to create a new
2265  * worker, which uses a GFP_KERNEL allocation and has a slight chance of
2266  * developing into a deadlock if some works currently on the same queue
2267  * need to be processed to satisfy the GFP_KERNEL allocation.  This is
2268  * the problem the rescuer solves.
2269  *
2270  * When such condition is possible, the pool summons rescuers of all
2271  * workqueues which have works queued on the pool and lets them process
2272  * those works so that forward progress can be guaranteed.
2273  *
2274  * This should happen rarely.
2275  *
2276  * Return: 0
2277  */
2278 static int rescuer_thread(void *__rescuer)
2279 {
2280     struct worker *rescuer = __rescuer;
2281     struct workqueue_struct *wq = rescuer->rescue_wq;
2282     struct list_head *scheduled = &rescuer->scheduled;
2283     bool should_stop;
2284 
2285     set_user_nice(current, RESCUER_NICE_LEVEL);
2286 
2287     /*
2288      * Mark rescuer as worker too.  As WORKER_PREP is never cleared, it
2289      * doesn't participate in concurrency management.
2290      */
2291     rescuer->task->flags |= PF_WQ_WORKER;
2292 repeat:
2293     set_current_state(TASK_INTERRUPTIBLE);
2294 
2295     /*
2296      * By the time the rescuer is requested to stop, the workqueue
2297      * shouldn't have any work pending, but @wq->maydays may still have
2298      * pwq(s) queued.  This can happen when non-rescuer workers consume
2299      * all the work items before the rescuer gets to them.  Go through
2300      * @wq->maydays processing before acting on should_stop so that the
2301      * list is always empty on exit.
2302      */
2303     should_stop = kthread_should_stop();
2304 
2305     /* see whether any pwq is asking for help */
2306     spin_lock_irq(&wq_mayday_lock);
2307 
2308     while (!list_empty(&wq->maydays)) {
2309         struct pool_workqueue *pwq = list_first_entry(&wq->maydays,
2310                     struct pool_workqueue, mayday_node);
2311         struct worker_pool *pool = pwq->pool;
2312         struct work_struct *work, *n;
2313         bool first = true;
2314 
2315         __set_current_state(TASK_RUNNING);
2316         list_del_init(&pwq->mayday_node);
2317 
2318         spin_unlock_irq(&wq_mayday_lock);
2319 
2320         worker_attach_to_pool(rescuer, pool);
2321 
2322         spin_lock_irq(&pool->lock);
2323         rescuer->pool = pool;
2324 
2325         /*
2326          * Slurp in all works issued via this workqueue and
2327          * process'em.
2328          */
2329         WARN_ON_ONCE(!list_empty(scheduled));
2330         list_for_each_entry_safe(work, n, &pool->worklist, entry) {
2331             if (get_work_pwq(work) == pwq) {
2332                 if (first)
2333                     pool->watchdog_ts = jiffies;
2334                 move_linked_works(work, scheduled, &n);
2335             }
2336             first = false;
2337         }
2338 
2339         if (!list_empty(scheduled)) {
2340             process_scheduled_works(rescuer);
2341 
2342             /*
2343              * The above execution of rescued work items could
2344              * have created more to rescue through
2345              * pwq_activate_first_delayed() or chained
2346              * queueing.  Let's put @pwq back on mayday list so
2347              * that such back-to-back work items, which may be
2348              * being used to relieve memory pressure, don't
2349              * incur MAYDAY_INTERVAL delay in between.
2350              */
2351             if (need_to_create_worker(pool)) {
2352                 spin_lock(&wq_mayday_lock);
2353                 get_pwq(pwq);
2354                 list_move_tail(&pwq->mayday_node, &wq->maydays);
2355                 spin_unlock(&wq_mayday_lock);
2356             }
2357         }
2358 
2359         /*
2360          * Put the reference grabbed by send_mayday().  @pool won't
2361          * go away while we're still attached to it.
2362          */
2363         put_pwq(pwq);
2364 
2365         /*
2366          * Leave this pool.  If need_more_worker() is %true, notify a
2367          * regular worker; otherwise, we end up with 0 concurrency
2368          * and stalling the execution.
2369          */
2370         if (need_more_worker(pool))
2371             wake_up_worker(pool);
2372 
2373         rescuer->pool = NULL;
2374         spin_unlock_irq(&pool->lock);
2375 
2376         worker_detach_from_pool(rescuer, pool);
2377 
2378         spin_lock_irq(&wq_mayday_lock);
2379     }
2380 
2381     spin_unlock_irq(&wq_mayday_lock);
2382 
2383     if (should_stop) {
2384         __set_current_state(TASK_RUNNING);
2385         rescuer->task->flags &= ~PF_WQ_WORKER;
2386         return 0;
2387     }
2388 
2389     /* rescuers should never participate in concurrency management */
2390     WARN_ON_ONCE(!(rescuer->flags & WORKER_NOT_RUNNING));
2391     schedule();
2392     goto repeat;
2393 }
2394 
2395 /**
2396  * check_flush_dependency - check for flush dependency sanity
2397  * @target_wq: workqueue being flushed
2398  * @target_work: work item being flushed (NULL for workqueue flushes)
2399  *
2400  * %current is trying to flush the whole @target_wq or @target_work on it.
2401  * If @target_wq doesn't have %WQ_MEM_RECLAIM, verify that %current is not
2402  * reclaiming memory or running on a workqueue which doesn't have
2403  * %WQ_MEM_RECLAIM as that can break forward-progress guarantee leading to
2404  * a deadlock.
2405  */
2406 static void check_flush_dependency(struct workqueue_struct *target_wq,
2407                    struct work_struct *target_work)
2408 {
2409     work_func_t target_func = target_work ? target_work->func : NULL;
2410     struct worker *worker;
2411 
2412     if (target_wq->flags & WQ_MEM_RECLAIM)
2413         return;
2414 
2415     worker = current_wq_worker();
2416 
2417     WARN_ONCE(current->flags & PF_MEMALLOC,
2418           "workqueue: PF_MEMALLOC task %d(%s) is flushing !WQ_MEM_RECLAIM %s:%pf",
2419           current->pid, current->comm, target_wq->name, target_func);
2420     WARN_ONCE(worker && ((worker->current_pwq->wq->flags &
2421                   (WQ_MEM_RECLAIM | __WQ_LEGACY)) == WQ_MEM_RECLAIM),
2422           "workqueue: WQ_MEM_RECLAIM %s:%pf is flushing !WQ_MEM_RECLAIM %s:%pf",
2423           worker->current_pwq->wq->name, worker->current_func,
2424           target_wq->name, target_func);
2425 }
2426 
2427 struct wq_barrier {
2428     struct work_struct  work;
2429     struct completion   done;
2430     struct task_struct  *task;  /* purely informational */
2431 };
2432 
2433 static void wq_barrier_func(struct work_struct *work)
2434 {
2435     struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
2436     complete(&barr->done);
2437 }
2438 
2439 /**
2440  * insert_wq_barrier - insert a barrier work
2441  * @pwq: pwq to insert barrier into
2442  * @barr: wq_barrier to insert
2443  * @target: target work to attach @barr to
2444  * @worker: worker currently executing @target, NULL if @target is not executing
2445  *
2446  * @barr is linked to @target such that @barr is completed only after
2447  * @target finishes execution.  Please note that the ordering
2448  * guarantee is observed only with respect to @target and on the local
2449  * cpu.
2450  *
2451  * Currently, a queued barrier can't be canceled.  This is because
2452  * try_to_grab_pending() can't determine whether the work to be
2453  * grabbed is at the head of the queue and thus can't clear the LINKED
2454  * flag of the previous work, while a work with the LINKED flag set must
2455  * always be followed by a valid next work.
2456  *
2457  * Note that when @worker is non-NULL, @target may be modified
2458  * underneath us, so we can't reliably determine pwq from @target.
2459  *
2460  * CONTEXT:
2461  * spin_lock_irq(pool->lock).
2462  */
2463 static void insert_wq_barrier(struct pool_workqueue *pwq,
2464                   struct wq_barrier *barr,
2465                   struct work_struct *target, struct worker *worker)
2466 {
2467     struct list_head *head;
2468     unsigned int linked = 0;
2469 
2470     /*
2471      * debugobject calls are safe here even with pool->lock locked
2472      * as we know for sure that this will not trigger any of the
2473      * checks and call back into the fixup functions where we
2474      * might deadlock.
2475      */
2476     INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
2477     __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
2478     init_completion(&barr->done);
2479     barr->task = current;
2480 
2481     /*
2482      * If @target is currently being executed, schedule the
2483      * barrier to the worker; otherwise, put it after @target.
2484      */
2485     if (worker)
2486         head = worker->scheduled.next;
2487     else {
2488         unsigned long *bits = work_data_bits(target);
2489 
2490         head = target->entry.next;
2491         /* there can already be other linked works, inherit and set */
2492         linked = *bits & WORK_STRUCT_LINKED;
2493         __set_bit(WORK_STRUCT_LINKED_BIT, bits);
2494     }
2495 
2496     debug_work_activate(&barr->work);
2497     insert_work(pwq, &barr->work, head,
2498             work_color_to_flags(WORK_NO_COLOR) | linked);
2499 }
2500 
2501 /**
2502  * flush_workqueue_prep_pwqs - prepare pwqs for workqueue flushing
2503  * @wq: workqueue being flushed
2504  * @flush_color: new flush color, < 0 for no-op
2505  * @work_color: new work color, < 0 for no-op
2506  *
2507  * Prepare pwqs for workqueue flushing.
2508  *
2509  * If @flush_color is non-negative, flush_color on all pwqs should be
2510  * -1.  If no pwq has in-flight commands at the specified color, all
2511  * pwq->flush_color's stay at -1 and %false is returned.  If any pwq
2512  * has in flight commands, its pwq->flush_color is set to
2513  * @flush_color, @wq->nr_pwqs_to_flush is updated accordingly, pwq
2514  * wakeup logic is armed and %true is returned.
2515  *
2516  * The caller should have initialized @wq->first_flusher prior to
2517  * calling this function with non-negative @flush_color.  If
2518  * @flush_color is negative, no flush color update is done and %false
2519  * is returned.
2520  *
2521  * If @work_color is non-negative, all pwqs should have the same
2522  * work_color which is previous to @work_color and all will be
2523  * advanced to @work_color.
2524  *
2525  * CONTEXT:
2526  * mutex_lock(wq->mutex).
2527  *
2528  * Return:
2529  * %true if @flush_color >= 0 and there's something to flush.  %false
2530  * otherwise.
2531  */
2532 static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq,
2533                       int flush_color, int work_color)
2534 {
2535     bool wait = false;
2536     struct pool_workqueue *pwq;
2537 
2538     if (flush_color >= 0) {
2539         WARN_ON_ONCE(atomic_read(&wq->nr_pwqs_to_flush));
2540         atomic_set(&wq->nr_pwqs_to_flush, 1);
2541     }
2542 
2543     for_each_pwq(pwq, wq) {
2544         struct worker_pool *pool = pwq->pool;
2545 
2546         spin_lock_irq(&pool->lock);
2547 
2548         if (flush_color >= 0) {
2549             WARN_ON_ONCE(pwq->flush_color != -1);
2550 
2551             if (pwq->nr_in_flight[flush_color]) {
2552                 pwq->flush_color = flush_color;
2553                 atomic_inc(&wq->nr_pwqs_to_flush);
2554                 wait = true;
2555             }
2556         }
2557 
2558         if (work_color >= 0) {
2559             WARN_ON_ONCE(work_color != work_next_color(pwq->work_color));
2560             pwq->work_color = work_color;
2561         }
2562 
2563         spin_unlock_irq(&pool->lock);
2564     }
2565 
2566     if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_pwqs_to_flush))
2567         complete(&wq->first_flusher->done);
2568 
2569     return wait;
2570 }
2571 
2572 /**
2573  * flush_workqueue - ensure that any scheduled work has run to completion.
2574  * @wq: workqueue to flush
2575  *
2576  * This function sleeps until all work items which were queued on entry
2577  * have finished execution, but it is not livelocked by new incoming ones.
2578  */
2579 void flush_workqueue(struct workqueue_struct *wq)
2580 {
2581     struct wq_flusher this_flusher = {
2582         .list = LIST_HEAD_INIT(this_flusher.list),
2583         .flush_color = -1,
2584         .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
2585     };
2586     int next_color;
2587 
2588     if (WARN_ON(!wq_online))
2589         return;
2590 
2591     lock_map_acquire(&wq->lockdep_map);
2592     lock_map_release(&wq->lockdep_map);
2593 
2594     mutex_lock(&wq->mutex);
2595 
2596     /*
2597      * Start-to-wait phase
2598      */
2599     next_color = work_next_color(wq->work_color);
2600 
2601     if (next_color != wq->flush_color) {
2602         /*
2603          * Color space is not full.  The current work_color
2604          * becomes our flush_color and work_color is advanced
2605          * by one.
2606          */
2607         WARN_ON_ONCE(!list_empty(&wq->flusher_overflow));
2608         this_flusher.flush_color = wq->work_color;
2609         wq->work_color = next_color;
2610 
2611         if (!wq->first_flusher) {
2612             /* no flush in progress, become the first flusher */
2613             WARN_ON_ONCE(wq->flush_color != this_flusher.flush_color);
2614 
2615             wq->first_flusher = &this_flusher;
2616 
2617             if (!flush_workqueue_prep_pwqs(wq, wq->flush_color,
2618                                wq->work_color)) {
2619                 /* nothing to flush, done */
2620                 wq->flush_color = next_color;
2621                 wq->first_flusher = NULL;
2622                 goto out_unlock;
2623             }
2624         } else {
2625             /* wait in queue */
2626             WARN_ON_ONCE(wq->flush_color == this_flusher.flush_color);
2627             list_add_tail(&this_flusher.list, &wq->flusher_queue);
2628             flush_workqueue_prep_pwqs(wq, -1, wq->work_color);
2629         }
2630     } else {
2631         /*
2632          * Oops, color space is full, wait on overflow queue.
2633          * The next flush completion will assign us
2634          * flush_color and transfer to flusher_queue.
2635          */
2636         list_add_tail(&this_flusher.list, &wq->flusher_overflow);
2637     }
2638 
2639     check_flush_dependency(wq, NULL);
2640 
2641     mutex_unlock(&wq->mutex);
2642 
2643     wait_for_completion(&this_flusher.done);
2644 
2645     /*
2646      * Wake-up-and-cascade phase
2647      *
2648      * First flushers are responsible for cascading flushes and
2649      * handling overflow.  Non-first flushers can simply return.
2650      */
2651     if (wq->first_flusher != &this_flusher)
2652         return;
2653 
2654     mutex_lock(&wq->mutex);
2655 
2656     /* we might have raced, check again with mutex held */
2657     if (wq->first_flusher != &this_flusher)
2658         goto out_unlock;
2659 
2660     wq->first_flusher = NULL;
2661 
2662     WARN_ON_ONCE(!list_empty(&this_flusher.list));
2663     WARN_ON_ONCE(wq->flush_color != this_flusher.flush_color);
2664 
2665     while (true) {
2666         struct wq_flusher *next, *tmp;
2667 
2668         /* complete all the flushers sharing the current flush color */
2669         list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
2670             if (next->flush_color != wq->flush_color)
2671                 break;
2672             list_del_init(&next->list);
2673             complete(&next->done);
2674         }
2675 
2676         WARN_ON_ONCE(!list_empty(&wq->flusher_overflow) &&
2677                  wq->flush_color != work_next_color(wq->work_color));
2678 
2679         /* this flush_color is finished, advance by one */
2680         wq->flush_color = work_next_color(wq->flush_color);
2681 
2682         /* one color has been freed, handle overflow queue */
2683         if (!list_empty(&wq->flusher_overflow)) {
2684             /*
2685              * Assign the same color to all overflowed
2686              * flushers, advance work_color and append to
2687              * flusher_queue.  This is the start-to-wait
2688              * phase for these overflowed flushers.
2689              */
2690             list_for_each_entry(tmp, &wq->flusher_overflow, list)
2691                 tmp->flush_color = wq->work_color;
2692 
2693             wq->work_color = work_next_color(wq->work_color);
2694 
2695             list_splice_tail_init(&wq->flusher_overflow,
2696                           &wq->flusher_queue);
2697             flush_workqueue_prep_pwqs(wq, -1, wq->work_color);
2698         }
2699 
2700         if (list_empty(&wq->flusher_queue)) {
2701             WARN_ON_ONCE(wq->flush_color != wq->work_color);
2702             break;
2703         }
2704 
2705         /*
2706          * Need to flush more colors.  Make the next flusher
2707          * the new first flusher and arm pwqs.
2708          */
2709         WARN_ON_ONCE(wq->flush_color == wq->work_color);
2710         WARN_ON_ONCE(wq->flush_color != next->flush_color);
2711 
2712         list_del_init(&next->list);
2713         wq->first_flusher = next;
2714 
2715         if (flush_workqueue_prep_pwqs(wq, wq->flush_color, -1))
2716             break;
2717 
2718         /*
2719          * Meh... this color is already done, clear first
2720          * flusher and repeat cascading.
2721          */
2722         wq->first_flusher = NULL;
2723     }
2724 
2725 out_unlock:
2726     mutex_unlock(&wq->mutex);
2727 }
2728 EXPORT_SYMBOL(flush_workqueue);
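/*
 * Sketch of a typical flush_workqueue() user; my_wq and the work items
 * are hypothetical.  The flush waits for everything queued before the
 * call, but is not held up by work items queued afterwards:
 *
 *	struct workqueue_struct *my_wq;
 *
 *	my_wq = alloc_workqueue("my_wq", WQ_UNBOUND, 0);
 *	if (!my_wq)
 *		return -ENOMEM;
 *
 *	queue_work(my_wq, &my_work_a);
 *	queue_work(my_wq, &my_work_b);
 *
 *	flush_workqueue(my_wq);
 *
 * Note that destroy_workqueue() drains the workqueue itself, so an
 * explicit flush immediately before destruction is usually redundant.
 */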
2729 
2730 /**
2731  * drain_workqueue - drain a workqueue
2732  * @wq: workqueue to drain
2733  *
2734  * Wait until the workqueue becomes empty.  While draining is in progress,
2735  * only chain queueing is allowed.  IOW, only currently pending or running
2736  * work items on @wq can queue further work items on it.  @wq is flushed
2737  * repeatedly until it becomes empty.  The number of flushing is determined
2738  * by the depth of chaining and should be relatively short.  Whine if it
2739  * takes too long.
2740  */
2741 void drain_workqueue(struct workqueue_struct *wq)
2742 {
2743     unsigned int flush_cnt = 0;
2744     struct pool_workqueue *pwq;
2745 
2746     /*
2747      * __queue_work() needs to test whether there are drainers, is much
2748      * hotter than drain_workqueue() and already looks at @wq->flags.
2749      * Use __WQ_DRAINING so that queue doesn't have to check nr_drainers.
2750      */
2751     mutex_lock(&wq->mutex);
2752     if (!wq->nr_drainers++)
2753         wq->flags |= __WQ_DRAINING;
2754     mutex_unlock(&wq->mutex);
2755 reflush:
2756     flush_workqueue(wq);
2757 
2758     mutex_lock(&wq->mutex);
2759 
2760     for_each_pwq(pwq, wq) {
2761         bool drained;
2762 
2763         spin_lock_irq(&pwq->pool->lock);
2764         drained = !pwq->nr_active && list_empty(&pwq->delayed_works);
2765         spin_unlock_irq(&pwq->pool->lock);
2766 
2767         if (drained)
2768             continue;
2769 
2770         if (++flush_cnt == 10 ||
2771             (flush_cnt % 100 == 0 && flush_cnt <= 1000))
2772             pr_warn("workqueue %s: drain_workqueue() isn't complete after %u tries\n",
2773                 wq->name, flush_cnt);
2774 
2775         mutex_unlock(&wq->mutex);
2776         goto reflush;
2777     }
2778 
2779     if (!--wq->nr_drainers)
2780         wq->flags &= ~__WQ_DRAINING;
2781     mutex_unlock(&wq->mutex);
2782 }
2783 EXPORT_SYMBOL_GPL(drain_workqueue);
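/*
 * drain_workqueue() sketch: useful when work items on the workqueue may
 * requeue themselves, so a single flush could return while chained items
 * are still pending.  After stopping external queueing (driver specific),
 * a hypothetical my_wq can be quiesced with:
 *
 *	drain_workqueue(my_wq);
 *
 * destroy_workqueue() performs this drain internally before tearing the
 * workqueue down.
 */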
2784 
2785 static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
2786 {
2787     struct worker *worker = NULL;
2788     struct worker_pool *pool;
2789     struct pool_workqueue *pwq;
2790 
2791     might_sleep();
2792 
2793     local_irq_disable();
2794     pool = get_work_pool(work);
2795     if (!pool) {
2796         local_irq_enable();
2797         return false;
2798     }
2799 
2800     spin_lock(&pool->lock);
2801     /* see the comment in try_to_grab_pending() with the same code */
2802     pwq = get_work_pwq(work);
2803     if (pwq) {
2804         if (unlikely(pwq->pool != pool))
2805             goto already_gone;
2806     } else {
2807         worker = find_worker_executing_work(pool, work);
2808         if (!worker)
2809             goto already_gone;
2810         pwq = worker->current_pwq;
2811     }
2812 
2813     check_flush_dependency(pwq->wq, work);
2814 
2815     insert_wq_barrier(pwq, barr, work, worker);
2816     spin_unlock_irq(&pool->lock);
2817 
2818     /*
2819      * If @max_active is 1 or rescuer is in use, flushing another work
2820      * item on the same workqueue may lead to deadlock.  Make sure the
2821      * flusher is not running on the same workqueue by verifying write
2822      * access.
2823      */
2824     if (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)
2825         lock_map_acquire(&pwq->wq->lockdep_map);
2826     else
2827         lock_map_acquire_read(&pwq->wq->lockdep_map);
2828     lock_map_release(&pwq->wq->lockdep_map);
2829 
2830     return true;
2831 already_gone:
2832     spin_unlock_irq(&pool->lock);
2833     return false;
2834 }
2835 
2836 /**
2837  * flush_work - wait for a work to finish executing the last queueing instance
2838  * @work: the work to flush
2839  *
2840  * Wait until @work has finished execution.  @work is guaranteed to be idle
2841  * on return if it hasn't been requeued since flush started.
2842  *
2843  * Return:
2844  * %true if flush_work() waited for the work to finish execution,
2845  * %false if it was already idle.
2846  */
2847 bool flush_work(struct work_struct *work)
2848 {
2849     struct wq_barrier barr;
2850 
2851     if (WARN_ON(!wq_online))
2852         return false;
2853 
2854     lock_map_acquire(&work->lockdep_map);
2855     lock_map_release(&work->lockdep_map);
2856 
2857     if (start_flush_work(work, &barr)) {
2858         wait_for_completion(&barr.done);
2859         destroy_work_on_stack(&barr.work);
2860         return true;
2861     } else {
2862         return false;
2863     }
2864 }
2865 EXPORT_SYMBOL_GPL(flush_work);
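/*
 * flush_work() sketch: wait for one specific work item rather than a
 * whole workqueue.  my_work is hypothetical:
 *
 *	schedule_work(&my_work);
 *
 *	if (flush_work(&my_work))
 *		pr_debug("my_work was busy and has now finished\n");
 *
 * This is the usual way to make sure the last queueing instance of a
 * work item has finished before the object embedding it is freed, once
 * further requeueing has been prevented.
 */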
2866 
2867 struct cwt_wait {
2868     wait_queue_t        wait;
2869     struct work_struct  *work;
2870 };
2871 
2872 static int cwt_wakefn(wait_queue_t *wait, unsigned mode, int sync, void *key)
2873 {
2874     struct cwt_wait *cwait = container_of(wait, struct cwt_wait, wait);
2875 
2876     if (cwait->work != key)
2877         return 0;
2878     return autoremove_wake_function(wait, mode, sync, key);
2879 }
2880 
2881 static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)
2882 {
2883     static DECLARE_WAIT_QUEUE_HEAD(cancel_waitq);
2884     unsigned long flags;
2885     int ret;
2886 
2887     do {
2888         ret = try_to_grab_pending(work, is_dwork, &flags);
2889         /*
2890          * If someone else is already canceling, wait for it to
2891          * finish.  flush_work() doesn't work for PREEMPT_NONE
2892          * because we may get scheduled between @work's completion
2893          * and the other canceling task resuming and clearing
2894          * CANCELING - flush_work() will return false immediately
2895          * as @work is no longer busy, try_to_grab_pending() will keep
2896          * returning -ENOENT as @work is still being canceled, and the
2897          * other canceling task won't be able to clear CANCELING as
2898          * we're hogging the CPU.
2899          *
2900          * Let's wait for completion using a waitqueue.  As this
2901          * may lead to the thundering herd problem, use a custom
2902          * wake function which matches @work along with exclusive
2903          * wait and wakeup.
2904          */
2905         if (unlikely(ret == -ENOENT)) {
2906             struct cwt_wait cwait;
2907 
2908             init_wait(&cwait.wait);
2909             cwait.wait.func = cwt_wakefn;
2910             cwait.work = work;
2911 
2912             prepare_to_wait_exclusive(&cancel_waitq, &cwait.wait,
2913                           TASK_UNINTERRUPTIBLE);
2914             if (work_is_canceling(work))
2915                 schedule();
2916             finish_wait(&cancel_waitq, &cwait.wait);
2917         }
2918     } while (unlikely(ret < 0));
2919 
2920     /* tell other tasks trying to grab @work to back off */
2921     mark_work_canceling(work);
2922     local_irq_restore(flags);
2923 
2924     /*
2925      * This allows canceling during early boot.  We know that @work
2926      * isn't executing.
2927      */
2928     if (wq_online)
2929         flush_work(work);
2930 
2931     clear_work_data(work);
2932 
2933     /*
2934      * Paired with prepare_to_wait() above so that either
2935      * waitqueue_active() is visible here or !work_is_canceling() is
2936      * visible there.
2937      */
2938     smp_mb();
2939     if (waitqueue_active(&cancel_waitq))
2940         __wake_up(&cancel_waitq, TASK_NORMAL, 1, work);
2941 
2942     return ret;
2943 }
2944 
2945 /**
2946  * cancel_work_sync - cancel a work and wait for it to finish
2947  * @work: the work to cancel
2948  *
2949  * Cancel @work and wait for its execution to finish.  This function
2950  * can be used even if the work re-queues itself or migrates to
2951  * another workqueue.  On return from this function, @work is
2952  * guaranteed to be not pending or executing on any CPU.
2953  *
2954  * cancel_work_sync(&delayed_work->work) must not be used for
2955  * delayed_work's.  Use cancel_delayed_work_sync() instead.
2956  *
2957  * The caller must ensure that the workqueue on which @work was last
2958  * queued can't be destroyed before this function returns.
2959  *
2960  * Return:
2961  * %true if @work was pending, %false otherwise.
2962  */
2963 bool cancel_work_sync(struct work_struct *work)
2964 {
2965     return __cancel_work_timer(work, false);
2966 }
2967 EXPORT_SYMBOL_GPL(cancel_work_sync);
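/*
 * cancel_work_sync() sketch for a teardown path; struct my_dev,
 * dev->stopping and my_remove() are hypothetical:
 *
 *	static void my_remove(struct my_dev *dev)
 *	{
 *		dev->stopping = true;
 *		cancel_work_sync(&dev->work);
 *		kfree(dev);
 *	}
 *
 * After cancel_work_sync() returns, dev->work is neither pending nor
 * running anywhere, so freeing the memory it is embedded in is safe
 * provided nothing requeues it afterwards.
 */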
2968 
2969 /**
2970  * flush_delayed_work - wait for a dwork to finish executing the last queueing
2971  * @dwork: the delayed work to flush
2972  *
2973  * Delayed timer is cancelled and the pending work is queued for
2974  * immediate execution.  Like flush_work(), this function only
2975  * considers the last queueing instance of @dwork.
2976  *
2977  * Return:
2978  * %true if flush_work() waited for the work to finish execution,
2979  * %false if it was already idle.
2980  */
2981 bool flush_delayed_work(struct delayed_work *dwork)
2982 {
2983     local_irq_disable();
2984     if (del_timer_sync(&dwork->timer))
2985         __queue_work(dwork->cpu, dwork->wq, &dwork->work);
2986     local_irq_enable();
2987     return flush_work(&dwork->work);
2988 }
2989 EXPORT_SYMBOL(flush_delayed_work);
2990 
2991 static bool __cancel_work(struct work_struct *work, bool is_dwork)
2992 {
2993     unsigned long flags;
2994     int ret;
2995 
2996     do {
2997         ret = try_to_grab_pending(work, is_dwork, &flags);
2998     } while (unlikely(ret == -EAGAIN));
2999 
3000     if (unlikely(ret < 0))
3001         return false;
3002 
3003     set_work_pool_and_clear_pending(work, get_work_pool_id(work));
3004     local_irq_restore(flags);
3005     return ret;
3006 }
3007 
3008 /*
3009  * See cancel_delayed_work()
3010  */
3011 bool cancel_work(struct work_struct *work)
3012 {
3013     return __cancel_work(work, false);
3014 }
3015 
3016 /**
3017  * cancel_delayed_work - cancel a delayed work
3018  * @dwork: delayed_work to cancel
3019  *
3020  * Kill off a pending delayed_work.
3021  *
3022  * Return: %true if @dwork was pending and canceled; %false if it wasn't
3023  * pending.
3024  *
3025  * Note:
3026  * The work callback function may still be running on return, unless
3027  * it returns %true and the work doesn't re-arm itself.  Explicitly flush or
3028  * use cancel_delayed_work_sync() to wait on it.
3029  *
3030  * This function is safe to call from any context including IRQ handler.
3031  */
3032 bool cancel_delayed_work(struct delayed_work *dwork)
3033 {
3034     return __cancel_work(&dwork->work, true);
3035 }
3036 EXPORT_SYMBOL(cancel_delayed_work);
3037 
3038 /**
3039  * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish
3040  * @dwork: the delayed work to cancel
3041  *
3042  * This is cancel_work_sync() for delayed works.
3043  *
3044  * Return:
3045  * %true if @dwork was pending, %false otherwise.
3046  */
3047 bool cancel_delayed_work_sync(struct delayed_work *dwork)
3048 {
3049     return __cancel_work_timer(&dwork->work, true);
3050 }
3051 EXPORT_SYMBOL(cancel_delayed_work_sync);
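/*
 * Picking between the two cancel flavors; my_dwork is hypothetical:
 *
 *	cancel_delayed_work(&my_dwork);
 *
 * is safe in atomic context (e.g. from an IRQ handler) but the callback
 * may still be running when it returns, whereas
 *
 *	cancel_delayed_work_sync(&my_dwork);
 *
 * may sleep and guarantees the work is neither pending nor running on
 * return.
 */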
3052 
3053 /**
3054  * schedule_on_each_cpu - execute a function synchronously on each online CPU
3055  * @func: the function to call
3056  *
3057  * schedule_on_each_cpu() executes @func on each online CPU using the
3058  * system workqueue and blocks until all CPUs have completed.
3059  * schedule_on_each_cpu() is very slow.
3060  *
3061  * Return:
3062  * 0 on success, -errno on failure.
3063  */
3064 int schedule_on_each_cpu(work_func_t func)
3065 {
3066     int cpu;
3067     struct work_struct __percpu *works;
3068 
3069     works = alloc_percpu(struct work_struct);
3070     if (!works)
3071         return -ENOMEM;
3072 
3073     get_online_cpus();
3074 
3075     for_each_online_cpu(cpu) {
3076         struct work_struct *work = per_cpu_ptr(works, cpu);
3077 
3078         INIT_WORK(work, func);
3079         schedule_work_on(cpu, work);
3080     }
3081 
3082     for_each_online_cpu(cpu)
3083         flush_work(per_cpu_ptr(works, cpu));
3084 
3085     put_online_cpus();
3086     free_percpu(works);
3087     return 0;
3088 }
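
/*
 * Illustrative sketch (editor's addition): draining a hypothetical per-cpu
 * cache on every online CPU.  The callback runs in process context on each
 * CPU in turn and the call blocks until all of them have completed.
 */
#if 0
static void hypo_drain_local_cache(struct work_struct *unused)
{
	/* per-cpu work: operate on this CPU's data here */
}

static int hypo_drain_all_caches(void)
{
	return schedule_on_each_cpu(hypo_drain_local_cache);
}
#endif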
3089 
3090 /**
3091  * execute_in_process_context - reliably execute the routine with user context
3092  * @fn:     the function to execute
3093  * @ew:     guaranteed storage for the execute work structure (must
3094  *      be available when the work executes)
3095  *
3096  * Executes the function immediately if process context is available,
3097  * otherwise schedules the function for delayed execution.
3098  *
3099  * Return:  0 - function was executed
3100  *      1 - function was scheduled for execution
3101  */
3102 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
3103 {
3104     if (!in_interrupt()) {
3105         fn(&ew->work);
3106         return 0;
3107     }
3108 
3109     INIT_WORK(&ew->work, fn);
3110     schedule_work(&ew->work);
3111 
3112     return 1;
3113 }
3114 EXPORT_SYMBOL_GPL(execute_in_process_context);
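
/*
 * Illustrative sketch (editor's addition): deferring a release routine that
 * must run in process context.  @ew lives inside the hypothetical object so
 * it remains valid until the work has executed.
 */
#if 0
struct hypo_obj {
	struct execute_work	ew;
};

static void hypo_obj_release(struct work_struct *work)
{
	struct hypo_obj *obj = container_of(work, struct hypo_obj, ew.work);

	kfree(obj);
}

static void hypo_obj_put(struct hypo_obj *obj)
{
	/* runs immediately (returns 0) or from a worker later (returns 1) */
	execute_in_process_context(hypo_obj_release, &obj->ew);
}
#endif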
3115 
3116 /**
3117  * free_workqueue_attrs - free a workqueue_attrs
3118  * @attrs: workqueue_attrs to free
3119  *
3120  * Undo alloc_workqueue_attrs().
3121  */
3122 void free_workqueue_attrs(struct workqueue_attrs *attrs)
3123 {
3124     if (attrs) {
3125         free_cpumask_var(attrs->cpumask);
3126         kfree(attrs);
3127     }
3128 }
3129 
3130 /**
3131  * alloc_workqueue_attrs - allocate a workqueue_attrs
3132  * @gfp_mask: allocation mask to use
3133  *
3134  * Allocate a new workqueue_attrs, initialize with default settings and
3135  * return it.
3136  *
3137  * Return: The newly allocated workqueue_attrs on success. %NULL on failure.
3138  */
3139 struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask)
3140 {
3141     struct workqueue_attrs *attrs;
3142 
3143     attrs = kzalloc(sizeof(*attrs), gfp_mask);
3144     if (!attrs)
3145         goto fail;
3146     if (!alloc_cpumask_var(&attrs->cpumask, gfp_mask))
3147         goto fail;
3148 
3149     cpumask_copy(attrs->cpumask, cpu_possible_mask);
3150     return attrs;
3151 fail:
3152     free_workqueue_attrs(attrs);
3153     return NULL;
3154 }
3155 
3156 static void copy_workqueue_attrs(struct workqueue_attrs *to,
3157                  const struct workqueue_attrs *from)
3158 {
3159     to->nice = from->nice;
3160     cpumask_copy(to->cpumask, from->cpumask);
3161     /*
3162      * Unlike hash and equality test, this function doesn't ignore
3163      * ->no_numa as it is used for both pool and wq attrs.  Instead,
3164      * get_unbound_pool() explicitly clears ->no_numa after copying.
3165      */
3166     to->no_numa = from->no_numa;
3167 }
3168 
3169 /* hash value of the content of @attr */
3170 static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
3171 {
3172     u32 hash = 0;
3173 
3174     hash = jhash_1word(attrs->nice, hash);
3175     hash = jhash(cpumask_bits(attrs->cpumask),
3176              BITS_TO_LONGS(nr_cpumask_bits) * sizeof(long), hash);
3177     return hash;
3178 }
3179 
3180 /* content equality test */
3181 static bool wqattrs_equal(const struct workqueue_attrs *a,
3182               const struct workqueue_attrs *b)
3183 {
3184     if (a->nice != b->nice)
3185         return false;
3186     if (!cpumask_equal(a->cpumask, b->cpumask))
3187         return false;
3188     return true;
3189 }
3190 
3191 /**
3192  * init_worker_pool - initialize a newly zalloc'd worker_pool
3193  * @pool: worker_pool to initialize
3194  *
3195  * Initialize a newly zalloc'd @pool.  It also allocates @pool->attrs.
3196  *
3197  * Return: 0 on success, -errno on failure.  Even on failure, all fields
3198  * inside @pool proper are initialized and put_unbound_pool() can be called
3199  * on @pool safely to release it.
3200  */
3201 static int init_worker_pool(struct worker_pool *pool)
3202 {
3203     spin_lock_init(&pool->lock);
3204     pool->id = -1;
3205     pool->cpu = -1;
3206     pool->node = NUMA_NO_NODE;
3207     pool->flags |= POOL_DISASSOCIATED;
3208     pool->watchdog_ts = jiffies;
3209     INIT_LIST_HEAD(&pool->worklist);
3210     INIT_LIST_HEAD(&pool->idle_list);
3211     hash_init(pool->busy_hash);
3212 
3213     init_timer_deferrable(&pool->idle_timer);
3214     pool->idle_timer.function = idle_worker_timeout;
3215     pool->idle_timer.data = (unsigned long)pool;
3216 
3217     setup_timer(&pool->mayday_timer, pool_mayday_timeout,
3218             (unsigned long)pool);
3219 
3220     mutex_init(&pool->manager_arb);
3221     mutex_init(&pool->attach_mutex);
3222     INIT_LIST_HEAD(&pool->workers);
3223 
3224     ida_init(&pool->worker_ida);
3225     INIT_HLIST_NODE(&pool->hash_node);
3226     pool->refcnt = 1;
3227 
3228     /* shouldn't fail above this point */
3229     pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
3230     if (!pool->attrs)
3231         return -ENOMEM;
3232     return 0;
3233 }
3234 
3235 static void rcu_free_wq(struct rcu_head *rcu)
3236 {
3237     struct workqueue_struct *wq =
3238         container_of(rcu, struct workqueue_struct, rcu);
3239 
3240     if (!(wq->flags & WQ_UNBOUND))
3241         free_percpu(wq->cpu_pwqs);
3242     else
3243         free_workqueue_attrs(wq->unbound_attrs);
3244 
3245     kfree(wq->rescuer);
3246     kfree(wq);
3247 }
3248 
3249 static void rcu_free_pool(struct rcu_head *rcu)
3250 {
3251     struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
3252 
3253     ida_destroy(&pool->worker_ida);
3254     free_workqueue_attrs(pool->attrs);
3255     kfree(pool);
3256 }
3257 
3258 /**
3259  * put_unbound_pool - put a worker_pool
3260  * @pool: worker_pool to put
3261  *
3262  * Put @pool.  If its refcnt reaches zero, it gets destroyed in a sched-RCU
3263  * safe manner.  get_unbound_pool() calls this function on its failure path
3264  * and this function should be able to release pools which went through,
3265  * successfully or not, init_worker_pool().
3266  *
3267  * Should be called with wq_pool_mutex held.
3268  */
3269 static void put_unbound_pool(struct worker_pool *pool)
3270 {
3271     DECLARE_COMPLETION_ONSTACK(detach_completion);
3272     struct worker *worker;
3273 
3274     lockdep_assert_held(&wq_pool_mutex);
3275 
3276     if (--pool->refcnt)
3277         return;
3278 
3279     /* sanity checks */
3280     if (WARN_ON(!(pool->cpu < 0)) ||
3281         WARN_ON(!list_empty(&pool->worklist)))
3282         return;
3283 
3284     /* release id and unhash */
3285     if (pool->id >= 0)
3286         idr_remove(&worker_pool_idr, pool->id);
3287     hash_del(&pool->hash_node);
3288 
3289     /*
3290      * Become the manager and destroy all workers.  Grabbing
3291      * manager_arb prevents @pool's workers from blocking on
3292      * attach_mutex.
3293      */
3294     mutex_lock(&pool->manager_arb);
3295 
3296     spin_lock_irq(&pool->lock);
3297     while ((worker = first_idle_worker(pool)))
3298         destroy_worker(worker);
3299     WARN_ON(pool->nr_workers || pool->nr_idle);
3300     spin_unlock_irq(&pool->lock);
3301 
3302     mutex_lock(&pool->attach_mutex);
3303     if (!list_empty(&pool->workers))
3304         pool->detach_completion = &detach_completion;
3305     mutex_unlock(&pool->attach_mutex);
3306 
3307     if (pool->detach_completion)
3308         wait_for_completion(pool->detach_completion);
3309 
3310     mutex_unlock(&pool->manager_arb);
3311 
3312     /* shut down the timers */
3313     del_timer_sync(&pool->idle_timer);
3314     del_timer_sync(&pool->mayday_timer);
3315 
3316     /* sched-RCU protected to allow dereferences from get_work_pool() */
3317     call_rcu_sched(&pool->rcu, rcu_free_pool);
3318 }
3319 
3320 /**
3321  * get_unbound_pool - get a worker_pool with the specified attributes
3322  * @attrs: the attributes of the worker_pool to get
3323  *
3324  * Obtain a worker_pool which has the same attributes as @attrs, bump the
3325  * reference count and return it.  If there already is a matching
3326  * worker_pool, it will be used; otherwise, this function attempts to
3327  * create a new one.
3328  *
3329  * Should be called with wq_pool_mutex held.
3330  *
3331  * Return: On success, a worker_pool with the same attributes as @attrs.
3332  * On failure, %NULL.
3333  */
3334 static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
3335 {
3336     u32 hash = wqattrs_hash(attrs);
3337     struct worker_pool *pool;
3338     int node;
3339     int target_node = NUMA_NO_NODE;
3340 
3341     lockdep_assert_held(&wq_pool_mutex);
3342 
3343     /* do we already have a matching pool? */
3344     hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
3345         if (wqattrs_equal(pool->attrs, attrs)) {
3346             pool->refcnt++;
3347             return pool;
3348         }
3349     }
3350 
3351     /* if cpumask is contained inside a NUMA node, we belong to that node */
3352     if (wq_numa_enabled) {
3353         for_each_node(node) {
3354             if (cpumask_subset(attrs->cpumask,
3355                        wq_numa_possible_cpumask[node])) {
3356                 target_node = node;
3357                 break;
3358             }
3359         }
3360     }
3361 
3362     /* nope, create a new one */
3363     pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node);
3364     if (!pool || init_worker_pool(pool) < 0)
3365         goto fail;
3366 
3367     lockdep_set_subclass(&pool->lock, 1);   /* see put_pwq() */
3368     copy_workqueue_attrs(pool->attrs, attrs);
3369     pool->node = target_node;
3370 
3371     /*
3372      * no_numa isn't a worker_pool attribute, always clear it.  See
3373      * 'struct workqueue_attrs' comments for detail.
3374      */
3375     pool->attrs->no_numa = false;
3376 
3377     if (worker_pool_assign_id(pool) < 0)
3378         goto fail;
3379 
3380     /* create and start the initial worker */
3381     if (wq_online && !create_worker(pool))
3382         goto fail;
3383 
3384     /* install */
3385     hash_add(unbound_pool_hash, &pool->hash_node, hash);
3386 
3387     return pool;
3388 fail:
3389     if (pool)
3390         put_unbound_pool(pool);
3391     return NULL;
3392 }
3393 
3394 static void rcu_free_pwq(struct rcu_head *rcu)
3395 {
3396     kmem_cache_free(pwq_cache,
3397             container_of(rcu, struct pool_workqueue, rcu));
3398 }
3399 
3400 /*
3401  * Scheduled on system_wq by put_pwq() when an unbound pwq hits zero refcnt
3402  * and needs to be destroyed.
3403  */
3404 static void pwq_unbound_release_workfn(struct work_struct *work)
3405 {
3406     struct pool_workqueue *pwq = container_of(work, struct pool_workqueue,
3407                           unbound_release_work);
3408     struct workqueue_struct *wq = pwq->wq;
3409     struct worker_pool *pool = pwq->pool;
3410     bool is_last;
3411 
3412     if (WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND)))
3413         return;
3414 
3415     mutex_lock(&wq->mutex);
3416     list_del_rcu(&pwq->pwqs_node);
3417     is_last = list_empty(&wq->pwqs);
3418     mutex_unlock(&wq->mutex);
3419 
3420     mutex_lock(&wq_pool_mutex);
3421     put_unbound_pool(pool);
3422     mutex_unlock(&wq_pool_mutex);
3423 
3424     call_rcu_sched(&pwq->rcu, rcu_free_pwq);
3425 
3426     /*
3427      * If we're the last pwq going away, @wq is already dead and no one
3428      * is gonna access it anymore.  Schedule RCU free.
3429      */
3430     if (is_last)
3431         call_rcu_sched(&wq->rcu, rcu_free_wq);
3432 }
3433 
3434 /**
3435  * pwq_adjust_max_active - update a pwq's max_active to the current setting
3436  * @pwq: target pool_workqueue
3437  *
3438  * If @pwq isn't freezing, set @pwq->max_active to the associated
3439  * workqueue's saved_max_active and activate delayed work items
3440  * accordingly.  If @pwq is freezing, clear @pwq->max_active to zero.
3441  */
3442 static void pwq_adjust_max_active(struct pool_workqueue *pwq)
3443 {
3444     struct workqueue_struct *wq = pwq->wq;
3445     bool freezable = wq->flags & WQ_FREEZABLE;
3446     unsigned long flags;
3447 
3448     /* for @wq->saved_max_active */
3449     lockdep_assert_held(&wq->mutex);
3450 
3451     /* fast exit for non-freezable wqs */
3452     if (!freezable && pwq->max_active == wq->saved_max_active)
3453         return;
3454 
3455     /* this function can be called during early boot w/ irq disabled */
3456     spin_lock_irqsave(&pwq->pool->lock, flags);
3457 
3458     /*
3459      * During [un]freezing, the caller is responsible for ensuring that
3460      * this function is called at least once after @workqueue_freezing
3461      * is updated and visible.
3462      */
3463     if (!freezable || !workqueue_freezing) {
3464         pwq->max_active = wq->saved_max_active;
3465 
3466         while (!list_empty(&pwq->delayed_works) &&
3467                pwq->nr_active < pwq->max_active)
3468             pwq_activate_first_delayed(pwq);
3469 
3470         /*
3471          * Need to kick a worker after the wq is thawed or an unbound wq's
3472          * max_active is bumped.  It's a slow path.  Do it always.
3473          */
3474         wake_up_worker(pwq->pool);
3475     } else {
3476         pwq->max_active = 0;
3477     }
3478 
3479     spin_unlock_irqrestore(&pwq->pool->lock, flags);
3480 }
3481 
3482 /* initialize newly alloced @pwq which is associated with @wq and @pool */
3483 static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
3484              struct worker_pool *pool)
3485 {
3486     BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
3487 
3488     memset(pwq, 0, sizeof(*pwq));
3489 
3490     pwq->pool = pool;
3491     pwq->wq = wq;
3492     pwq->flush_color = -1;
3493     pwq->refcnt = 1;
3494     INIT_LIST_HEAD(&pwq->delayed_works);
3495     INIT_LIST_HEAD(&pwq->pwqs_node);
3496     INIT_LIST_HEAD(&pwq->mayday_node);
3497     INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn);
3498 }
3499 
3500 /* sync @pwq with the current state of its associated wq and link it */
3501 static void link_pwq(struct pool_workqueue *pwq)
3502 {
3503     struct workqueue_struct *wq = pwq->wq;
3504 
3505     lockdep_assert_held(&wq->mutex);
3506 
3507     /* may be called multiple times, ignore if already linked */
3508     if (!list_empty(&pwq->pwqs_node))
3509         return;
3510 
3511     /* set the matching work_color */
3512     pwq->work_color = wq->work_color;
3513 
3514     /* sync max_active to the current setting */
3515     pwq_adjust_max_active(pwq);
3516 
3517     /* link in @pwq */
3518     list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
3519 }
3520 
3521 /* obtain a pool matching @attrs and create a pwq associating the pool and @wq */
3522 static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
3523                     const struct workqueue_attrs *attrs)
3524 {
3525     struct worker_pool *pool;
3526     struct pool_workqueue *pwq;
3527 
3528     lockdep_assert_held(&wq_pool_mutex);
3529 
3530     pool = get_unbound_pool(attrs);
3531     if (!pool)
3532         return NULL;
3533 
3534     pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
3535     if (!pwq) {
3536         put_unbound_pool(pool);
3537         return NULL;
3538     }
3539 
3540     init_pwq(pwq, wq, pool);
3541     return pwq;
3542 }
3543 
3544 /**
3545  * wq_calc_node_cpumask - calculate a wq_attrs' cpumask for the specified node
3546  * @attrs: the wq_attrs of the default pwq of the target workqueue
3547  * @node: the target NUMA node
3548  * @cpu_going_down: if >= 0, the CPU to consider as offline
3549  * @cpumask: outarg, the resulting cpumask
3550  *
3551  * Calculate the cpumask a workqueue with @attrs should use on @node.  If
3552  * @cpu_going_down is >= 0, that cpu is considered offline during
3553  * calculation.  The result is stored in @cpumask.
3554  *
3555  * If NUMA affinity is not enabled, @attrs->cpumask is always used.  If
3556  * enabled and @node has online CPUs requested by @attrs, the returned
3557  * cpumask is the intersection of the possible CPUs of @node and
3558  * @attrs->cpumask.
3559  *
3560  * The caller is responsible for ensuring that the cpumask of @node stays
3561  * stable.
3562  *
3563  * Return: %true if the resulting @cpumask is different from @attrs->cpumask,
3564  * %false if equal.
3565  */
3566 static bool wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
3567                  int cpu_going_down, cpumask_t *cpumask)
3568 {
3569     if (!wq_numa_enabled || attrs->no_numa)
3570         goto use_dfl;
3571 
3572     /* does @node have any online CPUs @attrs wants? */
3573     cpumask_and(cpumask, cpumask_of_node(node), attrs->cpumask);
3574     if (cpu_going_down >= 0)
3575         cpumask_clear_cpu(cpu_going_down, cpumask);
3576 
3577     if (cpumask_empty(cpumask))
3578         goto use_dfl;
3579 
3580     /* yeap, return possible CPUs in @node that @attrs wants */
3581     cpumask_and(cpumask, attrs->cpumask, wq_numa_possible_cpumask[node]);
3582     return !cpumask_equal(cpumask, attrs->cpumask);
3583 
3584 use_dfl:
3585     cpumask_copy(cpumask, attrs->cpumask);
3586     return false;
3587 }
3588 
3589 /* install @pwq into @wq's numa_pwq_tbl[] for @node and return the old pwq */
3590 static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
3591                            int node,
3592                            struct pool_workqueue *pwq)
3593 {
3594     struct pool_workqueue *old_pwq;
3595 
3596     lockdep_assert_held(&wq_pool_mutex);
3597     lockdep_assert_held(&wq->mutex);
3598 
3599     /* link_pwq() can handle duplicate calls */
3600     link_pwq(pwq);
3601 
3602     old_pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
3603     rcu_assign_pointer(wq->numa_pwq_tbl[node], pwq);
3604     return old_pwq;
3605 }
3606 
3607 /* context to store the prepared attrs & pwqs before applying */
3608 struct apply_wqattrs_ctx {
3609     struct workqueue_struct *wq;        /* target workqueue */
3610     struct workqueue_attrs  *attrs;     /* attrs to apply */
3611     struct list_head    list;       /* queued for batching commit */
3612     struct pool_workqueue   *dfl_pwq;
3613     struct pool_workqueue   *pwq_tbl[];
3614 };
3615 
3616 /* free the resources after success or abort */
3617 static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
3618 {
3619     if (ctx) {
3620         int node;
3621 
3622         for_each_node(node)
3623             put_pwq_unlocked(ctx->pwq_tbl[node]);
3624         put_pwq_unlocked(ctx->dfl_pwq);
3625 
3626         free_workqueue_attrs(ctx->attrs);
3627 
3628         kfree(ctx);
3629     }
3630 }
3631 
3632 /* allocate the attrs and pwqs for later installation */
3633 static struct apply_wqattrs_ctx *
3634 apply_wqattrs_prepare(struct workqueue_struct *wq,
3635               const struct workqueue_attrs *attrs)
3636 {
3637     struct apply_wqattrs_ctx *ctx;
3638     struct workqueue_attrs *new_attrs, *tmp_attrs;
3639     int node;
3640 
3641     lockdep_assert_held(&wq_pool_mutex);
3642 
3643     ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
3644               GFP_KERNEL);
3645 
3646     new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
3647     tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
3648     if (!ctx || !new_attrs || !tmp_attrs)
3649         goto out_free;
3650 
3651     /*
3652      * Calculate the attrs of the default pwq.
3653      * If the user configured cpumask doesn't overlap with the
3654      * wq_unbound_cpumask, we fall back to the wq_unbound_cpumask.
3655      */
3656     copy_workqueue_attrs(new_attrs, attrs);
3657     cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask);
3658     if (unlikely(cpumask_empty(new_attrs->cpumask)))
3659         cpumask_copy(new_attrs->cpumask, wq_unbound_cpumask);
3660 
3661     /*
3662      * We may create multiple pwqs with differing cpumasks.  Make a
3663      * copy of @new_attrs which will be modified and used to obtain
3664      * pools.
3665      */
3666     copy_workqueue_attrs(tmp_attrs, new_attrs);
3667 
3668     /*
3669      * If something goes wrong during CPU up/down, we'll fall back to
3670      * the default pwq covering whole @attrs->cpumask.  Always create
3671      * it even if we don't use it immediately.
3672      */
3673     ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
3674     if (!ctx->dfl_pwq)
3675         goto out_free;
3676 
3677     for_each_node(node) {
3678         if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) {
3679             ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
3680             if (!ctx->pwq_tbl[node])
3681                 goto out_free;
3682         } else {
3683             ctx->dfl_pwq->refcnt++;
3684             ctx->pwq_tbl[node] = ctx->dfl_pwq;
3685         }
3686     }
3687 
3688     /* save the user configured attrs and sanitize it. */
3689     copy_workqueue_attrs(new_attrs, attrs);
3690     cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
3691     ctx->attrs = new_attrs;
3692 
3693     ctx->wq = wq;
3694     free_workqueue_attrs(tmp_attrs);
3695     return ctx;
3696 
3697 out_free:
3698     free_workqueue_attrs(tmp_attrs);
3699     free_workqueue_attrs(new_attrs);
3700     apply_wqattrs_cleanup(ctx);
3701     return NULL;
3702 }
3703 
3704 /* set attrs and install prepared pwqs, @ctx points to old pwqs on return */
3705 static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
3706 {
3707     int node;
3708 
3709     /* all pwqs have been created successfully, let's install'em */
3710     mutex_lock(&ctx->wq->mutex);
3711 
3712     copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
3713 
3714     /* save the previous pwq and install the new one */
3715     for_each_node(node)
3716         ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
3717                               ctx->pwq_tbl[node]);
3718 
3719     /* @dfl_pwq might not have been used, ensure it's linked */
3720     link_pwq(ctx->dfl_pwq);
3721     swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);
3722 
3723     mutex_unlock(&ctx->wq->mutex);
3724 }
3725 
3726 static void apply_wqattrs_lock(void)
3727 {
3728     /* CPUs should stay stable across pwq creations and installations */
3729     get_online_cpus();
3730     mutex_lock(&wq_pool_mutex);
3731 }
3732 
3733 static void apply_wqattrs_unlock(void)
3734 {
3735     mutex_unlock(&wq_pool_mutex);
3736     put_online_cpus();
3737 }
3738 
3739 static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
3740                     const struct workqueue_attrs *attrs)
3741 {
3742     struct apply_wqattrs_ctx *ctx;
3743 
3744     /* only unbound workqueues can change attributes */
3745     if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
3746         return -EINVAL;
3747 
3748     /* creating multiple pwqs breaks ordering guarantee */
3749     if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
3750         return -EINVAL;
3751 
3752     ctx = apply_wqattrs_prepare(wq, attrs);
3753     if (!ctx)
3754         return -ENOMEM;
3755 
3756     /* the ctx has been prepared successfully, let's commit it */
3757     apply_wqattrs_commit(ctx);
3758     apply_wqattrs_cleanup(ctx);
3759 
3760     return 0;
3761 }
3762 
3763 /**
3764  * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
3765  * @wq: the target workqueue
3766  * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
3767  *
3768  * Apply @attrs to an unbound workqueue @wq.  Unless disabled, on NUMA
3769  * machines, this function maps a separate pwq to each NUMA node with
3770  * possible CPUs in @attrs->cpumask so that work items are affine to the
3771  * NUMA node they were issued on.  Older pwqs are released as in-flight work
3772  * items finish.  Note that a work item which repeatedly requeues itself
3773  * back-to-back will stay on its current pwq.
3774  *
3775  * Performs GFP_KERNEL allocations.
3776  *
3777  * Return: 0 on success and -errno on failure.
3778  */
3779 int apply_workqueue_attrs(struct workqueue_struct *wq,
3780               const struct workqueue_attrs *attrs)
3781 {
3782     int ret;
3783 
3784     apply_wqattrs_lock();
3785     ret = apply_workqueue_attrs_locked(wq, attrs);
3786     apply_wqattrs_unlock();
3787 
3788     return ret;
3789 }
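
/*
 * Illustrative sketch (editor's addition): restricting an unbound workqueue
 * to a caller-supplied cpumask with boosted priority.  @wq and @mask are
 * hypothetical; the attrs are copied by apply_workqueue_attrs(), so they
 * can be freed immediately afterwards.
 */
#if 0
static int hypo_confine_wq(struct workqueue_struct *wq,
			   const struct cpumask *mask)
{
	struct workqueue_attrs *attrs;
	int ret;

	attrs = alloc_workqueue_attrs(GFP_KERNEL);
	if (!attrs)
		return -ENOMEM;

	attrs->nice = -5;			/* run workers at higher priority */
	cpumask_copy(attrs->cpumask, mask);

	ret = apply_workqueue_attrs(wq, attrs);	/* @wq must be WQ_UNBOUND */
	free_workqueue_attrs(attrs);
	return ret;
}
#endif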
3790 
3791 /**
3792  * wq_update_unbound_numa - update NUMA affinity of a wq for CPU hot[un]plug
3793  * @wq: the target workqueue
3794  * @cpu: the CPU coming up or going down
3795  * @online: whether @cpu is coming up or going down
3796  *
3797  * This function is to be called from %CPU_DOWN_PREPARE, %CPU_ONLINE and
3798  * %CPU_DOWN_FAILED.  @cpu is being hot[un]plugged, update NUMA affinity of
3799  * @wq accordingly.
3800  *
3801  * If NUMA affinity can't be adjusted due to memory allocation failure, it
3802  * falls back to @wq->dfl_pwq which may not be optimal but is always
3803  * correct.
3804  *
3805  * Note that when the last allowed CPU of a NUMA node goes offline for a
3806  * workqueue with a cpumask spanning multiple nodes, the workers which were
3807  * already executing the work items for the workqueue will lose their CPU
3808  * affinity and may execute on any CPU.  This is similar to how per-cpu
3809  * workqueues behave on CPU_DOWN.  If a workqueue user wants strict
3810  * affinity, it's the user's responsibility to flush the work item from
3811  * CPU_DOWN_PREPARE.
3812  */
3813 static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
3814                    bool online)
3815 {
3816     int node = cpu_to_node(cpu);
3817     int cpu_off = online ? -1 : cpu;
3818     struct pool_workqueue *old_pwq = NULL, *pwq;
3819     struct workqueue_attrs *target_attrs;
3820     cpumask_t *cpumask;
3821 
3822     lockdep_assert_held(&wq_pool_mutex);
3823 
3824     if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND) ||
3825         wq->unbound_attrs->no_numa)
3826         return;
3827 
3828     /*
3829      * We don't wanna alloc/free wq_attrs for each wq for each CPU.
3830      * Let's use a preallocated one.  The following buf is protected by
3831      * CPU hotplug exclusion.
3832      */
3833     target_attrs = wq_update_unbound_numa_attrs_buf;
3834     cpumask = target_attrs->cpumask;
3835 
3836     copy_workqueue_attrs(target_attrs, wq->unbound_attrs);
3837     pwq = unbound_pwq_by_node(wq, node);
3838 
3839     /*
3840      * Let's determine what needs to be done.  If the target cpumask is
3841      * different from the default pwq's, we need to compare it to @pwq's
3842      * and create a new one if they don't match.  If the target cpumask
3843      * equals the default pwq's, the default pwq should be used.
3844      */
3845     if (wq_calc_node_cpumask(wq->dfl_pwq->pool->attrs, node, cpu_off, cpumask)) {
3846         if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
3847             return;
3848     } else {
3849         goto use_dfl_pwq;
3850     }
3851 
3852     /* create a new pwq */
3853     pwq = alloc_unbound_pwq(wq, target_attrs);
3854     if (!pwq) {
3855         pr_warn("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n",
3856             wq->name);
3857         goto use_dfl_pwq;
3858     }
3859 
3860     /* Install the new pwq. */
3861     mutex_lock(&wq->mutex);
3862     old_pwq = numa_pwq_tbl_install(wq, node, pwq);
3863     goto out_unlock;
3864 
3865 use_dfl_pwq:
3866     mutex_lock(&wq->mutex);
3867     spin_lock_irq(&wq->dfl_pwq->pool->lock);
3868     get_pwq(wq->dfl_pwq);
3869     spin_unlock_irq(&wq->dfl_pwq->pool->lock);
3870     old_pwq = numa_pwq_tbl_install(wq, node, wq->dfl_pwq);
3871 out_unlock:
3872     mutex_unlock(&wq->mutex);
3873     put_pwq_unlocked(old_pwq);
3874 }
3875 
3876 static int alloc_and_link_pwqs(struct workqueue_struct *wq)
3877 {
3878     bool highpri = wq->flags & WQ_HIGHPRI;
3879     int cpu, ret;
3880 
3881     if (!(wq->flags & WQ_UNBOUND)) {
3882         wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
3883         if (!wq->cpu_pwqs)
3884             return -ENOMEM;
3885 
3886         for_each_possible_cpu(cpu) {
3887             struct pool_workqueue *pwq =
3888                 per_cpu_ptr(wq->cpu_pwqs, cpu);
3889             struct worker_pool *cpu_pools =
3890                 per_cpu(cpu_worker_pools, cpu);
3891 
3892             init_pwq(pwq, wq, &cpu_pools[highpri]);
3893 
3894             mutex_lock(&wq->mutex);
3895             link_pwq(pwq);
3896             mutex_unlock(&wq->mutex);
3897         }
3898         return 0;
3899     } else if (wq->flags & __WQ_ORDERED) {
3900         ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
3901         /* there should only be single pwq for ordering guarantee */
3902         WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
3903                   wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
3904              "ordering guarantee broken for workqueue %s\n", wq->name);
3905         return ret;
3906     } else {
3907         return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
3908     }
3909 }
3910 
3911 static int wq_clamp_max_active(int max_active, unsigned int flags,
3912                    const char *name)
3913 {
3914     int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
3915 
3916     if (max_active < 1 || max_active > lim)
3917         pr_warn("workqueue: max_active %d requested for %s is out of range, clamping between %d and %d\n",
3918             max_active, name, 1, lim);
3919 
3920     return clamp_val(max_active, 1, lim);
3921 }
3922 
3923 struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
3924                            unsigned int flags,
3925                            int max_active,
3926                            struct lock_class_key *key,
3927                            const char *lock_name, ...)
3928 {
3929     size_t tbl_size = 0;
3930     va_list args;
3931     struct workqueue_struct *wq;
3932     struct pool_workqueue *pwq;
3933 
3934     /* see the comment above the definition of WQ_POWER_EFFICIENT */
3935     if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
3936         flags |= WQ_UNBOUND;
3937 
3938     /* allocate wq and format name */
3939     if (flags & WQ_UNBOUND)
3940         tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
3941 
3942     wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
3943     if (!wq)
3944         return NULL;
3945 
3946     if (flags & WQ_UNBOUND) {
3947         wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
3948         if (!wq->unbound_attrs)
3949             goto err_free_wq;
3950     }
3951 
3952     va_start(args, lock_name);
3953     vsnprintf(wq->name, sizeof(wq->name), fmt, args);
3954     va_end(args);
3955 
3956     max_active = max_active ?: WQ_DFL_ACTIVE;
3957     max_active = wq_clamp_max_active(max_active, flags, wq->name);
3958 
3959     /* init wq */
3960     wq->flags = flags;
3961     wq->saved_max_active = max_active;
3962     mutex_init(&wq->mutex);
3963     atomic_set(&wq->nr_pwqs_to_flush, 0);
3964     INIT_LIST_HEAD(&wq->pwqs);
3965     INIT_LIST_HEAD(&wq->flusher_queue);
3966     INIT_LIST_HEAD(&wq->flusher_overflow);
3967     INIT_LIST_HEAD(&wq->maydays);
3968 
3969     lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
3970     INIT_LIST_HEAD(&wq->list);
3971 
3972     if (alloc_and_link_pwqs(wq) < 0)
3973         goto err_free_wq;
3974 
3975     /*
3976      * Workqueues which may be used during memory reclaim should
3977      * have a rescuer to guarantee forward progress.
3978      */
3979     if (flags & WQ_MEM_RECLAIM) {
3980         struct worker *rescuer;
3981 
3982         rescuer = alloc_worker(NUMA_NO_NODE);
3983         if (!rescuer)
3984             goto err_destroy;
3985 
3986         rescuer->rescue_wq = wq;
3987         rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
3988                            wq->name);
3989         if (IS_ERR(rescuer->task)) {
3990             kfree(rescuer);
3991             goto err_destroy;
3992         }
3993 
3994         wq->rescuer = rescuer;
3995         kthread_bind_mask(rescuer->task, cpu_possible_mask);
3996         wake_up_process(rescuer->task);
3997     }
3998 
3999     if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
4000         goto err_destroy;
4001 
4002     /*
4003      * wq_pool_mutex protects global freeze state and workqueues list.
4004      * Grab it, adjust max_active and add the new @wq to workqueues
4005      * list.
4006      */
4007     mutex_lock(&wq_pool_mutex);
4008 
4009     mutex_lock(&wq->mutex);
4010     for_each_pwq(pwq, wq)
4011         pwq_adjust_max_active(pwq);
4012     mutex_unlock(&wq->mutex);
4013 
4014     list_add_tail_rcu(&wq->list, &workqueues);
4015 
4016     mutex_unlock(&wq_pool_mutex);
4017 
4018     return wq;
4019 
4020 err_free_wq:
4021     free_workqueue_attrs(wq->unbound_attrs);
4022     kfree(wq);
4023     return NULL;
4024 err_destroy:
4025     destroy_workqueue(wq);
4026     return NULL;
4027 }
4028 EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
4029 
4030 /**
4031  * destroy_workqueue - safely terminate a workqueue
4032  * @wq: target workqueue
4033  *
4034  * Safely destroy a workqueue. All work currently pending will be done first.
4035  */
4036 void destroy_workqueue(struct workqueue_struct *wq)
4037 {
4038     struct pool_workqueue *pwq;
4039     int node;
4040 
4041     /* drain it before proceeding with destruction */
4042     drain_workqueue(wq);
4043 
4044     /* sanity checks */
4045     mutex_lock(&wq->mutex);
4046     for_each_pwq(pwq, wq) {
4047         int i;
4048 
4049         for (i = 0; i < WORK_NR_COLORS; i++) {
4050             if (WARN_ON(pwq->nr_in_flight[i])) {
4051                 mutex_unlock(&wq->mutex);
4052                 show_workqueue_state();
4053                 return;
4054             }
4055         }
4056 
4057         if (WARN_ON((pwq != wq->dfl_pwq) && (pwq->refcnt > 1)) ||
4058             WARN_ON(pwq->nr_active) ||
4059             WARN_ON(!list_empty(&pwq->delayed_works))) {
4060             mutex_unlock(&wq->mutex);
4061             show_workqueue_state();
4062             return;
4063         }
4064     }
4065     mutex_unlock(&wq->mutex);
4066 
4067     /*
4068      * The wq list is used to freeze wqs; remove @wq from the list after
4069      * flushing is complete in case a freeze operation races us.
4070      */
4071     mutex_lock(&wq_pool_mutex);
4072     list_del_rcu(&wq->list);
4073     mutex_unlock(&wq_pool_mutex);
4074 
4075     workqueue_sysfs_unregister(wq);
4076 
4077     if (wq->rescuer)
4078         kthread_stop(wq->rescuer->task);
4079 
4080     if (!(wq->flags & WQ_UNBOUND)) {
4081         /*
4082          * The base ref is never dropped on per-cpu pwqs.  Directly
4083          * schedule RCU free.
4084          */
4085         call_rcu_sched(&wq->rcu, rcu_free_wq);
4086     } else {
4087         /*
4088          * We're the sole accessor of @wq at this point.  Directly
4089          * access numa_pwq_tbl[] and dfl_pwq to put the base refs.
4090          * @wq will be freed when the last pwq is released.
4091          */
4092         for_each_node(node) {
4093             pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
4094             RCU_INIT_POINTER(wq->numa_pwq_tbl[node], NULL);
4095             put_pwq_unlocked(pwq);
4096         }
4097 
4098         /*
4099          * Put dfl_pwq.  @wq may be freed any time after dfl_pwq is
4100          * put.  Don't access it afterwards.
4101          */
4102         pwq = wq->dfl_pwq;
4103         wq->dfl_pwq = NULL;
4104         put_pwq_unlocked(pwq);
4105     }
4106 }
4107 EXPORT_SYMBOL_GPL(destroy_workqueue);
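
/*
 * Illustrative sketch (editor's addition): typical allocate/use/destroy
 * lifecycle from a module.  Names are hypothetical.
 */
#if 0
static struct workqueue_struct *hypo_wq;

static int __init hypo_init(void)
{
	hypo_wq = alloc_workqueue("hypo_wq", WQ_UNBOUND | WQ_MEM_RECLAIM, 0);
	if (!hypo_wq)
		return -ENOMEM;
	return 0;
}

static void __exit hypo_exit(void)
{
	/* drains all queued work items before tearing the workqueue down */
	destroy_workqueue(hypo_wq);
}
#endif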
4108 
4109 /**
4110  * workqueue_set_max_active - adjust max_active of a workqueue
4111  * @wq: target workqueue
4112  * @max_active: new max_active value.
4113  *
4114  * Set max_active of @wq to @max_active.
4115  *
4116  * CONTEXT:
4117  * Don't call from IRQ context.
4118  */
4119 void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
4120 {
4121     struct pool_workqueue *pwq;
4122 
4123     /* disallow meddling with max_active for ordered workqueues */
4124     if (WARN_ON(wq->flags & __WQ_ORDERED))
4125         return;
4126 
4127     max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
4128 
4129     mutex_lock(&wq->mutex);
4130 
4131     wq->saved_max_active = max_active;
4132 
4133     for_each_pwq(pwq, wq)
4134         pwq_adjust_max_active(pwq);
4135 
4136     mutex_unlock(&wq->mutex);
4137 }
4138 EXPORT_SYMBOL_GPL(workqueue_set_max_active);
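
/*
 * Illustrative sketch (editor's addition): clamping a hypothetical I/O
 * workqueue to one in-flight work item while a device is degraded and
 * restoring the default afterwards.
 */
#if 0
static void hypo_io_throttle(struct workqueue_struct *io_wq, bool degraded)
{
	workqueue_set_max_active(io_wq, degraded ? 1 : WQ_DFL_ACTIVE);
}
#endif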
4139 
4140 /**
4141  * current_is_workqueue_rescuer - is %current workqueue rescuer?
4142  *
4143  * Determine whether %current is a workqueue rescuer.  Can be used from
4144  * work functions to determine whether it's being run off the rescuer task.
4145  *
4146  * Return: %true if %current is a workqueue rescuer. %false otherwise.
4147  */
4148 bool current_is_workqueue_rescuer(void)
4149 {
4150     struct worker *worker = current_wq_worker();
4151 
4152     return worker && worker->rescue_wq;
4153 }
4154 
4155 /**
4156  * workqueue_congested - test whether a workqueue is congested
4157  * @cpu: CPU in question
4158  * @wq: target workqueue
4159  *
4160  * Test whether @wq's cpu workqueue for @cpu is congested.  There is
4161  * no synchronization around this function and the test result is
4162  * unreliable and only useful as advisory hints or for debugging.
4163  *
4164  * If @cpu is WORK_CPU_UNBOUND, the test is performed on the local CPU.
4165  * Note that both per-cpu and unbound workqueues may be associated with
4166  * multiple pool_workqueues which have separate congested states.  A
4167  * workqueue being congested on one CPU doesn't mean the workqueue is also
4168  * congested on other CPUs / NUMA nodes.
4169  *
4170  * Return:
4171  * %true if congested, %false otherwise.
4172  */
4173 bool workqueue_congested(int cpu, struct workqueue_struct *wq)
4174 {
4175     struct pool_workqueue *pwq;
4176     bool ret;
4177 
4178     rcu_read_lock_sched();
4179 
4180     if (cpu == WORK_CPU_UNBOUND)
4181         cpu = smp_processor_id();
4182 
4183     if (!(wq->flags & WQ_UNBOUND))
4184         pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
4185     else
4186         pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
4187 
4188     ret = !list_empty(&pwq->delayed_works);
4189     rcu_read_unlock_sched();
4190 
4191     return ret;
4192 }
4193 EXPORT_SYMBOL_GPL(workqueue_congested);
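
/*
 * Illustrative sketch (editor's addition): using the congestion hint to
 * decide whether to defer optional background processing.  The result is
 * advisory only and may be stale by the time it is acted on.
 */
#if 0
static bool hypo_should_defer(struct workqueue_struct *wq)
{
	return workqueue_congested(WORK_CPU_UNBOUND, wq);
}
#endif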
4194 
4195 /**
4196  * work_busy - test whether a work is currently pending or running
4197  * @work: the work to be tested
4198  *
4199  * Test whether @work is currently pending or running.  There is no
4200  * synchronization around this function and the test result is
4201  * unreliable and only useful as advisory hints or for debugging.
4202  *
4203  * Return:
4204  * OR'd bitmask of WORK_BUSY_* bits.
4205  */
4206 unsigned int work_busy(struct work_struct *work)
4207 {
4208     struct worker_pool *pool;
4209     unsigned long flags;
4210     unsigned int ret = 0;
4211 
4212     if (work_pending(work))
4213         ret |= WORK_BUSY_PENDING;
4214 
4215     local_irq_save(flags);
4216     pool = get_work_pool(work);
4217     if (pool) {
4218         spin_lock(&pool->lock);
4219         if (find_worker_executing_work(pool, work))
4220             ret |= WORK_BUSY_RUNNING;
4221         spin_unlock(&pool->lock);
4222     }
4223     local_irq_restore(flags);
4224 
4225     return ret;
4226 }
4227 EXPORT_SYMBOL_GPL(work_busy);
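
/*
 * Illustrative sketch (editor's addition): decoding the advisory busy bits
 * for debugging output.
 */
#if 0
static void hypo_dump_work_state(struct work_struct *work)
{
	unsigned int busy = work_busy(work);

	pr_debug("work %p: pending=%d running=%d\n", work,
		 !!(busy & WORK_BUSY_PENDING), !!(busy & WORK_BUSY_RUNNING));
}
#endif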
4228 
4229 /**
4230  * set_worker_desc - set description for the current work item
4231  * @fmt: printf-style format string
4232  * @...: arguments for the format string
4233  *
4234  * This function can be called by a running work function to describe what
4235  * the work item is about.  If the worker task gets dumped, this
4236  * information will be printed out together to help debugging.  The
4237  * description can be at most WORKER_DESC_LEN including the trailing '\0'.
4238  */
4239 void set_worker_desc(const char *fmt, ...)
4240 {
4241     struct worker *worker = current_wq_worker();
4242     va_list args;
4243 
4244     if (worker) {
4245         va_start(args, fmt);
4246         vsnprintf(worker->desc, sizeof(worker->desc), fmt, args);
4247         va_end(args);
4248         worker->desc_valid = true;
4249     }
4250 }
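
/*
 * Illustrative sketch (editor's addition): a work function tagging its
 * worker so the description shows up in print_worker_info() dumps.  The
 * device index is hypothetical.
 */
#if 0
static void hypo_writeback_fn(struct work_struct *work)
{
	set_worker_desc("hypo-writeback dev%d", 0);
	/* actual writeback would go here */
}
#endif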
4251 
4252 /**
4253  * print_worker_info - print out worker information and description
4254  * @log_lvl: the log level to use when printing
4255  * @task: target task
4256  *
4257  * If @task is a worker and currently executing a work item, print out the
4258  * name of the workqueue being serviced and worker description set with
4259  * set_worker_desc() by the currently executing work item.
4260  *
4261  * This function can be safely called on any task as long as the
4262  * task_struct itself is accessible.  While safe, this function isn't
4263  * synchronized and may print out mixed-up or garbage output of limited length.
4264  */
4265 void print_worker_info(const char *log_lvl, struct task_struct *task)
4266 {
4267     work_func_t *fn = NULL;
4268     char name[WQ_NAME_LEN] = { };
4269     char desc[WORKER_DESC_LEN] = { };
4270     struct pool_workqueue *pwq = NULL;
4271     struct workqueue_struct *wq = NULL;
4272     bool desc_valid = false;
4273     struct worker *worker;
4274 
4275     if (!(task->flags & PF_WQ_WORKER))
4276         return;
4277 
4278     /*
4279      * This function is called without any synchronization and @task
4280      * could be in any state.  Be careful with dereferences.
4281      */
4282     worker = kthread_probe_data(task);
4283 
4284     /*
4285      * Carefully copy the associated workqueue's workfn and name.  Keep
4286      * the original last '\0' in case the original contains garbage.
4287      */
4288     probe_kernel_read(&fn, &worker->current_func, sizeof(fn));
4289     probe_kernel_read(&pwq, &worker->current_pwq, sizeof(pwq));
4290     probe_kernel_read(&wq, &pwq->wq, sizeof(wq));
4291     probe_kernel_read(name, wq->name, sizeof(name) - 1);
4292 
4293     /* copy worker description */
4294     probe_kernel_read(&desc_valid, &worker->desc_valid, sizeof(desc_valid));
4295     if (desc_valid)
4296         probe_kernel_read(desc, worker->desc, sizeof(desc) - 1);
4297 
4298     if (fn || name[0] || desc[0]) {
4299         printk("%sWorkqueue: %s %pf", log_lvl, name, fn);
4300         if (desc[0])
4301             pr_cont(" (%s)", desc);
4302         pr_cont("\n");
4303     }
4304 }
4305 
4306 static void pr_cont_pool_info(struct worker_pool *pool)
4307 {
4308     pr_cont(" cpus=%*pbl", nr_cpumask_bits, pool->attrs->cpumask);
4309     if (pool->node != NUMA_NO_NODE)
4310         pr_cont(" node=%d", pool->node);
4311     pr_cont(" flags=0x%x nice=%d", pool->flags, pool->attrs->nice);
4312 }
4313 
4314 static void pr_cont_work(bool comma, struct work_struct *work)
4315 {
4316     if (work->func == wq_barrier_func) {
4317         struct wq_barrier *barr;
4318 
4319         barr = container_of(work, struct wq_barrier, work);
4320 
4321         pr_cont("%s BAR(%d)", comma ? "," : "",
4322             task_pid_nr(barr->task));
4323     } else {
4324         pr_cont("%s %pf", comma ? "," : "", work->func);
4325     }
4326 }
4327 
4328 static void show_pwq(struct pool_workqueue *pwq)
4329 {
4330     struct worker_pool *pool = pwq->pool;
4331     struct work_struct *work;
4332     struct worker *worker;
4333     bool has_in_flight = false, has_pending = false;
4334     int bkt;
4335 
4336     pr_info("  pwq %d:", pool->id);
4337     pr_cont_pool_info(pool);
4338 
4339     pr_cont(" active=%d/%d%s\n", pwq->nr_active, pwq->max_active,
4340         !list_empty(&pwq->mayday_node) ? " MAYDAY" : "");
4341 
4342     hash_for_each(pool->busy_hash, bkt, worker, hentry) {
4343         if (worker->current_pwq == pwq) {
4344             has_in_flight = true;
4345             break;
4346         }
4347     }
4348     if (has_in_flight) {
4349         bool comma = false;
4350 
4351         pr_info("    in-flight:");
4352         hash_for_each(pool->busy_hash, bkt, worker, hentry) {
4353             if (worker->current_pwq != pwq)
4354                 continue;
4355 
4356             pr_cont("%s %d%s:%pf", comma ? "," : "",
4357                 task_pid_nr(worker->task),
4358                 worker == pwq->wq->rescuer ? "(RESCUER)" : "",
4359                 worker->current_func);
4360             list_for_each_entry(work, &worker->scheduled, entry)
4361                 pr_cont_work(false, work);
4362             comma = true;
4363         }
4364         pr_cont("\n");
4365     }
4366 
4367     list_for_each_entry(work, &pool->worklist, entry) {
4368         if (get_work_pwq(work) == pwq) {
4369             has_pending = true;
4370             break;
4371         }
4372     }
4373     if (has_pending) {
4374         bool comma = false;
4375 
4376         pr_info("    pending:");
4377         list_for_each_entry(work, &pool->worklist, entry) {
4378             if (get_work_pwq(work) != pwq)
4379                 continue;
4380 
4381             pr_cont_work(comma, work);
4382             comma = !(*work_data_bits(work) & WORK_STRUCT_LINKED);
4383         }
4384         pr_cont("\n");
4385     }
4386 
4387     if (!list_empty(&pwq->delayed_works)) {
4388         bool comma = false;
4389 
4390         pr_info("    delayed:");
4391         list_for_each_entry(work, &pwq->delayed_works, entry) {
4392             pr_cont_work(comma, work);
4393             comma = !(*work_data_bits(work) & WORK_STRUCT_LINKED);
4394         }
4395         pr_cont("\n");
4396     }
4397 }
4398 
4399 /**
4400  * show_workqueue_state - dump workqueue state
4401  *
4402  * Called from a sysrq handler or try_to_freeze_tasks() and prints out
4403  * all busy workqueues and pools.
4404  */
4405 void show_workqueue_state(void)
4406 {
4407     struct workqueue_struct *wq;
4408     struct worker_pool *pool;
4409     unsigned long flags;
4410     int pi;
4411 
4412     rcu_read_lock_sched();
4413 
4414     pr_info("Showing busy workqueues and worker pools:\n");
4415 
4416     list_for_each_entry_rcu(wq, &workqueues, list) {
4417         struct pool_workqueue *pwq;
4418         bool idle = true;
4419 
4420         for_each_pwq(pwq, wq) {
4421             if (pwq->nr_active || !list_empty(&pwq->delayed_works)) {
4422                 idle = false;
4423                 break;
4424             }
4425         }
4426         if (idle)
4427             continue;
4428 
4429         pr_info("workqueue %s: flags=0x%x\n", wq->name, wq->flags);
4430 
4431         for_each_pwq(pwq, wq) {
4432             spin_lock_irqsave(&pwq->pool->lock, flags);
4433             if (pwq->nr_active || !list_empty(&pwq->delayed_works))
4434                 show_pwq(pwq);
4435             spin_unlock_irqrestore(&pwq->pool->lock, flags);
4436         }
4437     }
4438 
4439     for_each_pool(pool, pi) {
4440         struct worker *worker;
4441         bool first = true;
4442 
4443         spin_lock_irqsave(&pool->lock, flags);
4444         if (pool->nr_workers == pool->nr_idle)
4445             goto next_pool;
4446 
4447         pr_info("pool %d:", pool->id);
4448         pr_cont_pool_info(pool);
4449         pr_cont(" hung=%us workers=%d",
4450             jiffies_to_msecs(jiffies - pool->watchdog_ts) / 1000,
4451             pool->nr_workers);
4452         if (pool->manager)
4453             pr_cont(" manager: %d",
4454                 task_pid_nr(pool->manager->task));
4455         list_for_each_entry(worker, &pool->idle_list, entry) {
4456             pr_cont(" %s%d", first ? "idle: " : "",
4457                 task_pid_nr(worker->task));
4458             first = false;
4459         }
4460         pr_cont("\n");
4461     next_pool:
4462         spin_unlock_irqrestore(&pool->lock, flags);
4463     }
4464 
4465     rcu_read_unlock_sched();
4466 }
4467 
4468 /*
4469  * CPU hotplug.
4470  *
4471  * There are two challenges in supporting CPU hotplug.  Firstly, there
4472  * are a lot of assumptions on strong associations among work, pwq and
4473  * pool which make migrating pending and scheduled works very
4474  * difficult to implement without impacting hot paths.  Secondly,
4475  * worker pools serve a mix of short, long and very long running work items,
4476  * making blocked draining impractical.
4477  *
4478  * This is solved by allowing the pools to be disassociated from the CPU,
4479  * running as unbound ones, and allowing them to be reattached later if the
4480  * CPU comes back online.
4481  */
4482 
4483 static void wq_unbind_fn(struct work_struct *work)
4484 {
4485     int cpu = smp_processor_id();
4486     struct worker_pool *pool;
4487     struct worker *worker;
4488 
4489     for_each_cpu_worker_pool(pool, cpu) {
4490         mutex_lock(&pool->attach_mutex);
4491         spin_lock_irq(&pool->lock);
4492 
4493         /*
4494          * We've blocked all attach/detach operations. Make all workers
4495          * unbound and set DISASSOCIATED.  Before this, all workers
4496          * except for the ones which are still executing works from
4497          * before the last CPU down must be on the cpu.  After
4498          * this, they may become diasporas.
4499          */
4500         for_each_pool_worker(worker, pool)
4501             worker->flags |= WORKER_UNBOUND;
4502 
4503         pool->flags |= POOL_DISASSOCIATED;
4504 
4505         spin_unlock_irq(&pool->lock);
4506         mutex_unlock(&pool->attach_mutex);
4507 
4508         /*
4509          * Call schedule() so that we cross rq->lock and thus can
4510          * guarantee sched callbacks see the %WORKER_UNBOUND flag.
4511          * This is necessary as scheduler callbacks may be invoked
4512          * from other cpus.
4513          */
4514         schedule();
4515 
4516         /*
4517          * Sched callbacks are disabled now.  Zap nr_running.
4518          * After this, nr_running stays zero and need_more_worker()
4519          * and keep_working() are always true as long as the
4520          * worklist is not empty.  This pool now behaves as an
4521          * unbound (in terms of concurrency management) pool which
4522          * is served by workers tied to the pool.
4523          */
4524         atomic_set(&pool->nr_running, 0);
4525 
4526         /*
4527          * With concurrency management just turned off, a busy
4528          * worker blocking could lead to lengthy stalls.  Kick off
4529          * unbound chain execution of currently pending work items.
4530          */
4531         spin_lock_irq(&pool->lock);
4532         wake_up_worker(pool);
4533         spin_unlock_irq(&pool->lock);
4534     }
4535 }
4536 
4537 /**
4538  * rebind_workers - rebind all workers of a pool to the associated CPU
4539  * @pool: pool of interest
4540  *
4541  * @pool->cpu is coming online.  Rebind all workers to the CPU.
4542  */
4543 static void rebind_workers(struct worker_pool *pool)
4544 {
4545     struct worker *worker;
4546 
4547     lockdep_assert_held(&pool->attach_mutex);
4548 
4549     /*
4550      * Restore CPU affinity of all workers.  As all idle workers should
4551      * be on the run-queue of the associated CPU before any local
4552      * wake-ups for concurrency management happen, restore CPU affinity
4553      * of all workers first and then clear UNBOUND.  As we're called
4554      * from CPU_ONLINE, the following shouldn't fail.
4555      */
4556     for_each_pool_worker(worker, pool)
4557         WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task,
4558                           pool->attrs->cpumask) < 0);
4559 
4560     spin_lock_irq(&pool->lock);
4561 
4562     /*
4563      * XXX: CPU hotplug notifiers are weird and can call DOWN_FAILED
4564      * w/o preceding DOWN_PREPARE.  Work around it.  CPU hotplug is
4565      * being reworked and this can go away in time.
4566      */
4567     if (!(pool->flags & POOL_DISASSOCIATED)) {
4568         spin_unlock_irq(&pool->lock);
4569         return;
4570     }
4571 
4572     pool->flags &= ~POOL_DISASSOCIATED;
4573 
4574     for_each_pool_worker(worker, pool) {
4575         unsigned int worker_flags = worker->flags;
4576 
4577         /*
4578          * A bound idle worker should actually be on the runqueue
4579          * of the associated CPU for local wake-ups targeting it to
4580          * work.  Kick all idle workers so that they migrate to the
4581          * associated CPU.  Doing this in the same loop as
4582          * replacing UNBOUND with REBOUND is safe as no worker will
4583          * be bound before @pool->lock is released.
4584          */
4585         if (worker_flags & WORKER_IDLE)
4586             wake_up_process(worker->task);
4587 
4588         /*
4589          * We want to clear UNBOUND but can't directly call
4590          * worker_clr_flags() or adjust nr_running.  Atomically
4591          * replace UNBOUND with another NOT_RUNNING flag REBOUND.
4592          * @worker will clear REBOUND using worker_clr_flags() when
4593          * it initiates the next execution cycle thus restoring
4594          * concurrency management.  Note that when or whether
4595          * @worker clears REBOUND doesn't affect correctness.
4596          *
4597          * ACCESS_ONCE() is necessary because @worker->flags may be
4598          * tested without holding any lock in
4599          * wq_worker_waking_up().  Without it, NOT_RUNNING test may
4600          * fail incorrectly leading to premature concurrency
4601          * management operations.
4602          */
4603         WARN_ON_ONCE(!(worker_flags & WORKER_UNBOUND));
4604         worker_flags |= WORKER_REBOUND;
4605         worker_flags &= ~WORKER_UNBOUND;
4606         ACCESS_ONCE(worker->flags) = worker_flags;
4607     }
4608 
4609     spin_unlock_irq(&pool->lock);
4610 }
4611 
4612 /**
4613  * restore_unbound_workers_cpumask - restore cpumask of unbound workers
4614  * @pool: unbound pool of interest
4615  * @cpu: the CPU which is coming up
4616  *
4617  * An unbound pool may end up with a cpumask which doesn't have any online
4618  * CPUs.  When a worker of such a pool gets scheduled, the scheduler resets
4619  * its cpus_allowed.  If @cpu is in @pool's cpumask, which didn't have any
4620  * online CPU before, the cpus_allowed of all its workers should be restored.
4621  */
4622 static void restore_unbound_workers_cpumask(struct worker_pool *pool, int cpu)
4623 {
4624     static cpumask_t cpumask;
4625     struct worker *worker;
4626 
4627     lockdep_assert_held(&pool->attach_mutex);
4628 
4629     /* is @cpu allowed for @pool? */
4630     if (!cpumask_test_cpu(cpu, pool->attrs->cpumask))
4631         return;
4632 
4633     cpumask_and(&cpumask, pool->attrs->cpumask, cpu_online_mask);
4634 
4635     /* as we're called from CPU_ONLINE, the following shouldn't fail */
4636     for_each_pool_worker(worker, pool)
4637         WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task, &cpumask) < 0);
4638 }
4639 
4640 int workqueue_prepare_cpu(unsigned int cpu)
4641 {
4642     struct worker_pool *pool;
4643 
4644     for_each_cpu_worker_pool(pool, cpu) {
4645         if (pool->nr_workers)
4646             continue;
4647         if (!create_worker(pool))
4648             return -ENOMEM;
4649     }
4650     return 0;
4651 }
4652 
4653 int workqueue_online_cpu(unsigned int cpu)
4654 {
4655     struct worker_pool *pool;
4656     struct workqueue_struct *wq;
4657     int pi;
4658 
4659     mutex_lock(&wq_pool_mutex);
4660 
4661     for_each_pool(pool, pi) {
4662         mutex_lock(&pool->attach_mutex);
4663 
4664         if (pool->cpu == cpu)
4665             rebind_workers(pool);
4666         else if (pool->cpu < 0)
4667             restore_unbound_workers_cpumask(pool, cpu);
4668 
4669         mutex_unlock(&pool->attach_mutex);
4670     }
4671 
4672     /* update NUMA affinity of unbound workqueues */
4673     list_for_each_entry(wq, &workqueues, list)
4674         wq_update_unbound_numa(wq, cpu, true);
4675 
4676     mutex_unlock(&wq_pool_mutex);
4677     return 0;
4678 }
4679 
4680 int workqueue_offline_cpu(unsigned int cpu)
4681 {
4682     struct work_struct unbind_work;
4683     struct workqueue_struct *wq;
4684 
4685     /* unbinding per-cpu workers should happen on the local CPU */
4686     INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
4687     queue_work_on(cpu, system_highpri_wq, &unbind_work);
4688 
4689     /* update NUMA affinity of unbound workqueues */
4690     mutex_lock(&wq_pool_mutex);
4691     list_for_each_entry(wq, &workqueues, list)
4692         wq_update_unbound_numa(wq, cpu, false);
4693     mutex_unlock(&wq_pool_mutex);
4694 
4695     /* wait for per-cpu unbinding to finish */
4696     flush_work(&unbind_work);
4697     destroy_work_on_stack(&unbind_work);
4698     return 0;
4699 }
4700 
4701 #ifdef CONFIG_SMP
4702 
4703 struct work_for_cpu {
4704     struct work_struct work;
4705     long (*fn)(void *);
4706     void *arg;
4707     long ret;
4708 };
4709 
4710 static void work_for_cpu_fn(struct work_struct *work)
4711 {
4712     struct work_for_cpu *wfc = container_of(work, struct work_for_cpu, work);
4713 
4714     wfc->ret = wfc->fn(wfc->arg);
4715 }
4716 
4717 /**
4718  * work_on_cpu - run a function in thread context on a particular cpu
4719  * @cpu: the cpu to run on
4720  * @fn: the function to run
4721  * @arg: the function arg
4722  *
4723  * It is up to the caller to ensure that the cpu doesn't go offline.
4724  * The caller must not hold any locks which would prevent @fn from completing.
4725  *
4726  * Return: The value @fn returns.
4727  */
4728 long work_on_cpu(int cpu, long (*fn)(void *), void *arg)
4729 {
4730     struct work_for_cpu wfc = { .fn = fn, .arg = arg };
4731 
4732     INIT_WORK_ONSTACK(&wfc.work, work_for_cpu_fn);
4733     schedule_work_on(cpu, &wfc.work);
4734     flush_work(&wfc.work);
4735     destroy_work_on_stack(&wfc.work);
4736     return wfc.ret;
4737 }
4738 EXPORT_SYMBOL_GPL(work_on_cpu);
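/*
 * Editor's note (illustration, not part of the original file): a minimal,
 * hypothetical caller of work_on_cpu().  example_read_node() and the fixed
 * CPU number are made up; per the comment above, a real caller must make
 * sure the target CPU stays online for the duration of the call.
 */
#if 0	/* illustrative sketch only */
static long example_read_node(void *arg)
{
	/* runs in a kworker bound to the requested CPU */
	return numa_node_id();
}

static void example_work_on_cpu(void)
{
	/* caller is assumed to keep CPU 1 online here (not shown) */
	long node = work_on_cpu(1, example_read_node, NULL);

	pr_info("CPU1 sits on NUMA node %ld\n", node);
}
#endif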
4739 #endif /* CONFIG_SMP */
4740 
4741 #ifdef CONFIG_FREEZER
4742 
4743 /**
4744  * freeze_workqueues_begin - begin freezing workqueues
4745  *
4746  * Start freezing workqueues.  After this function returns, all freezable
4747  * workqueues will queue new works to their delayed_works list instead of
4748  * pool->worklist.
4749  *
4750  * CONTEXT:
4751  * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
4752  */
4753 void freeze_workqueues_begin(void)
4754 {
4755     struct workqueue_struct *wq;
4756     struct pool_workqueue *pwq;
4757 
4758     mutex_lock(&wq_pool_mutex);
4759 
4760     WARN_ON_ONCE(workqueue_freezing);
4761     workqueue_freezing = true;
4762 
4763     list_for_each_entry(wq, &workqueues, list) {
4764         mutex_lock(&wq->mutex);
4765         for_each_pwq(pwq, wq)
4766             pwq_adjust_max_active(pwq);
4767         mutex_unlock(&wq->mutex);
4768     }
4769 
4770     mutex_unlock(&wq_pool_mutex);
4771 }
4772 
4773 /**
4774  * freeze_workqueues_busy - are freezable workqueues still busy?
4775  *
4776  * Check whether freezing is complete.  This function must be called
4777  * between freeze_workqueues_begin() and thaw_workqueues().
4778  *
4779  * CONTEXT:
4780  * Grabs and releases wq_pool_mutex.
4781  *
4782  * Return:
4783  * %true if some freezable workqueues are still busy.  %false if freezing
4784  * is complete.
4785  */
4786 bool freeze_workqueues_busy(void)
4787 {
4788     bool busy = false;
4789     struct workqueue_struct *wq;
4790     struct pool_workqueue *pwq;
4791 
4792     mutex_lock(&wq_pool_mutex);
4793 
4794     WARN_ON_ONCE(!workqueue_freezing);
4795 
4796     list_for_each_entry(wq, &workqueues, list) {
4797         if (!(wq->flags & WQ_FREEZABLE))
4798             continue;
4799         /*
4800          * nr_active is monotonically decreasing.  It's safe
4801          * to peek without lock.
4802          */
4803         rcu_read_lock_sched();
4804         for_each_pwq(pwq, wq) {
4805             WARN_ON_ONCE(pwq->nr_active < 0);
4806             if (pwq->nr_active) {
4807                 busy = true;
4808                 rcu_read_unlock_sched();
4809                 goto out_unlock;
4810             }
4811         }
4812         rcu_read_unlock_sched();
4813     }
4814 out_unlock:
4815     mutex_unlock(&wq_pool_mutex);
4816     return busy;
4817 }
4818 
4819 /**
4820  * thaw_workqueues - thaw workqueues
4821  *
4822  * Thaw workqueues.  Normal queueing is restored and all collected
4823  * frozen works are transferred to their respective pool worklists.
4824  *
4825  * CONTEXT:
4826  * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
4827  */
4828 void thaw_workqueues(void)
4829 {
4830     struct workqueue_struct *wq;
4831     struct pool_workqueue *pwq;
4832 
4833     mutex_lock(&wq_pool_mutex);
4834 
4835     if (!workqueue_freezing)
4836         goto out_unlock;
4837 
4838     workqueue_freezing = false;
4839 
4840     /* restore max_active and repopulate worklist */
4841     list_for_each_entry(wq, &workqueues, list) {
4842         mutex_lock(&wq->mutex);
4843         for_each_pwq(pwq, wq)
4844             pwq_adjust_max_active(pwq);
4845         mutex_unlock(&wq->mutex);
4846     }
4847 
4848 out_unlock:
4849     mutex_unlock(&wq_pool_mutex);
4850 }
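/*
 * Editor's note (illustration, not part of the original file): a sketch of
 * how a suspend path might drive the freezer trio above - freeze, poll
 * until quiescent or timed out, and thaw again on failure.  The helper
 * name and timeout are hypothetical; the actual in-tree user is the system
 * freezer under kernel/power/.
 */
#if 0	/* illustrative sketch only; msleep() needs <linux/delay.h> */
static int example_freeze_wqs(void)
{
	unsigned long end = jiffies + 20 * HZ;	/* arbitrary timeout */

	freeze_workqueues_begin();

	while (freeze_workqueues_busy()) {
		if (time_after(jiffies, end)) {
			/* couldn't quiesce in time, back out */
			thaw_workqueues();
			return -EBUSY;
		}
		msleep(10);
	}

	return 0;
}
#endif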
4851 #endif /* CONFIG_FREEZER */
4852 
4853 static int workqueue_apply_unbound_cpumask(void)
4854 {
4855     LIST_HEAD(ctxs);
4856     int ret = 0;
4857     struct workqueue_struct *wq;
4858     struct apply_wqattrs_ctx *ctx, *n;
4859 
4860     lockdep_assert_held(&wq_pool_mutex);
4861 
4862     list_for_each_entry(wq, &workqueues, list) {
4863         if (!(wq->flags & WQ_UNBOUND))
4864             continue;
4865         /* creating multiple pwqs breaks ordering guarantee */
4866         if (wq->flags & __WQ_ORDERED)
4867             continue;
4868 
4869         ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs);
4870         if (!ctx) {
4871             ret = -ENOMEM;
4872             break;
4873         }
4874 
4875         list_add_tail(&ctx->list, &ctxs);
4876     }
4877 
4878     list_for_each_entry_safe(ctx, n, &ctxs, list) {
4879         if (!ret)
4880             apply_wqattrs_commit(ctx);
4881         apply_wqattrs_cleanup(ctx);
4882     }
4883 
4884     return ret;
4885 }
4886 
4887 /**
4888  *  workqueue_set_unbound_cpumask - Set the low-level unbound cpumask
4889  *  @cpumask: the cpumask to set
4890  *
4891  *  The low-level workqueues cpumask is a global cpumask that limits
4892  *  the affinity of all unbound workqueues.  This function checks @cpumask,
4893  *  applies it to all unbound workqueues and updates all of their pwqs.
4894  *
4895  *  Return: 0   - Success
4896  *          -EINVAL - Invalid @cpumask
4897  *          -ENOMEM - Failed to allocate memory for attrs or pwqs.
4898  */
4899 int workqueue_set_unbound_cpumask(cpumask_var_t cpumask)
4900 {
4901     int ret = -EINVAL;
4902     cpumask_var_t saved_cpumask;
4903 
4904     if (!zalloc_cpumask_var(&saved_cpumask, GFP_KERNEL))
4905         return -ENOMEM;
4906 
4907     cpumask_and(cpumask, cpumask, cpu_possible_mask);
4908     if (!cpumask_empty(cpumask)) {
4909         apply_wqattrs_lock();
4910 
4911         /* save the old wq_unbound_cpumask. */
4912         cpumask_copy(saved_cpumask, wq_unbound_cpumask);
4913 
4914         /* update wq_unbound_cpumask at first and apply it to wqs. */
4915         cpumask_copy(wq_unbound_cpumask, cpumask);
4916         ret = workqueue_apply_unbound_cpumask();
4917 
4918         /* restore the wq_unbound_cpumask when failed. */
4919         if (ret < 0)
4920             cpumask_copy(wq_unbound_cpumask, saved_cpumask);
4921 
4922         apply_wqattrs_unlock();
4923     }
4924 
4925     free_cpumask_var(saved_cpumask);
4926     return ret;
4927 }
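/*
 * Editor's note (illustration, not part of the original file): the in-tree
 * caller of workqueue_set_unbound_cpumask() is the sysfs "cpumask" store
 * handler further below.  The hypothetical sketch here shows the same
 * pattern - allocate a mask, fill it, hand it over - as e.g. an init path
 * might; "example_restrict_unbound" and the choice of CPU 0 are made up.
 */
#if 0	/* illustrative sketch only */
static int example_restrict_unbound(void)
{
	cpumask_var_t mask;
	int ret;

	if (!zalloc_cpumask_var(&mask, GFP_KERNEL))
		return -ENOMEM;

	/* e.g. keep unbound work off CPU 0 */
	cpumask_copy(mask, cpu_possible_mask);
	cpumask_clear_cpu(0, mask);

	ret = workqueue_set_unbound_cpumask(mask);
	free_cpumask_var(mask);
	return ret;
}
#endif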
4928 
4929 #ifdef CONFIG_SYSFS
4930 /*
4931  * Workqueues with the WQ_SYSFS flag set are visible to userland via
4932  * /sys/bus/workqueue/devices/WQ_NAME.  All visible workqueues have the
4933  * following attributes.
4934  *
4935  *  per_cpu RO bool : whether the workqueue is per-cpu or unbound
4936  *  max_active  RW int  : maximum number of in-flight work items
4937  *
4938  * Unbound workqueues have the following extra attributes.
4939  *
4940  *  id      RO int  : the associated pool ID
4941  *  nice    RW int  : nice value of the workers
4942  *  cpumask RW mask : bitmask of allowed CPUs for the workers
4943  */
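/*
 * Editor's note (illustration, not part of the original file): a
 * hypothetical example of how a workqueue opts into this interface -
 * passing WQ_SYSFS to alloc_workqueue() is enough for the attributes
 * described above to appear under /sys/bus/workqueue/devices/<name>.
 */
#if 0	/* illustrative sketch only */
static struct workqueue_struct *example_wq;

static int __init example_wq_init(void)
{
	example_wq = alloc_workqueue("example_wq", WQ_UNBOUND | WQ_SYSFS, 0);
	return example_wq ? 0 : -ENOMEM;
}
#endif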
4944 struct wq_device {
4945     struct workqueue_struct     *wq;
4946     struct device           dev;
4947 };
4948 
4949 static struct workqueue_struct *dev_to_wq(struct device *dev)
4950 {
4951     struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
4952 
4953     return wq_dev->wq;
4954 }
4955 
4956 static ssize_t per_cpu_show(struct device *dev, struct device_attribute *attr,
4957                 char *buf)
4958 {
4959     struct workqueue_struct *wq = dev_to_wq(dev);
4960 
4961     return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND));
4962 }
4963 static DEVICE_ATTR_RO(per_cpu);
4964 
4965 static ssize_t max_active_show(struct device *dev,
4966                    struct device_attribute *attr, char *buf)
4967 {
4968     struct workqueue_struct *wq = dev_to_wq(dev);
4969 
4970     return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active);
4971 }
4972 
4973 static ssize_t max_active_store(struct device *dev,
4974                 struct device_attribute *attr, const char *buf,
4975                 size_t count)
4976 {
4977     struct workqueue_struct *wq = dev_to_wq(dev);
4978     int val;
4979 
4980     if (sscanf(buf, "%d", &val) != 1 || val <= 0)
4981         return -EINVAL;
4982 
4983     workqueue_set_max_active(wq, val);
4984     return count;
4985 }
4986 static DEVICE_ATTR_RW(max_active);
4987 
4988 static struct attribute *wq_sysfs_attrs[] = {
4989     &dev_attr_per_cpu.attr,
4990     &dev_attr_max_active.attr,
4991     NULL,
4992 };
4993 ATTRIBUTE_GROUPS(wq_sysfs);
4994 
4995 static ssize_t wq_pool_ids_show(struct device *dev,
4996                 struct device_attribute *attr, char *buf)
4997 {
4998     struct workqueue_struct *wq = dev_to_wq(dev);
4999     const char *delim = "";
5000     int node, written = 0;
5001 
5002     rcu_read_lock_sched();
5003     for_each_node(node) {
5004         written += scnprintf(buf + written, PAGE_SIZE - written,
5005                      "%s%d:%d", delim, node,
5006                      unbound_pwq_by_node(wq, node)->pool->id);
5007         delim = " ";
5008     }
5009     written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
5010     rcu_read_unlock_sched();
5011 
5012     return written;
5013 }
5014 
5015 static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
5016                 char *buf)
5017 {
5018     struct workqueue_struct *wq = dev_to_wq(dev);
5019     int written;
5020 
5021     mutex_lock(&wq->mutex);
5022     written = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice);
5023     mutex_unlock(&wq->mutex);
5024 
5025     return written;
5026 }
5027 
5028 /* prepare workqueue_attrs for sysfs store operations */
5029 static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
5030 {
5031     struct workqueue_attrs *attrs;
5032 
5033     lockdep_assert_held(&wq_pool_mutex);
5034 
5035     attrs = alloc_workqueue_attrs(GFP_KERNEL);
5036     if (!attrs)
5037         return NULL;
5038 
5039     copy_workqueue_attrs(attrs, wq->unbound_attrs);
5040     return attrs;
5041 }
5042 
5043 static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
5044                  const char *buf, size_t count)
5045 {
5046     struct workqueue_struct *wq = dev_to_wq(dev);
5047     struct workqueue_attrs *attrs;
5048     int ret = -ENOMEM;
5049 
5050     apply_wqattrs_lock();
5051 
5052     attrs = wq_sysfs_prep_attrs(wq);
5053     if (!attrs)
5054         goto out_unlock;
5055 
5056     if (sscanf(buf, "%d", &attrs->nice) == 1 &&
5057         attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE)
5058         ret = apply_workqueue_attrs_locked(wq, attrs);
5059     else
5060         ret = -EINVAL;
5061 
5062 out_unlock:
5063     apply_wqattrs_unlock();
5064     free_workqueue_attrs(attrs);
5065     return ret ?: count;
5066 }
5067 
5068 static ssize_t wq_cpumask_show(struct device *dev,
5069                    struct device_attribute *attr, char *buf)
5070 {
5071     struct workqueue_struct *wq = dev_to_wq(dev);
5072     int written;
5073 
5074     mutex_lock(&wq->mutex);
5075     written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
5076                 cpumask_pr_args(wq->unbound_attrs->cpumask));
5077     mutex_unlock(&wq->mutex);
5078     return written;
5079 }
5080 
5081 static ssize_t wq_cpumask_store(struct device *dev,
5082                 struct device_attribute *attr,
5083                 const char *buf, size_t count)
5084 {
5085     struct workqueue_struct *wq = dev_to_wq(dev);
5086     struct workqueue_attrs *attrs;
5087     int ret = -ENOMEM;
5088 
5089     apply_wqattrs_lock();
5090 
5091     attrs = wq_sysfs_prep_attrs(wq);
5092     if (!attrs)
5093         goto out_unlock;
5094 
5095     ret = cpumask_parse(buf, attrs->cpumask);
5096     if (!ret)
5097         ret = apply_workqueue_attrs_locked(wq, attrs);
5098 
5099 out_unlock:
5100     apply_wqattrs_unlock();
5101     free_workqueue_attrs(attrs);
5102     return ret ?: count;
5103 }
5104 
5105 static ssize_t wq_numa_show(struct device *dev, struct device_attribute *attr,
5106                 char *buf)
5107 {
5108     struct workqueue_struct *wq = dev_to_wq(dev);
5109     int written;
5110 
5111     mutex_lock(&wq->mutex);
5112     written = scnprintf(buf, PAGE_SIZE, "%d\n",
5113                 !wq->unbound_attrs->no_numa);
5114     mutex_unlock(&wq->mutex);
5115 
5116     return written;
5117 }
5118 
5119 static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
5120                  const char *buf, size_t count)
5121 {
5122     struct workqueue_struct *wq = dev_to_wq(dev);
5123     struct workqueue_attrs *attrs;
5124     int v, ret = -ENOMEM;
5125 
5126     apply_wqattrs_lock();
5127 
5128     attrs = wq_sysfs_prep_attrs(wq);
5129     if (!attrs)
5130         goto out_unlock;
5131 
5132     ret = -EINVAL;
5133     if (sscanf(buf, "%d", &v) == 1) {
5134         attrs->no_numa = !v;
5135         ret = apply_workqueue_attrs_locked(wq, attrs);
5136     }
5137 
5138 out_unlock:
5139     apply_wqattrs_unlock();
5140     free_workqueue_attrs(attrs);
5141     return ret ?: count;
5142 }
5143 
5144 static struct device_attribute wq_sysfs_unbound_attrs[] = {
5145     __ATTR(pool_ids, 0444, wq_pool_ids_show, NULL),
5146     __ATTR(nice, 0644, wq_nice_show, wq_nice_store),
5147     __ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store),
5148     __ATTR(numa, 0644, wq_numa_show, wq_numa_store),
5149     __ATTR_NULL,
5150 };
5151 
5152 static struct bus_type wq_subsys = {
5153     .name               = "workqueue",
5154     .dev_groups         = wq_sysfs_groups,
5155 };
5156 
5157 static ssize_t wq_unbound_cpumask_show(struct device *dev,
5158         struct device_attribute *attr, char *buf)
5159 {
5160     int written;
5161 
5162     mutex_lock(&wq_pool_mutex);
5163     written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
5164                 cpumask_pr_args(wq_unbound_cpumask));
5165     mutex_unlock(&wq_pool_mutex);
5166 
5167     return written;
5168 }
5169 
5170 static ssize_t wq_unbound_cpumask_store(struct device *dev,
5171         struct device_attribute *attr, const char *buf, size_t count)
5172 {
5173     cpumask_var_t cpumask;
5174     int ret;
5175 
5176     if (!zalloc_cpumask_var(&cpumask, GFP_KERNEL))
5177         return -ENOMEM;
5178 
5179     ret = cpumask_parse(buf, cpumask);
5180     if (!ret)
5181         ret = workqueue_set_unbound_cpumask(cpumask);
5182 
5183     free_cpumask_var(cpumask);
5184     return ret ? ret : count;
5185 }
5186 
5187 static struct device_attribute wq_sysfs_cpumask_attr =
5188     __ATTR(cpumask, 0644, wq_unbound_cpumask_show,
5189            wq_unbound_cpumask_store);
5190 
5191 static int __init wq_sysfs_init(void)
5192 {
5193     int err;
5194 
5195     err = subsys_virtual_register(&wq_subsys, NULL);
5196     if (err)
5197         return err;
5198 
5199     return device_create_file(wq_subsys.dev_root, &wq_sysfs_cpumask_attr);
5200 }
5201 core_initcall(wq_sysfs_init);
5202 
5203 static void wq_device_release(struct device *dev)
5204 {
5205     struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
5206 
5207     kfree(wq_dev);
5208 }
5209 
5210 /**
5211  * workqueue_sysfs_register - make a workqueue visible in sysfs
5212  * @wq: the workqueue to register
5213  *
5214  * Expose @wq in sysfs under /sys/bus/workqueue/devices.
5215  * alloc_workqueue*() automatically calls this function if WQ_SYSFS is set,
5216  * which is the preferred method.
5217  *
5218  * A workqueue user should use this function directly iff it wants to apply
5219  * workqueue_attrs before making the workqueue visible in sysfs; otherwise,
5220  * apply_workqueue_attrs() may race against userland updating the
5221  * attributes.
5222  *
5223  * Return: 0 on success, -errno on failure.
5224  */
5225 int workqueue_sysfs_register(struct workqueue_struct *wq)
5226 {
5227     struct wq_device *wq_dev;
5228     int ret;
5229 
5230     /*
5231      * Adjusting max_active or creating new pwqs by applying
5232      * attributes breaks the ordering guarantee.  Disallow exposing ordered
5233      * workqueues.
5234      */
5235     if (WARN_ON(wq->flags & __WQ_ORDERED))
5236         return -EINVAL;
5237 
5238     wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
5239     if (!wq_dev)
5240         return -ENOMEM;
5241 
5242     wq_dev->wq = wq;
5243     wq_dev->dev.bus = &wq_subsys;
5244     wq_dev->dev.release = wq_device_release;
5245     dev_set_name(&wq_dev->dev, "%s", wq->name);
5246 
5247     /*
5248      * unbound_attrs are created separately.  Suppress uevent until
5249      * everything is ready.
5250      */
5251     dev_set_uevent_suppress(&wq_dev->dev, true);
5252 
5253     ret = device_register(&wq_dev->dev);
5254     if (ret) {
5255         kfree(wq_dev);
5256         wq->wq_dev = NULL;
5257         return ret;
5258     }
5259 
5260     if (wq->flags & WQ_UNBOUND) {
5261         struct device_attribute *attr;
5262 
5263         for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) {
5264             ret = device_create_file(&wq_dev->dev, attr);
5265             if (ret) {
5266                 device_unregister(&wq_dev->dev);
5267                 wq->wq_dev = NULL;
5268                 return ret;
5269             }
5270         }
5271     }
5272 
5273     dev_set_uevent_suppress(&wq_dev->dev, false);
5274     kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD);
5275     return 0;
5276 }
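/*
 * Editor's note (illustration, not part of the original file): a
 * hypothetical sketch of the pattern described above - create the
 * workqueue without WQ_SYSFS, apply the desired attributes first and only
 * then register it, so userland can never race with the initial setup.
 * The name and nice value are made up.
 */
#if 0	/* illustrative sketch only */
static int example_create_tuned_wq(void)
{
	struct workqueue_struct *wq;
	struct workqueue_attrs *attrs;
	int ret;

	wq = alloc_workqueue("example_tuned", WQ_UNBOUND, 0);
	if (!wq)
		return -ENOMEM;

	attrs = alloc_workqueue_attrs(GFP_KERNEL);
	if (!attrs) {
		destroy_workqueue(wq);
		return -ENOMEM;
	}
	attrs->nice = -5;
	ret = apply_workqueue_attrs(wq, attrs);
	free_workqueue_attrs(attrs);

	if (!ret)
		ret = workqueue_sysfs_register(wq);
	if (ret)
		destroy_workqueue(wq);
	return ret;
}
#endif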
5277 
5278 /**
5279  * workqueue_sysfs_unregister - undo workqueue_sysfs_register()
5280  * @wq: the workqueue to unregister
5281  *
5282  * If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
5283  */
5284 static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
5285 {
5286     struct wq_device *wq_dev = wq->wq_dev;
5287 
5288     if (!wq->wq_dev)
5289         return;
5290 
5291     wq->wq_dev = NULL;
5292     device_unregister(&wq_dev->dev);
5293 }
5294 #else   /* CONFIG_SYSFS */
5295 static void workqueue_sysfs_unregister(struct workqueue_struct *wq) { }
5296 #endif  /* CONFIG_SYSFS */
5297 
5298 /*
5299  * Workqueue watchdog.
5300  *
5301  * Stalls may be caused by various bugs - a missing WQ_MEM_RECLAIM, an illegal
5302  * flush dependency, a concurrency managed work item which stays RUNNING
5303  * indefinitely.  Workqueue stalls can be very difficult to debug as the
5304  * usual warning mechanisms don't trigger and internal workqueue state is
5305  * largely opaque.
5306  *
5307  * The workqueue watchdog monitors all worker pools periodically and dumps
5308  * state if some pools fail to make forward progress for a while, where
5309  * forward progress is defined as the first item on ->worklist changing.
5310  *
5311  * This mechanism is controlled through the kernel parameter
5312  * "workqueue.watchdog_thresh" which can be updated at runtime through the
5313  * corresponding sysfs parameter file.
5314  */
5315 #ifdef CONFIG_WQ_WATCHDOG
5316 
5317 static void wq_watchdog_timer_fn(unsigned long data);
5318 
5319 static unsigned long wq_watchdog_thresh = 30;
5320 static struct timer_list wq_watchdog_timer =
5321     TIMER_DEFERRED_INITIALIZER(wq_watchdog_timer_fn, 0, 0);
5322 
5323 static unsigned long wq_watchdog_touched = INITIAL_JIFFIES;
5324 static DEFINE_PER_CPU(unsigned long, wq_watchdog_touched_cpu) = INITIAL_JIFFIES;
5325 
5326 static void wq_watchdog_reset_touched(void)
5327 {
5328     int cpu;
5329 
5330     wq_watchdog_touched = jiffies;
5331     for_each_possible_cpu(cpu)
5332         per_cpu(wq_watchdog_touched_cpu, cpu) = jiffies;
5333 }
5334 
5335 static void wq_watchdog_timer_fn(unsigned long data)
5336 {
5337     unsigned long thresh = READ_ONCE(wq_watchdog_thresh) * HZ;
5338     bool lockup_detected = false;
5339     struct worker_pool *pool;
5340     int pi;
5341 
5342     if (!thresh)
5343         return;
5344 
5345     rcu_read_lock();
5346 
5347     for_each_pool(pool, pi) {
5348         unsigned long pool_ts, touched, ts;
5349 
5350         if (list_empty(&pool->worklist))
5351             continue;
5352 
5353         /* get the latest of pool and touched timestamps */
5354         pool_ts = READ_ONCE(pool->watchdog_ts);
5355         touched = READ_ONCE(wq_watchdog_touched);
5356 
5357         if (time_after(pool_ts, touched))
5358             ts = pool_ts;
5359         else
5360             ts = touched;
5361 
5362         if (pool->cpu >= 0) {
5363             unsigned long cpu_touched =
5364                 READ_ONCE(per_cpu(wq_watchdog_touched_cpu,
5365                           pool->cpu));
5366             if (time_after(cpu_touched, ts))
5367                 ts = cpu_touched;
5368         }
5369 
5370         /* did we stall? */
5371         if (time_after(jiffies, ts + thresh)) {
5372             lockup_detected = true;
5373             pr_emerg("BUG: workqueue lockup - pool");
5374             pr_cont_pool_info(pool);
5375             pr_cont(" stuck for %us!\n",
5376                 jiffies_to_msecs(jiffies - pool_ts) / 1000);
5377         }
5378     }
5379 
5380     rcu_read_unlock();
5381 
5382     if (lockup_detected)
5383         show_workqueue_state();
5384 
5385     wq_watchdog_reset_touched();
5386     mod_timer(&wq_watchdog_timer, jiffies + thresh);
5387 }
5388 
5389 void wq_watchdog_touch(int cpu)
5390 {
5391     if (cpu >= 0)
5392         per_cpu(wq_watchdog_touched_cpu, cpu) = jiffies;
5393     else
5394         wq_watchdog_touched = jiffies;
5395 }
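/*
 * Editor's note (illustration, not part of the original file): a
 * hypothetical caller of wq_watchdog_touch().  Touch paths (e.g. alongside
 * the softlockup watchdog) use it when a CPU is known to be legitimately
 * busy so the stall detector above doesn't fire spuriously.
 */
#if 0	/* illustrative sketch only */
static void example_touch_watchdogs(void)
{
	/* this CPU is busy on purpose - push out its per-cpu deadline */
	wq_watchdog_touch(raw_smp_processor_id());

	/* or push out the global timestamp consulted for every pool */
	wq_watchdog_touch(-1);
}
#endif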
5396 
5397 static void wq_watchdog_set_thresh(unsigned long thresh)
5398 {
5399     wq_watchdog_thresh = 0;
5400     del_timer_sync(&wq_watchdog_timer);
5401 
5402     if (thresh) {
5403         wq_watchdog_thresh = thresh;
5404         wq_watchdog_reset_touched();
5405         mod_timer(&wq_watchdog_timer, jiffies + thresh * HZ);
5406     }
5407 }
5408 
5409 static int wq_watchdog_param_set_thresh(const char *val,
5410                     const struct kernel_param *kp)
5411 {
5412     unsigned long thresh;
5413     int ret;
5414 
5415     ret = kstrtoul(val, 0, &thresh);
5416     if (ret)
5417         return ret;
5418 
5419     if (system_wq)
5420         wq_watchdog_set_thresh(thresh);
5421     else
5422         wq_watchdog_thresh = thresh;
5423 
5424     return 0;
5425 }
5426 
5427 static const struct kernel_param_ops wq_watchdog_thresh_ops = {
5428     .set    = wq_watchdog_param_set_thresh,
5429     .get    = param_get_ulong,
5430 };
5431 
5432 module_param_cb(watchdog_thresh, &wq_watchdog_thresh_ops, &wq_watchdog_thresh,
5433         0644);
5434 
5435 static void wq_watchdog_init(void)
5436 {
5437     wq_watchdog_set_thresh(wq_watchdog_thresh);
5438 }
5439 
5440 #else   /* CONFIG_WQ_WATCHDOG */
5441 
5442 static inline void wq_watchdog_init(void) { }
5443 
5444 #endif  /* CONFIG_WQ_WATCHDOG */
5445 
5446 static void __init wq_numa_init(void)
5447 {
5448     cpumask_var_t *tbl;
5449     int node, cpu;
5450 
5451     if (num_possible_nodes() <= 1)
5452         return;
5453 
5454     if (wq_disable_numa) {
5455         pr_info("workqueue: NUMA affinity support disabled\n");
5456         return;
5457     }
5458 
5459     wq_update_unbound_numa_attrs_buf = alloc_workqueue_attrs(GFP_KERNEL);
5460     BUG_ON(!wq_update_unbound_numa_attrs_buf);
5461 
5462     /*
5463      * We want masks of possible CPUs of each node, which aren't readily
5464      * available.  Build them from cpu_to_node(), which should have been
5465      * fully initialized by now.
5466      */
5467     tbl = kzalloc(nr_node_ids * sizeof(tbl[0]), GFP_KERNEL);
5468     BUG_ON(!tbl);
5469 
5470     for_each_node(node)
5471         BUG_ON(!zalloc_cpumask_var_node(&tbl[node], GFP_KERNEL,
5472                 node_online(node) ? node : NUMA_NO_NODE));
5473 
5474     for_each_possible_cpu(cpu) {
5475         node = cpu_to_node(cpu);
5476         if (WARN_ON(node == NUMA_NO_NODE)) {
5477             pr_warn("workqueue: NUMA node mapping not available for cpu%d, disabling NUMA support\n", cpu);
5478             /* happens iff arch is bonkers, let's just proceed */
5479             return;
5480         }
5481         cpumask_set_cpu(cpu, tbl[node]);
5482     }
5483 
5484     wq_numa_possible_cpumask = tbl;
5485     wq_numa_enabled = true;
5486 }
5487 
5488 /**
5489  * workqueue_init_early - early init for workqueue subsystem
5490  *
5491  * This is the first half of two-staged workqueue subsystem initialization
5492  * and invoked as soon as the bare basics - memory allocation, cpumasks and
5493  * idr are up.  It sets up all the data structures and system workqueues
5494  * and allows early boot code to create workqueues and queue/cancel work
5495  * items.  Actual work item execution starts only after kthreads can be
5496  * created and scheduled right before early initcalls.
5497  */
5498 int __init workqueue_init_early(void)
5499 {
5500     int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
5501     int i, cpu;
5502 
5503     WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
5504 
5505     BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
5506     cpumask_copy(wq_unbound_cpumask, cpu_possible_mask);
5507 
5508     pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
5509 
5510     /* initialize CPU pools */
5511     for_each_possible_cpu(cpu) {
5512         struct worker_pool *pool;
5513 
5514         i = 0;
5515         for_each_cpu_worker_pool(pool, cpu) {
5516             BUG_ON(init_worker_pool(pool));
5517             pool->cpu = cpu;
5518             cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
5519             pool->attrs->nice = std_nice[i++];
5520             pool->node = cpu_to_node(cpu);
5521 
5522             /* alloc pool ID */
5523             mutex_lock(&wq_pool_mutex);
5524             BUG_ON(worker_pool_assign_id(pool));
5525             mutex_unlock(&wq_pool_mutex);
5526         }
5527     }
5528 
5529     /* create default unbound and ordered wq attrs */
5530     for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
5531         struct workqueue_attrs *attrs;
5532 
5533         BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
5534         attrs->nice = std_nice[i];
5535         unbound_std_wq_attrs[i] = attrs;
5536 
5537         /*
5538          * An ordered wq should have only one pwq as ordering is
5539          * guaranteed by max_active which is enforced by pwqs.
5540          * Turn off NUMA so that dfl_pwq is used for all nodes.
5541          */
5542         BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
5543         attrs->nice = std_nice[i];
5544         attrs->no_numa = true;
5545         ordered_wq_attrs[i] = attrs;
5546     }
5547 
5548     system_wq = alloc_workqueue("events", 0, 0);
5549     system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
5550     system_long_wq = alloc_workqueue("events_long", 0, 0);
5551     system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
5552                         WQ_UNBOUND_MAX_ACTIVE);
5553     system_freezable_wq = alloc_workqueue("events_freezable",
5554                           WQ_FREEZABLE, 0);
5555     system_power_efficient_wq = alloc_workqueue("events_power_efficient",
5556                           WQ_POWER_EFFICIENT, 0);
5557     system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
5558                           WQ_FREEZABLE | WQ_POWER_EFFICIENT,
5559                           0);
5560     BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
5561            !system_unbound_wq || !system_freezable_wq ||
5562            !system_power_efficient_wq ||
5563            !system_freezable_power_efficient_wq);
5564 
5565     return 0;
5566 }
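/*
 * Editor's note (illustration, not part of the original file): a
 * hypothetical example of what "allows early boot code to ... queue work
 * items" means in practice - right after workqueue_init_early() a caller
 * may already queue work, but nothing executes until workqueue_init() has
 * created the kworkers.
 */
#if 0	/* illustrative sketch only */
static void example_early_fn(struct work_struct *work)
{
	pr_info("runs only once workqueue_init() has created workers\n");
}

static DECLARE_WORK(example_early_work, example_early_fn);

static void example_early_boot(void)
{
	/* legal between workqueue_init_early() and workqueue_init() */
	schedule_work(&example_early_work);
}
#endif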
5567 
5568 /**
5569  * workqueue_init - bring workqueue subsystem fully online
5570  *
5571  * This is the latter half of two-staged workqueue subsystem initialization
5572  * and invoked as soon as kthreads can be created and scheduled.
5573  * Workqueues have been created and work items queued on them, but there
5574  * are no kworkers executing the work items yet.  Populate the worker pools
5575  * with the initial workers and enable future kworker creations.
5576  */
5577 int __init workqueue_init(void)
5578 {
5579     struct workqueue_struct *wq;
5580     struct worker_pool *pool;
5581     int cpu, bkt;
5582 
5583     /*
5584      * It'd be simpler to initialize NUMA in workqueue_init_early() but
5585      * CPU to node mapping may not be available that early on some
5586      * archs such as power and arm64.  As the per-cpu pools created
5587      * previously could be missing the node hint and the unbound pools
5588      * could be missing NUMA affinity, fix them up.
5589      */
5590     wq_numa_init();
5591 
5592     mutex_lock(&wq_pool_mutex);
5593 
5594     for_each_possible_cpu(cpu) {
5595         for_each_cpu_worker_pool(pool, cpu) {
5596             pool->node = cpu_to_node(cpu);
5597         }
5598     }
5599 
5600     list_for_each_entry(wq, &workqueues, list)
5601         wq_update_unbound_numa(wq, smp_processor_id(), true);
5602 
5603     mutex_unlock(&wq_pool_mutex);
5604 
5605     /* create the initial workers */
5606     for_each_online_cpu(cpu) {
5607         for_each_cpu_worker_pool(pool, cpu) {
5608             pool->flags &= ~POOL_DISASSOCIATED;
5609             BUG_ON(!create_worker(pool));
5610         }
5611     }
5612 
5613     hash_for_each(unbound_pool_hash, bkt, pool, hash_node)
5614         BUG_ON(!create_worker(pool));
5615 
5616     wq_online = true;
5617     wq_watchdog_init();
5618 
5619     return 0;
5620 }