// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************

(c) 2007 Network Appliance, Inc.  All Rights Reserved.
(c) 2009 NetApp.  All Rights Reserved.


******************************************************************************/

#include <linux/tcp.h>
#include <linux/slab.h>
#include <linux/sunrpc/xprt.h>
#include <linux/export.h>
#include <linux/sunrpc/bc_xprt.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
#define RPCDBG_FACILITY RPCDBG_TRANS
#endif

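/* Upper bound on the number of preallocated backchannel slots per transport. */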
#define BC_MAX_SLOTS    64U

unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
{
    return BC_MAX_SLOTS;
}

/*
 * Helper routines that track the number of preallocation elements
 * on the transport.
 */
static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
{
    return xprt->bc_alloc_count < xprt->bc_alloc_max;
}

/*
 * Free the preallocated rpc_rqst structure and the memory
 * buffers hanging off of it.
 */
static void xprt_free_allocation(struct rpc_rqst *req)
{
    struct xdr_buf *xbufp;

    dprintk("RPC:        free allocations for req= %p\n", req);
    WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
    xbufp = &req->rq_rcv_buf;
    free_page((unsigned long)xbufp->head[0].iov_base);
    xbufp = &req->rq_snd_buf;
    free_page((unsigned long)xbufp->head[0].iov_base);
    kfree(req);
}

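/* Reset a preallocated xdr_buf to its initial single-page state before reuse. */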
static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
{
    buf->head[0].iov_len = PAGE_SIZE;
    buf->tail[0].iov_len = 0;
    buf->pages = NULL;
    buf->page_len = 0;
    buf->flags = 0;
    buf->len = 0;
    buf->buflen = PAGE_SIZE;
}

static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
{
    struct page *page;
    /* Preallocate one page to back the XDR buffer */
    page = alloc_page(gfp_flags);
    if (page == NULL)
        return -ENOMEM;
    xdr_buf_init(buf, page_address(page), PAGE_SIZE);
    return 0;
}

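/* Allocate one rpc_rqst plus its send and receive XDR buffers for the backchannel. */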
static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
{
    gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
    struct rpc_rqst *req;

    /* Pre-allocate one backchannel rpc_rqst */
    req = kzalloc(sizeof(*req), gfp_flags);
    if (req == NULL)
        return NULL;

    req->rq_xprt = xprt;
    INIT_LIST_HEAD(&req->rq_bc_list);

    /* Preallocate one XDR receive buffer */
    if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
        printk(KERN_ERR "Failed to create bc receive xbuf\n");
        goto out_free;
    }
    req->rq_rcv_buf.len = PAGE_SIZE;

    /* Preallocate one XDR send buffer */
    if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
        printk(KERN_ERR "Failed to create bc snd xbuf\n");
        goto out_free;
    }
    return req;
out_free:
    xprt_free_allocation(req);
    return NULL;
}

/*
 * Preallocate up to min_reqs structures and related buffers for use
 * by the backchannel.  This function can be called multiple times
 * when creating new sessions that use the same rpc_xprt.  The
 * preallocated buffers are added to the pool of resources used by
 * the rpc_xprt.  Any one of these resources may be used by an
 * incoming callback request.  It's up to the higher levels in the
 * stack to enforce that the maximum number of session slots is not
 * being exceeded.
 *
 * Some callback arguments can be large, for example a pNFS server
 * using multiple deviceids.  The list can be unbounded, but the client
 * has the ability to tell the server the maximum size of the callback
 * requests.  Each deviceID is 16 bytes, so allocate one page
 * for the arguments to have enough room to receive a number of these
 * deviceIDs.  The NFS client indicates to the pNFS server that its
 * callback requests can be up to 4096 bytes in size.
 */
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
    if (!xprt->ops->bc_setup)
        return 0;
    return xprt->ops->bc_setup(xprt, min_reqs);
}
EXPORT_SYMBOL_GPL(xprt_setup_backchannel);

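/*
 * Preallocate min_reqs rpc_rqst structures (capped at BC_MAX_SLOTS) and
 * splice them onto the transport's backchannel preallocation list.
 */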
int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
{
    struct rpc_rqst *req;
    struct list_head tmp_list;
    int i;

    dprintk("RPC:       setup backchannel transport\n");

    if (min_reqs > BC_MAX_SLOTS)
        min_reqs = BC_MAX_SLOTS;

    /*
     * We use a temporary list to keep track of the preallocated
     * buffers.  Once we're done building the list we splice it
     * into the backchannel preallocation list off of the rpc_xprt
     * struct.  This helps minimize the amount of time the list
     * lock is held on the rpc_xprt struct.  It also makes cleanup
     * easier in case of memory allocation errors.
     */
    INIT_LIST_HEAD(&tmp_list);
    for (i = 0; i < min_reqs; i++) {
        /* Pre-allocate one backchannel rpc_rqst */
        req = xprt_alloc_bc_req(xprt);
        if (req == NULL) {
            printk(KERN_ERR "Failed to create bc rpc_rqst\n");
            goto out_free;
        }

        /* Add the allocated buffer to the tmp list */
        dprintk("RPC:       adding req= %p\n", req);
        list_add(&req->rq_bc_pa_list, &tmp_list);
    }

    /*
     * Add the temporary list to the backchannel preallocation list
     */
    spin_lock(&xprt->bc_pa_lock);
    list_splice(&tmp_list, &xprt->bc_pa_list);
    xprt->bc_alloc_count += min_reqs;
    xprt->bc_alloc_max += min_reqs;
    atomic_add(min_reqs, &xprt->bc_slot_count);
    spin_unlock(&xprt->bc_pa_lock);

    dprintk("RPC:       setup backchannel transport done\n");
    return 0;

out_free:
    /*
     * Memory allocation failed, free the temporary list
     */
    while (!list_empty(&tmp_list)) {
        req = list_first_entry(&tmp_list,
                struct rpc_rqst,
                rq_bc_pa_list);
        list_del(&req->rq_bc_pa_list);
        xprt_free_allocation(req);
    }

    dprintk("RPC:       setup backchannel transport failed\n");
    return -ENOMEM;
}

/**
 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 * @xprt:   the transport holding the preallocated structures
 * @max_reqs:   the maximum number of preallocated structures to destroy
 *
 * Since these structures may have been allocated by multiple calls
 * to xprt_setup_backchannel, we only destroy up to the maximum number
 * of reqs specified by the caller.
 */
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
    if (xprt->ops->bc_destroy)
        xprt->ops->bc_destroy(xprt, max_reqs);
}
EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);

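/*
 * Free up to max_reqs preallocated rpc_rqst structures from the transport's
 * backchannel preallocation list.
 */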
void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
{
    struct rpc_rqst *req = NULL, *tmp = NULL;

    dprintk("RPC:        destroy backchannel transport\n");

    if (max_reqs == 0)
        goto out;

    spin_lock_bh(&xprt->bc_pa_lock);
    xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
    list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
        dprintk("RPC:        req=%p\n", req);
        list_del(&req->rq_bc_pa_list);
        xprt_free_allocation(req);
        xprt->bc_alloc_count--;
        atomic_dec(&xprt->bc_slot_count);
        if (--max_reqs == 0)
            break;
    }
    spin_unlock_bh(&xprt->bc_pa_lock);

out:
    dprintk("RPC:        backchannel list empty= %s\n",
        list_empty(&xprt->bc_pa_list) ? "true" : "false");
}

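/*
 * Called with xprt->bc_pa_lock held.  Take the first preallocated request
 * off the list, adding the caller-supplied 'new' request to the list first
 * if the list is empty and the slot limit has not been reached, and
 * initialize it for the given XID.
 */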
static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
        struct rpc_rqst *new)
{
    struct rpc_rqst *req = NULL;

    dprintk("RPC:       allocate a backchannel request\n");
    if (list_empty(&xprt->bc_pa_list)) {
        if (!new)
            goto not_found;
        if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
            goto not_found;
        list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
        xprt->bc_alloc_count++;
        atomic_inc(&xprt->bc_slot_count);
    }
    req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
                rq_bc_pa_list);
    req->rq_reply_bytes_recvd = 0;
    memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
            sizeof(req->rq_private_buf));
    req->rq_xid = xid;
    req->rq_connect_cookie = xprt->connect_cookie;
    dprintk("RPC:       backchannel req=%p\n", req);
not_found:
    return req;
}

/*
 * Return the preallocated rpc_rqst structure and XDR buffers
 * associated with this rpc_task.
 */
void xprt_free_bc_request(struct rpc_rqst *req)
{
    struct rpc_xprt *xprt = req->rq_xprt;

    xprt->ops->bc_free_rqst(req);
}

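/*
 * Return a backchannel rpc_rqst to the transport's preallocation list, or
 * free it if the transport no longer needs that many preallocated entries.
 */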
void xprt_free_bc_rqst(struct rpc_rqst *req)
{
    struct rpc_xprt *xprt = req->rq_xprt;

    dprintk("RPC:       free backchannel req=%p\n", req);

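    /*
     * Invalidate the connect cookie so that xprt_lookup_bc_request()
     * will not match this request against the current connection.
     */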
    req->rq_connect_cookie = xprt->connect_cookie - 1;
    smp_mb__before_atomic();
    clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
    smp_mb__after_atomic();

    /*
     * Return it to the list of preallocations so that it
     * may be reused by a new callback request.
     */
    spin_lock_bh(&xprt->bc_pa_lock);
    if (xprt_need_to_requeue(xprt)) {
        xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
        xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
        req->rq_rcv_buf.len = PAGE_SIZE;
        list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
        xprt->bc_alloc_count++;
        atomic_inc(&xprt->bc_slot_count);
        req = NULL;
    }
    spin_unlock_bh(&xprt->bc_pa_lock);
    if (req != NULL) {
        /*
         * The last remaining session was destroyed while this
         * entry was in use.  Free the entry and don't attempt
         * to add back to the list because there is no need to
         * have any more preallocated entries.
         */
        dprintk("RPC:       Last session removed req=%p\n", req);
        xprt_free_allocation(req);
    }
    xprt_put(xprt);
}

/*
 * One or more rpc_rqst structures have been preallocated during the
 * backchannel setup.  Buffer space for the send and private XDR buffers
 * has been preallocated as well.  Use xprt_alloc_bc_req to allocate such
 * a request and xprt_free_bc_request to return it.
 *
 * We know that we're called in soft interrupt context, so take the plain
 * spin_lock; there is no need for the bottom-half variant.
 *
 * Return an available rpc_rqst, or NULL if none are available.
 */
struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
    struct rpc_rqst *req, *new = NULL;

    do {
        spin_lock(&xprt->bc_pa_lock);
        list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
            if (req->rq_connect_cookie != xprt->connect_cookie)
                continue;
            if (req->rq_xid == xid)
                goto found;
        }
        req = xprt_get_bc_request(xprt, xid, new);
found:
        spin_unlock(&xprt->bc_pa_lock);
        if (new) {
            if (req != new)
                xprt_free_allocation(new);
            break;
        } else if (req)
            break;
        new = xprt_alloc_bc_req(xprt);
    } while (new);
    return req;
}

/*
 * Add callback request to callback list.  The callback
 * service sleeps on the sv_cb_waitq waiting for new
 * requests.  Wake it up after enqueuing the request.
 */
void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
{
    struct rpc_xprt *xprt = req->rq_xprt;
    struct svc_serv *bc_serv = xprt->bc_serv;

    spin_lock(&xprt->bc_pa_lock);
    list_del(&req->rq_bc_pa_list);
    xprt->bc_alloc_count--;
    spin_unlock(&xprt->bc_pa_lock);

    req->rq_private_buf.len = copied;
    set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);

    dprintk("RPC:       add callback request to list\n");
    xprt_get(xprt);
    spin_lock(&bc_serv->sv_cb_lock);
    list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
    wake_up(&bc_serv->sv_cb_waitq);
    spin_unlock(&bc_serv->sv_cb_lock);
}