0001 // SPDX-License-Identifier: GPL-2.0
0002 /*
0003  * Basic worker thread pool for io_uring
0004  *
0005  * Copyright (C) 2019 Jens Axboe
0006  *
0007  */
0008 #include <linux/kernel.h>
0009 #include <linux/init.h>
0010 #include <linux/errno.h>
0011 #include <linux/sched/signal.h>
0012 #include <linux/percpu.h>
0013 #include <linux/slab.h>
0014 #include <linux/rculist_nulls.h>
0015 #include <linux/cpu.h>
0016 #include <linux/task_work.h>
0017 #include <linux/audit.h>
0018 #include <uapi/linux/io_uring.h>
0019 
0020 #include "io-wq.h"
0021 #include "slist.h"
0022 #include "io_uring.h"
0023 
0024 #define WORKER_IDLE_TIMEOUT (5 * HZ)
0025 
0026 enum {
0027     IO_WORKER_F_UP      = 1,    /* up and active */
0028     IO_WORKER_F_RUNNING = 2,    /* account as running */
0029     IO_WORKER_F_FREE    = 4,    /* worker on free list */
0030     IO_WORKER_F_BOUND   = 8,    /* is doing bounded work */
0031 };
0032 
0033 enum {
0034     IO_WQ_BIT_EXIT      = 0,    /* wq exiting */
0035 };
0036 
0037 enum {
0038     IO_ACCT_STALLED_BIT = 0,    /* stalled on hash */
0039 };
0040 
0041 /*
0042  * One for each thread in a wqe pool
0043  */
0044 struct io_worker {
0045     refcount_t ref;
0046     unsigned flags;
0047     struct hlist_nulls_node nulls_node;
0048     struct list_head all_list;
0049     struct task_struct *task;
0050     struct io_wqe *wqe;
0051 
0052     struct io_wq_work *cur_work;
0053     struct io_wq_work *next_work;
0054     raw_spinlock_t lock;
0055 
0056     struct completion ref_done;
0057 
0058     unsigned long create_state;
0059     struct callback_head create_work;
0060     int create_index;
0061 
0062     union {
0063         struct rcu_head rcu;
0064         struct work_struct work;
0065     };
0066 };
0067 
0068 #if BITS_PER_LONG == 64
0069 #define IO_WQ_HASH_ORDER    6
0070 #else
0071 #define IO_WQ_HASH_ORDER    5
0072 #endif
0073 
0074 #define IO_WQ_NR_HASH_BUCKETS   (1u << IO_WQ_HASH_ORDER)
0075 
0076 struct io_wqe_acct {
0077     unsigned nr_workers;
0078     unsigned max_workers;
0079     int index;
0080     atomic_t nr_running;
0081     raw_spinlock_t lock;
0082     struct io_wq_work_list work_list;
0083     unsigned long flags;
0084 };
0085 
0086 enum {
0087     IO_WQ_ACCT_BOUND,
0088     IO_WQ_ACCT_UNBOUND,
0089     IO_WQ_ACCT_NR,
0090 };
0091 
0092 /*
0093  * Per-node worker thread pool
0094  */
0095 struct io_wqe {
0096     raw_spinlock_t lock;
0097     struct io_wqe_acct acct[IO_WQ_ACCT_NR];
0098 
0099     int node;
0100 
0101     struct hlist_nulls_head free_list;
0102     struct list_head all_list;
0103 
0104     struct wait_queue_entry wait;
0105 
0106     struct io_wq *wq;
0107     struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
0108 
0109     cpumask_var_t cpu_mask;
0110 };
0111 
0112 /*
0113  * Per io_wq state
0114  */
0115 struct io_wq {
0116     unsigned long state;
0117 
0118     free_work_fn *free_work;
0119     io_wq_work_fn *do_work;
0120 
0121     struct io_wq_hash *hash;
0122 
0123     atomic_t worker_refs;
0124     struct completion worker_done;
0125 
0126     struct hlist_node cpuhp_node;
0127 
0128     struct task_struct *task;
0129 
0130     struct io_wqe *wqes[];
0131 };
0132 
0133 static enum cpuhp_state io_wq_online;
0134 
0135 struct io_cb_cancel_data {
0136     work_cancel_fn *fn;
0137     void *data;
0138     int nr_running;
0139     int nr_pending;
0140     bool cancel_all;
0141 };
0142 
0143 static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
0144 static void io_wqe_dec_running(struct io_worker *worker);
0145 static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
0146                     struct io_wqe_acct *acct,
0147                     struct io_cb_cancel_data *match);
0148 static void create_worker_cb(struct callback_head *cb);
0149 static void io_wq_cancel_tw_create(struct io_wq *wq);
0150 
0151 static bool io_worker_get(struct io_worker *worker)
0152 {
0153     return refcount_inc_not_zero(&worker->ref);
0154 }
0155 
0156 static void io_worker_release(struct io_worker *worker)
0157 {
0158     if (refcount_dec_and_test(&worker->ref))
0159         complete(&worker->ref_done);
0160 }
0161 
0162 static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
0163 {
0164     return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
0165 }
0166 
0167 static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
0168                            struct io_wq_work *work)
0169 {
0170     return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
0171 }
0172 
0173 static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
0174 {
0175     return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
0176 }
0177 
0178 static void io_worker_ref_put(struct io_wq *wq)
0179 {
0180     if (atomic_dec_and_test(&wq->worker_refs))
0181         complete(&wq->worker_done);
0182 }
0183 
0184 static void io_worker_cancel_cb(struct io_worker *worker)
0185 {
0186     struct io_wqe_acct *acct = io_wqe_get_acct(worker);
0187     struct io_wqe *wqe = worker->wqe;
0188     struct io_wq *wq = wqe->wq;
0189 
0190     atomic_dec(&acct->nr_running);
0191     raw_spin_lock(&worker->wqe->lock);
0192     acct->nr_workers--;
0193     raw_spin_unlock(&worker->wqe->lock);
0194     io_worker_ref_put(wq);
0195     clear_bit_unlock(0, &worker->create_state);
0196     io_worker_release(worker);
0197 }
0198 
0199 static bool io_task_worker_match(struct callback_head *cb, void *data)
0200 {
0201     struct io_worker *worker;
0202 
0203     if (cb->func != create_worker_cb)
0204         return false;
0205     worker = container_of(cb, struct io_worker, create_work);
0206     return worker == data;
0207 }
0208 
0209 static void io_worker_exit(struct io_worker *worker)
0210 {
0211     struct io_wqe *wqe = worker->wqe;
0212     struct io_wq *wq = wqe->wq;
0213 
0214     while (1) {
0215         struct callback_head *cb = task_work_cancel_match(wq->task,
0216                         io_task_worker_match, worker);
0217 
0218         if (!cb)
0219             break;
0220         io_worker_cancel_cb(worker);
0221     }
0222 
0223     io_worker_release(worker);
0224     wait_for_completion(&worker->ref_done);
0225 
0226     raw_spin_lock(&wqe->lock);
0227     if (worker->flags & IO_WORKER_F_FREE)
0228         hlist_nulls_del_rcu(&worker->nulls_node);
0229     list_del_rcu(&worker->all_list);
0230     raw_spin_unlock(&wqe->lock);
0231     io_wqe_dec_running(worker);
0232     worker->flags = 0;
0233     preempt_disable();
0234     current->flags &= ~PF_IO_WORKER;
0235     preempt_enable();
0236 
0237     kfree_rcu(worker, rcu);
0238     io_worker_ref_put(wqe->wq);
0239     do_exit(0);
0240 }
0241 
0242 static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
0243 {
0244     bool ret = false;
0245 
0246     raw_spin_lock(&acct->lock);
0247     if (!wq_list_empty(&acct->work_list) &&
0248         !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
0249         ret = true;
0250     raw_spin_unlock(&acct->lock);
0251 
0252     return ret;
0253 }
0254 
0255 /*
0256  * Check head of free list for an available worker. If one isn't available,
0257  * caller must create one.
0258  */
0259 static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
0260                     struct io_wqe_acct *acct)
0261     __must_hold(RCU)
0262 {
0263     struct hlist_nulls_node *n;
0264     struct io_worker *worker;
0265 
0266     /*
0267      * Iterate free_list and see if we can find an idle worker to
0268      * activate. If a given worker is on the free_list but in the process
0269      * of exiting, keep trying.
0270      */
0271     hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
0272         if (!io_worker_get(worker))
0273             continue;
0274         if (io_wqe_get_acct(worker) != acct) {
0275             io_worker_release(worker);
0276             continue;
0277         }
0278         if (wake_up_process(worker->task)) {
0279             io_worker_release(worker);
0280             return true;
0281         }
0282         io_worker_release(worker);
0283     }
0284 
0285     return false;
0286 }
0287 
0288 /*
0289  * We need a worker. If we find a free one, we're good. If not, and we're
0290  * below the max number of workers, create one.
0291  */
0292 static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
0293 {
0294     /*
0295      * Most likely an attempt to queue unbounded work on an io_wq that
0296      * wasn't setup with any unbounded workers.
0297      */
0298     if (unlikely(!acct->max_workers))
0299         pr_warn_once("io-wq is not configured for unbound workers");
0300 
0301     raw_spin_lock(&wqe->lock);
0302     if (acct->nr_workers >= acct->max_workers) {
0303         raw_spin_unlock(&wqe->lock);
0304         return true;
0305     }
0306     acct->nr_workers++;
0307     raw_spin_unlock(&wqe->lock);
0308     atomic_inc(&acct->nr_running);
0309     atomic_inc(&wqe->wq->worker_refs);
0310     return create_io_worker(wqe->wq, wqe, acct->index);
0311 }
0312 
0313 static void io_wqe_inc_running(struct io_worker *worker)
0314 {
0315     struct io_wqe_acct *acct = io_wqe_get_acct(worker);
0316 
0317     atomic_inc(&acct->nr_running);
0318 }
0319 
0320 static void create_worker_cb(struct callback_head *cb)
0321 {
0322     struct io_worker *worker;
0323     struct io_wq *wq;
0324     struct io_wqe *wqe;
0325     struct io_wqe_acct *acct;
0326     bool do_create = false;
0327 
0328     worker = container_of(cb, struct io_worker, create_work);
0329     wqe = worker->wqe;
0330     wq = wqe->wq;
0331     acct = &wqe->acct[worker->create_index];
0332     raw_spin_lock(&wqe->lock);
0333     if (acct->nr_workers < acct->max_workers) {
0334         acct->nr_workers++;
0335         do_create = true;
0336     }
0337     raw_spin_unlock(&wqe->lock);
0338     if (do_create) {
0339         create_io_worker(wq, wqe, worker->create_index);
0340     } else {
0341         atomic_dec(&acct->nr_running);
0342         io_worker_ref_put(wq);
0343     }
0344     clear_bit_unlock(0, &worker->create_state);
0345     io_worker_release(worker);
0346 }
0347 
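     /*
      * Queue a task_work item on the io_uring task to create a new worker
      * for @acct. Bit 0 of worker->create_state guards create_work and
      * create_index so only one creation request per worker is in flight;
      * the extra nr_running/worker_refs references taken by the caller are
      * dropped again on failure.
      */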
0348 static bool io_queue_worker_create(struct io_worker *worker,
0349                    struct io_wqe_acct *acct,
0350                    task_work_func_t func)
0351 {
0352     struct io_wqe *wqe = worker->wqe;
0353     struct io_wq *wq = wqe->wq;
0354 
0355     /* raced with exit, just ignore create call */
0356     if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
0357         goto fail;
0358     if (!io_worker_get(worker))
0359         goto fail;
0360     /*
0361      * create_state manages ownership of create_work/index. We should
0362      * only need one entry per worker, as the worker going to sleep
0363      * will trigger the condition, and waking will clear it once it
0364      * runs the task_work.
0365      */
0366     if (test_bit(0, &worker->create_state) ||
0367         test_and_set_bit_lock(0, &worker->create_state))
0368         goto fail_release;
0369 
0370     atomic_inc(&wq->worker_refs);
0371     init_task_work(&worker->create_work, func);
0372     worker->create_index = acct->index;
0373     if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
0374         /*
0375          * EXIT may have been set after checking it above, check after
0376          * adding the task_work and remove any creation item if it is
0377          * now set. wq exit does that too, but we can have added this
0378          * work item after we canceled in io_wq_exit_workers().
0379          */
0380         if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
0381             io_wq_cancel_tw_create(wq);
0382         io_worker_ref_put(wq);
0383         return true;
0384     }
0385     io_worker_ref_put(wq);
0386     clear_bit_unlock(0, &worker->create_state);
0387 fail_release:
0388     io_worker_release(worker);
0389 fail:
0390     atomic_dec(&acct->nr_running);
0391     io_worker_ref_put(wq);
0392     return false;
0393 }
0394 
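     /*
      * Called when a worker stops running (goes to sleep). If it was the
      * last running worker for this acct and work is still pending, queue
      * task_work to create a replacement so the pending work isn't stranded.
      */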
0395 static void io_wqe_dec_running(struct io_worker *worker)
0396 {
0397     struct io_wqe_acct *acct = io_wqe_get_acct(worker);
0398     struct io_wqe *wqe = worker->wqe;
0399 
0400     if (!(worker->flags & IO_WORKER_F_UP))
0401         return;
0402 
0403     if (!atomic_dec_and_test(&acct->nr_running))
0404         return;
0405     if (!io_acct_run_queue(acct))
0406         return;
0407 
0408     atomic_inc(&acct->nr_running);
0409     atomic_inc(&wqe->wq->worker_refs);
0410     io_queue_worker_create(worker, acct, create_worker_cb);
0411 }
0412 
0413 /*
0414  * Worker will start processing some work. Take it off the free list,
0415  * if it's currently on it.
0416  */
0417 static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
0418 {
0419     if (worker->flags & IO_WORKER_F_FREE) {
0420         worker->flags &= ~IO_WORKER_F_FREE;
0421         raw_spin_lock(&wqe->lock);
0422         hlist_nulls_del_init_rcu(&worker->nulls_node);
0423         raw_spin_unlock(&wqe->lock);
0424     }
0425 }
0426 
0427 /*
0428  * No work, worker going to sleep. Put the worker back on the free list
0429  * if it isn't already there, so that newly queued work can find and
0430  * wake it. The caller holds wqe->lock, which protects free_list; after
0431  * this returns, the worker waits in io_wqe_worker() with a
0432  * WORKER_IDLE_TIMEOUT timeout.
0433  */
0434 static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
0435     __must_hold(wqe->lock)
0436 {
0437     if (!(worker->flags & IO_WORKER_F_FREE)) {
0438         worker->flags |= IO_WORKER_F_FREE;
0439         hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
0440     }
0441 }
0442 
0443 static inline unsigned int io_get_work_hash(struct io_wq_work *work)
0444 {
0445     return work->flags >> IO_WQ_HASH_SHIFT;
0446 }
0447 
0448 static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
0449 {
0450     struct io_wq *wq = wqe->wq;
0451     bool ret = false;
0452 
0453     spin_lock_irq(&wq->hash->wait.lock);
0454     if (list_empty(&wqe->wait.entry)) {
0455         __add_wait_queue(&wq->hash->wait, &wqe->wait);
0456         if (!test_bit(hash, &wq->hash->map)) {
0457             __set_current_state(TASK_RUNNING);
0458             list_del_init(&wqe->wait.entry);
0459             ret = true;
0460         }
0461     }
0462     spin_unlock_irq(&wq->hash->wait.lock);
0463     return ret;
0464 }
0465 
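     /*
      * Pick the next runnable work item for this acct. Unhashed work can
      * run at any time; hashed work only runs if no other item with the
      * same hash is currently executing. If only stalled hashed work is
      * left, mark the acct stalled and register on the hash waitqueue so
      * the worker is woken once the hash clears.
      */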
0466 static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
0467                        struct io_worker *worker)
0468     __must_hold(acct->lock)
0469 {
0470     struct io_wq_work_node *node, *prev;
0471     struct io_wq_work *work, *tail;
0472     unsigned int stall_hash = -1U;
0473     struct io_wqe *wqe = worker->wqe;
0474 
0475     wq_list_for_each(node, prev, &acct->work_list) {
0476         unsigned int hash;
0477 
0478         work = container_of(node, struct io_wq_work, list);
0479 
0480         /* not hashed, can run anytime */
0481         if (!io_wq_is_hashed(work)) {
0482             wq_list_del(&acct->work_list, node, prev);
0483             return work;
0484         }
0485 
0486         hash = io_get_work_hash(work);
0487         /* all items with this hash lie in [work, tail] */
0488         tail = wqe->hash_tail[hash];
0489 
0490         /* hashed, can run if not already running */
0491         if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
0492             wqe->hash_tail[hash] = NULL;
0493             wq_list_cut(&acct->work_list, &tail->list, prev);
0494             return work;
0495         }
0496         if (stall_hash == -1U)
0497             stall_hash = hash;
0498         /* fast forward to a next hash, for-each will fix up @prev */
0499         node = &tail->list;
0500     }
0501 
0502     if (stall_hash != -1U) {
0503         bool unstalled;
0504 
0505         /*
0506          * Set this before dropping the lock to avoid racing with new
0507          * work being added and clearing the stalled bit.
0508          */
0509         set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
0510         raw_spin_unlock(&acct->lock);
0511         unstalled = io_wait_on_hash(wqe, stall_hash);
0512         raw_spin_lock(&acct->lock);
0513         if (unstalled) {
0514             clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
0515             if (wq_has_sleeper(&wqe->wq->hash->wait))
0516                 wake_up(&wqe->wq->hash->wait);
0517         }
0518     }
0519 
0520     return NULL;
0521 }
0522 
0523 static void io_assign_current_work(struct io_worker *worker,
0524                    struct io_wq_work *work)
0525 {
0526     if (work) {
0527         io_run_task_work();
0528         cond_resched();
0529     }
0530 
0531     raw_spin_lock(&worker->lock);
0532     worker->cur_work = work;
0533     worker->next_work = NULL;
0534     raw_spin_unlock(&worker->lock);
0535 }
0536 
0537 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
0538 
0539 static void io_worker_handle_work(struct io_worker *worker)
0540 {
0541     struct io_wqe_acct *acct = io_wqe_get_acct(worker);
0542     struct io_wqe *wqe = worker->wqe;
0543     struct io_wq *wq = wqe->wq;
0544     bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
0545 
0546     do {
0547         struct io_wq_work *work;
0548 
0549         /*
0550          * If we got some work, mark us as busy. If we didn't, but
0551          * the list isn't empty, it means we stalled on hashed work.
0552          * Mark us stalled so we don't keep looking for work when we
0553          * can't make progress, any work completion or insertion will
0554          * clear the stalled flag.
0555          */
0556         raw_spin_lock(&acct->lock);
0557         work = io_get_next_work(acct, worker);
0558         raw_spin_unlock(&acct->lock);
0559         if (work) {
0560             __io_worker_busy(wqe, worker);
0561 
0562             /*
0563              * Make sure cancelation can find this, even before
0564              * it becomes the active work. That avoids a window
0565              * where the work has been removed from our general
0566              * work list, but isn't yet discoverable as the
0567              * current work item for this worker.
0568              */
0569             raw_spin_lock(&worker->lock);
0570             worker->next_work = work;
0571             raw_spin_unlock(&worker->lock);
0572         } else {
0573             break;
0574         }
0575         io_assign_current_work(worker, work);
0576         __set_current_state(TASK_RUNNING);
0577 
0578         /* handle a whole dependent link */
0579         do {
0580             struct io_wq_work *next_hashed, *linked;
0581             unsigned int hash = io_get_work_hash(work);
0582 
0583             next_hashed = wq_next_work(work);
0584 
0585             if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
0586                 work->flags |= IO_WQ_WORK_CANCEL;
0587             wq->do_work(work);
0588             io_assign_current_work(worker, NULL);
0589 
0590             linked = wq->free_work(work);
0591             work = next_hashed;
0592             if (!work && linked && !io_wq_is_hashed(linked)) {
0593                 work = linked;
0594                 linked = NULL;
0595             }
0596             io_assign_current_work(worker, work);
0597             if (linked)
0598                 io_wqe_enqueue(wqe, linked);
0599 
0600             if (hash != -1U && !next_hashed) {
0601                 /* serialize hash clear with wake_up() */
0602                 spin_lock_irq(&wq->hash->wait.lock);
0603                 clear_bit(hash, &wq->hash->map);
0604                 clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
0605                 spin_unlock_irq(&wq->hash->wait.lock);
0606                 if (wq_has_sleeper(&wq->hash->wait))
0607                     wake_up(&wq->hash->wait);
0608             }
0609         } while (work);
0610     } while (1);
0611 }
0612 
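     /*
      * Thread function for each io-wq worker. Runs pending work for its
      * acct, then sits idle on the free list with a WORKER_IDLE_TIMEOUT
      * timeout. A worker that times out exits unless it is the last one
      * left for the acct.
      */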
0613 static int io_wqe_worker(void *data)
0614 {
0615     struct io_worker *worker = data;
0616     struct io_wqe_acct *acct = io_wqe_get_acct(worker);
0617     struct io_wqe *wqe = worker->wqe;
0618     struct io_wq *wq = wqe->wq;
0619     bool last_timeout = false;
0620     char buf[TASK_COMM_LEN];
0621 
0622     worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
0623 
0624     snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
0625     set_task_comm(current, buf);
0626 
0627     while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
0628         long ret;
0629 
0630         set_current_state(TASK_INTERRUPTIBLE);
0631         while (io_acct_run_queue(acct))
0632             io_worker_handle_work(worker);
0633 
0634         raw_spin_lock(&wqe->lock);
0635         /* timed out, exit unless we're the last worker */
0636         if (last_timeout && acct->nr_workers > 1) {
0637             acct->nr_workers--;
0638             raw_spin_unlock(&wqe->lock);
0639             __set_current_state(TASK_RUNNING);
0640             break;
0641         }
0642         last_timeout = false;
0643         __io_worker_idle(wqe, worker);
0644         raw_spin_unlock(&wqe->lock);
0645         if (io_run_task_work())
0646             continue;
0647         ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
0648         if (signal_pending(current)) {
0649             struct ksignal ksig;
0650 
0651             if (!get_signal(&ksig))
0652                 continue;
0653             break;
0654         }
0655         last_timeout = !ret;
0656     }
0657 
0658     if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
0659         io_worker_handle_work(worker);
0660 
0661     io_worker_exit(worker);
0662     return 0;
0663 }
0664 
0665 /*
0666  * Called when a worker is scheduled in. Mark us as currently running.
0667  */
0668 void io_wq_worker_running(struct task_struct *tsk)
0669 {
0670     struct io_worker *worker = tsk->worker_private;
0671 
0672     if (!worker)
0673         return;
0674     if (!(worker->flags & IO_WORKER_F_UP))
0675         return;
0676     if (worker->flags & IO_WORKER_F_RUNNING)
0677         return;
0678     worker->flags |= IO_WORKER_F_RUNNING;
0679     io_wqe_inc_running(worker);
0680 }
0681 
0682 /*
0683  * Called when worker is going to sleep. If there are no workers currently
0684  * running and we have work pending, wake up a free one or create a new one.
0685  */
0686 void io_wq_worker_sleeping(struct task_struct *tsk)
0687 {
0688     struct io_worker *worker = tsk->worker_private;
0689 
0690     if (!worker)
0691         return;
0692     if (!(worker->flags & IO_WORKER_F_UP))
0693         return;
0694     if (!(worker->flags & IO_WORKER_F_RUNNING))
0695         return;
0696 
0697     worker->flags &= ~IO_WORKER_F_RUNNING;
0698     io_wqe_dec_running(worker);
0699 }
0700 
0701 static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
0702                    struct task_struct *tsk)
0703 {
0704     tsk->worker_private = worker;
0705     worker->task = tsk;
0706     set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
0707     tsk->flags |= PF_NO_SETAFFINITY;
0708 
0709     raw_spin_lock(&wqe->lock);
0710     hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
0711     list_add_tail_rcu(&worker->all_list, &wqe->all_list);
0712     worker->flags |= IO_WORKER_F_FREE;
0713     raw_spin_unlock(&wqe->lock);
0714     wake_up_new_task(tsk);
0715 }
0716 
0717 static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
0718 {
0719     return true;
0720 }
0721 
0722 static inline bool io_should_retry_thread(long err)
0723 {
0724     /*
0725      * Prevent perpetual task_work retry, if the task (or its group) is
0726      * exiting.
0727      */
0728     if (fatal_signal_pending(current))
0729         return false;
0730 
0731     switch (err) {
0732     case -EAGAIN:
0733     case -ERESTARTSYS:
0734     case -ERESTARTNOINTR:
0735     case -ERESTARTNOHAND:
0736         return true;
0737     default:
0738         return false;
0739     }
0740 }
0741 
0742 static void create_worker_cont(struct callback_head *cb)
0743 {
0744     struct io_worker *worker;
0745     struct task_struct *tsk;
0746     struct io_wqe *wqe;
0747 
0748     worker = container_of(cb, struct io_worker, create_work);
0749     clear_bit_unlock(0, &worker->create_state);
0750     wqe = worker->wqe;
0751     tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
0752     if (!IS_ERR(tsk)) {
0753         io_init_new_worker(wqe, worker, tsk);
0754         io_worker_release(worker);
0755         return;
0756     } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
0757         struct io_wqe_acct *acct = io_wqe_get_acct(worker);
0758 
0759         atomic_dec(&acct->nr_running);
0760         raw_spin_lock(&wqe->lock);
0761         acct->nr_workers--;
0762         if (!acct->nr_workers) {
0763             struct io_cb_cancel_data match = {
0764                 .fn     = io_wq_work_match_all,
0765                 .cancel_all = true,
0766             };
0767 
0768             raw_spin_unlock(&wqe->lock);
0769             while (io_acct_cancel_pending_work(wqe, acct, &match))
0770                 ;
0771         } else {
0772             raw_spin_unlock(&wqe->lock);
0773         }
0774         io_worker_ref_put(wqe->wq);
0775         kfree(worker);
0776         return;
0777     }
0778 
0779     /* re-create attempts grab a new worker ref, drop the existing one */
0780     io_worker_release(worker);
0781     schedule_work(&worker->work);
0782 }
0783 
0784 static void io_workqueue_create(struct work_struct *work)
0785 {
0786     struct io_worker *worker = container_of(work, struct io_worker, work);
0787     struct io_wqe_acct *acct = io_wqe_get_acct(worker);
0788 
0789     if (!io_queue_worker_create(worker, acct, create_worker_cont))
0790         kfree(worker);
0791 }
0792 
0793 static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
0794 {
0795     struct io_wqe_acct *acct = &wqe->acct[index];
0796     struct io_worker *worker;
0797     struct task_struct *tsk;
0798 
0799     __set_current_state(TASK_RUNNING);
0800 
0801     worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
0802     if (!worker) {
0803 fail:
0804         atomic_dec(&acct->nr_running);
0805         raw_spin_lock(&wqe->lock);
0806         acct->nr_workers--;
0807         raw_spin_unlock(&wqe->lock);
0808         io_worker_ref_put(wq);
0809         return false;
0810     }
0811 
0812     refcount_set(&worker->ref, 1);
0813     worker->wqe = wqe;
0814     raw_spin_lock_init(&worker->lock);
0815     init_completion(&worker->ref_done);
0816 
0817     if (index == IO_WQ_ACCT_BOUND)
0818         worker->flags |= IO_WORKER_F_BOUND;
0819 
0820     tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
0821     if (!IS_ERR(tsk)) {
0822         io_init_new_worker(wqe, worker, tsk);
0823     } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
0824         kfree(worker);
0825         goto fail;
0826     } else {
0827         INIT_WORK(&worker->work, io_workqueue_create);
0828         schedule_work(&worker->work);
0829     }
0830 
0831     return true;
0832 }
0833 
0834 /*
0835  * Iterate the passed in list and call the specific function for each
0836  * worker that isn't exiting
0837  */
0838 static bool io_wq_for_each_worker(struct io_wqe *wqe,
0839                   bool (*func)(struct io_worker *, void *),
0840                   void *data)
0841 {
0842     struct io_worker *worker;
0843     bool ret = false;
0844 
0845     list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
0846         if (io_worker_get(worker)) {
0847             /* no task if node is/was offline */
0848             if (worker->task)
0849                 ret = func(worker, data);
0850             io_worker_release(worker);
0851             if (ret)
0852                 break;
0853         }
0854     }
0855 
0856     return ret;
0857 }
0858 
0859 static bool io_wq_worker_wake(struct io_worker *worker, void *data)
0860 {
0861     __set_notify_signal(worker->task);
0862     wake_up_process(worker->task);
0863     return false;
0864 }
0865 
0866 static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
0867 {
0868     struct io_wq *wq = wqe->wq;
0869 
0870     do {
0871         work->flags |= IO_WQ_WORK_CANCEL;
0872         wq->do_work(work);
0873         work = wq->free_work(work);
0874     } while (work);
0875 }
0876 
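     /*
      * Insert work into the acct's pending list. Hashed work is chained
      * behind the current tail for its hash bucket so items with the same
      * hash stay together and in order; everything else is appended.
      */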
0877 static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
0878 {
0879     struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
0880     unsigned int hash;
0881     struct io_wq_work *tail;
0882 
0883     if (!io_wq_is_hashed(work)) {
0884 append:
0885         wq_list_add_tail(&work->list, &acct->work_list);
0886         return;
0887     }
0888 
0889     hash = io_get_work_hash(work);
0890     tail = wqe->hash_tail[hash];
0891     wqe->hash_tail[hash] = work;
0892     if (!tail)
0893         goto append;
0894 
0895     wq_list_add_after(&work->list, &tail->list, &acct->work_list);
0896 }
0897 
0898 static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
0899 {
0900     return work == data;
0901 }
0902 
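     /*
      * Queue work on this wqe. If io-wq is exiting, or the work is already
      * marked IO_WQ_WORK_CANCEL, run the cancel path instead. Otherwise
      * insert the work and wake a free worker, creating a new one if none
      * could be woken.
      */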
0903 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
0904 {
0905     struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
0906     struct io_cb_cancel_data match;
0907     unsigned work_flags = work->flags;
0908     bool do_create;
0909 
0910     /*
0911      * If io-wq is exiting for this task, or if the request has explicitly
0912      * been marked as one that should not get executed, cancel it here.
0913      */
0914     if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
0915         (work->flags & IO_WQ_WORK_CANCEL)) {
0916         io_run_cancel(work, wqe);
0917         return;
0918     }
0919 
0920     raw_spin_lock(&acct->lock);
0921     io_wqe_insert_work(wqe, work);
0922     clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
0923     raw_spin_unlock(&acct->lock);
0924 
0925     raw_spin_lock(&wqe->lock);
0926     rcu_read_lock();
0927     do_create = !io_wqe_activate_free_worker(wqe, acct);
0928     rcu_read_unlock();
0929 
0930     raw_spin_unlock(&wqe->lock);
0931 
0932     if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
0933         !atomic_read(&acct->nr_running))) {
0934         bool did_create;
0935 
0936         did_create = io_wqe_create_worker(wqe, acct);
0937         if (likely(did_create))
0938             return;
0939 
0940         raw_spin_lock(&wqe->lock);
0941         if (acct->nr_workers) {
0942             raw_spin_unlock(&wqe->lock);
0943             return;
0944         }
0945         raw_spin_unlock(&wqe->lock);
0946 
0947         /* fatal condition, failed to create the first worker */
0948         match.fn        = io_wq_work_match_item,
0949         match.data      = work,
0950         match.cancel_all    = false,
0951 
0952         io_acct_cancel_pending_work(wqe, acct, &match);
0953     }
0954 }
0955 
0956 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
0957 {
0958     struct io_wqe *wqe = wq->wqes[numa_node_id()];
0959 
0960     io_wqe_enqueue(wqe, work);
0961 }
0962 
0963 /*
0964  * Work items that hash to the same value will not be done in parallel.
0965  * Used to limit concurrent writes, generally hashed by inode.
0966  */
0967 void io_wq_hash_work(struct io_wq_work *work, void *val)
0968 {
0969     unsigned int bit;
0970 
0971     bit = hash_ptr(val, IO_WQ_HASH_ORDER);
0972     work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
0973 }
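     /*
      * io_uring typically passes the file's inode as @val when preparing
      * write-type requests on regular files, so writes to the same file
      * are executed serially by io-wq.
      */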
0974 
0975 static bool __io_wq_worker_cancel(struct io_worker *worker,
0976                   struct io_cb_cancel_data *match,
0977                   struct io_wq_work *work)
0978 {
0979     if (work && match->fn(work, match->data)) {
0980         work->flags |= IO_WQ_WORK_CANCEL;
0981         __set_notify_signal(worker->task);
0982         return true;
0983     }
0984 
0985     return false;
0986 }
0987 
0988 static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
0989 {
0990     struct io_cb_cancel_data *match = data;
0991 
0992     /*
0993      * Hold the lock to avoid ->cur_work going out of scope, caller
0994      * may dereference the passed in work.
0995      */
0996     raw_spin_lock(&worker->lock);
0997     if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
0998         __io_wq_worker_cancel(worker, match, worker->next_work))
0999         match->nr_running++;
1000     raw_spin_unlock(&worker->lock);
1001 
1002     return match->nr_running && !match->cancel_all;
1003 }
1004 
1005 static inline void io_wqe_remove_pending(struct io_wqe *wqe,
1006                      struct io_wq_work *work,
1007                      struct io_wq_work_node *prev)
1008 {
1009     struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
1010     unsigned int hash = io_get_work_hash(work);
1011     struct io_wq_work *prev_work = NULL;
1012 
1013     if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
1014         if (prev)
1015             prev_work = container_of(prev, struct io_wq_work, list);
1016         if (prev_work && io_get_work_hash(prev_work) == hash)
1017             wqe->hash_tail[hash] = prev_work;
1018         else
1019             wqe->hash_tail[hash] = NULL;
1020     }
1021     wq_list_del(&acct->work_list, &work->list, prev);
1022 }
1023 
1024 static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
1025                     struct io_wqe_acct *acct,
1026                     struct io_cb_cancel_data *match)
1027 {
1028     struct io_wq_work_node *node, *prev;
1029     struct io_wq_work *work;
1030 
1031     raw_spin_lock(&acct->lock);
1032     wq_list_for_each(node, prev, &acct->work_list) {
1033         work = container_of(node, struct io_wq_work, list);
1034         if (!match->fn(work, match->data))
1035             continue;
1036         io_wqe_remove_pending(wqe, work, prev);
1037         raw_spin_unlock(&acct->lock);
1038         io_run_cancel(work, wqe);
1039         match->nr_pending++;
1040         /* not safe to continue after unlock */
1041         return true;
1042     }
1043     raw_spin_unlock(&acct->lock);
1044 
1045     return false;
1046 }
1047 
1048 static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
1049                        struct io_cb_cancel_data *match)
1050 {
1051     int i;
1052 retry:
1053     for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1054         struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
1055 
1056         if (io_acct_cancel_pending_work(wqe, acct, match)) {
1057             if (match->cancel_all)
1058                 goto retry;
1059             break;
1060         }
1061     }
1062 }
1063 
1064 static void io_wqe_cancel_running_work(struct io_wqe *wqe,
1065                        struct io_cb_cancel_data *match)
1066 {
1067     rcu_read_lock();
1068     io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
1069     rcu_read_unlock();
1070 }
1071 
1072 enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
1073                   void *data, bool cancel_all)
1074 {
1075     struct io_cb_cancel_data match = {
1076         .fn     = cancel,
1077         .data       = data,
1078         .cancel_all = cancel_all,
1079     };
1080     int node;
1081 
1082     /*
1083      * First check pending list, if we're lucky we can just remove it
1084      * from there. CANCEL_OK means that the work is returned as-new,
1085      * no completion will be posted for it.
1086      *
1087      * Then check if a free (going busy) or busy worker has the work
1088      * currently running. If we find it there, we'll return CANCEL_RUNNING
1089      * as an indication that we attempt to signal cancellation. The
1090      * completion will run normally in this case.
1091      *
1092      * Do the pending check under acct->lock and the running check under
1093      * wqe->lock, to ensure that we'll find a work item regardless of state.
1094      */
1095     for_each_node(node) {
1096         struct io_wqe *wqe = wq->wqes[node];
1097 
1098         io_wqe_cancel_pending_work(wqe, &match);
1099         if (match.nr_pending && !match.cancel_all)
1100             return IO_WQ_CANCEL_OK;
1101 
1102         raw_spin_lock(&wqe->lock);
1103         io_wqe_cancel_running_work(wqe, &match);
1104         raw_spin_unlock(&wqe->lock);
1105         if (match.nr_running && !match.cancel_all)
1106             return IO_WQ_CANCEL_RUNNING;
1107     }
1108 
1109     if (match.nr_running)
1110         return IO_WQ_CANCEL_RUNNING;
1111     if (match.nr_pending)
1112         return IO_WQ_CANCEL_OK;
1113     return IO_WQ_CANCEL_NOTFOUND;
1114 }
1115 
1116 static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
1117                 int sync, void *key)
1118 {
1119     struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
1120     int i;
1121 
1122     list_del_init(&wait->entry);
1123 
1124     rcu_read_lock();
1125     for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1126         struct io_wqe_acct *acct = &wqe->acct[i];
1127 
1128         if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
1129             io_wqe_activate_free_worker(wqe, acct);
1130     }
1131     rcu_read_unlock();
1132     return 1;
1133 }
1134 
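     /*
      * Create an io_wq with one io_wqe per NUMA node. Bound workers are
      * capped at @bounded; unbound workers default to RLIMIT_NPROC. The
      * caller provides the work/free callbacks and the shared hash state.
      */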
1135 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
1136 {
1137     int ret, node, i;
1138     struct io_wq *wq;
1139 
1140     if (WARN_ON_ONCE(!data->free_work || !data->do_work))
1141         return ERR_PTR(-EINVAL);
1142     if (WARN_ON_ONCE(!bounded))
1143         return ERR_PTR(-EINVAL);
1144 
1145     wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
1146     if (!wq)
1147         return ERR_PTR(-ENOMEM);
1148     ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1149     if (ret)
1150         goto err_wq;
1151 
1152     refcount_inc(&data->hash->refs);
1153     wq->hash = data->hash;
1154     wq->free_work = data->free_work;
1155     wq->do_work = data->do_work;
1156 
1157     ret = -ENOMEM;
1158     for_each_node(node) {
1159         struct io_wqe *wqe;
1160         int alloc_node = node;
1161 
1162         if (!node_online(alloc_node))
1163             alloc_node = NUMA_NO_NODE;
1164         wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
1165         if (!wqe)
1166             goto err;
1167         if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
1168             goto err;
1169         cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
1170         wq->wqes[node] = wqe;
1171         wqe->node = alloc_node;
1172         wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
1173         wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1174                     task_rlimit(current, RLIMIT_NPROC);
1175         INIT_LIST_HEAD(&wqe->wait.entry);
1176         wqe->wait.func = io_wqe_hash_wake;
1177         for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1178             struct io_wqe_acct *acct = &wqe->acct[i];
1179 
1180             acct->index = i;
1181             atomic_set(&acct->nr_running, 0);
1182             INIT_WQ_LIST(&acct->work_list);
1183             raw_spin_lock_init(&acct->lock);
1184         }
1185         wqe->wq = wq;
1186         raw_spin_lock_init(&wqe->lock);
1187         INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
1188         INIT_LIST_HEAD(&wqe->all_list);
1189     }
1190 
1191     wq->task = get_task_struct(data->task);
1192     atomic_set(&wq->worker_refs, 1);
1193     init_completion(&wq->worker_done);
1194     return wq;
1195 err:
1196     io_wq_put_hash(data->hash);
1197     cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1198     for_each_node(node) {
1199         if (!wq->wqes[node])
1200             continue;
1201         free_cpumask_var(wq->wqes[node]->cpu_mask);
1202         kfree(wq->wqes[node]);
1203     }
1204 err_wq:
1205     kfree(wq);
1206     return ERR_PTR(ret);
1207 }
1208 
1209 static bool io_task_work_match(struct callback_head *cb, void *data)
1210 {
1211     struct io_worker *worker;
1212 
1213     if (cb->func != create_worker_cb && cb->func != create_worker_cont)
1214         return false;
1215     worker = container_of(cb, struct io_worker, create_work);
1216     return worker->wqe->wq == data;
1217 }
1218 
1219 void io_wq_exit_start(struct io_wq *wq)
1220 {
1221     set_bit(IO_WQ_BIT_EXIT, &wq->state);
1222 }
1223 
1224 static void io_wq_cancel_tw_create(struct io_wq *wq)
1225 {
1226     struct callback_head *cb;
1227 
1228     while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
1229         struct io_worker *worker;
1230 
1231         worker = container_of(cb, struct io_worker, create_work);
1232         io_worker_cancel_cb(worker);
1233     }
1234 }
1235 
1236 static void io_wq_exit_workers(struct io_wq *wq)
1237 {
1238     int node;
1239 
1240     if (!wq->task)
1241         return;
1242 
1243     io_wq_cancel_tw_create(wq);
1244 
1245     rcu_read_lock();
1246     for_each_node(node) {
1247         struct io_wqe *wqe = wq->wqes[node];
1248 
1249         io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
1250     }
1251     rcu_read_unlock();
1252     io_worker_ref_put(wq);
1253     wait_for_completion(&wq->worker_done);
1254 
1255     for_each_node(node) {
1256         spin_lock_irq(&wq->hash->wait.lock);
1257         list_del_init(&wq->wqes[node]->wait.entry);
1258         spin_unlock_irq(&wq->hash->wait.lock);
1259     }
1260     put_task_struct(wq->task);
1261     wq->task = NULL;
1262 }
1263 
1264 static void io_wq_destroy(struct io_wq *wq)
1265 {
1266     int node;
1267 
1268     cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1269 
1270     for_each_node(node) {
1271         struct io_wqe *wqe = wq->wqes[node];
1272         struct io_cb_cancel_data match = {
1273             .fn     = io_wq_work_match_all,
1274             .cancel_all = true,
1275         };
1276         io_wqe_cancel_pending_work(wqe, &match);
1277         free_cpumask_var(wqe->cpu_mask);
1278         kfree(wqe);
1279     }
1280     io_wq_put_hash(wq->hash);
1281     kfree(wq);
1282 }
1283 
1284 void io_wq_put_and_exit(struct io_wq *wq)
1285 {
1286     WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));
1287 
1288     io_wq_exit_workers(wq);
1289     io_wq_destroy(wq);
1290 }
1291 
1292 struct online_data {
1293     unsigned int cpu;
1294     bool online;
1295 };
1296 
1297 static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
1298 {
1299     struct online_data *od = data;
1300 
1301     if (od->online)
1302         cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
1303     else
1304         cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
1305     return false;
1306 }
1307 
1308 static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
1309 {
1310     struct online_data od = {
1311         .cpu = cpu,
1312         .online = online
1313     };
1314     int i;
1315 
1316     rcu_read_lock();
1317     for_each_node(i)
1318         io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
1319     rcu_read_unlock();
1320     return 0;
1321 }
1322 
1323 static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
1324 {
1325     struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1326 
1327     return __io_wq_cpu_online(wq, cpu, true);
1328 }
1329 
1330 static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
1331 {
1332     struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1333 
1334     return __io_wq_cpu_online(wq, cpu, false);
1335 }
1336 
1337 int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
1338 {
1339     int i;
1340 
1341     rcu_read_lock();
1342     for_each_node(i) {
1343         struct io_wqe *wqe = wq->wqes[i];
1344 
1345         if (mask)
1346             cpumask_copy(wqe->cpu_mask, mask);
1347         else
1348             cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
1349     }
1350     rcu_read_unlock();
1351     return 0;
1352 }
1353 
1354 /*
1355  * Set max number of bound and unbound workers. The previous values are
1356  * returned through new_count; a zero entry leaves that limit unchanged.
1357  */
1358 int io_wq_max_workers(struct io_wq *wq, int *new_count)
1359 {
1360     int prev[IO_WQ_ACCT_NR];
1361     bool first_node = true;
1362     int i, node;
1363 
1364     BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
1365     BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
1366     BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);
1367 
1368     for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1369         if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
1370             new_count[i] = task_rlimit(current, RLIMIT_NPROC);
1371     }
1372 
1373     for (i = 0; i < IO_WQ_ACCT_NR; i++)
1374         prev[i] = 0;
1375 
1376     rcu_read_lock();
1377     for_each_node(node) {
1378         struct io_wqe *wqe = wq->wqes[node];
1379         struct io_wqe_acct *acct;
1380 
1381         raw_spin_lock(&wqe->lock);
1382         for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1383             acct = &wqe->acct[i];
1384             if (first_node)
1385                 prev[i] = max_t(int, acct->max_workers, prev[i]);
1386             if (new_count[i])
1387                 acct->max_workers = new_count[i];
1388         }
1389         raw_spin_unlock(&wqe->lock);
1390         first_node = false;
1391     }
1392     rcu_read_unlock();
1393 
1394     for (i = 0; i < IO_WQ_ACCT_NR; i++)
1395         new_count[i] = prev[i];
1396 
1397     return 0;
1398 }
1399 
1400 static __init int io_wq_init(void)
1401 {
1402     int ret;
1403 
1404     ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
1405                     io_wq_cpu_online, io_wq_cpu_offline);
1406     if (ret < 0)
1407         return ret;
1408     io_wq_online = ret;
1409     return 0;
1410 }
1411 subsys_initcall(io_wq_init);