Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-only
0002 /*
0003  *  linux/net/sunrpc/xprt.c
0004  *
0005  *  This is a generic RPC call interface supporting congestion avoidance,
0006  *  and asynchronous calls.
0007  *
0008  *  The interface works like this:
0009  *
0010  *  -   When a process places a call, it allocates a request slot if
0011  *  one is available. Otherwise, it sleeps on the backlog queue
0012  *  (xprt_reserve).
0013  *  -   Next, the caller puts together the RPC message, stuffs it into
0014  *  the request struct, and calls xprt_transmit().
0015  *  -   xprt_transmit sends the message and installs the caller on the
0016  *  transport's wait list. At the same time, if a reply is expected,
0017  *  it installs a timer that is run after the packet's timeout has
0018  *  expired.
0019  *  -   When a packet arrives, the data_ready handler walks the list of
0020  *  pending requests for that transport. If a matching XID is found, the
0021  *  caller is woken up, and the timer removed.
0022  *  -   When no reply arrives within the timeout interval, the timer is
0023  *  fired by the kernel and runs xprt_timer(). It either adjusts the
0024  *  timeout values (minor timeout) or wakes up the caller with a status
0025  *  of -ETIMEDOUT.
0026  *  -   When the caller receives a notification from RPC that a reply arrived,
0027  *  it should release the RPC slot, and process the reply.
0028  *  If the call timed out, it may choose to retry the operation by
0029  *  adjusting the initial timeout value, and simply calling rpc_call
0030  *  again.
0031  *
0032  *  Support for async RPC is done through a set of RPC-specific scheduling
0033  *  primitives that `transparently' work for processes as well as async
0034  *  tasks that rely on callbacks.
0035  *
0036  *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
0037  *
0038  *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
0039  */
0040 
0041 #include <linux/module.h>
0042 
0043 #include <linux/types.h>
0044 #include <linux/interrupt.h>
0045 #include <linux/workqueue.h>
0046 #include <linux/net.h>
0047 #include <linux/ktime.h>
0048 
0049 #include <linux/sunrpc/clnt.h>
0050 #include <linux/sunrpc/metrics.h>
0051 #include <linux/sunrpc/bc_xprt.h>
0052 #include <linux/rcupdate.h>
0053 #include <linux/sched/mm.h>
0054 
0055 #include <trace/events/sunrpc.h>
0056 
0057 #include "sunrpc.h"
0058 #include "sysfs.h"
0059 #include "fail.h"
0060 
0061 /*
0062  * Local variables
0063  */
0064 
0065 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
0066 # define RPCDBG_FACILITY    RPCDBG_XPRT
0067 #endif
0068 
0069 /*
0070  * Local functions
0071  */
0072 static void xprt_init(struct rpc_xprt *xprt, struct net *net);
0073 static __be32   xprt_alloc_xid(struct rpc_xprt *xprt);
0074 static void xprt_destroy(struct rpc_xprt *xprt);
0075 static void xprt_request_init(struct rpc_task *task);
0076 static int  xprt_request_prepare(struct rpc_rqst *req, struct xdr_buf *buf);
0077 
0078 static DEFINE_SPINLOCK(xprt_list_lock);
0079 static LIST_HEAD(xprt_list);
0080 
0081 static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
0082 {
0083     unsigned long timeout = jiffies + req->rq_timeout;
0084 
0085     if (time_before(timeout, req->rq_majortimeo))
0086         return timeout;
0087     return req->rq_majortimeo;
0088 }
0089 
0090 /**
0091  * xprt_register_transport - register a transport implementation
0092  * @transport: transport to register
0093  *
0094  * If a transport implementation is loaded as a kernel module, it can
0095  * call this interface to make itself known to the RPC client.
0096  *
0097  * Returns:
0098  * 0:       transport successfully registered
0099  * -EEXIST: transport already registered
0100  * -EINVAL: transport module being unloaded
0101  */
0102 int xprt_register_transport(struct xprt_class *transport)
0103 {
0104     struct xprt_class *t;
0105     int result;
0106 
0107     result = -EEXIST;
0108     spin_lock(&xprt_list_lock);
0109     list_for_each_entry(t, &xprt_list, list) {
0110         /* don't register the same transport class twice */
0111         if (t->ident == transport->ident)
0112             goto out;
0113     }
0114 
0115     list_add_tail(&transport->list, &xprt_list);
0116     printk(KERN_INFO "RPC: Registered %s transport module.\n",
0117            transport->name);
0118     result = 0;
0119 
0120 out:
0121     spin_unlock(&xprt_list_lock);
0122     return result;
0123 }
0124 EXPORT_SYMBOL_GPL(xprt_register_transport);
0125 
0126 /**
0127  * xprt_unregister_transport - unregister a transport implementation
0128  * @transport: transport to unregister
0129  *
0130  * Returns:
0131  * 0:       transport successfully unregistered
0132  * -ENOENT: transport never registered
0133  */
0134 int xprt_unregister_transport(struct xprt_class *transport)
0135 {
0136     struct xprt_class *t;
0137     int result;
0138 
0139     result = 0;
0140     spin_lock(&xprt_list_lock);
0141     list_for_each_entry(t, &xprt_list, list) {
0142         if (t == transport) {
0143             printk(KERN_INFO
0144                 "RPC: Unregistered %s transport module.\n",
0145                 transport->name);
0146             list_del_init(&transport->list);
0147             goto out;
0148         }
0149     }
0150     result = -ENOENT;
0151 
0152 out:
0153     spin_unlock(&xprt_list_lock);
0154     return result;
0155 }
0156 EXPORT_SYMBOL_GPL(xprt_unregister_transport);
0157 
0158 static void
0159 xprt_class_release(const struct xprt_class *t)
0160 {
0161     module_put(t->owner);
0162 }
0163 
0164 static const struct xprt_class *
0165 xprt_class_find_by_ident_locked(int ident)
0166 {
0167     const struct xprt_class *t;
0168 
0169     list_for_each_entry(t, &xprt_list, list) {
0170         if (t->ident != ident)
0171             continue;
0172         if (!try_module_get(t->owner))
0173             continue;
0174         return t;
0175     }
0176     return NULL;
0177 }
0178 
0179 static const struct xprt_class *
0180 xprt_class_find_by_ident(int ident)
0181 {
0182     const struct xprt_class *t;
0183 
0184     spin_lock(&xprt_list_lock);
0185     t = xprt_class_find_by_ident_locked(ident);
0186     spin_unlock(&xprt_list_lock);
0187     return t;
0188 }
0189 
0190 static const struct xprt_class *
0191 xprt_class_find_by_netid_locked(const char *netid)
0192 {
0193     const struct xprt_class *t;
0194     unsigned int i;
0195 
0196     list_for_each_entry(t, &xprt_list, list) {
0197         for (i = 0; t->netid[i][0] != '\0'; i++) {
0198             if (strcmp(t->netid[i], netid) != 0)
0199                 continue;
0200             if (!try_module_get(t->owner))
0201                 continue;
0202             return t;
0203         }
0204     }
0205     return NULL;
0206 }
0207 
0208 static const struct xprt_class *
0209 xprt_class_find_by_netid(const char *netid)
0210 {
0211     const struct xprt_class *t;
0212 
0213     spin_lock(&xprt_list_lock);
0214     t = xprt_class_find_by_netid_locked(netid);
0215     if (!t) {
0216         spin_unlock(&xprt_list_lock);
0217         request_module("rpc%s", netid);
0218         spin_lock(&xprt_list_lock);
0219         t = xprt_class_find_by_netid_locked(netid);
0220     }
0221     spin_unlock(&xprt_list_lock);
0222     return t;
0223 }
0224 
0225 /**
0226  * xprt_find_transport_ident - convert a netid into a transport identifier
0227  * @netid: transport to load
0228  *
0229  * Returns:
0230  * > 0:     transport identifier
0231  * -ENOENT: transport module not available
0232  */
0233 int xprt_find_transport_ident(const char *netid)
0234 {
0235     const struct xprt_class *t;
0236     int ret;
0237 
0238     t = xprt_class_find_by_netid(netid);
0239     if (!t)
0240         return -ENOENT;
0241     ret = t->ident;
0242     xprt_class_release(t);
0243     return ret;
0244 }
0245 EXPORT_SYMBOL_GPL(xprt_find_transport_ident);
0246 
0247 static void xprt_clear_locked(struct rpc_xprt *xprt)
0248 {
0249     xprt->snd_task = NULL;
0250     if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state))
0251         clear_bit_unlock(XPRT_LOCKED, &xprt->state);
0252     else
0253         queue_work(xprtiod_workqueue, &xprt->task_cleanup);
0254 }
0255 
0256 /**
0257  * xprt_reserve_xprt - serialize write access to transports
0258  * @task: task that is requesting access to the transport
0259  * @xprt: pointer to the target transport
0260  *
0261  * This prevents mixing the payload of separate requests, and prevents
0262  * transport connects from colliding with writes.  No congestion control
0263  * is provided.
0264  */
0265 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
0266 {
0267     struct rpc_rqst *req = task->tk_rqstp;
0268 
0269     if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
0270         if (task == xprt->snd_task)
0271             goto out_locked;
0272         goto out_sleep;
0273     }
0274     if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
0275         goto out_unlock;
0276     xprt->snd_task = task;
0277 
0278 out_locked:
0279     trace_xprt_reserve_xprt(xprt, task);
0280     return 1;
0281 
0282 out_unlock:
0283     xprt_clear_locked(xprt);
0284 out_sleep:
0285     task->tk_status = -EAGAIN;
0286     if  (RPC_IS_SOFT(task))
0287         rpc_sleep_on_timeout(&xprt->sending, task, NULL,
0288                 xprt_request_timeout(req));
0289     else
0290         rpc_sleep_on(&xprt->sending, task, NULL);
0291     return 0;
0292 }
0293 EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
0294 
0295 static bool
0296 xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
0297 {
0298     return test_bit(XPRT_CWND_WAIT, &xprt->state);
0299 }
0300 
0301 static void
0302 xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
0303 {
0304     if (!list_empty(&xprt->xmit_queue)) {
0305         /* Peek at head of queue to see if it can make progress */
0306         if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
0307                     rq_xmit)->rq_cong)
0308             return;
0309     }
0310     set_bit(XPRT_CWND_WAIT, &xprt->state);
0311 }
0312 
0313 static void
0314 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
0315 {
0316     if (!RPCXPRT_CONGESTED(xprt))
0317         clear_bit(XPRT_CWND_WAIT, &xprt->state);
0318 }
0319 
0320 /*
0321  * xprt_reserve_xprt_cong - serialize write access to transports
0322  * @task: task that is requesting access to the transport
0323  *
0324  * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
0325  * integrated into the decision of whether a request is allowed to be
0326  * woken up and given access to the transport.
0327  * Note that the lock is only granted if we know there are free slots.
0328  */
0329 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
0330 {
0331     struct rpc_rqst *req = task->tk_rqstp;
0332 
0333     if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
0334         if (task == xprt->snd_task)
0335             goto out_locked;
0336         goto out_sleep;
0337     }
0338     if (req == NULL) {
0339         xprt->snd_task = task;
0340         goto out_locked;
0341     }
0342     if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
0343         goto out_unlock;
0344     if (!xprt_need_congestion_window_wait(xprt)) {
0345         xprt->snd_task = task;
0346         goto out_locked;
0347     }
0348 out_unlock:
0349     xprt_clear_locked(xprt);
0350 out_sleep:
0351     task->tk_status = -EAGAIN;
0352     if (RPC_IS_SOFT(task))
0353         rpc_sleep_on_timeout(&xprt->sending, task, NULL,
0354                 xprt_request_timeout(req));
0355     else
0356         rpc_sleep_on(&xprt->sending, task, NULL);
0357     return 0;
0358 out_locked:
0359     trace_xprt_reserve_cong(xprt, task);
0360     return 1;
0361 }
0362 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
0363 
0364 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
0365 {
0366     int retval;
0367 
0368     if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
0369         return 1;
0370     spin_lock(&xprt->transport_lock);
0371     retval = xprt->ops->reserve_xprt(xprt, task);
0372     spin_unlock(&xprt->transport_lock);
0373     return retval;
0374 }
0375 
0376 static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
0377 {
0378     struct rpc_xprt *xprt = data;
0379 
0380     xprt->snd_task = task;
0381     return true;
0382 }
0383 
0384 static void __xprt_lock_write_next(struct rpc_xprt *xprt)
0385 {
0386     if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
0387         return;
0388     if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
0389         goto out_unlock;
0390     if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
0391                 __xprt_lock_write_func, xprt))
0392         return;
0393 out_unlock:
0394     xprt_clear_locked(xprt);
0395 }
0396 
0397 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
0398 {
0399     if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
0400         return;
0401     if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
0402         goto out_unlock;
0403     if (xprt_need_congestion_window_wait(xprt))
0404         goto out_unlock;
0405     if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
0406                 __xprt_lock_write_func, xprt))
0407         return;
0408 out_unlock:
0409     xprt_clear_locked(xprt);
0410 }
0411 
0412 /**
0413  * xprt_release_xprt - allow other requests to use a transport
0414  * @xprt: transport with other tasks potentially waiting
0415  * @task: task that is releasing access to the transport
0416  *
0417  * Note that "task" can be NULL.  No congestion control is provided.
0418  */
0419 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
0420 {
0421     if (xprt->snd_task == task) {
0422         xprt_clear_locked(xprt);
0423         __xprt_lock_write_next(xprt);
0424     }
0425     trace_xprt_release_xprt(xprt, task);
0426 }
0427 EXPORT_SYMBOL_GPL(xprt_release_xprt);
0428 
0429 /**
0430  * xprt_release_xprt_cong - allow other requests to use a transport
0431  * @xprt: transport with other tasks potentially waiting
0432  * @task: task that is releasing access to the transport
0433  *
0434  * Note that "task" can be NULL.  Another task is awoken to use the
0435  * transport if the transport's congestion window allows it.
0436  */
0437 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
0438 {
0439     if (xprt->snd_task == task) {
0440         xprt_clear_locked(xprt);
0441         __xprt_lock_write_next_cong(xprt);
0442     }
0443     trace_xprt_release_cong(xprt, task);
0444 }
0445 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
0446 
0447 void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
0448 {
0449     if (xprt->snd_task != task)
0450         return;
0451     spin_lock(&xprt->transport_lock);
0452     xprt->ops->release_xprt(xprt, task);
0453     spin_unlock(&xprt->transport_lock);
0454 }
0455 
0456 /*
0457  * Van Jacobson congestion avoidance. Check if the congestion window
0458  * overflowed. Put the task to sleep if this is the case.
0459  */
0460 static int
0461 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
0462 {
0463     if (req->rq_cong)
0464         return 1;
0465     trace_xprt_get_cong(xprt, req->rq_task);
0466     if (RPCXPRT_CONGESTED(xprt)) {
0467         xprt_set_congestion_window_wait(xprt);
0468         return 0;
0469     }
0470     req->rq_cong = 1;
0471     xprt->cong += RPC_CWNDSCALE;
0472     return 1;
0473 }
0474 
0475 /*
0476  * Adjust the congestion window, and wake up the next task
0477  * that has been sleeping due to congestion
0478  */
0479 static void
0480 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
0481 {
0482     if (!req->rq_cong)
0483         return;
0484     req->rq_cong = 0;
0485     xprt->cong -= RPC_CWNDSCALE;
0486     xprt_test_and_clear_congestion_window_wait(xprt);
0487     trace_xprt_put_cong(xprt, req->rq_task);
0488     __xprt_lock_write_next_cong(xprt);
0489 }
0490 
0491 /**
0492  * xprt_request_get_cong - Request congestion control credits
0493  * @xprt: pointer to transport
0494  * @req: pointer to RPC request
0495  *
0496  * Useful for transports that require congestion control.
0497  */
0498 bool
0499 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
0500 {
0501     bool ret = false;
0502 
0503     if (req->rq_cong)
0504         return true;
0505     spin_lock(&xprt->transport_lock);
0506     ret = __xprt_get_cong(xprt, req) != 0;
0507     spin_unlock(&xprt->transport_lock);
0508     return ret;
0509 }
0510 EXPORT_SYMBOL_GPL(xprt_request_get_cong);
0511 
0512 /**
0513  * xprt_release_rqst_cong - housekeeping when request is complete
0514  * @task: RPC request that recently completed
0515  *
0516  * Useful for transports that require congestion control.
0517  */
0518 void xprt_release_rqst_cong(struct rpc_task *task)
0519 {
0520     struct rpc_rqst *req = task->tk_rqstp;
0521 
0522     __xprt_put_cong(req->rq_xprt, req);
0523 }
0524 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
0525 
0526 static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
0527 {
0528     if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
0529         __xprt_lock_write_next_cong(xprt);
0530 }
0531 
0532 /*
0533  * Clear the congestion window wait flag and wake up the next
0534  * entry on xprt->sending
0535  */
0536 static void
0537 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
0538 {
0539     if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
0540         spin_lock(&xprt->transport_lock);
0541         __xprt_lock_write_next_cong(xprt);
0542         spin_unlock(&xprt->transport_lock);
0543     }
0544 }
0545 
0546 /**
0547  * xprt_adjust_cwnd - adjust transport congestion window
0548  * @xprt: pointer to xprt
0549  * @task: recently completed RPC request used to adjust window
0550  * @result: result code of completed RPC request
0551  *
0552  * The transport code maintains an estimate on the maximum number of out-
0553  * standing RPC requests, using a smoothed version of the congestion
0554  * avoidance implemented in 44BSD. This is basically the Van Jacobson
0555  * congestion algorithm: If a retransmit occurs, the congestion window is
0556  * halved; otherwise, it is incremented by 1/cwnd when
0557  *
0558  *  -   a reply is received and
0559  *  -   a full number of requests are outstanding and
0560  *  -   the congestion window hasn't been updated recently.
0561  */
0562 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
0563 {
0564     struct rpc_rqst *req = task->tk_rqstp;
0565     unsigned long cwnd = xprt->cwnd;
0566 
0567     if (result >= 0 && cwnd <= xprt->cong) {
0568         /* The (cwnd >> 1) term makes sure
0569          * the result gets rounded properly. */
0570         cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
0571         if (cwnd > RPC_MAXCWND(xprt))
0572             cwnd = RPC_MAXCWND(xprt);
0573         __xprt_lock_write_next_cong(xprt);
0574     } else if (result == -ETIMEDOUT) {
0575         cwnd >>= 1;
0576         if (cwnd < RPC_CWNDSCALE)
0577             cwnd = RPC_CWNDSCALE;
0578     }
0579     dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",
0580             xprt->cong, xprt->cwnd, cwnd);
0581     xprt->cwnd = cwnd;
0582     __xprt_put_cong(xprt, req);
0583 }
0584 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
0585 
0586 /**
0587  * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue
0588  * @xprt: transport with waiting tasks
0589  * @status: result code to plant in each task before waking it
0590  *
0591  */
0592 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
0593 {
0594     if (status < 0)
0595         rpc_wake_up_status(&xprt->pending, status);
0596     else
0597         rpc_wake_up(&xprt->pending);
0598 }
0599 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
0600 
0601 /**
0602  * xprt_wait_for_buffer_space - wait for transport output buffer to clear
0603  * @xprt: transport
0604  *
0605  * Note that we only set the timer for the case of RPC_IS_SOFT(), since
0606  * we don't in general want to force a socket disconnection due to
0607  * an incomplete RPC call transmission.
0608  */
0609 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
0610 {
0611     set_bit(XPRT_WRITE_SPACE, &xprt->state);
0612 }
0613 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
0614 
0615 static bool
0616 xprt_clear_write_space_locked(struct rpc_xprt *xprt)
0617 {
0618     if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
0619         __xprt_lock_write_next(xprt);
0620         dprintk("RPC:       write space: waking waiting task on "
0621                 "xprt %p\n", xprt);
0622         return true;
0623     }
0624     return false;
0625 }
0626 
0627 /**
0628  * xprt_write_space - wake the task waiting for transport output buffer space
0629  * @xprt: transport with waiting tasks
0630  *
0631  * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
0632  */
0633 bool xprt_write_space(struct rpc_xprt *xprt)
0634 {
0635     bool ret;
0636 
0637     if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
0638         return false;
0639     spin_lock(&xprt->transport_lock);
0640     ret = xprt_clear_write_space_locked(xprt);
0641     spin_unlock(&xprt->transport_lock);
0642     return ret;
0643 }
0644 EXPORT_SYMBOL_GPL(xprt_write_space);
0645 
0646 static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
0647 {
0648     s64 delta = ktime_to_ns(ktime_get() - abstime);
0649     return likely(delta >= 0) ?
0650         jiffies - nsecs_to_jiffies(delta) :
0651         jiffies + nsecs_to_jiffies(-delta);
0652 }
0653 
0654 static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
0655 {
0656     const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
0657     unsigned long majortimeo = req->rq_timeout;
0658 
0659     if (to->to_exponential)
0660         majortimeo <<= to->to_retries;
0661     else
0662         majortimeo += to->to_increment * to->to_retries;
0663     if (majortimeo > to->to_maxval || majortimeo == 0)
0664         majortimeo = to->to_maxval;
0665     return majortimeo;
0666 }
0667 
0668 static void xprt_reset_majortimeo(struct rpc_rqst *req)
0669 {
0670     req->rq_majortimeo += xprt_calc_majortimeo(req);
0671 }
0672 
0673 static void xprt_reset_minortimeo(struct rpc_rqst *req)
0674 {
0675     req->rq_minortimeo += req->rq_timeout;
0676 }
0677 
0678 static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
0679 {
0680     unsigned long time_init;
0681     struct rpc_xprt *xprt = req->rq_xprt;
0682 
0683     if (likely(xprt && xprt_connected(xprt)))
0684         time_init = jiffies;
0685     else
0686         time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
0687     req->rq_timeout = task->tk_client->cl_timeout->to_initval;
0688     req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
0689     req->rq_minortimeo = time_init + req->rq_timeout;
0690 }
0691 
0692 /**
0693  * xprt_adjust_timeout - adjust timeout values for next retransmit
0694  * @req: RPC request containing parameters to use for the adjustment
0695  *
0696  */
0697 int xprt_adjust_timeout(struct rpc_rqst *req)
0698 {
0699     struct rpc_xprt *xprt = req->rq_xprt;
0700     const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
0701     int status = 0;
0702 
0703     if (time_before(jiffies, req->rq_majortimeo)) {
0704         if (time_before(jiffies, req->rq_minortimeo))
0705             return status;
0706         if (to->to_exponential)
0707             req->rq_timeout <<= 1;
0708         else
0709             req->rq_timeout += to->to_increment;
0710         if (to->to_maxval && req->rq_timeout >= to->to_maxval)
0711             req->rq_timeout = to->to_maxval;
0712         req->rq_retries++;
0713     } else {
0714         req->rq_timeout = to->to_initval;
0715         req->rq_retries = 0;
0716         xprt_reset_majortimeo(req);
0717         /* Reset the RTT counters == "slow start" */
0718         spin_lock(&xprt->transport_lock);
0719         rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
0720         spin_unlock(&xprt->transport_lock);
0721         status = -ETIMEDOUT;
0722     }
0723     xprt_reset_minortimeo(req);
0724 
0725     if (req->rq_timeout == 0) {
0726         printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
0727         req->rq_timeout = 5 * HZ;
0728     }
0729     return status;
0730 }
0731 
0732 static void xprt_autoclose(struct work_struct *work)
0733 {
0734     struct rpc_xprt *xprt =
0735         container_of(work, struct rpc_xprt, task_cleanup);
0736     unsigned int pflags = memalloc_nofs_save();
0737 
0738     trace_xprt_disconnect_auto(xprt);
0739     xprt->connect_cookie++;
0740     smp_mb__before_atomic();
0741     clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
0742     xprt->ops->close(xprt);
0743     xprt_release_write(xprt, NULL);
0744     wake_up_bit(&xprt->state, XPRT_LOCKED);
0745     memalloc_nofs_restore(pflags);
0746 }
0747 
0748 /**
0749  * xprt_disconnect_done - mark a transport as disconnected
0750  * @xprt: transport to flag for disconnect
0751  *
0752  */
0753 void xprt_disconnect_done(struct rpc_xprt *xprt)
0754 {
0755     trace_xprt_disconnect_done(xprt);
0756     spin_lock(&xprt->transport_lock);
0757     xprt_clear_connected(xprt);
0758     xprt_clear_write_space_locked(xprt);
0759     xprt_clear_congestion_window_wait_locked(xprt);
0760     xprt_wake_pending_tasks(xprt, -ENOTCONN);
0761     spin_unlock(&xprt->transport_lock);
0762 }
0763 EXPORT_SYMBOL_GPL(xprt_disconnect_done);
0764 
0765 /**
0766  * xprt_schedule_autoclose_locked - Try to schedule an autoclose RPC call
0767  * @xprt: transport to disconnect
0768  */
0769 static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
0770 {
0771     if (test_and_set_bit(XPRT_CLOSE_WAIT, &xprt->state))
0772         return;
0773     if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
0774         queue_work(xprtiod_workqueue, &xprt->task_cleanup);
0775     else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
0776         rpc_wake_up_queued_task_set_status(&xprt->pending,
0777                            xprt->snd_task, -ENOTCONN);
0778 }
0779 
0780 /**
0781  * xprt_force_disconnect - force a transport to disconnect
0782  * @xprt: transport to disconnect
0783  *
0784  */
0785 void xprt_force_disconnect(struct rpc_xprt *xprt)
0786 {
0787     trace_xprt_disconnect_force(xprt);
0788 
0789     /* Don't race with the test_bit() in xprt_clear_locked() */
0790     spin_lock(&xprt->transport_lock);
0791     xprt_schedule_autoclose_locked(xprt);
0792     spin_unlock(&xprt->transport_lock);
0793 }
0794 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
0795 
0796 static unsigned int
0797 xprt_connect_cookie(struct rpc_xprt *xprt)
0798 {
0799     return READ_ONCE(xprt->connect_cookie);
0800 }
0801 
0802 static bool
0803 xprt_request_retransmit_after_disconnect(struct rpc_task *task)
0804 {
0805     struct rpc_rqst *req = task->tk_rqstp;
0806     struct rpc_xprt *xprt = req->rq_xprt;
0807 
0808     return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
0809         !xprt_connected(xprt);
0810 }
0811 
0812 /**
0813  * xprt_conditional_disconnect - force a transport to disconnect
0814  * @xprt: transport to disconnect
0815  * @cookie: 'connection cookie'
0816  *
0817  * This attempts to break the connection if and only if 'cookie' matches
0818  * the current transport 'connection cookie'. It ensures that we don't
0819  * try to break the connection more than once when we need to retransmit
0820  * a batch of RPC requests.
0821  *
0822  */
0823 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
0824 {
0825     /* Don't race with the test_bit() in xprt_clear_locked() */
0826     spin_lock(&xprt->transport_lock);
0827     if (cookie != xprt->connect_cookie)
0828         goto out;
0829     if (test_bit(XPRT_CLOSING, &xprt->state))
0830         goto out;
0831     xprt_schedule_autoclose_locked(xprt);
0832 out:
0833     spin_unlock(&xprt->transport_lock);
0834 }
0835 
0836 static bool
0837 xprt_has_timer(const struct rpc_xprt *xprt)
0838 {
0839     return xprt->idle_timeout != 0;
0840 }
0841 
0842 static void
0843 xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
0844     __must_hold(&xprt->transport_lock)
0845 {
0846     xprt->last_used = jiffies;
0847     if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
0848         mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
0849 }
0850 
0851 static void
0852 xprt_init_autodisconnect(struct timer_list *t)
0853 {
0854     struct rpc_xprt *xprt = from_timer(xprt, t, timer);
0855 
0856     if (!RB_EMPTY_ROOT(&xprt->recv_queue))
0857         return;
0858     /* Reset xprt->last_used to avoid connect/autodisconnect cycling */
0859     xprt->last_used = jiffies;
0860     if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
0861         return;
0862     queue_work(xprtiod_workqueue, &xprt->task_cleanup);
0863 }
0864 
0865 #if IS_ENABLED(CONFIG_FAIL_SUNRPC)
0866 static void xprt_inject_disconnect(struct rpc_xprt *xprt)
0867 {
0868     if (!fail_sunrpc.ignore_client_disconnect &&
0869         should_fail(&fail_sunrpc.attr, 1))
0870         xprt->ops->inject_disconnect(xprt);
0871 }
0872 #else
0873 static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
0874 {
0875 }
0876 #endif
0877 
0878 bool xprt_lock_connect(struct rpc_xprt *xprt,
0879         struct rpc_task *task,
0880         void *cookie)
0881 {
0882     bool ret = false;
0883 
0884     spin_lock(&xprt->transport_lock);
0885     if (!test_bit(XPRT_LOCKED, &xprt->state))
0886         goto out;
0887     if (xprt->snd_task != task)
0888         goto out;
0889     set_bit(XPRT_SND_IS_COOKIE, &xprt->state);
0890     xprt->snd_task = cookie;
0891     ret = true;
0892 out:
0893     spin_unlock(&xprt->transport_lock);
0894     return ret;
0895 }
0896 EXPORT_SYMBOL_GPL(xprt_lock_connect);
0897 
0898 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
0899 {
0900     spin_lock(&xprt->transport_lock);
0901     if (xprt->snd_task != cookie)
0902         goto out;
0903     if (!test_bit(XPRT_LOCKED, &xprt->state))
0904         goto out;
0905     xprt->snd_task =NULL;
0906     clear_bit(XPRT_SND_IS_COOKIE, &xprt->state);
0907     xprt->ops->release_xprt(xprt, NULL);
0908     xprt_schedule_autodisconnect(xprt);
0909 out:
0910     spin_unlock(&xprt->transport_lock);
0911     wake_up_bit(&xprt->state, XPRT_LOCKED);
0912 }
0913 EXPORT_SYMBOL_GPL(xprt_unlock_connect);
0914 
0915 /**
0916  * xprt_connect - schedule a transport connect operation
0917  * @task: RPC task that is requesting the connect
0918  *
0919  */
0920 void xprt_connect(struct rpc_task *task)
0921 {
0922     struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
0923 
0924     trace_xprt_connect(xprt);
0925 
0926     if (!xprt_bound(xprt)) {
0927         task->tk_status = -EAGAIN;
0928         return;
0929     }
0930     if (!xprt_lock_write(xprt, task))
0931         return;
0932 
0933     if (!xprt_connected(xprt) && !test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
0934         task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
0935         rpc_sleep_on_timeout(&xprt->pending, task, NULL,
0936                 xprt_request_timeout(task->tk_rqstp));
0937 
0938         if (test_bit(XPRT_CLOSING, &xprt->state))
0939             return;
0940         if (xprt_test_and_set_connecting(xprt))
0941             return;
0942         /* Race breaker */
0943         if (!xprt_connected(xprt)) {
0944             xprt->stat.connect_start = jiffies;
0945             xprt->ops->connect(xprt, task);
0946         } else {
0947             xprt_clear_connecting(xprt);
0948             task->tk_status = 0;
0949             rpc_wake_up_queued_task(&xprt->pending, task);
0950         }
0951     }
0952     xprt_release_write(xprt, task);
0953 }
0954 
0955 /**
0956  * xprt_reconnect_delay - compute the wait before scheduling a connect
0957  * @xprt: transport instance
0958  *
0959  */
0960 unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
0961 {
0962     unsigned long start, now = jiffies;
0963 
0964     start = xprt->stat.connect_start + xprt->reestablish_timeout;
0965     if (time_after(start, now))
0966         return start - now;
0967     return 0;
0968 }
0969 EXPORT_SYMBOL_GPL(xprt_reconnect_delay);
0970 
0971 /**
0972  * xprt_reconnect_backoff - compute the new re-establish timeout
0973  * @xprt: transport instance
0974  * @init_to: initial reestablish timeout
0975  *
0976  */
0977 void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
0978 {
0979     xprt->reestablish_timeout <<= 1;
0980     if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
0981         xprt->reestablish_timeout = xprt->max_reconnect_timeout;
0982     if (xprt->reestablish_timeout < init_to)
0983         xprt->reestablish_timeout = init_to;
0984 }
0985 EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
0986 
0987 enum xprt_xid_rb_cmp {
0988     XID_RB_EQUAL,
0989     XID_RB_LEFT,
0990     XID_RB_RIGHT,
0991 };
0992 static enum xprt_xid_rb_cmp
0993 xprt_xid_cmp(__be32 xid1, __be32 xid2)
0994 {
0995     if (xid1 == xid2)
0996         return XID_RB_EQUAL;
0997     if ((__force u32)xid1 < (__force u32)xid2)
0998         return XID_RB_LEFT;
0999     return XID_RB_RIGHT;
1000 }
1001 
1002 static struct rpc_rqst *
1003 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
1004 {
1005     struct rb_node *n = xprt->recv_queue.rb_node;
1006     struct rpc_rqst *req;
1007 
1008     while (n != NULL) {
1009         req = rb_entry(n, struct rpc_rqst, rq_recv);
1010         switch (xprt_xid_cmp(xid, req->rq_xid)) {
1011         case XID_RB_LEFT:
1012             n = n->rb_left;
1013             break;
1014         case XID_RB_RIGHT:
1015             n = n->rb_right;
1016             break;
1017         case XID_RB_EQUAL:
1018             return req;
1019         }
1020     }
1021     return NULL;
1022 }
1023 
1024 static void
1025 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
1026 {
1027     struct rb_node **p = &xprt->recv_queue.rb_node;
1028     struct rb_node *n = NULL;
1029     struct rpc_rqst *req;
1030 
1031     while (*p != NULL) {
1032         n = *p;
1033         req = rb_entry(n, struct rpc_rqst, rq_recv);
1034         switch(xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
1035         case XID_RB_LEFT:
1036             p = &n->rb_left;
1037             break;
1038         case XID_RB_RIGHT:
1039             p = &n->rb_right;
1040             break;
1041         case XID_RB_EQUAL:
1042             WARN_ON_ONCE(new != req);
1043             return;
1044         }
1045     }
1046     rb_link_node(&new->rq_recv, n, p);
1047     rb_insert_color(&new->rq_recv, &xprt->recv_queue);
1048 }
1049 
1050 static void
1051 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
1052 {
1053     rb_erase(&req->rq_recv, &xprt->recv_queue);
1054 }
1055 
1056 /**
1057  * xprt_lookup_rqst - find an RPC request corresponding to an XID
1058  * @xprt: transport on which the original request was transmitted
1059  * @xid: RPC XID of incoming reply
1060  *
1061  * Caller holds xprt->queue_lock.
1062  */
1063 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
1064 {
1065     struct rpc_rqst *entry;
1066 
1067     entry = xprt_request_rb_find(xprt, xid);
1068     if (entry != NULL) {
1069         trace_xprt_lookup_rqst(xprt, xid, 0);
1070         entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
1071         return entry;
1072     }
1073 
1074     dprintk("RPC:       xprt_lookup_rqst did not find xid %08x\n",
1075             ntohl(xid));
1076     trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
1077     xprt->stat.bad_xids++;
1078     return NULL;
1079 }
1080 EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
1081 
1082 static bool
1083 xprt_is_pinned_rqst(struct rpc_rqst *req)
1084 {
1085     return atomic_read(&req->rq_pin) != 0;
1086 }
1087 
1088 /**
1089  * xprt_pin_rqst - Pin a request on the transport receive list
1090  * @req: Request to pin
1091  *
1092  * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
1093  * so should be holding xprt->queue_lock.
1094  */
1095 void xprt_pin_rqst(struct rpc_rqst *req)
1096 {
1097     atomic_inc(&req->rq_pin);
1098 }
1099 EXPORT_SYMBOL_GPL(xprt_pin_rqst);
1100 
1101 /**
1102  * xprt_unpin_rqst - Unpin a request on the transport receive list
1103  * @req: Request to pin
1104  *
1105  * Caller should be holding xprt->queue_lock.
1106  */
1107 void xprt_unpin_rqst(struct rpc_rqst *req)
1108 {
1109     if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
1110         atomic_dec(&req->rq_pin);
1111         return;
1112     }
1113     if (atomic_dec_and_test(&req->rq_pin))
1114         wake_up_var(&req->rq_pin);
1115 }
1116 EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
1117 
1118 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
1119 {
1120     wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
1121 }
1122 
1123 static bool
1124 xprt_request_data_received(struct rpc_task *task)
1125 {
1126     return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
1127         READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
1128 }
1129 
1130 static bool
1131 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
1132 {
1133     return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
1134         READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
1135 }
1136 
1137 /**
1138  * xprt_request_enqueue_receive - Add an request to the receive queue
1139  * @task: RPC task
1140  *
1141  */
1142 int
1143 xprt_request_enqueue_receive(struct rpc_task *task)
1144 {
1145     struct rpc_rqst *req = task->tk_rqstp;
1146     struct rpc_xprt *xprt = req->rq_xprt;
1147     int ret;
1148 
1149     if (!xprt_request_need_enqueue_receive(task, req))
1150         return 0;
1151 
1152     ret = xprt_request_prepare(task->tk_rqstp, &req->rq_rcv_buf);
1153     if (ret)
1154         return ret;
1155     spin_lock(&xprt->queue_lock);
1156 
1157     /* Update the softirq receive buffer */
1158     memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
1159             sizeof(req->rq_private_buf));
1160 
1161     /* Add request to the receive list */
1162     xprt_request_rb_insert(xprt, req);
1163     set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
1164     spin_unlock(&xprt->queue_lock);
1165 
1166     /* Turn off autodisconnect */
1167     del_singleshot_timer_sync(&xprt->timer);
1168     return 0;
1169 }
1170 
1171 /**
1172  * xprt_request_dequeue_receive_locked - Remove a request from the receive queue
1173  * @task: RPC task
1174  *
1175  * Caller must hold xprt->queue_lock.
1176  */
1177 static void
1178 xprt_request_dequeue_receive_locked(struct rpc_task *task)
1179 {
1180     struct rpc_rqst *req = task->tk_rqstp;
1181 
1182     if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1183         xprt_request_rb_remove(req->rq_xprt, req);
1184 }
1185 
1186 /**
1187  * xprt_update_rtt - Update RPC RTT statistics
1188  * @task: RPC request that recently completed
1189  *
1190  * Caller holds xprt->queue_lock.
1191  */
1192 void xprt_update_rtt(struct rpc_task *task)
1193 {
1194     struct rpc_rqst *req = task->tk_rqstp;
1195     struct rpc_rtt *rtt = task->tk_client->cl_rtt;
1196     unsigned int timer = task->tk_msg.rpc_proc->p_timer;
1197     long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));
1198 
1199     if (timer) {
1200         if (req->rq_ntrans == 1)
1201             rpc_update_rtt(rtt, timer, m);
1202         rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
1203     }
1204 }
1205 EXPORT_SYMBOL_GPL(xprt_update_rtt);
1206 
1207 /**
1208  * xprt_complete_rqst - called when reply processing is complete
1209  * @task: RPC request that recently completed
1210  * @copied: actual number of bytes received from the transport
1211  *
1212  * Caller holds xprt->queue_lock.
1213  */
1214 void xprt_complete_rqst(struct rpc_task *task, int copied)
1215 {
1216     struct rpc_rqst *req = task->tk_rqstp;
1217     struct rpc_xprt *xprt = req->rq_xprt;
1218 
1219     xprt->stat.recvs++;
1220 
1221     xdr_free_bvec(&req->rq_rcv_buf);
1222     req->rq_private_buf.bvec = NULL;
1223     req->rq_private_buf.len = copied;
1224     /* Ensure all writes are done before we update */
1225     /* req->rq_reply_bytes_recvd */
1226     smp_wmb();
1227     req->rq_reply_bytes_recvd = copied;
1228     xprt_request_dequeue_receive_locked(task);
1229     rpc_wake_up_queued_task(&xprt->pending, task);
1230 }
1231 EXPORT_SYMBOL_GPL(xprt_complete_rqst);
1232 
1233 static void xprt_timer(struct rpc_task *task)
1234 {
1235     struct rpc_rqst *req = task->tk_rqstp;
1236     struct rpc_xprt *xprt = req->rq_xprt;
1237 
1238     if (task->tk_status != -ETIMEDOUT)
1239         return;
1240 
1241     trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
1242     if (!req->rq_reply_bytes_recvd) {
1243         if (xprt->ops->timer)
1244             xprt->ops->timer(xprt, task);
1245     } else
1246         task->tk_status = 0;
1247 }
1248 
1249 /**
1250  * xprt_wait_for_reply_request_def - wait for reply
1251  * @task: pointer to rpc_task
1252  *
1253  * Set a request's retransmit timeout based on the transport's
1254  * default timeout parameters.  Used by transports that don't adjust
1255  * the retransmit timeout based on round-trip time estimation,
1256  * and put the task to sleep on the pending queue.
1257  */
1258 void xprt_wait_for_reply_request_def(struct rpc_task *task)
1259 {
1260     struct rpc_rqst *req = task->tk_rqstp;
1261 
1262     rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1263             xprt_request_timeout(req));
1264 }
1265 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);
1266 
1267 /**
1268  * xprt_wait_for_reply_request_rtt - wait for reply using RTT estimator
1269  * @task: pointer to rpc_task
1270  *
1271  * Set a request's retransmit timeout using the RTT estimator,
1272  * and put the task to sleep on the pending queue.
1273  */
1274 void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
1275 {
1276     int timer = task->tk_msg.rpc_proc->p_timer;
1277     struct rpc_clnt *clnt = task->tk_client;
1278     struct rpc_rtt *rtt = clnt->cl_rtt;
1279     struct rpc_rqst *req = task->tk_rqstp;
1280     unsigned long max_timeout = clnt->cl_timeout->to_maxval;
1281     unsigned long timeout;
1282 
1283     timeout = rpc_calc_rto(rtt, timer);
1284     timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
1285     if (timeout > max_timeout || timeout == 0)
1286         timeout = max_timeout;
1287     rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1288             jiffies + timeout);
1289 }
1290 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);
1291 
1292 /**
1293  * xprt_request_wait_receive - wait for the reply to an RPC request
1294  * @task: RPC task about to send a request
1295  *
1296  */
1297 void xprt_request_wait_receive(struct rpc_task *task)
1298 {
1299     struct rpc_rqst *req = task->tk_rqstp;
1300     struct rpc_xprt *xprt = req->rq_xprt;
1301 
1302     if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1303         return;
1304     /*
1305      * Sleep on the pending queue if we're expecting a reply.
1306      * The spinlock ensures atomicity between the test of
1307      * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
1308      */
1309     spin_lock(&xprt->queue_lock);
1310     if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
1311         xprt->ops->wait_for_reply_request(task);
1312         /*
1313          * Send an extra queue wakeup call if the
1314          * connection was dropped in case the call to
1315          * rpc_sleep_on() raced.
1316          */
1317         if (xprt_request_retransmit_after_disconnect(task))
1318             rpc_wake_up_queued_task_set_status(&xprt->pending,
1319                     task, -ENOTCONN);
1320     }
1321     spin_unlock(&xprt->queue_lock);
1322 }
1323 
1324 static bool
1325 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
1326 {
1327     return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1328 }
1329 
1330 /**
1331  * xprt_request_enqueue_transmit - queue a task for transmission
1332  * @task: pointer to rpc_task
1333  *
1334  * Add a task to the transmission queue.
1335  */
1336 void
1337 xprt_request_enqueue_transmit(struct rpc_task *task)
1338 {
1339     struct rpc_rqst *pos, *req = task->tk_rqstp;
1340     struct rpc_xprt *xprt = req->rq_xprt;
1341     int ret;
1342 
1343     if (xprt_request_need_enqueue_transmit(task, req)) {
1344         ret = xprt_request_prepare(task->tk_rqstp, &req->rq_snd_buf);
1345         if (ret) {
1346             task->tk_status = ret;
1347             return;
1348         }
1349         req->rq_bytes_sent = 0;
1350         spin_lock(&xprt->queue_lock);
1351         /*
1352          * Requests that carry congestion control credits are added
1353          * to the head of the list to avoid starvation issues.
1354          */
1355         if (req->rq_cong) {
1356             xprt_clear_congestion_window_wait(xprt);
1357             list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1358                 if (pos->rq_cong)
1359                     continue;
1360                 /* Note: req is added _before_ pos */
1361                 list_add_tail(&req->rq_xmit, &pos->rq_xmit);
1362                 INIT_LIST_HEAD(&req->rq_xmit2);
1363                 goto out;
1364             }
1365         } else if (!req->rq_seqno) {
1366             list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1367                 if (pos->rq_task->tk_owner != task->tk_owner)
1368                     continue;
1369                 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
1370                 INIT_LIST_HEAD(&req->rq_xmit);
1371                 goto out;
1372             }
1373         }
1374         list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
1375         INIT_LIST_HEAD(&req->rq_xmit2);
1376 out:
1377         atomic_long_inc(&xprt->xmit_queuelen);
1378         set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1379         spin_unlock(&xprt->queue_lock);
1380     }
1381 }
1382 
1383 /**
1384  * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
1385  * @task: pointer to rpc_task
1386  *
1387  * Remove a task from the transmission queue
1388  * Caller must hold xprt->queue_lock
1389  */
1390 static void
1391 xprt_request_dequeue_transmit_locked(struct rpc_task *task)
1392 {
1393     struct rpc_rqst *req = task->tk_rqstp;
1394 
1395     if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1396         return;
1397     if (!list_empty(&req->rq_xmit)) {
1398         list_del(&req->rq_xmit);
1399         if (!list_empty(&req->rq_xmit2)) {
1400             struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
1401                     struct rpc_rqst, rq_xmit2);
1402             list_del(&req->rq_xmit2);
1403             list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
1404         }
1405     } else
1406         list_del(&req->rq_xmit2);
1407     atomic_long_dec(&req->rq_xprt->xmit_queuelen);
1408     xdr_free_bvec(&req->rq_snd_buf);
1409 }
1410 
1411 /**
1412  * xprt_request_dequeue_transmit - remove a task from the transmission queue
1413  * @task: pointer to rpc_task
1414  *
1415  * Remove a task from the transmission queue
1416  */
1417 static void
1418 xprt_request_dequeue_transmit(struct rpc_task *task)
1419 {
1420     struct rpc_rqst *req = task->tk_rqstp;
1421     struct rpc_xprt *xprt = req->rq_xprt;
1422 
1423     spin_lock(&xprt->queue_lock);
1424     xprt_request_dequeue_transmit_locked(task);
1425     spin_unlock(&xprt->queue_lock);
1426 }
1427 
1428 /**
1429  * xprt_request_dequeue_xprt - remove a task from the transmit+receive queue
1430  * @task: pointer to rpc_task
1431  *
1432  * Remove a task from the transmit and receive queues, and ensure that
1433  * it is not pinned by the receive work item.
1434  */
1435 void
1436 xprt_request_dequeue_xprt(struct rpc_task *task)
1437 {
1438     struct rpc_rqst *req = task->tk_rqstp;
1439     struct rpc_xprt *xprt = req->rq_xprt;
1440 
1441     if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
1442         test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
1443         xprt_is_pinned_rqst(req)) {
1444         spin_lock(&xprt->queue_lock);
1445         while (xprt_is_pinned_rqst(req)) {
1446             set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1447             spin_unlock(&xprt->queue_lock);
1448             xprt_wait_on_pinned_rqst(req);
1449             spin_lock(&xprt->queue_lock);
1450             clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1451         }
1452         xprt_request_dequeue_transmit_locked(task);
1453         xprt_request_dequeue_receive_locked(task);
1454         spin_unlock(&xprt->queue_lock);
1455         xdr_free_bvec(&req->rq_rcv_buf);
1456     }
1457 }
1458 
1459 /**
1460  * xprt_request_prepare - prepare an encoded request for transport
1461  * @req: pointer to rpc_rqst
1462  * @buf: pointer to send/rcv xdr_buf
1463  *
1464  * Calls into the transport layer to do whatever is needed to prepare
1465  * the request for transmission or receive.
1466  * Returns error, or zero.
1467  */
1468 static int
1469 xprt_request_prepare(struct rpc_rqst *req, struct xdr_buf *buf)
1470 {
1471     struct rpc_xprt *xprt = req->rq_xprt;
1472 
1473     if (xprt->ops->prepare_request)
1474         return xprt->ops->prepare_request(req, buf);
1475     return 0;
1476 }
1477 
1478 /**
1479  * xprt_request_need_retransmit - Test if a task needs retransmission
1480  * @task: pointer to rpc_task
1481  *
1482  * Test for whether a connection breakage requires the task to retransmit
1483  */
1484 bool
1485 xprt_request_need_retransmit(struct rpc_task *task)
1486 {
1487     return xprt_request_retransmit_after_disconnect(task);
1488 }
1489 
1490 /**
1491  * xprt_prepare_transmit - reserve the transport before sending a request
1492  * @task: RPC task about to send a request
1493  *
1494  */
1495 bool xprt_prepare_transmit(struct rpc_task *task)
1496 {
1497     struct rpc_rqst *req = task->tk_rqstp;
1498     struct rpc_xprt *xprt = req->rq_xprt;
1499 
1500     if (!xprt_lock_write(xprt, task)) {
1501         /* Race breaker: someone may have transmitted us */
1502         if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1503             rpc_wake_up_queued_task_set_status(&xprt->sending,
1504                     task, 0);
1505         return false;
1506 
1507     }
1508     if (atomic_read(&xprt->swapper))
1509         /* This will be clear in __rpc_execute */
1510         current->flags |= PF_MEMALLOC;
1511     return true;
1512 }
1513 
1514 void xprt_end_transmit(struct rpc_task *task)
1515 {
1516     struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1517 
1518     xprt_inject_disconnect(xprt);
1519     xprt_release_write(xprt, task);
1520 }
1521 
1522 /**
1523  * xprt_request_transmit - send an RPC request on a transport
1524  * @req: pointer to request to transmit
1525  * @snd_task: RPC task that owns the transport lock
1526  *
1527  * This performs the transmission of a single request.
1528  * Note that if the request is not the same as snd_task, then it
1529  * does need to be pinned.
1530  * Returns '0' on success.
1531  */
1532 static int
1533 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
1534 {
1535     struct rpc_xprt *xprt = req->rq_xprt;
1536     struct rpc_task *task = req->rq_task;
1537     unsigned int connect_cookie;
1538     int is_retrans = RPC_WAS_SENT(task);
1539     int status;
1540 
1541     if (!req->rq_bytes_sent) {
1542         if (xprt_request_data_received(task)) {
1543             status = 0;
1544             goto out_dequeue;
1545         }
1546         /* Verify that our message lies in the RPCSEC_GSS window */
1547         if (rpcauth_xmit_need_reencode(task)) {
1548             status = -EBADMSG;
1549             goto out_dequeue;
1550         }
1551         if (RPC_SIGNALLED(task)) {
1552             status = -ERESTARTSYS;
1553             goto out_dequeue;
1554         }
1555     }
1556 
1557     /*
1558      * Update req->rq_ntrans before transmitting to avoid races with
1559      * xprt_update_rtt(), which needs to know that it is recording a
1560      * reply to the first transmission.
1561      */
1562     req->rq_ntrans++;
1563 
1564     trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
1565     connect_cookie = xprt->connect_cookie;
1566     status = xprt->ops->send_request(req);
1567     if (status != 0) {
1568         req->rq_ntrans--;
1569         trace_xprt_transmit(req, status);
1570         return status;
1571     }
1572 
1573     if (is_retrans) {
1574         task->tk_client->cl_stats->rpcretrans++;
1575         trace_xprt_retransmit(req);
1576     }
1577 
1578     xprt_inject_disconnect(xprt);
1579 
1580     task->tk_flags |= RPC_TASK_SENT;
1581     spin_lock(&xprt->transport_lock);
1582 
1583     xprt->stat.sends++;
1584     xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
1585     xprt->stat.bklog_u += xprt->backlog.qlen;
1586     xprt->stat.sending_u += xprt->sending.qlen;
1587     xprt->stat.pending_u += xprt->pending.qlen;
1588     spin_unlock(&xprt->transport_lock);
1589 
1590     req->rq_connect_cookie = connect_cookie;
1591 out_dequeue:
1592     trace_xprt_transmit(req, status);
1593     xprt_request_dequeue_transmit(task);
1594     rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
1595     return status;
1596 }
1597 
1598 /**
1599  * xprt_transmit - send an RPC request on a transport
1600  * @task: controlling RPC task
1601  *
1602  * Attempts to drain the transmit queue. On exit, either the transport
1603  * signalled an error that needs to be handled before transmission can
1604  * resume, or @task finished transmitting, and detected that it already
1605  * received a reply.
1606  */
1607 void
1608 xprt_transmit(struct rpc_task *task)
1609 {
1610     struct rpc_rqst *next, *req = task->tk_rqstp;
1611     struct rpc_xprt *xprt = req->rq_xprt;
1612     int status;
1613 
1614     spin_lock(&xprt->queue_lock);
1615     for (;;) {
1616         next = list_first_entry_or_null(&xprt->xmit_queue,
1617                         struct rpc_rqst, rq_xmit);
1618         if (!next)
1619             break;
1620         xprt_pin_rqst(next);
1621         spin_unlock(&xprt->queue_lock);
1622         status = xprt_request_transmit(next, task);
1623         if (status == -EBADMSG && next != req)
1624             status = 0;
1625         spin_lock(&xprt->queue_lock);
1626         xprt_unpin_rqst(next);
1627         if (status < 0) {
1628             if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1629                 task->tk_status = status;
1630             break;
1631         }
1632         /* Was @task transmitted, and has it received a reply? */
1633         if (xprt_request_data_received(task) &&
1634             !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1635             break;
1636         cond_resched_lock(&xprt->queue_lock);
1637     }
1638     spin_unlock(&xprt->queue_lock);
1639 }
1640 
1641 static void xprt_complete_request_init(struct rpc_task *task)
1642 {
1643     if (task->tk_rqstp)
1644         xprt_request_init(task);
1645 }
1646 
1647 void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
1648 {
1649     set_bit(XPRT_CONGESTED, &xprt->state);
1650     rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
1651 }
1652 EXPORT_SYMBOL_GPL(xprt_add_backlog);
1653 
1654 static bool __xprt_set_rq(struct rpc_task *task, void *data)
1655 {
1656     struct rpc_rqst *req = data;
1657 
1658     if (task->tk_rqstp == NULL) {
1659         memset(req, 0, sizeof(*req));   /* mark unused */
1660         task->tk_rqstp = req;
1661         return true;
1662     }
1663     return false;
1664 }
1665 
1666 bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
1667 {
1668     if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
1669         clear_bit(XPRT_CONGESTED, &xprt->state);
1670         return false;
1671     }
1672     return true;
1673 }
1674 EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);
1675 
1676 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
1677 {
1678     bool ret = false;
1679 
1680     if (!test_bit(XPRT_CONGESTED, &xprt->state))
1681         goto out;
1682     spin_lock(&xprt->reserve_lock);
1683     if (test_bit(XPRT_CONGESTED, &xprt->state)) {
1684         xprt_add_backlog(xprt, task);
1685         ret = true;
1686     }
1687     spin_unlock(&xprt->reserve_lock);
1688 out:
1689     return ret;
1690 }
1691 
1692 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
1693 {
1694     struct rpc_rqst *req = ERR_PTR(-EAGAIN);
1695 
1696     if (xprt->num_reqs >= xprt->max_reqs)
1697         goto out;
1698     ++xprt->num_reqs;
1699     spin_unlock(&xprt->reserve_lock);
1700     req = kzalloc(sizeof(*req), rpc_task_gfp_mask());
1701     spin_lock(&xprt->reserve_lock);
1702     if (req != NULL)
1703         goto out;
1704     --xprt->num_reqs;
1705     req = ERR_PTR(-ENOMEM);
1706 out:
1707     return req;
1708 }
1709 
1710 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1711 {
1712     if (xprt->num_reqs > xprt->min_reqs) {
1713         --xprt->num_reqs;
1714         kfree(req);
1715         return true;
1716     }
1717     return false;
1718 }
1719 
1720 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
1721 {
1722     struct rpc_rqst *req;
1723 
1724     spin_lock(&xprt->reserve_lock);
1725     if (!list_empty(&xprt->free)) {
1726         req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
1727         list_del(&req->rq_list);
1728         goto out_init_req;
1729     }
1730     req = xprt_dynamic_alloc_slot(xprt);
1731     if (!IS_ERR(req))
1732         goto out_init_req;
1733     switch (PTR_ERR(req)) {
1734     case -ENOMEM:
1735         dprintk("RPC:       dynamic allocation of request slot "
1736                 "failed! Retrying\n");
1737         task->tk_status = -ENOMEM;
1738         break;
1739     case -EAGAIN:
1740         xprt_add_backlog(xprt, task);
1741         dprintk("RPC:       waiting for request slot\n");
1742         fallthrough;
1743     default:
1744         task->tk_status = -EAGAIN;
1745     }
1746     spin_unlock(&xprt->reserve_lock);
1747     return;
1748 out_init_req:
1749     xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
1750                      xprt->num_reqs);
1751     spin_unlock(&xprt->reserve_lock);
1752 
1753     task->tk_status = 0;
1754     task->tk_rqstp = req;
1755 }
1756 EXPORT_SYMBOL_GPL(xprt_alloc_slot);
1757 
1758 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1759 {
1760     spin_lock(&xprt->reserve_lock);
1761     if (!xprt_wake_up_backlog(xprt, req) &&
1762         !xprt_dynamic_free_slot(xprt, req)) {
1763         memset(req, 0, sizeof(*req));   /* mark unused */
1764         list_add(&req->rq_list, &xprt->free);
1765     }
1766     spin_unlock(&xprt->reserve_lock);
1767 }
1768 EXPORT_SYMBOL_GPL(xprt_free_slot);
1769 
1770 static void xprt_free_all_slots(struct rpc_xprt *xprt)
1771 {
1772     struct rpc_rqst *req;
1773     while (!list_empty(&xprt->free)) {
1774         req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
1775         list_del(&req->rq_list);
1776         kfree(req);
1777     }
1778 }
1779 
1780 static DEFINE_IDA(rpc_xprt_ids);
1781 
1782 void xprt_cleanup_ids(void)
1783 {
1784     ida_destroy(&rpc_xprt_ids);
1785 }
1786 
1787 static int xprt_alloc_id(struct rpc_xprt *xprt)
1788 {
1789     int id;
1790 
1791     id = ida_simple_get(&rpc_xprt_ids, 0, 0, GFP_KERNEL);
1792     if (id < 0)
1793         return id;
1794 
1795     xprt->id = id;
1796     return 0;
1797 }
1798 
1799 static void xprt_free_id(struct rpc_xprt *xprt)
1800 {
1801     ida_simple_remove(&rpc_xprt_ids, xprt->id);
1802 }
1803 
1804 struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
1805         unsigned int num_prealloc,
1806         unsigned int max_alloc)
1807 {
1808     struct rpc_xprt *xprt;
1809     struct rpc_rqst *req;
1810     int i;
1811 
1812     xprt = kzalloc(size, GFP_KERNEL);
1813     if (xprt == NULL)
1814         goto out;
1815 
1816     xprt_alloc_id(xprt);
1817     xprt_init(xprt, net);
1818 
1819     for (i = 0; i < num_prealloc; i++) {
1820         req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
1821         if (!req)
1822             goto out_free;
1823         list_add(&req->rq_list, &xprt->free);
1824     }
1825     if (max_alloc > num_prealloc)
1826         xprt->max_reqs = max_alloc;
1827     else
1828         xprt->max_reqs = num_prealloc;
1829     xprt->min_reqs = num_prealloc;
1830     xprt->num_reqs = num_prealloc;
1831 
1832     return xprt;
1833 
1834 out_free:
1835     xprt_free(xprt);
1836 out:
1837     return NULL;
1838 }
1839 EXPORT_SYMBOL_GPL(xprt_alloc);
1840 
1841 void xprt_free(struct rpc_xprt *xprt)
1842 {
1843     put_net_track(xprt->xprt_net, &xprt->ns_tracker);
1844     xprt_free_all_slots(xprt);
1845     xprt_free_id(xprt);
1846     rpc_sysfs_xprt_destroy(xprt);
1847     kfree_rcu(xprt, rcu);
1848 }
1849 EXPORT_SYMBOL_GPL(xprt_free);
1850 
1851 static void
1852 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
1853 {
1854     req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
1855 }
1856 
1857 static __be32
1858 xprt_alloc_xid(struct rpc_xprt *xprt)
1859 {
1860     __be32 xid;
1861 
1862     spin_lock(&xprt->reserve_lock);
1863     xid = (__force __be32)xprt->xid++;
1864     spin_unlock(&xprt->reserve_lock);
1865     return xid;
1866 }
1867 
1868 static void
1869 xprt_init_xid(struct rpc_xprt *xprt)
1870 {
1871     xprt->xid = prandom_u32();
1872 }
1873 
1874 static void
1875 xprt_request_init(struct rpc_task *task)
1876 {
1877     struct rpc_xprt *xprt = task->tk_xprt;
1878     struct rpc_rqst *req = task->tk_rqstp;
1879 
1880     req->rq_task    = task;
1881     req->rq_xprt    = xprt;
1882     req->rq_buffer  = NULL;
1883     req->rq_xid = xprt_alloc_xid(xprt);
1884     xprt_init_connect_cookie(req, xprt);
1885     req->rq_snd_buf.len = 0;
1886     req->rq_snd_buf.buflen = 0;
1887     req->rq_rcv_buf.len = 0;
1888     req->rq_rcv_buf.buflen = 0;
1889     req->rq_snd_buf.bvec = NULL;
1890     req->rq_rcv_buf.bvec = NULL;
1891     req->rq_release_snd_buf = NULL;
1892     xprt_init_majortimeo(task, req);
1893 
1894     trace_xprt_reserve(req);
1895 }
1896 
1897 static void
1898 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
1899 {
1900     xprt->ops->alloc_slot(xprt, task);
1901     if (task->tk_rqstp != NULL)
1902         xprt_request_init(task);
1903 }
1904 
1905 /**
1906  * xprt_reserve - allocate an RPC request slot
1907  * @task: RPC task requesting a slot allocation
1908  *
1909  * If the transport is marked as being congested, or if no more
1910  * slots are available, place the task on the transport's
1911  * backlog queue.
1912  */
1913 void xprt_reserve(struct rpc_task *task)
1914 {
1915     struct rpc_xprt *xprt = task->tk_xprt;
1916 
1917     task->tk_status = 0;
1918     if (task->tk_rqstp != NULL)
1919         return;
1920 
1921     task->tk_status = -EAGAIN;
1922     if (!xprt_throttle_congested(xprt, task))
1923         xprt_do_reserve(xprt, task);
1924 }
1925 
1926 /**
1927  * xprt_retry_reserve - allocate an RPC request slot
1928  * @task: RPC task requesting a slot allocation
1929  *
1930  * If no more slots are available, place the task on the transport's
1931  * backlog queue.
1932  * Note that the only difference with xprt_reserve is that we now
1933  * ignore the value of the XPRT_CONGESTED flag.
1934  */
1935 void xprt_retry_reserve(struct rpc_task *task)
1936 {
1937     struct rpc_xprt *xprt = task->tk_xprt;
1938 
1939     task->tk_status = 0;
1940     if (task->tk_rqstp != NULL)
1941         return;
1942 
1943     task->tk_status = -EAGAIN;
1944     xprt_do_reserve(xprt, task);
1945 }
1946 
1947 /**
1948  * xprt_release - release an RPC request slot
1949  * @task: task which is finished with the slot
1950  *
1951  */
1952 void xprt_release(struct rpc_task *task)
1953 {
1954     struct rpc_xprt *xprt;
1955     struct rpc_rqst *req = task->tk_rqstp;
1956 
1957     if (req == NULL) {
1958         if (task->tk_client) {
1959             xprt = task->tk_xprt;
1960             xprt_release_write(xprt, task);
1961         }
1962         return;
1963     }
1964 
1965     xprt = req->rq_xprt;
1966     xprt_request_dequeue_xprt(task);
1967     spin_lock(&xprt->transport_lock);
1968     xprt->ops->release_xprt(xprt, task);
1969     if (xprt->ops->release_request)
1970         xprt->ops->release_request(task);
1971     xprt_schedule_autodisconnect(xprt);
1972     spin_unlock(&xprt->transport_lock);
1973     if (req->rq_buffer)
1974         xprt->ops->buf_free(task);
1975     if (req->rq_cred != NULL)
1976         put_rpccred(req->rq_cred);
1977     if (req->rq_release_snd_buf)
1978         req->rq_release_snd_buf(req);
1979 
1980     task->tk_rqstp = NULL;
1981     if (likely(!bc_prealloc(req)))
1982         xprt->ops->free_slot(xprt, req);
1983     else
1984         xprt_free_bc_request(req);
1985 }
1986 
1987 #ifdef CONFIG_SUNRPC_BACKCHANNEL
1988 void
1989 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
1990 {
1991     struct xdr_buf *xbufp = &req->rq_snd_buf;
1992 
1993     task->tk_rqstp = req;
1994     req->rq_task = task;
1995     xprt_init_connect_cookie(req, req->rq_xprt);
1996     /*
1997      * Set up the xdr_buf length.
1998      * This also indicates that the buffer is XDR encoded already.
1999      */
2000     xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
2001         xbufp->tail[0].iov_len;
2002 }
2003 #endif
2004 
2005 static void xprt_init(struct rpc_xprt *xprt, struct net *net)
2006 {
2007     kref_init(&xprt->kref);
2008 
2009     spin_lock_init(&xprt->transport_lock);
2010     spin_lock_init(&xprt->reserve_lock);
2011     spin_lock_init(&xprt->queue_lock);
2012 
2013     INIT_LIST_HEAD(&xprt->free);
2014     xprt->recv_queue = RB_ROOT;
2015     INIT_LIST_HEAD(&xprt->xmit_queue);
2016 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2017     spin_lock_init(&xprt->bc_pa_lock);
2018     INIT_LIST_HEAD(&xprt->bc_pa_list);
2019 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
2020     INIT_LIST_HEAD(&xprt->xprt_switch);
2021 
2022     xprt->last_used = jiffies;
2023     xprt->cwnd = RPC_INITCWND;
2024     xprt->bind_index = 0;
2025 
2026     rpc_init_wait_queue(&xprt->binding, "xprt_binding");
2027     rpc_init_wait_queue(&xprt->pending, "xprt_pending");
2028     rpc_init_wait_queue(&xprt->sending, "xprt_sending");
2029     rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
2030 
2031     xprt_init_xid(xprt);
2032 
2033     xprt->xprt_net = get_net_track(net, &xprt->ns_tracker, GFP_KERNEL);
2034 }
2035 
2036 /**
2037  * xprt_create_transport - create an RPC transport
2038  * @args: rpc transport creation arguments
2039  *
2040  */
2041 struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
2042 {
2043     struct rpc_xprt *xprt;
2044     const struct xprt_class *t;
2045 
2046     t = xprt_class_find_by_ident(args->ident);
2047     if (!t) {
2048         dprintk("RPC: transport (%d) not supported\n", args->ident);
2049         return ERR_PTR(-EIO);
2050     }
2051 
2052     xprt = t->setup(args);
2053     xprt_class_release(t);
2054 
2055     if (IS_ERR(xprt))
2056         goto out;
2057     if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
2058         xprt->idle_timeout = 0;
2059     INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
2060     if (xprt_has_timer(xprt))
2061         timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
2062     else
2063         timer_setup(&xprt->timer, NULL, 0);
2064 
2065     if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
2066         xprt_destroy(xprt);
2067         return ERR_PTR(-EINVAL);
2068     }
2069     xprt->servername = kstrdup(args->servername, GFP_KERNEL);
2070     if (xprt->servername == NULL) {
2071         xprt_destroy(xprt);
2072         return ERR_PTR(-ENOMEM);
2073     }
2074 
2075     rpc_xprt_debugfs_register(xprt);
2076 
2077     trace_xprt_create(xprt);
2078 out:
2079     return xprt;
2080 }
2081 
2082 static void xprt_destroy_cb(struct work_struct *work)
2083 {
2084     struct rpc_xprt *xprt =
2085         container_of(work, struct rpc_xprt, task_cleanup);
2086 
2087     trace_xprt_destroy(xprt);
2088 
2089     rpc_xprt_debugfs_unregister(xprt);
2090     rpc_destroy_wait_queue(&xprt->binding);
2091     rpc_destroy_wait_queue(&xprt->pending);
2092     rpc_destroy_wait_queue(&xprt->sending);
2093     rpc_destroy_wait_queue(&xprt->backlog);
2094     kfree(xprt->servername);
2095     /*
2096      * Destroy any existing back channel
2097      */
2098     xprt_destroy_backchannel(xprt, UINT_MAX);
2099 
2100     /*
2101      * Tear down transport state and free the rpc_xprt
2102      */
2103     xprt->ops->destroy(xprt);
2104 }
2105 
2106 /**
2107  * xprt_destroy - destroy an RPC transport, killing off all requests.
2108  * @xprt: transport to destroy
2109  *
2110  */
2111 static void xprt_destroy(struct rpc_xprt *xprt)
2112 {
2113     /*
2114      * Exclude transport connect/disconnect handlers and autoclose
2115      */
2116     wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);
2117 
2118     /*
2119      * xprt_schedule_autodisconnect() can run after XPRT_LOCKED
2120      * is cleared.  We use ->transport_lock to ensure the mod_timer()
2121      * can only run *before* del_time_sync(), never after.
2122      */
2123     spin_lock(&xprt->transport_lock);
2124     del_timer_sync(&xprt->timer);
2125     spin_unlock(&xprt->transport_lock);
2126 
2127     /*
2128      * Destroy sockets etc from the system workqueue so they can
2129      * safely flush receive work running on rpciod.
2130      */
2131     INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
2132     schedule_work(&xprt->task_cleanup);
2133 }
2134 
2135 static void xprt_destroy_kref(struct kref *kref)
2136 {
2137     xprt_destroy(container_of(kref, struct rpc_xprt, kref));
2138 }
2139 
2140 /**
2141  * xprt_get - return a reference to an RPC transport.
2142  * @xprt: pointer to the transport
2143  *
2144  */
2145 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
2146 {
2147     if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
2148         return xprt;
2149     return NULL;
2150 }
2151 EXPORT_SYMBOL_GPL(xprt_get);
2152 
2153 /**
2154  * xprt_put - release a reference to an RPC transport.
2155  * @xprt: pointer to the transport
2156  *
2157  */
2158 void xprt_put(struct rpc_xprt *xprt)
2159 {
2160     if (xprt != NULL)
2161         kref_put(&xprt->kref, xprt_destroy_kref);
2162 }
2163 EXPORT_SYMBOL_GPL(xprt_put);
2164 
2165 void xprt_set_offline_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
2166 {
2167     if (!test_and_set_bit(XPRT_OFFLINE, &xprt->state)) {
2168         spin_lock(&xps->xps_lock);
2169         xps->xps_nactive--;
2170         spin_unlock(&xps->xps_lock);
2171     }
2172 }
2173 
2174 void xprt_set_online_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
2175 {
2176     if (test_and_clear_bit(XPRT_OFFLINE, &xprt->state)) {
2177         spin_lock(&xps->xps_lock);
2178         xps->xps_nactive++;
2179         spin_unlock(&xps->xps_lock);
2180     }
2181 }
2182 
2183 void xprt_delete_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
2184 {
2185     if (test_and_set_bit(XPRT_REMOVE, &xprt->state))
2186         return;
2187 
2188     xprt_force_disconnect(xprt);
2189     if (!test_bit(XPRT_CONNECTED, &xprt->state))
2190         return;
2191 
2192     if (!xprt->sending.qlen && !xprt->pending.qlen &&
2193         !xprt->backlog.qlen && !atomic_long_read(&xprt->queuelen))
2194         rpc_xprt_switch_remove_xprt(xps, xprt, true);
2195 }