/*
 * Generic RPC transport (xprt) layer of the in-kernel SUNRPC client.
 *
 * This file implements the transport-independent parts of the client:
 *
 *  - registration and lookup of transport implementations
 *    (xprt_register_transport, xprt_find_transport_ident);
 *  - request slot allocation, with a backlog queue for callers that
 *    have to wait for a free slot (xprt_reserve, xprt_alloc_slot);
 *  - serialization of write access to a transport, optionally combined
 *    with Van Jacobson style congestion avoidance (xprt_reserve_xprt,
 *    xprt_reserve_xprt_cong, xprt_adjust_cwnd);
 *  - the transmit queue and the send path itself (xprt_transmit);
 *  - matching of incoming replies to pending requests by XID and
 *    completion of those requests (xprt_lookup_rqst, xprt_complete_rqst);
 *  - retransmit timeout handling (xprt_adjust_timeout) and automatic
 *    disconnection of idle transports.
 */
0040
0041 #include <linux/module.h>
0042
0043 #include <linux/types.h>
0044 #include <linux/interrupt.h>
0045 #include <linux/workqueue.h>
0046 #include <linux/net.h>
0047 #include <linux/ktime.h>
0048
0049 #include <linux/sunrpc/clnt.h>
0050 #include <linux/sunrpc/metrics.h>
0051 #include <linux/sunrpc/bc_xprt.h>
0052 #include <linux/rcupdate.h>
0053 #include <linux/sched/mm.h>
0054
0055 #include <trace/events/sunrpc.h>
0056
0057 #include "sunrpc.h"
0058 #include "sysfs.h"
0059 #include "fail.h"
0060
0061
0062
0063
0064
0065 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
0066 # define RPCDBG_FACILITY RPCDBG_XPRT
0067 #endif
0068
0069
0070
0071
0072 static void xprt_init(struct rpc_xprt *xprt, struct net *net);
0073 static __be32 xprt_alloc_xid(struct rpc_xprt *xprt);
0074 static void xprt_destroy(struct rpc_xprt *xprt);
0075 static void xprt_request_init(struct rpc_task *task);
0076 static int xprt_request_prepare(struct rpc_rqst *req, struct xdr_buf *buf);
0077
0078 static DEFINE_SPINLOCK(xprt_list_lock);
0079 static LIST_HEAD(xprt_list);
0080
0081 static unsigned long xprt_request_timeout(const struct rpc_rqst *req)
0082 {
0083 unsigned long timeout = jiffies + req->rq_timeout;
0084
0085 if (time_before(timeout, req->rq_majortimeo))
0086 return timeout;
0087 return req->rq_majortimeo;
0088 }
0089
/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
 *
 * Returns:
 * 0:		transport successfully registered
 * -EEXIST:	a transport with the same ident is already registered
 */
0102 int xprt_register_transport(struct xprt_class *transport)
0103 {
0104 struct xprt_class *t;
0105 int result;
0106
0107 result = -EEXIST;
0108 spin_lock(&xprt_list_lock);
0109 list_for_each_entry(t, &xprt_list, list) {
0110
0111 if (t->ident == transport->ident)
0112 goto out;
0113 }
0114
0115 list_add_tail(&transport->list, &xprt_list);
0116 printk(KERN_INFO "RPC: Registered %s transport module.\n",
0117 transport->name);
0118 result = 0;
0119
0120 out:
0121 spin_unlock(&xprt_list_lock);
0122 return result;
0123 }
0124 EXPORT_SYMBOL_GPL(xprt_register_transport);
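/*
 * A transport implementation (the socket and RDMA transports, for
 * example) fills in a struct xprt_class with its ->ident, ->name,
 * ->netid[] and ->setup callback, registers it while initialising,
 * and unregisters it again before it is unloaded.
 */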
0125
/**
 * xprt_unregister_transport - unregister a transport implementation
 * @transport: transport to unregister
 *
 * Returns:
 * 0:		transport successfully unregistered
 * -ENOENT:	transport never registered
 */
0134 int xprt_unregister_transport(struct xprt_class *transport)
0135 {
0136 struct xprt_class *t;
0137 int result;
0138
0139 result = 0;
0140 spin_lock(&xprt_list_lock);
0141 list_for_each_entry(t, &xprt_list, list) {
0142 if (t == transport) {
0143 printk(KERN_INFO
0144 "RPC: Unregistered %s transport module.\n",
0145 transport->name);
0146 list_del_init(&transport->list);
0147 goto out;
0148 }
0149 }
0150 result = -ENOENT;
0151
0152 out:
0153 spin_unlock(&xprt_list_lock);
0154 return result;
0155 }
0156 EXPORT_SYMBOL_GPL(xprt_unregister_transport);
0157
0158 static void
0159 xprt_class_release(const struct xprt_class *t)
0160 {
0161 module_put(t->owner);
0162 }
0163
0164 static const struct xprt_class *
0165 xprt_class_find_by_ident_locked(int ident)
0166 {
0167 const struct xprt_class *t;
0168
0169 list_for_each_entry(t, &xprt_list, list) {
0170 if (t->ident != ident)
0171 continue;
0172 if (!try_module_get(t->owner))
0173 continue;
0174 return t;
0175 }
0176 return NULL;
0177 }
0178
0179 static const struct xprt_class *
0180 xprt_class_find_by_ident(int ident)
0181 {
0182 const struct xprt_class *t;
0183
0184 spin_lock(&xprt_list_lock);
0185 t = xprt_class_find_by_ident_locked(ident);
0186 spin_unlock(&xprt_list_lock);
0187 return t;
0188 }
0189
0190 static const struct xprt_class *
0191 xprt_class_find_by_netid_locked(const char *netid)
0192 {
0193 const struct xprt_class *t;
0194 unsigned int i;
0195
0196 list_for_each_entry(t, &xprt_list, list) {
0197 for (i = 0; t->netid[i][0] != '\0'; i++) {
0198 if (strcmp(t->netid[i], netid) != 0)
0199 continue;
0200 if (!try_module_get(t->owner))
0201 continue;
0202 return t;
0203 }
0204 }
0205 return NULL;
0206 }
0207
0208 static const struct xprt_class *
0209 xprt_class_find_by_netid(const char *netid)
0210 {
0211 const struct xprt_class *t;
0212
0213 spin_lock(&xprt_list_lock);
0214 t = xprt_class_find_by_netid_locked(netid);
0215 if (!t) {
0216 spin_unlock(&xprt_list_lock);
0217 request_module("rpc%s", netid);
0218 spin_lock(&xprt_list_lock);
0219 t = xprt_class_find_by_netid_locked(netid);
0220 }
0221 spin_unlock(&xprt_list_lock);
0222 return t;
0223 }
0224
/**
 * xprt_find_transport_ident - convert a netid into a transport identifier
 * @netid: netid of the transport to look up (loading its module if needed)
 *
 * Returns:
 * > 0:		transport identifier
 * -ENOENT:	transport module not available
 */
0233 int xprt_find_transport_ident(const char *netid)
0234 {
0235 const struct xprt_class *t;
0236 int ret;
0237
0238 t = xprt_class_find_by_netid(netid);
0239 if (!t)
0240 return -ENOENT;
0241 ret = t->ident;
0242 xprt_class_release(t);
0243 return ret;
0244 }
0245 EXPORT_SYMBOL_GPL(xprt_find_transport_ident);
0246
0247 static void xprt_clear_locked(struct rpc_xprt *xprt)
0248 {
0249 xprt->snd_task = NULL;
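/* If a close is pending, keep XPRT_LOCKED held and let the autoclose
 * worker shut the transport down and release the lock. */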
0250 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state))
0251 clear_bit_unlock(XPRT_LOCKED, &xprt->state);
0252 else
0253 queue_work(xprtiod_workqueue, &xprt->task_cleanup);
0254 }
0255
/**
 * xprt_reserve_xprt - serialize write access to transports
 * @xprt: pointer to the target transport
 * @task: task that is requesting access to the transport
 *
 * This prevents mixing the payload of separate requests, and prevents
 * transport connects from colliding with writes.  No congestion control
 * is provided.
 */
0265 int xprt_reserve_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
0266 {
0267 struct rpc_rqst *req = task->tk_rqstp;
0268
0269 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
0270 if (task == xprt->snd_task)
0271 goto out_locked;
0272 goto out_sleep;
0273 }
0274 if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
0275 goto out_unlock;
0276 xprt->snd_task = task;
0277
0278 out_locked:
0279 trace_xprt_reserve_xprt(xprt, task);
0280 return 1;
0281
0282 out_unlock:
0283 xprt_clear_locked(xprt);
0284 out_sleep:
0285 task->tk_status = -EAGAIN;
0286 if (RPC_IS_SOFT(task))
0287 rpc_sleep_on_timeout(&xprt->sending, task, NULL,
0288 xprt_request_timeout(req));
0289 else
0290 rpc_sleep_on(&xprt->sending, task, NULL);
0291 return 0;
0292 }
0293 EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
0294
0295 static bool
0296 xprt_need_congestion_window_wait(struct rpc_xprt *xprt)
0297 {
0298 return test_bit(XPRT_CWND_WAIT, &xprt->state);
0299 }
0300
0301 static void
0302 xprt_set_congestion_window_wait(struct rpc_xprt *xprt)
0303 {
0304 if (!list_empty(&xprt->xmit_queue)) {
0305
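/* Don't flag the wait if the request at the head of the transmit
 * queue already holds a congestion credit and can make progress. */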
0306 if (list_first_entry(&xprt->xmit_queue, struct rpc_rqst,
0307 rq_xmit)->rq_cong)
0308 return;
0309 }
0310 set_bit(XPRT_CWND_WAIT, &xprt->state);
0311 }
0312
0313 static void
0314 xprt_test_and_clear_congestion_window_wait(struct rpc_xprt *xprt)
0315 {
0316 if (!RPCXPRT_CONGESTED(xprt))
0317 clear_bit(XPRT_CWND_WAIT, &xprt->state);
0318 }
0319
/*
 * xprt_reserve_xprt_cong - serialize write access to transports
 * @xprt: pointer to the target transport
 * @task: task that is requesting access to the transport
 *
 * Same as xprt_reserve_xprt, but Van Jacobson congestion control is
 * integrated into the decision of whether a request is allowed to be
 * woken up and given access to the transport.
 */
0329 int xprt_reserve_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
0330 {
0331 struct rpc_rqst *req = task->tk_rqstp;
0332
0333 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
0334 if (task == xprt->snd_task)
0335 goto out_locked;
0336 goto out_sleep;
0337 }
0338 if (req == NULL) {
0339 xprt->snd_task = task;
0340 goto out_locked;
0341 }
0342 if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
0343 goto out_unlock;
0344 if (!xprt_need_congestion_window_wait(xprt)) {
0345 xprt->snd_task = task;
0346 goto out_locked;
0347 }
0348 out_unlock:
0349 xprt_clear_locked(xprt);
0350 out_sleep:
0351 task->tk_status = -EAGAIN;
0352 if (RPC_IS_SOFT(task))
0353 rpc_sleep_on_timeout(&xprt->sending, task, NULL,
0354 xprt_request_timeout(req));
0355 else
0356 rpc_sleep_on(&xprt->sending, task, NULL);
0357 return 0;
0358 out_locked:
0359 trace_xprt_reserve_cong(xprt, task);
0360 return 1;
0361 }
0362 EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
0363
0364 static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
0365 {
0366 int retval;
0367
0368 if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
0369 return 1;
0370 spin_lock(&xprt->transport_lock);
0371 retval = xprt->ops->reserve_xprt(xprt, task);
0372 spin_unlock(&xprt->transport_lock);
0373 return retval;
0374 }
0375
0376 static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
0377 {
0378 struct rpc_xprt *xprt = data;
0379
0380 xprt->snd_task = task;
0381 return true;
0382 }
0383
0384 static void __xprt_lock_write_next(struct rpc_xprt *xprt)
0385 {
0386 if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
0387 return;
0388 if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
0389 goto out_unlock;
0390 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
0391 __xprt_lock_write_func, xprt))
0392 return;
0393 out_unlock:
0394 xprt_clear_locked(xprt);
0395 }
0396
0397 static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
0398 {
0399 if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
0400 return;
0401 if (test_bit(XPRT_WRITE_SPACE, &xprt->state))
0402 goto out_unlock;
0403 if (xprt_need_congestion_window_wait(xprt))
0404 goto out_unlock;
0405 if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
0406 __xprt_lock_write_func, xprt))
0407 return;
0408 out_unlock:
0409 xprt_clear_locked(xprt);
0410 }
0411
/**
 * xprt_release_xprt - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  No congestion control is provided.
 */
0419 void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
0420 {
0421 if (xprt->snd_task == task) {
0422 xprt_clear_locked(xprt);
0423 __xprt_lock_write_next(xprt);
0424 }
0425 trace_xprt_release_xprt(xprt, task);
0426 }
0427 EXPORT_SYMBOL_GPL(xprt_release_xprt);
0428
/**
 * xprt_release_xprt_cong - allow other requests to use a transport
 * @xprt: transport with other tasks potentially waiting
 * @task: task that is releasing access to the transport
 *
 * Note that "task" can be NULL.  Another task is awoken to use the
 * transport if the transport's congestion window allows it.
 */
0437 void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
0438 {
0439 if (xprt->snd_task == task) {
0440 xprt_clear_locked(xprt);
0441 __xprt_lock_write_next_cong(xprt);
0442 }
0443 trace_xprt_release_cong(xprt, task);
0444 }
0445 EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
0446
0447 void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
0448 {
0449 if (xprt->snd_task != task)
0450 return;
0451 spin_lock(&xprt->transport_lock);
0452 xprt->ops->release_xprt(xprt, task);
0453 spin_unlock(&xprt->transport_lock);
0454 }
0455
0456
0457
0458
0459
0460 static int
0461 __xprt_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
0462 {
0463 if (req->rq_cong)
0464 return 1;
0465 trace_xprt_get_cong(xprt, req->rq_task);
0466 if (RPCXPRT_CONGESTED(xprt)) {
0467 xprt_set_congestion_window_wait(xprt);
0468 return 0;
0469 }
0470 req->rq_cong = 1;
0471 xprt->cong += RPC_CWNDSCALE;
0472 return 1;
0473 }
0474
0475
0476
0477
0478
0479 static void
0480 __xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
0481 {
0482 if (!req->rq_cong)
0483 return;
0484 req->rq_cong = 0;
0485 xprt->cong -= RPC_CWNDSCALE;
0486 xprt_test_and_clear_congestion_window_wait(xprt);
0487 trace_xprt_put_cong(xprt, req->rq_task);
0488 __xprt_lock_write_next_cong(xprt);
0489 }
0490
0491
0492
0493
0494
0495
0496
0497
0498 bool
0499 xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
0500 {
0501 bool ret = false;
0502
0503 if (req->rq_cong)
0504 return true;
0505 spin_lock(&xprt->transport_lock);
0506 ret = __xprt_get_cong(xprt, req) != 0;
0507 spin_unlock(&xprt->transport_lock);
0508 return ret;
0509 }
0510 EXPORT_SYMBOL_GPL(xprt_request_get_cong);
0511
0512
0513
0514
0515
0516
0517
0518 void xprt_release_rqst_cong(struct rpc_task *task)
0519 {
0520 struct rpc_rqst *req = task->tk_rqstp;
0521
0522 __xprt_put_cong(req->rq_xprt, req);
0523 }
0524 EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
0525
0526 static void xprt_clear_congestion_window_wait_locked(struct rpc_xprt *xprt)
0527 {
0528 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state))
0529 __xprt_lock_write_next_cong(xprt);
0530 }
0531
0532
0533
0534
0535
0536 static void
0537 xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
0538 {
0539 if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
0540 spin_lock(&xprt->transport_lock);
0541 __xprt_lock_write_next_cong(xprt);
0542 spin_unlock(&xprt->transport_lock);
0543 }
0544 }
0545
/**
 * xprt_adjust_cwnd - adjust transport congestion window
 * @xprt: pointer to xprt
 * @task: recently completed RPC request used to adjust window
 * @result: result code of completed RPC request
 *
 * The transport code maintains an estimate on the maximum number of
 * outstanding RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	- a reply is received and
 *	- a full number of requests are outstanding and
 *	- the congestion window hasn't been updated recently.
 */
0562 void xprt_adjust_cwnd(struct rpc_xprt *xprt, struct rpc_task *task, int result)
0563 {
0564 struct rpc_rqst *req = task->tk_rqstp;
0565 unsigned long cwnd = xprt->cwnd;
0566
0567 if (result >= 0 && cwnd <= xprt->cong) {
0568
0569
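/* Additive increase: grow the window by roughly one slot's worth,
 * RPC_CWNDSCALE * RPC_CWNDSCALE / cwnd, per reply; adding (cwnd >> 1)
 * before dividing rounds the result to nearest.  The -ETIMEDOUT branch
 * below halves the window instead (multiplicative decrease). */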
0570 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
0571 if (cwnd > RPC_MAXCWND(xprt))
0572 cwnd = RPC_MAXCWND(xprt);
0573 __xprt_lock_write_next_cong(xprt);
0574 } else if (result == -ETIMEDOUT) {
0575 cwnd >>= 1;
0576 if (cwnd < RPC_CWNDSCALE)
0577 cwnd = RPC_CWNDSCALE;
0578 }
0579 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
0580 xprt->cong, xprt->cwnd, cwnd);
0581 xprt->cwnd = cwnd;
0582 __xprt_put_cong(xprt, req);
0583 }
0584 EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
0585
0586
0587
0588
0589
0590
0591
0592 void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
0593 {
0594 if (status < 0)
0595 rpc_wake_up_status(&xprt->pending, status);
0596 else
0597 rpc_wake_up(&xprt->pending);
0598 }
0599 EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
0600
0601
0602
0603
0604
0605
0606
0607
0608
0609 void xprt_wait_for_buffer_space(struct rpc_xprt *xprt)
0610 {
0611 set_bit(XPRT_WRITE_SPACE, &xprt->state);
0612 }
0613 EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
0614
0615 static bool
0616 xprt_clear_write_space_locked(struct rpc_xprt *xprt)
0617 {
0618 if (test_and_clear_bit(XPRT_WRITE_SPACE, &xprt->state)) {
0619 __xprt_lock_write_next(xprt);
0620 dprintk("RPC: write space: waking waiting task on "
0621 "xprt %p\n", xprt);
0622 return true;
0623 }
0624 return false;
0625 }
0626
/**
 * xprt_write_space - wake the task waiting for transport output buffer space
 * @xprt: transport with waiting tasks
 *
 * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
 */
0633 bool xprt_write_space(struct rpc_xprt *xprt)
0634 {
0635 bool ret;
0636
0637 if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
0638 return false;
0639 spin_lock(&xprt->transport_lock);
0640 ret = xprt_clear_write_space_locked(xprt);
0641 spin_unlock(&xprt->transport_lock);
0642 return ret;
0643 }
0644 EXPORT_SYMBOL_GPL(xprt_write_space);
0645
0646 static unsigned long xprt_abs_ktime_to_jiffies(ktime_t abstime)
0647 {
0648 s64 delta = ktime_to_ns(ktime_get() - abstime);
0649 return likely(delta >= 0) ?
0650 jiffies - nsecs_to_jiffies(delta) :
0651 jiffies + nsecs_to_jiffies(-delta);
0652 }
0653
0654 static unsigned long xprt_calc_majortimeo(struct rpc_rqst *req)
0655 {
0656 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
0657 unsigned long majortimeo = req->rq_timeout;
0658
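/* The major timeout is the point at which the whole retransmit schedule
 * is exhausted: the current timeout shifted left once per retry
 * (exponential backoff), or grown by to_increment per retry, clamped
 * to to_maxval. */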
0659 if (to->to_exponential)
0660 majortimeo <<= to->to_retries;
0661 else
0662 majortimeo += to->to_increment * to->to_retries;
0663 if (majortimeo > to->to_maxval || majortimeo == 0)
0664 majortimeo = to->to_maxval;
0665 return majortimeo;
0666 }
0667
0668 static void xprt_reset_majortimeo(struct rpc_rqst *req)
0669 {
0670 req->rq_majortimeo += xprt_calc_majortimeo(req);
0671 }
0672
0673 static void xprt_reset_minortimeo(struct rpc_rqst *req)
0674 {
0675 req->rq_minortimeo += req->rq_timeout;
0676 }
0677
0678 static void xprt_init_majortimeo(struct rpc_task *task, struct rpc_rqst *req)
0679 {
0680 unsigned long time_init;
0681 struct rpc_xprt *xprt = req->rq_xprt;
0682
0683 if (likely(xprt && xprt_connected(xprt)))
0684 time_init = jiffies;
0685 else
0686 time_init = xprt_abs_ktime_to_jiffies(task->tk_start);
0687 req->rq_timeout = task->tk_client->cl_timeout->to_initval;
0688 req->rq_majortimeo = time_init + xprt_calc_majortimeo(req);
0689 req->rq_minortimeo = time_init + req->rq_timeout;
0690 }
0691
0692
0693
0694
0695
0696
0697 int xprt_adjust_timeout(struct rpc_rqst *req)
0698 {
0699 struct rpc_xprt *xprt = req->rq_xprt;
0700 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
0701 int status = 0;
0702
0703 if (time_before(jiffies, req->rq_majortimeo)) {
0704 if (time_before(jiffies, req->rq_minortimeo))
0705 return status;
0706 if (to->to_exponential)
0707 req->rq_timeout <<= 1;
0708 else
0709 req->rq_timeout += to->to_increment;
0710 if (to->to_maxval && req->rq_timeout >= to->to_maxval)
0711 req->rq_timeout = to->to_maxval;
0712 req->rq_retries++;
0713 } else {
0714 req->rq_timeout = to->to_initval;
0715 req->rq_retries = 0;
0716 xprt_reset_majortimeo(req);
0717
0718 spin_lock(&xprt->transport_lock);
0719 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
0720 spin_unlock(&xprt->transport_lock);
0721 status = -ETIMEDOUT;
0722 }
0723 xprt_reset_minortimeo(req);
0724
0725 if (req->rq_timeout == 0) {
0726 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
0727 req->rq_timeout = 5 * HZ;
0728 }
0729 return status;
0730 }
0731
0732 static void xprt_autoclose(struct work_struct *work)
0733 {
0734 struct rpc_xprt *xprt =
0735 container_of(work, struct rpc_xprt, task_cleanup);
0736 unsigned int pflags = memalloc_nofs_save();
0737
0738 trace_xprt_disconnect_auto(xprt);
0739 xprt->connect_cookie++;
0740 smp_mb__before_atomic();
0741 clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
0742 xprt->ops->close(xprt);
0743 xprt_release_write(xprt, NULL);
0744 wake_up_bit(&xprt->state, XPRT_LOCKED);
0745 memalloc_nofs_restore(pflags);
0746 }
0747
/**
 * xprt_disconnect_done - mark a transport as disconnected
 * @xprt: transport to flag for disconnect
 */
0753 void xprt_disconnect_done(struct rpc_xprt *xprt)
0754 {
0755 trace_xprt_disconnect_done(xprt);
0756 spin_lock(&xprt->transport_lock);
0757 xprt_clear_connected(xprt);
0758 xprt_clear_write_space_locked(xprt);
0759 xprt_clear_congestion_window_wait_locked(xprt);
0760 xprt_wake_pending_tasks(xprt, -ENOTCONN);
0761 spin_unlock(&xprt->transport_lock);
0762 }
0763 EXPORT_SYMBOL_GPL(xprt_disconnect_done);
0764
0765
0766
0767
0768
0769 static void xprt_schedule_autoclose_locked(struct rpc_xprt *xprt)
0770 {
0771 if (test_and_set_bit(XPRT_CLOSE_WAIT, &xprt->state))
0772 return;
0773 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
0774 queue_work(xprtiod_workqueue, &xprt->task_cleanup);
0775 else if (xprt->snd_task && !test_bit(XPRT_SND_IS_COOKIE, &xprt->state))
0776 rpc_wake_up_queued_task_set_status(&xprt->pending,
0777 xprt->snd_task, -ENOTCONN);
0778 }
0779
/**
 * xprt_force_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 */
0785 void xprt_force_disconnect(struct rpc_xprt *xprt)
0786 {
0787 trace_xprt_disconnect_force(xprt);
0788
0789
0790 spin_lock(&xprt->transport_lock);
0791 xprt_schedule_autoclose_locked(xprt);
0792 spin_unlock(&xprt->transport_lock);
0793 }
0794 EXPORT_SYMBOL_GPL(xprt_force_disconnect);
0795
0796 static unsigned int
0797 xprt_connect_cookie(struct rpc_xprt *xprt)
0798 {
0799 return READ_ONCE(xprt->connect_cookie);
0800 }
0801
0802 static bool
0803 xprt_request_retransmit_after_disconnect(struct rpc_task *task)
0804 {
0805 struct rpc_rqst *req = task->tk_rqstp;
0806 struct rpc_xprt *xprt = req->rq_xprt;
0807
0808 return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
0809 !xprt_connected(xprt);
0810 }
0811
/**
 * xprt_conditional_disconnect - force a transport to disconnect
 * @xprt: transport to disconnect
 * @cookie: 'connection cookie'
 *
 * This attempts to break the connection if and only if 'cookie' matches
 * the current transport 'connection cookie'. It ensures that we don't
 * try to break the connection more than once when we need to retransmit
 * a batch of RPC requests.
 */
0823 void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
0824 {
0825
0826 spin_lock(&xprt->transport_lock);
0827 if (cookie != xprt->connect_cookie)
0828 goto out;
0829 if (test_bit(XPRT_CLOSING, &xprt->state))
0830 goto out;
0831 xprt_schedule_autoclose_locked(xprt);
0832 out:
0833 spin_unlock(&xprt->transport_lock);
0834 }
0835
0836 static bool
0837 xprt_has_timer(const struct rpc_xprt *xprt)
0838 {
0839 return xprt->idle_timeout != 0;
0840 }
0841
0842 static void
0843 xprt_schedule_autodisconnect(struct rpc_xprt *xprt)
0844 __must_hold(&xprt->transport_lock)
0845 {
0846 xprt->last_used = jiffies;
0847 if (RB_EMPTY_ROOT(&xprt->recv_queue) && xprt_has_timer(xprt))
0848 mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout);
0849 }
0850
0851 static void
0852 xprt_init_autodisconnect(struct timer_list *t)
0853 {
0854 struct rpc_xprt *xprt = from_timer(xprt, t, timer);
0855
0856 if (!RB_EMPTY_ROOT(&xprt->recv_queue))
0857 return;
0858
0859 xprt->last_used = jiffies;
0860 if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
0861 return;
0862 queue_work(xprtiod_workqueue, &xprt->task_cleanup);
0863 }
0864
0865 #if IS_ENABLED(CONFIG_FAIL_SUNRPC)
0866 static void xprt_inject_disconnect(struct rpc_xprt *xprt)
0867 {
0868 if (!fail_sunrpc.ignore_client_disconnect &&
0869 should_fail(&fail_sunrpc.attr, 1))
0870 xprt->ops->inject_disconnect(xprt);
0871 }
0872 #else
0873 static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
0874 {
0875 }
0876 #endif
0877
0878 bool xprt_lock_connect(struct rpc_xprt *xprt,
0879 struct rpc_task *task,
0880 void *cookie)
0881 {
0882 bool ret = false;
0883
0884 spin_lock(&xprt->transport_lock);
0885 if (!test_bit(XPRT_LOCKED, &xprt->state))
0886 goto out;
0887 if (xprt->snd_task != task)
0888 goto out;
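/* Hand the write lock over from @task to the opaque @cookie, so the
 * connect path keeps exclusive access until xprt_unlock_connect() is
 * called with the same cookie. */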
0889 set_bit(XPRT_SND_IS_COOKIE, &xprt->state);
0890 xprt->snd_task = cookie;
0891 ret = true;
0892 out:
0893 spin_unlock(&xprt->transport_lock);
0894 return ret;
0895 }
0896 EXPORT_SYMBOL_GPL(xprt_lock_connect);
0897
0898 void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
0899 {
0900 spin_lock(&xprt->transport_lock);
0901 if (xprt->snd_task != cookie)
0902 goto out;
0903 if (!test_bit(XPRT_LOCKED, &xprt->state))
0904 goto out;
0905 xprt->snd_task = NULL;
0906 clear_bit(XPRT_SND_IS_COOKIE, &xprt->state);
0907 xprt->ops->release_xprt(xprt, NULL);
0908 xprt_schedule_autodisconnect(xprt);
0909 out:
0910 spin_unlock(&xprt->transport_lock);
0911 wake_up_bit(&xprt->state, XPRT_LOCKED);
0912 }
0913 EXPORT_SYMBOL_GPL(xprt_unlock_connect);
0914
/**
 * xprt_connect - schedule a transport connect operation
 * @task: RPC task that is requesting a connection
 */
0920 void xprt_connect(struct rpc_task *task)
0921 {
0922 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
0923
0924 trace_xprt_connect(xprt);
0925
0926 if (!xprt_bound(xprt)) {
0927 task->tk_status = -EAGAIN;
0928 return;
0929 }
0930 if (!xprt_lock_write(xprt, task))
0931 return;
0932
0933 if (!xprt_connected(xprt) && !test_bit(XPRT_CLOSE_WAIT, &xprt->state)) {
0934 task->tk_rqstp->rq_connect_cookie = xprt->connect_cookie;
0935 rpc_sleep_on_timeout(&xprt->pending, task, NULL,
0936 xprt_request_timeout(task->tk_rqstp));
0937
0938 if (test_bit(XPRT_CLOSING, &xprt->state))
0939 return;
0940 if (xprt_test_and_set_connecting(xprt))
0941 return;
0942
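/* Race breaker: the transport may have connected while this task
 * was waiting for the write lock. */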
0943 if (!xprt_connected(xprt)) {
0944 xprt->stat.connect_start = jiffies;
0945 xprt->ops->connect(xprt, task);
0946 } else {
0947 xprt_clear_connecting(xprt);
0948 task->tk_status = 0;
0949 rpc_wake_up_queued_task(&xprt->pending, task);
0950 }
0951 }
0952 xprt_release_write(xprt, task);
0953 }
0954
0955
0956
0957
0958
0959
0960 unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt)
0961 {
0962 unsigned long start, now = jiffies;
0963
0964 start = xprt->stat.connect_start + xprt->reestablish_timeout;
0965 if (time_after(start, now))
0966 return start - now;
0967 return 0;
0968 }
0969 EXPORT_SYMBOL_GPL(xprt_reconnect_delay);
0970
0971
0972
0973
0974
0975
0976
0977 void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to)
0978 {
0979 xprt->reestablish_timeout <<= 1;
0980 if (xprt->reestablish_timeout > xprt->max_reconnect_timeout)
0981 xprt->reestablish_timeout = xprt->max_reconnect_timeout;
0982 if (xprt->reestablish_timeout < init_to)
0983 xprt->reestablish_timeout = init_to;
0984 }
0985 EXPORT_SYMBOL_GPL(xprt_reconnect_backoff);
0986
0987 enum xprt_xid_rb_cmp {
0988 XID_RB_EQUAL,
0989 XID_RB_LEFT,
0990 XID_RB_RIGHT,
0991 };
0992 static enum xprt_xid_rb_cmp
0993 xprt_xid_cmp(__be32 xid1, __be32 xid2)
0994 {
0995 if (xid1 == xid2)
0996 return XID_RB_EQUAL;
0997 if ((__force u32)xid1 < (__force u32)xid2)
0998 return XID_RB_LEFT;
0999 return XID_RB_RIGHT;
1000 }
1001
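/*
 * Requests waiting for a reply are kept in an rb-tree (xprt->recv_queue)
 * keyed by XID, so an incoming reply can be matched to its request in
 * O(log n) time.
 */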
1002 static struct rpc_rqst *
1003 xprt_request_rb_find(struct rpc_xprt *xprt, __be32 xid)
1004 {
1005 struct rb_node *n = xprt->recv_queue.rb_node;
1006 struct rpc_rqst *req;
1007
1008 while (n != NULL) {
1009 req = rb_entry(n, struct rpc_rqst, rq_recv);
1010 switch (xprt_xid_cmp(xid, req->rq_xid)) {
1011 case XID_RB_LEFT:
1012 n = n->rb_left;
1013 break;
1014 case XID_RB_RIGHT:
1015 n = n->rb_right;
1016 break;
1017 case XID_RB_EQUAL:
1018 return req;
1019 }
1020 }
1021 return NULL;
1022 }
1023
1024 static void
1025 xprt_request_rb_insert(struct rpc_xprt *xprt, struct rpc_rqst *new)
1026 {
1027 struct rb_node **p = &xprt->recv_queue.rb_node;
1028 struct rb_node *n = NULL;
1029 struct rpc_rqst *req;
1030
1031 while (*p != NULL) {
1032 n = *p;
1033 req = rb_entry(n, struct rpc_rqst, rq_recv);
1034 switch (xprt_xid_cmp(new->rq_xid, req->rq_xid)) {
1035 case XID_RB_LEFT:
1036 p = &n->rb_left;
1037 break;
1038 case XID_RB_RIGHT:
1039 p = &n->rb_right;
1040 break;
1041 case XID_RB_EQUAL:
1042 WARN_ON_ONCE(new != req);
1043 return;
1044 }
1045 }
1046 rb_link_node(&new->rq_recv, n, p);
1047 rb_insert_color(&new->rq_recv, &xprt->recv_queue);
1048 }
1049
1050 static void
1051 xprt_request_rb_remove(struct rpc_xprt *xprt, struct rpc_rqst *req)
1052 {
1053 rb_erase(&req->rq_recv, &xprt->recv_queue);
1054 }
1055
/**
 * xprt_lookup_rqst - find an RPC request corresponding to an XID
 * @xprt: transport on which the original request was transmitted
 * @xid: RPC XID of incoming reply
 *
 * Caller holds xprt->queue_lock.
 */
1063 struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
1064 {
1065 struct rpc_rqst *entry;
1066
1067 entry = xprt_request_rb_find(xprt, xid);
1068 if (entry != NULL) {
1069 trace_xprt_lookup_rqst(xprt, xid, 0);
1070 entry->rq_rtt = ktime_sub(ktime_get(), entry->rq_xtime);
1071 return entry;
1072 }
1073
1074 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
1075 ntohl(xid));
1076 trace_xprt_lookup_rqst(xprt, xid, -ENOENT);
1077 xprt->stat.bad_xids++;
1078 return NULL;
1079 }
1080 EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
1081
1082 static bool
1083 xprt_is_pinned_rqst(struct rpc_rqst *req)
1084 {
1085 return atomic_read(&req->rq_pin) != 0;
1086 }
1087
/**
 * xprt_pin_rqst - Pin a request on the transport receive list
 * @req: Request to pin
 *
 * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
 * so should be holding xprt->queue_lock.
 */
1095 void xprt_pin_rqst(struct rpc_rqst *req)
1096 {
1097 atomic_inc(&req->rq_pin);
1098 }
1099 EXPORT_SYMBOL_GPL(xprt_pin_rqst);
1100
/**
 * xprt_unpin_rqst - Unpin a request on the transport receive list
 * @req: Request to unpin
 *
 * Caller should be holding xprt->queue_lock.
 */
1107 void xprt_unpin_rqst(struct rpc_rqst *req)
1108 {
1109 if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
1110 atomic_dec(&req->rq_pin);
1111 return;
1112 }
1113 if (atomic_dec_and_test(&req->rq_pin))
1114 wake_up_var(&req->rq_pin);
1115 }
1116 EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
1117
1118 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
1119 {
1120 wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
1121 }
1122
1123 static bool
1124 xprt_request_data_received(struct rpc_task *task)
1125 {
1126 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
1127 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) != 0;
1128 }
1129
1130 static bool
1131 xprt_request_need_enqueue_receive(struct rpc_task *task, struct rpc_rqst *req)
1132 {
1133 return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
1134 READ_ONCE(task->tk_rqstp->rq_reply_bytes_recvd) == 0;
1135 }
1136
1137
1138
1139
1140
1141
1142 int
1143 xprt_request_enqueue_receive(struct rpc_task *task)
1144 {
1145 struct rpc_rqst *req = task->tk_rqstp;
1146 struct rpc_xprt *xprt = req->rq_xprt;
1147 int ret;
1148
1149 if (!xprt_request_need_enqueue_receive(task, req))
1150 return 0;
1151
1152 ret = xprt_request_prepare(task->tk_rqstp, &req->rq_rcv_buf);
1153 if (ret)
1154 return ret;
1155 spin_lock(&xprt->queue_lock);
1156
1157
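/* Keep rq_private_buf, the copy used by the reply handling code, in
 * sync with the receive buffer. */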
1158 memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
1159 sizeof(req->rq_private_buf));
1160
1161
1162 xprt_request_rb_insert(xprt, req);
1163 set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
1164 spin_unlock(&xprt->queue_lock);
1165
1166
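/* A reply is now expected, so turn off the autodisconnect timer. */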
1167 del_singleshot_timer_sync(&xprt->timer);
1168 return 0;
1169 }
1170
1171
1172
1173
1174
1175
1176
1177 static void
1178 xprt_request_dequeue_receive_locked(struct rpc_task *task)
1179 {
1180 struct rpc_rqst *req = task->tk_rqstp;
1181
1182 if (test_and_clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1183 xprt_request_rb_remove(req->rq_xprt, req);
1184 }
1185
1186
1187
1188
1189
1190
1191
1192 void xprt_update_rtt(struct rpc_task *task)
1193 {
1194 struct rpc_rqst *req = task->tk_rqstp;
1195 struct rpc_rtt *rtt = task->tk_client->cl_rtt;
1196 unsigned int timer = task->tk_msg.rpc_proc->p_timer;
1197 long m = usecs_to_jiffies(ktime_to_us(req->rq_rtt));
1198
1199 if (timer) {
1200 if (req->rq_ntrans == 1)
1201 rpc_update_rtt(rtt, timer, m);
1202 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
1203 }
1204 }
1205 EXPORT_SYMBOL_GPL(xprt_update_rtt);
1206
/**
 * xprt_complete_rqst - called when reply processing is complete
 * @task: RPC request that recently completed
 * @copied: actual number of bytes received from the transport
 *
 * Caller holds xprt->queue_lock.
 */
1214 void xprt_complete_rqst(struct rpc_task *task, int copied)
1215 {
1216 struct rpc_rqst *req = task->tk_rqstp;
1217 struct rpc_xprt *xprt = req->rq_xprt;
1218
1219 xprt->stat.recvs++;
1220
1221 xdr_free_bvec(&req->rq_rcv_buf);
1222 req->rq_private_buf.bvec = NULL;
1223 req->rq_private_buf.len = copied;
1224
1225
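/* Ensure all writes to the receive buffer are visible before
 * rq_reply_bytes_recvd is updated. */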
1226 smp_wmb();
1227 req->rq_reply_bytes_recvd = copied;
1228 xprt_request_dequeue_receive_locked(task);
1229 rpc_wake_up_queued_task(&xprt->pending, task);
1230 }
1231 EXPORT_SYMBOL_GPL(xprt_complete_rqst);
1232
1233 static void xprt_timer(struct rpc_task *task)
1234 {
1235 struct rpc_rqst *req = task->tk_rqstp;
1236 struct rpc_xprt *xprt = req->rq_xprt;
1237
1238 if (task->tk_status != -ETIMEDOUT)
1239 return;
1240
1241 trace_xprt_timer(xprt, req->rq_xid, task->tk_status);
1242 if (!req->rq_reply_bytes_recvd) {
1243 if (xprt->ops->timer)
1244 xprt->ops->timer(xprt, task);
1245 } else
1246 task->tk_status = 0;
1247 }
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258 void xprt_wait_for_reply_request_def(struct rpc_task *task)
1259 {
1260 struct rpc_rqst *req = task->tk_rqstp;
1261
1262 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1263 xprt_request_timeout(req));
1264 }
1265 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_def);
1266
1267
1268
1269
1270
1271
1272
1273
1274 void xprt_wait_for_reply_request_rtt(struct rpc_task *task)
1275 {
1276 int timer = task->tk_msg.rpc_proc->p_timer;
1277 struct rpc_clnt *clnt = task->tk_client;
1278 struct rpc_rtt *rtt = clnt->cl_rtt;
1279 struct rpc_rqst *req = task->tk_rqstp;
1280 unsigned long max_timeout = clnt->cl_timeout->to_maxval;
1281 unsigned long timeout;
1282
1283 timeout = rpc_calc_rto(rtt, timer);
1284 timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
1285 if (timeout > max_timeout || timeout == 0)
1286 timeout = max_timeout;
1287 rpc_sleep_on_timeout(&req->rq_xprt->pending, task, xprt_timer,
1288 jiffies + timeout);
1289 }
1290 EXPORT_SYMBOL_GPL(xprt_wait_for_reply_request_rtt);
1291
1292
1293
1294
1295
1296
1297 void xprt_request_wait_receive(struct rpc_task *task)
1298 {
1299 struct rpc_rqst *req = task->tk_rqstp;
1300 struct rpc_xprt *xprt = req->rq_xprt;
1301
1302 if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
1303 return;
1304
1305
1306
1307
1308
1309 spin_lock(&xprt->queue_lock);
1310 if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
1311 xprt->ops->wait_for_reply_request(task);
1312
1313
1314
1315
1316
1317 if (xprt_request_retransmit_after_disconnect(task))
1318 rpc_wake_up_queued_task_set_status(&xprt->pending,
1319 task, -ENOTCONN);
1320 }
1321 spin_unlock(&xprt->queue_lock);
1322 }
1323
1324 static bool
1325 xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
1326 {
1327 return !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1328 }
1329
1330
1331
1332
1333
1334
1335
1336 void
1337 xprt_request_enqueue_transmit(struct rpc_task *task)
1338 {
1339 struct rpc_rqst *pos, *req = task->tk_rqstp;
1340 struct rpc_xprt *xprt = req->rq_xprt;
1341 int ret;
1342
1343 if (xprt_request_need_enqueue_transmit(task, req)) {
1344 ret = xprt_request_prepare(task->tk_rqstp, &req->rq_snd_buf);
1345 if (ret) {
1346 task->tk_status = ret;
1347 return;
1348 }
1349 req->rq_bytes_sent = 0;
1350 spin_lock(&xprt->queue_lock);
1351
1352
1353
1354
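/* Requests that carry congestion control credits are added to the
 * head of the list to avoid starvation issues. */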
1355 if (req->rq_cong) {
1356 xprt_clear_congestion_window_wait(xprt);
1357 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1358 if (pos->rq_cong)
1359 continue;
1360
1361 list_add_tail(&req->rq_xmit, &pos->rq_xmit);
1362 INIT_LIST_HEAD(&req->rq_xmit2);
1363 goto out;
1364 }
1365 } else if (!req->rq_seqno) {
1366 list_for_each_entry(pos, &xprt->xmit_queue, rq_xmit) {
1367 if (pos->rq_task->tk_owner != task->tk_owner)
1368 continue;
1369 list_add_tail(&req->rq_xmit2, &pos->rq_xmit2);
1370 INIT_LIST_HEAD(&req->rq_xmit);
1371 goto out;
1372 }
1373 }
1374 list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
1375 INIT_LIST_HEAD(&req->rq_xmit2);
1376 out:
1377 atomic_long_inc(&xprt->xmit_queuelen);
1378 set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
1379 spin_unlock(&xprt->queue_lock);
1380 }
1381 }
1382
1383
1384
1385
1386
1387
1388
1389
1390 static void
1391 xprt_request_dequeue_transmit_locked(struct rpc_task *task)
1392 {
1393 struct rpc_rqst *req = task->tk_rqstp;
1394
1395 if (!test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1396 return;
1397 if (!list_empty(&req->rq_xmit)) {
1398 list_del(&req->rq_xmit);
1399 if (!list_empty(&req->rq_xmit2)) {
1400 struct rpc_rqst *next = list_first_entry(&req->rq_xmit2,
1401 struct rpc_rqst, rq_xmit2);
1402 list_del(&req->rq_xmit2);
1403 list_add_tail(&next->rq_xmit, &next->rq_xprt->xmit_queue);
1404 }
1405 } else
1406 list_del(&req->rq_xmit2);
1407 atomic_long_dec(&req->rq_xprt->xmit_queuelen);
1408 xdr_free_bvec(&req->rq_snd_buf);
1409 }
1410
1411
1412
1413
1414
1415
1416
1417 static void
1418 xprt_request_dequeue_transmit(struct rpc_task *task)
1419 {
1420 struct rpc_rqst *req = task->tk_rqstp;
1421 struct rpc_xprt *xprt = req->rq_xprt;
1422
1423 spin_lock(&xprt->queue_lock);
1424 xprt_request_dequeue_transmit_locked(task);
1425 spin_unlock(&xprt->queue_lock);
1426 }
1427
1428
1429
1430
1431
1432
1433
1434
1435 void
1436 xprt_request_dequeue_xprt(struct rpc_task *task)
1437 {
1438 struct rpc_rqst *req = task->tk_rqstp;
1439 struct rpc_xprt *xprt = req->rq_xprt;
1440
1441 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
1442 test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
1443 xprt_is_pinned_rqst(req)) {
1444 spin_lock(&xprt->queue_lock);
1445 while (xprt_is_pinned_rqst(req)) {
1446 set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1447 spin_unlock(&xprt->queue_lock);
1448 xprt_wait_on_pinned_rqst(req);
1449 spin_lock(&xprt->queue_lock);
1450 clear_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
1451 }
1452 xprt_request_dequeue_transmit_locked(task);
1453 xprt_request_dequeue_receive_locked(task);
1454 spin_unlock(&xprt->queue_lock);
1455 xdr_free_bvec(&req->rq_rcv_buf);
1456 }
1457 }
1458
/**
 * xprt_request_prepare - prepare an encoded request for transport
 * @req: pointer to rpc_rqst
 * @buf: pointer to the send or receive xdr_buf being prepared
 *
 * Calls into the transport layer to do whatever is needed to prepare
 * the request for transmission or receive.
 * Returns error, or zero.
 */
1468 static int
1469 xprt_request_prepare(struct rpc_rqst *req, struct xdr_buf *buf)
1470 {
1471 struct rpc_xprt *xprt = req->rq_xprt;
1472
1473 if (xprt->ops->prepare_request)
1474 return xprt->ops->prepare_request(req, buf);
1475 return 0;
1476 }
1477
1478
1479
1480
1481
1482
1483
1484 bool
1485 xprt_request_need_retransmit(struct rpc_task *task)
1486 {
1487 return xprt_request_retransmit_after_disconnect(task);
1488 }
1489
1490
1491
1492
1493
1494
1495 bool xprt_prepare_transmit(struct rpc_task *task)
1496 {
1497 struct rpc_rqst *req = task->tk_rqstp;
1498 struct rpc_xprt *xprt = req->rq_xprt;
1499
1500 if (!xprt_lock_write(xprt, task)) {
1501
1502 if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1503 rpc_wake_up_queued_task_set_status(&xprt->sending,
1504 task, 0);
1505 return false;
1506
1507 }
1508 if (atomic_read(&xprt->swapper))
1509
1510 current->flags |= PF_MEMALLOC;
1511 return true;
1512 }
1513
1514 void xprt_end_transmit(struct rpc_task *task)
1515 {
1516 struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
1517
1518 xprt_inject_disconnect(xprt);
1519 xprt_release_write(xprt, task);
1520 }
1521
/**
 * xprt_request_transmit - send an RPC request on a transport
 * @req: pointer to request to transmit
 * @snd_task: RPC task that owns the transport lock
 *
 * This performs the transmission of a single request.
 * Note that if the request is not the same as snd_task, then it
 * does need to be pinned.
 * Returns '0' on success.
 */
1532 static int
1533 xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
1534 {
1535 struct rpc_xprt *xprt = req->rq_xprt;
1536 struct rpc_task *task = req->rq_task;
1537 unsigned int connect_cookie;
1538 int is_retrans = RPC_WAS_SENT(task);
1539 int status;
1540
1541 if (!req->rq_bytes_sent) {
1542 if (xprt_request_data_received(task)) {
1543 status = 0;
1544 goto out_dequeue;
1545 }
1546
1547 if (rpcauth_xmit_need_reencode(task)) {
1548 status = -EBADMSG;
1549 goto out_dequeue;
1550 }
1551 if (RPC_SIGNALLED(task)) {
1552 status = -ERESTARTSYS;
1553 goto out_dequeue;
1554 }
1555 }
1556
1557
1558
1559
1560
1561
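/* Update req->rq_ntrans before transmitting to avoid races with
 * xprt_update_rtt(), which needs to know that it is recording a
 * reply to the first transmission. */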
1562 req->rq_ntrans++;
1563
1564 trace_rpc_xdr_sendto(task, &req->rq_snd_buf);
1565 connect_cookie = xprt->connect_cookie;
1566 status = xprt->ops->send_request(req);
1567 if (status != 0) {
1568 req->rq_ntrans--;
1569 trace_xprt_transmit(req, status);
1570 return status;
1571 }
1572
1573 if (is_retrans) {
1574 task->tk_client->cl_stats->rpcretrans++;
1575 trace_xprt_retransmit(req);
1576 }
1577
1578 xprt_inject_disconnect(xprt);
1579
1580 task->tk_flags |= RPC_TASK_SENT;
1581 spin_lock(&xprt->transport_lock);
1582
1583 xprt->stat.sends++;
1584 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
1585 xprt->stat.bklog_u += xprt->backlog.qlen;
1586 xprt->stat.sending_u += xprt->sending.qlen;
1587 xprt->stat.pending_u += xprt->pending.qlen;
1588 spin_unlock(&xprt->transport_lock);
1589
1590 req->rq_connect_cookie = connect_cookie;
1591 out_dequeue:
1592 trace_xprt_transmit(req, status);
1593 xprt_request_dequeue_transmit(task);
1594 rpc_wake_up_queued_task_set_status(&xprt->sending, task, status);
1595 return status;
1596 }
1597
/**
 * xprt_transmit - send an RPC request on a transport
 * @task: controlling RPC task
 *
 * Attempts to drain the transmit queue. On exit, either the transport
 * signalled an error that needs to be handled before transmission can
 * resume, or @task finished transmitting, and detected that it already
 * received a reply.
 */
1607 void
1608 xprt_transmit(struct rpc_task *task)
1609 {
1610 struct rpc_rqst *next, *req = task->tk_rqstp;
1611 struct rpc_xprt *xprt = req->rq_xprt;
1612 int status;
1613
1614 spin_lock(&xprt->queue_lock);
1615 for (;;) {
1616 next = list_first_entry_or_null(&xprt->xmit_queue,
1617 struct rpc_rqst, rq_xmit);
1618 if (!next)
1619 break;
1620 xprt_pin_rqst(next);
1621 spin_unlock(&xprt->queue_lock);
1622 status = xprt_request_transmit(next, task);
1623 if (status == -EBADMSG && next != req)
1624 status = 0;
1625 spin_lock(&xprt->queue_lock);
1626 xprt_unpin_rqst(next);
1627 if (status < 0) {
1628 if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1629 task->tk_status = status;
1630 break;
1631 }
1632
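/* Stop draining once @task itself has been sent and has already
 * received its reply. */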
1633 if (xprt_request_data_received(task) &&
1634 !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
1635 break;
1636 cond_resched_lock(&xprt->queue_lock);
1637 }
1638 spin_unlock(&xprt->queue_lock);
1639 }
1640
1641 static void xprt_complete_request_init(struct rpc_task *task)
1642 {
1643 if (task->tk_rqstp)
1644 xprt_request_init(task);
1645 }
1646
1647 void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
1648 {
1649 set_bit(XPRT_CONGESTED, &xprt->state);
1650 rpc_sleep_on(&xprt->backlog, task, xprt_complete_request_init);
1651 }
1652 EXPORT_SYMBOL_GPL(xprt_add_backlog);
1653
1654 static bool __xprt_set_rq(struct rpc_task *task, void *data)
1655 {
1656 struct rpc_rqst *req = data;
1657
1658 if (task->tk_rqstp == NULL) {
1659 memset(req, 0, sizeof(*req));
1660 task->tk_rqstp = req;
1661 return true;
1662 }
1663 return false;
1664 }
1665
1666 bool xprt_wake_up_backlog(struct rpc_xprt *xprt, struct rpc_rqst *req)
1667 {
1668 if (rpc_wake_up_first(&xprt->backlog, __xprt_set_rq, req) == NULL) {
1669 clear_bit(XPRT_CONGESTED, &xprt->state);
1670 return false;
1671 }
1672 return true;
1673 }
1674 EXPORT_SYMBOL_GPL(xprt_wake_up_backlog);
1675
1676 static bool xprt_throttle_congested(struct rpc_xprt *xprt, struct rpc_task *task)
1677 {
1678 bool ret = false;
1679
1680 if (!test_bit(XPRT_CONGESTED, &xprt->state))
1681 goto out;
1682 spin_lock(&xprt->reserve_lock);
1683 if (test_bit(XPRT_CONGESTED, &xprt->state)) {
1684 xprt_add_backlog(xprt, task);
1685 ret = true;
1686 }
1687 spin_unlock(&xprt->reserve_lock);
1688 out:
1689 return ret;
1690 }
1691
1692 static struct rpc_rqst *xprt_dynamic_alloc_slot(struct rpc_xprt *xprt)
1693 {
1694 struct rpc_rqst *req = ERR_PTR(-EAGAIN);
1695
1696 if (xprt->num_reqs >= xprt->max_reqs)
1697 goto out;
1698 ++xprt->num_reqs;
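/* Drop the reserve_lock across the allocation; on failure, retake it
 * and give back the provisional slot count. */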
1699 spin_unlock(&xprt->reserve_lock);
1700 req = kzalloc(sizeof(*req), rpc_task_gfp_mask());
1701 spin_lock(&xprt->reserve_lock);
1702 if (req != NULL)
1703 goto out;
1704 --xprt->num_reqs;
1705 req = ERR_PTR(-ENOMEM);
1706 out:
1707 return req;
1708 }
1709
1710 static bool xprt_dynamic_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1711 {
1712 if (xprt->num_reqs > xprt->min_reqs) {
1713 --xprt->num_reqs;
1714 kfree(req);
1715 return true;
1716 }
1717 return false;
1718 }
1719
1720 void xprt_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task)
1721 {
1722 struct rpc_rqst *req;
1723
1724 spin_lock(&xprt->reserve_lock);
1725 if (!list_empty(&xprt->free)) {
1726 req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
1727 list_del(&req->rq_list);
1728 goto out_init_req;
1729 }
1730 req = xprt_dynamic_alloc_slot(xprt);
1731 if (!IS_ERR(req))
1732 goto out_init_req;
1733 switch (PTR_ERR(req)) {
1734 case -ENOMEM:
1735 dprintk("RPC: dynamic allocation of request slot "
1736 "failed! Retrying\n");
1737 task->tk_status = -ENOMEM;
1738 break;
1739 case -EAGAIN:
1740 xprt_add_backlog(xprt, task);
1741 dprintk("RPC: waiting for request slot\n");
1742 fallthrough;
1743 default:
1744 task->tk_status = -EAGAIN;
1745 }
1746 spin_unlock(&xprt->reserve_lock);
1747 return;
1748 out_init_req:
1749 xprt->stat.max_slots = max_t(unsigned int, xprt->stat.max_slots,
1750 xprt->num_reqs);
1751 spin_unlock(&xprt->reserve_lock);
1752
1753 task->tk_status = 0;
1754 task->tk_rqstp = req;
1755 }
1756 EXPORT_SYMBOL_GPL(xprt_alloc_slot);
1757
1758 void xprt_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *req)
1759 {
1760 spin_lock(&xprt->reserve_lock);
1761 if (!xprt_wake_up_backlog(xprt, req) &&
1762 !xprt_dynamic_free_slot(xprt, req)) {
1763 memset(req, 0, sizeof(*req));
1764 list_add(&req->rq_list, &xprt->free);
1765 }
1766 spin_unlock(&xprt->reserve_lock);
1767 }
1768 EXPORT_SYMBOL_GPL(xprt_free_slot);
1769
1770 static void xprt_free_all_slots(struct rpc_xprt *xprt)
1771 {
1772 struct rpc_rqst *req;
1773 while (!list_empty(&xprt->free)) {
1774 req = list_first_entry(&xprt->free, struct rpc_rqst, rq_list);
1775 list_del(&req->rq_list);
1776 kfree(req);
1777 }
1778 }
1779
1780 static DEFINE_IDA(rpc_xprt_ids);
1781
1782 void xprt_cleanup_ids(void)
1783 {
1784 ida_destroy(&rpc_xprt_ids);
1785 }
1786
1787 static int xprt_alloc_id(struct rpc_xprt *xprt)
1788 {
1789 int id;
1790
1791 id = ida_simple_get(&rpc_xprt_ids, 0, 0, GFP_KERNEL);
1792 if (id < 0)
1793 return id;
1794
1795 xprt->id = id;
1796 return 0;
1797 }
1798
1799 static void xprt_free_id(struct rpc_xprt *xprt)
1800 {
1801 ida_simple_remove(&rpc_xprt_ids, xprt->id);
1802 }
1803
1804 struct rpc_xprt *xprt_alloc(struct net *net, size_t size,
1805 unsigned int num_prealloc,
1806 unsigned int max_alloc)
1807 {
1808 struct rpc_xprt *xprt;
1809 struct rpc_rqst *req;
1810 int i;
1811
1812 xprt = kzalloc(size, GFP_KERNEL);
1813 if (xprt == NULL)
1814 goto out;
1815
1816 xprt_alloc_id(xprt);
1817 xprt_init(xprt, net);
1818
1819 for (i = 0; i < num_prealloc; i++) {
1820 req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
1821 if (!req)
1822 goto out_free;
1823 list_add(&req->rq_list, &xprt->free);
1824 }
1825 if (max_alloc > num_prealloc)
1826 xprt->max_reqs = max_alloc;
1827 else
1828 xprt->max_reqs = num_prealloc;
1829 xprt->min_reqs = num_prealloc;
1830 xprt->num_reqs = num_prealloc;
1831
1832 return xprt;
1833
1834 out_free:
1835 xprt_free(xprt);
1836 out:
1837 return NULL;
1838 }
1839 EXPORT_SYMBOL_GPL(xprt_alloc);
1840
1841 void xprt_free(struct rpc_xprt *xprt)
1842 {
1843 put_net_track(xprt->xprt_net, &xprt->ns_tracker);
1844 xprt_free_all_slots(xprt);
1845 xprt_free_id(xprt);
1846 rpc_sysfs_xprt_destroy(xprt);
1847 kfree_rcu(xprt, rcu);
1848 }
1849 EXPORT_SYMBOL_GPL(xprt_free);
1850
1851 static void
1852 xprt_init_connect_cookie(struct rpc_rqst *req, struct rpc_xprt *xprt)
1853 {
1854 req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
1855 }
1856
1857 static __be32
1858 xprt_alloc_xid(struct rpc_xprt *xprt)
1859 {
1860 __be32 xid;
1861
1862 spin_lock(&xprt->reserve_lock);
1863 xid = (__force __be32)xprt->xid++;
1864 spin_unlock(&xprt->reserve_lock);
1865 return xid;
1866 }
1867
1868 static void
1869 xprt_init_xid(struct rpc_xprt *xprt)
1870 {
1871 xprt->xid = prandom_u32();
1872 }
1873
1874 static void
1875 xprt_request_init(struct rpc_task *task)
1876 {
1877 struct rpc_xprt *xprt = task->tk_xprt;
1878 struct rpc_rqst *req = task->tk_rqstp;
1879
1880 req->rq_task = task;
1881 req->rq_xprt = xprt;
1882 req->rq_buffer = NULL;
1883 req->rq_xid = xprt_alloc_xid(xprt);
1884 xprt_init_connect_cookie(req, xprt);
1885 req->rq_snd_buf.len = 0;
1886 req->rq_snd_buf.buflen = 0;
1887 req->rq_rcv_buf.len = 0;
1888 req->rq_rcv_buf.buflen = 0;
1889 req->rq_snd_buf.bvec = NULL;
1890 req->rq_rcv_buf.bvec = NULL;
1891 req->rq_release_snd_buf = NULL;
1892 xprt_init_majortimeo(task, req);
1893
1894 trace_xprt_reserve(req);
1895 }
1896
1897 static void
1898 xprt_do_reserve(struct rpc_xprt *xprt, struct rpc_task *task)
1899 {
1900 xprt->ops->alloc_slot(xprt, task);
1901 if (task->tk_rqstp != NULL)
1902 xprt_request_init(task);
1903 }
1904
/**
 * xprt_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If the transport is marked as being congested, or if no more
 * slots are available, place the task on the transport's
 * backlog queue.
 */
1913 void xprt_reserve(struct rpc_task *task)
1914 {
1915 struct rpc_xprt *xprt = task->tk_xprt;
1916
1917 task->tk_status = 0;
1918 if (task->tk_rqstp != NULL)
1919 return;
1920
1921 task->tk_status = -EAGAIN;
1922 if (!xprt_throttle_congested(xprt, task))
1923 xprt_do_reserve(xprt, task);
1924 }
1925
/**
 * xprt_retry_reserve - allocate an RPC request slot
 * @task: RPC task requesting a slot allocation
 *
 * If no more slots are available, place the task on the transport's
 * backlog queue.
 * Note that the only difference with xprt_reserve is that we now
 * ignore the value of the XPRT_CONGESTED flag.
 */
1935 void xprt_retry_reserve(struct rpc_task *task)
1936 {
1937 struct rpc_xprt *xprt = task->tk_xprt;
1938
1939 task->tk_status = 0;
1940 if (task->tk_rqstp != NULL)
1941 return;
1942
1943 task->tk_status = -EAGAIN;
1944 xprt_do_reserve(xprt, task);
1945 }
1946
1947
1948
1949
1950
1951
1952 void xprt_release(struct rpc_task *task)
1953 {
1954 struct rpc_xprt *xprt;
1955 struct rpc_rqst *req = task->tk_rqstp;
1956
1957 if (req == NULL) {
1958 if (task->tk_client) {
1959 xprt = task->tk_xprt;
1960 xprt_release_write(xprt, task);
1961 }
1962 return;
1963 }
1964
1965 xprt = req->rq_xprt;
1966 xprt_request_dequeue_xprt(task);
1967 spin_lock(&xprt->transport_lock);
1968 xprt->ops->release_xprt(xprt, task);
1969 if (xprt->ops->release_request)
1970 xprt->ops->release_request(task);
1971 xprt_schedule_autodisconnect(xprt);
1972 spin_unlock(&xprt->transport_lock);
1973 if (req->rq_buffer)
1974 xprt->ops->buf_free(task);
1975 if (req->rq_cred != NULL)
1976 put_rpccred(req->rq_cred);
1977 if (req->rq_release_snd_buf)
1978 req->rq_release_snd_buf(req);
1979
1980 task->tk_rqstp = NULL;
1981 if (likely(!bc_prealloc(req)))
1982 xprt->ops->free_slot(xprt, req);
1983 else
1984 xprt_free_bc_request(req);
1985 }
1986
1987 #ifdef CONFIG_SUNRPC_BACKCHANNEL
1988 void
1989 xprt_init_bc_request(struct rpc_rqst *req, struct rpc_task *task)
1990 {
1991 struct xdr_buf *xbufp = &req->rq_snd_buf;
1992
1993 task->tk_rqstp = req;
1994 req->rq_task = task;
1995 xprt_init_connect_cookie(req, req->rq_xprt);
1996
1997
1998
1999
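/* The send buffer is already XDR encoded; record its total length
 * (head + pages + tail). */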
2000 xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
2001 xbufp->tail[0].iov_len;
2002 }
2003 #endif
2004
2005 static void xprt_init(struct rpc_xprt *xprt, struct net *net)
2006 {
2007 kref_init(&xprt->kref);
2008
2009 spin_lock_init(&xprt->transport_lock);
2010 spin_lock_init(&xprt->reserve_lock);
2011 spin_lock_init(&xprt->queue_lock);
2012
2013 INIT_LIST_HEAD(&xprt->free);
2014 xprt->recv_queue = RB_ROOT;
2015 INIT_LIST_HEAD(&xprt->xmit_queue);
2016 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
2017 spin_lock_init(&xprt->bc_pa_lock);
2018 INIT_LIST_HEAD(&xprt->bc_pa_list);
2019 #endif
2020 INIT_LIST_HEAD(&xprt->xprt_switch);
2021
2022 xprt->last_used = jiffies;
2023 xprt->cwnd = RPC_INITCWND;
2024 xprt->bind_index = 0;
2025
2026 rpc_init_wait_queue(&xprt->binding, "xprt_binding");
2027 rpc_init_wait_queue(&xprt->pending, "xprt_pending");
2028 rpc_init_wait_queue(&xprt->sending, "xprt_sending");
2029 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
2030
2031 xprt_init_xid(xprt);
2032
2033 xprt->xprt_net = get_net_track(net, &xprt->ns_tracker, GFP_KERNEL);
2034 }
2035
2036
2037
2038
2039
2040
2041 struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
2042 {
2043 struct rpc_xprt *xprt;
2044 const struct xprt_class *t;
2045
2046 t = xprt_class_find_by_ident(args->ident);
2047 if (!t) {
2048 dprintk("RPC: transport (%d) not supported\n", args->ident);
2049 return ERR_PTR(-EIO);
2050 }
2051
2052 xprt = t->setup(args);
2053 xprt_class_release(t);
2054
2055 if (IS_ERR(xprt))
2056 goto out;
2057 if (args->flags & XPRT_CREATE_NO_IDLE_TIMEOUT)
2058 xprt->idle_timeout = 0;
2059 INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
2060 if (xprt_has_timer(xprt))
2061 timer_setup(&xprt->timer, xprt_init_autodisconnect, 0);
2062 else
2063 timer_setup(&xprt->timer, NULL, 0);
2064
2065 if (strlen(args->servername) > RPC_MAXNETNAMELEN) {
2066 xprt_destroy(xprt);
2067 return ERR_PTR(-EINVAL);
2068 }
2069 xprt->servername = kstrdup(args->servername, GFP_KERNEL);
2070 if (xprt->servername == NULL) {
2071 xprt_destroy(xprt);
2072 return ERR_PTR(-ENOMEM);
2073 }
2074
2075 rpc_xprt_debugfs_register(xprt);
2076
2077 trace_xprt_create(xprt);
2078 out:
2079 return xprt;
2080 }
2081
2082 static void xprt_destroy_cb(struct work_struct *work)
2083 {
2084 struct rpc_xprt *xprt =
2085 container_of(work, struct rpc_xprt, task_cleanup);
2086
2087 trace_xprt_destroy(xprt);
2088
2089 rpc_xprt_debugfs_unregister(xprt);
2090 rpc_destroy_wait_queue(&xprt->binding);
2091 rpc_destroy_wait_queue(&xprt->pending);
2092 rpc_destroy_wait_queue(&xprt->sending);
2093 rpc_destroy_wait_queue(&xprt->backlog);
2094 kfree(xprt->servername);
2095
2096
2097
2098 xprt_destroy_backchannel(xprt, UINT_MAX);
2099
2100
2101
2102
2103 xprt->ops->destroy(xprt);
2104 }
2105
2106
2107
2108
2109
2110
2111 static void xprt_destroy(struct rpc_xprt *xprt)
2112 {
2113
2114
2115
2116 wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_UNINTERRUPTIBLE);
2117
2118
2119
2120
2121
2122
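/* Take the transport_lock so that a concurrent
 * xprt_schedule_autodisconnect() cannot re-arm the timer after the
 * del_timer_sync() below has run. */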
2123 spin_lock(&xprt->transport_lock);
2124 del_timer_sync(&xprt->timer);
2125 spin_unlock(&xprt->transport_lock);
2126
2127
2128
2129
2130
2131 INIT_WORK(&xprt->task_cleanup, xprt_destroy_cb);
2132 schedule_work(&xprt->task_cleanup);
2133 }
2134
2135 static void xprt_destroy_kref(struct kref *kref)
2136 {
2137 xprt_destroy(container_of(kref, struct rpc_xprt, kref));
2138 }
2139
2140
2141
2142
2143
2144
2145 struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
2146 {
2147 if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
2148 return xprt;
2149 return NULL;
2150 }
2151 EXPORT_SYMBOL_GPL(xprt_get);
2152
2153
2154
2155
2156
2157
2158 void xprt_put(struct rpc_xprt *xprt)
2159 {
2160 if (xprt != NULL)
2161 kref_put(&xprt->kref, xprt_destroy_kref);
2162 }
2163 EXPORT_SYMBOL_GPL(xprt_put);
2164
2165 void xprt_set_offline_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
2166 {
2167 if (!test_and_set_bit(XPRT_OFFLINE, &xprt->state)) {
2168 spin_lock(&xps->xps_lock);
2169 xps->xps_nactive--;
2170 spin_unlock(&xps->xps_lock);
2171 }
2172 }
2173
2174 void xprt_set_online_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
2175 {
2176 if (test_and_clear_bit(XPRT_OFFLINE, &xprt->state)) {
2177 spin_lock(&xps->xps_lock);
2178 xps->xps_nactive++;
2179 spin_unlock(&xps->xps_lock);
2180 }
2181 }
2182
2183 void xprt_delete_locked(struct rpc_xprt *xprt, struct rpc_xprt_switch *xps)
2184 {
2185 if (test_and_set_bit(XPRT_REMOVE, &xprt->state))
2186 return;
2187
2188 xprt_force_disconnect(xprt);
2189 if (!test_bit(XPRT_CONNECTED, &xprt->state))
2190 return;
2191
2192 if (!xprt->sending.qlen && !xprt->pending.qlen &&
2193 !xprt->backlog.qlen && !atomic_long_read(&xprt->queuelen))
2194 rpc_xprt_switch_remove_xprt(xps, xprt, true);
2195 }