// SPDX-License-Identifier: GPL-2.0-only
/*
 * linux/net/sunrpc/svc.c
 *
 * High-level RPC service routines
 *
 * Copyright (C) 1995, 1996 Olaf Kirch <okir@monad.swb.de>
 *
 * Multiple thread pools and NUMAisation
 * Copyright (c) 2006 Silicon Graphics, Inc.
 * by Greg Banks <gnb@melbourne.sgi.com>
 */

#include <linux/linkage.h>
#include <linux/sched/signal.h>
#include <linux/errno.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/interrupt.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>

#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svcsock.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/bc_xprt.h>

#include <trace/events/sunrpc.h>

#include "fail.h"

#define RPCDBG_FACILITY RPCDBG_SVCDSP

static void svc_unregister(const struct svc_serv *serv, struct net *net);

#define SVC_POOL_DEFAULT    SVC_POOL_GLOBAL

/*
 * Mode for mapping cpus to pools.
 */
enum {
    SVC_POOL_AUTO = -1, /* choose one of the others */
    SVC_POOL_GLOBAL,    /* no mapping, just a single global pool
                 * (legacy & UP mode) */
    SVC_POOL_PERCPU,    /* one pool per cpu */
    SVC_POOL_PERNODE    /* one pool per numa node */
};

/*
 * Structure for mapping cpus to pools and vice versa.
 * Setup once during sunrpc initialisation.
 */

struct svc_pool_map {
    int count;          /* How many svc_servs use us */
    int mode;           /* Note: int not enum to avoid
                     * warnings about "enumeration value
                     * not handled in switch" */
    unsigned int npools;
    unsigned int *pool_to;      /* maps pool id to cpu or node */
    unsigned int *to_pool;      /* maps cpu or node to pool id */
};

static struct svc_pool_map svc_pool_map = {
    .mode = SVC_POOL_DEFAULT
};

static DEFINE_MUTEX(svc_pool_map_mutex);/* protects svc_pool_map.count only */

static int
param_set_pool_mode(const char *val, const struct kernel_param *kp)
{
    int *ip = (int *)kp->arg;
    struct svc_pool_map *m = &svc_pool_map;
    int err;

    mutex_lock(&svc_pool_map_mutex);

    err = -EBUSY;
    if (m->count)
        goto out;

    err = 0;
    if (!strncmp(val, "auto", 4))
        *ip = SVC_POOL_AUTO;
    else if (!strncmp(val, "global", 6))
        *ip = SVC_POOL_GLOBAL;
    else if (!strncmp(val, "percpu", 6))
        *ip = SVC_POOL_PERCPU;
    else if (!strncmp(val, "pernode", 7))
        *ip = SVC_POOL_PERNODE;
    else
        err = -EINVAL;

out:
    mutex_unlock(&svc_pool_map_mutex);
    return err;
}

static int
param_get_pool_mode(char *buf, const struct kernel_param *kp)
{
    int *ip = (int *)kp->arg;

    switch (*ip)
    {
    case SVC_POOL_AUTO:
        return strlcpy(buf, "auto\n", 20);
    case SVC_POOL_GLOBAL:
        return strlcpy(buf, "global\n", 20);
    case SVC_POOL_PERCPU:
        return strlcpy(buf, "percpu\n", 20);
    case SVC_POOL_PERNODE:
        return strlcpy(buf, "pernode\n", 20);
    default:
        return sprintf(buf, "%d\n", *ip);
    }
}

module_param_call(pool_mode, param_set_pool_mode, param_get_pool_mode,
         &svc_pool_map.mode, 0644);
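
/*
 * Editor's usage note (not part of the original source): the two handlers
 * above back the "pool_mode" module parameter, so an administrator can
 * pick a mapping mode before the first pooled service starts, e.g.:
 *
 *     modprobe sunrpc pool_mode=pernode
 *     echo auto > /sys/module/sunrpc/parameters/pool_mode
 *
 * Once any service holds a reference (svc_pool_map.count != 0),
 * param_set_pool_mode() fails with -EBUSY until the map is released.
 */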

/*
 * Detect best pool mapping mode heuristically,
 * according to the machine's topology.
 */
static int
svc_pool_map_choose_mode(void)
{
    unsigned int node;

    if (nr_online_nodes > 1) {
        /*
         * Actually have multiple NUMA nodes,
         * so split pools on NUMA node boundaries
         */
        return SVC_POOL_PERNODE;
    }

    node = first_online_node;
    if (nr_cpus_node(node) > 2) {
        /*
         * Non-trivial SMP, or CONFIG_NUMA on
         * non-NUMA hardware, e.g. with a generic
         * x86_64 kernel on Xeons.  In this case we
         * want to divide the pools on cpu boundaries.
         */
        return SVC_POOL_PERCPU;
    }

    /* default: one global pool */
    return SVC_POOL_GLOBAL;
}

/*
 * Allocate the to_pool[] and pool_to[] arrays.
 * Returns 0 on success or an errno.
 */
static int
svc_pool_map_alloc_arrays(struct svc_pool_map *m, unsigned int maxpools)
{
    m->to_pool = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
    if (!m->to_pool)
        goto fail;
    m->pool_to = kcalloc(maxpools, sizeof(unsigned int), GFP_KERNEL);
    if (!m->pool_to)
        goto fail_free;

    return 0;

fail_free:
    kfree(m->to_pool);
    m->to_pool = NULL;
fail:
    return -ENOMEM;
}

/*
 * Initialise the pool map for SVC_POOL_PERCPU mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_percpu(struct svc_pool_map *m)
{
    unsigned int maxpools = nr_cpu_ids;
    unsigned int pidx = 0;
    unsigned int cpu;
    int err;

    err = svc_pool_map_alloc_arrays(m, maxpools);
    if (err)
        return err;

    for_each_online_cpu(cpu) {
        BUG_ON(pidx >= maxpools);
        m->to_pool[cpu] = pidx;
        m->pool_to[pidx] = cpu;
        pidx++;
    }
    /* cpus brought online later all get mapped to pool0, sorry */

    return pidx;
}


/*
 * Initialise the pool map for SVC_POOL_PERNODE mode.
 * Returns number of pools or <0 on error.
 */
static int
svc_pool_map_init_pernode(struct svc_pool_map *m)
{
    unsigned int maxpools = nr_node_ids;
    unsigned int pidx = 0;
    unsigned int node;
    int err;

    err = svc_pool_map_alloc_arrays(m, maxpools);
    if (err)
        return err;

    for_each_node_with_cpus(node) {
        /* some architectures (e.g. SN2) have cpuless nodes */
        BUG_ON(pidx > maxpools);
        m->to_pool[node] = pidx;
        m->pool_to[pidx] = node;
        pidx++;
    }
    /* nodes brought online later all get mapped to pool0, sorry */

    return pidx;
}


/*
 * Add a reference to the global map of cpus to pools (and
 * vice versa) if pools are in use.
 * Initialise the map if we're the first user.
 * Returns the number of pools. If this is '1', no reference
 * was taken.
 */
static unsigned int
svc_pool_map_get(void)
{
    struct svc_pool_map *m = &svc_pool_map;
    int npools = -1;

    mutex_lock(&svc_pool_map_mutex);

    if (m->count++) {
        mutex_unlock(&svc_pool_map_mutex);
        WARN_ON_ONCE(m->npools <= 1);
        return m->npools;
    }

    if (m->mode == SVC_POOL_AUTO)
        m->mode = svc_pool_map_choose_mode();

    switch (m->mode) {
    case SVC_POOL_PERCPU:
        npools = svc_pool_map_init_percpu(m);
        break;
    case SVC_POOL_PERNODE:
        npools = svc_pool_map_init_pernode(m);
        break;
    }

    if (npools <= 0) {
        /* default, or memory allocation failure */
        npools = 1;
        m->mode = SVC_POOL_GLOBAL;
    }
    m->npools = npools;

    if (npools == 1)
        /* service is unpooled, so doesn't hold a reference */
        m->count--;

    mutex_unlock(&svc_pool_map_mutex);
    return npools;
}
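
/*
 * Editor's sketch of the intended get/put pairing (see also
 * svc_create_pooled() and svc_destroy() below):
 *
 *     unsigned int npools = svc_pool_map_get();
 *     struct svc_serv *serv = __svc_create(prog, bufsize, npools, threadfn);
 *
 *     if (!serv)
 *         svc_pool_map_put(npools);   <-- drop the reference on failure
 *
 * An unpooled service (npools == 1) holds no reference, and
 * svc_pool_map_put() below returns early in that case.
 */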

/*
 * Drop a reference to the global map of cpus to pools, if
 * pools were in use, i.e. if npools > 1.
 * When the last reference is dropped, the map data is
 * freed; this allows the sysadmin to change the pool
 * mode using the pool_mode module option without
 * rebooting or re-loading sunrpc.ko.
 */
static void
svc_pool_map_put(int npools)
{
    struct svc_pool_map *m = &svc_pool_map;

    if (npools <= 1)
        return;
    mutex_lock(&svc_pool_map_mutex);

    if (!--m->count) {
        kfree(m->to_pool);
        m->to_pool = NULL;
        kfree(m->pool_to);
        m->pool_to = NULL;
        m->npools = 0;
    }

    mutex_unlock(&svc_pool_map_mutex);
}

static int svc_pool_map_get_node(unsigned int pidx)
{
    const struct svc_pool_map *m = &svc_pool_map;

    if (m->count) {
        if (m->mode == SVC_POOL_PERCPU)
            return cpu_to_node(m->pool_to[pidx]);
        if (m->mode == SVC_POOL_PERNODE)
            return m->pool_to[pidx];
    }
    return NUMA_NO_NODE;
}

/*
 * Set the given thread's cpus_allowed mask so that it
 * will only run on cpus in the given pool.
 */
static inline void
svc_pool_map_set_cpumask(struct task_struct *task, unsigned int pidx)
{
    struct svc_pool_map *m = &svc_pool_map;
    unsigned int node = m->pool_to[pidx];

    /*
     * The caller checks for sv_nrpools > 1, which
     * implies that we've been initialized.
     */
    WARN_ON_ONCE(m->count == 0);
    if (m->count == 0)
        return;

    switch (m->mode) {
    case SVC_POOL_PERCPU:
    {
        set_cpus_allowed_ptr(task, cpumask_of(node));
        break;
    }
    case SVC_POOL_PERNODE:
    {
        set_cpus_allowed_ptr(task, cpumask_of_node(node));
        break;
    }
    }
}

/**
 * svc_pool_for_cpu - Select pool to run a thread on this cpu
 * @serv: An RPC service
 *
 * Use the active CPU and the svc_pool_map's mode setting to
 * select the svc thread pool to use. Once initialized, the
 * svc_pool_map does not change.
 *
 * Return value:
 *   A pointer to an svc_pool
 */
struct svc_pool *svc_pool_for_cpu(struct svc_serv *serv)
{
    struct svc_pool_map *m = &svc_pool_map;
    int cpu = raw_smp_processor_id();
    unsigned int pidx = 0;

    if (serv->sv_nrpools <= 1)
        return serv->sv_pools;

    switch (m->mode) {
    case SVC_POOL_PERCPU:
        pidx = m->to_pool[cpu];
        break;
    case SVC_POOL_PERNODE:
        pidx = m->to_pool[cpu_to_node(cpu)];
        break;
    }

    return &serv->sv_pools[pidx % serv->sv_nrpools];
}
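
/*
 * Editor's illustrative sketch: a transport's data-ready callback can use
 * svc_pool_for_cpu() to queue work on the pool whose threads run near the
 * interrupted CPU, roughly:
 *
 *     struct svc_pool *pool = svc_pool_for_cpu(xprt->xpt_server);
 *
 *     spin_lock_bh(&pool->sp_lock);
 *     list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
 *     spin_unlock_bh(&pool->sp_lock);
 *
 * The enqueue details here are simplified assumptions; see svc_xprt.c for
 * the real logic. The modulo in the return statement above keeps the index
 * in range even if the map and the service disagree about the pool count.
 */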

int svc_rpcb_setup(struct svc_serv *serv, struct net *net)
{
    int err;

    err = rpcb_create_local(net);
    if (err)
        return err;

    /* Remove any stale portmap registrations */
    svc_unregister(serv, net);
    return 0;
}
EXPORT_SYMBOL_GPL(svc_rpcb_setup);

void svc_rpcb_cleanup(struct svc_serv *serv, struct net *net)
{
    svc_unregister(serv, net);
    rpcb_put_local(net);
}
EXPORT_SYMBOL_GPL(svc_rpcb_cleanup);

static int svc_uses_rpcbind(struct svc_serv *serv)
{
    struct svc_program  *progp;
    unsigned int        i;

    for (progp = serv->sv_program; progp; progp = progp->pg_next) {
        for (i = 0; i < progp->pg_nvers; i++) {
            if (progp->pg_vers[i] == NULL)
                continue;
            if (!progp->pg_vers[i]->vs_hidden)
                return 1;
        }
    }

    return 0;
}

int svc_bind(struct svc_serv *serv, struct net *net)
{
    if (!svc_uses_rpcbind(serv))
        return 0;
    return svc_rpcb_setup(serv, net);
}
EXPORT_SYMBOL_GPL(svc_bind);
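
/*
 * Editor's note on bring-up order (names are illustrative): callers
 * typically create the service, bind it, then start threads:
 *
 *     serv = svc_create_pooled(&my_program, bufsize, my_thread_fn);
 *     err = svc_bind(serv, net);
 *     err = svc_set_num_threads(serv, NULL, nthreads);
 *
 * svc_bind() is a no-op for programs whose versions are all vs_hidden,
 * since those are never registered with rpcbind in the first place.
 */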

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void
__svc_init_bc(struct svc_serv *serv)
{
    INIT_LIST_HEAD(&serv->sv_cb_list);
    spin_lock_init(&serv->sv_cb_lock);
    init_waitqueue_head(&serv->sv_cb_waitq);
}
#else
static void
__svc_init_bc(struct svc_serv *serv)
{
}
#endif

/*
 * Create an RPC service
 */
static struct svc_serv *
__svc_create(struct svc_program *prog, unsigned int bufsize, int npools,
         int (*threadfn)(void *data))
{
    struct svc_serv *serv;
    unsigned int vers;
    unsigned int xdrsize;
    unsigned int i;

    if (!(serv = kzalloc(sizeof(*serv), GFP_KERNEL)))
        return NULL;
    serv->sv_name      = prog->pg_name;
    serv->sv_program   = prog;
    kref_init(&serv->sv_refcnt);
    serv->sv_stats     = prog->pg_stats;
    if (bufsize > RPCSVC_MAXPAYLOAD)
        bufsize = RPCSVC_MAXPAYLOAD;
    serv->sv_max_payload = bufsize ? bufsize : 4096;
    serv->sv_max_mesg  = roundup(serv->sv_max_payload + PAGE_SIZE, PAGE_SIZE);
    serv->sv_threadfn = threadfn;
    xdrsize = 0;
    while (prog) {
        prog->pg_lovers = prog->pg_nvers - 1;
        for (vers = 0; vers < prog->pg_nvers; vers++)
            if (prog->pg_vers[vers]) {
                prog->pg_hivers = vers;
                if (prog->pg_lovers > vers)
                    prog->pg_lovers = vers;
                if (prog->pg_vers[vers]->vs_xdrsize > xdrsize)
                    xdrsize = prog->pg_vers[vers]->vs_xdrsize;
            }
        prog = prog->pg_next;
    }
    serv->sv_xdrsize   = xdrsize;
    INIT_LIST_HEAD(&serv->sv_tempsocks);
    INIT_LIST_HEAD(&serv->sv_permsocks);
    timer_setup(&serv->sv_temptimer, NULL, 0);
    spin_lock_init(&serv->sv_lock);

    __svc_init_bc(serv);

    serv->sv_nrpools = npools;
    serv->sv_pools =
        kcalloc(serv->sv_nrpools, sizeof(struct svc_pool),
            GFP_KERNEL);
    if (!serv->sv_pools) {
        kfree(serv);
        return NULL;
    }

    for (i = 0; i < serv->sv_nrpools; i++) {
        struct svc_pool *pool = &serv->sv_pools[i];

        dprintk("svc: initialising pool %u for %s\n",
                i, serv->sv_name);

        pool->sp_id = i;
        INIT_LIST_HEAD(&pool->sp_sockets);
        INIT_LIST_HEAD(&pool->sp_all_threads);
        spin_lock_init(&pool->sp_lock);
    }

    return serv;
}

/**
 * svc_create - Create an RPC service
 * @prog: the RPC program the new service will handle
 * @bufsize: maximum message size for @prog
 * @threadfn: a function to service RPC requests for @prog
 *
 * Returns an instantiated struct svc_serv object or NULL.
 */
struct svc_serv *svc_create(struct svc_program *prog, unsigned int bufsize,
                int (*threadfn)(void *data))
{
    return __svc_create(prog, bufsize, 1, threadfn);
}
EXPORT_SYMBOL_GPL(svc_create);

/**
 * svc_create_pooled - Create an RPC service with pooled threads
 * @prog: the RPC program the new service will handle
 * @bufsize: maximum message size for @prog
 * @threadfn: a function to service RPC requests for @prog
 *
 * Returns an instantiated struct svc_serv object or NULL.
 */
struct svc_serv *svc_create_pooled(struct svc_program *prog,
                   unsigned int bufsize,
                   int (*threadfn)(void *data))
{
    struct svc_serv *serv;
    unsigned int npools = svc_pool_map_get();

    serv = __svc_create(prog, bufsize, npools, threadfn);
    if (!serv)
        goto out_err;
    return serv;
out_err:
    svc_pool_map_put(npools);
    return NULL;
}
EXPORT_SYMBOL_GPL(svc_create_pooled);
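
/*
 * Example caller (editor's sketch; "nfsd_program" and "nfsd" stand in for
 * a real program table and thread function, as in fs/nfsd):
 *
 *     serv = svc_create_pooled(&nfsd_program, 512 * 1024, nfsd);
 *     if (!serv)
 *         return -ENOMEM;
 *
 * The returned service starts with zero threads; svc_set_num_threads()
 * below populates the pools. Note that the out_err path above drops the
 * pool-map reference taken by svc_pool_map_get().
 */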

/*
 * Destroy an RPC service. Should be called with appropriate locking to
 * protect sv_permsocks and sv_tempsocks.
 */
void
svc_destroy(struct kref *ref)
{
    struct svc_serv *serv = container_of(ref, struct svc_serv, sv_refcnt);

    dprintk("svc: svc_destroy(%s)\n", serv->sv_program->pg_name);
    del_timer_sync(&serv->sv_temptimer);

    /*
     * The last user is gone, so all sockets must have been destroyed
     * by this point. Check this.
     */
    BUG_ON(!list_empty(&serv->sv_permsocks));
    BUG_ON(!list_empty(&serv->sv_tempsocks));

    cache_clean_deferred(serv);

    svc_pool_map_put(serv->sv_nrpools);

    kfree(serv->sv_pools);
    kfree(serv);
}
EXPORT_SYMBOL_GPL(svc_destroy);

/*
 * Allocate an RPC server's buffer space.
 * We allocate pages and place them in rq_pages.
 */
static int
svc_init_buffer(struct svc_rqst *rqstp, unsigned int size, int node)
{
    unsigned int pages, arghi;

    /* bc_xprt uses fore channel allocated buffers */
    if (svc_is_backchannel(rqstp))
        return 1;

    pages = size / PAGE_SIZE + 1; /* extra page as we hold both request
                       * and reply. We assume one of them is
                       * at most one page.
                       */
    arghi = 0;
    WARN_ON_ONCE(pages > RPCSVC_MAXPAGES);
    if (pages > RPCSVC_MAXPAGES)
        pages = RPCSVC_MAXPAGES;
    while (pages) {
        struct page *p = alloc_pages_node(node, GFP_KERNEL, 0);
        if (!p)
            break;
        rqstp->rq_pages[arghi++] = p;
        pages--;
    }
    return pages == 0;
}
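
/*
 * Worked example (editor's note): with a 1 MB sv_max_payload and 4 KB
 * pages, __svc_create() sets sv_max_mesg = roundup(1048576 + 4096, 4096)
 * = 1052672, so pages = 1052672 / 4096 + 1 = 258 here. If the allocation
 * loop stops early, the function returns 0 and svc_rqst_alloc() treats
 * that as -ENOMEM.
 */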

/*
 * Release an RPC server buffer
 */
static void
svc_release_buffer(struct svc_rqst *rqstp)
{
    unsigned int i;

    for (i = 0; i < ARRAY_SIZE(rqstp->rq_pages); i++)
        if (rqstp->rq_pages[i])
            put_page(rqstp->rq_pages[i]);
}

struct svc_rqst *
svc_rqst_alloc(struct svc_serv *serv, struct svc_pool *pool, int node)
{
    struct svc_rqst *rqstp;

    rqstp = kzalloc_node(sizeof(*rqstp), GFP_KERNEL, node);
    if (!rqstp)
        return rqstp;

    __set_bit(RQ_BUSY, &rqstp->rq_flags);
    spin_lock_init(&rqstp->rq_lock);
    rqstp->rq_server = serv;
    rqstp->rq_pool = pool;

    rqstp->rq_scratch_page = alloc_pages_node(node, GFP_KERNEL, 0);
    if (!rqstp->rq_scratch_page)
        goto out_enomem;

    rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
    if (!rqstp->rq_argp)
        goto out_enomem;

    rqstp->rq_resp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
    if (!rqstp->rq_resp)
        goto out_enomem;

    if (!svc_init_buffer(rqstp, serv->sv_max_mesg, node))
        goto out_enomem;

    return rqstp;
out_enomem:
    svc_rqst_free(rqstp);
    return NULL;
}
EXPORT_SYMBOL_GPL(svc_rqst_alloc);

static struct svc_rqst *
svc_prepare_thread(struct svc_serv *serv, struct svc_pool *pool, int node)
{
    struct svc_rqst *rqstp;

    rqstp = svc_rqst_alloc(serv, pool, node);
    if (!rqstp)
        return ERR_PTR(-ENOMEM);

    svc_get(serv);
    spin_lock_bh(&serv->sv_lock);
    serv->sv_nrthreads += 1;
    spin_unlock_bh(&serv->sv_lock);

    spin_lock_bh(&pool->sp_lock);
    pool->sp_nrthreads++;
    list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
    spin_unlock_bh(&pool->sp_lock);
    return rqstp;
}

/*
 * Choose a pool in which to create a new thread, for svc_set_num_threads
 */
static inline struct svc_pool *
choose_pool(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
    if (pool != NULL)
        return pool;

    return &serv->sv_pools[(*state)++ % serv->sv_nrpools];
}

/*
 * Choose a thread to kill, for svc_set_num_threads
 */
static inline struct task_struct *
choose_victim(struct svc_serv *serv, struct svc_pool *pool, unsigned int *state)
{
    unsigned int i;
    struct task_struct *task = NULL;

    if (pool != NULL) {
        spin_lock_bh(&pool->sp_lock);
    } else {
        /* choose a pool in round-robin fashion */
        for (i = 0; i < serv->sv_nrpools; i++) {
            pool = &serv->sv_pools[--(*state) % serv->sv_nrpools];
            spin_lock_bh(&pool->sp_lock);
            if (!list_empty(&pool->sp_all_threads))
                goto found_pool;
            spin_unlock_bh(&pool->sp_lock);
        }
        return NULL;
    }

found_pool:
    if (!list_empty(&pool->sp_all_threads)) {
        struct svc_rqst *rqstp;

        /*
         * Remove from the pool->sp_all_threads list
         * so we don't try to kill it again.
         */
        rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
        set_bit(RQ_VICTIM, &rqstp->rq_flags);
        list_del_rcu(&rqstp->rq_all);
        task = rqstp->rq_task;
    }
    spin_unlock_bh(&pool->sp_lock);

    return task;
}

/* create new threads */
static int
svc_start_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
    struct svc_rqst *rqstp;
    struct task_struct *task;
    struct svc_pool *chosen_pool;
    unsigned int state = serv->sv_nrthreads - 1;
    int node;

    do {
        nrservs--;
        chosen_pool = choose_pool(serv, pool, &state);

        node = svc_pool_map_get_node(chosen_pool->sp_id);
        rqstp = svc_prepare_thread(serv, chosen_pool, node);
        if (IS_ERR(rqstp))
            return PTR_ERR(rqstp);

        task = kthread_create_on_node(serv->sv_threadfn, rqstp,
                          node, "%s", serv->sv_name);
        if (IS_ERR(task)) {
            svc_exit_thread(rqstp);
            return PTR_ERR(task);
        }

        rqstp->rq_task = task;
        if (serv->sv_nrpools > 1)
            svc_pool_map_set_cpumask(task, chosen_pool->sp_id);

        svc_sock_update_bufs(serv);
        wake_up_process(task);
    } while (nrservs > 0);

    return 0;
}

/*
 * Create or destroy enough new threads to make the number
 * of threads the given number.  If `pool' is non-NULL, applies
 * only to threads in that pool, otherwise round-robins between
 * all pools.  Caller must ensure mutual exclusion between this and
 * server startup or shutdown.
 */

/* destroy old threads */
static int
svc_stop_kthreads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
    struct task_struct *task;
    unsigned int state = serv->sv_nrthreads - 1;

    /* destroy old threads */
    do {
        task = choose_victim(serv, pool, &state);
        if (task == NULL)
            break;
        kthread_stop(task);
        nrservs++;
    } while (nrservs < 0);
    return 0;
}

int
svc_set_num_threads(struct svc_serv *serv, struct svc_pool *pool, int nrservs)
{
    if (pool == NULL) {
        nrservs -= serv->sv_nrthreads;
    } else {
        spin_lock_bh(&pool->sp_lock);
        nrservs -= pool->sp_nrthreads;
        spin_unlock_bh(&pool->sp_lock);
    }

    if (nrservs > 0)
        return svc_start_kthreads(serv, pool, nrservs);
    if (nrservs < 0)
        return svc_stop_kthreads(serv, pool, nrservs);
    return 0;
}
EXPORT_SYMBOL_GPL(svc_set_num_threads);
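
/*
 * Usage example (editor's sketch): an nfsd-style "threads" control file
 * can map a written count straight onto this API, supplying its own
 * exclusion as required by the comment above ("my_service_mutex" is a
 * hypothetical caller-owned lock):
 *
 *     mutex_lock(&my_service_mutex);
 *     err = svc_set_num_threads(serv, NULL, newthreads);
 *     mutex_unlock(&my_service_mutex);
 *
 * A NULL pool spreads creation round-robin across all pools and likewise
 * rotates victim selection when shrinking; the signed delta is computed
 * internally from sv_nrthreads (or sp_nrthreads for a single pool).
 */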

/**
 * svc_rqst_replace_page - Replace one page in rq_pages[]
 * @rqstp: svc_rqst with pages to replace
 * @page: replacement page
 *
 * When replacing a page in rq_pages, batch the release of the
 * replaced pages to avoid hammering the page allocator.
 */
void svc_rqst_replace_page(struct svc_rqst *rqstp, struct page *page)
{
    if (*rqstp->rq_next_page) {
        if (!pagevec_space(&rqstp->rq_pvec))
            __pagevec_release(&rqstp->rq_pvec);
        pagevec_add(&rqstp->rq_pvec, *rqstp->rq_next_page);
    }

    get_page(page);
    *(rqstp->rq_next_page++) = page;
}
EXPORT_SYMBOL_GPL(svc_rqst_replace_page);

/*
 * Called from a server thread as it's exiting. Caller must hold the "service
 * mutex" for the service.
 */
void
svc_rqst_free(struct svc_rqst *rqstp)
{
    svc_release_buffer(rqstp);
    if (rqstp->rq_scratch_page)
        put_page(rqstp->rq_scratch_page);
    kfree(rqstp->rq_resp);
    kfree(rqstp->rq_argp);
    kfree(rqstp->rq_auth_data);
    kfree_rcu(rqstp, rq_rcu_head);
}
EXPORT_SYMBOL_GPL(svc_rqst_free);

void
svc_exit_thread(struct svc_rqst *rqstp)
{
    struct svc_serv *serv = rqstp->rq_server;
    struct svc_pool *pool = rqstp->rq_pool;

    spin_lock_bh(&pool->sp_lock);
    pool->sp_nrthreads--;
    if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
        list_del_rcu(&rqstp->rq_all);
    spin_unlock_bh(&pool->sp_lock);

    spin_lock_bh(&serv->sv_lock);
    serv->sv_nrthreads -= 1;
    spin_unlock_bh(&serv->sv_lock);
    svc_sock_update_bufs(serv);

    svc_rqst_free(rqstp);

    svc_put(serv);
}
EXPORT_SYMBOL_GPL(svc_exit_thread);

/*
 * Register an "inet" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register4(struct net *net, const u32 program,
                const u32 version,
                const unsigned short protocol,
                const unsigned short port)
{
    const struct sockaddr_in sin = {
        .sin_family     = AF_INET,
        .sin_addr.s_addr    = htonl(INADDR_ANY),
        .sin_port       = htons(port),
    };
    const char *netid;
    int error;

    switch (protocol) {
    case IPPROTO_UDP:
        netid = RPCBIND_NETID_UDP;
        break;
    case IPPROTO_TCP:
        netid = RPCBIND_NETID_TCP;
        break;
    default:
        return -ENOPROTOOPT;
    }

    error = rpcb_v4_register(net, program, version,
                    (const struct sockaddr *)&sin, netid);

    /*
     * User space didn't support rpcbind v4, so retry this
     * registration request with the legacy rpcbind v2 protocol.
     */
    if (error == -EPROTONOSUPPORT)
        error = rpcb_register(net, program, version, protocol, port);

    return error;
}

#if IS_ENABLED(CONFIG_IPV6)
/*
 * Register an "inet6" protocol family netid with the local
 * rpcbind daemon via an rpcbind v4 SET request.
 *
 * No netconfig infrastructure is available in the kernel, so
 * we map IP_ protocol numbers to netids by hand.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_rpcb_register6(struct net *net, const u32 program,
                const u32 version,
                const unsigned short protocol,
                const unsigned short port)
{
    const struct sockaddr_in6 sin6 = {
        .sin6_family        = AF_INET6,
        .sin6_addr      = IN6ADDR_ANY_INIT,
        .sin6_port      = htons(port),
    };
    const char *netid;
    int error;

    switch (protocol) {
    case IPPROTO_UDP:
        netid = RPCBIND_NETID_UDP6;
        break;
    case IPPROTO_TCP:
        netid = RPCBIND_NETID_TCP6;
        break;
    default:
        return -ENOPROTOOPT;
    }

    error = rpcb_v4_register(net, program, version,
                    (const struct sockaddr *)&sin6, netid);

    /*
     * User space didn't support rpcbind version 4, so we won't
     * use a PF_INET6 listener.
     */
    if (error == -EPROTONOSUPPORT)
        error = -EAFNOSUPPORT;

    return error;
}
#endif  /* IS_ENABLED(CONFIG_IPV6) */

/*
 * Register a kernel RPC service via rpcbind version 4.
 *
 * Returns zero on success; a negative errno value is returned
 * if any error occurs.
 */
static int __svc_register(struct net *net, const char *progname,
              const u32 program, const u32 version,
              const int family,
              const unsigned short protocol,
              const unsigned short port)
{
    int error = -EAFNOSUPPORT;

    switch (family) {
    case PF_INET:
        error = __svc_rpcb_register4(net, program, version,
                        protocol, port);
        break;
#if IS_ENABLED(CONFIG_IPV6)
    case PF_INET6:
        error = __svc_rpcb_register6(net, program, version,
                        protocol, port);
#endif
    }

    trace_svc_register(progname, version, protocol, port, family, error);
    return error;
}

int svc_rpcbind_set_version(struct net *net,
                const struct svc_program *progp,
                u32 version, int family,
                unsigned short proto,
                unsigned short port)
{
    return __svc_register(net, progp->pg_name, progp->pg_prog,
                version, family, proto, port);
}
EXPORT_SYMBOL_GPL(svc_rpcbind_set_version);

int svc_generic_rpcbind_set(struct net *net,
                const struct svc_program *progp,
                u32 version, int family,
                unsigned short proto,
                unsigned short port)
{
    const struct svc_version *vers = progp->pg_vers[version];
    int error;

    if (vers == NULL)
        return 0;

    if (vers->vs_hidden) {
        trace_svc_noregister(progp->pg_name, version, proto,
                     port, family, 0);
        return 0;
    }

    /*
     * Don't register a UDP port if we need congestion
     * control.
     */
    if (vers->vs_need_cong_ctrl && proto == IPPROTO_UDP)
        return 0;

    error = svc_rpcbind_set_version(net, progp, version,
                    family, proto, port);

    return (vers->vs_rpcb_optnl) ? 0 : error;
}
EXPORT_SYMBOL_GPL(svc_generic_rpcbind_set);

/**
 * svc_register - register an RPC service with the local portmapper
 * @serv: svc_serv struct for the service to register
 * @net: net namespace for the service to register
 * @family: protocol family of service's listener socket
 * @proto: transport protocol number to advertise
 * @port: port to advertise
 *
 * Service is registered for any address in the passed-in protocol family
 */
int svc_register(const struct svc_serv *serv, struct net *net,
         const int family, const unsigned short proto,
         const unsigned short port)
{
    struct svc_program  *progp;
    unsigned int        i;
    int         error = 0;

    WARN_ON_ONCE(proto == 0 && port == 0);
    if (proto == 0 && port == 0)
        return -EINVAL;

    for (progp = serv->sv_program; progp; progp = progp->pg_next) {
        for (i = 0; i < progp->pg_nvers; i++) {

            error = progp->pg_rpcbind_set(net, progp, i,
                    family, proto, port);
            if (error < 0) {
                printk(KERN_WARNING "svc: failed to register "
                    "%sv%u RPC service (errno %d).\n",
                    progp->pg_name, i, -error);
                break;
            }
        }
    }

    return error;
}

/*
 * If user space is running rpcbind, it should take the v4 UNSET
 * and clear everything for this [program, version].  If user space
 * is running portmap, it will reject the v4 UNSET, but won't have
 * any "inet6" entries anyway.  So a PMAP_UNSET should be sufficient
 * in this case to clear all existing entries for [program, version].
 */
static void __svc_unregister(struct net *net, const u32 program, const u32 version,
                 const char *progname)
{
    int error;

    error = rpcb_v4_register(net, program, version, NULL, "");

    /*
     * User space didn't support rpcbind v4, so retry this
     * request with the legacy rpcbind v2 protocol.
     */
    if (error == -EPROTONOSUPPORT)
        error = rpcb_register(net, program, version, 0, 0);

    trace_svc_unregister(progname, version, error);
}

/*
 * All netids, bind addresses and ports registered for [program, version]
 * are removed from the local rpcbind database (if the service is not
 * hidden) to make way for a new instance of the service.
 *
 * The result of unregistration is reported via dprintk for those who want
 * verification of the result, but is otherwise not important.
 */
static void svc_unregister(const struct svc_serv *serv, struct net *net)
{
    struct svc_program *progp;
    unsigned long flags;
    unsigned int i;

    clear_thread_flag(TIF_SIGPENDING);

    for (progp = serv->sv_program; progp; progp = progp->pg_next) {
        for (i = 0; i < progp->pg_nvers; i++) {
            if (progp->pg_vers[i] == NULL)
                continue;
            if (progp->pg_vers[i]->vs_hidden)
                continue;
            __svc_unregister(net, progp->pg_prog, i, progp->pg_name);
        }
    }

    spin_lock_irqsave(&current->sighand->siglock, flags);
    recalc_sigpending();
    spin_unlock_irqrestore(&current->sighand->siglock, flags);
}

/*
 * dprintk the given error with the address of the client that caused it.
 */
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
{
    struct va_format vaf;
    va_list args;
    char    buf[RPC_MAX_ADDRBUFLEN];

    va_start(args, fmt);

    vaf.fmt = fmt;
    vaf.va = &args;

    dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);

    va_end(args);
}
#else
static __printf(2, 3) void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...) {}
#endif

__be32
svc_generic_init_request(struct svc_rqst *rqstp,
        const struct svc_program *progp,
        struct svc_process_info *ret)
{
    const struct svc_version *versp = NULL; /* compiler food */
    const struct svc_procedure *procp = NULL;

    if (rqstp->rq_vers >= progp->pg_nvers)
        goto err_bad_vers;
    versp = progp->pg_vers[rqstp->rq_vers];
    if (!versp)
        goto err_bad_vers;

    /*
     * Some protocol versions (namely NFSv4) require some form of
     * congestion control.  (See RFC 7530 section 3.1 paragraph 2)
     * In other words, UDP is not allowed. We mark those when setting
     * up the svc_xprt, and verify that here.
     *
     * The spec is not very clear about what error should be returned
     * when someone tries to access a server that is listening on UDP
     * for lower versions. RPC_PROG_MISMATCH seems to be the closest
     * fit.
     */
    if (versp->vs_need_cong_ctrl && rqstp->rq_xprt &&
        !test_bit(XPT_CONG_CTRL, &rqstp->rq_xprt->xpt_flags))
        goto err_bad_vers;

    if (rqstp->rq_proc >= versp->vs_nproc)
        goto err_bad_proc;
    rqstp->rq_procinfo = procp = &versp->vs_proc[rqstp->rq_proc];
    if (!procp)
        goto err_bad_proc;

    /* Initialize storage for argp and resp */
    memset(rqstp->rq_argp, 0, procp->pc_argsize);
    memset(rqstp->rq_resp, 0, procp->pc_ressize);

    /* Bump per-procedure stats counter */
    versp->vs_count[rqstp->rq_proc]++;

    ret->dispatch = versp->vs_dispatch;
    return rpc_success;
err_bad_vers:
    ret->mismatch.lovers = progp->pg_lovers;
    ret->mismatch.hivers = progp->pg_hivers;
    return rpc_prog_mismatch;
err_bad_proc:
    return rpc_proc_unavail;
}
EXPORT_SYMBOL_GPL(svc_generic_init_request);

/*
 * Common routine for processing the RPC request.
 */
static int
svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv)
{
    struct svc_program  *progp;
    const struct svc_procedure *procp = NULL;
    struct svc_serv     *serv = rqstp->rq_server;
    struct svc_process_info process;
    __be32          *statp;
    u32         prog, vers;
    __be32          rpc_stat;
    int         auth_res, rc;
    __be32          *reply_statp;

    rpc_stat = rpc_success;

    if (argv->iov_len < 6*4)
        goto err_short_len;

    /* Will be turned off by GSS integrity and privacy services */
    __set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
    /* Will be turned off only when NFSv4 Sessions are used */
    __set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
    __clear_bit(RQ_DROPME, &rqstp->rq_flags);

    svc_putu32(resv, rqstp->rq_xid);

    vers = svc_getnl(argv);

    /* First words of reply: */
    svc_putnl(resv, 1);     /* REPLY */

    if (vers != 2)      /* RPC version number */
        goto err_bad_rpc;

    /* Save position in case we later decide to reject: */
    reply_statp = resv->iov_base + resv->iov_len;

    svc_putnl(resv, 0);     /* ACCEPT */

    rqstp->rq_prog = prog = svc_getnl(argv);    /* program number */
    rqstp->rq_vers = svc_getnl(argv);   /* version number */
    rqstp->rq_proc = svc_getnl(argv);   /* procedure number */

    for (progp = serv->sv_program; progp; progp = progp->pg_next)
        if (prog == progp->pg_prog)
            break;

    /*
     * Decode auth data, and add verifier to reply buffer.
     * We do this before anything else in order to get a decent
     * auth verifier.
     */
    auth_res = svc_authenticate(rqstp);
    /* Also give the program a chance to reject this call: */
    if (auth_res == SVC_OK && progp)
        auth_res = progp->pg_authenticate(rqstp);
    if (auth_res != SVC_OK)
        trace_svc_authenticate(rqstp, auth_res);
    switch (auth_res) {
    case SVC_OK:
        break;
    case SVC_GARBAGE:
        goto err_garbage;
    case SVC_SYSERR:
        rpc_stat = rpc_system_err;
        goto err_bad;
    case SVC_DENIED:
        goto err_bad_auth;
    case SVC_CLOSE:
        goto close;
    case SVC_DROP:
        goto dropit;
    case SVC_COMPLETE:
        goto sendit;
    }

    if (progp == NULL)
        goto err_bad_prog;

    rpc_stat = progp->pg_init_request(rqstp, progp, &process);
    switch (rpc_stat) {
    case rpc_success:
        break;
    case rpc_prog_unavail:
        goto err_bad_prog;
    case rpc_prog_mismatch:
        goto err_bad_vers;
    case rpc_proc_unavail:
        goto err_bad_proc;
    }

    procp = rqstp->rq_procinfo;
    /* Should this check go into the dispatcher? */
    if (!procp || !procp->pc_func)
        goto err_bad_proc;

    /* Syntactic check complete */
    serv->sv_stats->rpccnt++;
    trace_svc_process(rqstp, progp->pg_name);

    /* Build the reply header. */
    statp = resv->iov_base + resv->iov_len;
    svc_putnl(resv, RPC_SUCCESS);

    /* un-reserve some of the out-queue now that we have a
     * better idea of reply size
     */
    if (procp->pc_xdrressize)
        svc_reserve_auth(rqstp, procp->pc_xdrressize<<2);

    /* Call the function that processes the request. */
    rc = process.dispatch(rqstp, statp);
    if (procp->pc_release)
        procp->pc_release(rqstp);
    if (!rc)
        goto dropit;
    if (rqstp->rq_auth_stat != rpc_auth_ok)
        goto err_bad_auth;

    /* Check RPC status result */
    if (*statp != rpc_success)
        resv->iov_len = ((void *)statp) - resv->iov_base + 4;

    if (procp->pc_encode == NULL)
        goto dropit;

 sendit:
    if (svc_authorise(rqstp))
        goto close_xprt;
    return 1;       /* Caller can now send it */

 dropit:
    svc_authorise(rqstp);   /* doesn't hurt to call this twice */
    dprintk("svc: svc_process dropit\n");
    return 0;

 close:
    svc_authorise(rqstp);
close_xprt:
    if (rqstp->rq_xprt && test_bit(XPT_TEMP, &rqstp->rq_xprt->xpt_flags))
        svc_xprt_close(rqstp->rq_xprt);
    dprintk("svc: svc_process close\n");
    return 0;

err_short_len:
    svc_printk(rqstp, "short len %zd, dropping request\n",
            argv->iov_len);
    goto close_xprt;

err_bad_rpc:
    serv->sv_stats->rpcbadfmt++;
    svc_putnl(resv, 1); /* REJECT */
    svc_putnl(resv, 0); /* RPC_MISMATCH */
    svc_putnl(resv, 2); /* Only RPCv2 supported */
    svc_putnl(resv, 2);
    goto sendit;

err_bad_auth:
    dprintk("svc: authentication failed (%d)\n",
        be32_to_cpu(rqstp->rq_auth_stat));
    serv->sv_stats->rpcbadauth++;
    /* Restore write pointer to location of accept status: */
    xdr_ressize_check(rqstp, reply_statp);
    svc_putnl(resv, 1); /* REJECT */
    svc_putnl(resv, 1); /* AUTH_ERROR */
    svc_putu32(resv, rqstp->rq_auth_stat);  /* status */
    goto sendit;

err_bad_prog:
    dprintk("svc: unknown program %d\n", prog);
    serv->sv_stats->rpcbadfmt++;
    svc_putnl(resv, RPC_PROG_UNAVAIL);
    goto sendit;

err_bad_vers:
    svc_printk(rqstp, "unknown version (%d for prog %d, %s)\n",
               rqstp->rq_vers, rqstp->rq_prog, progp->pg_name);

    serv->sv_stats->rpcbadfmt++;
    svc_putnl(resv, RPC_PROG_MISMATCH);
    svc_putnl(resv, process.mismatch.lovers);
    svc_putnl(resv, process.mismatch.hivers);
    goto sendit;

err_bad_proc:
    svc_printk(rqstp, "unknown procedure (%d)\n", rqstp->rq_proc);

    serv->sv_stats->rpcbadfmt++;
    svc_putnl(resv, RPC_PROC_UNAVAIL);
    goto sendit;

err_garbage:
    svc_printk(rqstp, "failed to decode args\n");

    rpc_stat = rpc_garbage_args;
err_bad:
    serv->sv_stats->rpcbadfmt++;
    svc_putnl(resv, ntohl(rpc_stat));
    goto sendit;
}

/*
 * Process the RPC request.
 */
int
svc_process(struct svc_rqst *rqstp)
{
    struct kvec     *argv = &rqstp->rq_arg.head[0];
    struct kvec     *resv = &rqstp->rq_res.head[0];
    struct svc_serv     *serv = rqstp->rq_server;
    u32         dir;

#if IS_ENABLED(CONFIG_FAIL_SUNRPC)
    if (!fail_sunrpc.ignore_server_disconnect &&
        should_fail(&fail_sunrpc.attr, 1))
        svc_xprt_deferred_close(rqstp->rq_xprt);
#endif

    /*
     * Setup response xdr_buf.
     * Initially it has just one page
     */
    rqstp->rq_next_page = &rqstp->rq_respages[1];
    resv->iov_base = page_address(rqstp->rq_respages[0]);
    resv->iov_len = 0;
    rqstp->rq_res.pages = rqstp->rq_respages + 1;
    rqstp->rq_res.len = 0;
    rqstp->rq_res.page_base = 0;
    rqstp->rq_res.page_len = 0;
    rqstp->rq_res.buflen = PAGE_SIZE;
    rqstp->rq_res.tail[0].iov_base = NULL;
    rqstp->rq_res.tail[0].iov_len = 0;

    dir = svc_getnl(argv);
    if (dir != 0) {
        /* direction != CALL */
        svc_printk(rqstp, "bad direction %d, dropping request\n", dir);
        serv->sv_stats->rpcbadfmt++;
        goto out_drop;
    }

    /* Returns 1 for send, 0 for drop */
    if (likely(svc_process_common(rqstp, argv, resv)))
        return svc_send(rqstp);

out_drop:
    svc_drop(rqstp);
    return 0;
}
EXPORT_SYMBOL_GPL(svc_process);
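
/*
 * Editor's sketch of a typical serv->sv_threadfn: each worker loops
 * receiving a request and feeding it to svc_process(). svc_recv() lives
 * in svc_xprt.c; its exact signature and the shutdown handshake here are
 * simplified assumptions:
 *
 *     static int my_thread_fn(void *data)
 *     {
 *         struct svc_rqst *rqstp = data;
 *
 *         while (!kthread_should_stop()) {
 *             if (svc_recv(rqstp, MAX_SCHEDULE_TIMEOUT) < 0)
 *                 continue;
 *             svc_process(rqstp);
 *         }
 *         svc_exit_thread(rqstp);
 *         return 0;
 *     }
 */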

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
 * Process a backchannel RPC request that arrived over an existing
 * outbound connection
 */
int
bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
           struct svc_rqst *rqstp)
{
    struct kvec *argv = &rqstp->rq_arg.head[0];
    struct kvec *resv = &rqstp->rq_res.head[0];
    struct rpc_task *task;
    int proc_error;
    int error;

    dprintk("svc: %s(%p)\n", __func__, req);

    /* Build the svc_rqst used by the common processing routine */
    rqstp->rq_xid = req->rq_xid;
    rqstp->rq_prot = req->rq_xprt->prot;
    rqstp->rq_server = serv;
    rqstp->rq_bc_net = req->rq_xprt->xprt_net;

    rqstp->rq_addrlen = sizeof(req->rq_xprt->addr);
    memcpy(&rqstp->rq_addr, &req->rq_xprt->addr, rqstp->rq_addrlen);
    memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
    memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));

    /* Adjust the argument buffer length */
    rqstp->rq_arg.len = req->rq_private_buf.len;
    if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
        rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
        rqstp->rq_arg.page_len = 0;
    } else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
            rqstp->rq_arg.page_len)
        rqstp->rq_arg.page_len = rqstp->rq_arg.len -
            rqstp->rq_arg.head[0].iov_len;
    else
        rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
            rqstp->rq_arg.page_len;

    /* reset result send buffer "put" position */
    resv->iov_len = 0;

    /*
     * Skip the next two words because they've already been
     * processed in the transport
     */
    svc_getu32(argv);   /* XID */
    svc_getnl(argv);    /* CALLDIR */

    /* Parse and execute the bc call */
    proc_error = svc_process_common(rqstp, argv, resv);

    atomic_dec(&req->rq_xprt->bc_slot_count);
    if (!proc_error) {
        /* Processing error: drop the request */
        xprt_free_bc_request(req);
        error = -EINVAL;
        goto out;
    }
    /* Finally, send the reply synchronously */
    memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
    task = rpc_run_bc_task(req);
    if (IS_ERR(task)) {
        error = PTR_ERR(task);
        goto out;
    }

    WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
    error = task->tk_status;
    rpc_put_task(task);

out:
    dprintk("svc: %s(), error=%d\n", __func__, error);
    return error;
}
EXPORT_SYMBOL_GPL(bc_svc_process);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */

/*
 * Return (transport-specific) limit on the rpc payload.
 */
u32 svc_max_payload(const struct svc_rqst *rqstp)
{
    u32 max = rqstp->rq_xprt->xpt_class->xcl_max_payload;

    if (rqstp->rq_server->sv_max_payload < max)
        max = rqstp->rq_server->sv_max_payload;
    return max;
}
EXPORT_SYMBOL_GPL(svc_max_payload);
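
/*
 * Editor's note: NFS-style services use this clamp when negotiating
 * transfer sizes, e.g. (illustrative):
 *
 *     u32 maxsize = svc_max_payload(rqstp);
 *     rsize = min(rsize, maxsize);
 *
 * The result is the smaller of the transport class limit and the
 * sv_max_payload chosen at service-creation time.
 */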

/**
 * svc_proc_name - Return RPC procedure name in string form
 * @rqstp: svc_rqst to operate on
 *
 * Return value:
 *   Pointer to a NUL-terminated string
 */
const char *svc_proc_name(const struct svc_rqst *rqstp)
{
    if (rqstp && rqstp->rq_procinfo)
        return rqstp->rq_procinfo->pc_name;
    return "unknown";
}


/**
 * svc_encode_result_payload - mark a range of bytes as a result payload
 * @rqstp: svc_rqst to operate on
 * @offset: payload's byte offset in rqstp->rq_res
 * @length: size of payload, in bytes
 *
 * Returns zero on success, or a negative errno if a permanent
 * error occurred.
 */
int svc_encode_result_payload(struct svc_rqst *rqstp, unsigned int offset,
                  unsigned int length)
{
    return rqstp->rq_xprt->xpt_ops->xpo_result_payload(rqstp, offset,
                               length);
}
EXPORT_SYMBOL_GPL(svc_encode_result_payload);

/**
 * svc_fill_write_vector - Construct data argument for VFS write call
 * @rqstp: svc_rqst to operate on
 * @payload: xdr_buf containing only the write data payload
 *
 * Fills in rqstp::rq_vec, and returns the number of elements.
 */
unsigned int svc_fill_write_vector(struct svc_rqst *rqstp,
                   struct xdr_buf *payload)
{
    struct page **pages = payload->pages;
    struct kvec *first = payload->head;
    struct kvec *vec = rqstp->rq_vec;
    size_t total = payload->len;
    unsigned int i;

    /* Some types of transport can present the write payload
     * entirely in rq_arg.pages. In this case, @first is empty.
     */
    i = 0;
    if (first->iov_len) {
        vec[i].iov_base = first->iov_base;
        vec[i].iov_len = min_t(size_t, total, first->iov_len);
        total -= vec[i].iov_len;
        ++i;
    }

    while (total) {
        vec[i].iov_base = page_address(*pages);
        vec[i].iov_len = min_t(size_t, total, PAGE_SIZE);
        total -= vec[i].iov_len;
        ++i;
        ++pages;
    }

    WARN_ON_ONCE(i > ARRAY_SIZE(rqstp->rq_vec));
    return i;
}
EXPORT_SYMBOL_GPL(svc_fill_write_vector);
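
/*
 * Worked example (editor's note): a 10000-byte payload with 500 bytes in
 * @first and 4096-byte pages yields kvecs of 500, 4096, 4096 and 1308
 * bytes, returning i = 4; callers then pass rq_vec and that count to the
 * VFS write path.
 */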

/**
 * svc_fill_symlink_pathname - Construct pathname argument for VFS symlink call
 * @rqstp: svc_rqst to operate on
 * @first: buffer containing first section of pathname
 * @p: buffer containing remaining section of pathname
 * @total: total length of the pathname argument
 *
 * The VFS symlink API demands a NUL-terminated pathname in mapped memory.
 * Returns pointer to a NUL-terminated string, or an ERR_PTR. Caller must free
 * the returned string.
 */
char *svc_fill_symlink_pathname(struct svc_rqst *rqstp, struct kvec *first,
                void *p, size_t total)
{
    size_t len, remaining;
    char *result, *dst;

    result = kmalloc(total + 1, GFP_KERNEL);
    if (!result)
        return ERR_PTR(-ESERVERFAULT);

    dst = result;
    remaining = total;

    len = min_t(size_t, total, first->iov_len);
    if (len) {
        memcpy(dst, first->iov_base, len);
        dst += len;
        remaining -= len;
    }

    if (remaining) {
        len = min_t(size_t, remaining, PAGE_SIZE);
        memcpy(dst, p, len);
        dst += len;
    }

    *dst = '\0';

    /* Sanity check: Linux doesn't allow the pathname argument to
     * contain a NUL byte.
     */
    if (strlen(result) != total) {
        kfree(result);
        return ERR_PTR(-EINVAL);
    }
    return result;
}
EXPORT_SYMBOL_GPL(svc_fill_symlink_pathname);