/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm-core.h"
#define SPLIT_COUNT	8
#define MIN_JOBS	8

#define DEFAULT_SUB_JOB_SIZE_KB	512
#define MAX_SUB_JOB_SIZE_KB	1024

static unsigned kcopyd_subjob_size_kb = DEFAULT_SUB_JOB_SIZE_KB;

module_param(kcopyd_subjob_size_kb, uint, S_IRUGO | S_IWUSR);
MODULE_PARM_DESC(kcopyd_subjob_size_kb, "Sub-job size for dm-kcopyd clients");

static unsigned dm_get_kcopyd_subjob_size(void)
{
	unsigned sub_job_size_kb;

	sub_job_size_kb = __dm_get_module_param(&kcopyd_subjob_size_kb,
						DEFAULT_SUB_JOB_SIZE_KB,
						MAX_SUB_JOB_SIZE_KB);

	/* Convert from KB to 512-byte sectors: 1 KB == 2 sectors. */
	return sub_job_size_kb << 1;
}
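
/*
 * Illustrative usage note (not part of the original file): kcopyd is
 * normally built into dm_mod, so the parameter above can typically be tuned
 * at runtime through sysfs; the exact path depends on the build.
 *
 *	# cap sub-jobs at 256 KB (512 sectors); path assumes kcopyd is in dm_mod
 *	echo 256 > /sys/module/dm_mod/parameters/kcopyd_subjob_size_kb
 *
 * __dm_get_module_param() sanitizes the value on each read: a zero setting
 * falls back to DEFAULT_SUB_JOB_SIZE_KB and anything above
 * MAX_SUB_JOB_SIZE_KB is capped, so out-of-range writes are tolerated.
 */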

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	struct page_list *pages;
	unsigned nr_reserved_pages;
	unsigned nr_free_pages;
	unsigned sub_job_size;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;

	mempool_t job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	struct dm_kcopyd_throttle *throttle;

	atomic_t nr_jobs;

	/*
	 * We maintain four lists of jobs:
	 *
	 * i)   jobs waiting for pages
	 * ii)  jobs that have pages, and are waiting for the io to be issued.
	 * iii) jobs that don't need to do any IO and just run a callback
	 * iv)  jobs that have completed.
	 *
	 * All four of these are protected by job_lock.
	 */
	spinlock_t job_lock;
	struct list_head callback_jobs;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT	SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC	100

/*
 * Maximum number of sleep events. There is a theoretical livelock if more
 * than this number of sleep events occurs.
 */
#define MAX_SLEEPS	10

static void io_job_start(struct dm_kcopyd_throttle *t)
{
	unsigned throttle, now, difference;
	int slept = 0, skew;

	if (unlikely(!t))
		return;

try_again:
	spin_lock_irq(&throttle_spinlock);

	throttle = READ_ONCE(t->throttle);

	if (likely(throttle >= 100))
		goto skip_limit;

	now = jiffies;
	difference = now - t->last_jiffies;
	t->last_jiffies = now;
	if (t->num_io_jobs)
		t->io_period += difference;
	t->total_period += difference;

	/*
	 * Maintain sane values if we got a temporary overflow.
	 */
	if (unlikely(t->io_period > t->total_period))
		t->io_period = t->total_period;

	if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
		int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
		t->total_period >>= shift;
		t->io_period >>= shift;
	}

	skew = t->io_period - throttle * t->total_period / 100;

	if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
		slept++;
		spin_unlock_irq(&throttle_spinlock);
		msleep(SLEEP_MSEC);
		goto try_again;
	}

skip_limit:
	t->num_io_jobs++;

	spin_unlock_irq(&throttle_spinlock);
}

static void io_job_finish(struct dm_kcopyd_throttle *t)
{
	unsigned long flags;

	if (unlikely(!t))
		return;

	spin_lock_irqsave(&throttle_spinlock, flags);

	t->num_io_jobs--;

	if (likely(READ_ONCE(t->throttle) >= 100))
		goto skip_limit;

	if (!t->num_io_jobs) {
		unsigned now, difference;

		now = jiffies;
		difference = now - t->last_jiffies;
		t->last_jiffies = now;

		t->io_period += difference;
		t->total_period += difference;

		/*
		 * Maintain sane values if we got a temporary overflow.
		 */
		if (unlikely(t->io_period > t->total_period))
			t->io_period = t->total_period;
	}

skip_limit:
	spin_unlock_irqrestore(&throttle_spinlock, flags);
}
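
/*
 * Illustrative sketch (not part of the original file): a target module would
 * normally declare a throttle with the helper macro from <linux/dm-kcopyd.h>
 * and hand it to dm_kcopyd_client_create().  The "mirror_copy_throttle"
 * parameter name below is hypothetical.
 *
 *	DECLARE_DM_KCOPYD_THROTTLE_WITH_MODULE_PARM(mirror_copy_throttle,
 *		"A percentage of time allocated for copying");
 *
 *	kc = dm_kcopyd_client_create(&dm_kcopyd_throttle);
 *
 * A throttle value of 100 (the macro's default) disables limiting, which is
 * why io_job_start()/io_job_finish() short-circuit when throttle >= 100.
 */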

static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), gfp);
	if (!pl)
		return NULL;

	pl->page = alloc_page(gfp | __GFP_HIGHMEM);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			free_pl(pl);
		else {
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	*pages = NULL;

	do {
		pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
		if (unlikely(!pl)) {
			/* Use reserved pages */
			pl = kc->pages;
			if (unlikely(!pl))
				goto out_of_memory;
			kc->pages = pl->next;
			kc->nr_free_pages--;
		}
		pl->next = *pages;
		*pages = pl;
	} while (--nr);

	return 0;

out_of_memory:
	if (*pages)
		kcopyd_put_pages(kc, *pages);
	return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
	unsigned i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr_pages; i++) {
		next = alloc_pl(GFP_KERNEL);
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kc->nr_reserved_pages += nr_pages;
	kcopyd_put_pages(kc, pl);

	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * deadlocking.
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	enum req_op op;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;
	sector_t write_offset;

	struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	/*
	 * Each allocation is one master job immediately followed by
	 * SPLIT_COUNT sub jobs, so size the cache for the whole array.
	 */
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	zero_page_list.next = &zero_page_list;
	zero_page_list.page = ZERO_PAGE(0);

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}

/*
 * Functions to push and pop a job onto the head of a given job
 * list.
 */
static struct kcopyd_job *pop_io_job(struct list_head *jobs,
				     struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job;

	/*
	 * For I/O jobs, pop any read, any write without sequential write
	 * constraint and sequential writes that are at the right position.
	 */
	list_for_each_entry(job, jobs, list) {
		if (job->op == REQ_OP_READ ||
		    !(job->flags & BIT(DM_KCOPYD_WRITE_SEQ))) {
			list_del(&job->list);
			return job;
		}

		if (job->write_offset == job->master_job->write_offset) {
			job->master_job->write_offset += job->source.count;
			list_del(&job->list);
			return job;
		}
	}

	return NULL;
}

static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;

	spin_lock_irq(&kc->job_lock);

	if (!list_empty(jobs)) {
		if (jobs == &kc->io_jobs)
			job = pop_io_job(jobs, kc);
		else {
			job = list_entry(jobs->next, struct kcopyd_job, list);
			list_del(&job->list);
		}
	}
	spin_unlock_irq(&kc->job_lock);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irq(&kc->job_lock);
	list_add(&job->list, jobs);
	spin_unlock_irq(&kc->job_lock);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages && job->pages != &zero_page_list)
		kcopyd_put_pages(kc, job->pages);

	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job) {
		mutex_destroy(&job->lock);
		mempool_free(job, &kc->job_pool);
	}
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	cond_resched();

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	io_job_finish(kc->throttle);

	if (error) {
		if (op_is_write(job->op))
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!(job->flags & BIT(DM_KCOPYD_IGNORE_ERROR))) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (op_is_write(job->op))
		push(&kc->complete_jobs, job);
	else {
		/* The read is done: convert the job to a write and reissue it. */
		job->op = REQ_OP_WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_opf = job->op,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	/*
	 * If we need to write sequentially and some reads or writes failed,
	 * no point in continuing.
	 */
	if (job->flags & BIT(DM_KCOPYD_WRITE_SEQ) &&
	    job->master_job->write_err) {
		job->write_err = job->master_job->write_err;
		return -EIO;
	}

	io_job_start(job->kc->throttle);

	if (job->op == REQ_OP_READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;
	unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

	r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now - try later */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (op_is_write(job->op))
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			wake(kc);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	spin_lock_irq(&kc->job_lock);
	list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
	spin_unlock_irq(&kc->job_lock);

	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;

	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->callback_jobs, job);
	else if (job->pages == &zero_page_list)
		push(&kc->io_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    job->flags & BIT(DM_KCOPYD_IGNORE_ERROR)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > kc->sub_job_size)
				count = kc->sub_job_size;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		*sub_job = *job;
		sub_job->write_offset = progress;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}

void dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		    unsigned int num_dests, struct dm_io_region *dests,
		    unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;
	int i;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(&kc->job_pool, GFP_NOIO);
	mutex_init(&job->lock);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	/*
	 * If one of the destinations is a host-managed zoned block device,
	 * we need to write sequentially. If one of the destinations is a
	 * host-aware device, we can get away with a regular unordered write,
	 * but we'd rather have ordered writes to reduce unnecessary zone
	 * reset operations.
	 */
	if (!(job->flags & BIT(DM_KCOPYD_WRITE_SEQ))) {
		for (i = 0; i < job->num_dests; i++) {
			if (bdev_zoned_model(dests[i].bdev) == BLK_ZONED_HM) {
				job->flags |= BIT(DM_KCOPYD_WRITE_SEQ);
				break;
			}
		}
	}

	/*
	 * If we need to write sequentially, errors cannot be ignored.
	 */
	if (job->flags & BIT(DM_KCOPYD_WRITE_SEQ) &&
	    job->flags & BIT(DM_KCOPYD_IGNORE_ERROR))
		job->flags &= ~BIT(DM_KCOPYD_IGNORE_ERROR);

	if (from) {
		job->source = *from;
		job->pages = NULL;
		job->op = REQ_OP_READ;
	} else {
		memset(&job->source, 0, sizeof job->source);
		job->source.count = job->dests[0].count;
		job->pages = &zero_page_list;

		/*
		 * Use WRITE ZEROES to optimize zeroing if all dests support it.
		 */
		job->op = REQ_OP_WRITE_ZEROES;
		for (i = 0; i < job->num_dests; i++)
			if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
				job->op = REQ_OP_WRITE;
				break;
			}
	}

	job->fn = fn;
	job->context = context;
	job->master_job = job;
	job->write_offset = 0;

	if (job->source.count <= kc->sub_job_size)
		dispatch_job(job);
	else {
		job->progress = 0;
		split_job(job);
	}
}
EXPORT_SYMBOL(dm_kcopyd_copy);
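
/*
 * Illustrative sketch (not part of the original file): a typical caller such
 * as a device-mapper target builds dm_io_region descriptors and lets kcopyd
 * drive the copy asynchronously.  All names below (copy_done(), mirror_ctx,
 * src_bdev, dst_bdev and the sector ranges) are hypothetical.
 *
 *	static void copy_done(int read_err, unsigned long write_err, void *ctx)
 *	{
 *		if (read_err || write_err)
 *			DMERR("region copy failed");
 *		// resume target-specific processing for 'ctx'
 *	}
 *
 *	struct dm_io_region from = {
 *		.bdev = src_bdev, .sector = 0, .count = 1024,
 *	};
 *	struct dm_io_region to = {
 *		.bdev = dst_bdev, .sector = 0, .count = 1024,
 *	};
 *
 *	dm_kcopyd_copy(kc, &from, 1, &to, 0, copy_done, mirror_ctx);
 *
 * Passing a NULL 'from' (or calling dm_kcopyd_zero() below) zeroes the
 * destinations instead of copying.
 */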

void dm_kcopyd_zero(struct dm_kcopyd_client *kc,
		    unsigned num_dests, struct dm_io_region *dests,
		    unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
	dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
				 dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	job = mempool_alloc(&kc->job_pool, GFP_NOIO);

	memset(job, 0, sizeof(struct kcopyd_job));
	job->kc = kc;
	job->fn = fn;
	job->context = context;
	job->master_job = job;

	atomic_inc(&kc->nr_jobs);

	return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
	struct kcopyd_job *job = j;
	struct dm_kcopyd_client *kc = job->kc;

	job->read_err = read_err;
	job->write_err = write_err;

	push(&kc->callback_jobs, job);
	wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);
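
/*
 * Illustrative sketch (not part of the original file): the prepare/do pair
 * lets a caller route its own completion through the kcopyd thread so it is
 * serialized with dm_kcopyd_copy() completions.  'snapshot_done' and 'pe'
 * are hypothetical names.
 *
 *	void *token = dm_kcopyd_prepare_callback(kc, snapshot_done, pe);
 *	...
 *	// later, possibly from interrupt context or another thread:
 *	dm_kcopyd_do_callback(token, 0, 0);
 *
 * The callback then runs from the kcopyd workqueue via run_complete_job().
 */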

/*
 * Cancels a kcopyd job, eg. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
	int r;
	unsigned reserve_pages;
	struct dm_kcopyd_client *kc;

	kc = kzalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->callback_jobs);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);
	kc->throttle = throttle;

	r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
	if (r)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq) {
		r = -ENOMEM;
		goto bad_workqueue;
	}

	kc->sub_job_size = dm_get_kcopyd_subjob_size();
	reserve_pages = DIV_ROUND_UP(kc->sub_job_size << SECTOR_SHIFT, PAGE_SIZE);

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	r = client_reserve_pages(kc, reserve_pages);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_exit(&kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->callback_jobs));
	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_exit(&kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);
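
/*
 * Illustrative client lifecycle (not part of the original file); error
 * handling trimmed and the 'tc' context name is hypothetical:
 *
 *	struct dm_kcopyd_client *kc = dm_kcopyd_client_create(NULL);
 *
 *	if (IS_ERR(kc))
 *		return PTR_ERR(kc);
 *	tc->kcopyd_client = kc;
 *	...
 *	dm_kcopyd_client_destroy(kc);	// blocks until nr_jobs drops to zero
 */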

void dm_kcopyd_client_flush(struct dm_kcopyd_client *kc)
{
	flush_workqueue(kc->kcopyd_wq);
}
EXPORT_SYMBOL(dm_kcopyd_client_flush);