Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-only
0002 /*
0003  *  linux/net/sunrpc/clnt.c
0004  *
0005  *  This file contains the high-level RPC interface.
0006  *  It is modeled as a finite state machine to support both synchronous
0007  *  and asynchronous requests.
0008  *
0009  *  -   RPC header generation and argument serialization.
0010  *  -   Credential refresh.
0011  *  -   TCP connect handling.
0012  *  -   Retry of operation when it is suspected the operation failed because
0013  *  of uid squashing on the server, or when the credentials were stale
0014  *  and need to be refreshed, or when a packet was damaged in transit.
0015  *  This may have to be moved to the VFS layer.
0016  *
0017  *  Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com>
0018  *  Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de>
0019  */
0020 
0021 
0022 #include <linux/module.h>
0023 #include <linux/types.h>
0024 #include <linux/kallsyms.h>
0025 #include <linux/mm.h>
0026 #include <linux/namei.h>
0027 #include <linux/mount.h>
0028 #include <linux/slab.h>
0029 #include <linux/rcupdate.h>
0030 #include <linux/utsname.h>
0031 #include <linux/workqueue.h>
0032 #include <linux/in.h>
0033 #include <linux/in6.h>
0034 #include <linux/un.h>
0035 
0036 #include <linux/sunrpc/clnt.h>
0037 #include <linux/sunrpc/addr.h>
0038 #include <linux/sunrpc/rpc_pipe_fs.h>
0039 #include <linux/sunrpc/metrics.h>
0040 #include <linux/sunrpc/bc_xprt.h>
0041 #include <trace/events/sunrpc.h>
0042 
0043 #include "sunrpc.h"
0044 #include "sysfs.h"
0045 #include "netns.h"
0046 
0047 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
0048 # define RPCDBG_FACILITY    RPCDBG_CALL
0049 #endif
0050 
0051 /*
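0052  * Wait queue for rpc_shutdown_client(); rpc_release_client() wakes it when a client's task list is empty. (All RPC clients are linked into the per-net sn->all_clients list.)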
0053  */
0054 
0055 static DECLARE_WAIT_QUEUE_HEAD(destroy_wait);
0056 
0057 
0058 static void call_start(struct rpc_task *task);
0059 static void call_reserve(struct rpc_task *task);
0060 static void call_reserveresult(struct rpc_task *task);
0061 static void call_allocate(struct rpc_task *task);
0062 static void call_encode(struct rpc_task *task);
0063 static void call_decode(struct rpc_task *task);
0064 static void call_bind(struct rpc_task *task);
0065 static void call_bind_status(struct rpc_task *task);
0066 static void call_transmit(struct rpc_task *task);
0067 static void call_status(struct rpc_task *task);
0068 static void call_transmit_status(struct rpc_task *task);
0069 static void call_refresh(struct rpc_task *task);
0070 static void call_refreshresult(struct rpc_task *task);
0071 static void call_connect(struct rpc_task *task);
0072 static void call_connect_status(struct rpc_task *task);
0073 
0074 static int  rpc_encode_header(struct rpc_task *task,
0075                   struct xdr_stream *xdr);
0076 static int  rpc_decode_header(struct rpc_task *task,
0077                   struct xdr_stream *xdr);
0078 static int  rpc_ping(struct rpc_clnt *clnt);
0079 static int  rpc_ping_noreply(struct rpc_clnt *clnt);
0080 static void rpc_check_timeout(struct rpc_task *task);
0081 
0082 static void rpc_register_client(struct rpc_clnt *clnt)
0083 {
0084     struct net *net = rpc_net_ns(clnt);
0085     struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
0086 
0087     spin_lock(&sn->rpc_client_lock);
0088     list_add(&clnt->cl_clients, &sn->all_clients);
0089     spin_unlock(&sn->rpc_client_lock);
0090 }
0091 
0092 static void rpc_unregister_client(struct rpc_clnt *clnt)
0093 {
0094     struct net *net = rpc_net_ns(clnt);
0095     struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
0096 
0097     spin_lock(&sn->rpc_client_lock);
0098     list_del(&clnt->cl_clients);
0099     spin_unlock(&sn->rpc_client_lock);
0100 }
0101 
/*
 * Remove the client's pipefs directory.  Callers in this file that are
 * not already inside a pipefs notifier go through
 * rpc_clnt_remove_pipedir(), which pins the pipefs superblock first.
 */
static void __rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
{
	rpc_remove_client_dir(clnt);
}
0106 
/*
 * Remove the client's pipefs directory, but only if a pipefs superblock
 * can be obtained for the client's network namespace.
 */
static void rpc_clnt_remove_pipedir(struct rpc_clnt *clnt)
{
	struct net *net = rpc_net_ns(clnt);

	if (!rpc_get_sb_net(net))
		return;

	__rpc_clnt_remove_pipedir(clnt);
	rpc_put_sb_net(net);
}
0118 
0119 static struct dentry *rpc_setup_pipedir_sb(struct super_block *sb,
0120                     struct rpc_clnt *clnt)
0121 {
0122     static uint32_t clntid;
0123     const char *dir_name = clnt->cl_program->pipe_dir_name;
0124     char name[15];
0125     struct dentry *dir, *dentry;
0126 
0127     dir = rpc_d_lookup_sb(sb, dir_name);
0128     if (dir == NULL) {
0129         pr_info("RPC: pipefs directory doesn't exist: %s\n", dir_name);
0130         return dir;
0131     }
0132     for (;;) {
0133         snprintf(name, sizeof(name), "clnt%x", (unsigned int)clntid++);
0134         name[sizeof(name) - 1] = '\0';
0135         dentry = rpc_create_client_dir(dir, name, clnt);
0136         if (!IS_ERR(dentry))
0137             break;
0138         if (dentry == ERR_PTR(-EEXIST))
0139             continue;
0140         printk(KERN_INFO "RPC: Couldn't create pipefs entry"
0141                 " %s/%s, error %ld\n",
0142                 dir_name, name, PTR_ERR(dentry));
0143         break;
0144     }
0145     dput(dir);
0146     return dentry;
0147 }
0148 
0149 static int
0150 rpc_setup_pipedir(struct super_block *pipefs_sb, struct rpc_clnt *clnt)
0151 {
0152     struct dentry *dentry;
0153 
0154     if (clnt->cl_program->pipe_dir_name != NULL) {
0155         dentry = rpc_setup_pipedir_sb(pipefs_sb, clnt);
0156         if (IS_ERR(dentry))
0157             return PTR_ERR(dentry);
0158     }
0159     return 0;
0160 }
0161 
0162 static int rpc_clnt_skip_event(struct rpc_clnt *clnt, unsigned long event)
0163 {
0164     if (clnt->cl_program->pipe_dir_name == NULL)
0165         return 1;
0166 
0167     switch (event) {
0168     case RPC_PIPEFS_MOUNT:
0169         if (clnt->cl_pipedir_objects.pdh_dentry != NULL)
0170             return 1;
0171         if (refcount_read(&clnt->cl_count) == 0)
0172             return 1;
0173         break;
0174     case RPC_PIPEFS_UMOUNT:
0175         if (clnt->cl_pipedir_objects.pdh_dentry == NULL)
0176             return 1;
0177         break;
0178     }
0179     return 0;
0180 }
0181 
0182 static int __rpc_clnt_handle_event(struct rpc_clnt *clnt, unsigned long event,
0183                    struct super_block *sb)
0184 {
0185     struct dentry *dentry;
0186 
0187     switch (event) {
0188     case RPC_PIPEFS_MOUNT:
0189         dentry = rpc_setup_pipedir_sb(sb, clnt);
0190         if (!dentry)
0191             return -ENOENT;
0192         if (IS_ERR(dentry))
0193             return PTR_ERR(dentry);
0194         break;
0195     case RPC_PIPEFS_UMOUNT:
0196         __rpc_clnt_remove_pipedir(clnt);
0197         break;
0198     default:
0199         printk(KERN_ERR "%s: unknown event: %ld\n", __func__, event);
0200         return -ENOTSUPP;
0201     }
0202     return 0;
0203 }
0204 
0205 static int __rpc_pipefs_event(struct rpc_clnt *clnt, unsigned long event,
0206                 struct super_block *sb)
0207 {
0208     int error = 0;
0209 
0210     for (;; clnt = clnt->cl_parent) {
0211         if (!rpc_clnt_skip_event(clnt, event))
0212             error = __rpc_clnt_handle_event(clnt, event, sb);
0213         if (error || clnt == clnt->cl_parent)
0214             break;
0215     }
0216     return error;
0217 }
0218 
0219 static struct rpc_clnt *rpc_get_client_for_event(struct net *net, int event)
0220 {
0221     struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
0222     struct rpc_clnt *clnt;
0223 
0224     spin_lock(&sn->rpc_client_lock);
0225     list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
0226         if (rpc_clnt_skip_event(clnt, event))
0227             continue;
0228         spin_unlock(&sn->rpc_client_lock);
0229         return clnt;
0230     }
0231     spin_unlock(&sn->rpc_client_lock);
0232     return NULL;
0233 }
0234 
0235 static int rpc_pipefs_event(struct notifier_block *nb, unsigned long event,
0236                 void *ptr)
0237 {
0238     struct super_block *sb = ptr;
0239     struct rpc_clnt *clnt;
0240     int error = 0;
0241 
0242     while ((clnt = rpc_get_client_for_event(sb->s_fs_info, event))) {
0243         error = __rpc_pipefs_event(clnt, event, sb);
0244         if (error)
0245             break;
0246     }
0247     return error;
0248 }
0249 
0250 static struct notifier_block rpc_clients_block = {
0251     .notifier_call  = rpc_pipefs_event,
0252     .priority   = SUNRPC_PIPEFS_RPC_PRIO,
0253 };
0254 
0255 int rpc_clients_notifier_register(void)
0256 {
0257     return rpc_pipefs_notifier_register(&rpc_clients_block);
0258 }
0259 
0260 void rpc_clients_notifier_unregister(void)
0261 {
0262     return rpc_pipefs_notifier_unregister(&rpc_clients_block);
0263 }
0264 
0265 static struct rpc_xprt *rpc_clnt_set_transport(struct rpc_clnt *clnt,
0266         struct rpc_xprt *xprt,
0267         const struct rpc_timeout *timeout)
0268 {
0269     struct rpc_xprt *old;
0270 
0271     spin_lock(&clnt->cl_lock);
0272     old = rcu_dereference_protected(clnt->cl_xprt,
0273             lockdep_is_held(&clnt->cl_lock));
0274 
0275     if (!xprt_bound(xprt))
0276         clnt->cl_autobind = 1;
0277 
0278     clnt->cl_timeout = timeout;
0279     rcu_assign_pointer(clnt->cl_xprt, xprt);
0280     spin_unlock(&clnt->cl_lock);
0281 
0282     return old;
0283 }
0284 
0285 static void rpc_clnt_set_nodename(struct rpc_clnt *clnt, const char *nodename)
0286 {
0287     clnt->cl_nodelen = strlcpy(clnt->cl_nodename,
0288             nodename, sizeof(clnt->cl_nodename));
0289 }
0290 
0291 static int rpc_client_register(struct rpc_clnt *clnt,
0292                    rpc_authflavor_t pseudoflavor,
0293                    const char *client_name)
0294 {
0295     struct rpc_auth_create_args auth_args = {
0296         .pseudoflavor = pseudoflavor,
0297         .target_name = client_name,
0298     };
0299     struct rpc_auth *auth;
0300     struct net *net = rpc_net_ns(clnt);
0301     struct super_block *pipefs_sb;
0302     int err;
0303 
0304     rpc_clnt_debugfs_register(clnt);
0305 
0306     pipefs_sb = rpc_get_sb_net(net);
0307     if (pipefs_sb) {
0308         err = rpc_setup_pipedir(pipefs_sb, clnt);
0309         if (err)
0310             goto out;
0311     }
0312 
0313     rpc_register_client(clnt);
0314     if (pipefs_sb)
0315         rpc_put_sb_net(net);
0316 
0317     auth = rpcauth_create(&auth_args, clnt);
0318     if (IS_ERR(auth)) {
0319         dprintk("RPC:       Couldn't create auth handle (flavor %u)\n",
0320                 pseudoflavor);
0321         err = PTR_ERR(auth);
0322         goto err_auth;
0323     }
0324     return 0;
0325 err_auth:
0326     pipefs_sb = rpc_get_sb_net(net);
0327     rpc_unregister_client(clnt);
0328     __rpc_clnt_remove_pipedir(clnt);
0329 out:
0330     if (pipefs_sb)
0331         rpc_put_sb_net(net);
0332     rpc_sysfs_client_destroy(clnt);
0333     rpc_clnt_debugfs_unregister(clnt);
0334     return err;
0335 }
0336 
0337 static DEFINE_IDA(rpc_clids);
0338 
0339 void rpc_cleanup_clids(void)
0340 {
0341     ida_destroy(&rpc_clids);
0342 }
0343 
0344 static int rpc_alloc_clid(struct rpc_clnt *clnt)
0345 {
0346     int clid;
0347 
0348     clid = ida_simple_get(&rpc_clids, 0, 0, GFP_KERNEL);
0349     if (clid < 0)
0350         return clid;
0351     clnt->cl_clid = clid;
0352     return 0;
0353 }
0354 
0355 static void rpc_free_clid(struct rpc_clnt *clnt)
0356 {
0357     ida_simple_remove(&rpc_clids, clnt->cl_clid);
0358 }
0359 
0360 static struct rpc_clnt * rpc_new_client(const struct rpc_create_args *args,
0361         struct rpc_xprt_switch *xps,
0362         struct rpc_xprt *xprt,
0363         struct rpc_clnt *parent)
0364 {
0365     const struct rpc_program *program = args->program;
0366     const struct rpc_version *version;
0367     struct rpc_clnt *clnt = NULL;
0368     const struct rpc_timeout *timeout;
0369     const char *nodename = args->nodename;
0370     int err;
0371 
0372     err = rpciod_up();
0373     if (err)
0374         goto out_no_rpciod;
0375 
0376     err = -EINVAL;
0377     if (args->version >= program->nrvers)
0378         goto out_err;
0379     version = program->version[args->version];
0380     if (version == NULL)
0381         goto out_err;
0382 
0383     err = -ENOMEM;
0384     clnt = kzalloc(sizeof(*clnt), GFP_KERNEL);
0385     if (!clnt)
0386         goto out_err;
0387     clnt->cl_parent = parent ? : clnt;
0388 
0389     err = rpc_alloc_clid(clnt);
0390     if (err)
0391         goto out_no_clid;
0392 
0393     clnt->cl_cred     = get_cred(args->cred);
0394     clnt->cl_procinfo = version->procs;
0395     clnt->cl_maxproc  = version->nrprocs;
0396     clnt->cl_prog     = args->prognumber ? : program->number;
0397     clnt->cl_vers     = version->number;
0398     clnt->cl_stats    = program->stats;
0399     clnt->cl_metrics  = rpc_alloc_iostats(clnt);
0400     rpc_init_pipe_dir_head(&clnt->cl_pipedir_objects);
0401     err = -ENOMEM;
0402     if (clnt->cl_metrics == NULL)
0403         goto out_no_stats;
0404     clnt->cl_program  = program;
0405     INIT_LIST_HEAD(&clnt->cl_tasks);
0406     spin_lock_init(&clnt->cl_lock);
0407 
0408     timeout = xprt->timeout;
0409     if (args->timeout != NULL) {
0410         memcpy(&clnt->cl_timeout_default, args->timeout,
0411                 sizeof(clnt->cl_timeout_default));
0412         timeout = &clnt->cl_timeout_default;
0413     }
0414 
0415     rpc_clnt_set_transport(clnt, xprt, timeout);
0416     xprt->main = true;
0417     xprt_iter_init(&clnt->cl_xpi, xps);
0418     xprt_switch_put(xps);
0419 
0420     clnt->cl_rtt = &clnt->cl_rtt_default;
0421     rpc_init_rtt(&clnt->cl_rtt_default, clnt->cl_timeout->to_initval);
0422 
0423     refcount_set(&clnt->cl_count, 1);
0424 
0425     if (nodename == NULL)
0426         nodename = utsname()->nodename;
0427     /* save the nodename */
0428     rpc_clnt_set_nodename(clnt, nodename);
0429 
0430     rpc_sysfs_client_setup(clnt, xps, rpc_net_ns(clnt));
0431     err = rpc_client_register(clnt, args->authflavor, args->client_name);
0432     if (err)
0433         goto out_no_path;
0434     if (parent)
0435         refcount_inc(&parent->cl_count);
0436 
0437     trace_rpc_clnt_new(clnt, xprt, program->name, args->servername);
0438     return clnt;
0439 
0440 out_no_path:
0441     rpc_free_iostats(clnt->cl_metrics);
0442 out_no_stats:
0443     put_cred(clnt->cl_cred);
0444     rpc_free_clid(clnt);
0445 out_no_clid:
0446     kfree(clnt);
0447 out_err:
0448     rpciod_down();
0449 out_no_rpciod:
0450     xprt_switch_put(xps);
0451     xprt_put(xprt);
0452     trace_rpc_clnt_new_err(program->name, args->servername, err);
0453     return ERR_PTR(err);
0454 }
0455 
0456 static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
0457                     struct rpc_xprt *xprt)
0458 {
0459     struct rpc_clnt *clnt = NULL;
0460     struct rpc_xprt_switch *xps;
0461 
0462     if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) {
0463         WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
0464         xps = args->bc_xprt->xpt_bc_xps;
0465         xprt_switch_get(xps);
0466     } else {
0467         xps = xprt_switch_alloc(xprt, GFP_KERNEL);
0468         if (xps == NULL) {
0469             xprt_put(xprt);
0470             return ERR_PTR(-ENOMEM);
0471         }
0472         if (xprt->bc_xprt) {
0473             xprt_switch_get(xps);
0474             xprt->bc_xprt->xpt_bc_xps = xps;
0475         }
0476     }
0477     clnt = rpc_new_client(args, xps, xprt, NULL);
0478     if (IS_ERR(clnt))
0479         return clnt;
0480 
0481     if (!(args->flags & RPC_CLNT_CREATE_NOPING)) {
0482         int err = rpc_ping(clnt);
0483         if (err != 0) {
0484             rpc_shutdown_client(clnt);
0485             return ERR_PTR(err);
0486         }
0487     } else if (args->flags & RPC_CLNT_CREATE_CONNECTED) {
0488         int err = rpc_ping_noreply(clnt);
0489         if (err != 0) {
0490             rpc_shutdown_client(clnt);
0491             return ERR_PTR(err);
0492         }
0493     }
0494 
0495     clnt->cl_softrtry = 1;
0496     if (args->flags & (RPC_CLNT_CREATE_HARDRTRY|RPC_CLNT_CREATE_SOFTERR)) {
0497         clnt->cl_softrtry = 0;
0498         if (args->flags & RPC_CLNT_CREATE_SOFTERR)
0499             clnt->cl_softerr = 1;
0500     }
0501 
0502     if (args->flags & RPC_CLNT_CREATE_AUTOBIND)
0503         clnt->cl_autobind = 1;
0504     if (args->flags & RPC_CLNT_CREATE_NO_RETRANS_TIMEOUT)
0505         clnt->cl_noretranstimeo = 1;
0506     if (args->flags & RPC_CLNT_CREATE_DISCRTRY)
0507         clnt->cl_discrtry = 1;
0508     if (!(args->flags & RPC_CLNT_CREATE_QUIET))
0509         clnt->cl_chatty = 1;
0510 
0511     return clnt;
0512 }
0513 
0514 /**
0515  * rpc_create - create an RPC client and transport with one call
0516  * @args: rpc_clnt create argument structure
0517  *
0518  * Creates and initializes an RPC transport and an RPC client.
0519  *
0520  * It can ping the server in order to determine if it is up, and to see if
0521  * it supports this program and version.  RPC_CLNT_CREATE_NOPING disables
0522  * this behavior so asynchronous tasks can also use rpc_create.
0523  */
0524 struct rpc_clnt *rpc_create(struct rpc_create_args *args)
0525 {
0526     struct rpc_xprt *xprt;
0527     struct xprt_create xprtargs = {
0528         .net = args->net,
0529         .ident = args->protocol,
0530         .srcaddr = args->saddress,
0531         .dstaddr = args->address,
0532         .addrlen = args->addrsize,
0533         .servername = args->servername,
0534         .bc_xprt = args->bc_xprt,
0535     };
0536     char servername[48];
0537     struct rpc_clnt *clnt;
0538     int i;
0539 
0540     if (args->bc_xprt) {
0541         WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC));
0542         xprt = args->bc_xprt->xpt_bc_xprt;
0543         if (xprt) {
0544             xprt_get(xprt);
0545             return rpc_create_xprt(args, xprt);
0546         }
0547     }
0548 
0549     if (args->flags & RPC_CLNT_CREATE_INFINITE_SLOTS)
0550         xprtargs.flags |= XPRT_CREATE_INFINITE_SLOTS;
0551     if (args->flags & RPC_CLNT_CREATE_NO_IDLE_TIMEOUT)
0552         xprtargs.flags |= XPRT_CREATE_NO_IDLE_TIMEOUT;
0553     /*
0554      * If the caller chooses not to specify a hostname, whip
0555      * up a string representation of the passed-in address.
0556      */
0557     if (xprtargs.servername == NULL) {
0558         struct sockaddr_un *sun =
0559                 (struct sockaddr_un *)args->address;
0560         struct sockaddr_in *sin =
0561                 (struct sockaddr_in *)args->address;
0562         struct sockaddr_in6 *sin6 =
0563                 (struct sockaddr_in6 *)args->address;
0564 
0565         servername[0] = '\0';
0566         switch (args->address->sa_family) {
0567         case AF_LOCAL:
0568             snprintf(servername, sizeof(servername), "%s",
0569                  sun->sun_path);
0570             break;
0571         case AF_INET:
0572             snprintf(servername, sizeof(servername), "%pI4",
0573                  &sin->sin_addr.s_addr);
0574             break;
0575         case AF_INET6:
0576             snprintf(servername, sizeof(servername), "%pI6",
0577                  &sin6->sin6_addr);
0578             break;
0579         default:
0580             /* caller wants default server name, but
0581              * address family isn't recognized. */
0582             return ERR_PTR(-EINVAL);
0583         }
0584         xprtargs.servername = servername;
0585     }
0586 
0587     xprt = xprt_create_transport(&xprtargs);
0588     if (IS_ERR(xprt))
0589         return (struct rpc_clnt *)xprt;
0590 
0591     /*
0592      * By default, kernel RPC client connects from a reserved port.
0593      * CAP_NET_BIND_SERVICE will not be set for unprivileged requesters,
0594      * but it is always enabled for rpciod, which handles the connect
0595      * operation.
0596      */
0597     xprt->resvport = 1;
0598     if (args->flags & RPC_CLNT_CREATE_NONPRIVPORT)
0599         xprt->resvport = 0;
0600     xprt->reuseport = 0;
0601     if (args->flags & RPC_CLNT_CREATE_REUSEPORT)
0602         xprt->reuseport = 1;
0603 
0604     clnt = rpc_create_xprt(args, xprt);
0605     if (IS_ERR(clnt) || args->nconnect <= 1)
0606         return clnt;
0607 
0608     for (i = 0; i < args->nconnect - 1; i++) {
0609         if (rpc_clnt_add_xprt(clnt, &xprtargs, NULL, NULL) < 0)
0610             break;
0611     }
0612     return clnt;
0613 }
0614 EXPORT_SYMBOL_GPL(rpc_create);
0615 
0616 /*
0617  * This function clones the RPC client structure. It allows us to share the
0618  * same transport while varying parameters such as the authentication
0619  * flavour.
0620  */
0621 static struct rpc_clnt *__rpc_clone_client(struct rpc_create_args *args,
0622                        struct rpc_clnt *clnt)
0623 {
0624     struct rpc_xprt_switch *xps;
0625     struct rpc_xprt *xprt;
0626     struct rpc_clnt *new;
0627     int err;
0628 
0629     err = -ENOMEM;
0630     rcu_read_lock();
0631     xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
0632     xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
0633     rcu_read_unlock();
0634     if (xprt == NULL || xps == NULL) {
0635         xprt_put(xprt);
0636         xprt_switch_put(xps);
0637         goto out_err;
0638     }
0639     args->servername = xprt->servername;
0640     args->nodename = clnt->cl_nodename;
0641 
0642     new = rpc_new_client(args, xps, xprt, clnt);
0643     if (IS_ERR(new))
0644         return new;
0645 
0646     /* Turn off autobind on clones */
0647     new->cl_autobind = 0;
0648     new->cl_softrtry = clnt->cl_softrtry;
0649     new->cl_softerr = clnt->cl_softerr;
0650     new->cl_noretranstimeo = clnt->cl_noretranstimeo;
0651     new->cl_discrtry = clnt->cl_discrtry;
0652     new->cl_chatty = clnt->cl_chatty;
0653     new->cl_principal = clnt->cl_principal;
0654     new->cl_max_connect = clnt->cl_max_connect;
0655     return new;
0656 
0657 out_err:
0658     trace_rpc_clnt_clone_err(clnt, err);
0659     return ERR_PTR(err);
0660 }
0661 
0662 /**
0663  * rpc_clone_client - Clone an RPC client structure
0664  *
0665  * @clnt: RPC client whose parameters are copied
0666  *
0667  * Returns a fresh RPC client or an ERR_PTR.
0668  */
0669 struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt)
0670 {
0671     struct rpc_create_args args = {
0672         .program    = clnt->cl_program,
0673         .prognumber = clnt->cl_prog,
0674         .version    = clnt->cl_vers,
0675         .authflavor = clnt->cl_auth->au_flavor,
0676         .cred       = clnt->cl_cred,
0677     };
0678     return __rpc_clone_client(&args, clnt);
0679 }
0680 EXPORT_SYMBOL_GPL(rpc_clone_client);
0681 
0682 /**
0683  * rpc_clone_client_set_auth - Clone an RPC client structure and set its auth
0684  *
0685  * @clnt: RPC client whose parameters are copied
0686  * @flavor: security flavor for new client
0687  *
0688  * Returns a fresh RPC client or an ERR_PTR.
0689  */
0690 struct rpc_clnt *
0691 rpc_clone_client_set_auth(struct rpc_clnt *clnt, rpc_authflavor_t flavor)
0692 {
0693     struct rpc_create_args args = {
0694         .program    = clnt->cl_program,
0695         .prognumber = clnt->cl_prog,
0696         .version    = clnt->cl_vers,
0697         .authflavor = flavor,
0698         .cred       = clnt->cl_cred,
0699     };
0700     return __rpc_clone_client(&args, clnt);
0701 }
0702 EXPORT_SYMBOL_GPL(rpc_clone_client_set_auth);
0703 
0704 /**
0705  * rpc_switch_client_transport: switch the RPC transport on the fly
0706  * @clnt: pointer to a struct rpc_clnt
0707  * @args: pointer to the new transport arguments
0708  * @timeout: pointer to the new timeout parameters
0709  *
0710  * This function allows the caller to switch the RPC transport for the
0711  * rpc_clnt structure 'clnt' to allow it to connect to a mirrored NFS
0712  * server, for instance.  It assumes that the caller has ensured that
0713  * there are no active RPC tasks by using some form of locking.
0714  *
0715  * Returns zero if "clnt" is now using the new xprt.  Otherwise a
0716  * negative errno is returned, and "clnt" continues to use the old
0717  * xprt.
0718  */
0719 int rpc_switch_client_transport(struct rpc_clnt *clnt,
0720         struct xprt_create *args,
0721         const struct rpc_timeout *timeout)
0722 {
0723     const struct rpc_timeout *old_timeo;
0724     rpc_authflavor_t pseudoflavor;
0725     struct rpc_xprt_switch *xps, *oldxps;
0726     struct rpc_xprt *xprt, *old;
0727     struct rpc_clnt *parent;
0728     int err;
0729 
0730     xprt = xprt_create_transport(args);
0731     if (IS_ERR(xprt))
0732         return PTR_ERR(xprt);
0733 
0734     xps = xprt_switch_alloc(xprt, GFP_KERNEL);
0735     if (xps == NULL) {
0736         xprt_put(xprt);
0737         return -ENOMEM;
0738     }
0739 
0740     pseudoflavor = clnt->cl_auth->au_flavor;
0741 
0742     old_timeo = clnt->cl_timeout;
0743     old = rpc_clnt_set_transport(clnt, xprt, timeout);
0744     oldxps = xprt_iter_xchg_switch(&clnt->cl_xpi, xps);
0745 
0746     rpc_unregister_client(clnt);
0747     __rpc_clnt_remove_pipedir(clnt);
0748     rpc_sysfs_client_destroy(clnt);
0749     rpc_clnt_debugfs_unregister(clnt);
0750 
0751     /*
0752      * A new transport was created.  "clnt" therefore
0753      * becomes the root of a new cl_parent tree.  clnt's
0754      * children, if it has any, still point to the old xprt.
0755      */
0756     parent = clnt->cl_parent;
0757     clnt->cl_parent = clnt;
0758 
0759     /*
0760      * The old rpc_auth cache cannot be re-used.  GSS
0761      * contexts in particular are between a single
0762      * client and server.
0763      */
0764     err = rpc_client_register(clnt, pseudoflavor, NULL);
0765     if (err)
0766         goto out_revert;
0767 
0768     synchronize_rcu();
0769     if (parent != clnt)
0770         rpc_release_client(parent);
0771     xprt_switch_put(oldxps);
0772     xprt_put(old);
0773     trace_rpc_clnt_replace_xprt(clnt);
0774     return 0;
0775 
0776 out_revert:
0777     xps = xprt_iter_xchg_switch(&clnt->cl_xpi, oldxps);
0778     rpc_clnt_set_transport(clnt, old, old_timeo);
0779     clnt->cl_parent = parent;
0780     rpc_client_register(clnt, pseudoflavor, NULL);
0781     xprt_switch_put(xps);
0782     xprt_put(xprt);
0783     trace_rpc_clnt_replace_xprt_err(clnt);
0784     return err;
0785 }
0786 EXPORT_SYMBOL_GPL(rpc_switch_client_transport);
0787 
0788 static
0789 int _rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi,
0790                  void func(struct rpc_xprt_iter *xpi, struct rpc_xprt_switch *xps))
0791 {
0792     struct rpc_xprt_switch *xps;
0793 
0794     rcu_read_lock();
0795     xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
0796     rcu_read_unlock();
0797     if (xps == NULL)
0798         return -EAGAIN;
0799     func(xpi, xps);
0800     xprt_switch_put(xps);
0801     return 0;
0802 }
0803 
0804 static
0805 int rpc_clnt_xprt_iter_init(struct rpc_clnt *clnt, struct rpc_xprt_iter *xpi)
0806 {
0807     return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listall);
0808 }
0809 
0810 static
0811 int rpc_clnt_xprt_iter_offline_init(struct rpc_clnt *clnt,
0812                     struct rpc_xprt_iter *xpi)
0813 {
0814     return _rpc_clnt_xprt_iter_init(clnt, xpi, xprt_iter_init_listoffline);
0815 }
0816 
0817 /**
0818  * rpc_clnt_iterate_for_each_xprt - Apply a function to all transports
0819  * @clnt: pointer to client
0820  * @fn: function to apply
0821  * @data: void pointer to function data
0822  *
0823  * Iterates through the list of RPC transports currently attached to the
0824  * client and applies the function fn(clnt, xprt, data).
0825  *
0826  * On error, the iteration stops, and the function returns the error value.
0827  */
0828 int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
0829         int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
0830         void *data)
0831 {
0832     struct rpc_xprt_iter xpi;
0833     int ret;
0834 
0835     ret = rpc_clnt_xprt_iter_init(clnt, &xpi);
0836     if (ret)
0837         return ret;
0838     for (;;) {
0839         struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
0840 
0841         if (!xprt)
0842             break;
0843         ret = fn(clnt, xprt, data);
0844         xprt_put(xprt);
0845         if (ret < 0)
0846             break;
0847     }
0848     xprt_iter_destroy(&xpi);
0849     return ret;
0850 }
0851 EXPORT_SYMBOL_GPL(rpc_clnt_iterate_for_each_xprt);
0852 
0853 /*
0854  * Kill all tasks for the given client.
0855  * XXX: kill their descendants as well?
0856  */
0857 void rpc_killall_tasks(struct rpc_clnt *clnt)
0858 {
0859     struct rpc_task *rovr;
0860 
0861 
0862     if (list_empty(&clnt->cl_tasks))
0863         return;
0864 
0865     /*
0866      * Spin lock all_tasks to prevent changes...
0867      */
0868     trace_rpc_clnt_killall(clnt);
0869     spin_lock(&clnt->cl_lock);
0870     list_for_each_entry(rovr, &clnt->cl_tasks, tk_task)
0871         rpc_signal_task(rovr);
0872     spin_unlock(&clnt->cl_lock);
0873 }
0874 EXPORT_SYMBOL_GPL(rpc_killall_tasks);
0875 
0876 /*
0877  * Properly shut down an RPC client, terminating all outstanding
0878  * requests.
0879  */
0880 void rpc_shutdown_client(struct rpc_clnt *clnt)
0881 {
0882     might_sleep();
0883 
0884     trace_rpc_clnt_shutdown(clnt);
0885 
0886     while (!list_empty(&clnt->cl_tasks)) {
0887         rpc_killall_tasks(clnt);
0888         wait_event_timeout(destroy_wait,
0889             list_empty(&clnt->cl_tasks), 1*HZ);
0890     }
0891 
0892     rpc_release_client(clnt);
0893 }
0894 EXPORT_SYMBOL_GPL(rpc_shutdown_client);
0895 
0896 /*
0897  * Free an RPC client
0898  */
0899 static void rpc_free_client_work(struct work_struct *work)
0900 {
0901     struct rpc_clnt *clnt = container_of(work, struct rpc_clnt, cl_work);
0902 
0903     trace_rpc_clnt_free(clnt);
0904 
0905     /* These might block on processes that might allocate memory,
0906      * so they cannot be called in rpciod, so they are handled separately
0907      * here.
0908      */
0909     rpc_sysfs_client_destroy(clnt);
0910     rpc_clnt_debugfs_unregister(clnt);
0911     rpc_free_clid(clnt);
0912     rpc_clnt_remove_pipedir(clnt);
0913     xprt_put(rcu_dereference_raw(clnt->cl_xprt));
0914 
0915     kfree(clnt);
0916     rpciod_down();
0917 }
0918 static struct rpc_clnt *
0919 rpc_free_client(struct rpc_clnt *clnt)
0920 {
0921     struct rpc_clnt *parent = NULL;
0922 
0923     trace_rpc_clnt_release(clnt);
0924     if (clnt->cl_parent != clnt)
0925         parent = clnt->cl_parent;
0926     rpc_unregister_client(clnt);
0927     rpc_free_iostats(clnt->cl_metrics);
0928     clnt->cl_metrics = NULL;
0929     xprt_iter_destroy(&clnt->cl_xpi);
0930     put_cred(clnt->cl_cred);
0931 
0932     INIT_WORK(&clnt->cl_work, rpc_free_client_work);
0933     schedule_work(&clnt->cl_work);
0934     return parent;
0935 }
0936 
0937 /*
0938  * Free an RPC client
0939  */
0940 static struct rpc_clnt *
0941 rpc_free_auth(struct rpc_clnt *clnt)
0942 {
0943     /*
0944      * Note: RPCSEC_GSS may need to send NULL RPC calls in order to
0945      *       release remaining GSS contexts. This mechanism ensures
0946      *       that it can do so safely.
0947      */
0948     if (clnt->cl_auth != NULL) {
0949         rpcauth_release(clnt->cl_auth);
0950         clnt->cl_auth = NULL;
0951     }
0952     if (refcount_dec_and_test(&clnt->cl_count))
0953         return rpc_free_client(clnt);
0954     return NULL;
0955 }
0956 
0957 /*
0958  * Release reference to the RPC client
0959  */
0960 void
0961 rpc_release_client(struct rpc_clnt *clnt)
0962 {
0963     do {
0964         if (list_empty(&clnt->cl_tasks))
0965             wake_up(&destroy_wait);
0966         if (refcount_dec_not_one(&clnt->cl_count))
0967             break;
0968         clnt = rpc_free_auth(clnt);
0969     } while (clnt != NULL);
0970 }
0971 EXPORT_SYMBOL_GPL(rpc_release_client);
0972 
0973 /**
0974  * rpc_bind_new_program - bind a new RPC program to an existing client
0975  * @old: old rpc_client
0976  * @program: rpc program to set
0977  * @vers: rpc program version
0978  *
0979  * Clones the rpc client and sets up a new RPC program. This is mainly
0980  * of use for enabling different RPC programs to share the same transport.
0981  * The Sun NFSv2/v3 ACL protocol can do this.
0982  */
0983 struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *old,
0984                       const struct rpc_program *program,
0985                       u32 vers)
0986 {
0987     struct rpc_create_args args = {
0988         .program    = program,
0989         .prognumber = program->number,
0990         .version    = vers,
0991         .authflavor = old->cl_auth->au_flavor,
0992         .cred       = old->cl_cred,
0993     };
0994     struct rpc_clnt *clnt;
0995     int err;
0996 
0997     clnt = __rpc_clone_client(&args, old);
0998     if (IS_ERR(clnt))
0999         goto out;
1000     err = rpc_ping(clnt);
1001     if (err != 0) {
1002         rpc_shutdown_client(clnt);
1003         clnt = ERR_PTR(err);
1004     }
1005 out:
1006     return clnt;
1007 }
1008 EXPORT_SYMBOL_GPL(rpc_bind_new_program);
1009 
1010 struct rpc_xprt *
1011 rpc_task_get_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1012 {
1013     struct rpc_xprt_switch *xps;
1014 
1015     if (!xprt)
1016         return NULL;
1017     rcu_read_lock();
1018     xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1019     atomic_long_inc(&xps->xps_queuelen);
1020     rcu_read_unlock();
1021     atomic_long_inc(&xprt->queuelen);
1022 
1023     return xprt;
1024 }
1025 
1026 static void
1027 rpc_task_release_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
1028 {
1029     struct rpc_xprt_switch *xps;
1030 
1031     atomic_long_dec(&xprt->queuelen);
1032     rcu_read_lock();
1033     xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
1034     atomic_long_dec(&xps->xps_queuelen);
1035     rcu_read_unlock();
1036 
1037     xprt_put(xprt);
1038 }
1039 
1040 void rpc_task_release_transport(struct rpc_task *task)
1041 {
1042     struct rpc_xprt *xprt = task->tk_xprt;
1043 
1044     if (xprt) {
1045         task->tk_xprt = NULL;
1046         if (task->tk_client)
1047             rpc_task_release_xprt(task->tk_client, xprt);
1048         else
1049             xprt_put(xprt);
1050     }
1051 }
1052 EXPORT_SYMBOL_GPL(rpc_task_release_transport);
1053 
1054 void rpc_task_release_client(struct rpc_task *task)
1055 {
1056     struct rpc_clnt *clnt = task->tk_client;
1057 
1058     rpc_task_release_transport(task);
1059     if (clnt != NULL) {
1060         /* Remove from client task list */
1061         spin_lock(&clnt->cl_lock);
1062         list_del(&task->tk_task);
1063         spin_unlock(&clnt->cl_lock);
1064         task->tk_client = NULL;
1065 
1066         rpc_release_client(clnt);
1067     }
1068 }
1069 
1070 static struct rpc_xprt *
1071 rpc_task_get_first_xprt(struct rpc_clnt *clnt)
1072 {
1073     struct rpc_xprt *xprt;
1074 
1075     rcu_read_lock();
1076     xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
1077     rcu_read_unlock();
1078     return rpc_task_get_xprt(clnt, xprt);
1079 }
1080 
1081 static struct rpc_xprt *
1082 rpc_task_get_next_xprt(struct rpc_clnt *clnt)
1083 {
1084     return rpc_task_get_xprt(clnt, xprt_iter_get_next(&clnt->cl_xpi));
1085 }
1086 
1087 static
1088 void rpc_task_set_transport(struct rpc_task *task, struct rpc_clnt *clnt)
1089 {
1090     if (task->tk_xprt) {
1091         if (!(test_bit(XPRT_OFFLINE, &task->tk_xprt->state) &&
1092               (task->tk_flags & RPC_TASK_MOVEABLE)))
1093             return;
1094         xprt_release(task);
1095         xprt_put(task->tk_xprt);
1096     }
1097     if (task->tk_flags & RPC_TASK_NO_ROUND_ROBIN)
1098         task->tk_xprt = rpc_task_get_first_xprt(clnt);
1099     else
1100         task->tk_xprt = rpc_task_get_next_xprt(clnt);
1101 }
1102 
1103 static
1104 void rpc_task_set_client(struct rpc_task *task, struct rpc_clnt *clnt)
1105 {
1106     rpc_task_set_transport(task, clnt);
1107     task->tk_client = clnt;
1108     refcount_inc(&clnt->cl_count);
1109     if (clnt->cl_softrtry)
1110         task->tk_flags |= RPC_TASK_SOFT;
1111     if (clnt->cl_softerr)
1112         task->tk_flags |= RPC_TASK_TIMEOUT;
1113     if (clnt->cl_noretranstimeo)
1114         task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
1115     /* Add to the client's list of all tasks */
1116     spin_lock(&clnt->cl_lock);
1117     list_add_tail(&task->tk_task, &clnt->cl_tasks);
1118     spin_unlock(&clnt->cl_lock);
1119 }
1120 
1121 static void
1122 rpc_task_set_rpc_message(struct rpc_task *task, const struct rpc_message *msg)
1123 {
1124     if (msg != NULL) {
1125         task->tk_msg.rpc_proc = msg->rpc_proc;
1126         task->tk_msg.rpc_argp = msg->rpc_argp;
1127         task->tk_msg.rpc_resp = msg->rpc_resp;
1128         task->tk_msg.rpc_cred = msg->rpc_cred;
1129         if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1130             get_cred(task->tk_msg.rpc_cred);
1131     }
1132 }
1133 
1134 /*
1135  * Default callback for async RPC calls
1136  */
1137 static void
1138 rpc_default_callback(struct rpc_task *task, void *data)
1139 {
1140 }
1141 
1142 static const struct rpc_call_ops rpc_default_ops = {
1143     .rpc_call_done = rpc_default_callback,
1144 };
1145 
1146 /**
1147  * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it
1148  * @task_setup_data: pointer to task initialisation data
1149  */
1150 struct rpc_task *rpc_run_task(const struct rpc_task_setup *task_setup_data)
1151 {
1152     struct rpc_task *task;
1153 
1154     task = rpc_new_task(task_setup_data);
1155     if (IS_ERR(task))
1156         return task;
1157 
1158     if (!RPC_IS_ASYNC(task))
1159         task->tk_flags |= RPC_TASK_CRED_NOREF;
1160 
1161     rpc_task_set_client(task, task_setup_data->rpc_client);
1162     rpc_task_set_rpc_message(task, task_setup_data->rpc_message);
1163 
1164     if (task->tk_action == NULL)
1165         rpc_call_start(task);
1166 
1167     atomic_inc(&task->tk_count);
1168     rpc_execute(task);
1169     return task;
1170 }
1171 EXPORT_SYMBOL_GPL(rpc_run_task);
1172 
1173 /**
1174  * rpc_call_sync - Perform a synchronous RPC call
1175  * @clnt: pointer to RPC client
1176  * @msg: RPC call parameters
1177  * @flags: RPC call flags
1178  */
1179 int rpc_call_sync(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags)
1180 {
1181     struct rpc_task *task;
1182     struct rpc_task_setup task_setup_data = {
1183         .rpc_client = clnt,
1184         .rpc_message = msg,
1185         .callback_ops = &rpc_default_ops,
1186         .flags = flags,
1187     };
1188     int status;
1189 
1190     WARN_ON_ONCE(flags & RPC_TASK_ASYNC);
1191     if (flags & RPC_TASK_ASYNC) {
1192         rpc_release_calldata(task_setup_data.callback_ops,
1193             task_setup_data.callback_data);
1194         return -EINVAL;
1195     }
1196 
1197     task = rpc_run_task(&task_setup_data);
1198     if (IS_ERR(task))
1199         return PTR_ERR(task);
1200     status = task->tk_status;
1201     rpc_put_task(task);
1202     return status;
1203 }
1204 EXPORT_SYMBOL_GPL(rpc_call_sync);
1205 
1206 /**
1207  * rpc_call_async - Perform an asynchronous RPC call
1208  * @clnt: pointer to RPC client
1209  * @msg: RPC call parameters
1210  * @flags: RPC call flags
1211  * @tk_ops: RPC call ops
1212  * @data: user call data
1213  */
1214 int
1215 rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
1216            const struct rpc_call_ops *tk_ops, void *data)
1217 {
1218     struct rpc_task *task;
1219     struct rpc_task_setup task_setup_data = {
1220         .rpc_client = clnt,
1221         .rpc_message = msg,
1222         .callback_ops = tk_ops,
1223         .callback_data = data,
1224         .flags = flags|RPC_TASK_ASYNC,
1225     };
1226 
1227     task = rpc_run_task(&task_setup_data);
1228     if (IS_ERR(task))
1229         return PTR_ERR(task);
1230     rpc_put_task(task);
1231     return 0;
1232 }
1233 EXPORT_SYMBOL_GPL(rpc_call_async);
1234 
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static void call_bc_encode(struct rpc_task *task);

/**
 * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
 * rpc_execute against it
 * @req: RPC request
 *
 * If the task cannot be allocated, @req is freed and the error pointer
 * from rpc_new_task() is returned.
 */
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
{
	struct rpc_task_setup bc_setup = {
		.callback_ops = &rpc_default_ops,
		.flags = RPC_TASK_SOFTCONN |
			RPC_TASK_NO_RETRANS_TIMEOUT,
	};
	struct rpc_task *task;

	dprintk("RPC: rpc_run_bc_task req= %p\n", req);

	/* Allocate the rpc_task that will send the data */
	task = rpc_new_task(&bc_setup);
	if (IS_ERR(task)) {
		xprt_free_bc_request(req);
		return task;
	}

	xprt_init_bc_request(req, task);

	/* Backchannel tasks start in the call_bc_encode state */
	task->tk_action = call_bc_encode;
	/* Take an additional reference before running the task */
	atomic_inc(&task->tk_count);
	WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
	rpc_execute(task);

	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
	return task;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
1273 
1274 /**
1275  * rpc_prepare_reply_pages - Prepare to receive a reply data payload into pages
1276  * @req: RPC request to prepare
1277  * @pages: vector of struct page pointers
1278  * @base: offset in first page where receive should start, in bytes
1279  * @len: expected size of the upper layer data payload, in bytes
1280  * @hdrsize: expected size of upper layer reply header, in XDR words
1281  *
1282  */
1283 void rpc_prepare_reply_pages(struct rpc_rqst *req, struct page **pages,
1284                  unsigned int base, unsigned int len,
1285                  unsigned int hdrsize)
1286 {
1287     hdrsize += RPC_REPHDRSIZE + req->rq_cred->cr_auth->au_ralign;
1288 
1289     xdr_inline_pages(&req->rq_rcv_buf, hdrsize << 2, pages, base, len);
1290     trace_rpc_xdr_reply_pages(req->rq_task, &req->rq_rcv_buf);
1291 }
1292 EXPORT_SYMBOL_GPL(rpc_prepare_reply_pages);
1293 
1294 void
1295 rpc_call_start(struct rpc_task *task)
1296 {
1297     task->tk_action = call_start;
1298 }
1299 EXPORT_SYMBOL_GPL(rpc_call_start);
1300 
1301 /**
1302  * rpc_peeraddr - extract remote peer address from clnt's xprt
1303  * @clnt: RPC client structure
1304  * @buf: target buffer
1305  * @bufsize: length of target buffer
1306  *
1307  * Returns the number of bytes that are actually in the stored address.
1308  */
1309 size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize)
1310 {
1311     size_t bytes;
1312     struct rpc_xprt *xprt;
1313 
1314     rcu_read_lock();
1315     xprt = rcu_dereference(clnt->cl_xprt);
1316 
1317     bytes = xprt->addrlen;
1318     if (bytes > bufsize)
1319         bytes = bufsize;
1320     memcpy(buf, &xprt->addr, bytes);
1321     rcu_read_unlock();
1322 
1323     return bytes;
1324 }
1325 EXPORT_SYMBOL_GPL(rpc_peeraddr);
1326 
1327 /**
1328  * rpc_peeraddr2str - return remote peer address in printable format
1329  * @clnt: RPC client structure
1330  * @format: address format
1331  *
1332  * NB: the lifetime of the memory referenced by the returned pointer is
1333  * the same as the rpc_xprt itself.  As long as the caller uses this
1334  * pointer, it must hold the RCU read lock.
1335  */
1336 const char *rpc_peeraddr2str(struct rpc_clnt *clnt,
1337                  enum rpc_display_format_t format)
1338 {
1339     struct rpc_xprt *xprt;
1340 
1341     xprt = rcu_dereference(clnt->cl_xprt);
1342 
1343     if (xprt->address_strings[format] != NULL)
1344         return xprt->address_strings[format];
1345     else
1346         return "unprintable";
1347 }
1348 EXPORT_SYMBOL_GPL(rpc_peeraddr2str);
1349 
1350 static const struct sockaddr_in rpc_inaddr_loopback = {
1351     .sin_family     = AF_INET,
1352     .sin_addr.s_addr    = htonl(INADDR_ANY),
1353 };
1354 
1355 static const struct sockaddr_in6 rpc_in6addr_loopback = {
1356     .sin6_family        = AF_INET6,
1357     .sin6_addr      = IN6ADDR_ANY_INIT,
1358 };
1359 
1360 /*
1361  * Try a getsockname() on a connected datagram socket.  Using a
1362  * connected datagram socket prevents leaving a socket in TIME_WAIT.
1363  * This conserves the ephemeral port number space.
1364  *
1365  * Returns zero and fills in "buf" if successful; otherwise, a
1366  * negative errno is returned.
1367  */
1368 static int rpc_sockname(struct net *net, struct sockaddr *sap, size_t salen,
1369             struct sockaddr *buf)
1370 {
1371     struct socket *sock;
1372     int err;
1373 
1374     err = __sock_create(net, sap->sa_family,
1375                 SOCK_DGRAM, IPPROTO_UDP, &sock, 1);
1376     if (err < 0) {
1377         dprintk("RPC:       can't create UDP socket (%d)\n", err);
1378         goto out;
1379     }
1380 
1381     switch (sap->sa_family) {
1382     case AF_INET:
1383         err = kernel_bind(sock,
1384                 (struct sockaddr *)&rpc_inaddr_loopback,
1385                 sizeof(rpc_inaddr_loopback));
1386         break;
1387     case AF_INET6:
1388         err = kernel_bind(sock,
1389                 (struct sockaddr *)&rpc_in6addr_loopback,
1390                 sizeof(rpc_in6addr_loopback));
1391         break;
1392     default:
1393         err = -EAFNOSUPPORT;
1394         goto out;
1395     }
1396     if (err < 0) {
1397         dprintk("RPC:       can't bind UDP socket (%d)\n", err);
1398         goto out_release;
1399     }
1400 
1401     err = kernel_connect(sock, sap, salen, 0);
1402     if (err < 0) {
1403         dprintk("RPC:       can't connect UDP socket (%d)\n", err);
1404         goto out_release;
1405     }
1406 
1407     err = kernel_getsockname(sock, buf);
1408     if (err < 0) {
1409         dprintk("RPC:       getsockname failed (%d)\n", err);
1410         goto out_release;
1411     }
1412 
1413     err = 0;
1414     if (buf->sa_family == AF_INET6) {
1415         struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)buf;
1416         sin6->sin6_scope_id = 0;
1417     }
1418     dprintk("RPC:       %s succeeded\n", __func__);
1419 
1420 out_release:
1421     sock_release(sock);
1422 out:
1423     return err;
1424 }
1425 
1426 /*
1427  * Scraping a connected socket failed, so we don't have a useable
1428  * local address.  Fallback: generate an address that will prevent
1429  * the server from calling us back.
1430  *
1431  * Returns zero and fills in "buf" if successful; otherwise, a
1432  * negative errno is returned.
1433  */
1434 static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen)
1435 {
1436     switch (family) {
1437     case AF_INET:
1438         if (buflen < sizeof(rpc_inaddr_loopback))
1439             return -EINVAL;
1440         memcpy(buf, &rpc_inaddr_loopback,
1441                 sizeof(rpc_inaddr_loopback));
1442         break;
1443     case AF_INET6:
1444         if (buflen < sizeof(rpc_in6addr_loopback))
1445             return -EINVAL;
1446         memcpy(buf, &rpc_in6addr_loopback,
1447                 sizeof(rpc_in6addr_loopback));
1448         break;
1449     default:
1450         dprintk("RPC:       %s: address family not supported\n",
1451             __func__);
1452         return -EAFNOSUPPORT;
1453     }
1454     dprintk("RPC:       %s: succeeded\n", __func__);
1455     return 0;
1456 }
1457 
1458 /**
1459  * rpc_localaddr - discover local endpoint address for an RPC client
1460  * @clnt: RPC client structure
1461  * @buf: target buffer
1462  * @buflen: size of target buffer, in bytes
1463  *
1464  * Returns zero and fills in "buf" and "buflen" if successful;
1465  * otherwise, a negative errno is returned.
1466  *
1467  * This works even if the underlying transport is not currently connected,
1468  * or if the upper layer never previously provided a source address.
1469  *
1470  * The result of this function call is transient: multiple calls in
1471  * succession may give different results, depending on how local
1472  * networking configuration changes over time.
1473  */
1474 int rpc_localaddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t buflen)
1475 {
1476     struct sockaddr_storage address;
1477     struct sockaddr *sap = (struct sockaddr *)&address;
1478     struct rpc_xprt *xprt;
1479     struct net *net;
1480     size_t salen;
1481     int err;
1482 
1483     rcu_read_lock();
1484     xprt = rcu_dereference(clnt->cl_xprt);
1485     salen = xprt->addrlen;
1486     memcpy(sap, &xprt->addr, salen);
1487     net = get_net(xprt->xprt_net);
1488     rcu_read_unlock();
1489 
1490     rpc_set_port(sap, 0);
1491     err = rpc_sockname(net, sap, salen, buf);
1492     put_net(net);
1493     if (err != 0)
1494         /* Couldn't discover local address, return ANYADDR */
1495         return rpc_anyaddr(sap->sa_family, buf, buflen);
1496     return 0;
1497 }
1498 EXPORT_SYMBOL_GPL(rpc_localaddr);
1499 
1500 void
1501 rpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize)
1502 {
1503     struct rpc_xprt *xprt;
1504 
1505     rcu_read_lock();
1506     xprt = rcu_dereference(clnt->cl_xprt);
1507     if (xprt->ops->set_buffer_size)
1508         xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);
1509     rcu_read_unlock();
1510 }
1511 EXPORT_SYMBOL_GPL(rpc_setbufsize);
1512 
1513 /**
1514  * rpc_net_ns - Get the network namespace for this RPC client
1515  * @clnt: RPC client to query
1516  *
1517  */
1518 struct net *rpc_net_ns(struct rpc_clnt *clnt)
1519 {
1520     struct net *ret;
1521 
1522     rcu_read_lock();
1523     ret = rcu_dereference(clnt->cl_xprt)->xprt_net;
1524     rcu_read_unlock();
1525     return ret;
1526 }
1527 EXPORT_SYMBOL_GPL(rpc_net_ns);
1528 
1529 /**
1530  * rpc_max_payload - Get maximum payload size for a transport, in bytes
1531  * @clnt: RPC client to query
1532  *
1533  * For stream transports, this is one RPC record fragment (see RFC
1534  * 1831), as we don't support multi-record requests yet.  For datagram
1535  * transports, this is the size of an IP packet minus the IP, UDP, and
1536  * RPC header sizes.
1537  */
1538 size_t rpc_max_payload(struct rpc_clnt *clnt)
1539 {
1540     size_t ret;
1541 
1542     rcu_read_lock();
1543     ret = rcu_dereference(clnt->cl_xprt)->max_payload;
1544     rcu_read_unlock();
1545     return ret;
1546 }
1547 EXPORT_SYMBOL_GPL(rpc_max_payload);
1548 
1549 /**
1550  * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes
1551  * @clnt: RPC client to query
1552  */
1553 size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
1554 {
1555     struct rpc_xprt *xprt;
1556     size_t ret;
1557 
1558     rcu_read_lock();
1559     xprt = rcu_dereference(clnt->cl_xprt);
1560     ret = xprt->ops->bc_maxpayload(xprt);
1561     rcu_read_unlock();
1562     return ret;
1563 }
1564 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
1565 
1566 unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
1567 {
1568     struct rpc_xprt *xprt;
1569     unsigned int ret;
1570 
1571     rcu_read_lock();
1572     xprt = rcu_dereference(clnt->cl_xprt);
1573     ret = xprt->ops->bc_num_slots(xprt);
1574     rcu_read_unlock();
1575     return ret;
1576 }
1577 EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
1578 
1579 /**
1580  * rpc_force_rebind - force transport to check that remote port is unchanged
1581  * @clnt: client to rebind
1582  *
1583  */
1584 void rpc_force_rebind(struct rpc_clnt *clnt)
1585 {
1586     if (clnt->cl_autobind) {
1587         rcu_read_lock();
1588         xprt_clear_bound(rcu_dereference(clnt->cl_xprt));
1589         rcu_read_unlock();
1590     }
1591 }
1592 EXPORT_SYMBOL_GPL(rpc_force_rebind);
1593 
1594 static int
1595 __rpc_restart_call(struct rpc_task *task, void (*action)(struct rpc_task *))
1596 {
1597     task->tk_status = 0;
1598     task->tk_rpc_status = 0;
1599     task->tk_action = action;
1600     return 1;
1601 }
1602 
1603 /*
1604  * Restart an (async) RPC call. Usually called from within the
1605  * exit handler.
1606  */
1607 int
1608 rpc_restart_call(struct rpc_task *task)
1609 {
1610     return __rpc_restart_call(task, call_start);
1611 }
1612 EXPORT_SYMBOL_GPL(rpc_restart_call);
1613 
1614 /*
1615  * Restart an (async) RPC call from the call_prepare state.
1616  * Usually called from within the exit handler.
1617  */
1618 int
1619 rpc_restart_call_prepare(struct rpc_task *task)
1620 {
1621     if (task->tk_ops->rpc_call_prepare != NULL)
1622         return __rpc_restart_call(task, rpc_prepare_task);
1623     return rpc_restart_call(task);
1624 }
1625 EXPORT_SYMBOL_GPL(rpc_restart_call_prepare);
1626 
1627 const char
1628 *rpc_proc_name(const struct rpc_task *task)
1629 {
1630     const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1631 
1632     if (proc) {
1633         if (proc->p_name)
1634             return proc->p_name;
1635         else
1636             return "NULL";
1637     } else
1638         return "no proc";
1639 }
1640 
1641 static void
1642 __rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
1643 {
1644     trace_rpc_call_rpcerror(task, tk_status, rpc_status);
1645     task->tk_rpc_status = rpc_status;
1646     rpc_exit(task, tk_status);
1647 }
1648 
/*
 * Fail @task, using @status as both the task status and the RPC status.
 */
static void
rpc_call_rpcerror(struct rpc_task *task, int status)
{
	__rpc_call_rpcerror(task, status, status);
}
1654 
1655 /*
1656  * 0.  Initial state
1657  *
1658  *     Other FSM states can be visited zero or more times, but
1659  *     this state is visited exactly once for each RPC.
1660  */
1661 static void
1662 call_start(struct rpc_task *task)
1663 {
1664     struct rpc_clnt *clnt = task->tk_client;
1665     int idx = task->tk_msg.rpc_proc->p_statidx;
1666 
1667     trace_rpc_request(task);
1668 
1669     /* Increment call count (version might not be valid for ping) */
1670     if (clnt->cl_program->version[clnt->cl_vers])
1671         clnt->cl_program->version[clnt->cl_vers]->counts[idx]++;
1672     clnt->cl_stats->rpccnt++;
1673     task->tk_action = call_reserve;
1674     rpc_task_set_transport(task, clnt);
1675 }
1676 
1677 /*
1678  * 1.   Reserve an RPC call slot
1679  */
1680 static void
1681 call_reserve(struct rpc_task *task)
1682 {
1683     task->tk_status  = 0;
1684     task->tk_action  = call_reserveresult;
1685     xprt_reserve(task);
1686 }
1687 
1688 static void call_retry_reserve(struct rpc_task *task);
1689 
1690 /*
1691  * 1b.  Grok the result of xprt_reserve()
1692  */
1693 static void
1694 call_reserveresult(struct rpc_task *task)
1695 {
1696     int status = task->tk_status;
1697 
1698     /*
1699      * After a call to xprt_reserve(), we must have either
1700      * a request slot or else an error status.
1701      */
1702     task->tk_status = 0;
1703     if (status >= 0) {
1704         if (task->tk_rqstp) {
1705             task->tk_action = call_refresh;
1706             return;
1707         }
1708 
1709         rpc_call_rpcerror(task, -EIO);
1710         return;
1711     }
1712 
1713     switch (status) {
1714     case -ENOMEM:
1715         rpc_delay(task, HZ >> 2);
1716         fallthrough;
1717     case -EAGAIN:   /* woken up; retry */
1718         task->tk_action = call_retry_reserve;
1719         return;
1720     default:
1721         rpc_call_rpcerror(task, status);
1722     }
1723 }
1724 
1725 /*
1726  * 1c.  Retry reserving an RPC call slot
1727  */
1728 static void
1729 call_retry_reserve(struct rpc_task *task)
1730 {
1731     task->tk_status  = 0;
1732     task->tk_action  = call_reserveresult;
1733     xprt_retry_reserve(task);
1734 }
1735 
1736 /*
1737  * 2.   Bind and/or refresh the credentials
1738  */
1739 static void
1740 call_refresh(struct rpc_task *task)
1741 {
1742     task->tk_action = call_refreshresult;
1743     task->tk_status = 0;
1744     task->tk_client->cl_stats->rpcauthrefresh++;
1745     rpcauth_refreshcred(task);
1746 }
1747 
1748 /*
1749  * 2a.  Process the results of a credential refresh
1750  */
1751 static void
1752 call_refreshresult(struct rpc_task *task)
1753 {
1754     int status = task->tk_status;
1755 
1756     task->tk_status = 0;
1757     task->tk_action = call_refresh;
1758     switch (status) {
1759     case 0:
1760         if (rpcauth_uptodatecred(task)) {
1761             task->tk_action = call_allocate;
1762             return;
1763         }
1764         /* Use rate-limiting and a max number of retries if refresh
1765          * had status 0 but failed to update the cred.
1766          */
1767         fallthrough;
1768     case -ETIMEDOUT:
1769         rpc_delay(task, 3*HZ);
1770         fallthrough;
1771     case -EAGAIN:
1772         status = -EACCES;
1773         fallthrough;
1774     case -EKEYEXPIRED:
1775         if (!task->tk_cred_retry)
1776             break;
1777         task->tk_cred_retry--;
1778         trace_rpc_retry_refresh_status(task);
1779         return;
1780     case -ENOMEM:
1781         rpc_delay(task, HZ >> 4);
1782         return;
1783     }
1784     trace_rpc_refresh_status(task);
1785     rpc_call_rpcerror(task, status);
1786 }
1787 
1788 /*
1789  * 2b.  Allocate the buffer. For details, see sched.c:rpc_malloc.
1790  *  (Note: buffer memory is freed in xprt_release).
1791  */
1792 static void
1793 call_allocate(struct rpc_task *task)
1794 {
1795     const struct rpc_auth *auth = task->tk_rqstp->rq_cred->cr_auth;
1796     struct rpc_rqst *req = task->tk_rqstp;
1797     struct rpc_xprt *xprt = req->rq_xprt;
1798     const struct rpc_procinfo *proc = task->tk_msg.rpc_proc;
1799     int status;
1800 
1801     task->tk_status = 0;
1802     task->tk_action = call_encode;
1803 
1804     if (req->rq_buffer)
1805         return;
1806 
1807     if (proc->p_proc != 0) {
1808         BUG_ON(proc->p_arglen == 0);
1809         if (proc->p_decode != NULL)
1810             BUG_ON(proc->p_replen == 0);
1811     }
1812 
1813     /*
1814      * Calculate the size (in quads) of the RPC call
1815      * and reply headers, and convert both values
1816      * to byte sizes.
1817      */
1818     req->rq_callsize = RPC_CALLHDRSIZE + (auth->au_cslack << 1) +
1819                proc->p_arglen;
1820     req->rq_callsize <<= 2;
1821     /*
1822      * Note: the reply buffer must at minimum allocate enough space
1823      * for the 'struct accepted_reply' from RFC5531.
1824      */
1825     req->rq_rcvsize = RPC_REPHDRSIZE + auth->au_rslack + \
1826             max_t(size_t, proc->p_replen, 2);
1827     req->rq_rcvsize <<= 2;
1828 
1829     status = xprt->ops->buf_alloc(task);
1830     trace_rpc_buf_alloc(task, status);
1831     if (status == 0)
1832         return;
1833     if (status != -ENOMEM) {
1834         rpc_call_rpcerror(task, status);
1835         return;
1836     }
1837 
1838     if (RPC_IS_ASYNC(task) || !fatal_signal_pending(current)) {
1839         task->tk_action = call_allocate;
1840         rpc_delay(task, HZ>>4);
1841         return;
1842     }
1843 
1844     rpc_call_rpcerror(task, -ERESTARTSYS);
1845 }
1846 
1847 static int
1848 rpc_task_need_encode(struct rpc_task *task)
1849 {
1850     return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0 &&
1851         (!(task->tk_flags & RPC_TASK_SENT) ||
1852          !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
1853          xprt_request_need_retransmit(task));
1854 }
1855 
1856 static void
1857 rpc_xdr_encode(struct rpc_task *task)
1858 {
1859     struct rpc_rqst *req = task->tk_rqstp;
1860     struct xdr_stream xdr;
1861 
1862     xdr_buf_init(&req->rq_snd_buf,
1863              req->rq_buffer,
1864              req->rq_callsize);
1865     xdr_buf_init(&req->rq_rcv_buf,
1866              req->rq_rbuffer,
1867              req->rq_rcvsize);
1868 
1869     req->rq_reply_bytes_recvd = 0;
1870     req->rq_snd_buf.head[0].iov_len = 0;
1871     xdr_init_encode(&xdr, &req->rq_snd_buf,
1872             req->rq_snd_buf.head[0].iov_base, req);
1873     if (rpc_encode_header(task, &xdr))
1874         return;
1875 
1876     task->tk_status = rpcauth_wrap_req(task, &xdr);
1877 }
1878 
1879 /*
1880  * 3.   Encode arguments of an RPC call
1881  */
1882 static void
1883 call_encode(struct rpc_task *task)
1884 {
1885     if (!rpc_task_need_encode(task))
1886         goto out;
1887 
1888     /* Dequeue task from the receive queue while we're encoding */
1889     xprt_request_dequeue_xprt(task);
1890     /* Encode here so that rpcsec_gss can use correct sequence number. */
1891     rpc_xdr_encode(task);
1892     /* Add task to reply queue before transmission to avoid races */
1893     if (task->tk_status == 0 && rpc_reply_expected(task))
1894         task->tk_status = xprt_request_enqueue_receive(task);
1895     /* Did the encode result in an error condition? */
1896     if (task->tk_status != 0) {
1897         /* Was the error nonfatal? */
1898         switch (task->tk_status) {
1899         case -EAGAIN:
1900         case -ENOMEM:
1901             rpc_delay(task, HZ >> 4);
1902             break;
1903         case -EKEYEXPIRED:
1904             if (!task->tk_cred_retry) {
1905                 rpc_call_rpcerror(task, task->tk_status);
1906             } else {
1907                 task->tk_action = call_refresh;
1908                 task->tk_cred_retry--;
1909                 trace_rpc_retry_refresh_status(task);
1910             }
1911             break;
1912         default:
1913             rpc_call_rpcerror(task, task->tk_status);
1914         }
1915         return;
1916     }
1917 
1918     xprt_request_enqueue_transmit(task);
1919 out:
1920     task->tk_action = call_transmit;
1921     /* Check that the connection is OK */
1922     if (!xprt_bound(task->tk_xprt))
1923         task->tk_action = call_bind;
1924     else if (!xprt_connected(task->tk_xprt))
1925         task->tk_action = call_connect;
1926 }
1927 
1928 /*
1929  * Helpers to check if the task was already transmitted, and
1930  * to take action when that is the case.
1931  */
1932 static bool
1933 rpc_task_transmitted(struct rpc_task *task)
1934 {
1935     return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1936 }
1937 
1938 static void
1939 rpc_task_handle_transmitted(struct rpc_task *task)
1940 {
1941     xprt_end_transmit(task);
1942     task->tk_action = call_transmit_status;
1943 }
1944 
1945 /*
1946  * 4.   Get the server port number if not yet set
1947  */
1948 static void
1949 call_bind(struct rpc_task *task)
1950 {
1951     struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1952 
1953     if (rpc_task_transmitted(task)) {
1954         rpc_task_handle_transmitted(task);
1955         return;
1956     }
1957 
1958     if (xprt_bound(xprt)) {
1959         task->tk_action = call_connect;
1960         return;
1961     }
1962 
1963     task->tk_action = call_bind_status;
1964     if (!xprt_prepare_transmit(task))
1965         return;
1966 
1967     xprt->ops->rpcbind(task);
1968 }
1969 
1970 /*
1971  * 4a.  Sort out bind result
1972  */
static void
call_bind_status(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
	int err = -EIO;		/* reported unless a case below overrides it */

	if (rpc_task_transmitted(task)) {
		rpc_task_handle_transmitted(task);
		return;
	}

	/* An error status is ignored if the transport is bound anyway */
	if (task->tk_status < 0 && xprt_bound(xprt))
		task->tk_status = 0;
	if (task->tk_status >= 0) {
		task->tk_action = call_connect;
		return;
	}

	switch (task->tk_status) {
	case -ENOMEM:
	case -ENOBUFS:
		rpc_delay(task, HZ >> 2);
		goto retry_timeout;
	case -EACCES:
		trace_rpcb_prog_unavail_err(task);
		/* fail immediately if this is an RPC ping */
		if (task->tk_msg.rpc_proc->p_proc == 0) {
			err = -EOPNOTSUPP;
			break;
		}
		/* Out of rebind retries: fail with the default error */
		if (task->tk_rebind_retry == 0)
			break;
		task->tk_rebind_retry--;
		rpc_delay(task, 3*HZ);
		goto retry_timeout;
	case -EAGAIN:
		goto retry_timeout;
	case -ETIMEDOUT:
		trace_rpcb_timeout_err(task);
		goto retry_timeout;
	case -EPFNOSUPPORT:
		/* server doesn't support any rpcbind version we know of */
		trace_rpcb_bind_version_err(task);
		break;
	case -EPROTONOSUPPORT:
		trace_rpcb_bind_version_err(task);
		goto retry_timeout;
	case -ECONNREFUSED:		/* connection problems */
	case -ECONNRESET:
	case -ECONNABORTED:
	case -ENOTCONN:
	case -EHOSTDOWN:
	case -ENETDOWN:
	case -EHOSTUNREACH:
	case -ENETUNREACH:
	case -EPIPE:
		trace_rpcb_unreachable_err(task);
		if (!RPC_IS_SOFTCONN(task)) {
			rpc_delay(task, 5*HZ);
			goto retry_timeout;
		}
		/* Soft-connect tasks report the connection error itself */
		err = task->tk_status;
		break;
	default:
		trace_rpcb_unrecognized_err(task);
	}

	rpc_call_rpcerror(task, err);
	return;

retry_timeout:
	/* Go back to call_bind, subject to the task's timeout check */
	task->tk_status = 0;
	task->tk_action = call_bind;
	rpc_check_timeout(task);
}
2052 
2053 /*
2054  * 4b.  Connect to the RPC server
2055  */
2056 static void
2057 call_connect(struct rpc_task *task)
2058 {
2059     struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
2060 
2061     if (rpc_task_transmitted(task)) {
2062         rpc_task_handle_transmitted(task);
2063         return;
2064     }
2065 
2066     if (xprt_connected(xprt)) {
2067         task->tk_action = call_transmit;
2068         return;
2069     }
2070 
2071     task->tk_action = call_connect_status;
2072     if (task->tk_status < 0)
2073         return;
2074     if (task->tk_flags & RPC_TASK_NOCONNECT) {
2075         rpc_call_rpcerror(task, -ENOTCONN);
2076         return;
2077     }
2078     if (!xprt_prepare_transmit(task))
2079         return;
2080     xprt_connect(task);
2081 }
2082 
/*
 * 4c.  Sort out connect result
 *
 * The connect status is sampled from task->tk_status on entry.
 *   - 0: count a reconnect in cl_stats->netreconn and go to call_transmit;
 *   - non-zero but the transport is connected: clear the status and go
 *     to call_transmit;
 *   - a retryable error: loop back to call_bind, letting
 *     rpc_check_timeout() decide whether the task has run out of time;
 *   - anything else (including every handled error on a soft-connect
 *     task that breaks out of the switch): fail the task with that
 *     status via rpc_call_rpcerror().
 *
 * When the transport is flagged XPRT_REMOVE and the task is moveable and
 * allowed to round-robin, the task is detached from this transport and
 * restarted at call_start, provided the client's transport switch holds
 * more than one transport.
 */
static void
call_connect_status(struct rpc_task *task)
{
    struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
    struct rpc_clnt *clnt = task->tk_client;
    int status = task->tk_status;

    if (rpc_task_transmitted(task)) {
        rpc_task_handle_transmitted(task);
        return;
    }

    trace_rpc_connect_status(task);

    if (task->tk_status == 0) {
        clnt->cl_stats->netreconn++;
        goto out_next;
    }
    if (xprt_connected(xprt)) {
        task->tk_status = 0;
        goto out_next;
    }

    task->tk_status = 0;
    switch (status) {
    case -ECONNREFUSED:
        /* A positive refusal suggests a rebind is needed. */
        if (RPC_IS_SOFTCONN(task))
            break;
        if (clnt->cl_autobind) {
            rpc_force_rebind(clnt);
            goto out_retry;
        }
        fallthrough;
    case -ECONNRESET:
    case -ECONNABORTED:
    case -ENETDOWN:
    case -ENETUNREACH:
    case -EHOSTUNREACH:
    case -EPIPE:
    case -EPROTO:
        xprt_conditional_disconnect(task->tk_rqstp->rq_xprt,
                        task->tk_rqstp->rq_connect_cookie);
        if (RPC_IS_SOFTCONN(task))
            break;
        /* retry with existing socket, after a delay */
        rpc_delay(task, 3*HZ);
        fallthrough;
    case -EADDRINUSE:
    case -ENOTCONN:
    case -EAGAIN:
    case -ETIMEDOUT:
        if (!(task->tk_flags & RPC_TASK_NO_ROUND_ROBIN) &&
            (task->tk_flags & RPC_TASK_MOVEABLE) &&
            test_bit(XPRT_REMOVE, &xprt->state)) {
            struct rpc_xprt *saved = task->tk_xprt;
            struct rpc_xprt_switch *xps;

            rcu_read_lock();
            xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
            rcu_read_unlock();
            /*
             * xprt_switch_get() may hand back NULL (other callers in
             * this file check for it); without a switch there is
             * nowhere to move the task, so just retry in place.
             */
            if (xps && xps->xps_nxprts > 1) {
                long value;

                xprt_release(task);
                value = atomic_long_dec_return(&xprt->queuelen);
                /* last queued task gone: drop the transport */
                if (value == 0)
                    rpc_xprt_switch_remove_xprt(xps, saved,
                                    true);
                xprt_put(saved);
                task->tk_xprt = NULL;
                task->tk_action = call_start;
            }
            /* a NULL xps is passed to xprt_switch_put() elsewhere in this file too */
            xprt_switch_put(xps);
            if (!task->tk_xprt)
                return;
        }
        goto out_retry;
    case -ENOBUFS:
        rpc_delay(task, HZ >> 2);
        goto out_retry;
    }
    rpc_call_rpcerror(task, status);
    return;
out_next:
    task->tk_action = call_transmit;
    return;
out_retry:
    /* Check for timeouts before looping back to call_bind */
    task->tk_action = call_bind;
    rpc_check_timeout(task);
}
2178 
2179 /*
2180  * 5.   Transmit the RPC request, and wait for reply
2181  */
2182 static void
2183 call_transmit(struct rpc_task *task)
2184 {
2185     if (rpc_task_transmitted(task)) {
2186         rpc_task_handle_transmitted(task);
2187         return;
2188     }
2189 
2190     task->tk_action = call_transmit_status;
2191     if (!xprt_prepare_transmit(task))
2192         return;
2193     task->tk_status = 0;
2194     if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2195         if (!xprt_connected(task->tk_xprt)) {
2196             task->tk_status = -ENOTCONN;
2197             return;
2198         }
2199         xprt_transmit(task);
2200     }
2201     xprt_end_transmit(task);
2202 }
2203 
/*
 * 5a.  Handle cleanup after a transmission
 *
 * Defaults the next state to call_status, then adjusts it according to
 * task->tk_status when the request was not transmitted:
 *   - -EBADMSG: re-encode (call_encode);
 *   - -ENOMEM/-ENOBUFS: delay, then retransmit (call_transmit);
 *   - -EBADSLT/-EAGAIN: retransmit (call_transmit);
 *   - connection errors: rebind (call_bind), except that soft-connect
 *     tasks fail outright on the refused/unreachable/-EPERM group.
 * Every path that does not return early ends in rpc_check_timeout().
 */
static void
call_transmit_status(struct rpc_task *task)
{
    task->tk_action = call_status;

    /*
     * Common case: success.  Force the compiler to put this
     * test first.
     */
    if (rpc_task_transmitted(task)) {
        task->tk_status = 0;
        xprt_request_wait_receive(task);
        return;
    }

    switch (task->tk_status) {
    case -EBADMSG:
        task->tk_action = call_encode;
        task->tk_status = 0;
        break;
        /*
         * Special cases: if we've been waiting on the
         * socket's write_space() callback, or if the
         * socket just returned a connection error,
         * then hold onto the transport lock.
         */
    case -ENOMEM:
    case -ENOBUFS:
        rpc_delay(task, HZ>>2);
        fallthrough;
    case -EBADSLT:
    case -EAGAIN:
        task->tk_status = 0;
        task->tk_action = call_transmit;
        break;
    case -ECONNREFUSED:
    case -EHOSTDOWN:
    case -ENETDOWN:
    case -EHOSTUNREACH:
    case -ENETUNREACH:
    case -EPERM:
        if (RPC_IS_SOFTCONN(task)) {
            /* procedure 0 is traced as a ping */
            if (task->tk_msg.rpc_proc->p_proc == 0)
                trace_xprt_ping(task->tk_xprt,
                        task->tk_status);
            rpc_call_rpcerror(task, task->tk_status);
            return;
        }
        fallthrough;
    case -ECONNRESET:
    case -ECONNABORTED:
    case -EADDRINUSE:
    case -ENOTCONN:
    case -EPIPE:
        task->tk_status = 0;
        task->tk_action = call_bind;
        break;
    default:
        /* leave tk_status for call_status to sort out */
        break;
    }
    rpc_check_timeout(task);
}
2269 
2270 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2271 static void call_bc_transmit(struct rpc_task *task);
2272 static void call_bc_transmit_status(struct rpc_task *task);
2273 
2274 static void
2275 call_bc_encode(struct rpc_task *task)
2276 {
2277     xprt_request_enqueue_transmit(task);
2278     task->tk_action = call_bc_transmit;
2279 }
2280 
2281 /*
2282  * 5b.  Send the backchannel RPC reply.  On error, drop the reply.  In
2283  * addition, disconnect on connectivity errors.
2284  */
2285 static void
2286 call_bc_transmit(struct rpc_task *task)
2287 {
2288     task->tk_action = call_bc_transmit_status;
2289     if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) {
2290         if (!xprt_prepare_transmit(task))
2291             return;
2292         task->tk_status = 0;
2293         xprt_transmit(task);
2294     }
2295     xprt_end_transmit(task);
2296 }
2297 
/*
 * Sort out the result of sending a backchannel reply.
 *
 * A transmitted reply is treated as success.  -ENOMEM/-ENOBUFS (after a
 * short delay), -EBADSLT and -EAGAIN loop back to call_bc_transmit.
 * Every other status ends the task via rpc_exit_task; -ETIMEDOUT
 * additionally disconnects the transport, and unlisted errors are
 * logged.
 */
static void
call_bc_transmit_status(struct rpc_task *task)
{
    struct rpc_rqst *rqst = task->tk_rqstp;

    if (rpc_task_transmitted(task))
        task->tk_status = 0;

    switch (task->tk_status) {
    case -ENOMEM:
    case -ENOBUFS:
        rpc_delay(task, HZ>>2);
        fallthrough;
    case -EBADSLT:
    case -EAGAIN:
        /* try the send again */
        task->tk_status = 0;
        task->tk_action = call_bc_transmit;
        return;
    case 0:
        /* Success */
    case -ENETDOWN:
    case -EHOSTDOWN:
    case -EHOSTUNREACH:
    case -ENETUNREACH:
    case -ECONNRESET:
    case -ECONNREFUSED:
    case -EADDRINUSE:
    case -ENOTCONN:
    case -EPIPE:
        /* nothing more to do: the reply is either sent or dropped */
        break;
    case -ETIMEDOUT:
        /*
         * Problem reaching the server.  Disconnect and let the
         * forechannel reestablish the connection.  The server will
         * have to retransmit the backchannel request and we'll
         * reprocess it.  Since these ops are idempotent, there's no
         * need to cache our reply at this time.
         */
        printk(KERN_NOTICE "RPC: Could not send backchannel reply "
            "error: %d\n", task->tk_status);
        xprt_conditional_disconnect(rqst->rq_xprt,
            rqst->rq_connect_cookie);
        break;
    default:
        /*
         * We were unable to reply and will have to drop the
         * request.  The server should reconnect and retransmit.
         */
        printk(KERN_NOTICE "RPC: Could not send backchannel reply "
            "error: %d\n", task->tk_status);
        break;
    }
    task->tk_action = rpc_exit_task;
}
2352 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2353 
/*
 * 6.   Sort out the RPC call status
 *
 * A non-negative tk_status moves the task on to call_decode.  For a
 * negative status, tk_status is cleared and the error is classified:
 *   - retryable errors send the task back to call_encode, optionally
 *     after a delay and/or a forced rebind;
 *   - -EIO, unlisted errors, and unreachable/-EPERM errors on
 *     soft-connect tasks terminate the task via rpc_call_rpcerror().
 * rpc_check_timeout() runs on the retry path except after -ECONNRESET
 * and -ECONNABORTED.
 */
static void
call_status(struct rpc_task *task)
{
    struct rpc_clnt *clnt = task->tk_client;
    int err;

    /* procedure 0 is traced as a ping */
    if (task->tk_msg.rpc_proc->p_proc == 0)
        trace_xprt_ping(task->tk_xprt, task->tk_status);

    err = task->tk_status;
    if (err >= 0) {
        task->tk_action = call_decode;
        return;
    }

    trace_rpc_call_status(task);
    task->tk_status = 0;

    switch (err) {
    case -EHOSTDOWN:
    case -ENETDOWN:
    case -EHOSTUNREACH:
    case -ENETUNREACH:
    case -EPERM:
        if (RPC_IS_SOFTCONN(task)) {
            rpc_call_rpcerror(task, err);
            return;
        }
        /*
         * Delay any retries for 3 seconds, then handle as if it
         * were a timeout.
         */
        rpc_delay(task, 3*HZ);
        fallthrough;
    case -ETIMEDOUT:
        break;
    case -ECONNREFUSED:
    case -ECONNRESET:
    case -ECONNABORTED:
    case -ENOTCONN:
        rpc_force_rebind(clnt);
        break;
    case -EADDRINUSE:
        rpc_delay(task, 3*HZ);
        fallthrough;
    case -EPIPE:
    case -EAGAIN:
        break;
    case -ENFILE:
    case -ENOBUFS:
    case -ENOMEM:
        rpc_delay(task, HZ>>2);
        break;
    case -EIO:
        /* shutdown or soft timeout */
        rpc_call_rpcerror(task, err);
        return;
    default:
        if (clnt->cl_chatty)
            printk("%s: RPC call returned error %d\n",
                   clnt->cl_program->name, -err);
        rpc_call_rpcerror(task, err);
        return;
    }

    /* Retry: start over from the encode step. */
    task->tk_action = call_encode;
    if (!(err == -ECONNRESET || err == -ECONNABORTED))
        rpc_check_timeout(task);
}
2423 
2424 static bool
2425 rpc_check_connected(const struct rpc_rqst *req)
2426 {
2427     /* No allocated request or transport? return true */
2428     if (!req || !req->rq_xprt)
2429         return true;
2430     return xprt_connected(req->rq_xprt);
2431 }
2432 
2433 static void
2434 rpc_check_timeout(struct rpc_task *task)
2435 {
2436     struct rpc_clnt *clnt = task->tk_client;
2437 
2438     if (RPC_SIGNALLED(task)) {
2439         rpc_call_rpcerror(task, -ERESTARTSYS);
2440         return;
2441     }
2442 
2443     if (xprt_adjust_timeout(task->tk_rqstp) == 0)
2444         return;
2445 
2446     trace_rpc_timeout_status(task);
2447     task->tk_timeouts++;
2448 
2449     if (RPC_IS_SOFTCONN(task) && !rpc_check_connected(task->tk_rqstp)) {
2450         rpc_call_rpcerror(task, -ETIMEDOUT);
2451         return;
2452     }
2453 
2454     if (RPC_IS_SOFT(task)) {
2455         /*
2456          * Once a "no retrans timeout" soft tasks (a.k.a NFSv4) has
2457          * been sent, it should time out only if the transport
2458          * connection gets terminally broken.
2459          */
2460         if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
2461             rpc_check_connected(task->tk_rqstp))
2462             return;
2463 
2464         if (clnt->cl_chatty) {
2465             pr_notice_ratelimited(
2466                 "%s: server %s not responding, timed out\n",
2467                 clnt->cl_program->name,
2468                 task->tk_xprt->servername);
2469         }
2470         if (task->tk_flags & RPC_TASK_TIMEOUT)
2471             rpc_call_rpcerror(task, -ETIMEDOUT);
2472         else
2473             __rpc_call_rpcerror(task, -EIO, -ETIMEDOUT);
2474         return;
2475     }
2476 
2477     if (!(task->tk_flags & RPC_CALL_MAJORSEEN)) {
2478         task->tk_flags |= RPC_CALL_MAJORSEEN;
2479         if (clnt->cl_chatty) {
2480             pr_notice_ratelimited(
2481                 "%s: server %s not responding, still trying\n",
2482                 clnt->cl_program->name,
2483                 task->tk_xprt->servername);
2484         }
2485     }
2486     rpc_force_rebind(clnt);
2487     /*
2488      * Did our request time out due to an RPCSEC_GSS out-of-sequence
2489      * event? RFC2203 requires the server to drop all such requests.
2490      */
2491     rpcauth_invalcred(task);
2492 }
2493 
2494 /*
2495  * 7.   Decode the RPC reply
2496  */
2497 static void
2498 call_decode(struct rpc_task *task)
2499 {
2500     struct rpc_clnt *clnt = task->tk_client;
2501     struct rpc_rqst *req = task->tk_rqstp;
2502     struct xdr_stream xdr;
2503     int err;
2504 
2505     if (!task->tk_msg.rpc_proc->p_decode) {
2506         task->tk_action = rpc_exit_task;
2507         return;
2508     }
2509 
2510     if (task->tk_flags & RPC_CALL_MAJORSEEN) {
2511         if (clnt->cl_chatty) {
2512             pr_notice_ratelimited("%s: server %s OK\n",
2513                 clnt->cl_program->name,
2514                 task->tk_xprt->servername);
2515         }
2516         task->tk_flags &= ~RPC_CALL_MAJORSEEN;
2517     }
2518 
2519     /*
2520      * Did we ever call xprt_complete_rqst()? If not, we should assume
2521      * the message is incomplete.
2522      */
2523     err = -EAGAIN;
2524     if (!req->rq_reply_bytes_recvd)
2525         goto out;
2526 
2527     /* Ensure that we see all writes made by xprt_complete_rqst()
2528      * before it changed req->rq_reply_bytes_recvd.
2529      */
2530     smp_rmb();
2531 
2532     req->rq_rcv_buf.len = req->rq_private_buf.len;
2533     trace_rpc_xdr_recvfrom(task, &req->rq_rcv_buf);
2534 
2535     /* Check that the softirq receive buffer is valid */
2536     WARN_ON(memcmp(&req->rq_rcv_buf, &req->rq_private_buf,
2537                 sizeof(req->rq_rcv_buf)) != 0);
2538 
2539     xdr_init_decode(&xdr, &req->rq_rcv_buf,
2540             req->rq_rcv_buf.head[0].iov_base, req);
2541     err = rpc_decode_header(task, &xdr);
2542 out:
2543     switch (err) {
2544     case 0:
2545         task->tk_action = rpc_exit_task;
2546         task->tk_status = rpcauth_unwrap_resp(task, &xdr);
2547         return;
2548     case -EAGAIN:
2549         task->tk_status = 0;
2550         if (task->tk_client->cl_discrtry)
2551             xprt_conditional_disconnect(req->rq_xprt,
2552                             req->rq_connect_cookie);
2553         task->tk_action = call_encode;
2554         rpc_check_timeout(task);
2555         break;
2556     case -EKEYREJECTED:
2557         task->tk_action = call_reserve;
2558         rpc_check_timeout(task);
2559         rpcauth_invalcred(task);
2560         /* Ensure we obtain a new XID if we retry! */
2561         xprt_release(task);
2562     }
2563 }
2564 
2565 static int
2566 rpc_encode_header(struct rpc_task *task, struct xdr_stream *xdr)
2567 {
2568     struct rpc_clnt *clnt = task->tk_client;
2569     struct rpc_rqst *req = task->tk_rqstp;
2570     __be32 *p;
2571     int error;
2572 
2573     error = -EMSGSIZE;
2574     p = xdr_reserve_space(xdr, RPC_CALLHDRSIZE << 2);
2575     if (!p)
2576         goto out_fail;
2577     *p++ = req->rq_xid;
2578     *p++ = rpc_call;
2579     *p++ = cpu_to_be32(RPC_VERSION);
2580     *p++ = cpu_to_be32(clnt->cl_prog);
2581     *p++ = cpu_to_be32(clnt->cl_vers);
2582     *p   = cpu_to_be32(task->tk_msg.rpc_proc->p_proc);
2583 
2584     error = rpcauth_marshcred(task, xdr);
2585     if (error < 0)
2586         goto out_fail;
2587     return 0;
2588 out_fail:
2589     trace_rpc_bad_callhdr(task);
2590     rpc_call_rpcerror(task, error);
2591     return error;
2592 }
2593 
2594 static noinline int
2595 rpc_decode_header(struct rpc_task *task, struct xdr_stream *xdr)
2596 {
2597     struct rpc_clnt *clnt = task->tk_client;
2598     int error;
2599     __be32 *p;
2600 
2601     /* RFC-1014 says that the representation of XDR data must be a
2602      * multiple of four bytes
2603      * - if it isn't pointer subtraction in the NFS client may give
2604      *   undefined results
2605      */
2606     if (task->tk_rqstp->rq_rcv_buf.len & 3)
2607         goto out_unparsable;
2608 
2609     p = xdr_inline_decode(xdr, 3 * sizeof(*p));
2610     if (!p)
2611         goto out_unparsable;
2612     p++;    /* skip XID */
2613     if (*p++ != rpc_reply)
2614         goto out_unparsable;
2615     if (*p++ != rpc_msg_accepted)
2616         goto out_msg_denied;
2617 
2618     error = rpcauth_checkverf(task, xdr);
2619     if (error)
2620         goto out_verifier;
2621 
2622     p = xdr_inline_decode(xdr, sizeof(*p));
2623     if (!p)
2624         goto out_unparsable;
2625     switch (*p) {
2626     case rpc_success:
2627         return 0;
2628     case rpc_prog_unavail:
2629         trace_rpc__prog_unavail(task);
2630         error = -EPFNOSUPPORT;
2631         goto out_err;
2632     case rpc_prog_mismatch:
2633         trace_rpc__prog_mismatch(task);
2634         error = -EPROTONOSUPPORT;
2635         goto out_err;
2636     case rpc_proc_unavail:
2637         trace_rpc__proc_unavail(task);
2638         error = -EOPNOTSUPP;
2639         goto out_err;
2640     case rpc_garbage_args:
2641     case rpc_system_err:
2642         trace_rpc__garbage_args(task);
2643         error = -EIO;
2644         break;
2645     default:
2646         goto out_unparsable;
2647     }
2648 
2649 out_garbage:
2650     clnt->cl_stats->rpcgarbage++;
2651     if (task->tk_garb_retry) {
2652         task->tk_garb_retry--;
2653         task->tk_action = call_encode;
2654         return -EAGAIN;
2655     }
2656 out_err:
2657     rpc_call_rpcerror(task, error);
2658     return error;
2659 
2660 out_unparsable:
2661     trace_rpc__unparsable(task);
2662     error = -EIO;
2663     goto out_garbage;
2664 
2665 out_verifier:
2666     trace_rpc_bad_verifier(task);
2667     goto out_err;
2668 
2669 out_msg_denied:
2670     error = -EACCES;
2671     p = xdr_inline_decode(xdr, sizeof(*p));
2672     if (!p)
2673         goto out_unparsable;
2674     switch (*p++) {
2675     case rpc_auth_error:
2676         break;
2677     case rpc_mismatch:
2678         trace_rpc__mismatch(task);
2679         error = -EPROTONOSUPPORT;
2680         goto out_err;
2681     default:
2682         goto out_unparsable;
2683     }
2684 
2685     p = xdr_inline_decode(xdr, sizeof(*p));
2686     if (!p)
2687         goto out_unparsable;
2688     switch (*p++) {
2689     case rpc_autherr_rejectedcred:
2690     case rpc_autherr_rejectedverf:
2691     case rpcsec_gsserr_credproblem:
2692     case rpcsec_gsserr_ctxproblem:
2693         if (!task->tk_cred_retry)
2694             break;
2695         task->tk_cred_retry--;
2696         trace_rpc__stale_creds(task);
2697         return -EKEYREJECTED;
2698     case rpc_autherr_badcred:
2699     case rpc_autherr_badverf:
2700         /* possibly garbled cred/verf? */
2701         if (!task->tk_garb_retry)
2702             break;
2703         task->tk_garb_retry--;
2704         trace_rpc__bad_creds(task);
2705         task->tk_action = call_encode;
2706         return -EAGAIN;
2707     case rpc_autherr_tooweak:
2708         trace_rpc__auth_tooweak(task);
2709         pr_warn("RPC: server %s requires stronger authentication.\n",
2710             task->tk_xprt->servername);
2711         break;
2712     default:
2713         goto out_unparsable;
2714     }
2715     goto out_err;
2716 }
2717 
/* Argument encoder for the NULL procedure: deliberately writes nothing. */
static void rpcproc_encode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
        const void *obj)
{
}
2722 
/* Result decoder for the NULL procedure: reads nothing, always returns 0. */
static int rpcproc_decode_null(struct rpc_rqst *rqstp, struct xdr_stream *xdr,
        void *obj)
{
    return 0;
}
2728 
2729 static const struct rpc_procinfo rpcproc_null = {
2730     .p_encode = rpcproc_encode_null,
2731     .p_decode = rpcproc_decode_null,
2732 };
2733 
2734 static const struct rpc_procinfo rpcproc_null_noreply = {
2735     .p_encode = rpcproc_encode_null,
2736 };
2737 
2738 static void
2739 rpc_null_call_prepare(struct rpc_task *task, void *data)
2740 {
2741     task->tk_flags &= ~RPC_TASK_NO_RETRANS_TIMEOUT;
2742     rpc_call_start(task);
2743 }
2744 
2745 static const struct rpc_call_ops rpc_null_ops = {
2746     .rpc_call_prepare = rpc_null_call_prepare,
2747     .rpc_call_done = rpc_default_callback,
2748 };
2749 
2750 static
2751 struct rpc_task *rpc_call_null_helper(struct rpc_clnt *clnt,
2752         struct rpc_xprt *xprt, struct rpc_cred *cred, int flags,
2753         const struct rpc_call_ops *ops, void *data)
2754 {
2755     struct rpc_message msg = {
2756         .rpc_proc = &rpcproc_null,
2757     };
2758     struct rpc_task_setup task_setup_data = {
2759         .rpc_client = clnt,
2760         .rpc_xprt = xprt,
2761         .rpc_message = &msg,
2762         .rpc_op_cred = cred,
2763         .callback_ops = ops ?: &rpc_null_ops,
2764         .callback_data = data,
2765         .flags = flags | RPC_TASK_SOFT | RPC_TASK_SOFTCONN |
2766              RPC_TASK_NULLCREDS,
2767     };
2768 
2769     return rpc_run_task(&task_setup_data);
2770 }
2771 
2772 struct rpc_task *rpc_call_null(struct rpc_clnt *clnt, struct rpc_cred *cred, int flags)
2773 {
2774     return rpc_call_null_helper(clnt, NULL, cred, flags, NULL, NULL);
2775 }
2776 EXPORT_SYMBOL_GPL(rpc_call_null);
2777 
2778 static int rpc_ping(struct rpc_clnt *clnt)
2779 {
2780     struct rpc_task *task;
2781     int status;
2782 
2783     task = rpc_call_null_helper(clnt, NULL, NULL, 0, NULL, NULL);
2784     if (IS_ERR(task))
2785         return PTR_ERR(task);
2786     status = task->tk_status;
2787     rpc_put_task(task);
2788     return status;
2789 }
2790 
2791 static int rpc_ping_noreply(struct rpc_clnt *clnt)
2792 {
2793     struct rpc_message msg = {
2794         .rpc_proc = &rpcproc_null_noreply,
2795     };
2796     struct rpc_task_setup task_setup_data = {
2797         .rpc_client = clnt,
2798         .rpc_message = &msg,
2799         .callback_ops = &rpc_null_ops,
2800         .flags = RPC_TASK_SOFT | RPC_TASK_SOFTCONN | RPC_TASK_NULLCREDS,
2801     };
2802     struct rpc_task *task;
2803     int status;
2804 
2805     task = rpc_run_task(&task_setup_data);
2806     if (IS_ERR(task))
2807         return PTR_ERR(task);
2808     status = task->tk_status;
2809     rpc_put_task(task);
2810     return status;
2811 }
2812 
/*
 * Callback data for the asynchronous "test then add transport" NULL
 * call.  Both pointers hold references that rpc_cb_add_xprt_release()
 * drops.
 */
struct rpc_cb_add_xprt_calldata {
    struct rpc_xprt_switch *xps;    /* switch the transport is added to */
    struct rpc_xprt *xprt;          /* transport under test */
};
2817 
2818 static void rpc_cb_add_xprt_done(struct rpc_task *task, void *calldata)
2819 {
2820     struct rpc_cb_add_xprt_calldata *data = calldata;
2821 
2822     if (task->tk_status == 0)
2823         rpc_xprt_switch_add_xprt(data->xps, data->xprt);
2824 }
2825 
2826 static void rpc_cb_add_xprt_release(void *calldata)
2827 {
2828     struct rpc_cb_add_xprt_calldata *data = calldata;
2829 
2830     xprt_put(data->xprt);
2831     xprt_switch_put(data->xps);
2832     kfree(data);
2833 }
2834 
2835 static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = {
2836     .rpc_call_prepare = rpc_null_call_prepare,
2837     .rpc_call_done = rpc_cb_add_xprt_done,
2838     .rpc_release = rpc_cb_add_xprt_release,
2839 };
2840 
2841 /**
2842  * rpc_clnt_test_and_add_xprt - Test and add a new transport to a rpc_clnt
2843  * @clnt: pointer to struct rpc_clnt
2844  * @xps: pointer to struct rpc_xprt_switch,
2845  * @xprt: pointer struct rpc_xprt
2846  * @dummy: unused
2847  */
2848 int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
2849         struct rpc_xprt_switch *xps, struct rpc_xprt *xprt,
2850         void *dummy)
2851 {
2852     struct rpc_cb_add_xprt_calldata *data;
2853     struct rpc_task *task;
2854 
2855     if (xps->xps_nunique_destaddr_xprts + 1 > clnt->cl_max_connect) {
2856         rcu_read_lock();
2857         pr_warn("SUNRPC: reached max allowed number (%d) did not add "
2858             "transport to server: %s\n", clnt->cl_max_connect,
2859             rpc_peeraddr2str(clnt, RPC_DISPLAY_ADDR));
2860         rcu_read_unlock();
2861         return -EINVAL;
2862     }
2863 
2864     data = kmalloc(sizeof(*data), GFP_KERNEL);
2865     if (!data)
2866         return -ENOMEM;
2867     data->xps = xprt_switch_get(xps);
2868     data->xprt = xprt_get(xprt);
2869     if (rpc_xprt_switch_has_addr(data->xps, (struct sockaddr *)&xprt->addr)) {
2870         rpc_cb_add_xprt_release(data);
2871         goto success;
2872     }
2873 
2874     task = rpc_call_null_helper(clnt, xprt, NULL, RPC_TASK_ASYNC,
2875             &rpc_cb_add_xprt_call_ops, data);
2876     if (IS_ERR(task))
2877         return PTR_ERR(task);
2878 
2879     data->xps->xps_nunique_destaddr_xprts++;
2880     rpc_put_task(task);
2881 success:
2882     return 1;
2883 }
2884 EXPORT_SYMBOL_GPL(rpc_clnt_test_and_add_xprt);
2885 
2886 static int rpc_clnt_add_xprt_helper(struct rpc_clnt *clnt,
2887                     struct rpc_xprt *xprt,
2888                     struct rpc_add_xprt_test *data)
2889 {
2890     struct rpc_task *task;
2891     int status = -EADDRINUSE;
2892 
2893     /* Test the connection */
2894     task = rpc_call_null_helper(clnt, xprt, NULL, 0, NULL, NULL);
2895     if (IS_ERR(task))
2896         return PTR_ERR(task);
2897 
2898     status = task->tk_status;
2899     rpc_put_task(task);
2900 
2901     if (status < 0)
2902         return status;
2903 
2904     /* rpc_xprt_switch and rpc_xprt are deferrenced by add_xprt_test() */
2905     data->add_xprt_test(clnt, xprt, data->data);
2906 
2907     return 0;
2908 }
2909 
2910 /**
2911  * rpc_clnt_setup_test_and_add_xprt()
2912  *
2913  * This is an rpc_clnt_add_xprt setup() function which returns 1 so:
2914  *   1) caller of the test function must dereference the rpc_xprt_switch
2915  *   and the rpc_xprt.
2916  *   2) test function must call rpc_xprt_switch_add_xprt, usually in
2917  *   the rpc_call_done routine.
2918  *
2919  * Upon success (return of 1), the test function adds the new
2920  * transport to the rpc_clnt xprt switch
2921  *
2922  * @clnt: struct rpc_clnt to get the new transport
2923  * @xps:  the rpc_xprt_switch to hold the new transport
2924  * @xprt: the rpc_xprt to test
2925  * @data: a struct rpc_add_xprt_test pointer that holds the test function
2926  *        and test function call data
2927  */
2928 int rpc_clnt_setup_test_and_add_xprt(struct rpc_clnt *clnt,
2929                      struct rpc_xprt_switch *xps,
2930                      struct rpc_xprt *xprt,
2931                      void *data)
2932 {
2933     int status = -EADDRINUSE;
2934 
2935     xprt = xprt_get(xprt);
2936     xprt_switch_get(xps);
2937 
2938     if (rpc_xprt_switch_has_addr(xps, (struct sockaddr *)&xprt->addr))
2939         goto out_err;
2940 
2941     status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
2942     if (status < 0)
2943         goto out_err;
2944 
2945     status = 1;
2946 out_err:
2947     xprt_put(xprt);
2948     xprt_switch_put(xps);
2949     if (status < 0)
2950         pr_info("RPC:   rpc_clnt_test_xprt failed: %d addr %s not "
2951             "added\n", status,
2952             xprt->address_strings[RPC_DISPLAY_ADDR]);
2953     /* so that rpc_clnt_add_xprt does not call rpc_xprt_switch_add_xprt */
2954     return status;
2955 }
2956 EXPORT_SYMBOL_GPL(rpc_clnt_setup_test_and_add_xprt);
2957 
2958 /**
2959  * rpc_clnt_add_xprt - Add a new transport to a rpc_clnt
2960  * @clnt: pointer to struct rpc_clnt
2961  * @xprtargs: pointer to struct xprt_create
2962  * @setup: callback to test and/or set up the connection
2963  * @data: pointer to setup function data
2964  *
2965  * Creates a new transport using the parameters set in args and
2966  * adds it to clnt.
2967  * If ping is set, then test that connectivity succeeds before
2968  * adding the new transport.
2969  *
2970  */
2971 int rpc_clnt_add_xprt(struct rpc_clnt *clnt,
2972         struct xprt_create *xprtargs,
2973         int (*setup)(struct rpc_clnt *,
2974             struct rpc_xprt_switch *,
2975             struct rpc_xprt *,
2976             void *),
2977         void *data)
2978 {
2979     struct rpc_xprt_switch *xps;
2980     struct rpc_xprt *xprt;
2981     unsigned long connect_timeout;
2982     unsigned long reconnect_timeout;
2983     unsigned char resvport, reuseport;
2984     int ret = 0, ident;
2985 
2986     rcu_read_lock();
2987     xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
2988     xprt = xprt_iter_xprt(&clnt->cl_xpi);
2989     if (xps == NULL || xprt == NULL) {
2990         rcu_read_unlock();
2991         xprt_switch_put(xps);
2992         return -EAGAIN;
2993     }
2994     resvport = xprt->resvport;
2995     reuseport = xprt->reuseport;
2996     connect_timeout = xprt->connect_timeout;
2997     reconnect_timeout = xprt->max_reconnect_timeout;
2998     ident = xprt->xprt_class->ident;
2999     rcu_read_unlock();
3000 
3001     if (!xprtargs->ident)
3002         xprtargs->ident = ident;
3003     xprt = xprt_create_transport(xprtargs);
3004     if (IS_ERR(xprt)) {
3005         ret = PTR_ERR(xprt);
3006         goto out_put_switch;
3007     }
3008     xprt->resvport = resvport;
3009     xprt->reuseport = reuseport;
3010     if (xprt->ops->set_connect_timeout != NULL)
3011         xprt->ops->set_connect_timeout(xprt,
3012                 connect_timeout,
3013                 reconnect_timeout);
3014 
3015     rpc_xprt_switch_set_roundrobin(xps);
3016     if (setup) {
3017         ret = setup(clnt, xps, xprt, data);
3018         if (ret != 0)
3019             goto out_put_xprt;
3020     }
3021     rpc_xprt_switch_add_xprt(xps, xprt);
3022 out_put_xprt:
3023     xprt_put(xprt);
3024 out_put_switch:
3025     xprt_switch_put(xps);
3026     return ret;
3027 }
3028 EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt);
3029 
3030 static int rpc_xprt_probe_trunked(struct rpc_clnt *clnt,
3031                   struct rpc_xprt *xprt,
3032                   struct rpc_add_xprt_test *data)
3033 {
3034     struct rpc_xprt_switch *xps;
3035     struct rpc_xprt *main_xprt;
3036     int status = 0;
3037 
3038     xprt_get(xprt);
3039 
3040     rcu_read_lock();
3041     main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3042     xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3043     status = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3044                    (struct sockaddr *)&main_xprt->addr);
3045     rcu_read_unlock();
3046     xprt_put(main_xprt);
3047     if (status || !test_bit(XPRT_OFFLINE, &xprt->state))
3048         goto out;
3049 
3050     status = rpc_clnt_add_xprt_helper(clnt, xprt, data);
3051 out:
3052     xprt_put(xprt);
3053     xprt_switch_put(xps);
3054     return status;
3055 }
3056 
3057 /* rpc_clnt_probe_trunked_xprt -- probe offlined transport for session trunking
3058  * @clnt rpc_clnt structure
3059  *
3060  * For each offlined transport found in the rpc_clnt structure call
3061  * the function rpc_xprt_probe_trunked() which will determine if this
3062  * transport still belongs to the trunking group.
3063  */
3064 void rpc_clnt_probe_trunked_xprts(struct rpc_clnt *clnt,
3065                   struct rpc_add_xprt_test *data)
3066 {
3067     struct rpc_xprt_iter xpi;
3068     int ret;
3069 
3070     ret = rpc_clnt_xprt_iter_offline_init(clnt, &xpi);
3071     if (ret)
3072         return;
3073     for (;;) {
3074         struct rpc_xprt *xprt = xprt_iter_get_next(&xpi);
3075 
3076         if (!xprt)
3077             break;
3078         ret = rpc_xprt_probe_trunked(clnt, xprt, data);
3079         xprt_put(xprt);
3080         if (ret < 0)
3081             break;
3082         xprt_iter_rewind(&xpi);
3083     }
3084     xprt_iter_destroy(&xpi);
3085 }
3086 EXPORT_SYMBOL_GPL(rpc_clnt_probe_trunked_xprts);
3087 
3088 static int rpc_xprt_offline(struct rpc_clnt *clnt,
3089                 struct rpc_xprt *xprt,
3090                 void *data)
3091 {
3092     struct rpc_xprt *main_xprt;
3093     struct rpc_xprt_switch *xps;
3094     int err = 0;
3095 
3096     xprt_get(xprt);
3097 
3098     rcu_read_lock();
3099     main_xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
3100     xps = xprt_switch_get(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3101     err = rpc_cmp_addr_port((struct sockaddr *)&xprt->addr,
3102                 (struct sockaddr *)&main_xprt->addr);
3103     rcu_read_unlock();
3104     xprt_put(main_xprt);
3105     if (err)
3106         goto out;
3107 
3108     if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE)) {
3109         err = -EINTR;
3110         goto out;
3111     }
3112     xprt_set_offline_locked(xprt, xps);
3113 
3114     xprt_release_write(xprt, NULL);
3115 out:
3116     xprt_put(xprt);
3117     xprt_switch_put(xps);
3118     return err;
3119 }
3120 
3121 /* rpc_clnt_manage_trunked_xprts -- offline trunked transports
3122  * @clnt rpc_clnt structure
3123  *
3124  * For each active transport found in the rpc_clnt structure call
3125  * the function rpc_xprt_offline() which will identify trunked transports
3126  * and will mark them offline.
3127  */
3128 void rpc_clnt_manage_trunked_xprts(struct rpc_clnt *clnt)
3129 {
3130     rpc_clnt_iterate_for_each_xprt(clnt, rpc_xprt_offline, NULL);
3131 }
3132 EXPORT_SYMBOL_GPL(rpc_clnt_manage_trunked_xprts);
3133 
/*
 * Argument bundle that rpc_set_connect_timeout() passes through
 * rpc_clnt_iterate_for_each_xprt() to rpc_xprt_set_connect_timeout().
 * Both values are forwarded verbatim to the transport's
 * ->set_connect_timeout() operation.
 */
struct connect_timeout_data {
    unsigned long connect_timeout;      /* first timeout argument */
    unsigned long reconnect_timeout;    /* second timeout argument */
};
3138 
3139 static int
3140 rpc_xprt_set_connect_timeout(struct rpc_clnt *clnt,
3141         struct rpc_xprt *xprt,
3142         void *data)
3143 {
3144     struct connect_timeout_data *timeo = data;
3145 
3146     if (xprt->ops->set_connect_timeout)
3147         xprt->ops->set_connect_timeout(xprt,
3148                 timeo->connect_timeout,
3149                 timeo->reconnect_timeout);
3150     return 0;
3151 }
3152 
3153 void
3154 rpc_set_connect_timeout(struct rpc_clnt *clnt,
3155         unsigned long connect_timeout,
3156         unsigned long reconnect_timeout)
3157 {
3158     struct connect_timeout_data timeout = {
3159         .connect_timeout = connect_timeout,
3160         .reconnect_timeout = reconnect_timeout,
3161     };
3162     rpc_clnt_iterate_for_each_xprt(clnt,
3163             rpc_xprt_set_connect_timeout,
3164             &timeout);
3165 }
3166 EXPORT_SYMBOL_GPL(rpc_set_connect_timeout);
3167 
3168 void rpc_clnt_xprt_switch_put(struct rpc_clnt *clnt)
3169 {
3170     rcu_read_lock();
3171     xprt_switch_put(rcu_dereference(clnt->cl_xpi.xpi_xpswitch));
3172     rcu_read_unlock();
3173 }
3174 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_put);
3175 
3176 void rpc_clnt_xprt_set_online(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3177 {
3178     struct rpc_xprt_switch *xps;
3179 
3180     rcu_read_lock();
3181     xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3182     rcu_read_unlock();
3183     xprt_set_online_locked(xprt, xps);
3184 }
3185 
3186 void rpc_clnt_xprt_switch_add_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3187 {
3188     if (rpc_clnt_xprt_switch_has_addr(clnt,
3189         (const struct sockaddr *)&xprt->addr)) {
3190         return rpc_clnt_xprt_set_online(clnt, xprt);
3191     }
3192     rcu_read_lock();
3193     rpc_xprt_switch_add_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
3194                  xprt);
3195     rcu_read_unlock();
3196 }
3197 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_add_xprt);
3198 
3199 void rpc_clnt_xprt_switch_remove_xprt(struct rpc_clnt *clnt, struct rpc_xprt *xprt)
3200 {
3201     struct rpc_xprt_switch *xps;
3202 
3203     rcu_read_lock();
3204     xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3205     rpc_xprt_switch_remove_xprt(rcu_dereference(clnt->cl_xpi.xpi_xpswitch),
3206                     xprt, 0);
3207     xps->xps_nunique_destaddr_xprts--;
3208     rcu_read_unlock();
3209 }
3210 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_remove_xprt);
3211 
3212 bool rpc_clnt_xprt_switch_has_addr(struct rpc_clnt *clnt,
3213                    const struct sockaddr *sap)
3214 {
3215     struct rpc_xprt_switch *xps;
3216     bool ret;
3217 
3218     rcu_read_lock();
3219     xps = rcu_dereference(clnt->cl_xpi.xpi_xpswitch);
3220     ret = rpc_xprt_switch_has_addr(xps, sap);
3221     rcu_read_unlock();
3222     return ret;
3223 }
3224 EXPORT_SYMBOL_GPL(rpc_clnt_xprt_switch_has_addr);
3225 
3226 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
3227 static void rpc_show_header(void)
3228 {
3229     printk(KERN_INFO "-pid- flgs status -client- --rqstp- "
3230         "-timeout ---ops--\n");
3231 }
3232 
3233 static void rpc_show_task(const struct rpc_clnt *clnt,
3234               const struct rpc_task *task)
3235 {
3236     const char *rpc_waitq = "none";
3237 
3238     if (RPC_IS_QUEUED(task))
3239         rpc_waitq = rpc_qname(task->tk_waitqueue);
3240 
3241     printk(KERN_INFO "%5u %04x %6d %8p %8p %8ld %8p %sv%u %s a:%ps q:%s\n",
3242         task->tk_pid, task->tk_flags, task->tk_status,
3243         clnt, task->tk_rqstp, rpc_task_timeout(task), task->tk_ops,
3244         clnt->cl_program->name, clnt->cl_vers, rpc_proc_name(task),
3245         task->tk_action, rpc_waitq);
3246 }
3247 
3248 void rpc_show_tasks(struct net *net)
3249 {
3250     struct rpc_clnt *clnt;
3251     struct rpc_task *task;
3252     int header = 0;
3253     struct sunrpc_net *sn = net_generic(net, sunrpc_net_id);
3254 
3255     spin_lock(&sn->rpc_client_lock);
3256     list_for_each_entry(clnt, &sn->all_clients, cl_clients) {
3257         spin_lock(&clnt->cl_lock);
3258         list_for_each_entry(task, &clnt->cl_tasks, tk_task) {
3259             if (!header) {
3260                 rpc_show_header();
3261                 header++;
3262             }
3263             rpc_show_task(clnt, task);
3264         }
3265         spin_unlock(&clnt->cl_lock);
3266     }
3267     spin_unlock(&sn->rpc_client_lock);
3268 }
3269 #endif
3270 
3271 #if IS_ENABLED(CONFIG_SUNRPC_SWAP)
/*
 * Per-transport callback for rpc_clnt_swap_activate(): returns the result
 * of xprt_enable_swap() for @xprt.  @clnt and @dummy are unused.
 */
static int
rpc_clnt_swap_activate_callback(struct rpc_clnt *clnt,
        struct rpc_xprt *xprt,
        void *dummy)
{
    int ret = xprt_enable_swap(xprt);

    return ret;
}
3279 
3280 int
3281 rpc_clnt_swap_activate(struct rpc_clnt *clnt)
3282 {
3283     while (clnt != clnt->cl_parent)
3284         clnt = clnt->cl_parent;
3285     if (atomic_inc_return(&clnt->cl_swapper) == 1)
3286         return rpc_clnt_iterate_for_each_xprt(clnt,
3287                 rpc_clnt_swap_activate_callback, NULL);
3288     return 0;
3289 }
3290 EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
3291 
/*
 * Per-transport callback for rpc_clnt_swap_deactivate(): calls
 * xprt_disable_swap() on @xprt and always returns 0.  @clnt and @dummy
 * are unused.
 */
static int
rpc_clnt_swap_deactivate_callback(struct rpc_clnt *clnt,
        struct rpc_xprt *xprt,
        void *dummy)
{
    int ret = 0;

    xprt_disable_swap(xprt);
    return ret;
}
3300 
3301 void
3302 rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
3303 {
3304     if (atomic_dec_if_positive(&clnt->cl_swapper) == 0)
3305         rpc_clnt_iterate_for_each_xprt(clnt,
3306                 rpc_clnt_swap_deactivate_callback, NULL);
3307 }
3308 EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
3309 #endif /* CONFIG_SUNRPC_SWAP */