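/*
 * Scheduling for synchronous and asynchronous RPC (SUNRPC) requests.
 */
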
#include <linux/module.h>

#include <linux/sched.h>
#include <linux/interrupt.h>
#include <linux/slab.h>
#include <linux/mempool.h>
#include <linux/smp.h>
#include <linux/spinlock.h>
#include <linux/mutex.h>
#include <linux/freezer.h>
#include <linux/sched/mm.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>

#include "sunrpc.h"

#define CREATE_TRACE_POINTS
#include <trace/events/sunrpc.h>

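/*
 * RPC slabs and memory pools
 */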
#define RPC_BUFFER_MAXSIZE (2048)
#define RPC_BUFFER_POOLSIZE (8)
#define RPC_TASK_POOLSIZE (8)
static struct kmem_cache *rpc_task_slabp __read_mostly;
static struct kmem_cache *rpc_buffer_slabp __read_mostly;
static mempool_t *rpc_task_mempool __read_mostly;
static mempool_t *rpc_buffer_mempool __read_mostly;

static void rpc_async_schedule(struct work_struct *);
static void rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(struct work_struct *);

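/*
 * RPC tasks sit here while waiting for conditions to resolve.
 */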
static struct rpc_wait_queue delay_queue;

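/*
 * Workqueues used by rpciod and the transport layer.
 */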
struct workqueue_struct *rpciod_workqueue __read_mostly;
struct workqueue_struct *xprtiod_workqueue __read_mostly;
EXPORT_SYMBOL_GPL(xprtiod_workqueue);

gfp_t rpc_task_gfp_mask(void)
{
	if (current->flags & PF_WQ_WORKER)
		return GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
	return GFP_KERNEL;
}
EXPORT_SYMBOL_GPL(rpc_task_gfp_mask);

unsigned long
rpc_task_timeout(const struct rpc_task *task)
{
	unsigned long timeout = READ_ONCE(task->tk_timeout);

	if (timeout != 0) {
		unsigned long now = jiffies;
		if (time_before(now, timeout))
			return timeout - now;
	}
	return 0;
}
EXPORT_SYMBOL_GPL(rpc_task_timeout);

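/*
 * Disable the per-queue timer for an RPC task. Caller holds queue->lock.
 */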
static void
__rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (list_empty(&task->u.tk_wait.timer_list))
		return;
	task->tk_timeout = 0;
	list_del(&task->u.tk_wait.timer_list);
	if (list_empty(&queue->timer_list.list))
		cancel_delayed_work(&queue->timer_list.dwork);
}

static void
rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
{
	unsigned long now = jiffies;
	queue->timer_list.expires = expires;
	if (time_before_eq(expires, now))
		expires = 0;
	else
		expires -= now;
	mod_delayed_work(rpciod_workqueue, &queue->timer_list.dwork, expires);
}

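/*
 * Set up a timer for the current task.
 */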
static void
__rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task,
		unsigned long timeout)
{
	task->tk_timeout = timeout;
	if (list_empty(&queue->timer_list.list) || time_before(timeout, queue->timer_list.expires))
		rpc_set_queue_timer(queue, timeout);
	list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
}

static void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
{
	if (queue->priority != priority) {
		queue->priority = priority;
		queue->nr = 1U << priority;
	}
}

static void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
{
	rpc_set_waitqueue_priority(queue, queue->maxpriority);
}

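/*
 * Add a request to a queue list
 */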
static void
__rpc_list_enqueue_task(struct list_head *q, struct rpc_task *task)
{
	struct rpc_task *t;

	list_for_each_entry(t, q, u.tk_wait.list) {
		if (t->tk_owner == task->tk_owner) {
			list_add_tail(&task->u.tk_wait.links,
					&t->u.tk_wait.links);
			/* Cache the queue head in task->u.tk_wait.list */
			task->u.tk_wait.list.next = q;
			task->u.tk_wait.list.prev = NULL;
			return;
		}
	}
	INIT_LIST_HEAD(&task->u.tk_wait.links);
	list_add_tail(&task->u.tk_wait.list, q);
}

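/*
 * Remove request from a queue list
 */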
static void
__rpc_list_dequeue_task(struct rpc_task *task)
{
	struct list_head *q;
	struct rpc_task *t;

	if (task->u.tk_wait.list.prev == NULL) {
		list_del(&task->u.tk_wait.links);
		return;
	}
	if (!list_empty(&task->u.tk_wait.links)) {
		t = list_first_entry(&task->u.tk_wait.links,
				struct rpc_task,
				u.tk_wait.links);
		/* Assume __rpc_list_enqueue_task() cached the queue head */
		q = t->u.tk_wait.list.next;
		list_add_tail(&t->u.tk_wait.list, q);
		list_del(&task->u.tk_wait.links);
	}
	list_del(&task->u.tk_wait.list);
}

/*
 * Add new request to a priority queue.
 */
static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	if (unlikely(queue_priority > queue->maxpriority))
		queue_priority = queue->maxpriority;
	__rpc_list_enqueue_task(&queue->tasks[queue_priority], task);
}

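/*
 * Add new request to wait queue.
 */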
static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	INIT_LIST_HEAD(&task->u.tk_wait.timer_list);
	if (RPC_IS_PRIORITY(queue))
		__rpc_add_wait_queue_priority(queue, task, queue_priority);
	else
		list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
	task->tk_waitqueue = queue;
	queue->qlen++;
	/* barrier matches the read barrier in the wake-up path */
	smp_wmb();
	rpc_set_queued(task);
}

/*
 * Remove request from a priority queue.
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	__rpc_list_dequeue_task(task);
}

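/*
 * Remove request from queue.
 * Note: must be called with spin lock held.
 */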
static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	__rpc_disable_timer(queue, task);
	if (RPC_IS_PRIORITY(queue))
		__rpc_remove_wait_queue_priority(task);
	else
		list_del(&task->u.tk_wait.list);
	queue->qlen--;
}

static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
{
	int i;

	spin_lock_init(&queue->lock);
	for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
		INIT_LIST_HEAD(&queue->tasks[i]);
	queue->maxpriority = nr_queues - 1;
	rpc_reset_waitqueue_priority(queue);
	queue->qlen = 0;
	queue->timer_list.expires = 0;
	INIT_DELAYED_WORK(&queue->timer_list.dwork, __rpc_queue_timer_fn);
	INIT_LIST_HEAD(&queue->timer_list.list);
	rpc_assign_waitqueue_name(queue, qname);
}

void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
}
EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);

void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	__rpc_init_priority_wait_queue(queue, qname, 1);
}
EXPORT_SYMBOL_GPL(rpc_init_wait_queue);

void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
{
	cancel_delayed_work_sync(&queue->timer_list.dwork);
}
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);

static int rpc_wait_bit_killable(struct wait_bit_key *key, int mode)
{
	freezable_schedule_unsafe();
	if (signal_pending_state(mode, current))
		return -ERESTARTSYS;
	return 0;
}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) || IS_ENABLED(CONFIG_TRACEPOINTS)
static void rpc_task_set_debuginfo(struct rpc_task *task)
{
	struct rpc_clnt *clnt = task->tk_client;

	/* Might be a task carrying a reverse-direction operation */
	if (!clnt) {
		static atomic_t rpc_pid;

		task->tk_pid = atomic_inc_return(&rpc_pid);
		return;
	}

	task->tk_pid = atomic_inc_return(&clnt->cl_pid);
}
#else
static inline void rpc_task_set_debuginfo(struct rpc_task *task)
{
}
#endif

static void rpc_set_active(struct rpc_task *task)
{
	rpc_task_set_debuginfo(task);
	set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	trace_rpc_task_begin(task, NULL);
}

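/*
 * Mark an RPC call as having completed by clearing the 'active' bit
 * and then waking up all tasks that were sleeping.
 */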
static int rpc_complete_task(struct rpc_task *task)
{
	void *m = &task->tk_runstate;
	wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
	struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
	unsigned long flags;
	int ret;

	trace_rpc_task_complete(task, NULL);

	spin_lock_irqsave(&wq->lock, flags);
	clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
	ret = atomic_dec_and_test(&task->tk_count);
	if (waitqueue_active(wq))
		__wake_up_locked_key(wq, TASK_NORMAL, &k);
	spin_unlock_irqrestore(&wq->lock, flags);
	return ret;
}

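/*
 * Allow callers to wait for completion of an RPC call.
 *
 * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
 * so that the bit waitqueue lock is taken, avoiding races with
 * rpc_complete_task().
 */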
int __rpc_wait_for_completion_task(struct rpc_task *task, wait_bit_action_f *action)
{
	if (action == NULL)
		action = rpc_wait_bit_killable;
	return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
			action, TASK_KILLABLE);
}
EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);

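/*
 * Make an RPC task runnable.
 *
 * Note: if the task is ASYNC and is being made runnable after sitting on an
 * rpc_wait_queue, this must be called with the queue spinlock held to protect
 * the wait queue operation.
 */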
static void rpc_make_runnable(struct workqueue_struct *wq,
		struct rpc_task *task)
{
	bool need_wakeup = !rpc_test_and_set_running(task);

	rpc_clear_queued(task);
	if (!need_wakeup)
		return;
	if (RPC_IS_ASYNC(task)) {
		INIT_WORK(&task->u.tk_work, rpc_async_schedule);
		queue_work(wq, &task->u.tk_work);
	} else
		wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
}

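/*
 * Prepare for sleeping on a wait queue.
 * By placing the task on the wait queue, this function marks it
 * as being RPC_TASK_QUEUED.
 */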
static void __rpc_do_sleep_on_priority(struct rpc_wait_queue *q,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	trace_rpc_task_sleep(task, q);

	__rpc_add_wait_queue(q, task, queue_priority);
}

static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	if (WARN_ON_ONCE(RPC_IS_QUEUED(task)))
		return;
	__rpc_do_sleep_on_priority(q, task, queue_priority);
}

static void __rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q,
		struct rpc_task *task, unsigned long timeout,
		unsigned char queue_priority)
{
	if (WARN_ON_ONCE(RPC_IS_QUEUED(task)))
		return;
	if (time_is_after_jiffies(timeout)) {
		__rpc_do_sleep_on_priority(q, task, queue_priority);
		__rpc_add_timer(q, task, timeout);
	} else
		task->tk_status = -ETIMEDOUT;
}

static void rpc_set_tk_callback(struct rpc_task *task, rpc_action action)
{
	if (action && !WARN_ON_ONCE(task->tk_callback != NULL))
		task->tk_callback = action;
}

static bool rpc_sleep_check_activated(struct rpc_task *task)
{
	/* We shouldn't ever put an inactive task to sleep */
	if (WARN_ON_ONCE(!RPC_IS_ACTIVATED(task))) {
		task->tk_status = -EIO;
		rpc_put_task_async(task);
		return false;
	}
	return true;
}

void rpc_sleep_on_timeout(struct rpc_wait_queue *q, struct rpc_task *task,
		rpc_action action, unsigned long timeout)
{
	if (!rpc_sleep_check_activated(task))
		return;

	rpc_set_tk_callback(task, action);

	/*
	 * Protect the queue operations.
	 */
	spin_lock(&q->lock);
	__rpc_sleep_on_priority_timeout(q, task, timeout, task->tk_priority);
	spin_unlock(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on_timeout);

void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
		rpc_action action)
{
	if (!rpc_sleep_check_activated(task))
		return;

	rpc_set_tk_callback(task, action);

	WARN_ON_ONCE(task->tk_timeout != 0);
	/*
	 * Protect the queue operations.
	 */
	spin_lock(&q->lock);
	__rpc_sleep_on_priority(q, task, task->tk_priority);
	spin_unlock(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on);

void rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q,
		struct rpc_task *task, unsigned long timeout, int priority)
{
	if (!rpc_sleep_check_activated(task))
		return;

	priority -= RPC_PRIORITY_LOW;
	/*
	 * Protect the queue operations.
	 */
	spin_lock(&q->lock);
	__rpc_sleep_on_priority_timeout(q, task, timeout, priority);
	spin_unlock(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority_timeout);

void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
		int priority)
{
	if (!rpc_sleep_check_activated(task))
		return;

	WARN_ON_ONCE(task->tk_timeout != 0);
	priority -= RPC_PRIORITY_LOW;
	/*
	 * Protect the queue operations.
	 */
	spin_lock(&q->lock);
	__rpc_sleep_on_priority(q, task, priority);
	spin_unlock(&q->lock);
}
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);

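/**
 * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
 * @wq: workqueue on which to run task
 * @queue: wait queue
 * @task: task to be woken up
 *
 * Caller must hold queue->lock.
 */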
static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue,
		struct rpc_task *task)
{
	/* Has the task been executed yet? If not, we cannot wake it up! */
	if (!RPC_IS_ACTIVATED(task)) {
		printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
		return;
	}

	trace_rpc_task_wakeup(task, queue);

	__rpc_remove_wait_queue(queue, task);

	rpc_make_runnable(wq, task);
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static struct rpc_task *
rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue, struct rpc_task *task,
		bool (*action)(struct rpc_task *, void *), void *data)
{
	if (RPC_IS_QUEUED(task)) {
		smp_rmb();
		if (task->tk_waitqueue == queue) {
			if (action == NULL || action(task, data)) {
				__rpc_do_wake_up_task_on_wq(wq, queue, task);
				return task;
			}
		}
	}
	return NULL;
}

/*
 * Wake up a queued task while the queue lock is being held
 */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue,
		struct rpc_task *task)
{
	rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
			task, NULL, NULL);
}

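/*
 * Wake up a task on a specific queue
 */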
void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
{
	if (!RPC_IS_QUEUED(task))
		return;
	spin_lock(&queue->lock);
	rpc_wake_up_task_queue_locked(queue, task);
	spin_unlock(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);

static bool rpc_task_action_set_status(struct rpc_task *task, void *status)
{
	task->tk_status = *(int *)status;
	return true;
}

static void
rpc_wake_up_task_queue_set_status_locked(struct rpc_wait_queue *queue,
		struct rpc_task *task, int status)
{
	rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
			task, rpc_task_action_set_status, &status);
}

/**
 * rpc_wake_up_queued_task_set_status - wake up a task and set task->tk_status
 * @queue: pointer to rpc_wait_queue
 * @task: pointer to rpc_task
 * @status: integer error value
 *
 * If @task is queued on @queue, then it is woken up, and its
 * tk_status is set to the value of @status.
 */
void
rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue,
		struct rpc_task *task, int status)
{
	if (!RPC_IS_QUEUED(task))
		return;
	spin_lock(&queue->lock);
	rpc_wake_up_task_queue_set_status_locked(queue, task, status);
	spin_unlock(&queue->lock);
}

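/*
 * Wake up the next task on a priority queue.
 */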
static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
{
	struct list_head *q;
	struct rpc_task *task;

	/*
	 * Service the privileged queue.
	 */
	q = &queue->tasks[RPC_NR_PRIORITY - 1];
	if (queue->maxpriority > RPC_PRIORITY_PRIVILEGED && !list_empty(q)) {
		task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
		goto out;
	}

	/*
	 * Service a batch of tasks from a single owner.
	 */
	q = &queue->tasks[queue->priority];
	if (!list_empty(q) && queue->nr) {
		queue->nr--;
		task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
		goto out;
	}

	/*
	 * Service the next queue.
	 */
	do {
		if (q == &queue->tasks[0])
			q = &queue->tasks[queue->maxpriority];
		else
			q = q - 1;
		if (!list_empty(q)) {
			task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
			goto new_queue;
		}
	} while (q != &queue->tasks[queue->priority]);

	rpc_reset_waitqueue_priority(queue);
	return NULL;

new_queue:
	rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
out:
	return task;
}

static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
{
	if (RPC_IS_PRIORITY(queue))
		return __rpc_find_next_queued_priority(queue);
	if (!list_empty(&queue->tasks[0]))
		return list_first_entry(&queue->tasks[0], struct rpc_task, u.tk_wait.list);
	return NULL;
}

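/*
 * Wake up the first task on the wait queue.
 */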
struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
		struct rpc_wait_queue *queue,
		bool (*func)(struct rpc_task *, void *), void *data)
{
	struct rpc_task *task = NULL;

	spin_lock(&queue->lock);
	task = __rpc_find_next_queued(queue);
	if (task != NULL)
		task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue,
				task, func, data);
	spin_unlock(&queue->lock);

	return task;
}

/*
 * Wake up the first task on the wait queue.
 */
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
		bool (*func)(struct rpc_task *, void *), void *data)
{
	return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_first);

static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
{
	return true;
}

/*
 * Wake up the next task on the wait queue.
 */
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
{
	return rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_next);

/**
 * rpc_wake_up_locked - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 */
static void rpc_wake_up_locked(struct rpc_wait_queue *queue)
{
	struct rpc_task *task;

	for (;;) {
		task = __rpc_find_next_queued(queue);
		if (task == NULL)
			break;
		rpc_wake_up_task_queue_locked(queue, task);
	}
}

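/**
 * rpc_wake_up - wake up all rpc_tasks
 * @queue: rpc_wait_queue on which the tasks are sleeping
 *
 * Grabs queue->lock
 */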
void rpc_wake_up(struct rpc_wait_queue *queue)
{
	spin_lock(&queue->lock);
	rpc_wake_up_locked(queue);
	spin_unlock(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up);

/**
 * rpc_wake_up_status_locked - wake up all rpc_tasks and set their status value
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 */
static void rpc_wake_up_status_locked(struct rpc_wait_queue *queue, int status)
{
	struct rpc_task *task;

	for (;;) {
		task = __rpc_find_next_queued(queue);
		if (task == NULL)
			break;
		rpc_wake_up_task_queue_set_status_locked(queue, task, status);
	}
}

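/**
 * rpc_wake_up_status - wake up all rpc_tasks and set their status value
 * @queue: rpc_wait_queue on which the tasks are sleeping
 * @status: status value to set
 *
 * Grabs queue->lock
 */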
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{
	spin_lock(&queue->lock);
	rpc_wake_up_status_locked(queue, status);
	spin_unlock(&queue->lock);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_status);

static void __rpc_queue_timer_fn(struct work_struct *work)
{
	struct rpc_wait_queue *queue = container_of(work,
			struct rpc_wait_queue,
			timer_list.dwork.work);
	struct rpc_task *task, *n;
	unsigned long expires, now, timeo;

	spin_lock(&queue->lock);
	expires = now = jiffies;
	list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
		timeo = task->tk_timeout;
		if (time_after_eq(now, timeo)) {
			trace_rpc_task_timeout(task, task->tk_action);
			task->tk_status = -ETIMEDOUT;
			rpc_wake_up_task_queue_locked(queue, task);
			continue;
		}
		if (expires == now || time_after(expires, timeo))
			expires = timeo;
	}
	if (!list_empty(&queue->timer_list.list))
		rpc_set_queue_timer(queue, expires);
	spin_unlock(&queue->lock);
}

static void __rpc_atrun(struct rpc_task *task)
{
	if (task->tk_status == -ETIMEDOUT)
		task->tk_status = 0;
}

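/*
 * Run a task at a later time
 */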
void rpc_delay(struct rpc_task *task, unsigned long delay)
{
	rpc_sleep_on_timeout(&delay_queue, task, __rpc_atrun, jiffies + delay);
}
EXPORT_SYMBOL_GPL(rpc_delay);

/*
 * Helper to call task->tk_ops->rpc_call_prepare
 */
void rpc_prepare_task(struct rpc_task *task)
{
	task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
}

static void
rpc_init_task_statistics(struct rpc_task *task)
{
	/* Initialize retry counters */
	task->tk_garb_retry = 2;
	task->tk_cred_retry = 2;
	task->tk_rebind_retry = 2;

	/* starting timestamp */
	task->tk_start = ktime_get();
}

static void
rpc_reset_task_statistics(struct rpc_task *task)
{
	task->tk_timeouts = 0;
	task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_SENT);
	rpc_init_task_statistics(task);
}

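/*
 * Helper that calls task->tk_ops->rpc_call_done if it exists
 */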
void rpc_exit_task(struct rpc_task *task)
{
	trace_rpc_task_end(task, task->tk_action);
	task->tk_action = NULL;
	if (task->tk_ops->rpc_count_stats)
		task->tk_ops->rpc_count_stats(task, task->tk_calldata);
	else if (task->tk_client)
		rpc_count_iostats(task, task->tk_client->cl_metrics);
	if (task->tk_ops->rpc_call_done != NULL) {
		trace_rpc_task_call_done(task, task->tk_ops->rpc_call_done);
		task->tk_ops->rpc_call_done(task, task->tk_calldata);
		if (task->tk_action != NULL) {
			/* Always release the RPC slot and buffer memory */
			xprt_release(task);
			rpc_reset_task_statistics(task);
		}
	}
}

void rpc_signal_task(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;

	if (!RPC_IS_ACTIVATED(task))
		return;

	trace_rpc_task_signalled(task, task->tk_action);
	set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
	smp_mb__after_atomic();
	queue = READ_ONCE(task->tk_waitqueue);
	if (queue)
		rpc_wake_up_queued_task_set_status(queue, task, -ERESTARTSYS);
}

void rpc_exit(struct rpc_task *task, int status)
{
	task->tk_status = status;
	task->tk_action = rpc_exit_task;
	rpc_wake_up_queued_task(task->tk_waitqueue, task);
}
EXPORT_SYMBOL_GPL(rpc_exit);

void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
{
	if (ops->rpc_release != NULL)
		ops->rpc_release(calldata);
}

static bool xprt_needs_memalloc(struct rpc_xprt *xprt, struct rpc_task *tk)
{
	if (!xprt)
		return false;
	if (!atomic_read(&xprt->swapper))
		return false;
	return test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == tk;
}

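/*
 * This is the RPC `scheduler' (or rather, the finite state machine).
 */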
static void __rpc_execute(struct rpc_task *task)
{
	struct rpc_wait_queue *queue;
	int task_is_async = RPC_IS_ASYNC(task);
	int status = 0;
	unsigned long pflags = current->flags;

	WARN_ON_ONCE(RPC_IS_QUEUED(task));
	if (RPC_IS_QUEUED(task))
		return;

	for (;;) {
		void (*do_action)(struct rpc_task *);

		/*
		 * Perform the next FSM step or a pending callback.
		 *
		 * tk_action may be NULL if the task has been killed.
		 */
		do_action = task->tk_action;
		if (task->tk_callback) {
			do_action = task->tk_callback;
			task->tk_callback = NULL;
		}
		if (!do_action)
			break;
		if (RPC_IS_SWAPPER(task) ||
		    xprt_needs_memalloc(task->tk_xprt, task))
			current->flags |= PF_MEMALLOC;

		trace_rpc_task_run_action(task, do_action);
		do_action(task);

		/*
		 * Lockless check for whether task is sleeping or not.
		 */
		if (!RPC_IS_QUEUED(task)) {
			cond_resched();
			continue;
		}

		/*
		 * Signalled tasks should exit rather than sleep.
		 */
		if (RPC_SIGNALLED(task)) {
			task->tk_rpc_status = -ERESTARTSYS;
			rpc_exit(task, -ERESTARTSYS);
		}

		/*
		 * The queue->lock protects against races with
		 * rpc_make_runnable().
		 *
		 * Note that once we clear RPC_TASK_RUNNING on an asynchronous
		 * rpc_task, rpc_make_runnable() can assign it to a
		 * different workqueue. We therefore cannot assume that the
		 * rpc_task pointer may still be dereferenced.
		 */
		queue = task->tk_waitqueue;
		spin_lock(&queue->lock);
		if (!RPC_IS_QUEUED(task)) {
			spin_unlock(&queue->lock);
			continue;
		}
		rpc_clear_running(task);
		spin_unlock(&queue->lock);
		if (task_is_async)
			goto out;

		/* sync task: sleep here */
		trace_rpc_task_sync_sleep(task, task->tk_action);
		status = out_of_line_wait_on_bit(&task->tk_runstate,
				RPC_TASK_QUEUED, rpc_wait_bit_killable,
				TASK_KILLABLE);
		if (status < 0) {
			/*
			 * When a sync task receives a signal, it exits with
			 * -ERESTARTSYS. In order to catch any callbacks that
			 * clean up after sleeping on some queue, we don't
			 * break the loop here, but go around once more.
			 */
			trace_rpc_task_signalled(task, task->tk_action);
			set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
			task->tk_rpc_status = -ERESTARTSYS;
			rpc_exit(task, -ERESTARTSYS);
		}
		trace_rpc_task_sync_wake(task, task->tk_action);
	}

	/* Release all resources associated with the task */
	rpc_release_task(task);
out:
	current_restore_flags(pflags, PF_MEMALLOC);
}

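/**
 * rpc_execute - execute a task
 * @task: task to be executed
 *
 * Kick off the task: mark it active, make it runnable, and for a
 * synchronous task run the RPC state machine to completion here.
 */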
void rpc_execute(struct rpc_task *task)
{
	bool is_async = RPC_IS_ASYNC(task);

	rpc_set_active(task);
	rpc_make_runnable(rpciod_workqueue, task);
	if (!is_async) {
		unsigned int pflags = memalloc_nofs_save();
		__rpc_execute(task);
		memalloc_nofs_restore(pflags);
	}
}

static void rpc_async_schedule(struct work_struct *work)
{
	unsigned int pflags = memalloc_nofs_save();

	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
	memalloc_nofs_restore(pflags);
}

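/**
 * rpc_malloc - allocate RPC buffer resources
 * @task: RPC task
 *
 * A single memory region is allocated, which is split between the
 * RPC call and RPC reply that this task is being used for. When
 * this RPC is retired, the memory is released by calling rpc_free.
 *
 * Small requests come from a dedicated slab cache; async tasks fall
 * back to a mempool so that rpciod is not blocked waiting for memory.
 */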
int rpc_malloc(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
	struct rpc_buffer *buf;
	gfp_t gfp = rpc_task_gfp_mask();

	size += sizeof(struct rpc_buffer);
	if (size <= RPC_BUFFER_MAXSIZE) {
		buf = kmem_cache_alloc(rpc_buffer_slabp, gfp);
		/* Reach for the mempool if dynamic allocation fails */
		if (!buf && RPC_IS_ASYNC(task))
			buf = mempool_alloc(rpc_buffer_mempool, GFP_NOWAIT);
	} else
		buf = kmalloc(size, gfp);

	if (!buf)
		return -ENOMEM;

	buf->len = size;
	rqst->rq_buffer = buf->data;
	rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
	return 0;
}
EXPORT_SYMBOL_GPL(rpc_malloc);

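/**
 * rpc_free - free RPC buffer resources allocated via rpc_malloc
 * @task: RPC task
 */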
void rpc_free(struct rpc_task *task)
{
	void *buffer = task->tk_rqstp->rq_buffer;
	size_t size;
	struct rpc_buffer *buf;

	buf = container_of(buffer, struct rpc_buffer, data);
	size = buf->len;

	if (size <= RPC_BUFFER_MAXSIZE)
		mempool_free(buf, rpc_buffer_mempool);
	else
		kfree(buf);
}
EXPORT_SYMBOL_GPL(rpc_free);

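/*
 * Creation and deletion of RPC task structures
 */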
static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
{
	memset(task, 0, sizeof(*task));
	atomic_set(&task->tk_count, 1);
	task->tk_flags = task_setup_data->flags;
	task->tk_ops = task_setup_data->callback_ops;
	task->tk_calldata = task_setup_data->callback_data;
	INIT_LIST_HEAD(&task->tk_task);

	task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
	task->tk_owner = current->tgid;

	/* Initialize workqueue for async tasks */
	task->tk_workqueue = task_setup_data->workqueue;

	task->tk_xprt = rpc_task_get_xprt(task_setup_data->rpc_client,
			xprt_get(task_setup_data->rpc_xprt));

	task->tk_op_cred = get_rpccred(task_setup_data->rpc_op_cred);

	if (task->tk_ops->rpc_call_prepare != NULL)
		task->tk_action = rpc_prepare_task;

	rpc_init_task_statistics(task);
}

static struct rpc_task *rpc_alloc_task(void)
{
	struct rpc_task *task;

	task = kmem_cache_alloc(rpc_task_slabp, rpc_task_gfp_mask());
	if (task)
		return task;
	return mempool_alloc(rpc_task_mempool, GFP_NOWAIT);
}

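/*
 * Create a new task for the specified client.
 */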
struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
{
	struct rpc_task *task = setup_data->task;
	unsigned short flags = 0;

	if (task == NULL) {
		task = rpc_alloc_task();
		if (task == NULL) {
			rpc_release_calldata(setup_data->callback_ops,
					setup_data->callback_data);
			return ERR_PTR(-ENOMEM);
		}
		flags = RPC_TASK_DYNAMIC;
	}

	rpc_init_task(task, setup_data);
	task->tk_flags |= flags;
	return task;
}

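/*
 * rpc_free_task - release rpc task and perform cleanups
 *
 * Note that the rpc_task is freed only after rpc_release_calldata() has
 * run, in order to avoid a workqueue false-dependency problem when the
 * callback data recycles the task's work item.
 */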
static void rpc_free_task(struct rpc_task *task)
{
	unsigned short tk_flags = task->tk_flags;

	put_rpccred(task->tk_op_cred);
	rpc_release_calldata(task->tk_ops, task->tk_calldata);

	if (tk_flags & RPC_TASK_DYNAMIC)
		mempool_free(task, rpc_task_mempool);
}

static void rpc_async_release(struct work_struct *work)
{
	unsigned int pflags = memalloc_nofs_save();

	rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
	memalloc_nofs_restore(pflags);
}

static void rpc_release_resources_task(struct rpc_task *task)
{
	xprt_release(task);
	if (task->tk_msg.rpc_cred) {
		if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
			put_cred(task->tk_msg.rpc_cred);
		task->tk_msg.rpc_cred = NULL;
	}
	rpc_task_release_client(task);
}

static void rpc_final_put_task(struct rpc_task *task,
		struct workqueue_struct *q)
{
	if (q != NULL) {
		INIT_WORK(&task->u.tk_work, rpc_async_release);
		queue_work(q, &task->u.tk_work);
	} else
		rpc_free_task(task);
}

static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
{
	if (atomic_dec_and_test(&task->tk_count)) {
		rpc_release_resources_task(task);
		rpc_final_put_task(task, q);
	}
}

void rpc_put_task(struct rpc_task *task)
{
	rpc_do_put_task(task, NULL);
}
EXPORT_SYMBOL_GPL(rpc_put_task);

void rpc_put_task_async(struct rpc_task *task)
{
	rpc_do_put_task(task, task->tk_workqueue);
}
EXPORT_SYMBOL_GPL(rpc_put_task_async);

static void rpc_release_task(struct rpc_task *task)
{
	WARN_ON_ONCE(RPC_IS_QUEUED(task));

	rpc_release_resources_task(task);

	/*
	 * Note: at this point we have been removed from rpc_clnt->cl_tasks,
	 * so we should be safe without any locking.
	 */
	if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
		/* Wake up anyone who may be waiting for task completion */
		if (!rpc_complete_task(task))
			return;
	} else {
		if (!atomic_dec_and_test(&task->tk_count))
			return;
	}
	rpc_final_put_task(task, task->tk_workqueue);
}

int rpciod_up(void)
{
	return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
}

void rpciod_down(void)
{
	module_put(THIS_MODULE);
}

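/*
 * Start up the rpciod workqueues
 */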
static int rpciod_start(void)
{
	struct workqueue_struct *wq;

	/*
	 * Create the rpciod and xprtiod workqueues.
	 */
	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
	if (!wq)
		goto out_failed;
	rpciod_workqueue = wq;
	wq = alloc_workqueue("xprtiod", WQ_UNBOUND | WQ_MEM_RECLAIM, 0);
	if (!wq)
		goto free_rpciod;
	xprtiod_workqueue = wq;
	return 1;
free_rpciod:
	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
out_failed:
	return 0;
}

static void rpciod_stop(void)
{
	struct workqueue_struct *wq = NULL;

	if (rpciod_workqueue == NULL)
		return;

	wq = rpciod_workqueue;
	rpciod_workqueue = NULL;
	destroy_workqueue(wq);
	wq = xprtiod_workqueue;
	xprtiod_workqueue = NULL;
	destroy_workqueue(wq);
}

void
rpc_destroy_mempool(void)
{
	rpciod_stop();
	mempool_destroy(rpc_buffer_mempool);
	mempool_destroy(rpc_task_mempool);
	kmem_cache_destroy(rpc_task_slabp);
	kmem_cache_destroy(rpc_buffer_slabp);
	rpc_destroy_wait_queue(&delay_queue);
}

int
rpc_init_mempool(void)
{
	/*
	 * The following is not strictly a mempool initialisation,
	 * but there is no harm in doing it here.
	 */
	rpc_init_wait_queue(&delay_queue, "delayq");
	if (!rpciod_start())
		goto err_nomem;

	rpc_task_slabp = kmem_cache_create("rpc_tasks",
					   sizeof(struct rpc_task),
					   0, SLAB_HWCACHE_ALIGN,
					   NULL);
	if (!rpc_task_slabp)
		goto err_nomem;
	rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
					     RPC_BUFFER_MAXSIZE,
					     0, SLAB_HWCACHE_ALIGN,
					     NULL);
	if (!rpc_buffer_slabp)
		goto err_nomem;
	rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
						    rpc_task_slabp);
	if (!rpc_task_mempool)
		goto err_nomem;
	rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
						      rpc_buffer_slabp);
	if (!rpc_buffer_mempool)
		goto err_nomem;
	return 0;
err_nomem:
	rpc_destroy_mempool();
	return -ENOMEM;
}