Back to home page

LXR

 
 

    


0001 /*
0002  * padata.c - generic interface to process data streams in parallel
0003  *
0004  * See Documentation/padata.txt for an api documentation.
0005  *
0006  * Copyright (C) 2008, 2009 secunet Security Networks AG
0007  * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
0008  *
0009  * This program is free software; you can redistribute it and/or modify it
0010  * under the terms and conditions of the GNU General Public License,
0011  * version 2, as published by the Free Software Foundation.
0012  *
0013  * This program is distributed in the hope it will be useful, but WITHOUT
0014  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
0015  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
0016  * more details.
0017  *
0018  * You should have received a copy of the GNU General Public License along with
0019  * this program; if not, write to the Free Software Foundation, Inc.,
0020  * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
0021  */
0022 
0023 #include <linux/export.h>
0024 #include <linux/cpumask.h>
0025 #include <linux/err.h>
0026 #include <linux/cpu.h>
0027 #include <linux/padata.h>
0028 #include <linux/mutex.h>
0029 #include <linux/sched.h>
0030 #include <linux/slab.h>
0031 #include <linux/sysfs.h>
0032 #include <linux/rcupdate.h>
0033 #include <linux/module.h>
0034 
0035 #define MAX_OBJ_NUM 1000
0036 
0037 static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
0038 {
0039     int cpu, target_cpu;
0040 
0041     target_cpu = cpumask_first(pd->cpumask.pcpu);
0042     for (cpu = 0; cpu < cpu_index; cpu++)
0043         target_cpu = cpumask_next(target_cpu, pd->cpumask.pcpu);
0044 
0045     return target_cpu;
0046 }
0047 
0048 static int padata_cpu_hash(struct parallel_data *pd)
0049 {
0050     unsigned int seq_nr;
0051     int cpu_index;
0052 
0053     /*
0054      * Hash the sequence numbers to the cpus by taking
0055      * seq_nr mod. number of cpus in use.
0056      */
0057 
0058     seq_nr = atomic_inc_return(&pd->seq_nr);
0059     cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);
0060 
0061     return padata_index_to_cpu(pd, cpu_index);
0062 }
0063 
/*
 * Workqueue handler for one percpu parallel queue: drain the queue and
 * invoke the user supplied parallel callback on every pending object.
 */
static void padata_parallel_worker(struct work_struct *parallel_work)
{
	struct padata_parallel_queue *pqueue;
	LIST_HEAD(local_list);

	/* The parallel callbacks run with BHs off. */
	local_bh_disable();
	pqueue = container_of(parallel_work,
			      struct padata_parallel_queue, work);

	/* Detach the whole pending list at once to keep lock hold time short. */
	spin_lock(&pqueue->parallel.lock);
	list_replace_init(&pqueue->parallel.list, &local_list);
	spin_unlock(&pqueue->parallel.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		/* User callback; expected to end in padata_do_serial(). */
		padata->parallel(padata);
	}

	local_bh_enable();
}
0090 
/**
 * padata_do_parallel - padata parallelization function
 *
 * @pinst: padata instance
 * @padata: object to be parallelized
 * @cb_cpu: cpu the serialization callback function will run on,
 *          must be in the serial cpumask of padata(i.e. cpumask.cbcpu).
 *
 * The parallelization callback function will run with BHs off.
 * Note: Every object which is parallelized by padata_do_parallel
 * must be seen by padata_do_serial.
 *
 * Returns 0 on success, -EINVAL if the instance is not initialized or
 * @cb_cpu is not in the serial cpumask, -EBUSY during a cpumask change
 * or when too many objects are already in flight.
 */
int padata_do_parallel(struct padata_instance *pinst,
		       struct padata_priv *padata, int cb_cpu)
{
	int target_cpu, err;
	struct padata_parallel_queue *queue;
	struct parallel_data *pd;

	rcu_read_lock_bh();

	/* pd may be swapped out concurrently by padata_replace(). */
	pd = rcu_dereference_bh(pinst->pd);

	err = -EINVAL;
	if (!(pinst->flags & PADATA_INIT) || pinst->flags & PADATA_INVALID)
		goto out;

	if (!cpumask_test_cpu(cb_cpu, pd->cpumask.cbcpu))
		goto out;

	err = -EBUSY;
	if ((pinst->flags & PADATA_RESET))
		goto out;

	/* Backpressure: cap the number of in-flight objects. */
	if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM)
		goto out;

	err = 0;
	atomic_inc(&pd->refcnt);	/* dropped in padata_serial_worker() */
	padata->pd = pd;
	padata->cb_cpu = cb_cpu;

	/* Round-robin the object onto one of the parallel cpus. */
	target_cpu = padata_cpu_hash(pd);
	queue = per_cpu_ptr(pd->pqueue, target_cpu);

	spin_lock(&queue->parallel.lock);
	list_add_tail(&padata->list, &queue->parallel.list);
	spin_unlock(&queue->parallel.lock);

	queue_work_on(target_cpu, pinst->wq, &queue->work);

out:
	rcu_read_unlock_bh();

	return err;
}
EXPORT_SYMBOL(padata_do_parallel);
0148 
/*
 * padata_get_next - Get the next object that needs serialization.
 *
 * Must be called with pd->lock held (padata_reorder() is the only caller).
 *
 * Return values are:
 *
 * A pointer to the control struct of the next object that needs
 * serialization, if present in one of the percpu reorder queues.
 *
 * NULL, if all percpu reorder queues are empty.
 *
 * -EINPROGRESS, if the next object that needs serialization will
 *  be parallel processed by another cpu and is not yet present in
 *  the cpu's reorder queue.
 *
 * -ENODATA, if this cpu has to do the parallel processing for
 *  the next object.
 */
static struct padata_priv *padata_get_next(struct parallel_data *pd)
{
	int cpu, num_cpus;
	unsigned int next_nr, next_index;
	struct padata_parallel_queue *next_queue;
	struct padata_priv *padata;
	struct padata_list *reorder;

	num_cpus = cpumask_weight(pd->cpumask.pcpu);

	/*
	 * Calculate the percpu reorder queue and the sequence
	 * number of the next object.  pd->processed counts objects
	 * already handed on for serialization, so the next object's
	 * sequence number equals it.
	 */
	next_nr = pd->processed;
	next_index = next_nr % num_cpus;
	cpu = padata_index_to_cpu(pd, next_index);
	next_queue = per_cpu_ptr(pd->pqueue, cpu);

	padata = NULL;

	reorder = &next_queue->reorder;

	if (!list_empty(&reorder->list)) {
		padata = list_entry(reorder->list.next,
				    struct padata_priv, list);

		spin_lock(&reorder->lock);
		list_del_init(&padata->list);
		atomic_dec(&pd->reorder_objects);
		spin_unlock(&reorder->lock);

		/* Advance to the next expected sequence number. */
		pd->processed++;

		goto out;
	}

	/* Queue empty: is the missing object ours to parallel process? */
	if (__this_cpu_read(pd->pqueue->cpu_index) == next_queue->cpu_index) {
		padata = ERR_PTR(-ENODATA);
		goto out;
	}

	padata = ERR_PTR(-EINPROGRESS);
out:
	return padata;
}
0212 
/*
 * Pull objects out of the percpu reorder queues in sequence order and
 * hand each one to the serial workqueue of its callback cpu.
 */
static void padata_reorder(struct parallel_data *pd)
{
	int cb_cpu;
	struct padata_priv *padata;
	struct padata_serial_queue *squeue;
	struct padata_instance *pinst = pd->pinst;

	/*
	 * We need to ensure that only one cpu can work on dequeueing of
	 * the reorder queue the time. Calculating in which percpu reorder
	 * queue the next object will arrive takes some time. A spinlock
	 * would be highly contended. Also it is not clear in which order
	 * the objects arrive to the reorder queues. So a cpu could wait to
	 * get the lock just to notice that there is nothing to do at the
	 * moment. Therefore we use a trylock and let the holder of the lock
	 * care for all the objects enqueued during the holdtime of the lock.
	 */
	if (!spin_trylock_bh(&pd->lock))
		return;

	while (1) {
		padata = padata_get_next(pd);

		/*
		 * All reorder queues are empty, or the next object that needs
		 * serialization is parallel processed by another cpu and is
		 * still on it's way to the cpu's reorder queue, nothing to
		 * do for now.
		 */
		if (!padata || PTR_ERR(padata) == -EINPROGRESS)
			break;

		/*
		 * This cpu has to do the parallel processing of the next
		 * object. It's waiting in the cpu's parallelization queue,
		 * so exit immediately.
		 */
		if (PTR_ERR(padata) == -ENODATA) {
			del_timer(&pd->timer);
			spin_unlock_bh(&pd->lock);
			return;
		}

		/* Ship the object to the serial queue of its callback cpu. */
		cb_cpu = padata->cb_cpu;
		squeue = per_cpu_ptr(pd->squeue, cb_cpu);

		spin_lock(&squeue->serial.lock);
		list_add_tail(&padata->list, &squeue->serial.list);
		spin_unlock(&squeue->serial.lock);

		queue_work_on(cb_cpu, pinst->wq, &squeue->work);
	}

	spin_unlock_bh(&pd->lock);

	/*
	 * The next object that needs serialization might have arrived to
	 * the reorder queues in the meantime, we will be called again
	 * from the timer function if no one else cares for it.
	 */
	if (atomic_read(&pd->reorder_objects)
			&& !(pinst->flags & PADATA_RESET))
		mod_timer(&pd->timer, jiffies + HZ);
	else
		del_timer(&pd->timer);

	return;
}
0281 
/* Timer callback: retry the reorder pass for this parallel_data. */
static void padata_reorder_timer(unsigned long arg)
{
	padata_reorder((struct parallel_data *)arg);
}
0288 
/*
 * Workqueue handler for one percpu serial queue: drain the queue, run the
 * user supplied serial callback on every object and drop the in-flight
 * reference taken in padata_do_parallel().
 */
static void padata_serial_worker(struct work_struct *serial_work)
{
	struct padata_serial_queue *squeue;
	struct parallel_data *pd;
	LIST_HEAD(local_list);

	/* The serial callbacks run with BHs off. */
	local_bh_disable();
	squeue = container_of(serial_work, struct padata_serial_queue, work);
	pd = squeue->pd;

	/* Detach the whole pending list under the lock. */
	spin_lock(&squeue->serial.lock);
	list_replace_init(&squeue->serial.list, &local_list);
	spin_unlock(&squeue->serial.lock);

	while (!list_empty(&local_list)) {
		struct padata_priv *padata;

		padata = list_entry(local_list.next,
				    struct padata_priv, list);

		list_del_init(&padata->list);

		padata->serial(padata);
		atomic_dec(&pd->refcnt);
	}
	local_bh_enable();
}
0316 
/**
 * padata_do_serial - padata serialization function
 *
 * @padata: object to be serialized.
 *
 * padata_do_serial must be called for every parallelized object.
 * The serialization callback function will run with BHs off.
 *
 * The object is put on the reorder queue of the current cpu and a
 * reorder pass is kicked off so objects leave in submission order.
 */
void padata_do_serial(struct padata_priv *padata)
{
	int cpu;
	struct padata_parallel_queue *pqueue;
	struct parallel_data *pd;

	pd = padata->pd;

	/* Pin the cpu while touching its reorder queue. */
	cpu = get_cpu();
	pqueue = per_cpu_ptr(pd->pqueue, cpu);

	spin_lock(&pqueue->reorder.lock);
	atomic_inc(&pd->reorder_objects);
	list_add_tail(&padata->list, &pqueue->reorder.list);
	spin_unlock(&pqueue->reorder.lock);

	put_cpu();

	padata_reorder(pd);
}
EXPORT_SYMBOL(padata_do_serial);
0346 
0347 static int padata_setup_cpumasks(struct parallel_data *pd,
0348                  const struct cpumask *pcpumask,
0349                  const struct cpumask *cbcpumask)
0350 {
0351     if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
0352         return -ENOMEM;
0353 
0354     cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_online_mask);
0355     if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) {
0356         free_cpumask_var(pd->cpumask.cbcpu);
0357         return -ENOMEM;
0358     }
0359 
0360     cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_online_mask);
0361     return 0;
0362 }
0363 
0364 static void __padata_list_init(struct padata_list *pd_list)
0365 {
0366     INIT_LIST_HEAD(&pd_list->list);
0367     spin_lock_init(&pd_list->lock);
0368 }
0369 
/* Initialize all percpu queues used by serial workers */
static void padata_init_squeues(struct parallel_data *pd)
{
	int cpu;
	struct padata_serial_queue *squeue;

	/* Only cpus in the serial (callback) cpumask get a live squeue. */
	for_each_cpu(cpu, pd->cpumask.cbcpu) {
		squeue = per_cpu_ptr(pd->squeue, cpu);
		squeue->pd = pd;	/* back-pointer used by the worker */
		__padata_list_init(&squeue->serial);
		INIT_WORK(&squeue->work, padata_serial_worker);
	}
}
0383 
/* Initialize all percpu queues used by parallel workers */
static void padata_init_pqueues(struct parallel_data *pd)
{
	int cpu_index, cpu;
	struct padata_parallel_queue *pqueue;

	/*
	 * Number the parallel cpus consecutively; cpu_index is what
	 * padata_get_next() compares against to detect "our" queue.
	 */
	cpu_index = 0;
	for_each_cpu(cpu, pd->cpumask.pcpu) {
		pqueue = per_cpu_ptr(pd->pqueue, cpu);
		pqueue->pd = pd;
		pqueue->cpu_index = cpu_index;
		cpu_index++;

		__padata_list_init(&pqueue->reorder);
		__padata_list_init(&pqueue->parallel);
		INIT_WORK(&pqueue->work, padata_parallel_worker);
		atomic_set(&pqueue->num_obj, 0);
	}
}
0403 
/* Allocate and initialize the internal cpumask dependent resources. */
static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
					     const struct cpumask *pcpumask,
					     const struct cpumask *cbcpumask)
{
	struct parallel_data *pd;

	pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
	if (!pd)
		goto err;

	pd->pqueue = alloc_percpu(struct padata_parallel_queue);
	if (!pd->pqueue)
		goto err_free_pd;

	pd->squeue = alloc_percpu(struct padata_serial_queue);
	if (!pd->squeue)
		goto err_free_pqueue;
	if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0)
		goto err_free_squeue;

	padata_init_pqueues(pd);
	padata_init_squeues(pd);
	setup_timer(&pd->timer, padata_reorder_timer, (unsigned long)pd);
	/* -1 so the first atomic_inc_return() yields sequence number 0. */
	atomic_set(&pd->seq_nr, -1);
	atomic_set(&pd->reorder_objects, 0);
	atomic_set(&pd->refcnt, 0);
	pd->pinst = pinst;
	spin_lock_init(&pd->lock);

	return pd;

err_free_squeue:
	free_percpu(pd->squeue);
err_free_pqueue:
	free_percpu(pd->pqueue);
err_free_pd:
	kfree(pd);
err:
	return NULL;	/* callers treat NULL as -ENOMEM */
}
0445 
0446 static void padata_free_pd(struct parallel_data *pd)
0447 {
0448     free_cpumask_var(pd->cpumask.pcpu);
0449     free_cpumask_var(pd->cpumask.cbcpu);
0450     free_percpu(pd->pqueue);
0451     free_percpu(pd->squeue);
0452     kfree(pd);
0453 }
0454 
/* Flush all objects out of the padata queues. */
static void padata_flush_queues(struct parallel_data *pd)
{
	int cpu;
	struct padata_parallel_queue *pqueue;
	struct padata_serial_queue *squeue;

	/* Wait for all parallel workers to finish. */
	for_each_cpu(cpu, pd->cpumask.pcpu) {
		pqueue = per_cpu_ptr(pd->pqueue, cpu);
		flush_work(&pqueue->work);
	}

	del_timer_sync(&pd->timer);

	/* Push any remaining objects through the reorder stage. */
	if (atomic_read(&pd->reorder_objects))
		padata_reorder(pd);

	/* Then wait for the serial workers to drain. */
	for_each_cpu(cpu, pd->cpumask.cbcpu) {
		squeue = per_cpu_ptr(pd->squeue, cpu);
		flush_work(&squeue->work);
	}

	/* Every object taken in padata_do_parallel() must be gone now. */
	BUG_ON(atomic_read(&pd->refcnt) != 0);
}
0479 
/* Mark the instance usable; padata_do_parallel() checks this flag. */
static void __padata_start(struct padata_instance *pinst)
{
	pinst->flags |= PADATA_INIT;
}
0484 
/*
 * Stop accepting new objects, wait for in-flight submitters to leave
 * their RCU read sections, then flush everything still queued.
 */
static void __padata_stop(struct padata_instance *pinst)
{
	if (!(pinst->flags & PADATA_INIT))
		return;

	pinst->flags &= ~PADATA_INIT;

	/* Make sure no padata_do_parallel() still sees PADATA_INIT. */
	synchronize_rcu();

	get_online_cpus();
	padata_flush_queues(pinst->pd);
	put_online_cpus();
}
0498 
/* Replace the internal control structure with a new one. */
static void padata_replace(struct padata_instance *pinst,
			   struct parallel_data *pd_new)
{
	struct parallel_data *pd_old = pinst->pd;
	int notification_mask = 0;

	/* Refuse new submissions while the switch is in progress. */
	pinst->flags |= PADATA_RESET;

	rcu_assign_pointer(pinst->pd, pd_new);

	/* Wait until no submitter can still hold a reference to pd_old. */
	synchronize_rcu();

	if (!cpumask_equal(pd_old->cpumask.pcpu, pd_new->cpumask.pcpu))
		notification_mask |= PADATA_CPU_PARALLEL;
	if (!cpumask_equal(pd_old->cpumask.cbcpu, pd_new->cpumask.cbcpu))
		notification_mask |= PADATA_CPU_SERIAL;

	/* Drain and free the old control structure. */
	padata_flush_queues(pd_old);
	padata_free_pd(pd_old);

	if (notification_mask)
		blocking_notifier_call_chain(&pinst->cpumask_change_notifier,
					     notification_mask,
					     &pd_new->cpumask);

	pinst->flags &= ~PADATA_RESET;
}
0527 
/**
 * padata_register_cpumask_notifier - Registers a notifier that will be called
 *                             if either pcpu or cbcpu or both cpumasks change.
 *
 * @pinst: A pointer to padata instance
 * @nblock: A pointer to notifier block.
 */
int padata_register_cpumask_notifier(struct padata_instance *pinst,
				     struct notifier_block *nblock)
{
	return blocking_notifier_chain_register(&pinst->cpumask_change_notifier,
						nblock);
}
EXPORT_SYMBOL(padata_register_cpumask_notifier);
0542 
0543 /**
0544  * padata_unregister_cpumask_notifier - Unregisters cpumask notifier
0545  *        registered earlier  using padata_register_cpumask_notifier
0546  *
0547  * @pinst: A pointer to data instance.
0548  * @nlock: A pointer to notifier block.
0549  */
0550 int padata_unregister_cpumask_notifier(struct padata_instance *pinst,
0551                        struct notifier_block *nblock)
0552 {
0553     return blocking_notifier_chain_unregister(
0554         &pinst->cpumask_change_notifier,
0555         nblock);
0556 }
0557 EXPORT_SYMBOL(padata_unregister_cpumask_notifier);
0558 
0559 
0560 /* If cpumask contains no active cpu, we mark the instance as invalid. */
0561 static bool padata_validate_cpumask(struct padata_instance *pinst,
0562                     const struct cpumask *cpumask)
0563 {
0564     if (!cpumask_intersects(cpumask, cpu_online_mask)) {
0565         pinst->flags |= PADATA_INVALID;
0566         return false;
0567     }
0568 
0569     pinst->flags &= ~PADATA_INVALID;
0570     return true;
0571 }
0572 
/*
 * Build a new parallel_data for the given cpumasks and swap it in.
 * Stops the instance first if either mask has no online cpu; restarts
 * it afterwards only if both masks were valid.
 */
static int __padata_set_cpumasks(struct padata_instance *pinst,
				 cpumask_var_t pcpumask,
				 cpumask_var_t cbcpumask)
{
	int valid;
	struct parallel_data *pd;

	valid = padata_validate_cpumask(pinst, pcpumask);
	if (!valid) {
		__padata_stop(pinst);
		goto out_replace;
	}

	valid = padata_validate_cpumask(pinst, cbcpumask);
	if (!valid)
		__padata_stop(pinst);

out_replace:
	pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
	if (!pd)
		return -ENOMEM;

	/* Remember the user supplied masks in the instance itself. */
	cpumask_copy(pinst->cpumask.pcpu, pcpumask);
	cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);

	padata_replace(pinst, pd);

	if (valid)
		__padata_start(pinst);

	return 0;
}
0605 
/**
 * padata_set_cpumask: Sets specified by @cpumask_type cpumask to the value
 *                     equivalent to @cpumask.
 *
 * @pinst: padata instance
 * @cpumask_type: PADATA_CPU_SERIAL or PADATA_CPU_PARALLEL corresponding
 *                to parallel and serial cpumasks respectively.
 * @cpumask: the cpumask to use
 *
 * Returns 0 on success, -EINVAL for an unknown @cpumask_type, or the
 * error from __padata_set_cpumasks().
 */
int padata_set_cpumask(struct padata_instance *pinst, int cpumask_type,
		       cpumask_var_t cpumask)
{
	struct cpumask *serial_mask, *parallel_mask;
	int err = -EINVAL;

	/* Serialize against other mask changes and cpu hotplug. */
	mutex_lock(&pinst->lock);
	get_online_cpus();

	/* Keep the mask that is not being changed from the instance. */
	switch (cpumask_type) {
	case PADATA_CPU_PARALLEL:
		serial_mask = pinst->cpumask.cbcpu;
		parallel_mask = cpumask;
		break;
	case PADATA_CPU_SERIAL:
		parallel_mask = pinst->cpumask.pcpu;
		serial_mask = cpumask;
		break;
	default:
		goto out;
	}

	err = __padata_set_cpumasks(pinst, parallel_mask, serial_mask);

out:
	put_online_cpus();
	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_set_cpumask);
0646 
/**
 * padata_start - start the parallel processing
 *
 * @pinst: padata instance to start
 *
 * Returns 0, or -EINVAL if the instance's cpumasks are invalid.
 */
int padata_start(struct padata_instance *pinst)
{
	int err = 0;

	mutex_lock(&pinst->lock);

	if (pinst->flags & PADATA_INVALID)
		err = -EINVAL;

	/*
	 * NOTE(review): PADATA_INIT is set even when the instance is
	 * invalid; only the return value reports the problem.  Looks
	 * intentional (mask validation happens again on submission),
	 * but confirm before relying on it.
	 */
	__padata_start(pinst);

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_start);
0668 
/**
 * padata_stop - stop the parallel processing
 *
 * @pinst: padata instance to stop
 *
 * Blocks until all queued objects have been flushed.
 */
void padata_stop(struct padata_instance *pinst)
{
	mutex_lock(&pinst->lock);
	__padata_stop(pinst);
	mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_stop);
0681 
0682 #ifdef CONFIG_HOTPLUG_CPU
0683 
/*
 * A cpu relevant to this instance came online: rebuild the internal
 * parallel_data so the cpu is picked up, and (re)start the instance if
 * both cpumasks are now valid.  Caller holds pinst->lock.
 */
static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd;

	if (cpumask_test_cpu(cpu, cpu_online_mask)) {
		pd = padata_alloc_pd(pinst, pinst->cpumask.pcpu,
				     pinst->cpumask.cbcpu);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);

		if (padata_validate_cpumask(pinst, pinst->cpumask.pcpu) &&
		    padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
			__padata_start(pinst);
	}

	return 0;
}
0703 
/*
 * A cpu relevant to this instance is going away: stop the instance if
 * that leaves a cpumask without online cpus, then rebuild the internal
 * parallel_data without the cpu.  Caller holds pinst->lock.
 */
static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
	struct parallel_data *pd = NULL;

	if (cpumask_test_cpu(cpu, cpu_online_mask)) {

		if (!padata_validate_cpumask(pinst, pinst->cpumask.pcpu) ||
		    !padata_validate_cpumask(pinst, pinst->cpumask.cbcpu))
			__padata_stop(pinst);

		pd = padata_alloc_pd(pinst, pinst->cpumask.pcpu,
				     pinst->cpumask.cbcpu);
		if (!pd)
			return -ENOMEM;

		padata_replace(pinst, pd);

		/* Drop the departing cpu from the new pd's effective masks. */
		cpumask_clear_cpu(cpu, pd->cpumask.cbcpu);
		cpumask_clear_cpu(cpu, pd->cpumask.pcpu);
	}

	return 0;
}
0727 
/**
 * padata_remove_cpu - remove a cpu from the one or both(serial and parallel)
 *                     padata cpumasks.
 *
 * @pinst: padata instance
 * @cpu: cpu to remove
 * @mask: bitmask specifying from which cpumask @cpu should be removed
 *        The @mask may be any combination of the following flags:
 *          PADATA_CPU_SERIAL   - serial cpumask
 *          PADATA_CPU_PARALLEL - parallel cpumask
 *
 * Returns 0 on success, -EINVAL if @mask selects neither cpumask, or
 * -ENOMEM if rebuilding the internal state fails.
 */
int padata_remove_cpu(struct padata_instance *pinst, int cpu, int mask)
{
	int err;

	if (!(mask & (PADATA_CPU_SERIAL | PADATA_CPU_PARALLEL)))
		return -EINVAL;

	mutex_lock(&pinst->lock);

	/* Keep cpu hotplug away while the masks are edited and applied. */
	get_online_cpus();
	if (mask & PADATA_CPU_SERIAL)
		cpumask_clear_cpu(cpu, pinst->cpumask.cbcpu);
	if (mask & PADATA_CPU_PARALLEL)
		cpumask_clear_cpu(cpu, pinst->cpumask.pcpu);

	err = __padata_remove_cpu(pinst, cpu);
	put_online_cpus();

	mutex_unlock(&pinst->lock);

	return err;
}
EXPORT_SYMBOL(padata_remove_cpu);
0762 
0763 static inline int pinst_has_cpu(struct padata_instance *pinst, int cpu)
0764 {
0765     return cpumask_test_cpu(cpu, pinst->cpumask.pcpu) ||
0766         cpumask_test_cpu(cpu, pinst->cpumask.cbcpu);
0767 }
0768 
0769 static int padata_cpu_online(unsigned int cpu, struct hlist_node *node)
0770 {
0771     struct padata_instance *pinst;
0772     int ret;
0773 
0774     pinst = hlist_entry_safe(node, struct padata_instance, node);
0775     if (!pinst_has_cpu(pinst, cpu))
0776         return 0;
0777 
0778     mutex_lock(&pinst->lock);
0779     ret = __padata_add_cpu(pinst, cpu);
0780     mutex_unlock(&pinst->lock);
0781     return ret;
0782 }
0783 
0784 static int padata_cpu_prep_down(unsigned int cpu, struct hlist_node *node)
0785 {
0786     struct padata_instance *pinst;
0787     int ret;
0788 
0789     pinst = hlist_entry_safe(node, struct padata_instance, node);
0790     if (!pinst_has_cpu(pinst, cpu))
0791         return 0;
0792 
0793     mutex_lock(&pinst->lock);
0794     ret = __padata_remove_cpu(pinst, cpu);
0795     mutex_unlock(&pinst->lock);
0796     return ret;
0797 }
0798 
0799 static enum cpuhp_state hp_online;
0800 #endif
0801 
/*
 * Tear down a padata instance: detach from cpu hotplug, stop and flush
 * processing, then free all internal state.  Called from the kobject
 * release, i.e. after the last reference is dropped.
 */
static void __padata_free(struct padata_instance *pinst)
{
#ifdef CONFIG_HOTPLUG_CPU
	cpuhp_state_remove_instance_nocalls(hp_online, &pinst->node);
#endif

	padata_stop(pinst);
	padata_free_pd(pinst->pd);
	free_cpumask_var(pinst->cpumask.pcpu);
	free_cpumask_var(pinst->cpumask.cbcpu);
	kfree(pinst);
}
0814 
/* Map a kobject/attribute back to the padata objects embedding them. */
#define kobj2pinst(_kobj)					\
	container_of(_kobj, struct padata_instance, kobj)
#define attr2pentry(_attr)					\
	container_of(_attr, struct padata_sysfs_entry, attr)

/* Final kobject release: frees the whole padata instance. */
static void padata_sysfs_release(struct kobject *kobj)
{
	struct padata_instance *pinst = kobj2pinst(kobj);
	__padata_free(pinst);
}

/* One sysfs attribute of a padata instance plus its accessor hooks. */
struct padata_sysfs_entry {
	struct attribute attr;
	ssize_t (*show)(struct padata_instance *, struct attribute *, char *);
	ssize_t (*store)(struct padata_instance *, struct attribute *,
			 const char *, size_t);
};
0832 
/*
 * sysfs show hook shared by both cpumask attributes; the attribute name
 * selects which mask is printed as a hex bitmap.
 */
static ssize_t show_cpumask(struct padata_instance *pinst,
			    struct attribute *attr, char *buf)
{
	struct cpumask *cpumask;
	ssize_t len;

	mutex_lock(&pinst->lock);
	if (!strcmp(attr->name, "serial_cpumask"))
		cpumask = pinst->cpumask.cbcpu;
	else
		cpumask = pinst->cpumask.pcpu;

	len = snprintf(buf, PAGE_SIZE, "%*pb\n",
		       nr_cpu_ids, cpumask_bits(cpumask));
	mutex_unlock(&pinst->lock);
	/* A full buffer means the mask was truncated -- report an error. */
	return len < PAGE_SIZE ? len : -EINVAL;
}
0850 
/*
 * sysfs store hook shared by both cpumask attributes: parse the user's
 * bitmap and apply it via padata_set_cpumask().
 */
static ssize_t store_cpumask(struct padata_instance *pinst,
			     struct attribute *attr,
			     const char *buf, size_t count)
{
	cpumask_var_t new_cpumask;
	ssize_t ret;
	int mask_type;

	if (!alloc_cpumask_var(&new_cpumask, GFP_KERNEL))
		return -ENOMEM;

	ret = bitmap_parse(buf, count, cpumask_bits(new_cpumask),
			   nr_cpumask_bits);
	if (ret < 0)
		goto out;

	/* The attribute name decides which cpumask is being replaced. */
	mask_type = !strcmp(attr->name, "serial_cpumask") ?
		PADATA_CPU_SERIAL : PADATA_CPU_PARALLEL;
	ret = padata_set_cpumask(pinst, mask_type, new_cpumask);
	if (!ret)
		ret = count;	/* sysfs convention: bytes consumed */

out:
	free_cpumask_var(new_cpumask);
	return ret;
}
0877 
/* Helpers declaring a padata sysfs attribute with its mode and hooks. */
#define PADATA_ATTR_RW(_name, _show_name, _store_name)		\
	static struct padata_sysfs_entry _name##_attr =		\
		__ATTR(_name, 0644, _show_name, _store_name)
#define PADATA_ATTR_RO(_name, _show_name)		\
	static struct padata_sysfs_entry _name##_attr = \
		__ATTR(_name, 0400, _show_name, NULL)

PADATA_ATTR_RW(serial_cpumask, show_cpumask, store_cpumask);
PADATA_ATTR_RW(parallel_cpumask, show_cpumask, store_cpumask);

/*
 * Padata sysfs provides the following objects:
 * serial_cpumask   [RW] - cpumask for serial workers
 * parallel_cpumask [RW] - cpumask for parallel workers
 */
static struct attribute *padata_default_attrs[] = {
	&serial_cpumask_attr.attr,
	&parallel_cpumask_attr.attr,
	NULL,	/* sentinel */
};
0898 
0899 static ssize_t padata_sysfs_show(struct kobject *kobj,
0900                  struct attribute *attr, char *buf)
0901 {
0902     struct padata_instance *pinst;
0903     struct padata_sysfs_entry *pentry;
0904     ssize_t ret = -EIO;
0905 
0906     pinst = kobj2pinst(kobj);
0907     pentry = attr2pentry(attr);
0908     if (pentry->show)
0909         ret = pentry->show(pinst, attr, buf);
0910 
0911     return ret;
0912 }
0913 
0914 static ssize_t padata_sysfs_store(struct kobject *kobj, struct attribute *attr,
0915                   const char *buf, size_t count)
0916 {
0917     struct padata_instance *pinst;
0918     struct padata_sysfs_entry *pentry;
0919     ssize_t ret = -EIO;
0920 
0921     pinst = kobj2pinst(kobj);
0922     pentry = attr2pentry(attr);
0923     if (pentry->show)
0924         ret = pentry->store(pinst, attr, buf, count);
0925 
0926     return ret;
0927 }
0928 
/* Route all sysfs reads/writes through the per-attribute entry hooks. */
static const struct sysfs_ops padata_sysfs_ops = {
	.show = padata_sysfs_show,
	.store = padata_sysfs_store,
};

/* kobject type for a padata instance; release frees the instance. */
static struct kobj_type padata_attr_type = {
	.sysfs_ops = &padata_sysfs_ops,
	.default_attrs = padata_default_attrs,
	.release = padata_sysfs_release,
};
0939 
/**
 * padata_alloc_possible - Allocate and initialize padata instance.
 *                         Use the cpu_possible_mask for serial and
 *                         parallel workers.
 *
 * @wq: workqueue to use for the allocated padata instance
 *
 * Returns the new instance or NULL on allocation failure.
 */
struct padata_instance *padata_alloc_possible(struct workqueue_struct *wq)
{
	return padata_alloc(wq, cpu_possible_mask, cpu_possible_mask);
}
EXPORT_SYMBOL(padata_alloc_possible);
0952 
/**
 * padata_alloc - allocate and initialize a padata instance and specify
 *                cpumasks for serial and parallel workers.
 *
 * @wq: workqueue to use for the allocated padata instance
 * @pcpumask: cpumask that will be used for padata parallelization
 * @cbcpumask: cpumask that will be used for padata serialization
 *
 * Returns the new instance or NULL on failure (allocation failure or a
 * cpumask with no online cpus).  Free with padata_free().
 */
struct padata_instance *padata_alloc(struct workqueue_struct *wq,
				     const struct cpumask *pcpumask,
				     const struct cpumask *cbcpumask)
{
	struct padata_instance *pinst;
	struct parallel_data *pd = NULL;

	pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
	if (!pinst)
		goto err;

	/* Hold off cpu hotplug while the masks are validated and applied. */
	get_online_cpus();
	if (!alloc_cpumask_var(&pinst->cpumask.pcpu, GFP_KERNEL))
		goto err_free_inst;
	if (!alloc_cpumask_var(&pinst->cpumask.cbcpu, GFP_KERNEL)) {
		free_cpumask_var(pinst->cpumask.pcpu);
		goto err_free_inst;
	}
	if (!padata_validate_cpumask(pinst, pcpumask) ||
	    !padata_validate_cpumask(pinst, cbcpumask))
		goto err_free_masks;

	pd = padata_alloc_pd(pinst, pcpumask, cbcpumask);
	if (!pd)
		goto err_free_masks;

	rcu_assign_pointer(pinst->pd, pd);

	pinst->wq = wq;

	cpumask_copy(pinst->cpumask.pcpu, pcpumask);
	cpumask_copy(pinst->cpumask.cbcpu, cbcpumask);

	pinst->flags = 0;	/* not started; padata_start() sets PADATA_INIT */

	put_online_cpus();

	BLOCKING_INIT_NOTIFIER_HEAD(&pinst->cpumask_change_notifier);
	kobject_init(&pinst->kobj, &padata_attr_type);
	mutex_init(&pinst->lock);

#ifdef CONFIG_HOTPLUG_CPU
	cpuhp_state_add_instance_nocalls(hp_online, &pinst->node);
#endif
	return pinst;

err_free_masks:
	free_cpumask_var(pinst->cpumask.pcpu);
	free_cpumask_var(pinst->cpumask.cbcpu);
err_free_inst:
	kfree(pinst);
	put_online_cpus();
err:
	return NULL;
}
1016 
/**
 * padata_free - free a padata instance
 *
 * @pinst: padata instance to free
 *
 * Drops the kobject reference; the actual teardown happens in
 * padata_sysfs_release() -> __padata_free() on the last put.
 */
void padata_free(struct padata_instance *pinst)
{
	kobject_put(&pinst->kobj);
}
EXPORT_SYMBOL(padata_free);
1027 
1028 #ifdef CONFIG_HOTPLUG_CPU
1029 
/* Register the cpu hotplug callbacks shared by all padata instances. */
static __init int padata_driver_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "padata:online",
				      padata_cpu_online,
				      padata_cpu_prep_down);
	if (ret < 0)
		return ret;
	hp_online = ret;	/* dynamically allocated hotplug state id */
	return 0;
}
module_init(padata_driver_init);
1043 
/* Unregister the shared cpu hotplug state on module unload. */
static __exit void padata_driver_exit(void)
{
	cpuhp_remove_multi_state(hp_online);
}
module_exit(padata_driver_exit);
1049 #endif