0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #include <linux/tcp.h>
0011 #include <linux/slab.h>
0012 #include <linux/sunrpc/xprt.h>
0013 #include <linux/export.h>
0014 #include <linux/sunrpc/bc_xprt.h>
0015
0016 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
0017 #define RPCDBG_FACILITY RPCDBG_TRANS
0018 #endif
0019
0020 #define BC_MAX_SLOTS 64U
0021
0022 unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
0023 {
0024 return BC_MAX_SLOTS;
0025 }
0026
0027
0028
0029
0030
0031 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
0032 {
0033 return xprt->bc_alloc_count < xprt->bc_alloc_max;
0034 }
0035
0036
0037
0038
0039
0040 static void xprt_free_allocation(struct rpc_rqst *req)
0041 {
0042 struct xdr_buf *xbufp;
0043
0044 dprintk("RPC: free allocations for req= %p\n", req);
0045 WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
0046 xbufp = &req->rq_rcv_buf;
0047 free_page((unsigned long)xbufp->head[0].iov_base);
0048 xbufp = &req->rq_snd_buf;
0049 free_page((unsigned long)xbufp->head[0].iov_base);
0050 kfree(req);
0051 }
0052
0053 static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
0054 {
0055 buf->head[0].iov_len = PAGE_SIZE;
0056 buf->tail[0].iov_len = 0;
0057 buf->pages = NULL;
0058 buf->page_len = 0;
0059 buf->flags = 0;
0060 buf->len = 0;
0061 buf->buflen = PAGE_SIZE;
0062 }
0063
0064 static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
0065 {
0066 struct page *page;
0067
0068 page = alloc_page(gfp_flags);
0069 if (page == NULL)
0070 return -ENOMEM;
0071 xdr_buf_init(buf, page_address(page), PAGE_SIZE);
0072 return 0;
0073 }
0074
0075 static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
0076 {
0077 gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
0078 struct rpc_rqst *req;
0079
0080
0081 req = kzalloc(sizeof(*req), gfp_flags);
0082 if (req == NULL)
0083 return NULL;
0084
0085 req->rq_xprt = xprt;
0086 INIT_LIST_HEAD(&req->rq_bc_list);
0087
0088
0089 if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
0090 printk(KERN_ERR "Failed to create bc receive xbuf\n");
0091 goto out_free;
0092 }
0093 req->rq_rcv_buf.len = PAGE_SIZE;
0094
0095
0096 if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
0097 printk(KERN_ERR "Failed to create bc snd xbuf\n");
0098 goto out_free;
0099 }
0100 return req;
0101 out_free:
0102 xprt_free_allocation(req);
0103 return NULL;
0104 }
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123
0124 int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
0125 {
0126 if (!xprt->ops->bc_setup)
0127 return 0;
0128 return xprt->ops->bc_setup(xprt, min_reqs);
0129 }
0130 EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
0131
0132 int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
0133 {
0134 struct rpc_rqst *req;
0135 struct list_head tmp_list;
0136 int i;
0137
0138 dprintk("RPC: setup backchannel transport\n");
0139
0140 if (min_reqs > BC_MAX_SLOTS)
0141 min_reqs = BC_MAX_SLOTS;
0142
0143
0144
0145
0146
0147
0148
0149
0150
0151 INIT_LIST_HEAD(&tmp_list);
0152 for (i = 0; i < min_reqs; i++) {
0153
0154 req = xprt_alloc_bc_req(xprt);
0155 if (req == NULL) {
0156 printk(KERN_ERR "Failed to create bc rpc_rqst\n");
0157 goto out_free;
0158 }
0159
0160
0161 dprintk("RPC: adding req= %p\n", req);
0162 list_add(&req->rq_bc_pa_list, &tmp_list);
0163 }
0164
0165
0166
0167
0168 spin_lock(&xprt->bc_pa_lock);
0169 list_splice(&tmp_list, &xprt->bc_pa_list);
0170 xprt->bc_alloc_count += min_reqs;
0171 xprt->bc_alloc_max += min_reqs;
0172 atomic_add(min_reqs, &xprt->bc_slot_count);
0173 spin_unlock(&xprt->bc_pa_lock);
0174
0175 dprintk("RPC: setup backchannel transport done\n");
0176 return 0;
0177
0178 out_free:
0179
0180
0181
0182 while (!list_empty(&tmp_list)) {
0183 req = list_first_entry(&tmp_list,
0184 struct rpc_rqst,
0185 rq_bc_pa_list);
0186 list_del(&req->rq_bc_pa_list);
0187 xprt_free_allocation(req);
0188 }
0189
0190 dprintk("RPC: setup backchannel transport failed\n");
0191 return -ENOMEM;
0192 }
0193
0194
0195
0196
0197
0198
0199
0200
0201
0202
0203 void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
0204 {
0205 if (xprt->ops->bc_destroy)
0206 xprt->ops->bc_destroy(xprt, max_reqs);
0207 }
0208 EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
0209
0210 void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
0211 {
0212 struct rpc_rqst *req = NULL, *tmp = NULL;
0213
0214 dprintk("RPC: destroy backchannel transport\n");
0215
0216 if (max_reqs == 0)
0217 goto out;
0218
0219 spin_lock_bh(&xprt->bc_pa_lock);
0220 xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
0221 list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
0222 dprintk("RPC: req=%p\n", req);
0223 list_del(&req->rq_bc_pa_list);
0224 xprt_free_allocation(req);
0225 xprt->bc_alloc_count--;
0226 atomic_dec(&xprt->bc_slot_count);
0227 if (--max_reqs == 0)
0228 break;
0229 }
0230 spin_unlock_bh(&xprt->bc_pa_lock);
0231
0232 out:
0233 dprintk("RPC: backchannel list empty= %s\n",
0234 list_empty(&xprt->bc_pa_list) ? "true" : "false");
0235 }
0236
0237 static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
0238 struct rpc_rqst *new)
0239 {
0240 struct rpc_rqst *req = NULL;
0241
0242 dprintk("RPC: allocate a backchannel request\n");
0243 if (list_empty(&xprt->bc_pa_list)) {
0244 if (!new)
0245 goto not_found;
0246 if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
0247 goto not_found;
0248 list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
0249 xprt->bc_alloc_count++;
0250 atomic_inc(&xprt->bc_slot_count);
0251 }
0252 req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
0253 rq_bc_pa_list);
0254 req->rq_reply_bytes_recvd = 0;
0255 memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
0256 sizeof(req->rq_private_buf));
0257 req->rq_xid = xid;
0258 req->rq_connect_cookie = xprt->connect_cookie;
0259 dprintk("RPC: backchannel req=%p\n", req);
0260 not_found:
0261 return req;
0262 }
0263
0264
0265
0266
0267
0268 void xprt_free_bc_request(struct rpc_rqst *req)
0269 {
0270 struct rpc_xprt *xprt = req->rq_xprt;
0271
0272 xprt->ops->bc_free_rqst(req);
0273 }
0274
0275 void xprt_free_bc_rqst(struct rpc_rqst *req)
0276 {
0277 struct rpc_xprt *xprt = req->rq_xprt;
0278
0279 dprintk("RPC: free backchannel req=%p\n", req);
0280
0281 req->rq_connect_cookie = xprt->connect_cookie - 1;
0282 smp_mb__before_atomic();
0283 clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
0284 smp_mb__after_atomic();
0285
0286
0287
0288
0289
0290 spin_lock_bh(&xprt->bc_pa_lock);
0291 if (xprt_need_to_requeue(xprt)) {
0292 xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
0293 xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
0294 req->rq_rcv_buf.len = PAGE_SIZE;
0295 list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
0296 xprt->bc_alloc_count++;
0297 atomic_inc(&xprt->bc_slot_count);
0298 req = NULL;
0299 }
0300 spin_unlock_bh(&xprt->bc_pa_lock);
0301 if (req != NULL) {
0302
0303
0304
0305
0306
0307
0308 dprintk("RPC: Last session removed req=%p\n", req);
0309 xprt_free_allocation(req);
0310 }
0311 xprt_put(xprt);
0312 }
0313
0314
0315
0316
0317
0318
0319
0320
0321
0322
0323
0324
0325 struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
0326 {
0327 struct rpc_rqst *req, *new = NULL;
0328
0329 do {
0330 spin_lock(&xprt->bc_pa_lock);
0331 list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
0332 if (req->rq_connect_cookie != xprt->connect_cookie)
0333 continue;
0334 if (req->rq_xid == xid)
0335 goto found;
0336 }
0337 req = xprt_get_bc_request(xprt, xid, new);
0338 found:
0339 spin_unlock(&xprt->bc_pa_lock);
0340 if (new) {
0341 if (req != new)
0342 xprt_free_allocation(new);
0343 break;
0344 } else if (req)
0345 break;
0346 new = xprt_alloc_bc_req(xprt);
0347 } while (new);
0348 return req;
0349 }
0350
0351
0352
0353
0354
0355
0356
0357 void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
0358 {
0359 struct rpc_xprt *xprt = req->rq_xprt;
0360 struct svc_serv *bc_serv = xprt->bc_serv;
0361
0362 spin_lock(&xprt->bc_pa_lock);
0363 list_del(&req->rq_bc_pa_list);
0364 xprt->bc_alloc_count--;
0365 spin_unlock(&xprt->bc_pa_lock);
0366
0367 req->rq_private_buf.len = copied;
0368 set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
0369
0370 dprintk("RPC: add callback request to list\n");
0371 xprt_get(xprt);
0372 spin_lock(&bc_serv->sv_cb_lock);
0373 list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
0374 wake_up(&bc_serv->sv_cb_waitq);
0375 spin_unlock(&bc_serv->sv_cb_lock);
0376 }