Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0
0002 /*
0003  * padata.c - generic interface to process data streams in parallel
0004  *
0005  * See Documentation/core-api/padata.rst for more information.
0006  *
0007  * Copyright (C) 2008, 2009 secunet Security Networks AG
0008  * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
0009  *
0010  * Copyright (c) 2020 Oracle and/or its affiliates.
0011  * Author: Daniel Jordan <daniel.m.jordan@oracle.com>
0012  */
0013 
0014 #include <linux/completion.h>
0015 #include <linux/export.h>
0016 #include <linux/cpumask.h>
0017 #include <linux/err.h>
0018 #include <linux/cpu.h>
0019 #include <linux/padata.h>
0020 #include <linux/mutex.h>
0021 #include <linux/sched.h>
0022 #include <linux/slab.h>
0023 #include <linux/sysfs.h>
0024 #include <linux/rcupdate.h>
0025 
0026 #define PADATA_WORK_ONSTACK 1   /* Work's memory is on stack */
0027 
0028 struct padata_work {
0029     struct work_struct  pw_work;
0030     struct list_head    pw_list;  /* padata_free_works linkage */
0031     void            *pw_data;
0032 };
0033 
0034 static DEFINE_SPINLOCK(padata_works_lock);
0035 static struct padata_work *padata_works;
0036 static LIST_HEAD(padata_free_works);
0037 
0038 struct padata_mt_job_state {
0039     spinlock_t      lock;
0040     struct completion   completion;
0041     struct padata_mt_job    *job;
0042     int         nworks;
0043     int         nworks_fini;
0044     unsigned long       chunk_size;
0045 };
0046 
0047 static void padata_free_pd(struct parallel_data *pd);
0048 static void __init padata_mt_helper(struct work_struct *work);
0049 
0050 static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
0051 {
0052     int cpu, target_cpu;
0053 
0054     target_cpu = cpumask_first(pd->cpumask.pcpu);
0055     for (cpu = 0; cpu < cpu_index; cpu++)
0056         target_cpu = cpumask_next(target_cpu, pd->cpumask.pcpu);
0057 
0058     return target_cpu;
0059 }
0060 
0061 static int padata_cpu_hash(struct parallel_data *pd, unsigned int seq_nr)
0062 {
0063     /*
0064      * Hash the sequence numbers to the cpus by taking
0065      * seq_nr mod. number of cpus in use.
0066      */
0067     int cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);
0068 
0069     return padata_index_to_cpu(pd, cpu_index);
0070 }
0071 
0072 static struct padata_work *padata_work_alloc(void)
0073 {
0074     struct padata_work *pw;
0075 
0076     lockdep_assert_held(&padata_works_lock);
0077 
0078     if (list_empty(&padata_free_works))
0079         return NULL;    /* No more work items allowed to be queued. */
0080 
0081     pw = list_first_entry(&padata_free_works, struct padata_work, pw_list);
0082     list_del(&pw->pw_list);
0083     return pw;
0084 }
0085 
0086 static void padata_work_init(struct padata_work *pw, work_func_t work_fn,
0087                  void *data, int flags)
0088 {
0089     if (flags & PADATA_WORK_ONSTACK)
0090         INIT_WORK_ONSTACK(&pw->pw_work, work_fn);
0091     else
0092         INIT_WORK(&pw->pw_work, work_fn);
0093     pw->pw_data = data;
0094 }
0095 
0096 static int __init padata_work_alloc_mt(int nworks, void *data,
0097                        struct list_head *head)
0098 {
0099     int i;
0100 
0101     spin_lock(&padata_works_lock);
0102     /* Start at 1 because the current task participates in the job. */
0103     for (i = 1; i < nworks; ++i) {
0104         struct padata_work *pw = padata_work_alloc();
0105 
0106         if (!pw)
0107             break;
0108         padata_work_init(pw, padata_mt_helper, data, 0);
0109         list_add(&pw->pw_list, head);
0110     }
0111     spin_unlock(&padata_works_lock);
0112 
0113     return i;
0114 }
0115 
0116 static void padata_work_free(struct padata_work *pw)
0117 {
0118     lockdep_assert_held(&padata_works_lock);
0119     list_add(&pw->pw_list, &padata_free_works);
0120 }
0121 
0122 static void __init padata_works_free(struct list_head *works)
0123 {
0124     struct padata_work *cur, *next;
0125 
0126     if (list_empty(works))
0127         return;
0128 
0129     spin_lock(&padata_works_lock);
0130     list_for_each_entry_safe(cur, next, works, pw_list) {
0131         list_del(&cur->pw_list);
0132         padata_work_free(cur);
0133     }
0134     spin_unlock(&padata_works_lock);
0135 }
0136 
0137 static void padata_parallel_worker(struct work_struct *parallel_work)
0138 {
0139     struct padata_work *pw = container_of(parallel_work, struct padata_work,
0140                           pw_work);
0141     struct padata_priv *padata = pw->pw_data;
0142 
0143     local_bh_disable();
0144     padata->parallel(padata);
0145     spin_lock(&padata_works_lock);
0146     padata_work_free(pw);
0147     spin_unlock(&padata_works_lock);
0148     local_bh_enable();
0149 }
0150 
0151 /**
0152  * padata_do_parallel - padata parallelization function
0153  *
0154  * @ps: padatashell
0155  * @padata: object to be parallelized
0156  * @cb_cpu: pointer to the CPU that the serialization callback function should
0157  *          run on.  If it's not in the serial cpumask of @pinst
0158  *          (i.e. cpumask.cbcpu), this function selects a fallback CPU and if
0159  *          none found, returns -EINVAL.
0160  *
0161  * The parallelization callback function will run with BHs off.
0162  * Note: Every object which is parallelized by padata_do_parallel
0163  * must be seen by padata_do_serial.
0164  *
0165  * Return: 0 on success or else negative error code.
0166  */
0167 int padata_do_parallel(struct padata_shell *ps,
0168                struct padata_priv *padata, int *cb_cpu)
0169 {
0170     struct padata_instance *pinst = ps->pinst;
0171     int i, cpu, cpu_index, err;
0172     struct parallel_data *pd;
0173     struct padata_work *pw;
0174 
0175     rcu_read_lock_bh();
0176 
0177     pd = rcu_dereference_bh(ps->pd);
0178 
0179     err = -EINVAL;
0180     if (!(pinst->flags & PADATA_INIT) || pinst->flags & PADATA_INVALID)
0181         goto out;
0182 
0183     if (!cpumask_test_cpu(*cb_cpu, pd->cpumask.cbcpu)) {
0184         if (cpumask_empty(pd->cpumask.cbcpu))
0185             goto out;
0186 
0187         /* Select an alternate fallback CPU and notify the caller. */
0188         cpu_index = *cb_cpu % cpumask_weight(pd->cpumask.cbcpu);
0189 
0190         cpu = cpumask_first(pd->cpumask.cbcpu);
0191         for (i = 0; i < cpu_index; i++)
0192             cpu = cpumask_next(cpu, pd->cpumask.cbcpu);
0193 
0194         *cb_cpu = cpu;
0195     }
0196 
0197     err =  -EBUSY;
0198     if ((pinst->flags & PADATA_RESET))
0199         goto out;
0200 
0201     refcount_inc(&pd->refcnt);
0202     padata->pd = pd;
0203     padata->cb_cpu = *cb_cpu;
0204 
0205     spin_lock(&padata_works_lock);
0206     padata->seq_nr = ++pd->seq_nr;
0207     pw = padata_work_alloc();
0208     spin_unlock(&padata_works_lock);
0209 
0210     rcu_read_unlock_bh();
0211 
0212     if (pw) {
0213         padata_work_init(pw, padata_parallel_worker, padata, 0);
0214         queue_work(pinst->parallel_wq, &pw->pw_work);
0215     } else {
0216         /* Maximum works limit exceeded, run in the current task. */
0217         padata->parallel(padata);
0218     }
0219 
0220     return 0;
0221 out:
0222     rcu_read_unlock_bh();
0223 
0224     return err;
0225 }
0226 EXPORT_SYMBOL(padata_do_parallel);
0227 
0228 /*
0229  * padata_find_next - Find the next object that needs serialization.
0230  *
0231  * Return:
0232  * * A pointer to the control struct of the next object that needs
0233  *   serialization, if present in one of the percpu reorder queues.
0234  * * NULL, if the next object that needs serialization will
0235  *   be parallel processed by another cpu and is not yet present in
0236  *   the cpu's reorder queue.
0237  */
0238 static struct padata_priv *padata_find_next(struct parallel_data *pd,
0239                         bool remove_object)
0240 {
0241     struct padata_priv *padata;
0242     struct padata_list *reorder;
0243     int cpu = pd->cpu;
0244 
0245     reorder = per_cpu_ptr(pd->reorder_list, cpu);
0246 
0247     spin_lock(&reorder->lock);
0248     if (list_empty(&reorder->list)) {
0249         spin_unlock(&reorder->lock);
0250         return NULL;
0251     }
0252 
0253     padata = list_entry(reorder->list.next, struct padata_priv, list);
0254 
0255     /*
0256      * Checks the rare case where two or more parallel jobs have hashed to
0257      * the same CPU and one of the later ones finishes first.
0258      */
0259     if (padata->seq_nr != pd->processed) {
0260         spin_unlock(&reorder->lock);
0261         return NULL;
0262     }
0263 
0264     if (remove_object) {
0265         list_del_init(&padata->list);
0266         ++pd->processed;
0267         pd->cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu, -1, false);
0268     }
0269 
0270     spin_unlock(&reorder->lock);
0271     return padata;
0272 }
0273 
0274 static void padata_reorder(struct parallel_data *pd)
0275 {
0276     struct padata_instance *pinst = pd->ps->pinst;
0277     int cb_cpu;
0278     struct padata_priv *padata;
0279     struct padata_serial_queue *squeue;
0280     struct padata_list *reorder;
0281 
0282     /*
0283      * We need to ensure that only one cpu can work on dequeueing of
0284      * the reorder queue the time. Calculating in which percpu reorder
0285      * queue the next object will arrive takes some time. A spinlock
0286      * would be highly contended. Also it is not clear in which order
0287      * the objects arrive to the reorder queues. So a cpu could wait to
0288      * get the lock just to notice that there is nothing to do at the
0289      * moment. Therefore we use a trylock and let the holder of the lock
0290      * care for all the objects enqueued during the holdtime of the lock.
0291      */
0292     if (!spin_trylock_bh(&pd->lock))
0293         return;
0294 
0295     while (1) {
0296         padata = padata_find_next(pd, true);
0297 
0298         /*
0299          * If the next object that needs serialization is parallel
0300          * processed by another cpu and is still on it's way to the
0301          * cpu's reorder queue, nothing to do for now.
0302          */
0303         if (!padata)
0304             break;
0305 
0306         cb_cpu = padata->cb_cpu;
0307         squeue = per_cpu_ptr(pd->squeue, cb_cpu);
0308 
0309         spin_lock(&squeue->serial.lock);
0310         list_add_tail(&padata->list, &squeue->serial.list);
0311         spin_unlock(&squeue->serial.lock);
0312 
0313         queue_work_on(cb_cpu, pinst->serial_wq, &squeue->work);
0314     }
0315 
0316     spin_unlock_bh(&pd->lock);
0317 
0318     /*
0319      * The next object that needs serialization might have arrived to
0320      * the reorder queues in the meantime.
0321      *
0322      * Ensure reorder queue is read after pd->lock is dropped so we see
0323      * new objects from another task in padata_do_serial.  Pairs with
0324      * smp_mb in padata_do_serial.
0325      */
0326     smp_mb();
0327 
0328     reorder = per_cpu_ptr(pd->reorder_list, pd->cpu);
0329     if (!list_empty(&reorder->list) && padata_find_next(pd, false))
0330         queue_work(pinst->serial_wq, &pd->reorder_work);
0331 }
0332 
0333 static void invoke_padata_reorder(struct work_struct *work)
0334 {
0335     struct parallel_data *pd;
0336 
0337     local_bh_disable();
0338     pd = container_of(work, struct parallel_data, reorder_work);
0339     padata_reorder(pd);
0340     local_bh_enable();
0341 }
0342 
0343 static void padata_serial_worker(struct work_struct *serial_work)
0344 {
0345     struct padata_serial_queue *squeue;
0346     struct parallel_data *pd;
0347     LIST_HEAD(local_list);
0348     int cnt;
0349 
0350     local_bh_disable();
0351     squeue = container_of(serial_work, struct padata_serial_queue, work);
0352     pd = squeue->pd;
0353 
0354     spin_lock(&squeue->serial.lock);
0355     list_replace_init(&squeue->serial.list, &local_list);
0356     spin_unlock(&squeue->serial.lock);
0357 
0358     cnt = 0;
0359 
0360     while (!list_empty(&local_list)) {
0361         struct padata_priv *padata;
0362 
0363         padata = list_entry(local_list.next,
0364                     struct padata_priv, list);
0365 
0366         list_del_init(&padata->list);
0367 
0368         padata->serial(padata);
0369         cnt++;
0370     }
0371     local_bh_enable();
0372 
0373     if (refcount_sub_and_test(cnt, &pd->refcnt))
0374         padata_free_pd(pd);
0375 }
0376 
0377 /**
0378  * padata_do_serial - padata serialization function
0379  *
0380  * @padata: object to be serialized.
0381  *
0382  * padata_do_serial must be called for every parallelized object.
0383  * The serialization callback function will run with BHs off.
0384  */
0385 void padata_do_serial(struct padata_priv *padata)
0386 {
0387     struct parallel_data *pd = padata->pd;
0388     int hashed_cpu = padata_cpu_hash(pd, padata->seq_nr);
0389     struct padata_list *reorder = per_cpu_ptr(pd->reorder_list, hashed_cpu);
0390     struct padata_priv *cur;
0391 
0392     spin_lock(&reorder->lock);
0393     /* Sort in ascending order of sequence number. */
0394     list_for_each_entry_reverse(cur, &reorder->list, list)
0395         if (cur->seq_nr < padata->seq_nr)
0396             break;
0397     list_add(&padata->list, &cur->list);
0398     spin_unlock(&reorder->lock);
0399 
0400     /*
0401      * Ensure the addition to the reorder list is ordered correctly
0402      * with the trylock of pd->lock in padata_reorder.  Pairs with smp_mb
0403      * in padata_reorder.
0404      */
0405     smp_mb();
0406 
0407     padata_reorder(pd);
0408 }
0409 EXPORT_SYMBOL(padata_do_serial);
0410 
0411 static int padata_setup_cpumasks(struct padata_instance *pinst)
0412 {
0413     struct workqueue_attrs *attrs;
0414     int err;
0415 
0416     attrs = alloc_workqueue_attrs();
0417     if (!attrs)
0418         return -ENOMEM;
0419 
0420     /* Restrict parallel_wq workers to pd->cpumask.pcpu. */
0421     cpumask_copy(attrs->cpumask, pinst->cpumask.pcpu);
0422     err = apply_workqueue_attrs(pinst->parallel_wq, attrs);
0423     free_workqueue_attrs(attrs);
0424 
0425     return err;
0426 }
0427 
0428 static void __init padata_mt_helper(struct work_struct *w)
0429 {
0430     struct padata_work *pw = container_of(w, struct padata_work, pw_work);
0431     struct padata_mt_job_state *ps = pw->pw_data;
0432     struct padata_mt_job *job = ps->job;
0433     bool done;
0434 
0435     spin_lock(&ps->lock);
0436 
0437     while (job->size > 0) {
0438         unsigned long start, size, end;
0439 
0440         start = job->start;
0441         /* So end is chunk size aligned if enough work remains. */
0442         size = roundup(start + 1, ps->chunk_size) - start;
0443         size = min(size, job->size);
0444         end = start + size;
0445 
0446         job->start = end;
0447         job->size -= size;
0448 
0449         spin_unlock(&ps->lock);
0450         job->thread_fn(start, end, job->fn_arg);
0451         spin_lock(&ps->lock);
0452     }
0453 
0454     ++ps->nworks_fini;
0455     done = (ps->nworks_fini == ps->nworks);
0456     spin_unlock(&ps->lock);
0457 
0458     if (done)
0459         complete(&ps->completion);
0460 }
0461 
0462 /**
0463  * padata_do_multithreaded - run a multithreaded job
0464  * @job: Description of the job.
0465  *
0466  * See the definition of struct padata_mt_job for more details.
0467  */
0468 void __init padata_do_multithreaded(struct padata_mt_job *job)
0469 {
0470     /* In case threads finish at different times. */
0471     static const unsigned long load_balance_factor = 4;
0472     struct padata_work my_work, *pw;
0473     struct padata_mt_job_state ps;
0474     LIST_HEAD(works);
0475     int nworks;
0476 
0477     if (job->size == 0)
0478         return;
0479 
0480     /* Ensure at least one thread when size < min_chunk. */
0481     nworks = max(job->size / job->min_chunk, 1ul);
0482     nworks = min(nworks, job->max_threads);
0483 
0484     if (nworks == 1) {
0485         /* Single thread, no coordination needed, cut to the chase. */
0486         job->thread_fn(job->start, job->start + job->size, job->fn_arg);
0487         return;
0488     }
0489 
0490     spin_lock_init(&ps.lock);
0491     init_completion(&ps.completion);
0492     ps.job         = job;
0493     ps.nworks      = padata_work_alloc_mt(nworks, &ps, &works);
0494     ps.nworks_fini = 0;
0495 
0496     /*
0497      * Chunk size is the amount of work a helper does per call to the
0498      * thread function.  Load balance large jobs between threads by
0499      * increasing the number of chunks, guarantee at least the minimum
0500      * chunk size from the caller, and honor the caller's alignment.
0501      */
0502     ps.chunk_size = job->size / (ps.nworks * load_balance_factor);
0503     ps.chunk_size = max(ps.chunk_size, job->min_chunk);
0504     ps.chunk_size = roundup(ps.chunk_size, job->align);
0505 
0506     list_for_each_entry(pw, &works, pw_list)
0507         queue_work(system_unbound_wq, &pw->pw_work);
0508 
0509     /* Use the current thread, which saves starting a workqueue worker. */
0510     padata_work_init(&my_work, padata_mt_helper, &ps, PADATA_WORK_ONSTACK);
0511     padata_mt_helper(&my_work.pw_work);
0512 
0513     /* Wait for all the helpers to finish. */
0514     wait_for_completion(&ps.completion);
0515 
0516     destroy_work_on_stack(&my_work.pw_work);
0517     padata_works_free(&works);
0518 }
0519 
0520 static void __padata_list_init(struct padata_list *pd_list)
0521 {
0522     INIT_LIST_HEAD(&pd_list->list);
0523     spin_lock_init(&pd_list->lock);
0524 }
0525 
0526 /* Initialize all percpu queues used by serial workers */
0527 static void padata_init_squeues(struct parallel_data *pd)
0528 {
0529     int cpu;
0530     struct padata_serial_queue *squeue;
0531 
0532     for_each_cpu(cpu, pd->cpumask.cbcpu) {
0533         squeue = per_cpu_ptr(pd->squeue, cpu);
0534         squeue->pd = pd;
0535         __padata_list_init(&squeue->serial);
0536         INIT_WORK(&squeue->work, padata_serial_worker);
0537     }
0538 }
0539 
0540 /* Initialize per-CPU reorder lists */
0541 static void padata_init_reorder_list(struct parallel_data *pd)
0542 {
0543     int cpu;
0544     struct padata_list *list;
0545 
0546     for_each_cpu(cpu, pd->cpumask.pcpu) {
0547         list = per_cpu_ptr(pd->reorder_list, cpu);
0548         __padata_list_init(list);
0549     }
0550 }
0551 
0552 /* Allocate and initialize the internal cpumask dependend resources. */
0553 static struct parallel_data *padata_alloc_pd(struct padata_shell *ps)
0554 {
0555     struct padata_instance *pinst = ps->pinst;
0556     struct parallel_data *pd;
0557 
0558     pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
0559     if (!pd)
0560         goto err;
0561 
0562     pd->reorder_list = alloc_percpu(struct padata_list);
0563     if (!pd->reorder_list)
0564         goto err_free_pd;
0565 
0566     pd->squeue = alloc_percpu(struct padata_serial_queue);
0567     if (!pd->squeue)
0568         goto err_free_reorder_list;
0569 
0570     pd->ps = ps;
0571 
0572     if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
0573         goto err_free_squeue;
0574     if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL))
0575         goto err_free_pcpu;
0576 
0577     cpumask_and(pd->cpumask.pcpu, pinst->cpumask.pcpu, cpu_online_mask);
0578     cpumask_and(pd->cpumask.cbcpu, pinst->cpumask.cbcpu, cpu_online_mask);
0579 
0580     padata_init_reorder_list(pd);
0581     padata_init_squeues(pd);
0582     pd->seq_nr = -1;
0583     refcount_set(&pd->refcnt, 1);
0584     spin_lock_init(&pd->lock);
0585     pd->cpu = cpumask_first(pd->cpumask.pcpu);
0586     INIT_WORK(&pd->reorder_work, invoke_padata_reorder);
0587 
0588     return pd;
0589 
0590 err_free_pcpu:
0591     free_cpumask_var(pd->cpumask.pcpu);
0592 err_free_squeue:
0593     free_percpu(pd->squeue);
0594 err_free_reorder_list:
0595     free_percpu(pd->reorder_list);
0596 err_free_pd:
0597     kfree(pd);
0598 err:
0599     return NULL;
0600 }
0601 
0602 static void padata_free_pd(struct parallel_data *pd)
0603 {
0604     free_cpumask_var(pd->cpumask.pcpu);
0605     free_cpumask_var(pd->cpumask.cbcpu);
0606     free_percpu(pd->reorder_list);
0607     free_percpu(pd->squeue);
0608     kfree(pd);
0609 }
0610 
0611 static void __padata_start(struct padata_instance *pinst)
0612 {
0613     pinst->flags |= PADATA_INIT;
0614 }
0615 
0616 static void __padata_stop(struct padata_instance *pinst)
0617 {
0618     if (!(pinst->flags & PADATA_INIT))
0619         return;
0620 
0621     pinst->flags &= ~PADATA_INIT;
0622 
0623     synchronize_rcu();
0624 }
0625 
0626 /* Replace the internal control structure with a new one. */
0627 static int padata_replace_one(struct padata_shell *ps)
0628 {
0629     struct parallel_data *pd_new;
0630 
0631     pd_new = padata_alloc_pd(ps);
0632     if (!pd_new)
0633         return -ENOMEM;
0634 
0635     ps->opd = rcu_dereference_protected(ps->pd, 1);
0636     rcu_assign_pointer(ps->pd, pd_new);
0637 
0638     return 0;
0639 }
0640 
0641 static int padata_replace(struct padata_instance *pinst)
0642 {
0643     struct padata_shell *ps;
0644     int err = 0;
0645 
0646     pinst->flags |= PADATA_RESET;
0647 
0648     list_for_each_entry(ps, &pinst->pslist, list) {
0649         err = padata_replace_one(ps);
0650         if (err)
0651             break;
0652     }
0653 
0654     synchronize_rcu();
0655 
0656     list_for_each_entry_continue_reverse(ps, &pinst->pslist, list)
0657         if (refcount_dec_and_test(&ps->opd->refcnt))
0658             padata_free_pd(ps->opd);
0659 
0660     pinst->flags &= ~PADATA_RESET;
0661 
0662     return err;
0663 }
0664 
0665 /* If cpumask contains no active cpu, we mark the instance as invalid. */
0666 static bool padata_validate_cpumask(struct padata_instance *pinst,
0667                     const struct cpumask *cpumask)
0668 {
0669     if (!cpumask_intersects(cpumask, cpu_online_mask)) {
0670         pinst->flags |= PADATA_INVALID;
0671         return false;
0672     }
0673 
0674     pinst->flags &= ~PADATA_INVALID;
0675     return true;
0676 }
0677 
0678 static int __padata_set_cpumasks(struct padata_instance *pinst,
0679                  cpumask_var_t pcpumask,
0680                  cpumask_var_t cbcpumask)
0681 {
0682     int valid;
0683     int err;
0684 
0685     valid = padata_validate_cpumask(pinst, pcpumask);
0686     if (!valid) {
0687         __padata_stop(pinst);
0688         goto out_replace;
0689     }
0690 
0691     valid = padata_validate_cpumask(pinst, cbcpumask);
0692     if (!valid)
0693         __padata_stop(pinst);
0694 
0695 out_replace:
0696     cpumask_copy(pinst->cpumask.pcpu, pcpumask);
0697     cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);
0698 
0699     err = padata_setup_cpumasks(pinst) ?: padata_replace(pinst);
0700 
0701     if (valid)
0702         __padata_start(pinst);
0703 
0704     return err;
0705 }
0706 
0707 /**
0708  * padata_set_cpumask - Sets specified by @cpumask_type cpumask to the value
0709  *                      equivalent to @cpumask.
0710  * @pinst: padata instance
0711  * @cpumask_type: PADATA_CPU_SERIAL or PADATA_CPU_PARALLEL corresponding
0712  *                to parallel and serial cpumasks respectively.
0713  * @cpumask: the cpumask to use
0714  *
0715  * Return: 0 on success or negative error code
0716  */
0717 int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type,
0718                cpumask_var_t cpumask)
0719 {
0720     struct cpumask *serial_mask, *parallel_mask;
0721     int err = -EINVAL;
0722 
0723     cpus_read_lock();
0724     mutex_lock(&pinst->lock);
0725 
0726     switch (cpumask_type) {
0727     case PADATA_CPU_PARALLEL:
0728         serial_mask = pinst->cpumask.cbcpu;
0729         parallel_mask = cpumask;
0730         break;
0731     case PADATA_CPU_SERIAL:
0732         parallel_mask = pinst->cpumask.pcpu;
0733         serial_mask = cpumask;
0734         break;
0735     default:
0736          goto out;
0737     }
0738 
0739     err =  __padata_set_cpumasks(pinst, parallel_mask, serial_mask);
0740 
0741 out:
0742     mutex_unlock(&pinst->lock);
0743     cpus_read_unlock();
0744 
0745     return err;
0746 }
0747 EXPORT_SYMBOL(padata_set_cpumask);
0748 
0749 #ifdef CONFIG_HOTPLUG_CPU
0750 
0751 static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
0752 {
0753     int err = 0;
0754 
0755     if (cpumask_test_cpu(cpu, cpu_online_mask)) {
0756         err = padata_replace(pinst);
0757 
0758         if (padata_validate_cpumask(pinst, pinst->cpumask.pcpu) &&
0759             padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
0760             __padata_start(pinst);
0761     }
0762 
0763     return err;
0764 }
0765 
0766 static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
0767 {
0768     int err = 0;
0769 
0770     if (!cpumask_test_cpu(cpu, cpu_online_mask)) {
0771         if (!padata_validate_cpumask(pinst, pinst->cpumask.pcpu) ||
0772             !padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
0773             __padata_stop(pinst);
0774 
0775         err = padata_replace(pinst);
0776     }
0777 
0778     return err;
0779 }
0780 
0781 static inline int pinst_has_cpu(struct padata_instance *pinst, int cpu)
0782 {
0783     return cpumask_test_cpu(cpu, pinst->cpumask.pcpu) ||
0784         cpumask_test_cpu(cpu, pinst->cpumask.cbcpu);
0785 }
0786 
0787 static int padata_cpu_online(unsigned int cpu, struct hlist_node *node)
0788 {
0789     struct padata_instance *pinst;
0790     int ret;
0791 
0792     pinst = hlist_entry_safe(node, struct padata_instance, cpu_online_node);
0793     if (!pinst_has_cpu(pinst, cpu))
0794         return 0;
0795 
0796     mutex_lock(&pinst->lock);
0797     ret = __padata_add_cpu(pinst, cpu);
0798     mutex_unlock(&pinst->lock);
0799     return ret;
0800 }
0801 
0802 static int padata_cpu_dead(unsigned int cpu, struct hlist_node *node)
0803 {
0804     struct padata_instance *pinst;
0805     int ret;
0806 
0807     pinst = hlist_entry_safe(node, struct padata_instance, cpu_dead_node);
0808     if (!pinst_has_cpu(pinst, cpu))
0809         return 0;
0810 
0811     mutex_lock(&pinst->lock);
0812     ret = __padata_remove_cpu(pinst, cpu);
0813     mutex_unlock(&pinst->lock);
0814     return ret;
0815 }
0816 
0817 static enum cpuhp_state hp_online;
0818 #endif
0819 
0820 static void __padata_free(struct padata_instance *pinst)
0821 {
0822 #ifdef CONFIG_HOTPLUG_CPU
0823     cpuhp_state_remove_instance_nocalls(CPUHP_PADATA_DEAD,
0824                         &pinst->cpu_dead_node);
0825     cpuhp_state_remove_instance_nocalls(hp_online, &pinst->cpu_online_node);
0826 #endif
0827 
0828     WARN_ON(!list_empty(&pinst->pslist));
0829 
0830     free_cpumask_var(pinst->cpumask.pcpu);
0831     free_cpumask_var(pinst->cpumask.cbcpu);
0832     destroy_workqueue(pinst->serial_wq);
0833     destroy_workqueue(pinst->parallel_wq);
0834     kfree(pinst);
0835 }
0836 
0837 #define kobj2pinst(_kobj)                   \
0838     container_of(_kobj, struct padata_instance, kobj)
0839 #define attr2pentry(_attr)                  \
0840     container_of(_attr, struct padata_sysfs_entry, attr)
0841 
0842 static void padata_sysfs_release(struct kobject *kobj)
0843 {
0844     struct padata_instance *pinst = kobj2pinst(kobj);
0845     __padata_free(pinst);
0846 }
0847 
0848 struct padata_sysfs_entry {
0849     struct attribute attr;
0850     ssize_t (*show)(struct padata_instance *, struct attribute *, char *);
0851     ssize_t (*store)(struct padata_instance *, struct attribute *,
0852              const char *, size_t);
0853 };
0854 
0855 static ssize_t show_cpumask(struct padata_instance *pinst,
0856                 struct attribute *attr,  char *buf)
0857 {
0858     struct cpumask *cpumask;
0859     ssize_t len;
0860 
0861     mutex_lock(&pinst->lock);
0862     if (!strcmp(attr->name, "serial_cpumask"))
0863         cpumask = pinst->cpumask.cbcpu;
0864     else
0865         cpumask = pinst->cpumask.pcpu;
0866 
0867     len = snprintf(buf, PAGE_SIZE, "%*pb\n",
0868                nr_cpu_ids, cpumask_bits(cpumask));
0869     mutex_unlock(&pinst->lock);
0870     return len < PAGE_SIZE ? len : -EINVAL;
0871 }
0872 
0873 static ssize_t store_cpumask(struct padata_instance *pinst,
0874                  struct attribute *attr,
0875                  const char *buf, size_t count)
0876 {
0877     cpumask_var_t new_cpumask;
0878     ssize_t ret;
0879     int mask_type;
0880 
0881     if (!alloc_cpumask_var(&new_cpumask, GFP_KERNEL))
0882         return -ENOMEM;
0883 
0884     ret = bitmap_parse(buf, count, cpumask_bits(new_cpumask),
0885                nr_cpumask_bits);
0886     if (ret < 0)
0887         goto out;
0888 
0889     mask_type = !strcmp(attr->name, "serial_cpumask") ?
0890         PADATA_CPU_SERIAL : PADATA_CPU_PARALLEL;
0891     ret = padata_set_cpumask(pinst, mask_type, new_cpumask);
0892     if (!ret)
0893         ret = count;
0894 
0895 out:
0896     free_cpumask_var(new_cpumask);
0897     return ret;
0898 }
0899 
0900 #define PADATA_ATTR_RW(_name, _show_name, _store_name)      \
0901     static struct padata_sysfs_entry _name##_attr =     \
0902         __ATTR(_name, 0644, _show_name, _store_name)
0903 #define PADATA_ATTR_RO(_name, _show_name)       \
0904     static struct padata_sysfs_entry _name##_attr = \
0905         __ATTR(_name, 0400, _show_name, NULL)
0906 
0907 PADATA_ATTR_RW(serial_cpumask, show_cpumask, store_cpumask);
0908 PADATA_ATTR_RW(parallel_cpumask, show_cpumask, store_cpumask);
0909 
0910 /*
0911  * Padata sysfs provides the following objects:
0912  * serial_cpumask   [RW] - cpumask for serial workers
0913  * parallel_cpumask [RW] - cpumask for parallel workers
0914  */
0915 static struct attribute *padata_default_attrs[] = {
0916     &serial_cpumask_attr.attr,
0917     &parallel_cpumask_attr.attr,
0918     NULL,
0919 };
0920 ATTRIBUTE_GROUPS(padata_default);
0921 
0922 static ssize_t padata_sysfs_show(struct kobject *kobj,
0923                  struct attribute *attr, char *buf)
0924 {
0925     struct padata_instance *pinst;
0926     struct padata_sysfs_entry *pentry;
0927     ssize_t ret = -EIO;
0928 
0929     pinst = kobj2pinst(kobj);
0930     pentry = attr2pentry(attr);
0931     if (pentry->show)
0932         ret = pentry->show(pinst, attr, buf);
0933 
0934     return ret;
0935 }
0936 
0937 static ssize_t padata_sysfs_store(struct kobject *kobj, struct attribute *attr,
0938                   const char *buf, size_t count)
0939 {
0940     struct padata_instance *pinst;
0941     struct padata_sysfs_entry *pentry;
0942     ssize_t ret = -EIO;
0943 
0944     pinst = kobj2pinst(kobj);
0945     pentry = attr2pentry(attr);
0946     if (pentry->show)
0947         ret = pentry->store(pinst, attr, buf, count);
0948 
0949     return ret;
0950 }
0951 
0952 static const struct sysfs_ops padata_sysfs_ops = {
0953     .show = padata_sysfs_show,
0954     .store = padata_sysfs_store,
0955 };
0956 
0957 static struct kobj_type padata_attr_type = {
0958     .sysfs_ops = &padata_sysfs_ops,
0959     .default_groups = padata_default_groups,
0960     .release = padata_sysfs_release,
0961 };
0962 
0963 /**
0964  * padata_alloc - allocate and initialize a padata instance
0965  * @name: used to identify the instance
0966  *
0967  * Return: new instance on success, NULL on error
0968  */
0969 struct padata_instance *padata_alloc(const char *name)
0970 {
0971     struct padata_instance *pinst;
0972 
0973     pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
0974     if (!pinst)
0975         goto err;
0976 
0977     pinst->parallel_wq = alloc_workqueue("%s_parallel", WQ_UNBOUND, 0,
0978                          name);
0979     if (!pinst->parallel_wq)
0980         goto err_free_inst;
0981 
0982     cpus_read_lock();
0983 
0984     pinst->serial_wq = alloc_workqueue("%s_serial", WQ_MEM_RECLAIM |
0985                        WQ_CPU_INTENSIVE, 1, name);
0986     if (!pinst->serial_wq)
0987         goto err_put_cpus;
0988 
0989     if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL))
0990         goto err_free_serial_wq;
0991     if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) {
0992         free_cpumask_var(pinst->cpumask.pcpu);
0993         goto err_free_serial_wq;
0994     }
0995 
0996     INIT_LIST_HEAD(&pinst->pslist);
0997 
0998     cpumask_copy(pinst->cpumask.pcpu, cpu_possible_mask);
0999     cpumask_copy(pinst->cpumask.cbcpu, cpu_possible_mask);
1000 
1001     if (padata_setup_cpumasks(pinst))
1002         goto err_free_masks;
1003 
1004     __padata_start(pinst);
1005 
1006     kobject_init(&pinst->kobj, &padata_attr_type);
1007     mutex_init(&pinst->lock);
1008 
1009 #ifdef CONFIG_HOTPLUG_CPU
1010     cpuhp_state_add_instance_nocalls_cpuslocked(hp_online,
1011                             &pinst->cpu_online_node);
1012     cpuhp_state_add_instance_nocalls_cpuslocked(CPUHP_PADATA_DEAD,
1013                             &pinst->cpu_dead_node);
1014 #endif
1015 
1016     cpus_read_unlock();
1017 
1018     return pinst;
1019 
1020 err_free_masks:
1021     free_cpumask_var(pinst->cpumask.pcpu);
1022     free_cpumask_var(pinst->cpumask.cbcpu);
1023 err_free_serial_wq:
1024     destroy_workqueue(pinst->serial_wq);
1025 err_put_cpus:
1026     cpus_read_unlock();
1027     destroy_workqueue(pinst->parallel_wq);
1028 err_free_inst:
1029     kfree(pinst);
1030 err:
1031     return NULL;
1032 }
1033 EXPORT_SYMBOL(padata_alloc);
1034 
1035 /**
1036  * padata_free - free a padata instance
1037  *
1038  * @pinst: padata instance to free
1039  */
1040 void padata_free(struct padata_instance *pinst)
1041 {
1042     kobject_put(&pinst->kobj);
1043 }
1044 EXPORT_SYMBOL(padata_free);
1045 
1046 /**
1047  * padata_alloc_shell - Allocate and initialize padata shell.
1048  *
1049  * @pinst: Parent padata_instance object.
1050  *
1051  * Return: new shell on success, NULL on error
1052  */
1053 struct padata_shell *padata_alloc_shell(struct padata_instance *pinst)
1054 {
1055     struct parallel_data *pd;
1056     struct padata_shell *ps;
1057 
1058     ps = kzalloc(sizeof(*ps), GFP_KERNEL);
1059     if (!ps)
1060         goto out;
1061 
1062     ps->pinst = pinst;
1063 
1064     cpus_read_lock();
1065     pd = padata_alloc_pd(ps);
1066     cpus_read_unlock();
1067 
1068     if (!pd)
1069         goto out_free_ps;
1070 
1071     mutex_lock(&pinst->lock);
1072     RCU_INIT_POINTER(ps->pd, pd);
1073     list_add(&ps->list, &pinst->pslist);
1074     mutex_unlock(&pinst->lock);
1075 
1076     return ps;
1077 
1078 out_free_ps:
1079     kfree(ps);
1080 out:
1081     return NULL;
1082 }
1083 EXPORT_SYMBOL(padata_alloc_shell);
1084 
1085 /**
1086  * padata_free_shell - free a padata shell
1087  *
1088  * @ps: padata shell to free
1089  */
1090 void padata_free_shell(struct padata_shell *ps)
1091 {
1092     if (!ps)
1093         return;
1094 
1095     mutex_lock(&ps->pinst->lock);
1096     list_del(&ps->list);
1097     padata_free_pd(rcu_dereference_protected(ps->pd, 1));
1098     mutex_unlock(&ps->pinst->lock);
1099 
1100     kfree(ps);
1101 }
1102 EXPORT_SYMBOL(padata_free_shell);
1103 
1104 void __init padata_init(void)
1105 {
1106     unsigned int i, possible_cpus;
1107 #ifdef CONFIG_HOTPLUG_CPU
1108     int ret;
1109 
1110     ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "padata:online",
1111                       padata_cpu_online, NULL);
1112     if (ret < 0)
1113         goto err;
1114     hp_online = ret;
1115 
1116     ret = cpuhp_setup_state_multi(CPUHP_PADATA_DEAD, "padata:dead",
1117                       NULL, padata_cpu_dead);
1118     if (ret < 0)
1119         goto remove_online_state;
1120 #endif
1121 
1122     possible_cpus = num_possible_cpus();
1123     padata_works = kmalloc_array(possible_cpus, sizeof(struct padata_work),
1124                      GFP_KERNEL);
1125     if (!padata_works)
1126         goto remove_dead_state;
1127 
1128     for (i = 0; i < possible_cpus; ++i)
1129         list_add(&padata_works[i].pw_list, &padata_free_works);
1130 
1131     return;
1132 
1133 remove_dead_state:
1134 #ifdef CONFIG_HOTPLUG_CPU
1135     cpuhp_remove_multi_state(CPUHP_PADATA_DEAD);
1136 remove_online_state:
1137     cpuhp_remove_multi_state(hp_online);
1138 err:
1139 #endif
1140     pr_warn("padata: initialization failed\n");
1141 }