Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Copyright (C) 2002 Sistina Software (UK) Limited.
0003  * Copyright (C) 2006 Red Hat GmbH
0004  *
0005  * This file is released under the GPL.
0006  *
0007  * Kcopyd provides a simple interface for copying an area of one
0008  * block-device to one or more other block-devices, with an asynchronous
0009  * completion notification.
0010  */
0011 
0012 #include <linux/types.h>
0013 #include <linux/atomic.h>
0014 #include <linux/blkdev.h>
0015 #include <linux/fs.h>
0016 #include <linux/init.h>
0017 #include <linux/list.h>
0018 #include <linux/mempool.h>
0019 #include <linux/module.h>
0020 #include <linux/pagemap.h>
0021 #include <linux/slab.h>
0022 #include <linux/vmalloc.h>
0023 #include <linux/workqueue.h>
0024 #include <linux/mutex.h>
0025 #include <linux/delay.h>
0026 #include <linux/device-mapper.h>
0027 #include <linux/dm-kcopyd.h>
0028 
0029 #include "dm-core.h"
0030 
0031 #define SPLIT_COUNT 8
0032 #define MIN_JOBS    8
0033 
0034 #define DEFAULT_SUB_JOB_SIZE_KB 512
0035 #define MAX_SUB_JOB_SIZE_KB     1024
0036 
/*
 * Sub-job size in KB, exposed as a module parameter (world-readable,
 * owner-writable).  Read via dm_get_kcopyd_subjob_size().
 */
static unsigned kcopyd_subjob_size_kb = DEFAULT_SUB_JOB_SIZE_KB;

module_param(kcopyd_subjob_size_kb, uint, S_IRUGO | S_IWUSR);
MODULE_PARM_DESC(kcopyd_subjob_size_kb, "Sub-job size for dm-kcopyd clients");
0041 
0042 static unsigned dm_get_kcopyd_subjob_size(void)
0043 {
0044     unsigned sub_job_size_kb;
0045 
0046     sub_job_size_kb = __dm_get_module_param(&kcopyd_subjob_size_kb,
0047                         DEFAULT_SUB_JOB_SIZE_KB,
0048                         MAX_SUB_JOB_SIZE_KB);
0049 
0050     return sub_job_size_kb << 1;
0051 }
0052 
/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
    /* Head of the singly linked list of free reserved pages. */
    struct page_list *pages;
    /* Upper bound on how many pages are kept on 'pages'. */
    unsigned nr_reserved_pages;
    /* Number of entries currently on 'pages'. */
    unsigned nr_free_pages;
    /* Largest source.count (sectors) handled as a single job. */
    unsigned sub_job_size;

    /* dm-io client used for every request issued by run_io_job(). */
    struct dm_io_client *io_client;

    /* dm_kcopyd_client_destroy() sleeps here until nr_jobs is zero. */
    wait_queue_head_t destroyq;

    /* Pool of job arrays backed by _job_cache. */
    mempool_t job_pool;

    /* kcopyd_work (handler: do_work) is queued on kcopyd_wq by wake(). */
    struct workqueue_struct *kcopyd_wq;
    struct work_struct kcopyd_work;

    /* Optional throttle; NULL is accepted by io_job_start/finish. */
    struct dm_kcopyd_throttle *throttle;

    /* Jobs dispatched/prepared but not yet completed. */
    atomic_t nr_jobs;

/*
 * We maintain four lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that don't need to do any IO and just run a callback
 * iv) jobs that have completed.
 *
 * All four of these are protected by job_lock.
 */
    spinlock_t job_lock;
    struct list_head callback_jobs;
    struct list_head complete_jobs;
    struct list_head io_jobs;
    struct list_head pages_jobs;
};
0092 
/*
 * Page list used for zeroing jobs.  dm_kcopyd_init() links it to itself
 * and points it at ZERO_PAGE(0); it is never returned to a page pool.
 */
static struct page_list zero_page_list;
0094 
0095 static DEFINE_SPINLOCK(throttle_spinlock);
0096 
0097 /*
0098  * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
0099  * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
0100  * by 2.
0101  */
0102 #define ACCOUNT_INTERVAL_SHIFT      SHIFT_HZ
0103 
0104 /*
0105  * Sleep this number of milliseconds.
0106  *
0107  * The value was decided experimentally.
0108  * Smaller values seem to cause an increased copy rate above the limit.
0109  * The reason for this is unknown but possibly due to jiffies rounding errors
0110  * or read/write cache inside the disk.
0111  */
0112 #define SLEEP_MSEC          100
0113 
/*
 * Maximum number of sleep events. There is a theoretical livelock if
 * several kcopyd clients do work simultaneously; this limit avoids it.
 */
0118 #define MAX_SLEEPS          10
0119 
/*
 * Account for the start of one I/O job against throttle @t.
 *
 * t->throttle is treated as a percentage: values >= 100 skip all
 * accounting.  Otherwise the elapsed jiffies are added to the period
 * counters and, while the I/O share exceeds the allowed percentage, the
 * caller sleeps SLEEP_MSEC and retries, at most MAX_SLEEPS times.
 * A NULL @t disables throttling.  May sleep (msleep).
 */
static void io_job_start(struct dm_kcopyd_throttle *t)
{
    unsigned throttle, now, difference;
    int slept = 0, skew;

    if (unlikely(!t))
        return;

try_again:
    spin_lock_irq(&throttle_spinlock);

    throttle = READ_ONCE(t->throttle);

    if (likely(throttle >= 100))
        goto skip_limit;

    /* Charge the time since the last update to the counters. */
    now = jiffies;
    difference = now - t->last_jiffies;
    t->last_jiffies = now;
    /* Only count it as I/O time if some job was in flight. */
    if (t->num_io_jobs)
        t->io_period += difference;
    t->total_period += difference;

    /*
     * Maintain sane values if we got a temporary overflow.
     */
    if (unlikely(t->io_period > t->total_period))
        t->io_period = t->total_period;

    /* Decay: scale both counters down once the interval is exceeded. */
    if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
        int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
        t->total_period >>= shift;
        t->io_period >>= shift;
    }

    /* Positive skew: I/O time is above throttle% of the total time. */
    skew = t->io_period - throttle * t->total_period / 100;

    if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
        slept++;
        /* Drop the lock before sleeping, then re-evaluate from scratch. */
        spin_unlock_irq(&throttle_spinlock);
        msleep(SLEEP_MSEC);
        goto try_again;
    }

skip_limit:
    t->num_io_jobs++;

    spin_unlock_irq(&throttle_spinlock);
}
0169 
0170 static void io_job_finish(struct dm_kcopyd_throttle *t)
0171 {
0172     unsigned long flags;
0173 
0174     if (unlikely(!t))
0175         return;
0176 
0177     spin_lock_irqsave(&throttle_spinlock, flags);
0178 
0179     t->num_io_jobs--;
0180 
0181     if (likely(READ_ONCE(t->throttle) >= 100))
0182         goto skip_limit;
0183 
0184     if (!t->num_io_jobs) {
0185         unsigned now, difference;
0186 
0187         now = jiffies;
0188         difference = now - t->last_jiffies;
0189         t->last_jiffies = now;
0190 
0191         t->io_period += difference;
0192         t->total_period += difference;
0193 
0194         /*
0195          * Maintain sane values if we got a temporary overflow.
0196          */
0197         if (unlikely(t->io_period > t->total_period))
0198             t->io_period = t->total_period;
0199     }
0200 
0201 skip_limit:
0202     spin_unlock_irqrestore(&throttle_spinlock, flags);
0203 }
0204 
0205 
/*
 * Kick the client's worker by queueing kc->kcopyd_work on kc->kcopyd_wq.
 */
static void wake(struct dm_kcopyd_client *kc)
{
    queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}
0210 
0211 /*
0212  * Obtain one page for the use of kcopyd.
0213  */
0214 static struct page_list *alloc_pl(gfp_t gfp)
0215 {
0216     struct page_list *pl;
0217 
0218     pl = kmalloc(sizeof(*pl), gfp);
0219     if (!pl)
0220         return NULL;
0221 
0222     pl->page = alloc_page(gfp | __GFP_HIGHMEM);
0223     if (!pl->page) {
0224         kfree(pl);
0225         return NULL;
0226     }
0227 
0228     return pl;
0229 }
0230 
/*
 * Release one page_list node and the page it holds.
 */
static void free_pl(struct page_list *pl)
{
    __free_page(pl->page);
    kfree(pl);
}
0236 
0237 /*
0238  * Add the provided pages to a client's free page list, releasing
0239  * back to the system any beyond the reserved_pages limit.
0240  */
0241 static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
0242 {
0243     struct page_list *next;
0244 
0245     do {
0246         next = pl->next;
0247 
0248         if (kc->nr_free_pages >= kc->nr_reserved_pages)
0249             free_pl(pl);
0250         else {
0251             pl->next = kc->pages;
0252             kc->pages = pl;
0253             kc->nr_free_pages++;
0254         }
0255 
0256         pl = next;
0257     } while (pl);
0258 }
0259 
0260 static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
0261                 unsigned int nr, struct page_list **pages)
0262 {
0263     struct page_list *pl;
0264 
0265     *pages = NULL;
0266 
0267     do {
0268         pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
0269         if (unlikely(!pl)) {
0270             /* Use reserved pages */
0271             pl = kc->pages;
0272             if (unlikely(!pl))
0273                 goto out_of_memory;
0274             kc->pages = pl->next;
0275             kc->nr_free_pages--;
0276         }
0277         pl->next = *pages;
0278         *pages = pl;
0279     } while (--nr);
0280 
0281     return 0;
0282 
0283 out_of_memory:
0284     if (*pages)
0285         kcopyd_put_pages(kc, *pages);
0286     return -ENOMEM;
0287 }
0288 
0289 /*
0290  * These three functions resize the page pool.
0291  */
0292 static void drop_pages(struct page_list *pl)
0293 {
0294     struct page_list *next;
0295 
0296     while (pl) {
0297         next = pl->next;
0298         free_pl(pl);
0299         pl = next;
0300     }
0301 }
0302 
0303 /*
0304  * Allocate and reserve nr_pages for the use of a specific client.
0305  */
0306 static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
0307 {
0308     unsigned i;
0309     struct page_list *pl = NULL, *next;
0310 
0311     for (i = 0; i < nr_pages; i++) {
0312         next = alloc_pl(GFP_KERNEL);
0313         if (!next) {
0314             if (pl)
0315                 drop_pages(pl);
0316             return -ENOMEM;
0317         }
0318         next->next = pl;
0319         pl = next;
0320     }
0321 
0322     kc->nr_reserved_pages += nr_pages;
0323     kcopyd_put_pages(kc, pl);
0324 
0325     return 0;
0326 }
0327 
/*
 * Release the client's whole reserved page pool and reset its counters.
 * BUGs if any reserved page is not currently on the free list.
 */
static void client_free_pages(struct dm_kcopyd_client *kc)
{
    BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
    drop_pages(kc->pages);
    kc->pages = NULL;
    kc->nr_free_pages = kc->nr_reserved_pages = 0;
}
0335 
/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
    /* Owning client. */
    struct dm_kcopyd_client *kc;
    /* Links the job into one of the client's job lists. */
    struct list_head list;
    /* Tested as BIT(DM_KCOPYD_*) bits, e.g. WRITE_SEQ, IGNORE_ERROR. */
    unsigned flags;

    /*
     * Error state of the job.
     */
    int read_err;
    unsigned long write_err;

    /*
     * REQ_OP_READ, REQ_OP_WRITE or REQ_OP_WRITE_ZEROES.
     */
    enum req_op op;
    struct dm_io_region source;

    /*
     * The destinations for the transfer.
     */
    unsigned int num_dests;
    struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

    /* NULL until pages are obtained, or &zero_page_list for zeroing. */
    struct page_list *pages;

    /*
     * Set this to ensure you are notified when the job has
     * completed.  'context' is for callback to use.
     */
    dm_kcopyd_notify_fn fn;
    void *context;

    /*
     * These fields are only used if the job has been split
     * into more manageable parts.
     */
    struct mutex lock;
    atomic_t sub_jobs;
    sector_t progress;
    sector_t write_offset;

    /* Points at the job itself for a master (or unsplit) job. */
    struct kcopyd_job *master_job;
};
0384 
/*
 * Slab cache for job arrays (one master job plus SPLIT_COUNT sub jobs);
 * created in dm_kcopyd_init(), destroyed in dm_kcopyd_exit().
 */
static struct kmem_cache *_job_cache;
0386 
0387 int __init dm_kcopyd_init(void)
0388 {
0389     _job_cache = kmem_cache_create("kcopyd_job",
0390                 sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
0391                 __alignof__(struct kcopyd_job), 0, NULL);
0392     if (!_job_cache)
0393         return -ENOMEM;
0394 
0395     zero_page_list.next = &zero_page_list;
0396     zero_page_list.page = ZERO_PAGE(0);
0397 
0398     return 0;
0399 }
0400 
/*
 * Module-level teardown: destroy the job slab cache and clear the
 * pointer to it.
 */
void dm_kcopyd_exit(void)
{
    kmem_cache_destroy(_job_cache);
    _job_cache = NULL;
}
0406 
0407 /*
0408  * Functions to push and pop a job onto the head of a given job
0409  * list.
0410  */
0411 static struct kcopyd_job *pop_io_job(struct list_head *jobs,
0412                      struct dm_kcopyd_client *kc)
0413 {
0414     struct kcopyd_job *job;
0415 
0416     /*
0417      * For I/O jobs, pop any read, any write without sequential write
0418      * constraint and sequential writes that are at the right position.
0419      */
0420     list_for_each_entry(job, jobs, list) {
0421         if (job->op == REQ_OP_READ ||
0422             !(job->flags & BIT(DM_KCOPYD_WRITE_SEQ))) {
0423             list_del(&job->list);
0424             return job;
0425         }
0426 
0427         if (job->write_offset == job->master_job->write_offset) {
0428             job->master_job->write_offset += job->source.count;
0429             list_del(&job->list);
0430             return job;
0431         }
0432     }
0433 
0434     return NULL;
0435 }
0436 
0437 static struct kcopyd_job *pop(struct list_head *jobs,
0438                   struct dm_kcopyd_client *kc)
0439 {
0440     struct kcopyd_job *job = NULL;
0441 
0442     spin_lock_irq(&kc->job_lock);
0443 
0444     if (!list_empty(jobs)) {
0445         if (jobs == &kc->io_jobs)
0446             job = pop_io_job(jobs, kc);
0447         else {
0448             job = list_entry(jobs->next, struct kcopyd_job, list);
0449             list_del(&job->list);
0450         }
0451     }
0452     spin_unlock_irq(&kc->job_lock);
0453 
0454     return job;
0455 }
0456 
/*
 * Append @job to the tail of @jobs under the owning client's job_lock.
 * Uses the irqsave lock variant; presumably because callers such as
 * complete_io() may run with interrupts disabled -- TODO confirm.
 */
static void push(struct list_head *jobs, struct kcopyd_job *job)
{
    unsigned long flags;
    struct dm_kcopyd_client *kc = job->kc;

    spin_lock_irqsave(&kc->job_lock, flags);
    list_add_tail(&job->list, jobs);
    spin_unlock_irqrestore(&kc->job_lock, flags);
}
0466 
0467 
/*
 * Insert @job at the head of @jobs under the owning client's job_lock.
 * Used by process_jobs() to put back a job that could not run yet.
 */
static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
    struct dm_kcopyd_client *kc = job->kc;

    spin_lock_irq(&kc->job_lock);
    list_add(&job->list, jobs);
    spin_unlock_irq(&kc->job_lock);
}
0476 
0477 /*
0478  * These three functions process 1 item from the corresponding
0479  * job list.
0480  *
0481  * They return:
0482  * < 0: error
0483  *   0: success
0484  * > 0: can't process yet.
0485  */
/*
 * Finish a completed job: return its pages, free it if it is a master
 * job, invoke the notification callback and drop the client's job
 * count.  Always returns 0.
 */
static int run_complete_job(struct kcopyd_job *job)
{
    /* Copy everything needed later: the job may be freed below. */
    void *context = job->context;
    int read_err = job->read_err;
    unsigned long write_err = job->write_err;
    dm_kcopyd_notify_fn fn = job->fn;
    struct dm_kcopyd_client *kc = job->kc;

    /* The shared zero page list is never returned to the pool. */
    if (job->pages && job->pages != &zero_page_list)
        kcopyd_put_pages(kc, job->pages);
    /*
     * If this is the master job, the sub jobs have already
     * completed so we can free everything.
     */
    if (job->master_job == job) {
        mutex_destroy(&job->lock);
        mempool_free(job, &kc->job_pool);
    }
    fn(read_err, write_err, context);

    /* Last outstanding job: let dm_kcopyd_client_destroy() proceed. */
    if (atomic_dec_and_test(&kc->nr_jobs))
        wake_up(&kc->destroyq);

    cond_resched();

    return 0;
}
0513 
0514 static void complete_io(unsigned long error, void *context)
0515 {
0516     struct kcopyd_job *job = (struct kcopyd_job *) context;
0517     struct dm_kcopyd_client *kc = job->kc;
0518 
0519     io_job_finish(kc->throttle);
0520 
0521     if (error) {
0522         if (op_is_write(job->op))
0523             job->write_err |= error;
0524         else
0525             job->read_err = 1;
0526 
0527         if (!(job->flags & BIT(DM_KCOPYD_IGNORE_ERROR))) {
0528             push(&kc->complete_jobs, job);
0529             wake(kc);
0530             return;
0531         }
0532     }
0533 
0534     if (op_is_write(job->op))
0535         push(&kc->complete_jobs, job);
0536 
0537     else {
0538         job->op = REQ_OP_WRITE;
0539         push(&kc->io_jobs, job);
0540     }
0541 
0542     wake(kc);
0543 }
0544 
0545 /*
0546  * Request io on as many buffer heads as we can currently get for
0547  * a particular job.
0548  */
0549 static int run_io_job(struct kcopyd_job *job)
0550 {
0551     int r;
0552     struct dm_io_request io_req = {
0553         .bi_opf = job->op,
0554         .mem.type = DM_IO_PAGE_LIST,
0555         .mem.ptr.pl = job->pages,
0556         .mem.offset = 0,
0557         .notify.fn = complete_io,
0558         .notify.context = job,
0559         .client = job->kc->io_client,
0560     };
0561 
0562     /*
0563      * If we need to write sequentially and some reads or writes failed,
0564      * no point in continuing.
0565      */
0566     if (job->flags & BIT(DM_KCOPYD_WRITE_SEQ) &&
0567         job->master_job->write_err) {
0568         job->write_err = job->master_job->write_err;
0569         return -EIO;
0570     }
0571 
0572     io_job_start(job->kc->throttle);
0573 
0574     if (job->op == REQ_OP_READ)
0575         r = dm_io(&io_req, 1, &job->source, NULL);
0576     else
0577         r = dm_io(&io_req, job->num_dests, job->dests, NULL);
0578 
0579     return r;
0580 }
0581 
0582 static int run_pages_job(struct kcopyd_job *job)
0583 {
0584     int r;
0585     unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);
0586 
0587     r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
0588     if (!r) {
0589         /* this job is ready for io */
0590         push(&job->kc->io_jobs, job);
0591         return 0;
0592     }
0593 
0594     if (r == -ENOMEM)
0595         /* can't complete now */
0596         return 1;
0597 
0598     return r;
0599 }
0600 
0601 /*
0602  * Run through a list for as long as possible.  Returns the count
0603  * of successful jobs.
0604  */
0605 static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
0606             int (*fn) (struct kcopyd_job *))
0607 {
0608     struct kcopyd_job *job;
0609     int r, count = 0;
0610 
0611     while ((job = pop(jobs, kc))) {
0612 
0613         r = fn(job);
0614 
0615         if (r < 0) {
0616             /* error this rogue job */
0617             if (op_is_write(job->op))
0618                 job->write_err = (unsigned long) -1L;
0619             else
0620                 job->read_err = 1;
0621             push(&kc->complete_jobs, job);
0622             wake(kc);
0623             break;
0624         }
0625 
0626         if (r > 0) {
0627             /*
0628              * We couldn't service this job ATM, so
0629              * push this job back onto the list.
0630              */
0631             push_head(jobs, job);
0632             break;
0633         }
0634 
0635         count++;
0636     }
0637 
0638     return count;
0639 }
0640 
/*
 * kcopyd does this every time it's woken up.
 *
 * Work handler for kc->kcopyd_work: drains the callback list into the
 * complete list, then processes the complete, pages and io lists in
 * that order inside one block plug.
 */
static void do_work(struct work_struct *work)
{
    struct dm_kcopyd_client *kc = container_of(work,
                    struct dm_kcopyd_client, kcopyd_work);
    struct blk_plug plug;

    /*
     * The order that these are called is *very* important.
     * complete jobs can free some pages for pages jobs.
     * Pages jobs when successful will jump onto the io jobs
     * list.  io jobs call wake when they complete and it all
     * starts again.
     */
    /* Callback-only jobs are finished via the normal complete path. */
    spin_lock_irq(&kc->job_lock);
    list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
    spin_unlock_irq(&kc->job_lock);

    blk_start_plug(&plug);
    process_jobs(&kc->complete_jobs, kc, run_complete_job);
    process_jobs(&kc->pages_jobs, kc, run_pages_job);
    process_jobs(&kc->io_jobs, kc, run_io_job);
    blk_finish_plug(&plug);
}
0667 
0668 /*
0669  * If we are copying a small region we just dispatch a single job
0670  * to do the copy, otherwise the io has to be split up into many
0671  * jobs.
0672  */
0673 static void dispatch_job(struct kcopyd_job *job)
0674 {
0675     struct dm_kcopyd_client *kc = job->kc;
0676     atomic_inc(&kc->nr_jobs);
0677     if (unlikely(!job->source.count))
0678         push(&kc->callback_jobs, job);
0679     else if (job->pages == &zero_page_list)
0680         push(&kc->io_jobs, job);
0681     else
0682         push(&kc->pages_jobs, job);
0683     wake(kc);
0684 }
0685 
/*
 * Completion callback for one sub job of a split job; also used by
 * split_job() to start each sub job slot.
 *
 * @context is the sub job.  Errors are merged into the master job.  If
 * work remains (and no error occurred, or errors are ignored) the sub
 * job slot is re-initialised from the master for the next chunk of at
 * most kc->sub_job_size sectors and dispatched again.  Otherwise the
 * slot retires; when the last slot retires the master job is queued on
 * the complete list.
 */
static void segment_complete(int read_err, unsigned long write_err,
                 void *context)
{
    /* FIXME: tidy this function */
    sector_t progress = 0;
    sector_t count = 0;
    struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
    struct kcopyd_job *job = sub_job->master_job;
    struct dm_kcopyd_client *kc = job->kc;

    /* job->lock serialises error merging and progress hand-out. */
    mutex_lock(&job->lock);

    /* update the error */
    if (read_err)
        job->read_err = 1;

    if (write_err)
        job->write_err |= write_err;

    /*
     * Only dispatch more work if there hasn't been an error.
     */
    if ((!job->read_err && !job->write_err) ||
        job->flags & BIT(DM_KCOPYD_IGNORE_ERROR)) {
        /* get the next chunk of work */
        progress = job->progress;
        count = job->source.count - progress;
        if (count) {
            if (count > kc->sub_job_size)
                count = kc->sub_job_size;

            job->progress += count;
        }
    }
    mutex_unlock(&job->lock);

    if (count) {
        int i;

        /* Start from a copy of the master, then narrow to this chunk. */
        *sub_job = *job;
        sub_job->write_offset = progress;
        sub_job->source.sector += progress;
        sub_job->source.count = count;

        for (i = 0; i < job->num_dests; i++) {
            sub_job->dests[i].sector += progress;
            sub_job->dests[i].count = count;
        }

        /* The chunk reports back here with itself as context. */
        sub_job->fn = segment_complete;
        sub_job->context = sub_job;
        dispatch_job(sub_job);

    } else if (atomic_dec_and_test(&job->sub_jobs)) {

        /*
         * Queue the completion callback to the kcopyd thread.
         *
         * Some callers assume that all the completions are called
         * from a single thread and don't race with each other.
         *
         * We must not call the callback directly here because this
         * code may not be executing in the thread.
         */
        push(&kc->complete_jobs, job);
        wake(kc);
    }
}
0754 
0755 /*
0756  * Create some sub jobs to share the work between them.
0757  */
0758 static void split_job(struct kcopyd_job *master_job)
0759 {
0760     int i;
0761 
0762     atomic_inc(&master_job->kc->nr_jobs);
0763 
0764     atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
0765     for (i = 0; i < SPLIT_COUNT; i++) {
0766         master_job[i + 1].master_job = master_job;
0767         segment_complete(0, 0u, &master_job[i + 1]);
0768     }
0769 }
0770 
0771 void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
0772             unsigned int num_dests, struct dm_io_region *dests,
0773             unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
0774 {
0775     struct kcopyd_job *job;
0776     int i;
0777 
0778     /*
0779      * Allocate an array of jobs consisting of one master job
0780      * followed by SPLIT_COUNT sub jobs.
0781      */
0782     job = mempool_alloc(&kc->job_pool, GFP_NOIO);
0783     mutex_init(&job->lock);
0784 
0785     /*
0786      * set up for the read.
0787      */
0788     job->kc = kc;
0789     job->flags = flags;
0790     job->read_err = 0;
0791     job->write_err = 0;
0792 
0793     job->num_dests = num_dests;
0794     memcpy(&job->dests, dests, sizeof(*dests) * num_dests);
0795 
0796     /*
0797      * If one of the destination is a host-managed zoned block device,
0798      * we need to write sequentially. If one of the destination is a
0799      * host-aware device, then leave it to the caller to choose what to do.
0800      */
0801     if (!(job->flags & BIT(DM_KCOPYD_WRITE_SEQ))) {
0802         for (i = 0; i < job->num_dests; i++) {
0803             if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
0804                 job->flags |= BIT(DM_KCOPYD_WRITE_SEQ);
0805                 break;
0806             }
0807         }
0808     }
0809 
0810     /*
0811      * If we need to write sequentially, errors cannot be ignored.
0812      */
0813     if (job->flags & BIT(DM_KCOPYD_WRITE_SEQ) &&
0814         job->flags & BIT(DM_KCOPYD_IGNORE_ERROR))
0815         job->flags &= ~BIT(DM_KCOPYD_IGNORE_ERROR);
0816 
0817     if (from) {
0818         job->source = *from;
0819         job->pages = NULL;
0820         job->op = REQ_OP_READ;
0821     } else {
0822         memset(&job->source, 0, sizeof job->source);
0823         job->source.count = job->dests[0].count;
0824         job->pages = &zero_page_list;
0825 
0826         /*
0827          * Use WRITE ZEROES to optimize zeroing if all dests support it.
0828          */
0829         job->op = REQ_OP_WRITE_ZEROES;
0830         for (i = 0; i < job->num_dests; i++)
0831             if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
0832                 job->op = REQ_OP_WRITE;
0833                 break;
0834             }
0835     }
0836 
0837     job->fn = fn;
0838     job->context = context;
0839     job->master_job = job;
0840     job->write_offset = 0;
0841 
0842     if (job->source.count <= kc->sub_job_size)
0843         dispatch_job(job);
0844     else {
0845         job->progress = 0;
0846         split_job(job);
0847     }
0848 }
0849 EXPORT_SYMBOL(dm_kcopyd_copy);
0850 
/*
 * Zero the given destination regions: a dm_kcopyd_copy() call with no
 * source region.  @fn is called with @context on completion.
 */
void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
            unsigned num_dests, struct dm_io_region *dests,
            unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
    dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);
0858 
0859 void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
0860                  dm_kcopyd_notify_fn fn, void *context)
0861 {
0862     struct kcopyd_job *job;
0863 
0864     job = mempool_alloc(&kc->job_pool, GFP_NOIO);
0865 
0866     memset(job, 0, sizeof(struct kcopyd_job));
0867     job->kc = kc;
0868     job->fn = fn;
0869     job->context = context;
0870     job->master_job = job;
0871 
0872     atomic_inc(&kc->nr_jobs);
0873 
0874     return job;
0875 }
0876 EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
0877 
/*
 * Queue a job obtained from dm_kcopyd_prepare_callback() so that its
 * callback is run by the client's worker with the given error values.
 *
 * @j is the handle returned by dm_kcopyd_prepare_callback().
 */
void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
    struct kcopyd_job *job = j;
    struct dm_kcopyd_client *kc = job->kc;

    job->read_err = read_err;
    job->write_err = write_err;

    push(&kc->callback_jobs, job);
    wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);
0890 
/*
 * Cancels a kcopyd job, eg. someone might be deactivating a
 * mirror.
 *
 * NOTE: unfinished stub, compiled out by the surrounding #if 0.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
    /* FIXME: finish */
    return -1;
}
#endif  /*  0  */
0902 
/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
/*
 * Create a kcopyd client.
 *
 * @throttle is stored as-is and may be NULL (io_job_start/finish then
 * do nothing).  Sets up the job lists, job mempool, workqueue, reserved
 * page pool and dm-io client.  Returns the new client, or an ERR_PTR()
 * with everything acquired so far released again.
 */
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
    int r;
    unsigned reserve_pages;
    struct dm_kcopyd_client *kc;

    kc = kzalloc(sizeof(*kc), GFP_KERNEL);
    if (!kc)
        return ERR_PTR(-ENOMEM);

    spin_lock_init(&kc->job_lock);
    INIT_LIST_HEAD(&kc->callback_jobs);
    INIT_LIST_HEAD(&kc->complete_jobs);
    INIT_LIST_HEAD(&kc->io_jobs);
    INIT_LIST_HEAD(&kc->pages_jobs);
    kc->throttle = throttle;

    r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
    if (r)
        goto bad_slab;

    INIT_WORK(&kc->kcopyd_work, do_work);
    kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
    if (!kc->kcopyd_wq) {
        r = -ENOMEM;
        goto bad_workqueue;
    }

    /* Reserve enough pages to back one full sub job. */
    kc->sub_job_size = dm_get_kcopyd_subjob_size();
    reserve_pages = DIV_ROUND_UP(kc->sub_job_size << SECTOR_SHIFT, PAGE_SIZE);

    kc->pages = NULL;
    kc->nr_reserved_pages = kc->nr_free_pages = 0;
    r = client_reserve_pages(kc, reserve_pages);
    if (r)
        goto bad_client_pages;

    kc->io_client = dm_io_client_create();
    if (IS_ERR(kc->io_client)) {
        r = PTR_ERR(kc->io_client);
        goto bad_io_client;
    }

    init_waitqueue_head(&kc->destroyq);
    atomic_set(&kc->nr_jobs, 0);

    return kc;

    /* Unwind in reverse order of acquisition. */
bad_io_client:
    client_free_pages(kc);
bad_client_pages:
    destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
    mempool_exit(&kc->job_pool);
bad_slab:
    kfree(kc);

    return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);
0966 
/*
 * Destroy a kcopyd client.
 *
 * Blocks until the client's outstanding job count drops to zero, BUGs
 * if any job list is still non-empty, then releases the workqueue,
 * dm-io client, reserved pages, job mempool and the client itself.
 */
void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
    /* Wait for completion of all jobs submitted by this client. */
    wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

    BUG_ON(!list_empty(&kc->callback_jobs));
    BUG_ON(!list_empty(&kc->complete_jobs));
    BUG_ON(!list_empty(&kc->io_jobs));
    BUG_ON(!list_empty(&kc->pages_jobs));
    destroy_workqueue(kc->kcopyd_wq);
    dm_io_client_destroy(kc->io_client);
    client_free_pages(kc);
    mempool_exit(&kc->job_pool);
    kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
0983 
/*
 * Flush the client's workqueue (flush_workqueue() on kc->kcopyd_wq).
 */
void dm_kcopyd_client_flush(struct dm_kcopyd_client *kc)
{
    flush_workqueue(kc->kcopyd_wq);
}
EXPORT_SYMBOL(dm_kcopyd_client_flush);