Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0
0002 /*
0003  * Shared application/kernel submission and completion ring pairs, for
0004  * supporting fast/efficient IO.
0005  *
0006  * A note on the read/write ordering memory barriers that are matched between
0007  * the application and kernel side.
0008  *
0009  * After the application reads the CQ ring tail, it must use an
0010  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
0011  * before writing the tail (using smp_load_acquire to read the tail will
0012  * do). It also needs a smp_mb() before updating CQ head (ordering the
0013  * entry load(s) with the head store), pairing with an implicit barrier
0014  * through a control-dependency in io_get_cqe (smp_store_release to
0015  * store head will do). Failure to do so could lead to reading invalid
0016  * CQ entries.
0017  *
0018  * Likewise, the application must use an appropriate smp_wmb() before
0019  * writing the SQ tail (ordering SQ entry stores with the tail store),
0020  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
0021  * to store the tail will do). And it needs a barrier ordering the SQ
0022  * head load before writing new SQ entries (smp_load_acquire to read
0023  * head will do).
0024  *
0025  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
0026  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
0027  * updating the SQ tail; a full memory barrier smp_mb() is needed
0028  * between.
0029  *
0030  * Also see the examples in the liburing library:
0031  *
0032  *  git://git.kernel.dk/liburing
0033  *
0034  * io_uring also uses READ/WRITE_ONCE() for _any_ store to or load from
0035  * data shared between the kernel and application. This is done both for
0036  * ordering purposes and to ensure that once a value is loaded from data
0037  * that the application could potentially modify, it remains stable.
0038  *
0039  * Copyright (C) 2018-2019 Jens Axboe
0040  * Copyright (c) 2018-2019 Christoph Hellwig
0041  */
0042 #include <linux/kernel.h>
0043 #include <linux/init.h>
0044 #include <linux/errno.h>
0045 #include <linux/syscalls.h>
0046 #include <net/compat.h>
0047 #include <linux/refcount.h>
0048 #include <linux/uio.h>
0049 #include <linux/bits.h>
0050 
0051 #include <linux/sched/signal.h>
0052 #include <linux/fs.h>
0053 #include <linux/file.h>
0054 #include <linux/fdtable.h>
0055 #include <linux/mm.h>
0056 #include <linux/mman.h>
0057 #include <linux/percpu.h>
0058 #include <linux/slab.h>
0059 #include <linux/bvec.h>
0060 #include <linux/net.h>
0061 #include <net/sock.h>
0062 #include <net/af_unix.h>
0063 #include <net/scm.h>
0064 #include <linux/anon_inodes.h>
0065 #include <linux/sched/mm.h>
0066 #include <linux/uaccess.h>
0067 #include <linux/nospec.h>
0068 #include <linux/highmem.h>
0069 #include <linux/fsnotify.h>
0070 #include <linux/fadvise.h>
0071 #include <linux/task_work.h>
0072 #include <linux/io_uring.h>
0073 #include <linux/audit.h>
0074 #include <linux/security.h>
0075 
0076 #define CREATE_TRACE_POINTS
0077 #include <trace/events/io_uring.h>
0078 
0079 #include <uapi/linux/io_uring.h>
0080 
0081 #include "io-wq.h"
0082 
0083 #include "io_uring.h"
0084 #include "opdef.h"
0085 #include "refs.h"
0086 #include "tctx.h"
0087 #include "sqpoll.h"
0088 #include "fdinfo.h"
0089 #include "kbuf.h"
0090 #include "rsrc.h"
0091 #include "cancel.h"
0092 #include "net.h"
0093 #include "notif.h"
0094 
0095 #include "timeout.h"
0096 #include "poll.h"
0097 #include "alloc_cache.h"
0098 
/* Base ring-size limit; the CQ limit is defined as twice this value. */
#define IORING_MAX_ENTRIES	32768
#define IORING_MAX_CQ_ENTRIES	(IORING_MAX_ENTRIES * 2)

/* Sum of the three *_LAST enumerators (restriction, register, opcode). */
#define IORING_MAX_RESTRICTIONS	\
	(IORING_RESTRICTION_LAST + IORING_REGISTER_LAST + IORING_OP_LAST)

/* IOSQE_* flag groups; SQE_VALID_FLAGS is a superset of SQE_COMMON_FLAGS. */
#define SQE_COMMON_FLAGS	\
	(IOSQE_FIXED_FILE | IOSQE_IO_LINK | IOSQE_IO_HARDLINK | IOSQE_ASYNC)

#define SQE_VALID_FLAGS		\
	(SQE_COMMON_FLAGS | IOSQE_BUFFER_SELECT | IOSQE_IO_DRAIN | \
	 IOSQE_CQE_SKIP_SUCCESS)

/* Request flags that make io_dismantle_req() call io_clean_op(). */
#define IO_REQ_CLEAN_FLAGS	\
	(REQ_F_BUFFER_SELECTED | REQ_F_NEED_CLEANUP | REQ_F_POLLED | \
	 REQ_F_INFLIGHT | REQ_F_CREDS | REQ_F_ASYNC_DATA)

/* IO_REQ_CLEAN_FLAGS plus the refcount and link flags. */
#define IO_REQ_CLEAN_SLOW_FLAGS	\
	(REQ_F_REFCOUNT | REQ_F_LINK | REQ_F_HARDLINK | IO_REQ_CLEAN_FLAGS)

/* Level that io_task_refs_refill() tops tctx->cached_refs back up to. */
#define IO_TCTX_REFS_CACHE_NR	(1U << 10)

/*
 * IO_COMPL_BATCH: locked_free_nr threshold above which
 * __io_alloc_req_refill() splices the locked free list back.
 * IO_REQ_ALLOC_BATCH: number of requests asked for in one bulk allocation.
 */
#define IO_COMPL_BATCH		32
#define IO_REQ_ALLOC_BATCH	8
0122 
/* Bit numbers used with set_bit()/clear_bit()/test_bit() on ctx->check_cq. */
enum {
	IO_CHECK_CQ_OVERFLOW_BIT = 0,	/* cq_overflow_list is non-empty */
	IO_CHECK_CQ_DROPPED_BIT = 1,	/* a CQE was dropped (allocation failed) */
};
0127 
0128 struct io_defer_entry {
0129     struct list_head    list;
0130     struct io_kiocb     *req;
0131     u32         seq;
0132 };
0133 
/* A request carrying any of these flags must go through io_disarm_next(). */
#define IO_DISARM_MASK		\
	(REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
/* Either kind of link flag (soft or hard link). */
#define IO_REQ_LINK_FLAGS	(REQ_F_LINK | REQ_F_HARDLINK)
0137 
0138 static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
0139                      struct task_struct *task,
0140                      bool cancel_all);
0141 
0142 static void io_dismantle_req(struct io_kiocb *req);
0143 static void io_clean_op(struct io_kiocb *req);
0144 static void io_queue_sqe(struct io_kiocb *req);
0145 
0146 static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
0147 
0148 static struct kmem_cache *req_cachep;
0149 
0150 struct sock *io_uring_get_socket(struct file *file)
0151 {
0152 #if defined(CONFIG_UNIX)
0153     if (io_is_uring_fops(file)) {
0154         struct io_ring_ctx *ctx = file->private_data;
0155 
0156         return ctx->ring_sock->sk;
0157     }
0158 #endif
0159     return NULL;
0160 }
0161 EXPORT_SYMBOL(io_uring_get_socket);
0162 
0163 static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
0164 {
0165     if (!wq_list_empty(&ctx->submit_state.compl_reqs))
0166         __io_submit_flush_completions(ctx);
0167 }
0168 
0169 static inline unsigned int __io_cqring_events(struct io_ring_ctx *ctx)
0170 {
0171     return ctx->cached_cq_tail - READ_ONCE(ctx->rings->cq.head);
0172 }
0173 
0174 static bool io_match_linked(struct io_kiocb *head)
0175 {
0176     struct io_kiocb *req;
0177 
0178     io_for_each_link(req, head) {
0179         if (req->flags & REQ_F_INFLIGHT)
0180             return true;
0181     }
0182     return false;
0183 }
0184 
0185 /*
0186  * As io_match_task() but protected against racing with linked timeouts.
0187  * User must not hold timeout_lock.
0188  */
0189 bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
0190             bool cancel_all)
0191 {
0192     bool matched;
0193 
0194     if (task && head->task != task)
0195         return false;
0196     if (cancel_all)
0197         return true;
0198 
0199     if (head->flags & REQ_F_LINK_TIMEOUT) {
0200         struct io_ring_ctx *ctx = head->ctx;
0201 
0202         /* protect against races with linked timeouts */
0203         spin_lock_irq(&ctx->timeout_lock);
0204         matched = io_match_linked(head);
0205         spin_unlock_irq(&ctx->timeout_lock);
0206     } else {
0207         matched = io_match_linked(head);
0208     }
0209     return matched;
0210 }
0211 
0212 static inline void req_fail_link_node(struct io_kiocb *req, int res)
0213 {
0214     req_set_fail(req);
0215     io_req_set_res(req, res, 0);
0216 }
0217 
0218 static inline void io_req_add_to_cache(struct io_kiocb *req, struct io_ring_ctx *ctx)
0219 {
0220     wq_stack_add_head(&req->comp_list, &ctx->submit_state.free_list);
0221 }
0222 
0223 static __cold void io_ring_ctx_ref_free(struct percpu_ref *ref)
0224 {
0225     struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
0226 
0227     complete(&ctx->ref_comp);
0228 }
0229 
0230 static __cold void io_fallback_req_func(struct work_struct *work)
0231 {
0232     struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx,
0233                         fallback_work.work);
0234     struct llist_node *node = llist_del_all(&ctx->fallback_llist);
0235     struct io_kiocb *req, *tmp;
0236     bool locked = false;
0237 
0238     percpu_ref_get(&ctx->refs);
0239     llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
0240         req->io_task_work.func(req, &locked);
0241 
0242     if (locked) {
0243         io_submit_flush_completions(ctx);
0244         mutex_unlock(&ctx->uring_lock);
0245     }
0246     percpu_ref_put(&ctx->refs);
0247 }
0248 
0249 static int io_alloc_hash_table(struct io_hash_table *table, unsigned bits)
0250 {
0251     unsigned hash_buckets = 1U << bits;
0252     size_t hash_size = hash_buckets * sizeof(table->hbs[0]);
0253 
0254     table->hbs = kmalloc(hash_size, GFP_KERNEL);
0255     if (!table->hbs)
0256         return -ENOMEM;
0257 
0258     table->hash_bits = bits;
0259     init_hash_table(table, hash_buckets);
0260     return 0;
0261 }
0262 
0263 static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
0264 {
0265     struct io_ring_ctx *ctx;
0266     int hash_bits;
0267 
0268     ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
0269     if (!ctx)
0270         return NULL;
0271 
0272     xa_init(&ctx->io_bl_xa);
0273 
0274     /*
0275      * Use 5 bits less than the max cq entries, that should give us around
0276      * 32 entries per hash list if totally full and uniformly spread, but
0277      * don't keep too many buckets to not overconsume memory.
0278      */
0279     hash_bits = ilog2(p->cq_entries) - 5;
0280     hash_bits = clamp(hash_bits, 1, 8);
0281     if (io_alloc_hash_table(&ctx->cancel_table, hash_bits))
0282         goto err;
0283     if (io_alloc_hash_table(&ctx->cancel_table_locked, hash_bits))
0284         goto err;
0285 
0286     ctx->dummy_ubuf = kzalloc(sizeof(*ctx->dummy_ubuf), GFP_KERNEL);
0287     if (!ctx->dummy_ubuf)
0288         goto err;
0289     /* set invalid range, so io_import_fixed() fails meeting it */
0290     ctx->dummy_ubuf->ubuf = -1UL;
0291 
0292     if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
0293                 0, GFP_KERNEL))
0294         goto err;
0295 
0296     ctx->flags = p->flags;
0297     init_waitqueue_head(&ctx->sqo_sq_wait);
0298     INIT_LIST_HEAD(&ctx->sqd_list);
0299     INIT_LIST_HEAD(&ctx->cq_overflow_list);
0300     INIT_LIST_HEAD(&ctx->io_buffers_cache);
0301     io_alloc_cache_init(&ctx->apoll_cache);
0302     io_alloc_cache_init(&ctx->netmsg_cache);
0303     init_completion(&ctx->ref_comp);
0304     xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
0305     mutex_init(&ctx->uring_lock);
0306     init_waitqueue_head(&ctx->cq_wait);
0307     spin_lock_init(&ctx->completion_lock);
0308     spin_lock_init(&ctx->timeout_lock);
0309     INIT_WQ_LIST(&ctx->iopoll_list);
0310     INIT_LIST_HEAD(&ctx->io_buffers_pages);
0311     INIT_LIST_HEAD(&ctx->io_buffers_comp);
0312     INIT_LIST_HEAD(&ctx->defer_list);
0313     INIT_LIST_HEAD(&ctx->timeout_list);
0314     INIT_LIST_HEAD(&ctx->ltimeout_list);
0315     spin_lock_init(&ctx->rsrc_ref_lock);
0316     INIT_LIST_HEAD(&ctx->rsrc_ref_list);
0317     INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
0318     init_llist_head(&ctx->rsrc_put_llist);
0319     INIT_LIST_HEAD(&ctx->tctx_list);
0320     ctx->submit_state.free_list.next = NULL;
0321     INIT_WQ_LIST(&ctx->locked_free_list);
0322     INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
0323     INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
0324     return ctx;
0325 err:
0326     kfree(ctx->dummy_ubuf);
0327     kfree(ctx->cancel_table.hbs);
0328     kfree(ctx->cancel_table_locked.hbs);
0329     kfree(ctx->io_bl);
0330     xa_destroy(&ctx->io_bl_xa);
0331     kfree(ctx);
0332     return NULL;
0333 }
0334 
0335 static void io_account_cq_overflow(struct io_ring_ctx *ctx)
0336 {
0337     struct io_rings *r = ctx->rings;
0338 
0339     WRITE_ONCE(r->cq_overflow, READ_ONCE(r->cq_overflow) + 1);
0340     ctx->cq_extra--;
0341 }
0342 
0343 static bool req_need_defer(struct io_kiocb *req, u32 seq)
0344 {
0345     if (unlikely(req->flags & REQ_F_IO_DRAIN)) {
0346         struct io_ring_ctx *ctx = req->ctx;
0347 
0348         return seq + READ_ONCE(ctx->cq_extra) != ctx->cached_cq_tail;
0349     }
0350 
0351     return false;
0352 }
0353 
0354 static inline void io_req_track_inflight(struct io_kiocb *req)
0355 {
0356     if (!(req->flags & REQ_F_INFLIGHT)) {
0357         req->flags |= REQ_F_INFLIGHT;
0358         atomic_inc(&req->task->io_uring->inflight_tracked);
0359     }
0360 }
0361 
0362 static struct io_kiocb *__io_prep_linked_timeout(struct io_kiocb *req)
0363 {
0364     if (WARN_ON_ONCE(!req->link))
0365         return NULL;
0366 
0367     req->flags &= ~REQ_F_ARM_LTIMEOUT;
0368     req->flags |= REQ_F_LINK_TIMEOUT;
0369 
0370     /* linked timeouts should have two refs once prep'ed */
0371     io_req_set_refcount(req);
0372     __io_req_set_refcount(req->link, 2);
0373     return req->link;
0374 }
0375 
0376 static inline struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
0377 {
0378     if (likely(!(req->flags & REQ_F_ARM_LTIMEOUT)))
0379         return NULL;
0380     return __io_prep_linked_timeout(req);
0381 }
0382 
0383 static noinline void __io_arm_ltimeout(struct io_kiocb *req)
0384 {
0385     io_queue_linked_timeout(__io_prep_linked_timeout(req));
0386 }
0387 
0388 static inline void io_arm_ltimeout(struct io_kiocb *req)
0389 {
0390     if (unlikely(req->flags & REQ_F_ARM_LTIMEOUT))
0391         __io_arm_ltimeout(req);
0392 }
0393 
0394 static void io_prep_async_work(struct io_kiocb *req)
0395 {
0396     const struct io_op_def *def = &io_op_defs[req->opcode];
0397     struct io_ring_ctx *ctx = req->ctx;
0398 
0399     if (!(req->flags & REQ_F_CREDS)) {
0400         req->flags |= REQ_F_CREDS;
0401         req->creds = get_current_cred();
0402     }
0403 
0404     req->work.list.next = NULL;
0405     req->work.flags = 0;
0406     req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
0407     if (req->flags & REQ_F_FORCE_ASYNC)
0408         req->work.flags |= IO_WQ_WORK_CONCURRENT;
0409 
0410     if (req->file && !io_req_ffs_set(req))
0411         req->flags |= io_file_get_flags(req->file) << REQ_F_SUPPORT_NOWAIT_BIT;
0412 
0413     if (req->flags & REQ_F_ISREG) {
0414         if (def->hash_reg_file || (ctx->flags & IORING_SETUP_IOPOLL))
0415             io_wq_hash_work(&req->work, file_inode(req->file));
0416     } else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
0417         if (def->unbound_nonreg_file)
0418             req->work.flags |= IO_WQ_WORK_UNBOUND;
0419     }
0420 }
0421 
0422 static void io_prep_async_link(struct io_kiocb *req)
0423 {
0424     struct io_kiocb *cur;
0425 
0426     if (req->flags & REQ_F_LINK_TIMEOUT) {
0427         struct io_ring_ctx *ctx = req->ctx;
0428 
0429         spin_lock_irq(&ctx->timeout_lock);
0430         io_for_each_link(cur, req)
0431             io_prep_async_work(cur);
0432         spin_unlock_irq(&ctx->timeout_lock);
0433     } else {
0434         io_for_each_link(cur, req)
0435             io_prep_async_work(cur);
0436     }
0437 }
0438 
0439 void io_queue_iowq(struct io_kiocb *req, bool *dont_use)
0440 {
0441     struct io_kiocb *link = io_prep_linked_timeout(req);
0442     struct io_uring_task *tctx = req->task->io_uring;
0443 
0444     BUG_ON(!tctx);
0445     BUG_ON(!tctx->io_wq);
0446 
0447     /* init ->work of the whole link before punting */
0448     io_prep_async_link(req);
0449 
0450     /*
0451      * Not expected to happen, but if we do have a bug where this _can_
0452      * happen, catch it here and ensure the request is marked as
0453      * canceled. That will make io-wq go through the usual work cancel
0454      * procedure rather than attempt to run this request (or create a new
0455      * worker for it).
0456      */
0457     if (WARN_ON_ONCE(!same_thread_group(req->task, current)))
0458         req->work.flags |= IO_WQ_WORK_CANCEL;
0459 
0460     trace_io_uring_queue_async_work(req, io_wq_is_hashed(&req->work));
0461     io_wq_enqueue(tctx->io_wq, &req->work);
0462     if (link)
0463         io_queue_linked_timeout(link);
0464 }
0465 
0466 static __cold void io_queue_deferred(struct io_ring_ctx *ctx)
0467 {
0468     while (!list_empty(&ctx->defer_list)) {
0469         struct io_defer_entry *de = list_first_entry(&ctx->defer_list,
0470                         struct io_defer_entry, list);
0471 
0472         if (req_need_defer(de->req, de->seq))
0473             break;
0474         list_del_init(&de->list);
0475         io_req_task_queue(de->req);
0476         kfree(de);
0477     }
0478 }
0479 
0480 static void io_eventfd_signal(struct io_ring_ctx *ctx)
0481 {
0482     struct io_ev_fd *ev_fd;
0483     bool skip;
0484 
0485     spin_lock(&ctx->completion_lock);
0486     /*
0487      * Eventfd should only get triggered when at least one event has been
0488      * posted. Some applications rely on the eventfd notification count only
0489      * changing IFF a new CQE has been added to the CQ ring. There's no
0490      * depedency on 1:1 relationship between how many times this function is
0491      * called (and hence the eventfd count) and number of CQEs posted to the
0492      * CQ ring.
0493      */
0494     skip = ctx->cached_cq_tail == ctx->evfd_last_cq_tail;
0495     ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
0496     spin_unlock(&ctx->completion_lock);
0497     if (skip)
0498         return;
0499 
0500     rcu_read_lock();
0501     /*
0502      * rcu_dereference ctx->io_ev_fd once and use it for both for checking
0503      * and eventfd_signal
0504      */
0505     ev_fd = rcu_dereference(ctx->io_ev_fd);
0506 
0507     /*
0508      * Check again if ev_fd exists incase an io_eventfd_unregister call
0509      * completed between the NULL check of ctx->io_ev_fd at the start of
0510      * the function and rcu_read_lock.
0511      */
0512     if (unlikely(!ev_fd))
0513         goto out;
0514     if (READ_ONCE(ctx->rings->cq_flags) & IORING_CQ_EVENTFD_DISABLED)
0515         goto out;
0516 
0517     if (!ev_fd->eventfd_async || io_wq_current_is_worker())
0518         eventfd_signal(ev_fd->cq_ev_fd, 1);
0519 out:
0520     rcu_read_unlock();
0521 }
0522 
0523 void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
0524 {
0525     if (ctx->off_timeout_used || ctx->drain_active) {
0526         spin_lock(&ctx->completion_lock);
0527         if (ctx->off_timeout_used)
0528             io_flush_timeouts(ctx);
0529         if (ctx->drain_active)
0530             io_queue_deferred(ctx);
0531         spin_unlock(&ctx->completion_lock);
0532     }
0533     if (ctx->has_evfd)
0534         io_eventfd_signal(ctx);
0535 }
0536 
/* Run the post-commit flush, then wake up CQ waiters. */
static inline void io_cqring_ev_posted(struct io_ring_ctx *ctx)
{
	io_commit_cqring_flush(ctx);

	io_cqring_wake(ctx);
}
0542 
0543 static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
0544     __releases(ctx->completion_lock)
0545 {
0546     io_commit_cqring(ctx);
0547     spin_unlock(&ctx->completion_lock);
0548     io_cqring_ev_posted(ctx);
0549 }
0550 
/* Non-inline entry point; simply forwards to __io_cq_unlock_post(). */
void io_cq_unlock_post(struct io_ring_ctx *ctx)
{
	__io_cq_unlock_post(ctx);
}
0555 
0556 /* Returns true if there are no backlogged entries after the flush */
0557 static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
0558 {
0559     bool all_flushed;
0560     size_t cqe_size = sizeof(struct io_uring_cqe);
0561 
0562     if (!force && __io_cqring_events(ctx) == ctx->cq_entries)
0563         return false;
0564 
0565     if (ctx->flags & IORING_SETUP_CQE32)
0566         cqe_size <<= 1;
0567 
0568     io_cq_lock(ctx);
0569     while (!list_empty(&ctx->cq_overflow_list)) {
0570         struct io_uring_cqe *cqe = io_get_cqe(ctx);
0571         struct io_overflow_cqe *ocqe;
0572 
0573         if (!cqe && !force)
0574             break;
0575         ocqe = list_first_entry(&ctx->cq_overflow_list,
0576                     struct io_overflow_cqe, list);
0577         if (cqe)
0578             memcpy(cqe, &ocqe->cqe, cqe_size);
0579         else
0580             io_account_cq_overflow(ctx);
0581 
0582         list_del(&ocqe->list);
0583         kfree(ocqe);
0584     }
0585 
0586     all_flushed = list_empty(&ctx->cq_overflow_list);
0587     if (all_flushed) {
0588         clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
0589         atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
0590     }
0591 
0592     io_cq_unlock_post(ctx);
0593     return all_flushed;
0594 }
0595 
0596 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx)
0597 {
0598     bool ret = true;
0599 
0600     if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
0601         /* iopoll syncs against uring_lock, not completion_lock */
0602         if (ctx->flags & IORING_SETUP_IOPOLL)
0603             mutex_lock(&ctx->uring_lock);
0604         ret = __io_cqring_overflow_flush(ctx, false);
0605         if (ctx->flags & IORING_SETUP_IOPOLL)
0606             mutex_unlock(&ctx->uring_lock);
0607     }
0608 
0609     return ret;
0610 }
0611 
0612 void __io_put_task(struct task_struct *task, int nr)
0613 {
0614     struct io_uring_task *tctx = task->io_uring;
0615 
0616     percpu_counter_sub(&tctx->inflight, nr);
0617     if (unlikely(atomic_read(&tctx->in_idle)))
0618         wake_up(&tctx->wait);
0619     put_task_struct_many(task, nr);
0620 }
0621 
0622 void io_task_refs_refill(struct io_uring_task *tctx)
0623 {
0624     unsigned int refill = -tctx->cached_refs + IO_TCTX_REFS_CACHE_NR;
0625 
0626     percpu_counter_add(&tctx->inflight, refill);
0627     refcount_add(refill, &current->usage);
0628     tctx->cached_refs += refill;
0629 }
0630 
0631 static __cold void io_uring_drop_tctx_refs(struct task_struct *task)
0632 {
0633     struct io_uring_task *tctx = task->io_uring;
0634     unsigned int refs = tctx->cached_refs;
0635 
0636     if (refs) {
0637         tctx->cached_refs = 0;
0638         percpu_counter_sub(&tctx->inflight, refs);
0639         put_task_struct_many(task, refs);
0640     }
0641 }
0642 
0643 static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
0644                      s32 res, u32 cflags, u64 extra1, u64 extra2)
0645 {
0646     struct io_overflow_cqe *ocqe;
0647     size_t ocq_size = sizeof(struct io_overflow_cqe);
0648     bool is_cqe32 = (ctx->flags & IORING_SETUP_CQE32);
0649 
0650     if (is_cqe32)
0651         ocq_size += sizeof(struct io_uring_cqe);
0652 
0653     ocqe = kmalloc(ocq_size, GFP_ATOMIC | __GFP_ACCOUNT);
0654     trace_io_uring_cqe_overflow(ctx, user_data, res, cflags, ocqe);
0655     if (!ocqe) {
0656         /*
0657          * If we're in ring overflow flush mode, or in task cancel mode,
0658          * or cannot allocate an overflow entry, then we need to drop it
0659          * on the floor.
0660          */
0661         io_account_cq_overflow(ctx);
0662         set_bit(IO_CHECK_CQ_DROPPED_BIT, &ctx->check_cq);
0663         return false;
0664     }
0665     if (list_empty(&ctx->cq_overflow_list)) {
0666         set_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
0667         atomic_or(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
0668 
0669     }
0670     ocqe->cqe.user_data = user_data;
0671     ocqe->cqe.res = res;
0672     ocqe->cqe.flags = cflags;
0673     if (is_cqe32) {
0674         ocqe->cqe.big_cqe[0] = extra1;
0675         ocqe->cqe.big_cqe[1] = extra2;
0676     }
0677     list_add_tail(&ocqe->list, &ctx->cq_overflow_list);
0678     return true;
0679 }
0680 
0681 bool io_req_cqe_overflow(struct io_kiocb *req)
0682 {
0683     if (!(req->flags & REQ_F_CQE32_INIT)) {
0684         req->extra1 = 0;
0685         req->extra2 = 0;
0686     }
0687     return io_cqring_event_overflow(req->ctx, req->cqe.user_data,
0688                     req->cqe.res, req->cqe.flags,
0689                     req->extra1, req->extra2);
0690 }
0691 
0692 /*
0693  * writes to the cq entry need to come after reading head; the
0694  * control dependency is enough as we're using WRITE_ONCE to
0695  * fill the cq entry
0696  */
0697 struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx)
0698 {
0699     struct io_rings *rings = ctx->rings;
0700     unsigned int off = ctx->cached_cq_tail & (ctx->cq_entries - 1);
0701     unsigned int free, queued, len;
0702 
0703 
0704     /* userspace may cheat modifying the tail, be safe and do min */
0705     queued = min(__io_cqring_events(ctx), ctx->cq_entries);
0706     free = ctx->cq_entries - queued;
0707     /* we need a contiguous range, limit based on the current array offset */
0708     len = min(free, ctx->cq_entries - off);
0709     if (!len)
0710         return NULL;
0711 
0712     if (ctx->flags & IORING_SETUP_CQE32) {
0713         off <<= 1;
0714         len <<= 1;
0715     }
0716 
0717     ctx->cqe_cached = &rings->cqes[off];
0718     ctx->cqe_sentinel = ctx->cqe_cached + len;
0719 
0720     ctx->cached_cq_tail++;
0721     ctx->cqe_cached++;
0722     if (ctx->flags & IORING_SETUP_CQE32)
0723         ctx->cqe_cached++;
0724     return &rings->cqes[off];
0725 }
0726 
0727 bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
0728              bool allow_overflow)
0729 {
0730     struct io_uring_cqe *cqe;
0731 
0732     ctx->cq_extra++;
0733 
0734     /*
0735      * If we can't get a cq entry, userspace overflowed the
0736      * submission (by quite a lot). Increment the overflow count in
0737      * the ring.
0738      */
0739     cqe = io_get_cqe(ctx);
0740     if (likely(cqe)) {
0741         trace_io_uring_complete(ctx, NULL, user_data, res, cflags, 0, 0);
0742 
0743         WRITE_ONCE(cqe->user_data, user_data);
0744         WRITE_ONCE(cqe->res, res);
0745         WRITE_ONCE(cqe->flags, cflags);
0746 
0747         if (ctx->flags & IORING_SETUP_CQE32) {
0748             WRITE_ONCE(cqe->big_cqe[0], 0);
0749             WRITE_ONCE(cqe->big_cqe[1], 0);
0750         }
0751         return true;
0752     }
0753 
0754     if (allow_overflow)
0755         return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
0756 
0757     return false;
0758 }
0759 
0760 bool io_post_aux_cqe(struct io_ring_ctx *ctx,
0761              u64 user_data, s32 res, u32 cflags,
0762              bool allow_overflow)
0763 {
0764     bool filled;
0765 
0766     io_cq_lock(ctx);
0767     filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow);
0768     io_cq_unlock_post(ctx);
0769     return filled;
0770 }
0771 
0772 static void __io_req_complete_put(struct io_kiocb *req)
0773 {
0774     /*
0775      * If we're the last reference to this request, add to our locked
0776      * free_list cache.
0777      */
0778     if (req_ref_put_and_test(req)) {
0779         struct io_ring_ctx *ctx = req->ctx;
0780 
0781         if (req->flags & IO_REQ_LINK_FLAGS) {
0782             if (req->flags & IO_DISARM_MASK)
0783                 io_disarm_next(req);
0784             if (req->link) {
0785                 io_req_task_queue(req->link);
0786                 req->link = NULL;
0787             }
0788         }
0789         io_req_put_rsrc(req);
0790         /*
0791          * Selected buffer deallocation in io_clean_op() assumes that
0792          * we don't hold ->completion_lock. Clean them here to avoid
0793          * deadlocks.
0794          */
0795         io_put_kbuf_comp(req);
0796         io_dismantle_req(req);
0797         io_put_task(req->task, 1);
0798         wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
0799         ctx->locked_free_nr++;
0800     }
0801 }
0802 
0803 void __io_req_complete_post(struct io_kiocb *req)
0804 {
0805     if (!(req->flags & REQ_F_CQE_SKIP))
0806         __io_fill_cqe_req(req->ctx, req);
0807     __io_req_complete_put(req);
0808 }
0809 
0810 void io_req_complete_post(struct io_kiocb *req)
0811 {
0812     struct io_ring_ctx *ctx = req->ctx;
0813 
0814     io_cq_lock(ctx);
0815     __io_req_complete_post(req);
0816     io_cq_unlock_post(ctx);
0817 }
0818 
/* Complete @req via io_req_complete_post(); @issue_flags is not used. */
inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags)
{
	io_req_complete_post(req);
}
0823 
0824 void io_req_complete_failed(struct io_kiocb *req, s32 res)
0825 {
0826     req_set_fail(req);
0827     io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
0828     io_req_complete_post(req);
0829 }
0830 
0831 /*
0832  * Don't initialise the fields below on every allocation, but do that in
0833  * advance and keep them valid across allocations.
0834  */
0835 static void io_preinit_req(struct io_kiocb *req, struct io_ring_ctx *ctx)
0836 {
0837     req->ctx = ctx;
0838     req->link = NULL;
0839     req->async_data = NULL;
0840     /* not necessary, but safer to zero */
0841     req->cqe.res = 0;
0842 }
0843 
0844 static void io_flush_cached_locked_reqs(struct io_ring_ctx *ctx,
0845                     struct io_submit_state *state)
0846 {
0847     spin_lock(&ctx->completion_lock);
0848     wq_list_splice(&ctx->locked_free_list, &state->free_list);
0849     ctx->locked_free_nr = 0;
0850     spin_unlock(&ctx->completion_lock);
0851 }
0852 
0853 /*
0854  * A request might get retired back into the request caches even before opcode
0855  * handlers and io_issue_sqe() are done with it, e.g. inline completion path.
0856  * Because of that, io_alloc_req() should be called only under ->uring_lock
0857  * and with extra caution to not get a request that is still worked on.
0858  */
0859 __cold bool __io_alloc_req_refill(struct io_ring_ctx *ctx)
0860     __must_hold(&ctx->uring_lock)
0861 {
0862     gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
0863     void *reqs[IO_REQ_ALLOC_BATCH];
0864     int ret, i;
0865 
0866     /*
0867      * If we have more than a batch's worth of requests in our IRQ side
0868      * locked cache, grab the lock and move them over to our submission
0869      * side cache.
0870      */
0871     if (data_race(ctx->locked_free_nr) > IO_COMPL_BATCH) {
0872         io_flush_cached_locked_reqs(ctx, &ctx->submit_state);
0873         if (!io_req_cache_empty(ctx))
0874             return true;
0875     }
0876 
0877     ret = kmem_cache_alloc_bulk(req_cachep, gfp, ARRAY_SIZE(reqs), reqs);
0878 
0879     /*
0880      * Bulk alloc is all-or-nothing. If we fail to get a batch,
0881      * retry single alloc to be on the safe side.
0882      */
0883     if (unlikely(ret <= 0)) {
0884         reqs[0] = kmem_cache_alloc(req_cachep, gfp);
0885         if (!reqs[0])
0886             return false;
0887         ret = 1;
0888     }
0889 
0890     percpu_ref_get_many(&ctx->refs, ret);
0891     for (i = 0; i < ret; i++) {
0892         struct io_kiocb *req = reqs[i];
0893 
0894         io_preinit_req(req, ctx);
0895         io_req_add_to_cache(req, ctx);
0896     }
0897     return true;
0898 }
0899 
0900 static inline void io_dismantle_req(struct io_kiocb *req)
0901 {
0902     unsigned int flags = req->flags;
0903 
0904     if (unlikely(flags & IO_REQ_CLEAN_FLAGS))
0905         io_clean_op(req);
0906     if (!(flags & REQ_F_FIXED_FILE))
0907         io_put_file(req->file);
0908 }
0909 
0910 __cold void io_free_req(struct io_kiocb *req)
0911 {
0912     struct io_ring_ctx *ctx = req->ctx;
0913 
0914     io_req_put_rsrc(req);
0915     io_dismantle_req(req);
0916     io_put_task(req->task, 1);
0917 
0918     spin_lock(&ctx->completion_lock);
0919     wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
0920     ctx->locked_free_nr++;
0921     spin_unlock(&ctx->completion_lock);
0922 }
0923 
0924 static void __io_req_find_next_prep(struct io_kiocb *req)
0925 {
0926     struct io_ring_ctx *ctx = req->ctx;
0927 
0928     io_cq_lock(ctx);
0929     io_disarm_next(req);
0930     io_cq_unlock_post(ctx);
0931 }
0932 
0933 static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
0934 {
0935     struct io_kiocb *nxt;
0936 
0937     /*
0938      * If LINK is set, we have dependent requests in this chain. If we
0939      * didn't fail this request, queue the first one up, moving any other
0940      * dependencies to the next request. In case of failure, fail the rest
0941      * of the chain.
0942      */
0943     if (unlikely(req->flags & IO_DISARM_MASK))
0944         __io_req_find_next_prep(req);
0945     nxt = req->link;
0946     req->link = NULL;
0947     return nxt;
0948 }
0949 
0950 static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
0951 {
0952     if (!ctx)
0953         return;
0954     if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
0955         atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
0956     if (*locked) {
0957         io_submit_flush_completions(ctx);
0958         mutex_unlock(&ctx->uring_lock);
0959         *locked = false;
0960     }
0961     percpu_ref_put(&ctx->refs);
0962 }
0963 
0964 static unsigned int handle_tw_list(struct llist_node *node,
0965                    struct io_ring_ctx **ctx, bool *locked,
0966                    struct llist_node *last)
0967 {
0968     unsigned int count = 0;
0969 
0970     while (node != last) {
0971         struct llist_node *next = node->next;
0972         struct io_kiocb *req = container_of(node, struct io_kiocb,
0973                             io_task_work.node);
0974 
0975         prefetch(container_of(next, struct io_kiocb, io_task_work.node));
0976 
0977         if (req->ctx != *ctx) {
0978             ctx_flush_and_put(*ctx, locked);
0979             *ctx = req->ctx;
0980             /* if not contended, grab and improve batching */
0981             *locked = mutex_trylock(&(*ctx)->uring_lock);
0982             percpu_ref_get(&(*ctx)->refs);
0983         }
0984         req->io_task_work.func(req, locked);
0985         node = next;
0986         count++;
0987     }
0988 
0989     return count;
0990 }
0991 
0992 /**
0993  * io_llist_xchg - swap all entries in a lock-less list
0994  * @head:   the head of lock-less list to delete all entries
0995  * @new:    new entry as the head of the list
0996  *
0997  * If list is empty, return NULL, otherwise, return the pointer to the first entry.
0998  * The order of entries returned is from the newest to the oldest added one.
0999  */
1000 static inline struct llist_node *io_llist_xchg(struct llist_head *head,
1001                            struct llist_node *new)
1002 {
1003     return xchg(&head->first, new);
1004 }
1005 
1006 /**
1007  * io_llist_cmpxchg - possibly swap all entries in a lock-less list
1008  * @head:   the head of lock-less list to delete all entries
1009  * @old:    expected old value of the first entry of the list
1010  * @new:    new entry as the head of the list
1011  *
1012  * perform a cmpxchg on the first entry of the list.
1013  */
1014 
1015 static inline struct llist_node *io_llist_cmpxchg(struct llist_head *head,
1016                           struct llist_node *old,
1017                           struct llist_node *new)
1018 {
1019     return cmpxchg(&head->first, old, new);
1020 }
1021 
1022 void tctx_task_work(struct callback_head *cb)
1023 {
1024     bool uring_locked = false;
1025     struct io_ring_ctx *ctx = NULL;
1026     struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
1027                           task_work);
1028     struct llist_node fake = {};
1029     struct llist_node *node = io_llist_xchg(&tctx->task_list, &fake);
1030     unsigned int loops = 1;
1031     unsigned int count = handle_tw_list(node, &ctx, &uring_locked, NULL);
1032 
1033     node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
1034     while (node != &fake) {
1035         loops++;
1036         node = io_llist_xchg(&tctx->task_list, &fake);
1037         count += handle_tw_list(node, &ctx, &uring_locked, &fake);
1038         node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
1039     }
1040 
1041     ctx_flush_and_put(ctx, &uring_locked);
1042 
1043     /* relaxed read is enough as only the task itself sets ->in_idle */
1044     if (unlikely(atomic_read(&tctx->in_idle)))
1045         io_uring_drop_tctx_refs(current);
1046 
1047     trace_io_uring_task_work_run(tctx, count, loops);
1048 }
1049 
1050 void io_req_task_work_add(struct io_kiocb *req)
1051 {
1052     struct io_uring_task *tctx = req->task->io_uring;
1053     struct io_ring_ctx *ctx = req->ctx;
1054     struct llist_node *node;
1055     bool running;
1056 
1057     running = !llist_add(&req->io_task_work.node, &tctx->task_list);
1058 
1059     /* task_work already pending, we're done */
1060     if (running)
1061         return;
1062 
1063     if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
1064         atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
1065 
1066     if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
1067         return;
1068 
1069     node = llist_del_all(&tctx->task_list);
1070 
1071     while (node) {
1072         req = container_of(node, struct io_kiocb, io_task_work.node);
1073         node = node->next;
1074         if (llist_add(&req->io_task_work.node,
1075                   &req->ctx->fallback_llist))
1076             schedule_delayed_work(&req->ctx->fallback_work, 1);
1077     }
1078 }
1079 
1080 static void io_req_tw_post(struct io_kiocb *req, bool *locked)
1081 {
1082     io_req_complete_post(req);
1083 }
1084 
1085 void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags)
1086 {
1087     io_req_set_res(req, res, cflags);
1088     req->io_task_work.func = io_req_tw_post;
1089     io_req_task_work_add(req);
1090 }
1091 
1092 static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
1093 {
1094     /* not needed for normal modes, but SQPOLL depends on it */
1095     io_tw_lock(req->ctx, locked);
1096     io_req_complete_failed(req, req->cqe.res);
1097 }
1098 
1099 void io_req_task_submit(struct io_kiocb *req, bool *locked)
1100 {
1101     io_tw_lock(req->ctx, locked);
1102     /* req->task == current here, checking PF_EXITING is safe */
1103     if (likely(!(req->task->flags & PF_EXITING)))
1104         io_queue_sqe(req);
1105     else
1106         io_req_complete_failed(req, -EFAULT);
1107 }
1108 
1109 void io_req_task_queue_fail(struct io_kiocb *req, int ret)
1110 {
1111     io_req_set_res(req, ret, 0);
1112     req->io_task_work.func = io_req_task_cancel;
1113     io_req_task_work_add(req);
1114 }
1115 
1116 void io_req_task_queue(struct io_kiocb *req)
1117 {
1118     req->io_task_work.func = io_req_task_submit;
1119     io_req_task_work_add(req);
1120 }
1121 
/*
 * Detach the request linked after @req, if any, and queue it for submission
 * through task-work.
 */
void io_queue_next(struct io_kiocb *req)
{
    struct io_kiocb *successor = io_req_find_next(req);

    if (!successor)
        return;
    io_req_task_queue(successor);
}
1129 
1130 void io_free_batch_list(struct io_ring_ctx *ctx, struct io_wq_work_node *node)
1131     __must_hold(&ctx->uring_lock)
1132 {
1133     struct task_struct *task = NULL;
1134     int task_refs = 0;
1135 
1136     do {
1137         struct io_kiocb *req = container_of(node, struct io_kiocb,
1138                             comp_list);
1139 
1140         if (unlikely(req->flags & IO_REQ_CLEAN_SLOW_FLAGS)) {
1141             if (req->flags & REQ_F_REFCOUNT) {
1142                 node = req->comp_list.next;
1143                 if (!req_ref_put_and_test(req))
1144                     continue;
1145             }
1146             if ((req->flags & REQ_F_POLLED) && req->apoll) {
1147                 struct async_poll *apoll = req->apoll;
1148 
1149                 if (apoll->double_poll)
1150                     kfree(apoll->double_poll);
1151                 if (!io_alloc_cache_put(&ctx->apoll_cache, &apoll->cache))
1152                     kfree(apoll);
1153                 req->flags &= ~REQ_F_POLLED;
1154             }
1155             if (req->flags & IO_REQ_LINK_FLAGS)
1156                 io_queue_next(req);
1157             if (unlikely(req->flags & IO_REQ_CLEAN_FLAGS))
1158                 io_clean_op(req);
1159         }
1160         if (!(req->flags & REQ_F_FIXED_FILE))
1161             io_put_file(req->file);
1162 
1163         io_req_put_rsrc_locked(req, ctx);
1164 
1165         if (req->task != task) {
1166             if (task)
1167                 io_put_task(task, task_refs);
1168             task = req->task;
1169             task_refs = 0;
1170         }
1171         task_refs++;
1172         node = req->comp_list.next;
1173         io_req_add_to_cache(req, ctx);
1174     } while (node);
1175 
1176     if (task)
1177         io_put_task(task, task_refs);
1178 }
1179 
1180 static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
1181     __must_hold(&ctx->uring_lock)
1182 {
1183     struct io_wq_work_node *node, *prev;
1184     struct io_submit_state *state = &ctx->submit_state;
1185 
1186     spin_lock(&ctx->completion_lock);
1187     wq_list_for_each(node, prev, &state->compl_reqs) {
1188         struct io_kiocb *req = container_of(node, struct io_kiocb,
1189                         comp_list);
1190 
1191         if (!(req->flags & REQ_F_CQE_SKIP))
1192             __io_fill_cqe_req(ctx, req);
1193     }
1194     __io_cq_unlock_post(ctx);
1195 
1196     io_free_batch_list(ctx, state->compl_reqs.first);
1197     INIT_WQ_LIST(&state->compl_reqs);
1198 }
1199 
1200 /*
1201  * Drop reference to request, return next in chain (if there is one) if this
1202  * was the last reference to this request.
1203  */
1204 static inline struct io_kiocb *io_put_req_find_next(struct io_kiocb *req)
1205 {
1206     struct io_kiocb *nxt = NULL;
1207 
1208     if (req_ref_put_and_test(req)) {
1209         if (unlikely(req->flags & IO_REQ_LINK_FLAGS))
1210             nxt = io_req_find_next(req);
1211         io_free_req(req);
1212     }
1213     return nxt;
1214 }
1215 
/*
 * Return __io_cqring_events() for @ctx, preceded by a read barrier.
 */
static unsigned io_cqring_events(struct io_ring_ctx *ctx)
{
    unsigned nr;

    /* See comment at the top of this file */
    smp_rmb();
    nr = __io_cqring_events(ctx);
    return nr;
}
1222 
1223 /*
1224  * We can't just wait for polled events to come to us, we have to actively
1225  * find and complete them.
1226  */
1227 static __cold void io_iopoll_try_reap_events(struct io_ring_ctx *ctx)
1228 {
1229     if (!(ctx->flags & IORING_SETUP_IOPOLL))
1230         return;
1231 
1232     mutex_lock(&ctx->uring_lock);
1233     while (!wq_list_empty(&ctx->iopoll_list)) {
1234         /* let it sleep and repeat later if can't complete a request */
1235         if (io_do_iopoll(ctx, true) == 0)
1236             break;
1237         /*
1238          * Ensure we allow local-to-the-cpu processing to take place,
1239          * in this case we need to ensure that we reap all events.
1240          * Also let task_work, etc. to progress by releasing the mutex
1241          */
1242         if (need_resched()) {
1243             mutex_unlock(&ctx->uring_lock);
1244             cond_resched();
1245             mutex_lock(&ctx->uring_lock);
1246         }
1247     }
1248     mutex_unlock(&ctx->uring_lock);
1249 }
1250 
1251 static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
1252 {
1253     unsigned int nr_events = 0;
1254     int ret = 0;
1255     unsigned long check_cq;
1256 
1257     check_cq = READ_ONCE(ctx->check_cq);
1258     if (unlikely(check_cq)) {
1259         if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
1260             __io_cqring_overflow_flush(ctx, false);
1261         /*
1262          * Similarly do not spin if we have not informed the user of any
1263          * dropped CQE.
1264          */
1265         if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
1266             return -EBADR;
1267     }
1268     /*
1269      * Don't enter poll loop if we already have events pending.
1270      * If we do, we can potentially be spinning for commands that
1271      * already triggered a CQE (eg in error).
1272      */
1273     if (io_cqring_events(ctx))
1274         return 0;
1275 
1276     do {
1277         /*
1278          * If a submit got punted to a workqueue, we can have the
1279          * application entering polling for a command before it gets
1280          * issued. That app will hold the uring_lock for the duration
1281          * of the poll right here, so we need to take a breather every
1282          * now and then to ensure that the issue has a chance to add
1283          * the poll to the issued list. Otherwise we can spin here
1284          * forever, while the workqueue is stuck trying to acquire the
1285          * very same mutex.
1286          */
1287         if (wq_list_empty(&ctx->iopoll_list)) {
1288             u32 tail = ctx->cached_cq_tail;
1289 
1290             mutex_unlock(&ctx->uring_lock);
1291             io_run_task_work();
1292             mutex_lock(&ctx->uring_lock);
1293 
1294             /* some requests don't go through iopoll_list */
1295             if (tail != ctx->cached_cq_tail ||
1296                 wq_list_empty(&ctx->iopoll_list))
1297                 break;
1298         }
1299         ret = io_do_iopoll(ctx, !min);
1300         if (ret < 0)
1301             break;
1302         nr_events += ret;
1303         ret = 0;
1304     } while (nr_events < min && !need_resched());
1305 
1306     return ret;
1307 }
1308 
1309 void io_req_task_complete(struct io_kiocb *req, bool *locked)
1310 {
1311     if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
1312         unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
1313 
1314         req->cqe.flags |= io_put_kbuf(req, issue_flags);
1315     }
1316 
1317     if (*locked)
1318         io_req_complete_defer(req);
1319     else
1320         io_req_complete_post(req);
1321 }
1322 
1323 /*
1324  * After the iocb has been issued, it's safe to be found on the poll list.
1325  * Adding the kiocb to the list AFTER submission ensures that we don't
1326  * find it from a io_do_iopoll() thread before the issuer is done
1327  * accessing the kiocb cookie.
1328  */
1329 static void io_iopoll_req_issued(struct io_kiocb *req, unsigned int issue_flags)
1330 {
1331     struct io_ring_ctx *ctx = req->ctx;
1332     const bool needs_lock = issue_flags & IO_URING_F_UNLOCKED;
1333 
1334     /* workqueue context doesn't hold uring_lock, grab it now */
1335     if (unlikely(needs_lock))
1336         mutex_lock(&ctx->uring_lock);
1337 
1338     /*
1339      * Track whether we have multiple files in our lists. This will impact
1340      * how we do polling eventually, not spinning if we're on potentially
1341      * different devices.
1342      */
1343     if (wq_list_empty(&ctx->iopoll_list)) {
1344         ctx->poll_multi_queue = false;
1345     } else if (!ctx->poll_multi_queue) {
1346         struct io_kiocb *list_req;
1347 
1348         list_req = container_of(ctx->iopoll_list.first, struct io_kiocb,
1349                     comp_list);
1350         if (list_req->file != req->file)
1351             ctx->poll_multi_queue = true;
1352     }
1353 
1354     /*
1355      * For fast devices, IO may have already completed. If it has, add
1356      * it to the front so we find it first.
1357      */
1358     if (READ_ONCE(req->iopoll_completed))
1359         wq_list_add_head(&req->comp_list, &ctx->iopoll_list);
1360     else
1361         wq_list_add_tail(&req->comp_list, &ctx->iopoll_list);
1362 
1363     if (unlikely(needs_lock)) {
1364         /*
1365          * If IORING_SETUP_SQPOLL is enabled, sqes are either handle
1366          * in sq thread task context or in io worker task context. If
1367          * current task context is sq thread, we don't need to check
1368          * whether should wake up sq thread.
1369          */
1370         if ((ctx->flags & IORING_SETUP_SQPOLL) &&
1371             wq_has_sleeper(&ctx->sq_data->wait))
1372             wake_up(&ctx->sq_data->wait);
1373 
1374         mutex_unlock(&ctx->uring_lock);
1375     }
1376 }
1377 
1378 static bool io_bdev_nowait(struct block_device *bdev)
1379 {
1380     return !bdev || blk_queue_nowait(bdev_get_queue(bdev));
1381 }
1382 
1383 /*
1384  * If we tracked the file through the SCM inflight mechanism, we could support
1385  * any file. For now, just ensure that anything potentially problematic is done
1386  * inline.
1387  */
1388 static bool __io_file_supports_nowait(struct file *file, umode_t mode)
1389 {
1390     if (S_ISBLK(mode)) {
1391         if (IS_ENABLED(CONFIG_BLOCK) &&
1392             io_bdev_nowait(I_BDEV(file->f_mapping->host)))
1393             return true;
1394         return false;
1395     }
1396     if (S_ISSOCK(mode))
1397         return true;
1398     if (S_ISREG(mode)) {
1399         if (IS_ENABLED(CONFIG_BLOCK) &&
1400             io_bdev_nowait(file->f_inode->i_sb->s_bdev) &&
1401             !io_is_uring_fops(file))
1402             return true;
1403         return false;
1404     }
1405 
1406     /* any ->read/write should understand O_NONBLOCK */
1407     if (file->f_flags & O_NONBLOCK)
1408         return true;
1409     return file->f_mode & FMODE_NOWAIT;
1410 }
1411 
1412 /*
1413  * If we tracked the file through the SCM inflight mechanism, we could support
1414  * any file. For now, just ensure that anything potentially problematic is done
1415  * inline.
1416  */
1417 unsigned int io_file_get_flags(struct file *file)
1418 {
1419     umode_t mode = file_inode(file)->i_mode;
1420     unsigned int res = 0;
1421 
1422     if (S_ISREG(mode))
1423         res |= FFS_ISREG;
1424     if (__io_file_supports_nowait(file, mode))
1425         res |= FFS_NOWAIT;
1426     if (io_file_need_scm(file))
1427         res |= FFS_SCM;
1428     return res;
1429 }
1430 
1431 bool io_alloc_async_data(struct io_kiocb *req)
1432 {
1433     WARN_ON_ONCE(!io_op_defs[req->opcode].async_size);
1434     req->async_data = kmalloc(io_op_defs[req->opcode].async_size, GFP_KERNEL);
1435     if (req->async_data) {
1436         req->flags |= REQ_F_ASYNC_DATA;
1437         return false;
1438     }
1439     return true;
1440 }
1441 
1442 int io_req_prep_async(struct io_kiocb *req)
1443 {
1444     const struct io_op_def *def = &io_op_defs[req->opcode];
1445 
1446     /* assign early for deferred execution for non-fixed file */
1447     if (def->needs_file && !(req->flags & REQ_F_FIXED_FILE))
1448         req->file = io_file_get_normal(req, req->cqe.fd);
1449     if (!def->prep_async)
1450         return 0;
1451     if (WARN_ON_ONCE(req_has_async_data(req)))
1452         return -EFAULT;
1453     if (!io_op_defs[req->opcode].manual_alloc) {
1454         if (io_alloc_async_data(req))
1455             return -EAGAIN;
1456     }
1457     return def->prep_async(req);
1458 }
1459 
1460 static u32 io_get_sequence(struct io_kiocb *req)
1461 {
1462     u32 seq = req->ctx->cached_sq_head;
1463     struct io_kiocb *cur;
1464 
1465     /* need original cached_sq_head, but it was increased for each req */
1466     io_for_each_link(cur, req)
1467         seq--;
1468     return seq;
1469 }
1470 
1471 static __cold void io_drain_req(struct io_kiocb *req)
1472 {
1473     struct io_ring_ctx *ctx = req->ctx;
1474     struct io_defer_entry *de;
1475     int ret;
1476     u32 seq = io_get_sequence(req);
1477 
1478     /* Still need defer if there is pending req in defer list. */
1479     spin_lock(&ctx->completion_lock);
1480     if (!req_need_defer(req, seq) && list_empty_careful(&ctx->defer_list)) {
1481         spin_unlock(&ctx->completion_lock);
1482 queue:
1483         ctx->drain_active = false;
1484         io_req_task_queue(req);
1485         return;
1486     }
1487     spin_unlock(&ctx->completion_lock);
1488 
1489     ret = io_req_prep_async(req);
1490     if (ret) {
1491 fail:
1492         io_req_complete_failed(req, ret);
1493         return;
1494     }
1495     io_prep_async_link(req);
1496     de = kmalloc(sizeof(*de), GFP_KERNEL);
1497     if (!de) {
1498         ret = -ENOMEM;
1499         goto fail;
1500     }
1501 
1502     spin_lock(&ctx->completion_lock);
1503     if (!req_need_defer(req, seq) && list_empty(&ctx->defer_list)) {
1504         spin_unlock(&ctx->completion_lock);
1505         kfree(de);
1506         goto queue;
1507     }
1508 
1509     trace_io_uring_defer(req);
1510     de->req = req;
1511     de->seq = seq;
1512     list_add_tail(&de->list, &ctx->defer_list);
1513     spin_unlock(&ctx->completion_lock);
1514 }
1515 
1516 static void io_clean_op(struct io_kiocb *req)
1517 {
1518     if (req->flags & REQ_F_BUFFER_SELECTED) {
1519         spin_lock(&req->ctx->completion_lock);
1520         io_put_kbuf_comp(req);
1521         spin_unlock(&req->ctx->completion_lock);
1522     }
1523 
1524     if (req->flags & REQ_F_NEED_CLEANUP) {
1525         const struct io_op_def *def = &io_op_defs[req->opcode];
1526 
1527         if (def->cleanup)
1528             def->cleanup(req);
1529     }
1530     if ((req->flags & REQ_F_POLLED) && req->apoll) {
1531         kfree(req->apoll->double_poll);
1532         kfree(req->apoll);
1533         req->apoll = NULL;
1534     }
1535     if (req->flags & REQ_F_INFLIGHT) {
1536         struct io_uring_task *tctx = req->task->io_uring;
1537 
1538         atomic_dec(&tctx->inflight_tracked);
1539     }
1540     if (req->flags & REQ_F_CREDS)
1541         put_cred(req->creds);
1542     if (req->flags & REQ_F_ASYNC_DATA) {
1543         kfree(req->async_data);
1544         req->async_data = NULL;
1545     }
1546     req->flags &= ~IO_REQ_CLEAN_FLAGS;
1547 }
1548 
1549 static bool io_assign_file(struct io_kiocb *req, unsigned int issue_flags)
1550 {
1551     if (req->file || !io_op_defs[req->opcode].needs_file)
1552         return true;
1553 
1554     if (req->flags & REQ_F_FIXED_FILE)
1555         req->file = io_file_get_fixed(req, req->cqe.fd, issue_flags);
1556     else
1557         req->file = io_file_get_normal(req, req->cqe.fd);
1558 
1559     return !!req->file;
1560 }
1561 
1562 static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
1563 {
1564     const struct io_op_def *def = &io_op_defs[req->opcode];
1565     const struct cred *creds = NULL;
1566     int ret;
1567 
1568     if (unlikely(!io_assign_file(req, issue_flags)))
1569         return -EBADF;
1570 
1571     if (unlikely((req->flags & REQ_F_CREDS) && req->creds != current_cred()))
1572         creds = override_creds(req->creds);
1573 
1574     if (!def->audit_skip)
1575         audit_uring_entry(req->opcode);
1576 
1577     ret = def->issue(req, issue_flags);
1578 
1579     if (!def->audit_skip)
1580         audit_uring_exit(!ret, ret);
1581 
1582     if (creds)
1583         revert_creds(creds);
1584 
1585     if (ret == IOU_OK) {
1586         if (issue_flags & IO_URING_F_COMPLETE_DEFER)
1587             io_req_complete_defer(req);
1588         else
1589             io_req_complete_post(req);
1590     } else if (ret != IOU_ISSUE_SKIP_COMPLETE)
1591         return ret;
1592 
1593     /* If the op doesn't have a file, we're not polling for it */
1594     if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file)
1595         io_iopoll_req_issued(req, issue_flags);
1596 
1597     return 0;
1598 }
1599 
1600 int io_poll_issue(struct io_kiocb *req, bool *locked)
1601 {
1602     io_tw_lock(req->ctx, locked);
1603     if (unlikely(req->task->flags & PF_EXITING))
1604         return -EFAULT;
1605     return io_issue_sqe(req, IO_URING_F_NONBLOCK);
1606 }
1607 
1608 struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
1609 {
1610     struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1611 
1612     req = io_put_req_find_next(req);
1613     return req ? &req->work : NULL;
1614 }
1615 
1616 void io_wq_submit_work(struct io_wq_work *work)
1617 {
1618     struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1619     const struct io_op_def *def = &io_op_defs[req->opcode];
1620     unsigned int issue_flags = IO_URING_F_UNLOCKED;
1621     bool needs_poll = false;
1622     int ret = 0, err = -ECANCELED;
1623 
1624     /* one will be dropped by ->io_free_work() after returning to io-wq */
1625     if (!(req->flags & REQ_F_REFCOUNT))
1626         __io_req_set_refcount(req, 2);
1627     else
1628         req_ref_get(req);
1629 
1630     io_arm_ltimeout(req);
1631 
1632     /* either cancelled or io-wq is dying, so don't touch tctx->iowq */
1633     if (work->flags & IO_WQ_WORK_CANCEL) {
1634 fail:
1635         io_req_task_queue_fail(req, err);
1636         return;
1637     }
1638     if (!io_assign_file(req, issue_flags)) {
1639         err = -EBADF;
1640         work->flags |= IO_WQ_WORK_CANCEL;
1641         goto fail;
1642     }
1643 
1644     if (req->flags & REQ_F_FORCE_ASYNC) {
1645         bool opcode_poll = def->pollin || def->pollout;
1646 
1647         if (opcode_poll && file_can_poll(req->file)) {
1648             needs_poll = true;
1649             issue_flags |= IO_URING_F_NONBLOCK;
1650         }
1651     }
1652 
1653     do {
1654         ret = io_issue_sqe(req, issue_flags);
1655         if (ret != -EAGAIN)
1656             break;
1657         /*
1658          * We can get EAGAIN for iopolled IO even though we're
1659          * forcing a sync submission from here, since we can't
1660          * wait for request slots on the block side.
1661          */
1662         if (!needs_poll) {
1663             if (!(req->ctx->flags & IORING_SETUP_IOPOLL))
1664                 break;
1665             cond_resched();
1666             continue;
1667         }
1668 
1669         if (io_arm_poll_handler(req, issue_flags) == IO_APOLL_OK)
1670             return;
1671         /* aborted or ready, in either case retry blocking */
1672         needs_poll = false;
1673         issue_flags &= ~IO_URING_F_NONBLOCK;
1674     } while (1);
1675 
1676     /* avoid locking problems by failing it from a clean context */
1677     if (ret < 0)
1678         io_req_task_queue_fail(req, ret);
1679 }
1680 
1681 inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
1682                       unsigned int issue_flags)
1683 {
1684     struct io_ring_ctx *ctx = req->ctx;
1685     struct file *file = NULL;
1686     unsigned long file_ptr;
1687 
1688     io_ring_submit_lock(ctx, issue_flags);
1689 
1690     if (unlikely((unsigned int)fd >= ctx->nr_user_files))
1691         goto out;
1692     fd = array_index_nospec(fd, ctx->nr_user_files);
1693     file_ptr = io_fixed_file_slot(&ctx->file_table, fd)->file_ptr;
1694     file = (struct file *) (file_ptr & FFS_MASK);
1695     file_ptr &= ~FFS_MASK;
1696     /* mask in overlapping REQ_F and FFS bits */
1697     req->flags |= (file_ptr << REQ_F_SUPPORT_NOWAIT_BIT);
1698     io_req_set_rsrc_node(req, ctx, 0);
1699     WARN_ON_ONCE(file && !test_bit(fd, ctx->file_table.bitmap));
1700 out:
1701     io_ring_submit_unlock(ctx, issue_flags);
1702     return file;
1703 }
1704 
/*
 * Resolve @fd with fget() and emit the file_get tracepoint. If the resulting
 * file is itself an io_uring file, the request is tracked as inflight.
 *
 * Returns the file, or NULL if fget() failed.
 */
struct file *io_file_get_normal(struct io_kiocb *req, int fd)
{
    struct file *file;

    file = fget(fd);
    trace_io_uring_file_get(req, fd);

    /* we don't allow fixed io_uring files */
    if (file && io_is_uring_fops(file))
        io_req_track_inflight(req);

    return file;
}
1716 
1717 static void io_queue_async(struct io_kiocb *req, int ret)
1718     __must_hold(&req->ctx->uring_lock)
1719 {
1720     struct io_kiocb *linked_timeout;
1721 
1722     if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
1723         io_req_complete_failed(req, ret);
1724         return;
1725     }
1726 
1727     linked_timeout = io_prep_linked_timeout(req);
1728 
1729     switch (io_arm_poll_handler(req, 0)) {
1730     case IO_APOLL_READY:
1731         io_kbuf_recycle(req, 0);
1732         io_req_task_queue(req);
1733         break;
1734     case IO_APOLL_ABORTED:
1735         /*
1736          * Queued up for async execution, worker will release
1737          * submit reference when the iocb is actually submitted.
1738          */
1739         io_kbuf_recycle(req, 0);
1740         io_queue_iowq(req, NULL);
1741         break;
1742     case IO_APOLL_OK:
1743         break;
1744     }
1745 
1746     if (linked_timeout)
1747         io_queue_linked_timeout(linked_timeout);
1748 }
1749 
1750 static inline void io_queue_sqe(struct io_kiocb *req)
1751     __must_hold(&req->ctx->uring_lock)
1752 {
1753     int ret;
1754 
1755     ret = io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_COMPLETE_DEFER);
1756 
1757     /*
1758      * We async punt it if the file wasn't marked NOWAIT, or if the file
1759      * doesn't support non-blocking read/write attempts
1760      */
1761     if (likely(!ret))
1762         io_arm_ltimeout(req);
1763     else
1764         io_queue_async(req, ret);
1765 }
1766 
1767 static void io_queue_sqe_fallback(struct io_kiocb *req)
1768     __must_hold(&req->ctx->uring_lock)
1769 {
1770     if (unlikely(req->flags & REQ_F_FAIL)) {
1771         /*
1772          * We don't submit, fail them all, for that replace hardlinks
1773          * with normal links. Extra REQ_F_LINK is tolerated.
1774          */
1775         req->flags &= ~REQ_F_HARDLINK;
1776         req->flags |= REQ_F_LINK;
1777         io_req_complete_failed(req, req->cqe.res);
1778     } else if (unlikely(req->ctx->drain_active)) {
1779         io_drain_req(req);
1780     } else {
1781         int ret = io_req_prep_async(req);
1782 
1783         if (unlikely(ret))
1784             io_req_complete_failed(req, ret);
1785         else
1786             io_queue_iowq(req, NULL);
1787     }
1788 }
1789 
1790 /*
1791  * Check SQE restrictions (opcode and flags).
1792  *
1793  * Returns 'true' if SQE is allowed, 'false' otherwise.
1794  */
1795 static inline bool io_check_restriction(struct io_ring_ctx *ctx,
1796                     struct io_kiocb *req,
1797                     unsigned int sqe_flags)
1798 {
1799     if (!test_bit(req->opcode, ctx->restrictions.sqe_op))
1800         return false;
1801 
1802     if ((sqe_flags & ctx->restrictions.sqe_flags_required) !=
1803         ctx->restrictions.sqe_flags_required)
1804         return false;
1805 
1806     if (sqe_flags & ~(ctx->restrictions.sqe_flags_allowed |
1807               ctx->restrictions.sqe_flags_required))
1808         return false;
1809 
1810     return true;
1811 }
1812 
1813 static void io_init_req_drain(struct io_kiocb *req)
1814 {
1815     struct io_ring_ctx *ctx = req->ctx;
1816     struct io_kiocb *head = ctx->submit_state.link.head;
1817 
1818     ctx->drain_active = true;
1819     if (head) {
1820         /*
1821          * If we need to drain a request in the middle of a link, drain
1822          * the head request and the next request/link after the current
1823          * link. Considering sequential execution of links,
1824          * REQ_F_IO_DRAIN will be maintained for every request of our
1825          * link.
1826          */
1827         head->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
1828         ctx->drain_next = true;
1829     }
1830 }
1831 
1832 static int io_init_req(struct io_ring_ctx *ctx, struct io_kiocb *req,
1833                const struct io_uring_sqe *sqe)
1834     __must_hold(&ctx->uring_lock)
1835 {
1836     const struct io_op_def *def;
1837     unsigned int sqe_flags;
1838     int personality;
1839     u8 opcode;
1840 
1841     /* req is partially pre-initialised, see io_preinit_req() */
1842     req->opcode = opcode = READ_ONCE(sqe->opcode);
1843     /* same numerical values with corresponding REQ_F_*, safe to copy */
1844     req->flags = sqe_flags = READ_ONCE(sqe->flags);
1845     req->cqe.user_data = READ_ONCE(sqe->user_data);
1846     req->file = NULL;
1847     req->rsrc_node = NULL;
1848     req->task = current;
1849 
1850     if (unlikely(opcode >= IORING_OP_LAST)) {
1851         req->opcode = 0;
1852         return -EINVAL;
1853     }
1854     def = &io_op_defs[opcode];
1855     if (unlikely(sqe_flags & ~SQE_COMMON_FLAGS)) {
1856         /* enforce forwards compatibility on users */
1857         if (sqe_flags & ~SQE_VALID_FLAGS)
1858             return -EINVAL;
1859         if (sqe_flags & IOSQE_BUFFER_SELECT) {
1860             if (!def->buffer_select)
1861                 return -EOPNOTSUPP;
1862             req->buf_index = READ_ONCE(sqe->buf_group);
1863         }
1864         if (sqe_flags & IOSQE_CQE_SKIP_SUCCESS)
1865             ctx->drain_disabled = true;
1866         if (sqe_flags & IOSQE_IO_DRAIN) {
1867             if (ctx->drain_disabled)
1868                 return -EOPNOTSUPP;
1869             io_init_req_drain(req);
1870         }
1871     }
1872     if (unlikely(ctx->restricted || ctx->drain_active || ctx->drain_next)) {
1873         if (ctx->restricted && !io_check_restriction(ctx, req, sqe_flags))
1874             return -EACCES;
1875         /* knock it to the slow queue path, will be drained there */
1876         if (ctx->drain_active)
1877             req->flags |= REQ_F_FORCE_ASYNC;
1878         /* if there is no link, we're at "next" request and need to drain */
1879         if (unlikely(ctx->drain_next) && !ctx->submit_state.link.head) {
1880             ctx->drain_next = false;
1881             ctx->drain_active = true;
1882             req->flags |= REQ_F_IO_DRAIN | REQ_F_FORCE_ASYNC;
1883         }
1884     }
1885 
1886     if (!def->ioprio && sqe->ioprio)
1887         return -EINVAL;
1888     if (!def->iopoll && (ctx->flags & IORING_SETUP_IOPOLL))
1889         return -EINVAL;
1890 
1891     if (def->needs_file) {
1892         struct io_submit_state *state = &ctx->submit_state;
1893 
1894         req->cqe.fd = READ_ONCE(sqe->fd);
1895 
1896         /*
1897          * Plug now if we have more than 2 IO left after this, and the
1898          * target is potentially a read/write to block based storage.
1899          */
1900         if (state->need_plug && def->plug) {
1901             state->plug_started = true;
1902             state->need_plug = false;
1903             blk_start_plug_nr_ios(&state->plug, state->submit_nr);
1904         }
1905     }
1906 
1907     personality = READ_ONCE(sqe->personality);
1908     if (personality) {
1909         int ret;
1910 
1911         req->creds = xa_load(&ctx->personalities, personality);
1912         if (!req->creds)
1913             return -EINVAL;
1914         get_cred(req->creds);
1915         ret = security_uring_override_creds(req->creds);
1916         if (ret) {
1917             put_cred(req->creds);
1918             return ret;
1919         }
1920         req->flags |= REQ_F_CREDS;
1921     }
1922 
1923     return def->prep(req, sqe);
1924 }
1925 
/*
 * Handle a request that could not be initialised or prepared (error @ret).
 *
 * The request, and the head of the link currently being assembled if there
 * is one, are marked failed. If @req carries no link flags it ends whatever
 * chain is open: the chain (or the lone request) is handed to
 * io_queue_sqe_fallback() and @ret is returned. Otherwise @req is appended to
 * (or starts) the link and 0 is returned so that assembly can continue.
 */
1926 static __cold int io_submit_fail_init(const struct io_uring_sqe *sqe,
1927                       struct io_kiocb *req, int ret)
1928 {
1929     struct io_ring_ctx *ctx = req->ctx;
1930     struct io_submit_link *link = &ctx->submit_state.link;
1931     struct io_kiocb *head = link->head;
1932 
1933     trace_io_uring_req_failed(sqe, req, ret);
1934 
1935     /*
1936      * Avoid breaking links in the middle as it renders links with SQPOLL
1937      * unusable. Instead of failing eagerly, continue assembling the link if
1938      * applicable and mark the head with REQ_F_FAIL. The link flushing code
1939      * should find the flag and handle the rest.
1940      */
1941     req_fail_link_node(req, ret);
1942     if (head && !(head->flags & REQ_F_FAIL))
1943         req_fail_link_node(head, -ECANCELED);
1944 
         /* @req does not link to a successor: it terminates the chain */
1945     if (!(req->flags & IO_REQ_LINK_FLAGS)) {
1946         if (head) {
                 /* append @req, close the link and queue it from its head */
1947             link->last->link = req;
1948             link->head = NULL;
1949             req = head;
1950         }
1951         io_queue_sqe_fallback(req);
1952         return ret;
1953     }
1954 
         /* @req links onwards: keep building, it becomes the new tail */
1955     if (head)
1956         link->last->link = req;
1957     else
1958         link->head = req;
1959     link->last = req;
1960     return 0;
1961 }
1962 
/*
 * Initialise @req from @sqe and either queue it for execution or attach it
 * to the link chain being assembled in ctx->submit_state.link. Must be called
 * with ctx->uring_lock held.
 *
 * Returns 0 when the request was queued or linked, otherwise the return
 * value of io_submit_fail_init() for a request that failed io_init_req() or
 * io_req_prep_async().
 */
1963 static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1964              const struct io_uring_sqe *sqe)
1965     __must_hold(&ctx->uring_lock)
1966 {
1967     struct io_submit_link *link = &ctx->submit_state.link;
1968     int ret;
1969 
1970     ret = io_init_req(ctx, req, sqe);
1971     if (unlikely(ret))
1972         return io_submit_fail_init(sqe, req, ret);
1973 
1974     /* don't need @sqe from now on */
1975     trace_io_uring_submit_sqe(req, true);
1976 
1977     /*
1978      * If we already have a head request, queue this one for async
1979      * submittal once the head completes. If we don't have a head but
1980      * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
1981      * submitted sync once the chain is complete. If none of those
1982      * conditions are true (normal request), then just queue it.
1983      */
1984     if (unlikely(link->head)) {
1985         ret = io_req_prep_async(req);
1986         if (unlikely(ret))
1987             return io_submit_fail_init(sqe, req, ret);
1988 
1989         trace_io_uring_link(req, link->head);
1990         link->last->link = req;
1991         link->last = req;
1992 
             /* more members to come, keep the link open */
1993         if (req->flags & IO_REQ_LINK_FLAGS)
1994             return 0;
1995         /* last request of the link, flush it */
1996         req = link->head;
1997         link->head = NULL;
             /* from here on @req is the head of the completed chain */
1998         if (req->flags & (REQ_F_FORCE_ASYNC | REQ_F_FAIL))
1999             goto fallback;
2000 
2001     } else if (unlikely(req->flags & (IO_REQ_LINK_FLAGS |
2002                       REQ_F_FORCE_ASYNC | REQ_F_FAIL))) {
2003         if (req->flags & IO_REQ_LINK_FLAGS) {
                 /* no open link yet: @req becomes both head and tail */
2004             link->head = req;
2005             link->last = req;
2006         } else {
     /* also the jump target for a finished link whose head needs the slow path */
2007 fallback:
2008             io_queue_sqe_fallback(req);
2009         }
2010         return 0;
2011     }
2012 
2013     io_queue_sqe(req);
2014     return 0;
2015 }
2016 
2017 /*
2018  * Batched submission is done, ensure local IO is flushed out.
2019  */
2020 static void io_submit_state_end(struct io_ring_ctx *ctx)
2021 {
2022     struct io_submit_state *state = &ctx->submit_state;
2023 
2024     if (unlikely(state->link.head))
2025         io_queue_sqe_fallback(state->link.head);
2026     /* flush only after queuing links as they can generate completions */
2027     io_submit_flush_completions(ctx);
2028     if (state->plug_started)
2029         blk_finish_plug(&state->plug);
2030 }
2031 
2032 /*
2033  * Start submission side cache.
2034  */
2035 static void io_submit_state_start(struct io_submit_state *state,
2036                   unsigned int max_ios)
2037 {
2038     state->plug_started = false;
2039     state->need_plug = max_ios > 2;
2040     state->submit_nr = max_ios;
2041     /* set only head, no need to init link_last in advance */
2042     state->link.head = NULL;
2043 }
2044 
2045 static void io_commit_sqring(struct io_ring_ctx *ctx)
2046 {
2047     struct io_rings *rings = ctx->rings;
2048 
2049     /*
2050      * Ensure any loads from the SQEs are done at this point,
2051      * since once we write the new head, the application could
2052      * write new data to them.
2053      */
2054     smp_store_release(&rings->sq.head, ctx->cached_sq_head);
2055 }
2056 
2057 /*
2058  * Fetch an sqe, if one is available. Note this returns a pointer to memory
2059  * that is mapped by userspace. This means that care needs to be taken to
2060  * ensure that reads are stable, as we cannot rely on userspace always
2061  * being a good citizen. If members of the sqe are validated and then later
2062  * used, it's important that those reads are done through READ_ONCE() to
2063  * prevent a re-load down the line.
2064  */
2065 static const struct io_uring_sqe *io_get_sqe(struct io_ring_ctx *ctx)
2066 {
2067     unsigned head, mask = ctx->sq_entries - 1;
2068     unsigned sq_idx = ctx->cached_sq_head++ & mask;
2069 
2070     /*
2071      * The cached sq head (or cq tail) serves two purposes:
2072      *
2073      * 1) allows us to batch the cost of updating the user visible
2074      *    head updates.
2075      * 2) allows the kernel side to track the head on its own, even
2076      *    though the application is the one updating it.
2077      */
2078     head = READ_ONCE(ctx->sq_array[sq_idx]);
2079     if (likely(head < ctx->sq_entries)) {
2080         /* double index for 128-byte SQEs, twice as long */
2081         if (ctx->flags & IORING_SETUP_SQE128)
2082             head <<= 1;
2083         return &ctx->sq_sqes[head];
2084     }
2085 
2086     /* drop invalid entries */
2087     ctx->cq_extra--;
2088     WRITE_ONCE(ctx->rings->sq_dropped,
2089            READ_ONCE(ctx->rings->sq_dropped) + 1);
2090     return NULL;
2091 }
2092 
/*
 * Submit up to @nr SQEs from the SQ ring. Must be called with
 * ctx->uring_lock held.
 *
 * Returns 0 if the ring holds no entries, otherwise the number of SQEs
 * consumed (an SQE that failed submission and stopped the batch is included
 * in that count), or -EAGAIN if nothing was consumed and the request cache
 * is empty. The shared SQ head is committed before returning.
 */
2093 int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
2094     __must_hold(&ctx->uring_lock)
2095 {
2096     unsigned int entries = io_sqring_entries(ctx);
2097     unsigned int left;
2098     int ret;
2099 
2100     if (unlikely(!entries))
2101         return 0;
2102     /* make sure SQ entry isn't read before tail */
2103     ret = left = min3(nr, ctx->sq_entries, entries);
2104     io_get_task_refs(left);
2105     io_submit_state_start(&ctx->submit_state, left);
2106 
2107     do {
2108         const struct io_uring_sqe *sqe;
2109         struct io_kiocb *req;
2110 
2111         if (unlikely(!io_alloc_req_refill(ctx)))
2112             break;
2113         req = io_alloc_req(ctx);
2114         sqe = io_get_sqe(ctx);
2115         if (unlikely(!sqe)) {
                 /* invalid ring entry: give the unused request back and stop */
2116             io_req_add_to_cache(req, ctx);
2117             break;
2118         }
2119 
2120         /*
2121          * Continue submitting even for sqe failure if the
2122          * ring was setup with IORING_SETUP_SUBMIT_ALL
2123          */
2124         if (unlikely(io_submit_sqe(ctx, req, sqe)) &&
2125             !(ctx->flags & IORING_SETUP_SUBMIT_ALL)) {
                 /* the failed SQE still counts as consumed */
2126             left--;
2127             break;
2128         }
2129     } while (--left);
2130 
2131     if (unlikely(left)) {
             /* ret becomes the number of SQEs actually consumed */
2132         ret -= left;
2133         /* try again if it submitted nothing and can't allocate a req */
2134         if (!ret && io_req_cache_empty(ctx))
2135             ret = -EAGAIN;
             /* presumably returns the refs taken by io_get_task_refs() for unused slots - confirm */
2136         current->io_uring->cached_refs += left;
2137     }
2138 
2139     io_submit_state_end(ctx);
2140      /* Commit SQ ring head once we've consumed and submitted all SQEs */
2141     io_commit_sqring(ctx);
2142     return ret;
2143 }
2144 
/*
 * On-stack state of a task waiting for completions in io_cqring_wait().
 */
2145 struct io_wait_queue {
2146     struct wait_queue_entry wq;	/* entry queued on ctx->cq_wait */
2147     struct io_ring_ctx *ctx;	/* ring being waited on */
2148     unsigned cq_tail;		/* target tail: cq.head at wait start + min_events */
2149     unsigned nr_timeouts;	/* snapshot of ctx->cq_timeouts at wait start */
2150 };
2151 
2152 static inline bool io_should_wake(struct io_wait_queue *iowq)
2153 {
2154     struct io_ring_ctx *ctx = iowq->ctx;
2155     int dist = ctx->cached_cq_tail - (int) iowq->cq_tail;
2156 
2157     /*
2158      * Wake up if we have enough events, or if a timeout occurred since we
2159      * started waiting. For timeouts, we always want to return to userspace,
2160      * regardless of event count.
2161      */
2162     return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
2163 }
2164 
2165 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
2166                 int wake_flags, void *key)
2167 {
2168     struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
2169                             wq);
2170 
2171     /*
2172      * Cannot safely flush overflowed CQEs from here, ensure we wake up
2173      * the task, and the next invocation will do it.
2174      */
2175     if (io_should_wake(iowq) ||
2176         test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &iowq->ctx->check_cq))
2177         return autoremove_wake_function(curr, mode, wake_flags, key);
2178     return -1;
2179 }
2180 
2181 int io_run_task_work_sig(void)
2182 {
2183     if (io_run_task_work())
2184         return 1;
2185     if (task_sigpending(current))
2186         return -EINTR;
2187     return 0;
2188 }
2189 
2190 /* when returns >0, the caller should retry */
2191 static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
2192                       struct io_wait_queue *iowq,
2193                       ktime_t timeout)
2194 {
2195     int ret;
2196     unsigned long check_cq;
2197 
2198     /* make sure we run task_work before checking for signals */
2199     ret = io_run_task_work_sig();
2200     if (ret || io_should_wake(iowq))
2201         return ret;
2202 
2203     check_cq = READ_ONCE(ctx->check_cq);
2204     if (unlikely(check_cq)) {
2205         /* let the caller flush overflows, retry */
2206         if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
2207             return 1;
2208         if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT))
2209             return -EBADR;
2210     }
2211     if (!schedule_hrtimeout(&timeout, HRTIMER_MODE_ABS))
2212         return -ETIME;
2213     return 1;
2214 }
2215 
2216 /*
2217  * Wait until events become available, if we don't already have some. The
2218  * application must reap them itself, as they reside on the shared cq ring.
2219  */
2220 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2221               const sigset_t __user *sig, size_t sigsz,
2222               struct __kernel_timespec __user *uts)
2223 {
2224     struct io_wait_queue iowq;
2225     struct io_rings *rings = ctx->rings;
2226     ktime_t timeout = KTIME_MAX;
2227     int ret;
2228 
2229     do {
2230         io_cqring_overflow_flush(ctx);
2231         if (io_cqring_events(ctx) >= min_events)
2232             return 0;
2233         if (!io_run_task_work())
2234             break;
2235     } while (1);
2236 
2237     if (sig) {
2238 #ifdef CONFIG_COMPAT
2239         if (in_compat_syscall())
2240             ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2241                               sigsz);
2242         else
2243 #endif
2244             ret = set_user_sigmask(sig, sigsz);
2245 
2246         if (ret)
2247             return ret;
2248     }
2249 
2250     if (uts) {
2251         struct timespec64 ts;
2252 
2253         if (get_timespec64(&ts, uts))
2254             return -EFAULT;
2255         timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
2256     }
2257 
2258     init_waitqueue_func_entry(&iowq.wq, io_wake_function);
2259     iowq.wq.private = current;
2260     INIT_LIST_HEAD(&iowq.wq.entry);
2261     iowq.ctx = ctx;
2262     iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
2263     iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
2264 
2265     trace_io_uring_cqring_wait(ctx, min_events);
2266     do {
2267         /* if we can't even flush overflow, don't wait for more */
2268         if (!io_cqring_overflow_flush(ctx)) {
2269             ret = -EBUSY;
2270             break;
2271         }
2272         prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
2273                         TASK_INTERRUPTIBLE);
2274         ret = io_cqring_wait_schedule(ctx, &iowq, timeout);
2275         cond_resched();
2276     } while (ret > 0);
2277 
2278     finish_wait(&ctx->cq_wait, &iowq.wq);
2279     restore_saved_sigmask_unless(ret == -EINTR);
2280 
2281     return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
2282 }
2283 
/*
 * Drop a reference on the compound page backing @ptr and free the page when
 * that was the last one. A NULL @ptr is ignored.
 */
static void io_mem_free(void *ptr)
{
    struct page *head;

    if (ptr) {
        head = virt_to_head_page(ptr);
        if (put_page_testzero(head))
            free_compound_page(head);
    }
}
2295 
2296 static void *io_mem_alloc(size_t size)
2297 {
2298     gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP;
2299 
2300     return (void *) __get_free_pages(gfp, get_order(size));
2301 }
2302 
2303 static unsigned long rings_size(struct io_ring_ctx *ctx, unsigned int sq_entries,
2304                 unsigned int cq_entries, size_t *sq_offset)
2305 {
2306     struct io_rings *rings;
2307     size_t off, sq_array_size;
2308 
2309     off = struct_size(rings, cqes, cq_entries);
2310     if (off == SIZE_MAX)
2311         return SIZE_MAX;
2312     if (ctx->flags & IORING_SETUP_CQE32) {
2313         if (check_shl_overflow(off, 1, &off))
2314             return SIZE_MAX;
2315     }
2316 
2317 #ifdef CONFIG_SMP
2318     off = ALIGN(off, SMP_CACHE_BYTES);
2319     if (off == 0)
2320         return SIZE_MAX;
2321 #endif
2322 
2323     if (sq_offset)
2324         *sq_offset = off;
2325 
2326     sq_array_size = array_size(sizeof(u32), sq_entries);
2327     if (sq_array_size == SIZE_MAX)
2328         return SIZE_MAX;
2329 
2330     if (check_add_overflow(off, sq_array_size, &off))
2331         return SIZE_MAX;
2332 
2333     return off;
2334 }
2335 
2336 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg,
2337                    unsigned int eventfd_async)
2338 {
2339     struct io_ev_fd *ev_fd;
2340     __s32 __user *fds = arg;
2341     int fd;
2342 
2343     ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
2344                     lockdep_is_held(&ctx->uring_lock));
2345     if (ev_fd)
2346         return -EBUSY;
2347 
2348     if (copy_from_user(&fd, fds, sizeof(*fds)))
2349         return -EFAULT;
2350 
2351     ev_fd = kmalloc(sizeof(*ev_fd), GFP_KERNEL);
2352     if (!ev_fd)
2353         return -ENOMEM;
2354 
2355     ev_fd->cq_ev_fd = eventfd_ctx_fdget(fd);
2356     if (IS_ERR(ev_fd->cq_ev_fd)) {
2357         int ret = PTR_ERR(ev_fd->cq_ev_fd);
2358         kfree(ev_fd);
2359         return ret;
2360     }
2361 
2362     spin_lock(&ctx->completion_lock);
2363     ctx->evfd_last_cq_tail = ctx->cached_cq_tail;
2364     spin_unlock(&ctx->completion_lock);
2365 
2366     ev_fd->eventfd_async = eventfd_async;
2367     ctx->has_evfd = true;
2368     rcu_assign_pointer(ctx->io_ev_fd, ev_fd);
2369     return 0;
2370 }
2371 
2372 static void io_eventfd_put(struct rcu_head *rcu)
2373 {
2374     struct io_ev_fd *ev_fd = container_of(rcu, struct io_ev_fd, rcu);
2375 
2376     eventfd_ctx_put(ev_fd->cq_ev_fd);
2377     kfree(ev_fd);
2378 }
2379 
2380 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
2381 {
2382     struct io_ev_fd *ev_fd;
2383 
2384     ev_fd = rcu_dereference_protected(ctx->io_ev_fd,
2385                     lockdep_is_held(&ctx->uring_lock));
2386     if (ev_fd) {
2387         ctx->has_evfd = false;
2388         rcu_assign_pointer(ctx->io_ev_fd, NULL);
2389         call_rcu(&ev_fd->rcu, io_eventfd_put);
2390         return 0;
2391     }
2392 
2393     return -ENXIO;
2394 }
2395 
2396 static void io_req_caches_free(struct io_ring_ctx *ctx)
2397 {
2398     struct io_submit_state *state = &ctx->submit_state;
2399     int nr = 0;
2400 
2401     mutex_lock(&ctx->uring_lock);
2402     io_flush_cached_locked_reqs(ctx, state);
2403 
2404     while (!io_req_cache_empty(ctx)) {
2405         struct io_wq_work_node *node;
2406         struct io_kiocb *req;
2407 
2408         node = wq_stack_extract(&state->free_list);
2409         req = container_of(node, struct io_kiocb, comp_list);
2410         kmem_cache_free(req_cachep, req);
2411         nr++;
2412     }
2413     if (nr)
2414         percpu_ref_put_many(&ctx->refs, nr);
2415     mutex_unlock(&ctx->uring_lock);
2416 }
2417 
/*
 * Final teardown of a ring context, called from io_ring_exit_work().
 *
 * Stops the SQ thread, unregisters buffers/files/eventfd, flushes pending
 * delayed work, frees the ring memory and the request cache, and finally
 * frees @ctx itself. The order of the steps is significant.
 */
2418 static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
2419 {
2420     io_sq_thread_finish(ctx);
2421 
2422     if (ctx->mm_account) {
2423         mmdrop(ctx->mm_account);
2424         ctx->mm_account = NULL;
2425     }
2426 
2427     io_rsrc_refs_drop(ctx);
2428     /* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
2429     io_wait_rsrc_data(ctx->buf_data);
2430     io_wait_rsrc_data(ctx->file_data);
2431 
         /* the unregister helpers below run with uring_lock held */
2432     mutex_lock(&ctx->uring_lock);
2433     if (ctx->buf_data)
2434         __io_sqe_buffers_unregister(ctx);
2435     if (ctx->file_data)
2436         __io_sqe_files_unregister(ctx);
2437     if (ctx->rings)
2438         __io_cqring_overflow_flush(ctx, true);
         /* return value ignored: there may be no eventfd registered */
2439     io_eventfd_unregister(ctx);
2440     io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free);
2441     io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
2442     mutex_unlock(&ctx->uring_lock);
2443     io_destroy_buffers(ctx);
2444     if (ctx->sq_creds)
2445         put_cred(ctx->sq_creds);
2446     if (ctx->submitter_task)
2447         put_task_struct(ctx->submitter_task);
2448 
2449     /* there are no registered resources left, nobody uses it */
2450     if (ctx->rsrc_node)
2451         io_rsrc_node_destroy(ctx->rsrc_node);
2452     if (ctx->rsrc_backup_node)
2453         io_rsrc_node_destroy(ctx->rsrc_backup_node);
2454     flush_delayed_work(&ctx->rsrc_put_work);
2455     flush_delayed_work(&ctx->fallback_work);
2456 
         /* sanity checks: nothing may still be queued for resource release */
2457     WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
2458     WARN_ON_ONCE(!llist_empty(&ctx->rsrc_put_llist));
2459 
2460 #if defined(CONFIG_UNIX)
2461     if (ctx->ring_sock) {
2462         ctx->ring_sock->file = NULL; /* so that iput() is called */
2463         sock_release(ctx->ring_sock);
2464     }
2465 #endif
2466     WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
2467     WARN_ON_ONCE(ctx->notif_slots || ctx->nr_notif_slots);
2468 
2469     io_mem_free(ctx->rings);
2470     io_mem_free(ctx->sq_sqes);
2471 
2472     percpu_ref_exit(&ctx->refs);
2473     free_uid(ctx->user);
2474     io_req_caches_free(ctx);
2475     if (ctx->hash_map)
2476         io_wq_put_hash(ctx->hash_map);
2477     kfree(ctx->cancel_table.hbs);
2478     kfree(ctx->cancel_table_locked.hbs);
2479     kfree(ctx->dummy_ubuf);
2480     kfree(ctx->io_bl);
2481     xa_destroy(&ctx->io_bl_xa);
2482     kfree(ctx);
2483 }
2484 
2485 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2486 {
2487     struct io_ring_ctx *ctx = file->private_data;
2488     __poll_t mask = 0;
2489 
2490     poll_wait(file, &ctx->cq_wait, wait);
2491     /*
2492      * synchronizes with barrier from wq_has_sleeper call in
2493      * io_commit_cqring
2494      */
2495     smp_rmb();
2496     if (!io_sqring_full(ctx))
2497         mask |= EPOLLOUT | EPOLLWRNORM;
2498 
2499     /*
2500      * Don't flush cqring overflow list here, just do a simple check.
2501      * Otherwise there could possible be ABBA deadlock:
2502      *      CPU0                    CPU1
2503      *      ----                    ----
2504      * lock(&ctx->uring_lock);
2505      *                              lock(&ep->mtx);
2506      *                              lock(&ctx->uring_lock);
2507      * lock(&ep->mtx);
2508      *
2509      * Users may get EPOLLIN meanwhile seeing nothing in cqring, this
2510      * pushs them to do the flush.
2511      */
2512     if (io_cqring_events(ctx) ||
2513         test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
2514         mask |= EPOLLIN | EPOLLRDNORM;
2515 
2516     return mask;
2517 }
2518 
2519 static int io_unregister_personality(struct io_ring_ctx *ctx, unsigned id)
2520 {
2521     const struct cred *creds;
2522 
2523     creds = xa_erase(&ctx->personalities, id);
2524     if (creds) {
2525         put_cred(creds);
2526         return 0;
2527     }
2528 
2529     return -EINVAL;
2530 }
2531 
/*
 * Context for the task_work that io_ring_exit_work() queues on each task
 * still attached to a ring that is being torn down.
 */
2532 struct io_tctx_exit {
2533     struct callback_head        task_work;	/* runs io_tctx_exit_cb() */
2534     struct completion       completion;	/* completed by the callback */
2535     struct io_ring_ctx      *ctx;		/* ring being torn down */
2536 };
2537 
2538 static __cold void io_tctx_exit_cb(struct callback_head *cb)
2539 {
2540     struct io_uring_task *tctx = current->io_uring;
2541     struct io_tctx_exit *work;
2542 
2543     work = container_of(cb, struct io_tctx_exit, task_work);
2544     /*
2545      * When @in_idle, we're in cancellation and it's racy to remove the
2546      * node. It'll be removed by the end of cancellation, just ignore it.
2547      */
2548     if (!atomic_read(&tctx->in_idle))
2549         io_uring_del_tctx_node((unsigned long)work->ctx);
2550     complete(&work->completion);
2551 }
2552 
2553 static __cold bool io_cancel_ctx_cb(struct io_wq_work *work, void *data)
2554 {
2555     struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2556 
2557     return req->ctx == data;
2558 }
2559 
/*
 * Work item queued by io_ring_ctx_wait_and_kill() to finish destroying a
 * ring.
 *
 * Phase 1 repeatedly cancels outstanding requests until ctx->ref_comp is
 * completed. Phase 2 asks every task still attached to the ring to drop its
 * node via task_work. Finally the context is freed with io_ring_ctx_free().
 */
2560 static __cold void io_ring_exit_work(struct work_struct *work)
2561 {
2562     struct io_ring_ctx *ctx = container_of(work, struct io_ring_ctx, exit_work);
         /* after five minutes we warn and poll less often */
2563     unsigned long timeout = jiffies + HZ * 60 * 5;
2564     unsigned long interval = HZ / 20;
2565     struct io_tctx_exit exit;
2566     struct io_tctx_node *node;
2567     int ret;
2568 
2569     /*
2570      * If we're doing polled IO and end up having requests being
2571      * submitted async (out-of-line), then completions can come in while
2572      * we're waiting for refs to drop. We need to reap these manually,
2573      * as nobody else will be looking for them.
2574      */
2575     do {
2576         while (io_uring_try_cancel_requests(ctx, NULL, true))
2577             cond_resched();
2578 
2579         if (ctx->sq_data) {
2580             struct io_sq_data *sqd = ctx->sq_data;
2581             struct task_struct *tsk;
2582 
                 /* sqd->thread is only looked at while the SQ thread is parked */
2583             io_sq_thread_park(sqd);
2584             tsk = sqd->thread;
2585             if (tsk && tsk->io_uring && tsk->io_uring->io_wq)
2586                 io_wq_cancel_cb(tsk->io_uring->io_wq,
2587                         io_cancel_ctx_cb, ctx, true);
2588             io_sq_thread_unpark(sqd);
2589         }
2590 
2591         io_req_caches_free(ctx);
2592 
2593         if (WARN_ON_ONCE(time_after(jiffies, timeout))) {
2594             /* there is little hope left, don't run it too often */
2595             interval = HZ * 60;
2596         }
2597     } while (!wait_for_completion_timeout(&ctx->ref_comp, interval));
2598 
2599     init_completion(&exit.completion);
2600     init_task_work(&exit.task_work, io_tctx_exit_cb);
2601     exit.ctx = ctx;
2602     /*
2603      * Some may use context even when all refs and requests have been put,
2604      * and they are free to do so while still holding uring_lock or
2605      * completion_lock, see io_req_task_submit(). Apart from other work,
2606      * this lock/unlock section also waits them to finish.
2607      */
2608     mutex_lock(&ctx->uring_lock);
2609     while (!list_empty(&ctx->tctx_list)) {
2610         WARN_ON_ONCE(time_after(jiffies, timeout));
2611 
2612         node = list_first_entry(&ctx->tctx_list, struct io_tctx_node,
2613                     ctx_node);
2614         /* don't spin on a single task if cancellation failed */
2615         list_rotate_left(&ctx->tctx_list);
2616         ret = task_work_add(node->task, &exit.task_work, TWA_SIGNAL);
2617         if (WARN_ON_ONCE(ret))
2618             continue;
2619 
             /* io_tctx_exit_cb() runs in the target task; wait for it unlocked */
2620         mutex_unlock(&ctx->uring_lock);
2621         wait_for_completion(&exit.completion);
2622         mutex_lock(&ctx->uring_lock);
2623     }
2624     mutex_unlock(&ctx->uring_lock);
         /* empty critical section: waits out any current completion_lock holder */
2625     spin_lock(&ctx->completion_lock);
2626     spin_unlock(&ctx->completion_lock);
2627 
2628     io_ring_ctx_free(ctx);
2629 }
2630 
/*
 * Start tearing down @ctx: kill its percpu reference, flush CQ overflow,
 * unregister all personalities and cancel poll/timeout requests, then hand
 * the remaining work to io_ring_exit_work() on system_unbound_wq.
 * Despite the name, this function does not wait for that work to finish.
 */
2631 static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2632 {
2633     unsigned long index;
2634     struct creds *creds;
2635 
2636     mutex_lock(&ctx->uring_lock);
2637     percpu_ref_kill(&ctx->refs);
2638     if (ctx->rings)
2639         __io_cqring_overflow_flush(ctx, true);
         /* @creds is only the iteration cursor; entries are erased by index */
2640     xa_for_each(&ctx->personalities, index, creds)
2641         io_unregister_personality(ctx, index);
2642     if (ctx->rings)
2643         io_poll_remove_all(ctx, NULL, true);
2644     mutex_unlock(&ctx->uring_lock);
2645 
2646     /* failed during ring init, it couldn't have issued any requests */
2647     if (ctx->rings) {
2648         io_kill_timeouts(ctx, NULL, true);
2649         /* if we failed setting up the ctx, we might not have any rings */
2650         io_iopoll_try_reap_events(ctx);
2651         /* drop cached put refs after potentially doing completions */
2652         if (current->io_uring)
2653             io_uring_drop_tctx_refs(current);
2654     }
2655 
2656     INIT_WORK(&ctx->exit_work, io_ring_exit_work);
2657     /*
2658      * Use system_unbound_wq to avoid spawning tons of event kworkers
2659      * if we're exiting a ton of rings at the same time. It just adds
2660      * noise and overhead, there's no discernable change in runtime
2661      * over using system_wq.
2662      */
2663     queue_work(system_unbound_wq, &ctx->exit_work);
2664 }
2665 
2666 static int io_uring_release(struct inode *inode, struct file *file)
2667 {
2668     struct io_ring_ctx *ctx = file->private_data;
2669 
2670     file->private_data = NULL;
2671     io_ring_ctx_wait_and_kill(ctx);
2672     return 0;
2673 }
2674 
/*
 * Match criteria handed to io_cancel_task_cb(); both fields are passed on to
 * io_match_task_safe().
 */
2675 struct io_task_cancel {
2676     struct task_struct *task;	/* task whose requests should match */
2677     bool all;			/* the cancel_all argument */
2678 };
2679 
2680 static bool io_cancel_task_cb(struct io_wq_work *work, void *data)
2681 {
2682     struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2683     struct io_task_cancel *cancel = data;
2684 
2685     return io_match_task_safe(req, cancel->task, cancel->all);
2686 }
2687 
2688 static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
2689                      struct task_struct *task,
2690                      bool cancel_all)
2691 {
2692     struct io_defer_entry *de;
2693     LIST_HEAD(list);
2694 
2695     spin_lock(&ctx->completion_lock);
2696     list_for_each_entry_reverse(de, &ctx->defer_list, list) {
2697         if (io_match_task_safe(de->req, task, cancel_all)) {
2698             list_cut_position(&list, &ctx->defer_list, &de->list);
2699             break;
2700         }
2701     }
2702     spin_unlock(&ctx->completion_lock);
2703     if (list_empty(&list))
2704         return false;
2705 
2706     while (!list_empty(&list)) {
2707         de = list_first_entry(&list, struct io_defer_entry, list);
2708         list_del_init(&de->list);
2709         io_req_complete_failed(de->req, -ECANCELED);
2710         kfree(de);
2711     }
2712     return true;
2713 }
2714 
2715 static __cold bool io_uring_try_cancel_iowq(struct io_ring_ctx *ctx)
2716 {
2717     struct io_tctx_node *node;
2718     enum io_wq_cancel cret;
2719     bool ret = false;
2720 
2721     mutex_lock(&ctx->uring_lock);
2722     list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
2723         struct io_uring_task *tctx = node->task->io_uring;
2724 
2725         /*
2726          * io_wq will stay alive while we hold uring_lock, because it's
2727          * killed after ctx nodes, which requires to take the lock.
2728          */
2729         if (!tctx || !tctx->io_wq)
2730             continue;
2731         cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_ctx_cb, ctx, true);
2732         ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
2733     }
2734     mutex_unlock(&ctx->uring_lock);
2735 
2736     return ret;
2737 }
2738 
/*
 * Make one pass at cancelling requests of @ctx.
 *
 * With @task == NULL, io-wq work of every task attached to the ring is
 * targeted; otherwise only @task's io-wq is. The pass also reaps iopoll
 * events (when allowed), and cancels deferred, poll and timeout requests.
 *
 * Returns true if any step found or cancelled something, so callers keep
 * calling until it returns false.
 */
2739 static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
2740                         struct task_struct *task,
2741                         bool cancel_all)
2742 {
2743     struct io_task_cancel cancel = { .task = task, .all = cancel_all, };
2744     struct io_uring_task *tctx = task ? task->io_uring : NULL;
2745     enum io_wq_cancel cret;
2746     bool ret = false;
2747 
2748     /* failed during ring init, it couldn't have issued any requests */
2749     if (!ctx->rings)
2750         return false;
2751 
2752     if (!task) {
2753         ret |= io_uring_try_cancel_iowq(ctx);
2754     } else if (tctx && tctx->io_wq) {
2755         /*
2756          * Cancels requests of all rings, not only @ctx, but
2757          * it's fine as the task is in exit/exec.
2758          */
2759         cret = io_wq_cancel_cb(tctx->io_wq, io_cancel_task_cb,
2760                        &cancel, true);
2761         ret |= (cret != IO_WQ_CANCEL_NOTFOUND);
2762     }
2763 
2764     /* SQPOLL thread does its own polling */
2765     if ((!(ctx->flags & IORING_SETUP_SQPOLL) && cancel_all) ||
2766         (ctx->sq_data && ctx->sq_data->thread == current)) {
2767         while (!wq_list_empty(&ctx->iopoll_list)) {
2768             io_iopoll_try_reap_events(ctx);
2769             ret = true;
2770         }
2771     }
2772 
2773     ret |= io_cancel_defer_files(ctx, task, cancel_all);
         /* io_poll_remove_all() is called with uring_lock held */
2774     mutex_lock(&ctx->uring_lock);
2775     ret |= io_poll_remove_all(ctx, task, cancel_all);
2776     mutex_unlock(&ctx->uring_lock);
2777     ret |= io_kill_timeouts(ctx, task, cancel_all);
         /* task_work is only run for per-task cancellation */
2778     if (task)
2779         ret |= io_run_task_work();
2780     return ret;
2781 }
2782 
2783 static s64 tctx_inflight(struct io_uring_task *tctx, bool tracked)
2784 {
2785     if (tracked)
2786         return atomic_read(&tctx->inflight_tracked);
2787     return percpu_counter_sum(&tctx->inflight);
2788 }
2789 
2790 /*
2791  * Find any io_uring ctx that this task has registered or done IO on, and cancel
2792  * requests. @sqd should be not-null IFF it's an SQPOLL thread cancellation.
 *
 * Does nothing if current has no io_uring task context. Otherwise
 * ->in_idle is raised and the function loops until tctx_inflight()
 * reports zero: each iteration runs a cancellation pass over the relevant
 * rings and, if no pass made progress, sleeps on tctx->wait unless the
 * inflight count changed in the meantime. ->in_idle is only dropped again
 * (and the tctx freed) when @cancel_all is set.
2793  */
2794 __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
2795 {
2796     struct io_uring_task *tctx = current->io_uring;
2797     struct io_ring_ctx *ctx;
2798     s64 inflight;
2799     DEFINE_WAIT(wait);
2800 
2801     WARN_ON_ONCE(sqd && sqd->thread != current);
2802 
2803     if (!current->io_uring)
2804         return;
2805     if (tctx->io_wq)
2806         io_wq_exit_start(tctx->io_wq);
2807 
2808     atomic_inc(&tctx->in_idle);
2809     do {
2810         bool loop = false;
2811 
2812         io_uring_drop_tctx_refs(current);
2813         /* read completions before cancelations */
2814         inflight = tctx_inflight(tctx, !cancel_all);
2815         if (!inflight)
2816             break;
2817 
2818         if (!sqd) {
2819             struct io_tctx_node *node;
2820             unsigned long index;
2821 
2822             xa_for_each(&tctx->xa, index, node) {
2823                 /* sqpoll task will cancel all its requests */
2824                 if (node->ctx->sq_data)
2825                     continue;
2826                 loop |= io_uring_try_cancel_requests(node->ctx,
2827                             current, cancel_all);
2828             }
2829         } else {
2830             list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
2831                 loop |= io_uring_try_cancel_requests(ctx,
2832                                      current,
2833                                      cancel_all);
2834         }
2835 
             /* a pass made progress: go around again without sleeping */
2836         if (loop) {
2837             cond_resched();
2838             continue;
2839         }
2840 
2841         prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
2842         io_run_task_work();
2843         io_uring_drop_tctx_refs(current);
2844 
2845         /*
2846          * If we've seen completions, retry without waiting. This
2847          * avoids a race where a completion comes in before we did
2848          * prepare_to_wait().
2849          */
2850         if (inflight == tctx_inflight(tctx, !cancel_all))
2851             schedule();
2852         finish_wait(&tctx->wait, &wait);
2853     } while (1);
2854 
2855     io_uring_clean_tctx(tctx);
2856     if (cancel_all) {
2857         /*
2858          * We shouldn't run task_works after cancel, so just leave
2859          * ->in_idle set for normal exit.
2860          */
2861         atomic_dec(&tctx->in_idle);
2862         /* for exec all current's requests should be gone, kill tctx */
2863         __io_uring_free(current);
2864     }
2865 }
2866 
2867 void __io_uring_cancel(bool cancel_all)
2868 {
2869     io_uring_cancel_generic(cancel_all, NULL);
2870 }
2871 
2872 static void *io_uring_validate_mmap_request(struct file *file,
2873                         loff_t pgoff, size_t sz)
2874 {
2875     struct io_ring_ctx *ctx = file->private_data;
2876     loff_t offset = pgoff << PAGE_SHIFT;
2877     struct page *page;
2878     void *ptr;
2879 
2880     switch (offset) {
2881     case IORING_OFF_SQ_RING:
2882     case IORING_OFF_CQ_RING:
2883         ptr = ctx->rings;
2884         break;
2885     case IORING_OFF_SQES:
2886         ptr = ctx->sq_sqes;
2887         break;
2888     default:
2889         return ERR_PTR(-EINVAL);
2890     }
2891 
2892     page = virt_to_head_page(ptr);
2893     if (sz > page_size(page))
2894         return ERR_PTR(-EINVAL);
2895 
2896     return ptr;
2897 }
2898 
2899 #ifdef CONFIG_MMU
2900 
2901 static __cold int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2902 {
2903     size_t sz = vma->vm_end - vma->vm_start;
2904     unsigned long pfn;
2905     void *ptr;
2906 
2907     ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
2908     if (IS_ERR(ptr))
2909         return PTR_ERR(ptr);
2910 
2911     pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2912     return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2913 }
2914 
2915 #else /* !CONFIG_MMU */
2916 
2917 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2918 {
2919     return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
2920 }
2921 
2922 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
2923 {
2924     return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
2925 }
2926 
/*
 * no-MMU get_unmapped_area: the "mapping" address is simply the kernel
 * address of the validated region, or the error code from validation.
 */
static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
	unsigned long addr, unsigned long len,
	unsigned long pgoff, unsigned long flags)
{
	void *region = io_uring_validate_mmap_request(file, pgoff, len);

	return IS_ERR(region) ? PTR_ERR(region) : (unsigned long) region;
}
2939 
2940 #endif /* !CONFIG_MMU */
2941 
2942 static int io_validate_ext_arg(unsigned flags, const void __user *argp, size_t argsz)
2943 {
2944     if (flags & IORING_ENTER_EXT_ARG) {
2945         struct io_uring_getevents_arg arg;
2946 
2947         if (argsz != sizeof(arg))
2948             return -EINVAL;
2949         if (copy_from_user(&arg, argp, sizeof(arg)))
2950             return -EFAULT;
2951     }
2952     return 0;
2953 }
2954 
2955 static int io_get_ext_arg(unsigned flags, const void __user *argp, size_t *argsz,
2956               struct __kernel_timespec __user **ts,
2957               const sigset_t __user **sig)
2958 {
2959     struct io_uring_getevents_arg arg;
2960 
2961     /*
2962      * If EXT_ARG isn't set, then we have no timespec and the argp pointer
2963      * is just a pointer to the sigset_t.
2964      */
2965     if (!(flags & IORING_ENTER_EXT_ARG)) {
2966         *sig = (const sigset_t __user *) argp;
2967         *ts = NULL;
2968         return 0;
2969     }
2970 
2971     /*
2972      * EXT_ARG is set - ensure we agree on the size of it and copy in our
2973      * timespec and sigset_t pointers if good.
2974      */
2975     if (*argsz != sizeof(arg))
2976         return -EINVAL;
2977     if (copy_from_user(&arg, argp, sizeof(arg)))
2978         return -EFAULT;
2979     if (arg.pad)
2980         return -EINVAL;
2981     *sig = u64_to_user_ptr(arg.sigmask);
2982     *argsz = arg.sigmask_sz;
2983     *ts = u64_to_user_ptr(arg.ts);
2984     return 0;
2985 }
2986 
/*
 * io_uring_enter(2): submit SQEs and/or wait for completions on a ring.
 *
 * Flow, as implemented below:
 *  - run pending task work, reject unknown @flags with -EINVAL;
 *  - resolve the ring file, either from the task's registered-ring array
 *    (IORING_ENTER_REGISTERED_RING, @fd is an index) or via fdget();
 *  - fail with -EBADFD while the ring is still IORING_SETUP_R_DISABLED;
 *  - SQPOLL rings: flush CQ overflow, optionally wake/wait for the SQ
 *    thread and report @to_submit as the result (-EOWNERDEAD if the
 *    thread is gone);
 *  - otherwise submit under uring_lock; a short submit returns right away;
 *  - with IORING_ENTER_GETEVENTS, poll (iopoll rings) or wait on the CQ
 *    ring. The wait result only becomes the return value when the submit
 *    step left ret at 0.
 */
2987 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2988         u32, min_complete, u32, flags, const void __user *, argp,
2989         size_t, argsz)
2990 {
2991     struct io_ring_ctx *ctx;
2992     struct fd f;
2993     long ret;
2994 
2995     io_run_task_work();
2996 
2997     if (unlikely(flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP |
2998                    IORING_ENTER_SQ_WAIT | IORING_ENTER_EXT_ARG |
2999                    IORING_ENTER_REGISTERED_RING)))
3000         return -EINVAL;
3001 
3002     /*
3003      * Ring fd has been registered via IORING_REGISTER_RING_FDS, we
3004      * need only dereference our task private array to find it.
3005      */
3006     if (flags & IORING_ENTER_REGISTERED_RING) {
3007         struct io_uring_task *tctx = current->io_uring;
3008 
3009         if (unlikely(!tctx || fd >= IO_RINGFD_REG_MAX))
3010             return -EINVAL;
3011         fd = array_index_nospec(fd, IO_RINGFD_REG_MAX);
3012         f.file = tctx->registered_rings[fd];
             /* flags == 0 is what the fdput() at "out" will see for this path */
3013         f.flags = 0;
3014         if (unlikely(!f.file))
3015             return -EBADF;
3016     } else {
3017         f = fdget(fd);
3018         if (unlikely(!f.file))
3019             return -EBADF;
3020         ret = -EOPNOTSUPP;
3021         if (unlikely(!io_is_uring_fops(f.file)))
3022             goto out;
3023     }
3024 
3025     ctx = f.file->private_data;
3026     ret = -EBADFD;
3027     if (unlikely(ctx->flags & IORING_SETUP_R_DISABLED))
3028         goto out;
3029 
3030     /*
3031      * For SQ polling, the thread will do all submissions and completions.
3032      * Just return the requested submit count, and wake the thread if
3033      * we were asked to.
3034      */
3035     ret = 0;
3036     if (ctx->flags & IORING_SETUP_SQPOLL) {
3037         io_cqring_overflow_flush(ctx);
3038 
3039         if (unlikely(ctx->sq_data->thread == NULL)) {
3040             ret = -EOWNERDEAD;
3041             goto out;
3042         }
3043         if (flags & IORING_ENTER_SQ_WAKEUP)
3044             wake_up(&ctx->sq_data->wait);
3045         if (flags & IORING_ENTER_SQ_WAIT) {
3046             ret = io_sqpoll_wait_sq(ctx);
3047             if (ret)
3048                 goto out;
3049         }
3050         ret = to_submit;
3051     } else if (to_submit) {
3052         ret = io_uring_add_tctx_node(ctx);
3053         if (unlikely(ret))
3054             goto out;
3055 
3056         mutex_lock(&ctx->uring_lock);
3057         ret = io_submit_sqes(ctx, to_submit);
3058         if (ret != to_submit) {
3059             mutex_unlock(&ctx->uring_lock);
3060             goto out;
3061         }
             /* keep uring_lock held and jump straight into the iopoll wait */
3062         if ((flags & IORING_ENTER_GETEVENTS) && ctx->syscall_iopoll)
3063             goto iopoll_locked;
3064         mutex_unlock(&ctx->uring_lock);
3065     }
3066     if (flags & IORING_ENTER_GETEVENTS) {
3067         int ret2;
3068         if (ctx->syscall_iopoll) {
3069             /*
3070              * We disallow the app entering submit/complete with
3071              * polling, but we still need to lock the ring to
3072              * prevent racing with polled issue that got punted to
3073              * a workqueue.
3074              */
3075             mutex_lock(&ctx->uring_lock);
3076 iopoll_locked:
3077             ret2 = io_validate_ext_arg(flags, argp, argsz);
3078             if (likely(!ret2)) {
3079                 min_complete = min(min_complete,
3080                            ctx->cq_entries);
3081                 ret2 = io_iopoll_check(ctx, min_complete);
3082             }
3083             mutex_unlock(&ctx->uring_lock);
3084         } else {
3085             const sigset_t __user *sig;
3086             struct __kernel_timespec __user *ts;
3087 
3088             ret2 = io_get_ext_arg(flags, argp, &argsz, &ts, &sig);
3089             if (likely(!ret2)) {
3090                 min_complete = min(min_complete,
3091                            ctx->cq_entries);
3092                 ret2 = io_cqring_wait(ctx, min_complete, sig,
3093                               argsz, ts);
3094             }
3095         }
3096 
             /* a non-zero submit result takes precedence over the wait result */
3097         if (!ret) {
3098             ret = ret2;
3099 
3100             /*
3101              * EBADR indicates that one or more CQE were dropped.
3102              * Once the user has been informed we can clear the bit
3103              * as they are obviously ok with those drops.
3104              */
3105             if (unlikely(ret2 == -EBADR))
3106                 clear_bit(IO_CHECK_CQ_DROPPED_BIT,
3107                       &ctx->check_cq);
3108         }
3109     }
3110 out:
3111     fdput(f);
3112     return ret;
3113 }
3114 
3115 static const struct file_operations io_uring_fops = {
3116     .release    = io_uring_release,
3117     .mmap       = io_uring_mmap,
3118 #ifndef CONFIG_MMU
3119     .get_unmapped_area = io_uring_nommu_get_unmapped_area,
3120     .mmap_capabilities = io_uring_nommu_mmap_capabilities,
3121 #endif
3122     .poll       = io_uring_poll,
3123 #ifdef CONFIG_PROC_FS
3124     .show_fdinfo    = io_uring_show_fdinfo,
3125 #endif
3126 };
3127 
3128 bool io_is_uring_fops(struct file *file)
3129 {
3130     return file->f_op == &io_uring_fops;
3131 }
3132 
3133 static __cold int io_allocate_scq_urings(struct io_ring_ctx *ctx,
3134                      struct io_uring_params *p)
3135 {
3136     struct io_rings *rings;
3137     size_t size, sq_array_offset;
3138 
3139     /* make sure these are sane, as we already accounted them */
3140     ctx->sq_entries = p->sq_entries;
3141     ctx->cq_entries = p->cq_entries;
3142 
3143     size = rings_size(ctx, p->sq_entries, p->cq_entries, &sq_array_offset);
3144     if (size == SIZE_MAX)
3145         return -EOVERFLOW;
3146 
3147     rings = io_mem_alloc(size);
3148     if (!rings)
3149         return -ENOMEM;
3150 
3151     ctx->rings = rings;
3152     ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
3153     rings->sq_ring_mask = p->sq_entries - 1;
3154     rings->cq_ring_mask = p->cq_entries - 1;
3155     rings->sq_ring_entries = p->sq_entries;
3156     rings->cq_ring_entries = p->cq_entries;
3157 
3158     if (p->flags & IORING_SETUP_SQE128)
3159         size = array_size(2 * sizeof(struct io_uring_sqe), p->sq_entries);
3160     else
3161         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
3162     if (size == SIZE_MAX) {
3163         io_mem_free(ctx->rings);
3164         ctx->rings = NULL;
3165         return -EOVERFLOW;
3166     }
3167 
3168     ctx->sq_sqes = io_mem_alloc(size);
3169     if (!ctx->sq_sqes) {
3170         io_mem_free(ctx->rings);
3171         ctx->rings = NULL;
3172         return -ENOMEM;
3173     }
3174 
3175     return 0;
3176 }
3177 
3178 static int io_uring_install_fd(struct io_ring_ctx *ctx, struct file *file)
3179 {
3180     int ret, fd;
3181 
3182     fd = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3183     if (fd < 0)
3184         return fd;
3185 
3186     ret = __io_uring_add_tctx_node(ctx, false);
3187     if (ret) {
3188         put_unused_fd(fd);
3189         return ret;
3190     }
3191     fd_install(fd, file);
3192     return fd;
3193 }
3194 
3195 /*
3196  * Allocate an anonymous fd, this is what constitutes the application
3197  * visible backing of an io_uring instance. The application mmaps this
3198  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
3199  * we have to tie this fd to a socket for file garbage collection purposes.
3200  */
3201 static struct file *io_uring_get_file(struct io_ring_ctx *ctx)
3202 {
3203     struct file *file;
3204 #if defined(CONFIG_UNIX)
3205     int ret;
3206 
3207     ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
3208                 &ctx->ring_sock);
3209     if (ret)
3210         return ERR_PTR(ret);
3211 #endif
3212 
3213     file = anon_inode_getfile_secure("[io_uring]", &io_uring_fops, ctx,
3214                      O_RDWR | O_CLOEXEC, NULL);
3215 #if defined(CONFIG_UNIX)
3216     if (IS_ERR(file)) {
3217         sock_release(ctx->ring_sock);
3218         ctx->ring_sock = NULL;
3219     } else {
3220         ctx->ring_sock->file = file;
3221     }
3222 #endif
3223     return file;
3224 }
3225 
/*
 * Create a new io_uring instance.
 *
 * @entries: requested SQ size; must be non-zero and is clamped to
 *           IORING_MAX_ENTRIES only when IORING_SETUP_CLAMP is set.
 * @p:       kernel copy of the setup parameters; sq/cq sizes, ring offsets
 *           and feature bits are filled in here.
 * @params:  userspace destination that @p is copied back to.
 *
 * Returns the installed ring fd on success or a negative error. Failures
 * after the ctx has been allocated go through io_ring_ctx_wait_and_kill();
 * once the file exists, a failed fd install is cleaned up by fput() instead.
 */
3226 static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
3227                   struct io_uring_params __user *params)
3228 {
3229     struct io_ring_ctx *ctx;
3230     struct file *file;
3231     int ret;
3232 
3233     if (!entries)
3234         return -EINVAL;
3235     if (entries > IORING_MAX_ENTRIES) {
3236         if (!(p->flags & IORING_SETUP_CLAMP))
3237             return -EINVAL;
3238         entries = IORING_MAX_ENTRIES;
3239     }
3240 
3241     /*
3242      * Use twice as many entries for the CQ ring. It's possible for the
3243      * application to drive a higher depth than the size of the SQ ring,
3244      * since the sqes are only used at submission time. This allows for
3245      * some flexibility in overcommitting a bit. If the application has
3246      * set IORING_SETUP_CQSIZE, it will have passed in the desired number
3247      * of CQ ring entries manually.
3248      */
3249     p->sq_entries = roundup_pow_of_two(entries);
3250     if (p->flags & IORING_SETUP_CQSIZE) {
3251         /*
3252          * If IORING_SETUP_CQSIZE is set, we do the same roundup
3253          * to a power-of-two, if it isn't already. We do NOT impose
3254          * any cq vs sq ring sizing.
3255          */
3256         if (!p->cq_entries)
3257             return -EINVAL;
3258         if (p->cq_entries > IORING_MAX_CQ_ENTRIES) {
3259             if (!(p->flags & IORING_SETUP_CLAMP))
3260                 return -EINVAL;
3261             p->cq_entries = IORING_MAX_CQ_ENTRIES;
3262         }
3263         p->cq_entries = roundup_pow_of_two(p->cq_entries);
             /* NOTE(review): this does reject a CQ smaller than the SQ, despite the comment above */
3264         if (p->cq_entries < p->sq_entries)
3265             return -EINVAL;
3266     } else {
3267         p->cq_entries = 2 * p->sq_entries;
3268     }
3269 
3270     ctx = io_ring_ctx_alloc(p);
3271     if (!ctx)
3272         return -ENOMEM;
3273 
3274     /*
3275      * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
3276      * space applications don't need to do io completion events
3277      * polling again, they can rely on io_sq_thread to do polling
3278      * work, which can reduce cpu usage and uring_lock contention.
3279      */
3280     if (ctx->flags & IORING_SETUP_IOPOLL &&
3281         !(ctx->flags & IORING_SETUP_SQPOLL))
3282         ctx->syscall_iopoll = 1;
3283 
3284     ctx->compat = in_compat_syscall();
3285     if (!capable(CAP_IPC_LOCK))
3286         ctx->user = get_uid(current_user());
3287 
3288     /*
3289      * For SQPOLL, we just need a wakeup, always. For !SQPOLL, if
3290      * COOP_TASKRUN is set, then IPIs are never needed by the app.
3291      */
3292     ret = -EINVAL;
3293     if (ctx->flags & IORING_SETUP_SQPOLL) {
3294         /* IPI related flags don't make sense with SQPOLL */
3295         if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
3296                   IORING_SETUP_TASKRUN_FLAG))
3297             goto err;
3298         ctx->notify_method = TWA_SIGNAL_NO_IPI;
3299     } else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
3300         ctx->notify_method = TWA_SIGNAL_NO_IPI;
3301     } else {
             /* TASKRUN_FLAG is only accepted together with COOP_TASKRUN */
3302         if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
3303             goto err;
3304         ctx->notify_method = TWA_SIGNAL;
3305     }
3306 
3307     /*
3308      * This is just grabbed for accounting purposes. When a process exits,
3309      * the mm is exited and dropped before the files, hence we need to hang
3310      * on to this mm purely for the purposes of being able to unaccount
3311      * memory (locked/pinned vm). It's not used for anything else.
3312      */
3313     mmgrab(current->mm);
3314     ctx->mm_account = current->mm;
3315 
3316     ret = io_allocate_scq_urings(ctx, p);
3317     if (ret)
3318         goto err;
3319 
3320     ret = io_sq_offload_create(ctx, p);
3321     if (ret)
3322         goto err;
3323     /* always set a rsrc node */
3324     ret = io_rsrc_node_switch_start(ctx);
3325     if (ret)
3326         goto err;
3327     io_rsrc_node_switch(ctx, NULL);
3328 
         /* publish the field offsets inside struct io_rings to userspace */
3329     memset(&p->sq_off, 0, sizeof(p->sq_off));
3330     p->sq_off.head = offsetof(struct io_rings, sq.head);
3331     p->sq_off.tail = offsetof(struct io_rings, sq.tail);
3332     p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
3333     p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
3334     p->sq_off.flags = offsetof(struct io_rings, sq_flags);
3335     p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
3336     p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
3337 
3338     memset(&p->cq_off, 0, sizeof(p->cq_off));
3339     p->cq_off.head = offsetof(struct io_rings, cq.head);
3340     p->cq_off.tail = offsetof(struct io_rings, cq.tail);
3341     p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
3342     p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
3343     p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
3344     p->cq_off.cqes = offsetof(struct io_rings, cqes);
3345     p->cq_off.flags = offsetof(struct io_rings, cq_flags);
3346 
3347     p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
3348             IORING_FEAT_SUBMIT_STABLE | IORING_FEAT_RW_CUR_POS |
3349             IORING_FEAT_CUR_PERSONALITY | IORING_FEAT_FAST_POLL |
3350             IORING_FEAT_POLL_32BITS | IORING_FEAT_SQPOLL_NONFIXED |
3351             IORING_FEAT_EXT_ARG | IORING_FEAT_NATIVE_WORKERS |
3352             IORING_FEAT_RSRC_TAGS | IORING_FEAT_CQE_SKIP |
3353             IORING_FEAT_LINKED_FILE;
3354 
3355     if (copy_to_user(params, p, sizeof(*p))) {
3356         ret = -EFAULT;
3357         goto err;
3358     }
3359 
         /* a ring created disabled gets its submitter when it is enabled instead */
3360     if (ctx->flags & IORING_SETUP_SINGLE_ISSUER
3361         && !(ctx->flags & IORING_SETUP_R_DISABLED))
3362         ctx->submitter_task = get_task_struct(current);
3363 
3364     file = io_uring_get_file(ctx);
3365     if (IS_ERR(file)) {
3366         ret = PTR_ERR(file);
3367         goto err;
3368     }
3369 
3370     /*
3371      * Install ring fd as the very last thing, so we don't risk someone
3372      * having closed it before we finish setup
3373      */
3374     ret = io_uring_install_fd(ctx, file);
3375     if (ret < 0) {
3376         /* fput will clean it up */
3377         fput(file);
3378         return ret;
3379     }
3380 
3381     trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
3382     return ret;
3383 err:
3384     io_ring_ctx_wait_and_kill(ctx);
3385     return ret;
3386 }
3387 
3388 /*
3389  * Sets up an aio uring context, and returns the fd. Applications asks for a
3390  * ring size, we return the actual sq/cq ring sizes (among other things) in the
3391  * params structure passed in.
3392  */
3393 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3394 {
3395     struct io_uring_params p;
3396     int i;
3397 
3398     if (copy_from_user(&p, params, sizeof(p)))
3399         return -EFAULT;
3400     for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
3401         if (p.resv[i])
3402             return -EINVAL;
3403     }
3404 
3405     if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
3406             IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE |
3407             IORING_SETUP_CLAMP | IORING_SETUP_ATTACH_WQ |
3408             IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
3409             IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
3410             IORING_SETUP_SQE128 | IORING_SETUP_CQE32 |
3411             IORING_SETUP_SINGLE_ISSUER))
3412         return -EINVAL;
3413 
3414     return io_uring_create(entries, &p, params);
3415 }
3416 
3417 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
3418         struct io_uring_params __user *, params)
3419 {
3420     return io_uring_setup(entries, params);
3421 }
3422 
3423 static __cold int io_probe(struct io_ring_ctx *ctx, void __user *arg,
3424                unsigned nr_args)
3425 {
3426     struct io_uring_probe *p;
3427     size_t size;
3428     int i, ret;
3429 
3430     size = struct_size(p, ops, nr_args);
3431     if (size == SIZE_MAX)
3432         return -EOVERFLOW;
3433     p = kzalloc(size, GFP_KERNEL);
3434     if (!p)
3435         return -ENOMEM;
3436 
3437     ret = -EFAULT;
3438     if (copy_from_user(p, arg, size))
3439         goto out;
3440     ret = -EINVAL;
3441     if (memchr_inv(p, 0, size))
3442         goto out;
3443 
3444     p->last_op = IORING_OP_LAST - 1;
3445     if (nr_args > IORING_OP_LAST)
3446         nr_args = IORING_OP_LAST;
3447 
3448     for (i = 0; i < nr_args; i++) {
3449         p->ops[i].op = i;
3450         if (!io_op_defs[i].not_supported)
3451             p->ops[i].flags = IO_URING_OP_SUPPORTED;
3452     }
3453     p->ops_len = i;
3454 
3455     ret = 0;
3456     if (copy_to_user(arg, p, size))
3457         ret = -EFAULT;
3458 out:
3459     kfree(p);
3460     return ret;
3461 }
3462 
3463 static int io_register_personality(struct io_ring_ctx *ctx)
3464 {
3465     const struct cred *creds;
3466     u32 id;
3467     int ret;
3468 
3469     creds = get_current_cred();
3470 
3471     ret = xa_alloc_cyclic(&ctx->personalities, &id, (void *)creds,
3472             XA_LIMIT(0, USHRT_MAX), &ctx->pers_next, GFP_KERNEL);
3473     if (ret < 0) {
3474         put_cred(creds);
3475         return ret;
3476     }
3477     return id;
3478 }
3479 
3480 static __cold int io_register_restrictions(struct io_ring_ctx *ctx,
3481                        void __user *arg, unsigned int nr_args)
3482 {
3483     struct io_uring_restriction *res;
3484     size_t size;
3485     int i, ret;
3486 
3487     /* Restrictions allowed only if rings started disabled */
3488     if (!(ctx->flags & IORING_SETUP_R_DISABLED))
3489         return -EBADFD;
3490 
3491     /* We allow only a single restrictions registration */
3492     if (ctx->restrictions.registered)
3493         return -EBUSY;
3494 
3495     if (!arg || nr_args > IORING_MAX_RESTRICTIONS)
3496         return -EINVAL;
3497 
3498     size = array_size(nr_args, sizeof(*res));
3499     if (size == SIZE_MAX)
3500         return -EOVERFLOW;
3501 
3502     res = memdup_user(arg, size);
3503     if (IS_ERR(res))
3504         return PTR_ERR(res);
3505 
3506     ret = 0;
3507 
3508     for (i = 0; i < nr_args; i++) {
3509         switch (res[i].opcode) {
3510         case IORING_RESTRICTION_REGISTER_OP:
3511             if (res[i].register_op >= IORING_REGISTER_LAST) {
3512                 ret = -EINVAL;
3513                 goto out;
3514             }
3515 
3516             __set_bit(res[i].register_op,
3517                   ctx->restrictions.register_op);
3518             break;
3519         case IORING_RESTRICTION_SQE_OP:
3520             if (res[i].sqe_op >= IORING_OP_LAST) {
3521                 ret = -EINVAL;
3522                 goto out;
3523             }
3524 
3525             __set_bit(res[i].sqe_op, ctx->restrictions.sqe_op);
3526             break;
3527         case IORING_RESTRICTION_SQE_FLAGS_ALLOWED:
3528             ctx->restrictions.sqe_flags_allowed = res[i].sqe_flags;
3529             break;
3530         case IORING_RESTRICTION_SQE_FLAGS_REQUIRED:
3531             ctx->restrictions.sqe_flags_required = res[i].sqe_flags;
3532             break;
3533         default:
3534             ret = -EINVAL;
3535             goto out;
3536         }
3537     }
3538 
3539 out:
3540     /* Reset all restrictions if an error happened */
3541     if (ret != 0)
3542         memset(&ctx->restrictions, 0, sizeof(ctx->restrictions));
3543     else
3544         ctx->restrictions.registered = true;
3545 
3546     kfree(res);
3547     return ret;
3548 }
3549 
3550 static int io_register_enable_rings(struct io_ring_ctx *ctx)
3551 {
3552     if (!(ctx->flags & IORING_SETUP_R_DISABLED))
3553         return -EBADFD;
3554 
3555     if (ctx->flags & IORING_SETUP_SINGLE_ISSUER && !ctx->submitter_task)
3556         ctx->submitter_task = get_task_struct(current);
3557 
3558     if (ctx->restrictions.registered)
3559         ctx->restricted = 1;
3560 
3561     ctx->flags &= ~IORING_SETUP_R_DISABLED;
3562     if (ctx->sq_data && wq_has_sleeper(&ctx->sq_data->wait))
3563         wake_up(&ctx->sq_data->wait);
3564     return 0;
3565 }
3566 
3567 static __cold int io_register_iowq_aff(struct io_ring_ctx *ctx,
3568                        void __user *arg, unsigned len)
3569 {
3570     struct io_uring_task *tctx = current->io_uring;
3571     cpumask_var_t new_mask;
3572     int ret;
3573 
3574     if (!tctx || !tctx->io_wq)
3575         return -EINVAL;
3576 
3577     if (!alloc_cpumask_var(&new_mask, GFP_KERNEL))
3578         return -ENOMEM;
3579 
3580     cpumask_clear(new_mask);
3581     if (len > cpumask_size())
3582         len = cpumask_size();
3583 
3584     if (in_compat_syscall()) {
3585         ret = compat_get_bitmap(cpumask_bits(new_mask),
3586                     (const compat_ulong_t __user *)arg,
3587                     len * 8 /* CHAR_BIT */);
3588     } else {
3589         ret = copy_from_user(new_mask, arg, len);
3590     }
3591 
3592     if (ret) {
3593         free_cpumask_var(new_mask);
3594         return -EFAULT;
3595     }
3596 
3597     ret = io_wq_cpu_affinity(tctx->io_wq, new_mask);
3598     free_cpumask_var(new_mask);
3599     return ret;
3600 }
3601 
3602 static __cold int io_unregister_iowq_aff(struct io_ring_ctx *ctx)
3603 {
3604     struct io_uring_task *tctx = current->io_uring;
3605 
3606     if (!tctx || !tctx->io_wq)
3607         return -EINVAL;
3608 
3609     return io_wq_cpu_affinity(tctx->io_wq, NULL);
3610 }
3611 
3612 static __cold int io_register_iowq_max_workers(struct io_ring_ctx *ctx,
3613                            void __user *arg)
3614     __must_hold(&ctx->uring_lock)
3615 {
3616     struct io_tctx_node *node;
3617     struct io_uring_task *tctx = NULL;
3618     struct io_sq_data *sqd = NULL;
3619     __u32 new_count[2];
3620     int i, ret;
3621 
3622     if (copy_from_user(new_count, arg, sizeof(new_count)))
3623         return -EFAULT;
3624     for (i = 0; i < ARRAY_SIZE(new_count); i++)
3625         if (new_count[i] > INT_MAX)
3626             return -EINVAL;
3627 
3628     if (ctx->flags & IORING_SETUP_SQPOLL) {
3629         sqd = ctx->sq_data;
3630         if (sqd) {
3631             /*
3632              * Observe the correct sqd->lock -> ctx->uring_lock
3633              * ordering. Fine to drop uring_lock here, we hold
3634              * a ref to the ctx.
3635              */
3636             refcount_inc(&sqd->refs);
3637             mutex_unlock(&ctx->uring_lock);
3638             mutex_lock(&sqd->lock);
3639             mutex_lock(&ctx->uring_lock);
3640             if (sqd->thread)
3641                 tctx = sqd->thread->io_uring;
3642         }
3643     } else {
3644         tctx = current->io_uring;
3645     }
3646 
3647     BUILD_BUG_ON(sizeof(new_count) != sizeof(ctx->iowq_limits));
3648 
3649     for (i = 0; i < ARRAY_SIZE(new_count); i++)
3650         if (new_count[i])
3651             ctx->iowq_limits[i] = new_count[i];
3652     ctx->iowq_limits_set = true;
3653 
3654     if (tctx && tctx->io_wq) {
3655         ret = io_wq_max_workers(tctx->io_wq, new_count);
3656         if (ret)
3657             goto err;
3658     } else {
3659         memset(new_count, 0, sizeof(new_count));
3660     }
3661 
3662     if (sqd) {
3663         mutex_unlock(&sqd->lock);
3664         io_put_sq_data(sqd);
3665     }
3666 
3667     if (copy_to_user(arg, new_count, sizeof(new_count)))
3668         return -EFAULT;
3669 
3670     /* that's it for SQPOLL, only the SQPOLL task creates requests */
3671     if (sqd)
3672         return 0;
3673 
3674     /* now propagate the restriction to all registered users */
3675     list_for_each_entry(node, &ctx->tctx_list, ctx_node) {
3676         struct io_uring_task *tctx = node->task->io_uring;
3677 
3678         if (WARN_ON_ONCE(!tctx->io_wq))
3679             continue;
3680 
3681         for (i = 0; i < ARRAY_SIZE(new_count); i++)
3682             new_count[i] = ctx->iowq_limits[i];
3683         /* ignore errors, it always returns zero anyway */
3684         (void)io_wq_max_workers(tctx->io_wq, new_count);
3685     }
3686     return 0;
3687 err:
3688     if (sqd) {
3689         mutex_unlock(&sqd->lock);
3690         io_put_sq_data(sqd);
3691     }
3692     return ret;
3693 }
3694 
3695 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3696                    void __user *arg, unsigned nr_args)
3697     __releases(ctx->uring_lock)
3698     __acquires(ctx->uring_lock)
3699 {
3700     int ret;
3701 
3702     /*
3703      * We don't quiesce the refs for register anymore and so it can't be
3704      * dying as we're holding a file ref here.
3705      */
3706     if (WARN_ON_ONCE(percpu_ref_is_dying(&ctx->refs)))
3707         return -ENXIO;
3708 
3709     if (ctx->restricted) {
3710         if (opcode >= IORING_REGISTER_LAST)
3711             return -EINVAL;
3712         opcode = array_index_nospec(opcode, IORING_REGISTER_LAST);
3713         if (!test_bit(opcode, ctx->restrictions.register_op))
3714             return -EACCES;
3715     }
3716 
3717     switch (opcode) {
3718     case IORING_REGISTER_BUFFERS:
3719         ret = -EFAULT;
3720         if (!arg)
3721             break;
3722         ret = io_sqe_buffers_register(ctx, arg, nr_args, NULL);
3723         break;
3724     case IORING_UNREGISTER_BUFFERS:
3725         ret = -EINVAL;
3726         if (arg || nr_args)
3727             break;
3728         ret = io_sqe_buffers_unregister(ctx);
3729         break;
3730     case IORING_REGISTER_FILES:
3731         ret = -EFAULT;
3732         if (!arg)
3733             break;
3734         ret = io_sqe_files_register(ctx, arg, nr_args, NULL);
3735         break;
3736     case IORING_UNREGISTER_FILES:
3737         ret = -EINVAL;
3738         if (arg || nr_args)
3739             break;
3740         ret = io_sqe_files_unregister(ctx);
3741         break;
3742     case IORING_REGISTER_FILES_UPDATE:
3743         ret = io_register_files_update(ctx, arg, nr_args);
3744         break;
3745     case IORING_REGISTER_EVENTFD:
3746         ret = -EINVAL;
3747         if (nr_args != 1)
3748             break;
3749         ret = io_eventfd_register(ctx, arg, 0);
3750         break;
3751     case IORING_REGISTER_EVENTFD_ASYNC:
3752         ret = -EINVAL;
3753         if (nr_args != 1)
3754             break;
3755         ret = io_eventfd_register(ctx, arg, 1);
3756         break;
3757     case IORING_UNREGISTER_EVENTFD:
3758         ret = -EINVAL;
3759         if (arg || nr_args)
3760             break;
3761         ret = io_eventfd_unregister(ctx);
3762         break;
3763     case IORING_REGISTER_PROBE:
3764         ret = -EINVAL;
3765         if (!arg || nr_args > 256)
3766             break;
3767         ret = io_probe(ctx, arg, nr_args);
3768         break;
3769     case IORING_REGISTER_PERSONALITY:
3770         ret = -EINVAL;
3771         if (arg || nr_args)
3772             break;
3773         ret = io_register_personality(ctx);
3774         break;
3775     case IORING_UNREGISTER_PERSONALITY:
3776         ret = -EINVAL;
3777         if (arg)
3778             break;
3779         ret = io_unregister_personality(ctx, nr_args);
3780         break;
3781     case IORING_REGISTER_ENABLE_RINGS:
3782         ret = -EINVAL;
3783         if (arg || nr_args)
3784             break;
3785         ret = io_register_enable_rings(ctx);
3786         break;
3787     case IORING_REGISTER_RESTRICTIONS:
3788         ret = io_register_restrictions(ctx, arg, nr_args);
3789         break;
3790     case IORING_REGISTER_FILES2:
3791         ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_FILE);
3792         break;
3793     case IORING_REGISTER_FILES_UPDATE2:
3794         ret = io_register_rsrc_update(ctx, arg, nr_args,
3795                           IORING_RSRC_FILE);
3796         break;
3797     case IORING_REGISTER_BUFFERS2:
3798         ret = io_register_rsrc(ctx, arg, nr_args, IORING_RSRC_BUFFER);
3799         break;
3800     case IORING_REGISTER_BUFFERS_UPDATE:
3801         ret = io_register_rsrc_update(ctx, arg, nr_args,
3802                           IORING_RSRC_BUFFER);
3803         break;
3804     case IORING_REGISTER_IOWQ_AFF:
3805         ret = -EINVAL;
3806         if (!arg || !nr_args)
3807             break;
3808         ret = io_register_iowq_aff(ctx, arg, nr_args);
3809         break;
3810     case IORING_UNREGISTER_IOWQ_AFF:
3811         ret = -EINVAL;
3812         if (arg || nr_args)
3813             break;
3814         ret = io_unregister_iowq_aff(ctx);
3815         break;
3816     case IORING_REGISTER_IOWQ_MAX_WORKERS:
3817         ret = -EINVAL;
3818         if (!arg || nr_args != 2)
3819             break;
3820         ret = io_register_iowq_max_workers(ctx, arg);
3821         break;
3822     case IORING_REGISTER_RING_FDS:
3823         ret = io_ringfd_register(ctx, arg, nr_args);
3824         break;
3825     case IORING_UNREGISTER_RING_FDS:
3826         ret = io_ringfd_unregister(ctx, arg, nr_args);
3827         break;
3828     case IORING_REGISTER_PBUF_RING:
3829         ret = -EINVAL;
3830         if (!arg || nr_args != 1)
3831             break;
3832         ret = io_register_pbuf_ring(ctx, arg);
3833         break;
3834     case IORING_UNREGISTER_PBUF_RING:
3835         ret = -EINVAL;
3836         if (!arg || nr_args != 1)
3837             break;
3838         ret = io_unregister_pbuf_ring(ctx, arg);
3839         break;
3840     case IORING_REGISTER_SYNC_CANCEL:
3841         ret = -EINVAL;
3842         if (!arg || nr_args != 1)
3843             break;
3844         ret = io_sync_cancel(ctx, arg);
3845         break;
3846     case IORING_REGISTER_FILE_ALLOC_RANGE:
3847         ret = -EINVAL;
3848         if (!arg || nr_args)
3849             break;
3850         ret = io_register_file_alloc_range(ctx, arg);
3851         break;
3852     default:
3853         ret = -EINVAL;
3854         break;
3855     }
3856 
3857     return ret;
3858 }
3859 
3860 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3861         void __user *, arg, unsigned int, nr_args)
3862 {
3863     struct io_ring_ctx *ctx;
3864     long ret = -EBADF;
3865     struct fd f;
3866 
3867     f = fdget(fd);
3868     if (!f.file)
3869         return -EBADF;
3870 
3871     ret = -EOPNOTSUPP;
3872     if (!io_is_uring_fops(f.file))
3873         goto out_fput;
3874 
3875     ctx = f.file->private_data;
3876 
3877     io_run_task_work();
3878 
3879     mutex_lock(&ctx->uring_lock);
3880     ret = __io_uring_register(ctx, opcode, arg, nr_args);
3881     mutex_unlock(&ctx->uring_lock);
3882     trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs, ret);
3883 out_fput:
3884     fdput(f);
3885     return ret;
3886 }
3887 
3888 static int __init io_uring_init(void)
3889 {
3890 #define __BUILD_BUG_VERIFY_OFFSET_SIZE(stype, eoffset, esize, ename) do { \
3891     BUILD_BUG_ON(offsetof(stype, ename) != eoffset); \
3892     BUILD_BUG_ON(sizeof_field(stype, ename) != esize); \
3893 } while (0)
3894 
3895 #define BUILD_BUG_SQE_ELEM(eoffset, etype, ename) \
3896     __BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, sizeof(etype), ename)
3897 #define BUILD_BUG_SQE_ELEM_SIZE(eoffset, esize, ename) \
3898     __BUILD_BUG_VERIFY_OFFSET_SIZE(struct io_uring_sqe, eoffset, esize, ename)
3899     BUILD_BUG_ON(sizeof(struct io_uring_sqe) != 64);
3900     BUILD_BUG_SQE_ELEM(0,  __u8,   opcode);
3901     BUILD_BUG_SQE_ELEM(1,  __u8,   flags);
3902     BUILD_BUG_SQE_ELEM(2,  __u16,  ioprio);
3903     BUILD_BUG_SQE_ELEM(4,  __s32,  fd);
3904     BUILD_BUG_SQE_ELEM(8,  __u64,  off);
3905     BUILD_BUG_SQE_ELEM(8,  __u64,  addr2);
3906     BUILD_BUG_SQE_ELEM(8,  __u32,  cmd_op);
3907     BUILD_BUG_SQE_ELEM(12, __u32, __pad1);
3908     BUILD_BUG_SQE_ELEM(16, __u64,  addr);
3909     BUILD_BUG_SQE_ELEM(16, __u64,  splice_off_in);
3910     BUILD_BUG_SQE_ELEM(24, __u32,  len);
3911     BUILD_BUG_SQE_ELEM(28,     __kernel_rwf_t, rw_flags);
3912     BUILD_BUG_SQE_ELEM(28, /* compat */   int, rw_flags);
3913     BUILD_BUG_SQE_ELEM(28, /* compat */ __u32, rw_flags);
3914     BUILD_BUG_SQE_ELEM(28, __u32,  fsync_flags);
3915     BUILD_BUG_SQE_ELEM(28, /* compat */ __u16,  poll_events);
3916     BUILD_BUG_SQE_ELEM(28, __u32,  poll32_events);
3917     BUILD_BUG_SQE_ELEM(28, __u32,  sync_range_flags);
3918     BUILD_BUG_SQE_ELEM(28, __u32,  msg_flags);
3919     BUILD_BUG_SQE_ELEM(28, __u32,  timeout_flags);
3920     BUILD_BUG_SQE_ELEM(28, __u32,  accept_flags);
3921     BUILD_BUG_SQE_ELEM(28, __u32,  cancel_flags);
3922     BUILD_BUG_SQE_ELEM(28, __u32,  open_flags);
3923     BUILD_BUG_SQE_ELEM(28, __u32,  statx_flags);
3924     BUILD_BUG_SQE_ELEM(28, __u32,  fadvise_advice);
3925     BUILD_BUG_SQE_ELEM(28, __u32,  splice_flags);
3926     BUILD_BUG_SQE_ELEM(28, __u32,  rename_flags);
3927     BUILD_BUG_SQE_ELEM(28, __u32,  unlink_flags);
3928     BUILD_BUG_SQE_ELEM(28, __u32,  hardlink_flags);
3929     BUILD_BUG_SQE_ELEM(28, __u32,  xattr_flags);
3930     BUILD_BUG_SQE_ELEM(28, __u32,  msg_ring_flags);
3931     BUILD_BUG_SQE_ELEM(32, __u64,  user_data);
3932     BUILD_BUG_SQE_ELEM(40, __u16,  buf_index);
3933     BUILD_BUG_SQE_ELEM(40, __u16,  buf_group);
3934     BUILD_BUG_SQE_ELEM(42, __u16,  personality);
3935     BUILD_BUG_SQE_ELEM(44, __s32,  splice_fd_in);
3936     BUILD_BUG_SQE_ELEM(44, __u32,  file_index);
3937     BUILD_BUG_SQE_ELEM(44, __u16,  addr_len);
3938     BUILD_BUG_SQE_ELEM(46, __u16,  __pad3[0]);
3939     BUILD_BUG_SQE_ELEM(48, __u64,  addr3);
3940     BUILD_BUG_SQE_ELEM_SIZE(48, 0, cmd);
3941     BUILD_BUG_SQE_ELEM(56, __u64,  __pad2);
3942 
3943     BUILD_BUG_ON(sizeof(struct io_uring_files_update) !=
3944              sizeof(struct io_uring_rsrc_update));
3945     BUILD_BUG_ON(sizeof(struct io_uring_rsrc_update) >
3946              sizeof(struct io_uring_rsrc_update2));
3947 
3948     /* ->buf_index is u16 */
3949     BUILD_BUG_ON(offsetof(struct io_uring_buf_ring, bufs) != 0);
3950     BUILD_BUG_ON(offsetof(struct io_uring_buf, resv) !=
3951              offsetof(struct io_uring_buf_ring, tail));
3952 
3953     /* should fit into one byte */
3954     BUILD_BUG_ON(SQE_VALID_FLAGS >= (1 << 8));
3955     BUILD_BUG_ON(SQE_COMMON_FLAGS >= (1 << 8));
3956     BUILD_BUG_ON((SQE_VALID_FLAGS | SQE_COMMON_FLAGS) != SQE_VALID_FLAGS);
3957 
3958     BUILD_BUG_ON(__REQ_F_LAST_BIT > 8 * sizeof(int));
3959 
3960     BUILD_BUG_ON(sizeof(atomic_t) != sizeof(u32));
3961 
3962     io_uring_optable_init();
3963 
3964     req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
3965                 SLAB_ACCOUNT);
3966     return 0;
3967 };
3968 __initcall(io_uring_init);