Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-only
0002 /*
0003  * linux/net/sunrpc/sched.c
0004  *
0005  * Scheduling for synchronous and asynchronous RPC requests.
0006  *
0007  * Copyright (C) 1996 Olaf Kirch, <okir@monad.swb.de>
0008  *
0009  * TCP NFS related read + write fixes
0010  * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
0011  */
0012 
0013 #include <linux/module.h>
0014 
0015 #include <linux/sched.h>
0016 #include <linux/interrupt.h>
0017 #include <linux/slab.h>
0018 #include <linux/mempool.h>
0019 #include <linux/smp.h>
0020 #include <linux/spinlock.h>
0021 #include <linux/mutex.h>
0022 #include <linux/freezer.h>
0023 #include <linux/sched/mm.h>
0024 
0025 #include <linux/sunrpc/clnt.h>
0026 #include <linux/sunrpc/metrics.h>
0027 
0028 #include "sunrpc.h"
0029 
0030 #define CREATE_TRACE_POINTS
0031 #include <trace/events/sunrpc.h>
0032 
/*
 * RPC slabs and memory pools
 *
 * NOTE(review): none of the objects below are used in the part of the file
 * reviewed here. Presumably RPC_BUFFER_MAXSIZE caps the slab-backed buffer
 * size and the *_POOLSIZE values are mempool minimums -- confirm at the
 * allocation sites.
 */
#define RPC_BUFFER_MAXSIZE  (2048)
#define RPC_BUFFER_POOLSIZE (8)
#define RPC_TASK_POOLSIZE   (8)
static struct kmem_cache    *rpc_task_slabp __read_mostly;
static struct kmem_cache    *rpc_buffer_slabp __read_mostly;
static mempool_t    *rpc_task_mempool __read_mostly;
static mempool_t    *rpc_buffer_mempool __read_mostly;

/*
 * Forward declarations: rpc_async_schedule() and __rpc_queue_timer_fn() are
 * work handlers that are referenced (INIT_WORK / INIT_DELAYED_WORK) before
 * their definitions; rpc_release_task() is called from __rpc_execute().
 */
static void         rpc_async_schedule(struct work_struct *);
static void          rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(struct work_struct *);

/*
 * RPC tasks sit here while waiting for conditions to improve.
 * rpc_delay() parks tasks on this queue with a wake-up deadline.
 */
static struct rpc_wait_queue delay_queue;

/*
 * rpciod-related stuff
 *
 * rpciod_workqueue runs asynchronous RPC tasks and the per-queue timer work.
 * xprtiod_workqueue is only declared and exported here.
 */
struct workqueue_struct *rpciod_workqueue __read_mostly;
struct workqueue_struct *xprtiod_workqueue __read_mostly;
EXPORT_SYMBOL_GPL(xprtiod_workqueue);
0059 
0060 gfp_t rpc_task_gfp_mask(void)
0061 {
0062     if (current->flags & PF_WQ_WORKER)
0063         return GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
0064     return GFP_KERNEL;
0065 }
0066 EXPORT_SYMBOL_GPL(rpc_task_gfp_mask);
0067 
0068 unsigned long
0069 rpc_task_timeout(const struct rpc_task *task)
0070 {
0071     unsigned long timeout = READ_ONCE(task->tk_timeout);
0072 
0073     if (timeout != 0) {
0074         unsigned long now = jiffies;
0075         if (time_before(now, timeout))
0076             return timeout - now;
0077     }
0078     return 0;
0079 }
0080 EXPORT_SYMBOL_GPL(rpc_task_timeout);
0081 
0082 /*
0083  * Disable the timer for a given RPC task. Should be called with
0084  * queue->lock and bh_disabled in order to avoid races within
0085  * rpc_run_timer().
0086  */
0087 static void
0088 __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
0089 {
0090     if (list_empty(&task->u.tk_wait.timer_list))
0091         return;
0092     task->tk_timeout = 0;
0093     list_del(&task->u.tk_wait.timer_list);
0094     if (list_empty(&queue->timer_list.list))
0095         cancel_delayed_work(&queue->timer_list.dwork);
0096 }
0097 
0098 static void
0099 rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
0100 {
0101     unsigned long now = jiffies;
0102     queue->timer_list.expires = expires;
0103     if (time_before_eq(expires, now))
0104         expires = 0;
0105     else
0106         expires -= now;
0107     mod_delayed_work(rpciod_workqueue, &queue->timer_list.dwork, expires);
0108 }
0109 
0110 /*
0111  * Set up a timer for the current task.
0112  */
0113 static void
0114 __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task,
0115         unsigned long timeout)
0116 {
0117     task->tk_timeout = timeout;
0118     if (list_empty(&queue->timer_list.list) || time_before(timeout, queue->timer_list.expires))
0119         rpc_set_queue_timer(queue, timeout);
0120     list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
0121 }
0122 
0123 static void rpc_set_waitqueue_priority(struct rpc_wait_queue *queue, int priority)
0124 {
0125     if (queue->priority != priority) {
0126         queue->priority = priority;
0127         queue->nr = 1U << priority;
0128     }
0129 }
0130 
0131 static void rpc_reset_waitqueue_priority(struct rpc_wait_queue *queue)
0132 {
0133     rpc_set_waitqueue_priority(queue, queue->maxpriority);
0134 }
0135 
0136 /*
0137  * Add a request to a queue list
0138  */
0139 static void
0140 __rpc_list_enqueue_task(struct list_head *q, struct rpc_task *task)
0141 {
0142     struct rpc_task *t;
0143 
0144     list_for_each_entry(t, q, u.tk_wait.list) {
0145         if (t->tk_owner == task->tk_owner) {
0146             list_add_tail(&task->u.tk_wait.links,
0147                     &t->u.tk_wait.links);
0148             /* Cache the queue head in task->u.tk_wait.list */
0149             task->u.tk_wait.list.next = q;
0150             task->u.tk_wait.list.prev = NULL;
0151             return;
0152         }
0153     }
0154     INIT_LIST_HEAD(&task->u.tk_wait.links);
0155     list_add_tail(&task->u.tk_wait.list, q);
0156 }
0157 
0158 /*
0159  * Remove request from a queue list
0160  */
0161 static void
0162 __rpc_list_dequeue_task(struct rpc_task *task)
0163 {
0164     struct list_head *q;
0165     struct rpc_task *t;
0166 
0167     if (task->u.tk_wait.list.prev == NULL) {
0168         list_del(&task->u.tk_wait.links);
0169         return;
0170     }
0171     if (!list_empty(&task->u.tk_wait.links)) {
0172         t = list_first_entry(&task->u.tk_wait.links,
0173                 struct rpc_task,
0174                 u.tk_wait.links);
0175         /* Assume __rpc_list_enqueue_task() cached the queue head */
0176         q = t->u.tk_wait.list.next;
0177         list_add_tail(&t->u.tk_wait.list, q);
0178         list_del(&task->u.tk_wait.links);
0179     }
0180     list_del(&task->u.tk_wait.list);
0181 }
0182 
0183 /*
0184  * Add new request to a priority queue.
0185  */
0186 static void __rpc_add_wait_queue_priority(struct rpc_wait_queue *queue,
0187         struct rpc_task *task,
0188         unsigned char queue_priority)
0189 {
0190     if (unlikely(queue_priority > queue->maxpriority))
0191         queue_priority = queue->maxpriority;
0192     __rpc_list_enqueue_task(&queue->tasks[queue_priority], task);
0193 }
0194 
0195 /*
0196  * Add new request to wait queue.
0197  */
0198 static void __rpc_add_wait_queue(struct rpc_wait_queue *queue,
0199         struct rpc_task *task,
0200         unsigned char queue_priority)
0201 {
0202     INIT_LIST_HEAD(&task->u.tk_wait.timer_list);
0203     if (RPC_IS_PRIORITY(queue))
0204         __rpc_add_wait_queue_priority(queue, task, queue_priority);
0205     else
0206         list_add_tail(&task->u.tk_wait.list, &queue->tasks[0]);
0207     task->tk_waitqueue = queue;
0208     queue->qlen++;
0209     /* barrier matches the read in rpc_wake_up_task_queue_locked() */
0210     smp_wmb();
0211     rpc_set_queued(task);
0212 }
0213 
/*
 * Unlink @task from a priority queue. All of the work, including handing
 * the queue slot to a chained same-owner task, is done by
 * __rpc_list_dequeue_task().
 */
static void __rpc_remove_wait_queue_priority(struct rpc_task *task)
{
	__rpc_list_dequeue_task(task);
}
0221 
0222 /*
0223  * Remove request from queue.
0224  * Note: must be called with spin lock held.
0225  */
0226 static void __rpc_remove_wait_queue(struct rpc_wait_queue *queue, struct rpc_task *task)
0227 {
0228     __rpc_disable_timer(queue, task);
0229     if (RPC_IS_PRIORITY(queue))
0230         __rpc_remove_wait_queue_priority(task);
0231     else
0232         list_del(&task->u.tk_wait.list);
0233     queue->qlen--;
0234 }
0235 
0236 static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname, unsigned char nr_queues)
0237 {
0238     int i;
0239 
0240     spin_lock_init(&queue->lock);
0241     for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
0242         INIT_LIST_HEAD(&queue->tasks[i]);
0243     queue->maxpriority = nr_queues - 1;
0244     rpc_reset_waitqueue_priority(queue);
0245     queue->qlen = 0;
0246     queue->timer_list.expires = 0;
0247     INIT_DELAYED_WORK(&queue->timer_list.dwork, __rpc_queue_timer_fn);
0248     INIT_LIST_HEAD(&queue->timer_list.list);
0249     rpc_assign_waitqueue_name(queue, qname);
0250 }
0251 
0252 void rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const char *qname)
0253 {
0254     __rpc_init_priority_wait_queue(queue, qname, RPC_NR_PRIORITY);
0255 }
0256 EXPORT_SYMBOL_GPL(rpc_init_priority_wait_queue);
0257 
/* Initialise @queue as a plain wait queue with a single level. */
void rpc_init_wait_queue(struct rpc_wait_queue *queue, const char *qname)
{
	const unsigned char nr_levels = 1;

	__rpc_init_priority_wait_queue(queue, qname, nr_levels);
}
EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
0263 
0264 void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
0265 {
0266     cancel_delayed_work_sync(&queue->timer_list.dwork);
0267 }
0268 EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
0269 
0270 static int rpc_wait_bit_killable(struct wait_bit_key *key, int mode)
0271 {
0272     freezable_schedule_unsafe();
0273     if (signal_pending_state(mode, current))
0274         return -ERESTARTSYS;
0275     return 0;
0276 }
0277 
0278 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) || IS_ENABLED(CONFIG_TRACEPOINTS)
0279 static void rpc_task_set_debuginfo(struct rpc_task *task)
0280 {
0281     struct rpc_clnt *clnt = task->tk_client;
0282 
0283     /* Might be a task carrying a reverse-direction operation */
0284     if (!clnt) {
0285         static atomic_t rpc_pid;
0286 
0287         task->tk_pid = atomic_inc_return(&rpc_pid);
0288         return;
0289     }
0290 
0291     task->tk_pid = atomic_inc_return(&clnt->cl_pid);
0292 }
0293 #else
0294 static inline void rpc_task_set_debuginfo(struct rpc_task *task)
0295 {
0296 }
0297 #endif
0298 
0299 static void rpc_set_active(struct rpc_task *task)
0300 {
0301     rpc_task_set_debuginfo(task);
0302     set_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
0303     trace_rpc_task_begin(task, NULL);
0304 }
0305 
0306 /*
0307  * Mark an RPC call as having completed by clearing the 'active' bit
0308  * and then waking up all tasks that were sleeping.
0309  */
0310 static int rpc_complete_task(struct rpc_task *task)
0311 {
0312     void *m = &task->tk_runstate;
0313     wait_queue_head_t *wq = bit_waitqueue(m, RPC_TASK_ACTIVE);
0314     struct wait_bit_key k = __WAIT_BIT_KEY_INITIALIZER(m, RPC_TASK_ACTIVE);
0315     unsigned long flags;
0316     int ret;
0317 
0318     trace_rpc_task_complete(task, NULL);
0319 
0320     spin_lock_irqsave(&wq->lock, flags);
0321     clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
0322     ret = atomic_dec_and_test(&task->tk_count);
0323     if (waitqueue_active(wq))
0324         __wake_up_locked_key(wq, TASK_NORMAL, &k);
0325     spin_unlock_irqrestore(&wq->lock, flags);
0326     return ret;
0327 }
0328 
0329 /*
0330  * Allow callers to wait for completion of an RPC call
0331  *
0332  * Note the use of out_of_line_wait_on_bit() rather than wait_on_bit()
0333  * to enforce taking of the wq->lock and hence avoid races with
0334  * rpc_complete_task().
0335  */
0336 int __rpc_wait_for_completion_task(struct rpc_task *task, wait_bit_action_f *action)
0337 {
0338     if (action == NULL)
0339         action = rpc_wait_bit_killable;
0340     return out_of_line_wait_on_bit(&task->tk_runstate, RPC_TASK_ACTIVE,
0341             action, TASK_KILLABLE);
0342 }
0343 EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
0344 
0345 /*
0346  * Make an RPC task runnable.
0347  *
0348  * Note: If the task is ASYNC, and is being made runnable after sitting on an
0349  * rpc_wait_queue, this must be called with the queue spinlock held to protect
0350  * the wait queue operation.
0351  * Note the ordering of rpc_test_and_set_running() and rpc_clear_queued(),
0352  * which is needed to ensure that __rpc_execute() doesn't loop (due to the
0353  * lockless RPC_IS_QUEUED() test) before we've had a chance to test
0354  * the RPC_TASK_RUNNING flag.
0355  */
0356 static void rpc_make_runnable(struct workqueue_struct *wq,
0357         struct rpc_task *task)
0358 {
0359     bool need_wakeup = !rpc_test_and_set_running(task);
0360 
0361     rpc_clear_queued(task);
0362     if (!need_wakeup)
0363         return;
0364     if (RPC_IS_ASYNC(task)) {
0365         INIT_WORK(&task->u.tk_work, rpc_async_schedule);
0366         queue_work(wq, &task->u.tk_work);
0367     } else
0368         wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
0369 }
0370 
/*
 * Put @task to sleep on @q at level @queue_priority, after emitting the
 * sleep trace event.
 *
 * Tasks are always appended, which gives FIFO behaviour.
 * NB: An RPC task will only receive interrupt-driven events as long
 * as it's on a wait queue.
 */
static void __rpc_do_sleep_on_priority(struct rpc_wait_queue *q,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	trace_rpc_task_sleep(task, q);
	__rpc_add_wait_queue(q, task, queue_priority);
}
0385 
/*
 * Sleep on @q without a timeout. A task that is already queued triggers a
 * one-time warning and is left alone.
 */
static void __rpc_sleep_on_priority(struct rpc_wait_queue *q,
		struct rpc_task *task,
		unsigned char queue_priority)
{
	if (!WARN_ON_ONCE(RPC_IS_QUEUED(task)))
		__rpc_do_sleep_on_priority(q, task, queue_priority);
}
0394 
0395 static void __rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q,
0396         struct rpc_task *task, unsigned long timeout,
0397         unsigned char queue_priority)
0398 {
0399     if (WARN_ON_ONCE(RPC_IS_QUEUED(task)))
0400         return;
0401     if (time_is_after_jiffies(timeout)) {
0402         __rpc_do_sleep_on_priority(q, task, queue_priority);
0403         __rpc_add_timer(q, task, timeout);
0404     } else
0405         task->tk_status = -ETIMEDOUT;
0406 }
0407 
0408 static void rpc_set_tk_callback(struct rpc_task *task, rpc_action action)
0409 {
0410     if (action && !WARN_ON_ONCE(task->tk_callback != NULL))
0411         task->tk_callback = action;
0412 }
0413 
0414 static bool rpc_sleep_check_activated(struct rpc_task *task)
0415 {
0416     /* We shouldn't ever put an inactive task to sleep */
0417     if (WARN_ON_ONCE(!RPC_IS_ACTIVATED(task))) {
0418         task->tk_status = -EIO;
0419         rpc_put_task_async(task);
0420         return false;
0421     }
0422     return true;
0423 }
0424 
0425 void rpc_sleep_on_timeout(struct rpc_wait_queue *q, struct rpc_task *task,
0426                 rpc_action action, unsigned long timeout)
0427 {
0428     if (!rpc_sleep_check_activated(task))
0429         return;
0430 
0431     rpc_set_tk_callback(task, action);
0432 
0433     /*
0434      * Protect the queue operations.
0435      */
0436     spin_lock(&q->lock);
0437     __rpc_sleep_on_priority_timeout(q, task, timeout, task->tk_priority);
0438     spin_unlock(&q->lock);
0439 }
0440 EXPORT_SYMBOL_GPL(rpc_sleep_on_timeout);
0441 
0442 void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
0443                 rpc_action action)
0444 {
0445     if (!rpc_sleep_check_activated(task))
0446         return;
0447 
0448     rpc_set_tk_callback(task, action);
0449 
0450     WARN_ON_ONCE(task->tk_timeout != 0);
0451     /*
0452      * Protect the queue operations.
0453      */
0454     spin_lock(&q->lock);
0455     __rpc_sleep_on_priority(q, task, task->tk_priority);
0456     spin_unlock(&q->lock);
0457 }
0458 EXPORT_SYMBOL_GPL(rpc_sleep_on);
0459 
0460 void rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q,
0461         struct rpc_task *task, unsigned long timeout, int priority)
0462 {
0463     if (!rpc_sleep_check_activated(task))
0464         return;
0465 
0466     priority -= RPC_PRIORITY_LOW;
0467     /*
0468      * Protect the queue operations.
0469      */
0470     spin_lock(&q->lock);
0471     __rpc_sleep_on_priority_timeout(q, task, timeout, priority);
0472     spin_unlock(&q->lock);
0473 }
0474 EXPORT_SYMBOL_GPL(rpc_sleep_on_priority_timeout);
0475 
0476 void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
0477         int priority)
0478 {
0479     if (!rpc_sleep_check_activated(task))
0480         return;
0481 
0482     WARN_ON_ONCE(task->tk_timeout != 0);
0483     priority -= RPC_PRIORITY_LOW;
0484     /*
0485      * Protect the queue operations.
0486      */
0487     spin_lock(&q->lock);
0488     __rpc_sleep_on_priority(q, task, priority);
0489     spin_unlock(&q->lock);
0490 }
0491 EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
0492 
0493 /**
0494  * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
0495  * @wq: workqueue on which to run task
0496  * @queue: wait queue
0497  * @task: task to be woken up
0498  *
0499  * Caller must hold queue->lock, and have cleared the task queued flag.
0500  */
0501 static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
0502         struct rpc_wait_queue *queue,
0503         struct rpc_task *task)
0504 {
0505     /* Has the task been executed yet? If not, we cannot wake it up! */
0506     if (!RPC_IS_ACTIVATED(task)) {
0507         printk(KERN_ERR "RPC: Inactive task (%p) being woken up!\n", task);
0508         return;
0509     }
0510 
0511     trace_rpc_task_wakeup(task, queue);
0512 
0513     __rpc_remove_wait_queue(queue, task);
0514 
0515     rpc_make_runnable(wq, task);
0516 }
0517 
0518 /*
0519  * Wake up a queued task while the queue lock is being held
0520  */
0521 static struct rpc_task *
0522 rpc_wake_up_task_on_wq_queue_action_locked(struct workqueue_struct *wq,
0523         struct rpc_wait_queue *queue, struct rpc_task *task,
0524         bool (*action)(struct rpc_task *, void *), void *data)
0525 {
0526     if (RPC_IS_QUEUED(task)) {
0527         smp_rmb();
0528         if (task->tk_waitqueue == queue) {
0529             if (action == NULL || action(task, data)) {
0530                 __rpc_do_wake_up_task_on_wq(wq, queue, task);
0531                 return task;
0532             }
0533         }
0534     }
0535     return NULL;
0536 }
0537 
0538 /*
0539  * Wake up a queued task while the queue lock is being held
0540  */
0541 static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue,
0542                       struct rpc_task *task)
0543 {
0544     rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
0545                            task, NULL, NULL);
0546 }
0547 
0548 /*
0549  * Wake up a task on a specific queue
0550  */
0551 void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task)
0552 {
0553     if (!RPC_IS_QUEUED(task))
0554         return;
0555     spin_lock(&queue->lock);
0556     rpc_wake_up_task_queue_locked(queue, task);
0557     spin_unlock(&queue->lock);
0558 }
0559 EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
0560 
0561 static bool rpc_task_action_set_status(struct rpc_task *task, void *status)
0562 {
0563     task->tk_status = *(int *)status;
0564     return true;
0565 }
0566 
0567 static void
0568 rpc_wake_up_task_queue_set_status_locked(struct rpc_wait_queue *queue,
0569         struct rpc_task *task, int status)
0570 {
0571     rpc_wake_up_task_on_wq_queue_action_locked(rpciod_workqueue, queue,
0572             task, rpc_task_action_set_status, &status);
0573 }
0574 
0575 /**
0576  * rpc_wake_up_queued_task_set_status - wake up a task and set task->tk_status
0577  * @queue: pointer to rpc_wait_queue
0578  * @task: pointer to rpc_task
0579  * @status: integer error value
0580  *
0581  * If @task is queued on @queue, then it is woken up, and @task->tk_status is
0582  * set to the value of @status.
0583  */
0584 void
0585 rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue,
0586         struct rpc_task *task, int status)
0587 {
0588     if (!RPC_IS_QUEUED(task))
0589         return;
0590     spin_lock(&queue->lock);
0591     rpc_wake_up_task_queue_set_status_locked(queue, task, status);
0592     spin_unlock(&queue->lock);
0593 }
0594 
0595 /*
0596  * Wake up the next task on a priority queue.
0597  */
0598 static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
0599 {
0600     struct list_head *q;
0601     struct rpc_task *task;
0602 
0603     /*
0604      * Service the privileged queue.
0605      */
0606     q = &queue->tasks[RPC_NR_PRIORITY - 1];
0607     if (queue->maxpriority > RPC_PRIORITY_PRIVILEGED && !list_empty(q)) {
0608         task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
0609         goto out;
0610     }
0611 
0612     /*
0613      * Service a batch of tasks from a single owner.
0614      */
0615     q = &queue->tasks[queue->priority];
0616     if (!list_empty(q) && queue->nr) {
0617         queue->nr--;
0618         task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
0619         goto out;
0620     }
0621 
0622     /*
0623      * Service the next queue.
0624      */
0625     do {
0626         if (q == &queue->tasks[0])
0627             q = &queue->tasks[queue->maxpriority];
0628         else
0629             q = q - 1;
0630         if (!list_empty(q)) {
0631             task = list_first_entry(q, struct rpc_task, u.tk_wait.list);
0632             goto new_queue;
0633         }
0634     } while (q != &queue->tasks[queue->priority]);
0635 
0636     rpc_reset_waitqueue_priority(queue);
0637     return NULL;
0638 
0639 new_queue:
0640     rpc_set_waitqueue_priority(queue, (unsigned int)(q - &queue->tasks[0]));
0641 out:
0642     return task;
0643 }
0644 
0645 static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
0646 {
0647     if (RPC_IS_PRIORITY(queue))
0648         return __rpc_find_next_queued_priority(queue);
0649     if (!list_empty(&queue->tasks[0]))
0650         return list_first_entry(&queue->tasks[0], struct rpc_task, u.tk_wait.list);
0651     return NULL;
0652 }
0653 
0654 /*
0655  * Wake up the first task on the wait queue.
0656  */
0657 struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
0658         struct rpc_wait_queue *queue,
0659         bool (*func)(struct rpc_task *, void *), void *data)
0660 {
0661     struct rpc_task *task = NULL;
0662 
0663     spin_lock(&queue->lock);
0664     task = __rpc_find_next_queued(queue);
0665     if (task != NULL)
0666         task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue,
0667                 task, func, data);
0668     spin_unlock(&queue->lock);
0669 
0670     return task;
0671 }
0672 
0673 /*
0674  * Wake up the first task on the wait queue.
0675  */
0676 struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
0677         bool (*func)(struct rpc_task *, void *), void *data)
0678 {
0679     return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
0680 }
0681 EXPORT_SYMBOL_GPL(rpc_wake_up_first);
0682 
/*
 * Wake-up filter used by rpc_wake_up_next(): accepts every task.
 * Neither @task nor @data is examined.
 */
static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
{
	return true;
}
0687 
0688 /*
0689  * Wake up the next task on the wait queue.
0690 */
0691 struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
0692 {
0693     return rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
0694 }
0695 EXPORT_SYMBOL_GPL(rpc_wake_up_next);
0696 
0697 /**
0698  * rpc_wake_up_locked - wake up all rpc_tasks
0699  * @queue: rpc_wait_queue on which the tasks are sleeping
0700  *
0701  */
0702 static void rpc_wake_up_locked(struct rpc_wait_queue *queue)
0703 {
0704     struct rpc_task *task;
0705 
0706     for (;;) {
0707         task = __rpc_find_next_queued(queue);
0708         if (task == NULL)
0709             break;
0710         rpc_wake_up_task_queue_locked(queue, task);
0711     }
0712 }
0713 
0714 /**
0715  * rpc_wake_up - wake up all rpc_tasks
0716  * @queue: rpc_wait_queue on which the tasks are sleeping
0717  *
0718  * Grabs queue->lock
0719  */
0720 void rpc_wake_up(struct rpc_wait_queue *queue)
0721 {
0722     spin_lock(&queue->lock);
0723     rpc_wake_up_locked(queue);
0724     spin_unlock(&queue->lock);
0725 }
0726 EXPORT_SYMBOL_GPL(rpc_wake_up);
0727 
0728 /**
0729  * rpc_wake_up_status_locked - wake up all rpc_tasks and set their status value.
0730  * @queue: rpc_wait_queue on which the tasks are sleeping
0731  * @status: status value to set
0732  */
0733 static void rpc_wake_up_status_locked(struct rpc_wait_queue *queue, int status)
0734 {
0735     struct rpc_task *task;
0736 
0737     for (;;) {
0738         task = __rpc_find_next_queued(queue);
0739         if (task == NULL)
0740             break;
0741         rpc_wake_up_task_queue_set_status_locked(queue, task, status);
0742     }
0743 }
0744 
0745 /**
0746  * rpc_wake_up_status - wake up all rpc_tasks and set their status value.
0747  * @queue: rpc_wait_queue on which the tasks are sleeping
0748  * @status: status value to set
0749  *
0750  * Grabs queue->lock
0751  */
0752 void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
0753 {
0754     spin_lock(&queue->lock);
0755     rpc_wake_up_status_locked(queue, status);
0756     spin_unlock(&queue->lock);
0757 }
0758 EXPORT_SYMBOL_GPL(rpc_wake_up_status);
0759 
0760 static void __rpc_queue_timer_fn(struct work_struct *work)
0761 {
0762     struct rpc_wait_queue *queue = container_of(work,
0763             struct rpc_wait_queue,
0764             timer_list.dwork.work);
0765     struct rpc_task *task, *n;
0766     unsigned long expires, now, timeo;
0767 
0768     spin_lock(&queue->lock);
0769     expires = now = jiffies;
0770     list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
0771         timeo = task->tk_timeout;
0772         if (time_after_eq(now, timeo)) {
0773             trace_rpc_task_timeout(task, task->tk_action);
0774             task->tk_status = -ETIMEDOUT;
0775             rpc_wake_up_task_queue_locked(queue, task);
0776             continue;
0777         }
0778         if (expires == now || time_after(expires, timeo))
0779             expires = timeo;
0780     }
0781     if (!list_empty(&queue->timer_list.list))
0782         rpc_set_queue_timer(queue, expires);
0783     spin_unlock(&queue->lock);
0784 }
0785 
0786 static void __rpc_atrun(struct rpc_task *task)
0787 {
0788     if (task->tk_status == -ETIMEDOUT)
0789         task->tk_status = 0;
0790 }
0791 
0792 /*
0793  * Run a task at a later time
0794  */
0795 void rpc_delay(struct rpc_task *task, unsigned long delay)
0796 {
0797     rpc_sleep_on_timeout(&delay_queue, task, __rpc_atrun, jiffies + delay);
0798 }
0799 EXPORT_SYMBOL_GPL(rpc_delay);
0800 
0801 /*
0802  * Helper to call task->tk_ops->rpc_call_prepare
0803  */
0804 void rpc_prepare_task(struct rpc_task *task)
0805 {
0806     task->tk_ops->rpc_call_prepare(task, task->tk_calldata);
0807 }
0808 
0809 static void
0810 rpc_init_task_statistics(struct rpc_task *task)
0811 {
0812     /* Initialize retry counters */
0813     task->tk_garb_retry = 2;
0814     task->tk_cred_retry = 2;
0815     task->tk_rebind_retry = 2;
0816 
0817     /* starting timestamp */
0818     task->tk_start = ktime_get();
0819 }
0820 
0821 static void
0822 rpc_reset_task_statistics(struct rpc_task *task)
0823 {
0824     task->tk_timeouts = 0;
0825     task->tk_flags &= ~(RPC_CALL_MAJORSEEN|RPC_TASK_SENT);
0826     rpc_init_task_statistics(task);
0827 }
0828 
0829 /*
0830  * Helper that calls task->tk_ops->rpc_call_done if it exists
0831  */
0832 void rpc_exit_task(struct rpc_task *task)
0833 {
0834     trace_rpc_task_end(task, task->tk_action);
0835     task->tk_action = NULL;
0836     if (task->tk_ops->rpc_count_stats)
0837         task->tk_ops->rpc_count_stats(task, task->tk_calldata);
0838     else if (task->tk_client)
0839         rpc_count_iostats(task, task->tk_client->cl_metrics);
0840     if (task->tk_ops->rpc_call_done != NULL) {
0841         trace_rpc_task_call_done(task, task->tk_ops->rpc_call_done);
0842         task->tk_ops->rpc_call_done(task, task->tk_calldata);
0843         if (task->tk_action != NULL) {
0844             /* Always release the RPC slot and buffer memory */
0845             xprt_release(task);
0846             rpc_reset_task_statistics(task);
0847         }
0848     }
0849 }
0850 
0851 void rpc_signal_task(struct rpc_task *task)
0852 {
0853     struct rpc_wait_queue *queue;
0854 
0855     if (!RPC_IS_ACTIVATED(task))
0856         return;
0857 
0858     trace_rpc_task_signalled(task, task->tk_action);
0859     set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
0860     smp_mb__after_atomic();
0861     queue = READ_ONCE(task->tk_waitqueue);
0862     if (queue)
0863         rpc_wake_up_queued_task_set_status(queue, task, -ERESTARTSYS);
0864 }
0865 
0866 void rpc_exit(struct rpc_task *task, int status)
0867 {
0868     task->tk_status = status;
0869     task->tk_action = rpc_exit_task;
0870     rpc_wake_up_queued_task(task->tk_waitqueue, task);
0871 }
0872 EXPORT_SYMBOL_GPL(rpc_exit);
0873 
0874 void rpc_release_calldata(const struct rpc_call_ops *ops, void *calldata)
0875 {
0876     if (ops->rpc_release != NULL)
0877         ops->rpc_release(calldata);
0878 }
0879 
0880 static bool xprt_needs_memalloc(struct rpc_xprt *xprt, struct rpc_task *tk)
0881 {
0882     if (!xprt)
0883         return false;
0884     if (!atomic_read(&xprt->swapper))
0885         return false;
0886     return test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == tk;
0887 }
0888 
0889 /*
0890  * This is the RPC `scheduler' (or rather, the finite state machine).
0891  */
0892 static void __rpc_execute(struct rpc_task *task)
0893 {
0894     struct rpc_wait_queue *queue;
0895     int task_is_async = RPC_IS_ASYNC(task);
0896     int status = 0;
0897     unsigned long pflags = current->flags;
0898 
0899     WARN_ON_ONCE(RPC_IS_QUEUED(task));
0900     if (RPC_IS_QUEUED(task))
0901         return;
0902 
0903     for (;;) {
0904         void (*do_action)(struct rpc_task *);
0905 
0906         /*
0907          * Perform the next FSM step or a pending callback.
0908          *
0909          * tk_action may be NULL if the task has been killed.
0910          * In particular, note that rpc_killall_tasks may
0911          * do this at any time, so beware when dereferencing.
0912          */
0913         do_action = task->tk_action;
0914         if (task->tk_callback) {
0915             do_action = task->tk_callback;
0916             task->tk_callback = NULL;
0917         }
0918         if (!do_action)
0919             break;
0920         if (RPC_IS_SWAPPER(task) ||
0921             xprt_needs_memalloc(task->tk_xprt, task))
0922             current->flags |= PF_MEMALLOC;
0923 
0924         trace_rpc_task_run_action(task, do_action);
0925         do_action(task);
0926 
0927         /*
0928          * Lockless check for whether task is sleeping or not.
0929          */
0930         if (!RPC_IS_QUEUED(task)) {
0931             cond_resched();
0932             continue;
0933         }
0934 
0935         /*
0936          * Signalled tasks should exit rather than sleep.
0937          */
0938         if (RPC_SIGNALLED(task)) {
0939             task->tk_rpc_status = -ERESTARTSYS;
0940             rpc_exit(task, -ERESTARTSYS);
0941         }
0942 
0943         /*
0944          * The queue->lock protects against races with
0945          * rpc_make_runnable().
0946          *
0947          * Note that once we clear RPC_TASK_RUNNING on an asynchronous
0948          * rpc_task, rpc_make_runnable() can assign it to a
0949          * different workqueue. We therefore cannot assume that the
0950          * rpc_task pointer may still be dereferenced.
0951          */
0952         queue = task->tk_waitqueue;
0953         spin_lock(&queue->lock);
0954         if (!RPC_IS_QUEUED(task)) {
0955             spin_unlock(&queue->lock);
0956             continue;
0957         }
0958         rpc_clear_running(task);
0959         spin_unlock(&queue->lock);
0960         if (task_is_async)
0961             goto out;
0962 
0963         /* sync task: sleep here */
0964         trace_rpc_task_sync_sleep(task, task->tk_action);
0965         status = out_of_line_wait_on_bit(&task->tk_runstate,
0966                 RPC_TASK_QUEUED, rpc_wait_bit_killable,
0967                 TASK_KILLABLE);
0968         if (status < 0) {
0969             /*
0970              * When a sync task receives a signal, it exits with
0971              * -ERESTARTSYS. In order to catch any callbacks that
0972              * clean up after sleeping on some queue, we don't
0973              * break the loop here, but go around once more.
0974              */
0975             trace_rpc_task_signalled(task, task->tk_action);
0976             set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
0977             task->tk_rpc_status = -ERESTARTSYS;
0978             rpc_exit(task, -ERESTARTSYS);
0979         }
0980         trace_rpc_task_sync_wake(task, task->tk_action);
0981     }
0982 
0983     /* Release all resources associated with the task */
0984     rpc_release_task(task);
0985 out:
0986     current_restore_flags(pflags, PF_MEMALLOC);
0987 }
0988 
0989 /*
0990  * User-visible entry point to the scheduler.
0991  *
0992  * This may be called recursively if e.g. an async NFS task updates
0993  * the attributes and finds that dirty pages must be flushed.
0994  * NOTE: Upon exit of this function the task is guaranteed to be
0995  *   released. In particular note that tk_release() will have
0996  *   been called, so your task memory may have been freed.
0997  */
0998 void rpc_execute(struct rpc_task *task)
0999 {
1000     bool is_async = RPC_IS_ASYNC(task);
1001 
1002     rpc_set_active(task);
1003     rpc_make_runnable(rpciod_workqueue, task);
1004     if (!is_async) {
1005         unsigned int pflags = memalloc_nofs_save();
1006         __rpc_execute(task);
1007         memalloc_nofs_restore(pflags);
1008     }
1009 }
1010 
1011 static void rpc_async_schedule(struct work_struct *work)
1012 {
1013     unsigned int pflags = memalloc_nofs_save();
1014 
1015     __rpc_execute(container_of(work, struct rpc_task, u.tk_work));
1016     memalloc_nofs_restore(pflags);
1017 }
1018 
1019 /**
1020  * rpc_malloc - allocate RPC buffer resources
1021  * @task: RPC task
1022  *
1023  * A single memory region is allocated, which is split between the
1024  * RPC call and RPC reply that this task is being used for. When
1025  * this RPC is retired, the memory is released by calling rpc_free.
1026  *
1027  * To prevent rpciod from hanging, this allocator never sleeps,
1028  * returning -ENOMEM and suppressing warning if the request cannot
1029  * be serviced immediately. The caller can arrange to sleep in a
1030  * way that is safe for rpciod.
1031  *
1032  * Most requests are 'small' (under 2KiB) and can be serviced from a
1033  * mempool, ensuring that NFS reads and writes can always proceed,
1034  * and that there is good locality of reference for these buffers.
1035  */
1036 int rpc_malloc(struct rpc_task *task)
1037 {
1038     struct rpc_rqst *rqst = task->tk_rqstp;
1039     size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
1040     struct rpc_buffer *buf;
1041     gfp_t gfp = rpc_task_gfp_mask();
1042 
1043     size += sizeof(struct rpc_buffer);
1044     if (size <= RPC_BUFFER_MAXSIZE) {
1045         buf = kmem_cache_alloc(rpc_buffer_slabp, gfp);
1046         /* Reach for the mempool if dynamic allocation fails */
1047         if (!buf && RPC_IS_ASYNC(task))
1048             buf = mempool_alloc(rpc_buffer_mempool, GFP_NOWAIT);
1049     } else
1050         buf = kmalloc(size, gfp);
1051 
1052     if (!buf)
1053         return -ENOMEM;
1054 
1055     buf->len = size;
1056     rqst->rq_buffer = buf->data;
1057     rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_callsize;
1058     return 0;
1059 }
1060 EXPORT_SYMBOL_GPL(rpc_malloc);
1061 
1062 /**
1063  * rpc_free - free RPC buffer resources allocated via rpc_malloc
1064  * @task: RPC task
1065  *
1066  */
1067 void rpc_free(struct rpc_task *task)
1068 {
1069     void *buffer = task->tk_rqstp->rq_buffer;
1070     size_t size;
1071     struct rpc_buffer *buf;
1072 
1073     buf = container_of(buffer, struct rpc_buffer, data);
1074     size = buf->len;
1075 
1076     if (size <= RPC_BUFFER_MAXSIZE)
1077         mempool_free(buf, rpc_buffer_mempool);
1078     else
1079         kfree(buf);
1080 }
1081 EXPORT_SYMBOL_GPL(rpc_free);
1082 
1083 /*
1084  * Creation and deletion of RPC task structures
1085  */
1086 static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *task_setup_data)
1087 {
1088     memset(task, 0, sizeof(*task));
1089     atomic_set(&task->tk_count, 1);
1090     task->tk_flags  = task_setup_data->flags;
1091     task->tk_ops = task_setup_data->callback_ops;
1092     task->tk_calldata = task_setup_data->callback_data;
1093     INIT_LIST_HEAD(&task->tk_task);
1094 
1095     task->tk_priority = task_setup_data->priority - RPC_PRIORITY_LOW;
1096     task->tk_owner = current->tgid;
1097 
1098     /* Initialize workqueue for async tasks */
1099     task->tk_workqueue = task_setup_data->workqueue;
1100 
1101     task->tk_xprt = rpc_task_get_xprt(task_setup_data->rpc_client,
1102             xprt_get(task_setup_data->rpc_xprt));
1103 
1104     task->tk_op_cred = get_rpccred(task_setup_data->rpc_op_cred);
1105 
1106     if (task->tk_ops->rpc_call_prepare != NULL)
1107         task->tk_action = rpc_prepare_task;
1108 
1109     rpc_init_task_statistics(task);
1110 }
1111 
1112 static struct rpc_task *rpc_alloc_task(void)
1113 {
1114     struct rpc_task *task;
1115 
1116     task = kmem_cache_alloc(rpc_task_slabp, rpc_task_gfp_mask());
1117     if (task)
1118         return task;
1119     return mempool_alloc(rpc_task_mempool, GFP_NOWAIT);
1120 }
1121 
1122 /*
1123  * Create a new task for the specified client.
1124  */
1125 struct rpc_task *rpc_new_task(const struct rpc_task_setup *setup_data)
1126 {
1127     struct rpc_task *task = setup_data->task;
1128     unsigned short flags = 0;
1129 
1130     if (task == NULL) {
1131         task = rpc_alloc_task();
1132         if (task == NULL) {
1133             rpc_release_calldata(setup_data->callback_ops,
1134                          setup_data->callback_data);
1135             return ERR_PTR(-ENOMEM);
1136         }
1137         flags = RPC_TASK_DYNAMIC;
1138     }
1139 
1140     rpc_init_task(task, setup_data);
1141     task->tk_flags |= flags;
1142     return task;
1143 }
1144 
1145 /*
1146  * rpc_free_task - release rpc task and perform cleanups
1147  *
1148  * Note that we free up the rpc_task _after_ rpc_release_calldata()
1149  * in order to work around a workqueue dependency issue.
1150  *
1151  * Tejun Heo states:
1152  * "Workqueue currently considers two work items to be the same if they're
1153  * on the same address and won't execute them concurrently - ie. it
1154  * makes a work item which is queued again while being executed wait
1155  * for the previous execution to complete.
1156  *
1157  * If a work function frees the work item, and then waits for an event
1158  * which should be performed by another work item and *that* work item
1159  * recycles the freed work item, it can create a false dependency loop.
1160  * There really is no reliable way to detect this short of verifying
1161  * every memory free."
1162  *
1163  */
1164 static void rpc_free_task(struct rpc_task *task)
1165 {
1166     unsigned short tk_flags = task->tk_flags;
1167 
1168     put_rpccred(task->tk_op_cred);
1169     rpc_release_calldata(task->tk_ops, task->tk_calldata);
1170 
1171     if (tk_flags & RPC_TASK_DYNAMIC)
1172         mempool_free(task, rpc_task_mempool);
1173 }
1174 
1175 static void rpc_async_release(struct work_struct *work)
1176 {
1177     unsigned int pflags = memalloc_nofs_save();
1178 
1179     rpc_free_task(container_of(work, struct rpc_task, u.tk_work));
1180     memalloc_nofs_restore(pflags);
1181 }
1182 
1183 static void rpc_release_resources_task(struct rpc_task *task)
1184 {
1185     xprt_release(task);
1186     if (task->tk_msg.rpc_cred) {
1187         if (!(task->tk_flags & RPC_TASK_CRED_NOREF))
1188             put_cred(task->tk_msg.rpc_cred);
1189         task->tk_msg.rpc_cred = NULL;
1190     }
1191     rpc_task_release_client(task);
1192 }
1193 
1194 static void rpc_final_put_task(struct rpc_task *task,
1195         struct workqueue_struct *q)
1196 {
1197     if (q != NULL) {
1198         INIT_WORK(&task->u.tk_work, rpc_async_release);
1199         queue_work(q, &task->u.tk_work);
1200     } else
1201         rpc_free_task(task);
1202 }
1203 
1204 static void rpc_do_put_task(struct rpc_task *task, struct workqueue_struct *q)
1205 {
1206     if (atomic_dec_and_test(&task->tk_count)) {
1207         rpc_release_resources_task(task);
1208         rpc_final_put_task(task, q);
1209     }
1210 }
1211 
1212 void rpc_put_task(struct rpc_task *task)
1213 {
1214     rpc_do_put_task(task, NULL);
1215 }
1216 EXPORT_SYMBOL_GPL(rpc_put_task);
1217 
1218 void rpc_put_task_async(struct rpc_task *task)
1219 {
1220     rpc_do_put_task(task, task->tk_workqueue);
1221 }
1222 EXPORT_SYMBOL_GPL(rpc_put_task_async);
1223 
1224 static void rpc_release_task(struct rpc_task *task)
1225 {
1226     WARN_ON_ONCE(RPC_IS_QUEUED(task));
1227 
1228     rpc_release_resources_task(task);
1229 
1230     /*
1231      * Note: at this point we have been removed from rpc_clnt->cl_tasks,
1232      * so it should be safe to use task->tk_count as a test for whether
1233      * or not any other processes still hold references to our rpc_task.
1234      */
1235     if (atomic_read(&task->tk_count) != 1 + !RPC_IS_ASYNC(task)) {
1236         /* Wake up anyone who may be waiting for task completion */
1237         if (!rpc_complete_task(task))
1238             return;
1239     } else {
1240         if (!atomic_dec_and_test(&task->tk_count))
1241             return;
1242     }
1243     rpc_final_put_task(task, task->tk_workqueue);
1244 }
1245 
1246 int rpciod_up(void)
1247 {
1248     return try_module_get(THIS_MODULE) ? 0 : -EINVAL;
1249 }
1250 
1251 void rpciod_down(void)
1252 {
1253     module_put(THIS_MODULE);
1254 }
1255 
1256 /*
1257  * Start up the rpciod workqueue.
1258  */
1259 static int rpciod_start(void)
1260 {
1261     struct workqueue_struct *wq;
1262 
1263     /*
1264      * Create the rpciod thread and wait for it to start.
1265      */
1266     wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_UNBOUND, 0);
1267     if (!wq)
1268         goto out_failed;
1269     rpciod_workqueue = wq;
1270     wq = alloc_workqueue("xprtiod", WQ_UNBOUND | WQ_MEM_RECLAIM, 0);
1271     if (!wq)
1272         goto free_rpciod;
1273     xprtiod_workqueue = wq;
1274     return 1;
1275 free_rpciod:
1276     wq = rpciod_workqueue;
1277     rpciod_workqueue = NULL;
1278     destroy_workqueue(wq);
1279 out_failed:
1280     return 0;
1281 }
1282 
1283 static void rpciod_stop(void)
1284 {
1285     struct workqueue_struct *wq = NULL;
1286 
1287     if (rpciod_workqueue == NULL)
1288         return;
1289 
1290     wq = rpciod_workqueue;
1291     rpciod_workqueue = NULL;
1292     destroy_workqueue(wq);
1293     wq = xprtiod_workqueue;
1294     xprtiod_workqueue = NULL;
1295     destroy_workqueue(wq);
1296 }
1297 
1298 void
1299 rpc_destroy_mempool(void)
1300 {
1301     rpciod_stop();
1302     mempool_destroy(rpc_buffer_mempool);
1303     mempool_destroy(rpc_task_mempool);
1304     kmem_cache_destroy(rpc_task_slabp);
1305     kmem_cache_destroy(rpc_buffer_slabp);
1306     rpc_destroy_wait_queue(&delay_queue);
1307 }
1308 
1309 int
1310 rpc_init_mempool(void)
1311 {
1312     /*
1313      * The following is not strictly a mempool initialisation,
1314      * but there is no harm in doing it here
1315      */
1316     rpc_init_wait_queue(&delay_queue, "delayq");
1317     if (!rpciod_start())
1318         goto err_nomem;
1319 
1320     rpc_task_slabp = kmem_cache_create("rpc_tasks",
1321                          sizeof(struct rpc_task),
1322                          0, SLAB_HWCACHE_ALIGN,
1323                          NULL);
1324     if (!rpc_task_slabp)
1325         goto err_nomem;
1326     rpc_buffer_slabp = kmem_cache_create("rpc_buffers",
1327                          RPC_BUFFER_MAXSIZE,
1328                          0, SLAB_HWCACHE_ALIGN,
1329                          NULL);
1330     if (!rpc_buffer_slabp)
1331         goto err_nomem;
1332     rpc_task_mempool = mempool_create_slab_pool(RPC_TASK_POOLSIZE,
1333                             rpc_task_slabp);
1334     if (!rpc_task_mempool)
1335         goto err_nomem;
1336     rpc_buffer_mempool = mempool_create_slab_pool(RPC_BUFFER_POOLSIZE,
1337                               rpc_buffer_slabp);
1338     if (!rpc_buffer_mempool)
1339         goto err_nomem;
1340     return 0;
1341 err_nomem:
1342     rpc_destroy_mempool();
1343     return -ENOMEM;
1344 }