Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-or-later
0002 /*
0003    drbd_worker.c
0004 
0005    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
0006 
0007    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
0008    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
0009    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
0010 
0011 
0012 */
0013 
0014 #include <linux/module.h>
0015 #include <linux/drbd.h>
0016 #include <linux/sched/signal.h>
0017 #include <linux/wait.h>
0018 #include <linux/mm.h>
0019 #include <linux/memcontrol.h>
0020 #include <linux/mm_inline.h>
0021 #include <linux/slab.h>
0022 #include <linux/random.h>
0023 #include <linux/string.h>
0024 #include <linux/scatterlist.h>
0025 #include <linux/part_stat.h>
0026 
0027 #include "drbd_int.h"
0028 #include "drbd_protocol.h"
0029 #include "drbd_req.h"
0030 
0031 static int make_ov_request(struct drbd_device *, int);
0032 static int make_resync_request(struct drbd_device *, int);
0033 
0034 /* endio handlers:
0035  *   drbd_md_endio (defined here)
0036  *   drbd_request_endio (defined here)
0037  *   drbd_peer_request_endio (defined here)
0038  *   drbd_bm_endio (defined in drbd_bitmap.c)
0039  *
0040  * For all these callbacks, note the following:
0041  * The callbacks will be called in irq context by the IDE drivers,
0042  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
0043  * Try to get the locking right :)
0044  *
0045  */
0046 
0047 /* used for synchronous meta data and bitmap IO
0048  * submitted by drbd_md_sync_page_io()
0049  */
0050 void drbd_md_endio(struct bio *bio)
0051 {
0052     struct drbd_device *device;
0053 
0054     device = bio->bi_private;
0055     device->md_io.error = blk_status_to_errno(bio->bi_status);
0056 
0057     /* special case: drbd_md_read() during drbd_adm_attach() */
0058     if (device->ldev)
0059         put_ldev(device);
0060     bio_put(bio);
0061 
0062     /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
0063      * to timeout on the lower level device, and eventually detach from it.
0064      * If this io completion runs after that timeout expired, this
0065      * drbd_md_put_buffer() may allow us to finally try and re-attach.
0066      * During normal operation, this only puts that extra reference
0067      * down to 1 again.
0068      * Make sure we first drop the reference, and only then signal
0069      * completion, or we may (in drbd_al_read_log()) cycle so fast into the
0070      * next drbd_md_sync_page_io(), that we trigger the
0071      * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
0072      */
0073     drbd_md_put_buffer(device);
0074     device->md_io.done = 1;
0075     wake_up(&device->misc_wait);
0076 }
0077 
0078 /* reads on behalf of the partner,
0079  * "submitted" by the receiver
0080  */
0081 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
0082 {
0083     unsigned long flags = 0;
0084     struct drbd_peer_device *peer_device = peer_req->peer_device;
0085     struct drbd_device *device = peer_device->device;
0086 
0087     spin_lock_irqsave(&device->resource->req_lock, flags);
0088     device->read_cnt += peer_req->i.size >> 9;
0089     list_del(&peer_req->w.list);
0090     if (list_empty(&device->read_ee))
0091         wake_up(&device->ee_wait);
0092     if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
0093         __drbd_chk_io_error(device, DRBD_READ_ERROR);
0094     spin_unlock_irqrestore(&device->resource->req_lock, flags);
0095 
0096     drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
0097     put_ldev(device);
0098 }
0099 
0100 /* writes on behalf of the partner, or resync writes,
0101  * "submitted" by the receiver, final stage.  */
/*
 * Sequence of this function, in order:
 *  1. Copy the interval, block_id and the EE_CALL_AL_COMPLETE_IO bit out of
 *     peer_req, because peer_req must not be touched any more once the
 *     req_lock has been released after moving it to done_ee.
 *  2. On EE_WAS_ERROR: make sure __EE_SEND_WRITE_ACK is set (taking an
 *     "unacked" count if it was not set before) and mark the range
 *     out of sync.
 *  3. Under req_lock: account written sectors, move peer_req to done_ee,
 *     sample whether sync_ee (for ID_SYNCER) or active_ee is empty, run the
 *     IO error check, and queue send_acks_work on the ack_sender workqueue
 *     when cstate >= C_WF_REPORT_PARAMS.
 *  4. After unlocking, using only the copies taken in step 1: complete the
 *     resync IO for ID_SYNCER, wake ee_wait, complete the activity log IO,
 *     and drop the local-device reference.
 */
0102 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
0103 {
0104     unsigned long flags = 0;
0105     struct drbd_peer_device *peer_device = peer_req->peer_device;
0106     struct drbd_device *device = peer_device->device;
0107     struct drbd_connection *connection = peer_device->connection;
0108     struct drbd_interval i;
0109     int do_wake;
0110     u64 block_id;
0111     int do_al_complete_io;
0112 
0113     /* after we moved peer_req to done_ee,
0114      * we may no longer access it,
0115      * it may be freed/reused already!
0116      * (as soon as we release the req_lock) */
0117     i = peer_req->i;
0118     do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
0119     block_id = peer_req->block_id;
0120     peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
0121 
0122     if (peer_req->flags & EE_WAS_ERROR) {
0123         /* In protocol != C, we usually do not send write acks.
0124          * In case of a write error, send the neg ack anyways. */
0125         if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
0126             inc_unacked(device);
0127         drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
0128     }
0129 
0130     spin_lock_irqsave(&device->resource->req_lock, flags);
    /* i.size is in bytes; writ_cnt is counted in 512 byte sectors */
0131     device->writ_cnt += peer_req->i.size >> 9;
0132     list_move_tail(&peer_req->w.list, &device->done_ee);
0133 
0134     /*
0135      * Do not remove from the write_requests tree here: we did not send the
0136      * Ack yet and did not wake possibly waiting conflicting requests.
0137      * Removed from the tree from "drbd_process_done_ee" within the
0138      * appropriate dw.cb (e_end_block/e_end_resync_block) or from
0139      * _drbd_clear_done_ee.
0140      */
0141 
    /* evaluated after the list_move_tail() above, i.e. with peer_req
     * no longer on the list it was on before */
0142     do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
0143 
0144     /* FIXME do we want to detach for failed REQ_OP_DISCARD?
0145      * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
0146     if (peer_req->flags & EE_WAS_ERROR)
0147         __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
0148 
0149     if (connection->cstate >= C_WF_REPORT_PARAMS) {
0150         kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
        /* queue_work() returning false means no new work item was
         * queued, so the reference taken just above is dropped again */
0151         if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
0152             kref_put(&device->kref, drbd_destroy_device);
0153     }
0154     spin_unlock_irqrestore(&device->resource->req_lock, flags);
0155 
    /* from here on only the local copies (i, block_id, do_*) are used */
0156     if (block_id == ID_SYNCER)
0157         drbd_rs_complete_io(device, i.sector);
0158 
0159     if (do_wake)
0160         wake_up(&device->ee_wait);
0161 
0162     if (do_al_complete_io)
0163         drbd_al_complete_io(device, &i);
0164 
0165     put_ldev(device);
0166 }
0167 
0168 /* writes on behalf of the partner, or resync writes,
0169  * "submitted" by the receiver.
0170  */
0171 void drbd_peer_request_endio(struct bio *bio)
0172 {
0173     struct drbd_peer_request *peer_req = bio->bi_private;
0174     struct drbd_device *device = peer_req->peer_device->device;
0175     bool is_write = bio_data_dir(bio) == WRITE;
0176     bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
0177               bio_op(bio) == REQ_OP_DISCARD;
0178 
0179     if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
0180         drbd_warn(device, "%s: error=%d s=%llus\n",
0181                 is_write ? (is_discard ? "discard" : "write")
0182                     : "read", bio->bi_status,
0183                 (unsigned long long)peer_req->i.sector);
0184 
0185     if (bio->bi_status)
0186         set_bit(__EE_WAS_ERROR, &peer_req->flags);
0187 
0188     bio_put(bio); /* no need for the bio anymore */
0189     if (atomic_dec_and_test(&peer_req->pending_bios)) {
0190         if (is_write)
0191             drbd_endio_write_sec_final(peer_req);
0192         else
0193             drbd_endio_read_sec_final(peer_req);
0194     }
0195 }
0196 
0197 static void
0198 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
0199 {
0200     panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
0201         device->minor, device->resource->name, device->vnr);
0202 }
0203 
0204 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
0205  */
0206 void drbd_request_endio(struct bio *bio)
0207 {
0208     unsigned long flags;
0209     struct drbd_request *req = bio->bi_private;
0210     struct drbd_device *device = req->device;
0211     struct bio_and_error m;
0212     enum drbd_req_event what;
0213 
0214     /* If this request was aborted locally before,
0215      * but now was completed "successfully",
0216      * chances are that this caused arbitrary data corruption.
0217      *
0218      * "aborting" requests, or force-detaching the disk, is intended for
0219      * completely blocked/hung local backing devices which do no longer
0220      * complete requests at all, not even do error completions.  In this
0221      * situation, usually a hard-reset and failover is the only way out.
0222      *
0223      * By "aborting", basically faking a local error-completion,
0224      * we allow for a more graceful swichover by cleanly migrating services.
0225      * Still the affected node has to be rebooted "soon".
0226      *
0227      * By completing these requests, we allow the upper layers to re-use
0228      * the associated data pages.
0229      *
0230      * If later the local backing device "recovers", and now DMAs some data
0231      * from disk into the original request pages, in the best case it will
0232      * just put random data into unused pages; but typically it will corrupt
0233      * meanwhile completely unrelated data, causing all sorts of damage.
0234      *
0235      * Which means delayed successful completion,
0236      * especially for READ requests,
0237      * is a reason to panic().
0238      *
0239      * We assume that a delayed *error* completion is OK,
0240      * though we still will complain noisily about it.
0241      */
0242     if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
0243         if (__ratelimit(&drbd_ratelimit_state))
0244             drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
0245 
0246         if (!bio->bi_status)
0247             drbd_panic_after_delayed_completion_of_aborted_request(device);
0248     }
0249 
0250     /* to avoid recursion in __req_mod */
0251     if (unlikely(bio->bi_status)) {
0252         switch (bio_op(bio)) {
0253         case REQ_OP_WRITE_ZEROES:
0254         case REQ_OP_DISCARD:
0255             if (bio->bi_status == BLK_STS_NOTSUPP)
0256                 what = DISCARD_COMPLETED_NOTSUPP;
0257             else
0258                 what = DISCARD_COMPLETED_WITH_ERROR;
0259             break;
0260         case REQ_OP_READ:
0261             if (bio->bi_opf & REQ_RAHEAD)
0262                 what = READ_AHEAD_COMPLETED_WITH_ERROR;
0263             else
0264                 what = READ_COMPLETED_WITH_ERROR;
0265             break;
0266         default:
0267             what = WRITE_COMPLETED_WITH_ERROR;
0268             break;
0269         }
0270     } else {
0271         what = COMPLETED_OK;
0272     }
0273 
0274     req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
0275     bio_put(bio);
0276 
0277     /* not req_mod(), we need irqsave here! */
0278     spin_lock_irqsave(&device->resource->req_lock, flags);
0279     __req_mod(req, what, &m);
0280     spin_unlock_irqrestore(&device->resource->req_lock, flags);
0281     put_ldev(device);
0282 
0283     if (m.bio)
0284         complete_master_bio(device, &m);
0285 }
0286 
0287 void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
0288 {
0289     SHASH_DESC_ON_STACK(desc, tfm);
0290     struct page *page = peer_req->pages;
0291     struct page *tmp;
0292     unsigned len;
0293     void *src;
0294 
0295     desc->tfm = tfm;
0296 
0297     crypto_shash_init(desc);
0298 
0299     src = kmap_atomic(page);
0300     while ((tmp = page_chain_next(page))) {
0301         /* all but the last page will be fully used */
0302         crypto_shash_update(desc, src, PAGE_SIZE);
0303         kunmap_atomic(src);
0304         page = tmp;
0305         src = kmap_atomic(page);
0306     }
0307     /* and now the last, possibly only partially used page */
0308     len = peer_req->i.size & (PAGE_SIZE - 1);
0309     crypto_shash_update(desc, src, len ?: PAGE_SIZE);
0310     kunmap_atomic(src);
0311 
0312     crypto_shash_final(desc, digest);
0313     shash_desc_zero(desc);
0314 }
0315 
0316 void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
0317 {
0318     SHASH_DESC_ON_STACK(desc, tfm);
0319     struct bio_vec bvec;
0320     struct bvec_iter iter;
0321 
0322     desc->tfm = tfm;
0323 
0324     crypto_shash_init(desc);
0325 
0326     bio_for_each_segment(bvec, bio, iter) {
0327         u8 *src;
0328 
0329         src = bvec_kmap_local(&bvec);
0330         crypto_shash_update(desc, src, bvec.bv_len);
0331         kunmap_local(src);
0332     }
0333     crypto_shash_final(desc, digest);
0334     shash_desc_zero(desc);
0335 }
0336 
0337 /* MAYBE merge common code with w_e_end_ov_req */
0338 static int w_e_send_csum(struct drbd_work *w, int cancel)
0339 {
0340     struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
0341     struct drbd_peer_device *peer_device = peer_req->peer_device;
0342     struct drbd_device *device = peer_device->device;
0343     int digest_size;
0344     void *digest;
0345     int err = 0;
0346 
0347     if (unlikely(cancel))
0348         goto out;
0349 
0350     if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
0351         goto out;
0352 
0353     digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
0354     digest = kmalloc(digest_size, GFP_NOIO);
0355     if (digest) {
0356         sector_t sector = peer_req->i.sector;
0357         unsigned int size = peer_req->i.size;
0358         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
0359         /* Free peer_req and pages before send.
0360          * In case we block on congestion, we could otherwise run into
0361          * some distributed deadlock, if the other side blocks on
0362          * congestion as well, because our receiver blocks in
0363          * drbd_alloc_pages due to pp_in_use > max_buffers. */
0364         drbd_free_peer_req(device, peer_req);
0365         peer_req = NULL;
0366         inc_rs_pending(device);
0367         err = drbd_send_drequest_csum(peer_device, sector, size,
0368                           digest, digest_size,
0369                           P_CSUM_RS_REQUEST);
0370         kfree(digest);
0371     } else {
0372         drbd_err(device, "kmalloc() of digest failed.\n");
0373         err = -ENOMEM;
0374     }
0375 
0376 out:
0377     if (peer_req)
0378         drbd_free_peer_req(device, peer_req);
0379 
0380     if (unlikely(err))
0381         drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
0382     return err;
0383 }
0384 
0385 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
0386 
0387 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
0388 {
0389     struct drbd_device *device = peer_device->device;
0390     struct drbd_peer_request *peer_req;
0391 
0392     if (!get_ldev(device))
0393         return -EIO;
0394 
0395     /* GFP_TRY, because if there is no memory available right now, this may
0396      * be rescheduled for later. It is "only" background resync, after all. */
0397     peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
0398                        size, size, GFP_TRY);
0399     if (!peer_req)
0400         goto defer;
0401 
0402     peer_req->w.cb = w_e_send_csum;
0403     spin_lock_irq(&device->resource->req_lock);
0404     list_add_tail(&peer_req->w.list, &device->read_ee);
0405     spin_unlock_irq(&device->resource->req_lock);
0406 
0407     atomic_add(size >> 9, &device->rs_sect_ev);
0408     if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ,
0409                      DRBD_FAULT_RS_RD) == 0)
0410         return 0;
0411 
0412     /* If it failed because of ENOMEM, retry should help.  If it failed
0413      * because bio_add_page failed (probably broken lower level driver),
0414      * retry may or may not help.
0415      * If it does not, you may need to force disconnect. */
0416     spin_lock_irq(&device->resource->req_lock);
0417     list_del(&peer_req->w.list);
0418     spin_unlock_irq(&device->resource->req_lock);
0419 
0420     drbd_free_peer_req(device, peer_req);
0421 defer:
0422     put_ldev(device);
0423     return -EAGAIN;
0424 }
0425 
0426 int w_resync_timer(struct drbd_work *w, int cancel)
0427 {
0428     struct drbd_device *device =
0429         container_of(w, struct drbd_device, resync_work);
0430 
0431     switch (device->state.conn) {
0432     case C_VERIFY_S:
0433         make_ov_request(device, cancel);
0434         break;
0435     case C_SYNC_TARGET:
0436         make_resync_request(device, cancel);
0437         break;
0438     }
0439 
0440     return 0;
0441 }
0442 
0443 void resync_timer_fn(struct timer_list *t)
0444 {
0445     struct drbd_device *device = from_timer(device, t, resync_timer);
0446 
0447     drbd_queue_work_if_unqueued(
0448         &first_peer_device(device)->connection->sender_work,
0449         &device->resync_work);
0450 }
0451 
0452 static void fifo_set(struct fifo_buffer *fb, int value)
0453 {
0454     int i;
0455 
0456     for (i = 0; i < fb->size; i++)
0457         fb->values[i] = value;
0458 }
0459 
0460 static int fifo_push(struct fifo_buffer *fb, int value)
0461 {
0462     int ov;
0463 
0464     ov = fb->values[fb->head_index];
0465     fb->values[fb->head_index++] = value;
0466 
0467     if (fb->head_index >= fb->size)
0468         fb->head_index = 0;
0469 
0470     return ov;
0471 }
0472 
0473 static void fifo_add_val(struct fifo_buffer *fb, int value)
0474 {
0475     int i;
0476 
0477     for (i = 0; i < fb->size; i++)
0478         fb->values[i] += value;
0479 }
0480 
0481 struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
0482 {
0483     struct fifo_buffer *fb;
0484 
0485     fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
0486     if (!fb)
0487         return NULL;
0488 
0489     fb->head_index = 0;
0490     fb->size = fifo_size;
0491     fb->total = 0;
0492 
0493     return fb;
0494 }
0495 
0496 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
0497 {
0498     struct disk_conf *dc;
0499     unsigned int want;     /* The number of sectors we want in-flight */
0500     int req_sect; /* Number of sectors to request in this turn */
0501     int correction; /* Number of sectors more we need in-flight */
0502     int cps; /* correction per invocation of drbd_rs_controller() */
0503     int steps; /* Number of time steps to plan ahead */
0504     int curr_corr;
0505     int max_sect;
0506     struct fifo_buffer *plan;
0507 
0508     dc = rcu_dereference(device->ldev->disk_conf);
0509     plan = rcu_dereference(device->rs_plan_s);
0510 
0511     steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
0512 
0513     if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
0514         want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
0515     } else { /* normal path */
0516         want = dc->c_fill_target ? dc->c_fill_target :
0517             sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
0518     }
0519 
0520     correction = want - device->rs_in_flight - plan->total;
0521 
0522     /* Plan ahead */
0523     cps = correction / steps;
0524     fifo_add_val(plan, cps);
0525     plan->total += cps * steps;
0526 
0527     /* What we do in this step */
0528     curr_corr = fifo_push(plan, 0);
0529     plan->total -= curr_corr;
0530 
0531     req_sect = sect_in + curr_corr;
0532     if (req_sect < 0)
0533         req_sect = 0;
0534 
0535     max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
0536     if (req_sect > max_sect)
0537         req_sect = max_sect;
0538 
0539     /*
0540     drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
0541          sect_in, device->rs_in_flight, want, correction,
0542          steps, cps, device->rs_planed, curr_corr, req_sect);
0543     */
0544 
0545     return req_sect;
0546 }
0547 
0548 static int drbd_rs_number_requests(struct drbd_device *device)
0549 {
0550     unsigned int sect_in;  /* Number of sectors that came in since the last turn */
0551     int number, mxb;
0552 
0553     sect_in = atomic_xchg(&device->rs_sect_in, 0);
0554     device->rs_in_flight -= sect_in;
0555 
0556     rcu_read_lock();
0557     mxb = drbd_get_max_buffers(device) / 2;
0558     if (rcu_dereference(device->rs_plan_s)->size) {
0559         number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
0560         device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
0561     } else {
0562         device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
0563         number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
0564     }
0565     rcu_read_unlock();
0566 
0567     /* Don't have more than "max-buffers"/2 in-flight.
0568      * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
0569      * potentially causing a distributed deadlock on congestion during
0570      * online-verify or (checksum-based) resync, if max-buffers,
0571      * socket buffer sizes and resync rate settings are mis-configured. */
0572 
0573     /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
0574      * mxb (as used here, and in drbd_alloc_pages on the peer) is
0575      * "number of pages" (typically also 4k),
0576      * but "rs_in_flight" is in "sectors" (512 Byte). */
0577     if (mxb - device->rs_in_flight/8 < number)
0578         number = mxb - device->rs_in_flight/8;
0579 
0580     return number;
0581 }
0582 
/*
 * make_resync_request() - generate the resync requests for one timer turn
 * @device: DRBD device to resync
 * @cancel: non-zero if the work is cancelled; nothing is done then
 *
 * Called from w_resync_timer() while the connection state is
 * C_SYNC_TARGET.
 *
 * Asks drbd_rs_number_requests() how many BM_BLOCK_SIZE units may be
 * requested, then walks the bitmap starting at device->bm_resync_fo.  For
 * each bit found it optionally merges adjacent set bits into one larger
 * aligned request (bounded by max_bio_size, the discard granularity and
 * bitmap extent boundaries) and either reads the block locally for a
 * checksum based request (device->use_csums) or sends a data request to
 * the peer.  Unless it returns early, it accounts the requested amount in
 * rs_in_flight and re-arms the resync timer.
 *
 * Return: 0 on most paths, including "nothing to do" and "requeued";
 * -EIO when read_for_csum() reports -EIO; otherwise the error returned by
 * drbd_send_drequest().
 *
 * NOTE(review): connection is initialised to NULL when first_peer_device()
 * returns NULL, yet it is dereferenced without a check further down
 * (agreed_features, data.mutex) -- confirm that a NULL peer device cannot
 * happen on this path.
 */
0583 static int make_resync_request(struct drbd_device *const device, int cancel)
0584 {
0585     struct drbd_peer_device *const peer_device = first_peer_device(device);
0586     struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
0587     unsigned long bit;
0588     sector_t sector;
0589     const sector_t capacity = get_capacity(device->vdisk);
0590     int max_bio_size;
0591     int number, rollback_i, size;
0592     int align, requeue = 0;
0593     int i = 0;
0594     int discard_granularity = 0;
0595 
0596     if (unlikely(cancel))
0597         return 0;
0598 
0599     if (device->rs_total == 0) {
0600         /* empty resync? */
0601         drbd_resync_finished(device);
0602         return 0;
0603     }
0604 
0605     if (!get_ldev(device)) {
0606         /* Since we only need to access device->rsync a
0607            get_ldev_if_state(device,D_FAILED) would be sufficient, but
0608            to continue resync with a broken disk makes no sense at
0609            all */
0610         drbd_err(device, "Disk broke down during resync!\n");
0611         return 0;
0612     }
0613 
    /* discard_granularity stays 0 unless the peer agreed to thin resync */
0614     if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
0615         rcu_read_lock();
0616         discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
0617         rcu_read_unlock();
0618     }
0619 
    /* queue limit is in 512 byte sectors, max_bio_size is in bytes */
0620     max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
0621     number = drbd_rs_number_requests(device);
0622     if (number <= 0)
0623         goto requeue;
0624 
0625     for (i = 0; i < number; i++) {
0626         /* Stop generating RS requests when half of the send buffer is filled,
0627          * but notify TCP that we'd like to have more space. */
0628         mutex_lock(&connection->data.mutex);
0629         if (connection->data.socket) {
0630             struct sock *sk = connection->data.socket->sk;
0631             int queued = sk->sk_wmem_queued;
0632             int sndbuf = sk->sk_sndbuf;
0633             if (queued > sndbuf / 2) {
0634                 requeue = 1;
0635                 if (sk->sk_socket)
0636                     set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
0637             }
0638         } else
0639             requeue = 1;
0640         mutex_unlock(&connection->data.mutex);
0641         if (requeue)
0642             goto requeue;
0643 
        /* re-entered via goto when the bit found turned out to be clear;
         * that retry does not count against "number" */
0644 next_sector:
0645         size = BM_BLOCK_SIZE;
0646         bit  = drbd_bm_find_next(device, device->bm_resync_fo);
0647 
0648         if (bit == DRBD_END_OF_BITMAP) {
0649             device->bm_resync_fo = drbd_bm_bits(device);
0650             put_ldev(device);
0651             return 0;
0652         }
0653 
0654         sector = BM_BIT_TO_SECT(bit);
0655 
0656         if (drbd_try_rs_begin_io(device, sector)) {
0657             device->bm_resync_fo = bit;
0658             goto requeue;
0659         }
0660         device->bm_resync_fo = bit + 1;
0661 
0662         if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
0663             drbd_rs_complete_io(device, sector);
0664             goto next_sector;
0665         }
0666 
0667 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
0668         /* try to find some adjacent bits.
0669          * we stop if we have already the maximum req size.
0670          *
0671          * Additionally always align bigger requests, in order to
0672          * be prepared for all stripe sizes of software RAIDs.
0673          */
0674         align = 1;
        /* remember i so the merge can be undone if read_for_csum()
         * asks for a retry (-EAGAIN) below */
0675         rollback_i = i;
0676         while (i < number) {
0677             if (size + BM_BLOCK_SIZE > max_bio_size)
0678                 break;
0679 
0680             /* Be always aligned */
0681             if (sector & ((1<<(align+3))-1))
0682                 break;
0683 
0684             if (discard_granularity && size == discard_granularity)
0685                 break;
0686 
0687             /* do not cross extent boundaries */
0688             if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
0689                 break;
0690             /* now, is it actually dirty, after all?
0691              * caution, drbd_bm_test_bit is tri-state for some
0692              * obscure reason; ( b == 0 ) would get the out-of-band
0693              * only accidentally right because of the "oddly sized"
0694              * adjustment below */
0695             if (drbd_bm_test_bit(device, bit+1) != 1)
0696                 break;
0697             bit++;
0698             size += BM_BLOCK_SIZE;
0699             if ((BM_BLOCK_SIZE << align) <= size)
0700                 align++;
0701             i++;
0702         }
0703         /* if we merged some,
0704          * reset the offset to start the next drbd_bm_find_next from */
0705         if (size > BM_BLOCK_SIZE)
0706             device->bm_resync_fo = bit + 1;
0707 #endif
0708 
0709         /* adjust very last sectors, in case we are oddly sized */
0710         if (sector + (size>>9) > capacity)
0711             size = (capacity-sector)<<9;
0712 
0713         if (device->use_csums) {
0714             switch (read_for_csum(peer_device, sector, size)) {
0715             case -EIO: /* Disk failure */
0716                 put_ldev(device);
0717                 return -EIO;
0718             case -EAGAIN: /* allocation failed, or ldev busy */
0719                 drbd_rs_complete_io(device, sector);
0720                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
0721                 i = rollback_i;
0722                 goto requeue;
0723             case 0:
0724                 /* everything ok */
0725                 break;
0726             default:
0727                 BUG();
0728             }
0729         } else {
0730             int err;
0731 
0732             inc_rs_pending(device);
0733             err = drbd_send_drequest(peer_device,
0734                          size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
0735                          sector, size, ID_SYNCER);
0736             if (err) {
0737                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
0738                 dec_rs_pending(device);
0739                 put_ldev(device);
0740                 return err;
0741             }
0742         }
0743     }
0744 
0745     if (device->bm_resync_fo >= drbd_bm_bits(device)) {
0746         /* last syncer _request_ was sent,
0747          * but the P_RS_DATA_REPLY not yet received.  sync will end (and
0748          * next sync group will resume), as soon as we receive the last
0749          * resync data block, and the last bit is cleared.
0750          * until then resync "work" is "inactive" ...
0751          */
0752         put_ldev(device);
0753         return 0;
0754     }
0755 
0756  requeue:
    /* i is in units of BM_BLOCK_SIZE, rs_in_flight is in 512 byte sectors */
0757     device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
0758     mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
0759     put_ldev(device);
0760     return 0;
0761 }
0762 
0763 static int make_ov_request(struct drbd_device *device, int cancel)
0764 {
0765     int number, i, size;
0766     sector_t sector;
0767     const sector_t capacity = get_capacity(device->vdisk);
0768     bool stop_sector_reached = false;
0769 
0770     if (unlikely(cancel))
0771         return 1;
0772 
0773     number = drbd_rs_number_requests(device);
0774 
0775     sector = device->ov_position;
0776     for (i = 0; i < number; i++) {
0777         if (sector >= capacity)
0778             return 1;
0779 
0780         /* We check for "finished" only in the reply path:
0781          * w_e_end_ov_reply().
0782          * We need to send at least one request out. */
0783         stop_sector_reached = i > 0
0784             && verify_can_do_stop_sector(device)
0785             && sector >= device->ov_stop_sector;
0786         if (stop_sector_reached)
0787             break;
0788 
0789         size = BM_BLOCK_SIZE;
0790 
0791         if (drbd_try_rs_begin_io(device, sector)) {
0792             device->ov_position = sector;
0793             goto requeue;
0794         }
0795 
0796         if (sector + (size>>9) > capacity)
0797             size = (capacity-sector)<<9;
0798 
0799         inc_rs_pending(device);
0800         if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
0801             dec_rs_pending(device);
0802             return 0;
0803         }
0804         sector += BM_SECT_PER_BIT;
0805     }
0806     device->ov_position = sector;
0807 
0808  requeue:
0809     device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
0810     if (i == 0 || !stop_sector_reached)
0811         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
0812     return 1;
0813 }
0814 
0815 int w_ov_finished(struct drbd_work *w, int cancel)
0816 {
0817     struct drbd_device_work *dw =
0818         container_of(w, struct drbd_device_work, w);
0819     struct drbd_device *device = dw->device;
0820     kfree(dw);
0821     ov_out_of_sync_print(device);
0822     drbd_resync_finished(device);
0823 
0824     return 0;
0825 }
0826 
0827 static int w_resync_finished(struct drbd_work *w, int cancel)
0828 {
0829     struct drbd_device_work *dw =
0830         container_of(w, struct drbd_device_work, w);
0831     struct drbd_device *device = dw->device;
0832     kfree(dw);
0833 
0834     drbd_resync_finished(device);
0835 
0836     return 0;
0837 }
0838 
0839 static void ping_peer(struct drbd_device *device)
0840 {
0841     struct drbd_connection *connection = first_peer_device(device)->connection;
0842 
0843     clear_bit(GOT_PING_ACK, &connection->flags);
0844     request_ping(connection);
0845     wait_event(connection->ping_wait,
0846            test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
0847 }
0848 
0849 int drbd_resync_finished(struct drbd_device *device)
0850 {
0851     struct drbd_connection *connection = first_peer_device(device)->connection;
0852     unsigned long db, dt, dbdt;
0853     unsigned long n_oos;
0854     union drbd_state os, ns;
0855     struct drbd_device_work *dw;
0856     char *khelper_cmd = NULL;
0857     int verify_done = 0;
0858 
0859     /* Remove all elements from the resync LRU. Since future actions
0860      * might set bits in the (main) bitmap, then the entries in the
0861      * resync LRU would be wrong. */
0862     if (drbd_rs_del_all(device)) {
0863         /* In case this is not possible now, most probably because
0864          * there are P_RS_DATA_REPLY Packets lingering on the worker's
0865          * queue (or even the read operations for those packets
0866          * is not finished by now).   Retry in 100ms. */
0867 
0868         schedule_timeout_interruptible(HZ / 10);
0869         dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
0870         if (dw) {
0871             dw->w.cb = w_resync_finished;
0872             dw->device = device;
0873             drbd_queue_work(&connection->sender_work, &dw->w);
0874             return 1;
0875         }
0876         drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
0877     }
0878 
0879     dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
0880     if (dt <= 0)
0881         dt = 1;
0882 
0883     db = device->rs_total;
0884     /* adjust for verify start and stop sectors, respective reached position */
0885     if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
0886         db -= device->ov_left;
0887 
0888     dbdt = Bit2KB(db/dt);
0889     device->rs_paused /= HZ;
0890 
0891     if (!get_ldev(device))
0892         goto out;
0893 
0894     ping_peer(device);
0895 
0896     spin_lock_irq(&device->resource->req_lock);
0897     os = drbd_read_state(device);
0898 
0899     verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
0900 
0901     /* This protects us against multiple calls (that can happen in the presence
0902        of application IO), and against connectivity loss just before we arrive here. */
0903     if (os.conn <= C_CONNECTED)
0904         goto out_unlock;
0905 
0906     ns = os;
0907     ns.conn = C_CONNECTED;
0908 
0909     drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
0910          verify_done ? "Online verify" : "Resync",
0911          dt + device->rs_paused, device->rs_paused, dbdt);
0912 
0913     n_oos = drbd_bm_total_weight(device);
0914 
0915     if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
0916         if (n_oos) {
0917             drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
0918                   n_oos, Bit2KB(1));
0919             khelper_cmd = "out-of-sync";
0920         }
0921     } else {
0922         D_ASSERT(device, (n_oos - device->rs_failed) == 0);
0923 
0924         if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
0925             khelper_cmd = "after-resync-target";
0926 
0927         if (device->use_csums && device->rs_total) {
0928             const unsigned long s = device->rs_same_csum;
0929             const unsigned long t = device->rs_total;
0930             const int ratio =
0931                 (t == 0)     ? 0 :
0932             (t < 100000) ? ((s*100)/t) : (s/(t/100));
0933             drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
0934                  "transferred %luK total %luK\n",
0935                  ratio,
0936                  Bit2KB(device->rs_same_csum),
0937                  Bit2KB(device->rs_total - device->rs_same_csum),
0938                  Bit2KB(device->rs_total));
0939         }
0940     }
0941 
0942     if (device->rs_failed) {
0943         drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
0944 
0945         if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
0946             ns.disk = D_INCONSISTENT;
0947             ns.pdsk = D_UP_TO_DATE;
0948         } else {
0949             ns.disk = D_UP_TO_DATE;
0950             ns.pdsk = D_INCONSISTENT;
0951         }
0952     } else {
0953         ns.disk = D_UP_TO_DATE;
0954         ns.pdsk = D_UP_TO_DATE;
0955 
0956         if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
0957             if (device->p_uuid) {
0958                 int i;
0959                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
0960                     _drbd_uuid_set(device, i, device->p_uuid[i]);
0961                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
0962                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
0963             } else {
0964                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
0965             }
0966         }
0967 
0968         if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
0969             /* for verify runs, we don't update uuids here,
0970              * so there would be nothing to report. */
0971             drbd_uuid_set_bm(device, 0UL);
0972             drbd_print_uuids(device, "updated UUIDs");
0973             if (device->p_uuid) {
0974                 /* Now the two UUID sets are equal, update what we
0975                  * know of the peer. */
0976                 int i;
0977                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
0978                     device->p_uuid[i] = device->ldev->md.uuid[i];
0979             }
0980         }
0981     }
0982 
0983     _drbd_set_state(device, ns, CS_VERBOSE, NULL);
0984 out_unlock:
0985     spin_unlock_irq(&device->resource->req_lock);
0986 
0987     /* If we have been sync source, and have an effective fencing-policy,
0988      * once *all* volumes are back in sync, call "unfence". */
0989     if (os.conn == C_SYNC_SOURCE) {
0990         enum drbd_disk_state disk_state = D_MASK;
0991         enum drbd_disk_state pdsk_state = D_MASK;
0992         enum drbd_fencing_p fp = FP_DONT_CARE;
0993 
0994         rcu_read_lock();
0995         fp = rcu_dereference(device->ldev->disk_conf)->fencing;
0996         if (fp != FP_DONT_CARE) {
0997             struct drbd_peer_device *peer_device;
0998             int vnr;
0999             idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1000                 struct drbd_device *device = peer_device->device;
1001                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1002                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1003             }
1004         }
1005         rcu_read_unlock();
1006         if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1007             conn_khelper(connection, "unfence-peer");
1008     }
1009 
1010     put_ldev(device);
1011 out:
1012     device->rs_total  = 0;
1013     device->rs_failed = 0;
1014     device->rs_paused = 0;
1015 
1016     /* reset start sector, if we reached end of device */
1017     if (verify_done && device->ov_left == 0)
1018         device->ov_start_sector = 0;
1019 
1020     drbd_md_sync(device);
1021 
1022     if (khelper_cmd)
1023         drbd_khelper(device, khelper_cmd);
1024 
1025     return 1;
1026 }
1027 
1028 /* helper */
1029 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1030 {
1031     if (drbd_peer_req_has_active_page(peer_req)) {
1032         /* This might happen if sendpage() has not finished */
1033         int i = PFN_UP(peer_req->i.size);
1034         atomic_add(i, &device->pp_in_use_by_net);
1035         atomic_sub(i, &device->pp_in_use);
1036         spin_lock_irq(&device->resource->req_lock);
1037         list_add_tail(&peer_req->w.list, &device->net_ee);
1038         spin_unlock_irq(&device->resource->req_lock);
1039         wake_up(&drbd_pp_wait);
1040     } else
1041         drbd_free_peer_req(device, peer_req);
1042 }
1043 
1044 /**
1045  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1046  * @w:      work object.
1047  * @cancel: The connection will be closed anyways
1048  */
1049 int w_e_end_data_req(struct drbd_work *w, int cancel)
1050 {
1051     struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1052     struct drbd_peer_device *peer_device = peer_req->peer_device;
1053     struct drbd_device *device = peer_device->device;
1054     int err;
1055 
1056     if (unlikely(cancel)) {
1057         drbd_free_peer_req(device, peer_req);
1058         dec_unacked(device);
1059         return 0;
1060     }
1061 
1062     if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1063         err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1064     } else {
1065         if (__ratelimit(&drbd_ratelimit_state))
1066             drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1067                 (unsigned long long)peer_req->i.sector);
1068 
1069         err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1070     }
1071 
1072     dec_unacked(device);
1073 
1074     move_to_net_ee_or_free(device, peer_req);
1075 
1076     if (unlikely(err))
1077         drbd_err(device, "drbd_send_block() failed\n");
1078     return err;
1079 }
1080 
1081 static bool all_zero(struct drbd_peer_request *peer_req)
1082 {
1083     struct page *page = peer_req->pages;
1084     unsigned int len = peer_req->i.size;
1085 
1086     page_chain_for_each(page) {
1087         unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1088         unsigned int i, words = l / sizeof(long);
1089         unsigned long *d;
1090 
1091         d = kmap_atomic(page);
1092         for (i = 0; i < words; i++) {
1093             if (d[i]) {
1094                 kunmap_atomic(d);
1095                 return false;
1096             }
1097         }
1098         kunmap_atomic(d);
1099         len -= l;
1100     }
1101 
1102     return true;
1103 }
1104 
1105 /**
1106  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1107  * @w:      work object.
1108  * @cancel: The connection will be closed anyways
1109  */
1110 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1111 {
1112     struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1113     struct drbd_peer_device *peer_device = peer_req->peer_device;
1114     struct drbd_device *device = peer_device->device;
1115     int err;
1116 
1117     if (unlikely(cancel)) {
1118         drbd_free_peer_req(device, peer_req);
1119         dec_unacked(device);
1120         return 0;
1121     }
1122 
1123     if (get_ldev_if_state(device, D_FAILED)) {
1124         drbd_rs_complete_io(device, peer_req->i.sector);
1125         put_ldev(device);
1126     }
1127 
1128     if (device->state.conn == C_AHEAD) {
1129         err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1130     } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1131         if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1132             inc_rs_pending(device);
1133             if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1134                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1135             else
1136                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1137         } else {
1138             if (__ratelimit(&drbd_ratelimit_state))
1139                 drbd_err(device, "Not sending RSDataReply, "
1140                     "partner DISKLESS!\n");
1141             err = 0;
1142         }
1143     } else {
1144         if (__ratelimit(&drbd_ratelimit_state))
1145             drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1146                 (unsigned long long)peer_req->i.sector);
1147 
1148         err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1149 
1150         /* update resync data with failure */
1151         drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1152     }
1153 
1154     dec_unacked(device);
1155 
1156     move_to_net_ee_or_free(device, peer_req);
1157 
1158     if (unlikely(err))
1159         drbd_err(device, "drbd_send_block() failed\n");
1160     return err;
1161 }
1162 
1163 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1164 {
1165     struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1166     struct drbd_peer_device *peer_device = peer_req->peer_device;
1167     struct drbd_device *device = peer_device->device;
1168     struct digest_info *di;
1169     int digest_size;
1170     void *digest = NULL;
1171     int err, eq = 0;
1172 
1173     if (unlikely(cancel)) {
1174         drbd_free_peer_req(device, peer_req);
1175         dec_unacked(device);
1176         return 0;
1177     }
1178 
1179     if (get_ldev(device)) {
1180         drbd_rs_complete_io(device, peer_req->i.sector);
1181         put_ldev(device);
1182     }
1183 
1184     di = peer_req->digest;
1185 
1186     if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1187         /* quick hack to try to avoid a race against reconfiguration.
1188          * a real fix would be much more involved,
1189          * introducing more locking mechanisms */
1190         if (peer_device->connection->csums_tfm) {
1191             digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1192             D_ASSERT(device, digest_size == di->digest_size);
1193             digest = kmalloc(digest_size, GFP_NOIO);
1194         }
1195         if (digest) {
1196             drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1197             eq = !memcmp(digest, di->digest, digest_size);
1198             kfree(digest);
1199         }
1200 
1201         if (eq) {
1202             drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1203             /* rs_same_csums unit is BM_BLOCK_SIZE */
1204             device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1205             err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1206         } else {
1207             inc_rs_pending(device);
1208             peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1209             peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1210             kfree(di);
1211             err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1212         }
1213     } else {
1214         err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1215         if (__ratelimit(&drbd_ratelimit_state))
1216             drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1217     }
1218 
1219     dec_unacked(device);
1220     move_to_net_ee_or_free(device, peer_req);
1221 
1222     if (unlikely(err))
1223         drbd_err(device, "drbd_send_block/ack() failed\n");
1224     return err;
1225 }
1226 
1227 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1228 {
1229     struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1230     struct drbd_peer_device *peer_device = peer_req->peer_device;
1231     struct drbd_device *device = peer_device->device;
1232     sector_t sector = peer_req->i.sector;
1233     unsigned int size = peer_req->i.size;
1234     int digest_size;
1235     void *digest;
1236     int err = 0;
1237 
1238     if (unlikely(cancel))
1239         goto out;
1240 
1241     digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1242     digest = kmalloc(digest_size, GFP_NOIO);
1243     if (!digest) {
1244         err = 1;    /* terminate the connection in case the allocation failed */
1245         goto out;
1246     }
1247 
1248     if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1249         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1250     else
1251         memset(digest, 0, digest_size);
1252 
1253     /* Free e and pages before send.
1254      * In case we block on congestion, we could otherwise run into
1255      * some distributed deadlock, if the other side blocks on
1256      * congestion as well, because our receiver blocks in
1257      * drbd_alloc_pages due to pp_in_use > max_buffers. */
1258     drbd_free_peer_req(device, peer_req);
1259     peer_req = NULL;
1260     inc_rs_pending(device);
1261     err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1262     if (err)
1263         dec_rs_pending(device);
1264     kfree(digest);
1265 
1266 out:
1267     if (peer_req)
1268         drbd_free_peer_req(device, peer_req);
1269     dec_unacked(device);
1270     return err;
1271 }
1272 
1273 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1274 {
1275     if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1276         device->ov_last_oos_size += size>>9;
1277     } else {
1278         device->ov_last_oos_start = sector;
1279         device->ov_last_oos_size = size>>9;
1280     }
1281     drbd_set_out_of_sync(device, sector, size);
1282 }
1283 
1284 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1285 {
1286     struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1287     struct drbd_peer_device *peer_device = peer_req->peer_device;
1288     struct drbd_device *device = peer_device->device;
1289     struct digest_info *di;
1290     void *digest;
1291     sector_t sector = peer_req->i.sector;
1292     unsigned int size = peer_req->i.size;
1293     int digest_size;
1294     int err, eq = 0;
1295     bool stop_sector_reached = false;
1296 
1297     if (unlikely(cancel)) {
1298         drbd_free_peer_req(device, peer_req);
1299         dec_unacked(device);
1300         return 0;
1301     }
1302 
1303     /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1304      * the resync lru has been cleaned up already */
1305     if (get_ldev(device)) {
1306         drbd_rs_complete_io(device, peer_req->i.sector);
1307         put_ldev(device);
1308     }
1309 
1310     di = peer_req->digest;
1311 
1312     if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1313         digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1314         digest = kmalloc(digest_size, GFP_NOIO);
1315         if (digest) {
1316             drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1317 
1318             D_ASSERT(device, digest_size == di->digest_size);
1319             eq = !memcmp(digest, di->digest, digest_size);
1320             kfree(digest);
1321         }
1322     }
1323 
1324     /* Free peer_req and pages before send.
1325      * In case we block on congestion, we could otherwise run into
1326      * some distributed deadlock, if the other side blocks on
1327      * congestion as well, because our receiver blocks in
1328      * drbd_alloc_pages due to pp_in_use > max_buffers. */
1329     drbd_free_peer_req(device, peer_req);
1330     if (!eq)
1331         drbd_ov_out_of_sync_found(device, sector, size);
1332     else
1333         ov_out_of_sync_print(device);
1334 
1335     err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1336                    eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1337 
1338     dec_unacked(device);
1339 
1340     --device->ov_left;
1341 
1342     /* let's advance progress step marks only for every other megabyte */
1343     if ((device->ov_left & 0x200) == 0x200)
1344         drbd_advance_rs_marks(device, device->ov_left);
1345 
1346     stop_sector_reached = verify_can_do_stop_sector(device) &&
1347         (sector + (size>>9)) >= device->ov_stop_sector;
1348 
1349     if (device->ov_left == 0 || stop_sector_reached) {
1350         ov_out_of_sync_print(device);
1351         drbd_resync_finished(device);
1352     }
1353 
1354     return err;
1355 }
1356 
1357 /* FIXME
1358  * We need to track the number of pending barrier acks,
1359  * and to be able to wait for them.
1360  * See also comment in drbd_adm_attach before drbd_suspend_io.
1361  */
1362 static int drbd_send_barrier(struct drbd_connection *connection)
1363 {
1364     struct p_barrier *p;
1365     struct drbd_socket *sock;
1366 
1367     sock = &connection->data;
1368     p = conn_prepare_command(connection, sock);
1369     if (!p)
1370         return -EIO;
1371     p->barrier = connection->send.current_epoch_nr;
1372     p->pad = 0;
1373     connection->send.current_epoch_writes = 0;
1374     connection->send.last_sent_barrier_jif = jiffies;
1375 
1376     return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1377 }
1378 
1379 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1380 {
1381     struct drbd_socket *sock = &pd->connection->data;
1382     if (!drbd_prepare_command(pd, sock))
1383         return -EIO;
1384     return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1385 }
1386 
1387 int w_send_write_hint(struct drbd_work *w, int cancel)
1388 {
1389     struct drbd_device *device =
1390         container_of(w, struct drbd_device, unplug_work);
1391 
1392     if (cancel)
1393         return 0;
1394     return pd_send_unplug_remote(first_peer_device(device));
1395 }
1396 
1397 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1398 {
1399     if (!connection->send.seen_any_write_yet) {
1400         connection->send.seen_any_write_yet = true;
1401         connection->send.current_epoch_nr = epoch;
1402         connection->send.current_epoch_writes = 0;
1403         connection->send.last_sent_barrier_jif = jiffies;
1404     }
1405 }
1406 
1407 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1408 {
1409     /* re-init if first write on this connection */
1410     if (!connection->send.seen_any_write_yet)
1411         return;
1412     if (connection->send.current_epoch_nr != epoch) {
1413         if (connection->send.current_epoch_writes)
1414             drbd_send_barrier(connection);
1415         connection->send.current_epoch_nr = epoch;
1416     }
1417 }
1418 
1419 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1420 {
1421     struct drbd_request *req = container_of(w, struct drbd_request, w);
1422     struct drbd_device *device = req->device;
1423     struct drbd_peer_device *const peer_device = first_peer_device(device);
1424     struct drbd_connection *const connection = peer_device->connection;
1425     int err;
1426 
1427     if (unlikely(cancel)) {
1428         req_mod(req, SEND_CANCELED);
1429         return 0;
1430     }
1431     req->pre_send_jif = jiffies;
1432 
1433     /* this time, no connection->send.current_epoch_writes++;
1434      * If it was sent, it was the closing barrier for the last
1435      * replicated epoch, before we went into AHEAD mode.
1436      * No more barriers will be sent, until we leave AHEAD mode again. */
1437     maybe_send_barrier(connection, req->epoch);
1438 
1439     err = drbd_send_out_of_sync(peer_device, req);
1440     req_mod(req, OOS_HANDED_TO_NETWORK);
1441 
1442     return err;
1443 }
1444 
1445 /**
1446  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1447  * @w:      work object.
1448  * @cancel: The connection will be closed anyways
1449  */
1450 int w_send_dblock(struct drbd_work *w, int cancel)
1451 {
1452     struct drbd_request *req = container_of(w, struct drbd_request, w);
1453     struct drbd_device *device = req->device;
1454     struct drbd_peer_device *const peer_device = first_peer_device(device);
1455     struct drbd_connection *connection = peer_device->connection;
1456     bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1457     int err;
1458 
1459     if (unlikely(cancel)) {
1460         req_mod(req, SEND_CANCELED);
1461         return 0;
1462     }
1463     req->pre_send_jif = jiffies;
1464 
1465     re_init_if_first_write(connection, req->epoch);
1466     maybe_send_barrier(connection, req->epoch);
1467     connection->send.current_epoch_writes++;
1468 
1469     err = drbd_send_dblock(peer_device, req);
1470     req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1471 
1472     if (do_send_unplug && !err)
1473         pd_send_unplug_remote(peer_device);
1474 
1475     return err;
1476 }
1477 
1478 /**
1479  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1480  * @w:      work object.
1481  * @cancel: The connection will be closed anyways
1482  */
1483 int w_send_read_req(struct drbd_work *w, int cancel)
1484 {
1485     struct drbd_request *req = container_of(w, struct drbd_request, w);
1486     struct drbd_device *device = req->device;
1487     struct drbd_peer_device *const peer_device = first_peer_device(device);
1488     struct drbd_connection *connection = peer_device->connection;
1489     bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1490     int err;
1491 
1492     if (unlikely(cancel)) {
1493         req_mod(req, SEND_CANCELED);
1494         return 0;
1495     }
1496     req->pre_send_jif = jiffies;
1497 
1498     /* Even read requests may close a write epoch,
1499      * if there was any yet. */
1500     maybe_send_barrier(connection, req->epoch);
1501 
1502     err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1503                  (unsigned long)req);
1504 
1505     req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1506 
1507     if (do_send_unplug && !err)
1508         pd_send_unplug_remote(peer_device);
1509 
1510     return err;
1511 }
1512 
1513 int w_restart_disk_io(struct drbd_work *w, int cancel)
1514 {
1515     struct drbd_request *req = container_of(w, struct drbd_request, w);
1516     struct drbd_device *device = req->device;
1517 
1518     if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1519         drbd_al_begin_io(device, &req->i);
1520 
1521     req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
1522                        req->master_bio, GFP_NOIO,
1523                       &drbd_io_bio_set);
1524     req->private_bio->bi_private = req;
1525     req->private_bio->bi_end_io = drbd_request_endio;
1526     submit_bio_noacct(req->private_bio);
1527 
1528     return 0;
1529 }
1530 
1531 static int _drbd_may_sync_now(struct drbd_device *device)
1532 {
1533     struct drbd_device *odev = device;
1534     int resync_after;
1535 
1536     while (1) {
1537         if (!odev->ldev || odev->state.disk == D_DISKLESS)
1538             return 1;
1539         rcu_read_lock();
1540         resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1541         rcu_read_unlock();
1542         if (resync_after == -1)
1543             return 1;
1544         odev = minor_to_device(resync_after);
1545         if (!odev)
1546             return 1;
1547         if ((odev->state.conn >= C_SYNC_SOURCE &&
1548              odev->state.conn <= C_PAUSED_SYNC_T) ||
1549             odev->state.aftr_isp || odev->state.peer_isp ||
1550             odev->state.user_isp)
1551             return 0;
1552     }
1553 }
1554 
1555 /**
1556  * drbd_pause_after() - Pause resync on all devices that may not resync now
1557  * @device: DRBD device.
1558  *
1559  * Called from process context only (admin command and after_state_ch).
1560  */
1561 static bool drbd_pause_after(struct drbd_device *device)
1562 {
1563     bool changed = false;
1564     struct drbd_device *odev;
1565     int i;
1566 
1567     rcu_read_lock();
1568     idr_for_each_entry(&drbd_devices, odev, i) {
1569         if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1570             continue;
1571         if (!_drbd_may_sync_now(odev) &&
1572             _drbd_set_state(_NS(odev, aftr_isp, 1),
1573                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1574             changed = true;
1575     }
1576     rcu_read_unlock();
1577 
1578     return changed;
1579 }
1580 
1581 /**
1582  * drbd_resume_next() - Resume resync on all devices that may resync now
1583  * @device: DRBD device.
1584  *
1585  * Called from process context only (admin command and worker).
1586  */
1587 static bool drbd_resume_next(struct drbd_device *device)
1588 {
1589     bool changed = false;
1590     struct drbd_device *odev;
1591     int i;
1592 
1593     rcu_read_lock();
1594     idr_for_each_entry(&drbd_devices, odev, i) {
1595         if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1596             continue;
1597         if (odev->state.aftr_isp) {
1598             if (_drbd_may_sync_now(odev) &&
1599                 _drbd_set_state(_NS(odev, aftr_isp, 0),
1600                         CS_HARD, NULL) != SS_NOTHING_TO_DO)
1601                 changed = true;
1602         }
1603     }
1604     rcu_read_unlock();
1605     return changed;
1606 }
1607 
/**
 * resume_next_sg() - Run drbd_resume_next() with all resources locked
 * @device: DRBD device, passed through to drbd_resume_next().
 */
void resume_next_sg(struct drbd_device *device)
{
    lock_all_resources();

    drbd_resume_next(device);

    unlock_all_resources();
}
1614 
/**
 * suspend_other_sg() - Run drbd_pause_after() with all resources locked
 * @device: DRBD device, passed through to drbd_pause_after().
 */
void suspend_other_sg(struct drbd_device *device)
{
    lock_all_resources();

    drbd_pause_after(device);

    unlock_all_resources();
}
1621 
1622 /* caller must lock_all_resources() */
1623 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1624 {
1625     struct drbd_device *odev;
1626     int resync_after;
1627 
1628     if (o_minor == -1)
1629         return NO_ERROR;
1630     if (o_minor < -1 || o_minor > MINORMASK)
1631         return ERR_RESYNC_AFTER;
1632 
1633     /* check for loops */
1634     odev = minor_to_device(o_minor);
1635     while (1) {
1636         if (odev == device)
1637             return ERR_RESYNC_AFTER_CYCLE;
1638 
1639         /* You are free to depend on diskless, non-existing,
1640          * or not yet/no longer existing minors.
1641          * We only reject dependency loops.
1642          * We cannot follow the dependency chain beyond a detached or
1643          * missing minor.
1644          */
1645         if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1646             return NO_ERROR;
1647 
1648         rcu_read_lock();
1649         resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1650         rcu_read_unlock();
1651         /* dependency chain ends here, no cycles. */
1652         if (resync_after == -1)
1653             return NO_ERROR;
1654 
1655         /* follow the dependency chain */
1656         odev = minor_to_device(resync_after);
1657     }
1658 }
1659 
/**
 * drbd_resync_after_changed() - Re-evaluate which devices must pause or may resume
 * @device: DRBD device, passed through to the pause/resume helpers.
 *
 * caller must lock_all_resources()
 *
 * Runs drbd_pause_after() and drbd_resume_next() back to back, and repeats
 * until a full round reports no state change from either of them.
 */
void drbd_resync_after_changed(struct drbd_device *device)
{
    int paused, resumed;

    do {
        /* Both helpers run in every round, whatever the first one reports. */
        paused = drbd_pause_after(device);
        resumed = drbd_resume_next(device);
    } while (paused || resumed);
}
1670 
1671 void drbd_rs_controller_reset(struct drbd_device *device)
1672 {
1673     struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1674     struct fifo_buffer *plan;
1675 
1676     atomic_set(&device->rs_sect_in, 0);
1677     atomic_set(&device->rs_sect_ev, 0);
1678     device->rs_in_flight = 0;
1679     device->rs_last_events =
1680         (int)part_stat_read_accum(disk->part0, sectors);
1681 
1682     /* Updating the RCU protected object in place is necessary since
1683        this function gets called from atomic context.
1684        It is valid since all other updates also lead to an completely
1685        empty fifo */
1686     rcu_read_lock();
1687     plan = rcu_dereference(device->rs_plan_s);
1688     plan->total = 0;
1689     fifo_set(plan, 0);
1690     rcu_read_unlock();
1691 }
1692 
1693 void start_resync_timer_fn(struct timer_list *t)
1694 {
1695     struct drbd_device *device = from_timer(device, t, start_resync_timer);
1696     drbd_device_post_work(device, RS_START);
1697 }
1698 
1699 static void do_start_resync(struct drbd_device *device)
1700 {
1701     if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1702         drbd_warn(device, "postponing start_resync ...\n");
1703         device->start_resync_timer.expires = jiffies + HZ/10;
1704         add_timer(&device->start_resync_timer);
1705         return;
1706     }
1707 
1708     drbd_start_resync(device, C_SYNC_SOURCE);
1709     clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1710 }
1711 
1712 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1713 {
1714     bool csums_after_crash_only;
1715     rcu_read_lock();
1716     csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1717     rcu_read_unlock();
1718     return connection->agreed_pro_version >= 89 &&      /* supported? */
1719         connection->csums_tfm &&            /* configured? */
1720         (csums_after_crash_only == false        /* use for each resync? */
1721          || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1722 }
1723 
1724 /**
1725  * drbd_start_resync() - Start the resync process
1726  * @device: DRBD device.
1727  * @side:   Either C_SYNC_SOURCE or C_SYNC_TARGET
1728  *
1729  * This function might bring you directly into one of the
1730  * C_PAUSED_SYNC_* states.
1731  */
1732 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1733 {
1734     struct drbd_peer_device *peer_device = first_peer_device(device);
1735     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1736     union drbd_state ns;
1737     int r;
1738 
1739     if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1740         drbd_err(device, "Resync already running!\n");
1741         return;
1742     }
1743 
1744     if (!connection) {
1745         drbd_err(device, "No connection to peer, aborting!\n");
1746         return;
1747     }
1748 
1749     if (!test_bit(B_RS_H_DONE, &device->flags)) {
1750         if (side == C_SYNC_TARGET) {
1751             /* Since application IO was locked out during C_WF_BITMAP_T and
1752                C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1753                we check that we might make the data inconsistent. */
1754             r = drbd_khelper(device, "before-resync-target");
1755             r = (r >> 8) & 0xff;
1756             if (r > 0) {
1757                 drbd_info(device, "before-resync-target handler returned %d, "
1758                      "dropping connection.\n", r);
1759                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1760                 return;
1761             }
1762         } else /* C_SYNC_SOURCE */ {
1763             r = drbd_khelper(device, "before-resync-source");
1764             r = (r >> 8) & 0xff;
1765             if (r > 0) {
1766                 if (r == 3) {
1767                     drbd_info(device, "before-resync-source handler returned %d, "
1768                          "ignoring. Old userland tools?", r);
1769                 } else {
1770                     drbd_info(device, "before-resync-source handler returned %d, "
1771                          "dropping connection.\n", r);
1772                     conn_request_state(connection,
1773                                NS(conn, C_DISCONNECTING), CS_HARD);
1774                     return;
1775                 }
1776             }
1777         }
1778     }
1779 
1780     if (current == connection->worker.task) {
1781         /* The worker should not sleep waiting for state_mutex,
1782            that can take long */
1783         if (!mutex_trylock(device->state_mutex)) {
1784             set_bit(B_RS_H_DONE, &device->flags);
1785             device->start_resync_timer.expires = jiffies + HZ/5;
1786             add_timer(&device->start_resync_timer);
1787             return;
1788         }
1789     } else {
1790         mutex_lock(device->state_mutex);
1791     }
1792 
1793     lock_all_resources();
1794     clear_bit(B_RS_H_DONE, &device->flags);
1795     /* Did some connection breakage or IO error race with us? */
1796     if (device->state.conn < C_CONNECTED
1797     || !get_ldev_if_state(device, D_NEGOTIATING)) {
1798         unlock_all_resources();
1799         goto out;
1800     }
1801 
1802     ns = drbd_read_state(device);
1803 
1804     ns.aftr_isp = !_drbd_may_sync_now(device);
1805 
1806     ns.conn = side;
1807 
1808     if (side == C_SYNC_TARGET)
1809         ns.disk = D_INCONSISTENT;
1810     else /* side == C_SYNC_SOURCE */
1811         ns.pdsk = D_INCONSISTENT;
1812 
1813     r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1814     ns = drbd_read_state(device);
1815 
1816     if (ns.conn < C_CONNECTED)
1817         r = SS_UNKNOWN_ERROR;
1818 
1819     if (r == SS_SUCCESS) {
1820         unsigned long tw = drbd_bm_total_weight(device);
1821         unsigned long now = jiffies;
1822         int i;
1823 
1824         device->rs_failed    = 0;
1825         device->rs_paused    = 0;
1826         device->rs_same_csum = 0;
1827         device->rs_last_sect_ev = 0;
1828         device->rs_total     = tw;
1829         device->rs_start     = now;
1830         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1831             device->rs_mark_left[i] = tw;
1832             device->rs_mark_time[i] = now;
1833         }
1834         drbd_pause_after(device);
1835         /* Forget potentially stale cached per resync extent bit-counts.
1836          * Open coded drbd_rs_cancel_all(device), we already have IRQs
1837          * disabled, and know the disk state is ok. */
1838         spin_lock(&device->al_lock);
1839         lc_reset(device->resync);
1840         device->resync_locked = 0;
1841         device->resync_wenr = LC_FREE;
1842         spin_unlock(&device->al_lock);
1843     }
1844     unlock_all_resources();
1845 
1846     if (r == SS_SUCCESS) {
1847         wake_up(&device->al_wait); /* for lc_reset() above */
1848         /* reset rs_last_bcast when a resync or verify is started,
1849          * to deal with potential jiffies wrap. */
1850         device->rs_last_bcast = jiffies - HZ;
1851 
1852         drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1853              drbd_conn_str(ns.conn),
1854              (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1855              (unsigned long) device->rs_total);
1856         if (side == C_SYNC_TARGET) {
1857             device->bm_resync_fo = 0;
1858             device->use_csums = use_checksum_based_resync(connection, device);
1859         } else {
1860             device->use_csums = false;
1861         }
1862 
1863         /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1864          * with w_send_oos, or the sync target will get confused as to
1865          * how much bits to resync.  We cannot do that always, because for an
1866          * empty resync and protocol < 95, we need to do it here, as we call
1867          * drbd_resync_finished from here in that case.
1868          * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1869          * and from after_state_ch otherwise. */
1870         if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1871             drbd_gen_and_send_sync_uuid(peer_device);
1872 
1873         if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1874             /* This still has a race (about when exactly the peers
1875              * detect connection loss) that can lead to a full sync
1876              * on next handshake. In 8.3.9 we fixed this with explicit
1877              * resync-finished notifications, but the fix
1878              * introduces a protocol change.  Sleeping for some
1879              * time longer than the ping interval + timeout on the
1880              * SyncSource, to give the SyncTarget the chance to
1881              * detect connection loss, then waiting for a ping
1882              * response (implicit in drbd_resync_finished) reduces
1883              * the race considerably, but does not solve it. */
1884             if (side == C_SYNC_SOURCE) {
1885                 struct net_conf *nc;
1886                 int timeo;
1887 
1888                 rcu_read_lock();
1889                 nc = rcu_dereference(connection->net_conf);
1890                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1891                 rcu_read_unlock();
1892                 schedule_timeout_interruptible(timeo);
1893             }
1894             drbd_resync_finished(device);
1895         }
1896 
1897         drbd_rs_controller_reset(device);
1898         /* ns.conn may already be != device->state.conn,
1899          * we may have been paused in between, or become paused until
1900          * the timer triggers.
1901          * No matter, that is handled in resync_timer_fn() */
1902         if (ns.conn == C_SYNC_TARGET)
1903             mod_timer(&device->resync_timer, jiffies);
1904 
1905         drbd_md_sync(device);
1906     }
1907     put_ldev(device);
1908 out:
1909     mutex_unlock(device->state_mutex);
1910 }
1911 
1912 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1913 {
1914     struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1915     device->rs_last_bcast = jiffies;
1916 
1917     if (!get_ldev(device))
1918         return;
1919 
1920     drbd_bm_write_lazy(device, 0);
1921     if (resync_done && is_sync_state(device->state.conn))
1922         drbd_resync_finished(device);
1923 
1924     drbd_bcast_event(device, &sib);
1925     /* update timestamp, in case it took a while to write out stuff */
1926     device->rs_last_bcast = jiffies;
1927     put_ldev(device);
1928 }
1929 
1930 static void drbd_ldev_destroy(struct drbd_device *device)
1931 {
1932     lc_destroy(device->resync);
1933     device->resync = NULL;
1934     lc_destroy(device->act_log);
1935     device->act_log = NULL;
1936 
1937     __acquire(local);
1938     drbd_backing_dev_free(device, device->ldev);
1939     device->ldev = NULL;
1940     __release(local);
1941 
1942     clear_bit(GOING_DISKLESS, &device->flags);
1943     wake_up(&device->misc_wait);
1944 }
1945 
1946 static void go_diskless(struct drbd_device *device)
1947 {
1948     D_ASSERT(device, device->state.disk == D_FAILED);
1949     /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1950      * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1951      * the protected members anymore, though, so once put_ldev reaches zero
1952      * again, it will be safe to free them. */
1953 
1954     /* Try to write changed bitmap pages, read errors may have just
1955      * set some bits outside the area covered by the activity log.
1956      *
1957      * If we have an IO error during the bitmap writeout,
1958      * we will want a full sync next time, just in case.
1959      * (Do we want a specific meta data flag for this?)
1960      *
1961      * If that does not make it to stable storage either,
1962      * we cannot do anything about that anymore.
1963      *
1964      * We still need to check if both bitmap and ldev are present, we may
1965      * end up here after a failed attach, before ldev was even assigned.
1966      */
1967     if (device->bitmap && device->ldev) {
1968         /* An interrupted resync or similar is allowed to recounts bits
1969          * while we detach.
1970          * Any modifications would not be expected anymore, though.
1971          */
1972         if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1973                     "detach", BM_LOCKED_TEST_ALLOWED)) {
1974             if (test_bit(WAS_READ_ERROR, &device->flags)) {
1975                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1976                 drbd_md_sync(device);
1977             }
1978         }
1979     }
1980 
1981     drbd_force_state(device, NS(disk, D_DISKLESS));
1982 }
1983 
/*
 * Worker side of MD_SYNC: log that the md_sync_timer expired and sync
 * the meta data from worker context.
 *
 * Always returns 0.
 */
static int do_md_sync(struct drbd_device *device)
{
    drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");

    drbd_md_sync(device);

    return 0;
}
1990 
1991 /* only called from drbd_worker thread, no locking */
1992 void __update_timing_details(
1993         struct drbd_thread_timing_details *tdp,
1994         unsigned int *cb_nr,
1995         void *cb,
1996         const char *fn, const unsigned int line)
1997 {
1998     unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1999     struct drbd_thread_timing_details *td = tdp + i;
2000 
2001     td->start_jif = jiffies;
2002     td->cb_addr = cb;
2003     td->caller_fn = fn;
2004     td->line = line;
2005     td->cb_nr = *cb_nr;
2006 
2007     i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2008     td = tdp + i;
2009     memset(td, 0, sizeof(*td));
2010 
2011     ++(*cb_nr);
2012 }
2013 
2014 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2015 {
2016     if (test_bit(MD_SYNC, &todo))
2017         do_md_sync(device);
2018     if (test_bit(RS_DONE, &todo) ||
2019         test_bit(RS_PROGRESS, &todo))
2020         update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2021     if (test_bit(GO_DISKLESS, &todo))
2022         go_diskless(device);
2023     if (test_bit(DESTROY_DISK, &todo))
2024         drbd_ldev_destroy(device);
2025     if (test_bit(RS_START, &todo))
2026         do_start_resync(device);
2027 }
2028 
/* All device flag bits that denote pending "unqueued" device work. */
#define DRBD_DEVICE_WORK_MASK       \
    ((1UL << GO_DISKLESS) |         \
     (1UL << DESTROY_DISK) |        \
     (1UL << MD_SYNC) |             \
     (1UL << RS_START) |            \
     (1UL << RS_PROGRESS) |         \
     (1UL << RS_DONE))
2037 
2038 static unsigned long get_work_bits(unsigned long *flags)
2039 {
2040     unsigned long old, new;
2041     do {
2042         old = *flags;
2043         new = old & ~DRBD_DEVICE_WORK_MASK;
2044     } while (cmpxchg(flags, old, new) != old);
2045     return old & DRBD_DEVICE_WORK_MASK;
2046 }
2047 
2048 static void do_unqueued_work(struct drbd_connection *connection)
2049 {
2050     struct drbd_peer_device *peer_device;
2051     int vnr;
2052 
2053     rcu_read_lock();
2054     idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2055         struct drbd_device *device = peer_device->device;
2056         unsigned long todo = get_work_bits(&device->flags);
2057         if (!todo)
2058             continue;
2059 
2060         kref_get(&device->kref);
2061         rcu_read_unlock();
2062         do_device_work(device, todo);
2063         kref_put(&device->kref, drbd_destroy_device);
2064         rcu_read_lock();
2065     }
2066     rcu_read_unlock();
2067 }
2068 
2069 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2070 {
2071     spin_lock_irq(&queue->q_lock);
2072     list_splice_tail_init(&queue->q, work_list);
2073     spin_unlock_irq(&queue->q_lock);
2074     return !list_empty(work_list);
2075 }
2076 
2077 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2078 {
2079     DEFINE_WAIT(wait);
2080     struct net_conf *nc;
2081     int uncork, cork;
2082 
2083     dequeue_work_batch(&connection->sender_work, work_list);
2084     if (!list_empty(work_list))
2085         return;
2086 
2087     /* Still nothing to do?
2088      * Maybe we still need to close the current epoch,
2089      * even if no new requests are queued yet.
2090      *
2091      * Also, poke TCP, just in case.
2092      * Then wait for new work (or signal). */
2093     rcu_read_lock();
2094     nc = rcu_dereference(connection->net_conf);
2095     uncork = nc ? nc->tcp_cork : 0;
2096     rcu_read_unlock();
2097     if (uncork) {
2098         mutex_lock(&connection->data.mutex);
2099         if (connection->data.socket)
2100             tcp_sock_set_cork(connection->data.socket->sk, false);
2101         mutex_unlock(&connection->data.mutex);
2102     }
2103 
2104     for (;;) {
2105         int send_barrier;
2106         prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2107         spin_lock_irq(&connection->resource->req_lock);
2108         spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2109         if (!list_empty(&connection->sender_work.q))
2110             list_splice_tail_init(&connection->sender_work.q, work_list);
2111         spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2112         if (!list_empty(work_list) || signal_pending(current)) {
2113             spin_unlock_irq(&connection->resource->req_lock);
2114             break;
2115         }
2116 
2117         /* We found nothing new to do, no to-be-communicated request,
2118          * no other work item.  We may still need to close the last
2119          * epoch.  Next incoming request epoch will be connection ->
2120          * current transfer log epoch number.  If that is different
2121          * from the epoch of the last request we communicated, it is
2122          * safe to send the epoch separating barrier now.
2123          */
2124         send_barrier =
2125             atomic_read(&connection->current_tle_nr) !=
2126             connection->send.current_epoch_nr;
2127         spin_unlock_irq(&connection->resource->req_lock);
2128 
2129         if (send_barrier)
2130             maybe_send_barrier(connection,
2131                     connection->send.current_epoch_nr + 1);
2132 
2133         if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2134             break;
2135 
2136         /* drbd_send() may have called flush_signals() */
2137         if (get_t_state(&connection->worker) != RUNNING)
2138             break;
2139 
2140         schedule();
2141         /* may be woken up for other things but new work, too,
2142          * e.g. if the current epoch got closed.
2143          * In which case we send the barrier above. */
2144     }
2145     finish_wait(&connection->sender_work.q_wait, &wait);
2146 
2147     /* someone may have changed the config while we have been waiting above. */
2148     rcu_read_lock();
2149     nc = rcu_dereference(connection->net_conf);
2150     cork = nc ? nc->tcp_cork : 0;
2151     rcu_read_unlock();
2152     mutex_lock(&connection->data.mutex);
2153     if (connection->data.socket) {
2154         if (cork)
2155             tcp_sock_set_cork(connection->data.socket->sk, true);
2156         else if (!uncork)
2157             tcp_sock_set_cork(connection->data.socket->sk, false);
2158     }
2159     mutex_unlock(&connection->data.mutex);
2160 }
2161 
2162 int drbd_worker(struct drbd_thread *thi)
2163 {
2164     struct drbd_connection *connection = thi->connection;
2165     struct drbd_work *w = NULL;
2166     struct drbd_peer_device *peer_device;
2167     LIST_HEAD(work_list);
2168     int vnr;
2169 
2170     while (get_t_state(thi) == RUNNING) {
2171         drbd_thread_current_set_cpu(thi);
2172 
2173         if (list_empty(&work_list)) {
2174             update_worker_timing_details(connection, wait_for_work);
2175             wait_for_work(connection, &work_list);
2176         }
2177 
2178         if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2179             update_worker_timing_details(connection, do_unqueued_work);
2180             do_unqueued_work(connection);
2181         }
2182 
2183         if (signal_pending(current)) {
2184             flush_signals(current);
2185             if (get_t_state(thi) == RUNNING) {
2186                 drbd_warn(connection, "Worker got an unexpected signal\n");
2187                 continue;
2188             }
2189             break;
2190         }
2191 
2192         if (get_t_state(thi) != RUNNING)
2193             break;
2194 
2195         if (!list_empty(&work_list)) {
2196             w = list_first_entry(&work_list, struct drbd_work, list);
2197             list_del_init(&w->list);
2198             update_worker_timing_details(connection, w->cb);
2199             if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2200                 continue;
2201             if (connection->cstate >= C_WF_REPORT_PARAMS)
2202                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2203         }
2204     }
2205 
2206     do {
2207         if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2208             update_worker_timing_details(connection, do_unqueued_work);
2209             do_unqueued_work(connection);
2210         }
2211         if (!list_empty(&work_list)) {
2212             w = list_first_entry(&work_list, struct drbd_work, list);
2213             list_del_init(&w->list);
2214             update_worker_timing_details(connection, w->cb);
2215             w->cb(w, 1);
2216         } else
2217             dequeue_work_batch(&connection->sender_work, &work_list);
2218     } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2219 
2220     rcu_read_lock();
2221     idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2222         struct drbd_device *device = peer_device->device;
2223         D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2224         kref_get(&device->kref);
2225         rcu_read_unlock();
2226         drbd_device_cleanup(device);
2227         kref_put(&device->kref, drbd_destroy_device);
2228         rcu_read_lock();
2229     }
2230     rcu_read_unlock();
2231 
2232     return 0;
2233 }