// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_BOUND	= 8,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	struct io_wq_work *next_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int create_index;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	raw_spinlock_t lock;
	struct io_wqe_acct acct[IO_WQ_ACCT_NR];

	int node;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq *wq;
	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wqe *wqes[];
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&worker->wqe->lock);
	acct->nr_workers--;
	raw_spin_unlock(&worker->wqe->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wqe->lock);
	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&wqe->lock);
	io_wqe_dec_running(worker);
	worker->flags = 0;
	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	preempt_enable();

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
{
	bool ret = false;

	raw_spin_lock(&acct->lock);
	if (!wq_list_empty(&acct->work_list) &&
	    !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
		ret = true;
	raw_spin_unlock(&acct->lock);

	return ret;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
					struct io_wqe_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wqe_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		if (wake_up_process(worker->task)) {
			io_worker_release(worker);
			return true;
		}
		io_worker_release(worker);
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wqe->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wqe->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wqe->wq->worker_refs);
	return create_io_worker(wqe->wq, wqe, acct->index);
}

static void io_wqe_inc_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wqe *wqe;
	struct io_wqe_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wqe = worker->wqe;
	wq = wqe->wq;
	acct = &wqe->acct[worker->create_index];
	raw_spin_lock(&wqe->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wqe->lock);
	if (do_create) {
		create_io_worker(wq, wqe, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wqe_acct *acct,
				   task_work_func_t func)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

static void io_wqe_dec_running(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!io_acct_run_queue(acct))
		return;

	atomic_inc(&acct->nr_running);
	atomic_inc(&wqe->wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist.
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		raw_spin_lock(&wqe->lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&wqe->lock);
	}
}

/*
 * No work, worker going to sleep. Move it to the freelist, if it isn't
 * already there.
 */
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return work->flags >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
	struct io_wq *wq = wqe->wq;
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wqe->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wqe->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wqe->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
					   struct io_worker *worker)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wqe *wqe = worker->wqe;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wqe->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
			wqe->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wqe, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wqe->wq->hash->wait))
				wake_up(&wqe->wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	worker->next_work = NULL;
	raw_spin_unlock(&worker->lock);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);

static void io_worker_handle_work(struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		raw_spin_lock(&acct->lock);
		work = io_get_next_work(acct, worker);
		raw_spin_unlock(&acct->lock);
		if (work) {
			__io_worker_busy(wqe, worker);

			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->next_work = work;
			raw_spin_unlock(&worker->lock);
		} else {
			break;
		}
		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
				work->flags |= IO_WQ_WORK_CANCEL;
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wqe_enqueue(wqe, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);
	} while (1);
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool last_timeout = false;
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);
		while (io_acct_run_queue(acct))
			io_worker_handle_work(worker);

		raw_spin_lock(&wqe->lock);
		/* timed out, exit unless we're the last worker */
		if (last_timeout && acct->nr_workers > 1) {
			acct->nr_workers--;
			raw_spin_unlock(&wqe->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wqe, worker);
		raw_spin_unlock(&wqe->lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		last_timeout = !ret;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		io_worker_handle_work(worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;
	io_wqe_dec_running(worker);
}

static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
	tsk->flags |= PF_NO_SETAFFINITY;

	raw_spin_lock(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	raw_spin_unlock(&wqe->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wqe *wqe;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wqe = worker->wqe;
	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		struct io_wqe_acct *acct = io_wqe_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&wqe->lock);
			while (io_acct_cancel_pending_work(wqe, acct, &match))
				;
		} else {
			raw_spin_unlock(&wqe->lock);
		}
		io_worker_ref_put(wqe->wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wqe->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wqe->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wqe = wqe;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wqe, worker, tsk);
	} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}

/*
 * Iterate all workers on this wqe and call @func on each one. Iteration
 * stops, and this returns true, as soon as @func returns true.
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;

	do {
		work->flags |= IO_WQ_WORK_CANCEL;
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wqe->hash_tail[hash];
	wqe->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	struct io_cb_cancel_data match;
	unsigned work_flags = work->flags;
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
	    (work->flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wqe);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wqe_insert_work(wqe, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	raw_spin_lock(&wqe->lock);
	rcu_read_lock();
	do_create = !io_wqe_activate_free_worker(wqe, acct);
	rcu_read_unlock();

	raw_spin_unlock(&wqe->lock);

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wqe_create_worker(wqe, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wqe->lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&wqe->lock);
			return;
		}
		raw_spin_unlock(&wqe->lock);

		/* fatal condition, failed to create the first worker */
		match.fn		= io_wq_work_match_item,
		match.data		= work,
		match.cancel_all	= false,

		io_acct_cancel_pending_work(wqe, acct, &match);
	}
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		work->flags |= IO_WQ_WORK_CANCEL;
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
	    __io_wq_worker_cancel(worker, match, worker->next_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wqe_remove_pending(struct io_wqe *wqe,
					 struct io_wq_work *work,
					 struct io_wq_work_node *prev)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wqe->hash_tail[hash] = prev_work;
		else
			wqe->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
					struct io_wqe_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wqe_remove_pending(wqe, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wqe);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);

		if (io_acct_cancel_pending_work(wqe, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_wqe_cancel_running_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};
	int node;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * queued currently. Return IO_WQ_CANCEL_NOTFOUND if we couldn't find
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the wqe->lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wqe_cancel_pending_work(wqe, &match);
		if (match.nr_pending && !match.cancel_all)
			return IO_WQ_CANCEL_OK;

		raw_spin_lock(&wqe->lock);
		io_wqe_cancel_running_work(wqe, &match);
		raw_spin_unlock(&wqe->lock);
		if (match.nr_running && !match.cancel_all)
			return IO_WQ_CANCEL_RUNNING;
	}

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wqe_acct *acct = &wqe->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wqe_activate_free_worker(wqe, acct);
	}
	rcu_read_unlock();
	return 1;
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, node, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err_wq;

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;
	for_each_node(node) {
		struct io_wqe *wqe;
		int alloc_node = node;

		if (!node_online(alloc_node))
			alloc_node = NUMA_NO_NODE;
		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
		if (!wqe)
			goto err;
		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
			goto err;
		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		INIT_LIST_HEAD(&wqe->wait.entry);
		wqe->wait.func = io_wqe_hash_wake;
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			struct io_wqe_acct *acct = &wqe->acct[i];

			acct->index = i;
			atomic_set(&acct->nr_running, 0);
			INIT_WQ_LIST(&acct->work_list);
			raw_spin_lock_init(&acct->lock);
		}
		wqe->wq = wq;
		raw_spin_lock_init(&wqe->lock);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	for_each_node(node) {
		if (!wq->wqes[node])
			continue;
		free_cpumask_var(wq->wqes[node]->cpu_mask);
		kfree(wq->wqes[node]);
	}
err_wq:
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wqe->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
	}
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	int node;

	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	for_each_node(node) {
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	int node;

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_cb_cancel_data match = {
			.fn		= io_wq_work_match_all,
			.cancel_all	= true,
		};
		io_wqe_cancel_pending_work(wqe, &match);
		free_cpumask_var(wqe->cpu_mask);
		kfree(wqe);
	}
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};
	int i;

	rcu_read_lock();
	for_each_node(i)
		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
	int i;

	rcu_read_lock();
	for_each_node(i) {
		struct io_wqe *wqe = wq->wqes[i];

		if (mask)
			cpumask_copy(wqe->cpu_mask, mask);
		else
			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
	}
	rcu_read_unlock();
	return 0;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	int prev[IO_WQ_ACCT_NR];
	bool first_node = true;
	int i, node;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		struct io_wqe_acct *acct;

		raw_spin_lock(&wqe->lock);
		for (i = 0; i < IO_WQ_ACCT_NR; i++) {
			acct = &wqe->acct[i];
			if (first_node)
				prev[i] = max_t(int, acct->max_workers, prev[i]);
			if (new_count[i])
				acct->max_workers = new_count[i];
		}
		raw_spin_unlock(&wqe->lock);
		first_node = false;
	}
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;
	io_wq_online = ret;
	return 0;
}
subsys_initcall(io_wq_init);