0001 // SPDX-License-Identifier: GPL-2.0-or-later
0002 /*
0003    drbd_req.c
0004 
0005    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
0006 
0007    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
0008    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
0009    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
0010 
0011 
0012  */
0013 
0014 #include <linux/module.h>
0015 
0016 #include <linux/slab.h>
0017 #include <linux/drbd.h>
0018 #include "drbd_int.h"
0019 #include "drbd_req.h"
0020 
0021 
0022 static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size);
0023 
0024 static struct drbd_request *drbd_req_new(struct drbd_device *device, struct bio *bio_src)
0025 {
0026     struct drbd_request *req;
0027 
0028     req = mempool_alloc(&drbd_request_mempool, GFP_NOIO);
0029     if (!req)
0030         return NULL;
0031     memset(req, 0, sizeof(*req));
0032 
0033     req->private_bio = bio_alloc_clone(device->ldev->backing_bdev, bio_src,
0034                        GFP_NOIO, &drbd_io_bio_set);
0035     req->private_bio->bi_private = req;
0036     req->private_bio->bi_end_io = drbd_request_endio;
0037 
0038     req->rq_state = (bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0)
0039               | (bio_op(bio_src) == REQ_OP_WRITE_ZEROES ? RQ_ZEROES : 0)
0040               | (bio_op(bio_src) == REQ_OP_DISCARD ? RQ_UNMAP : 0);
0041     req->device = device;
0042     req->master_bio = bio_src;
0043     req->epoch = 0;
0044 
0045     drbd_clear_interval(&req->i);
0046     req->i.sector     = bio_src->bi_iter.bi_sector;
0047     req->i.size       = bio_src->bi_iter.bi_size;
0048     req->i.local = true;
0049     req->i.waiting = false;
0050 
0051     INIT_LIST_HEAD(&req->tl_requests);
0052     INIT_LIST_HEAD(&req->w.list);
0053     INIT_LIST_HEAD(&req->req_pending_master_completion);
0054     INIT_LIST_HEAD(&req->req_pending_local);
0055 
0056     /* one reference to be put by __drbd_make_request */
0057     atomic_set(&req->completion_ref, 1);
0058     /* one kref as long as completion_ref > 0 */
0059     kref_init(&req->kref);
0060     return req;
0061 }
0062 
0063 static void drbd_remove_request_interval(struct rb_root *root,
0064                      struct drbd_request *req)
0065 {
0066     struct drbd_device *device = req->device;
0067     struct drbd_interval *i = &req->i;
0068 
0069     drbd_remove_interval(root, i);
0070 
0071     /* Wake up any processes waiting for this request to complete.  */
0072     if (i->waiting)
0073         wake_up(&device->misc_wait);
0074 }
0075 
0076 void drbd_req_destroy(struct kref *kref)
0077 {
0078     struct drbd_request *req = container_of(kref, struct drbd_request, kref);
0079     struct drbd_device *device = req->device;
0080     const unsigned s = req->rq_state;
0081 
0082     if ((req->master_bio && !(s & RQ_POSTPONED)) ||
0083         atomic_read(&req->completion_ref) ||
0084         (s & RQ_LOCAL_PENDING) ||
0085         ((s & RQ_NET_MASK) && !(s & RQ_NET_DONE))) {
0086         drbd_err(device, "drbd_req_destroy: Logic BUG rq_state = 0x%x, completion_ref = %d\n",
0087                 s, atomic_read(&req->completion_ref));
0088         return;
0089     }
0090 
0091     /* If called from mod_rq_state (expected normal case) or
0092      * drbd_send_and_submit (the less likely normal path), this holds the
0093      * req_lock, and req->tl_requests will typically be on ->transfer_log,
0094      * though it may be still empty (never added to the transfer log).
0095      *
0096      * If called from do_retry(), we do NOT hold the req_lock, but we are
0097      * still allowed to unconditionally list_del(&req->tl_requests),
0098      * because it will be on a local on-stack list only. */
0099     list_del_init(&req->tl_requests);
0100 
0101     /* finally remove the request from the conflict detection
0102      * respective block_id verification interval tree. */
0103     if (!drbd_interval_empty(&req->i)) {
0104         struct rb_root *root;
0105 
0106         if (s & RQ_WRITE)
0107             root = &device->write_requests;
0108         else
0109             root = &device->read_requests;
0110         drbd_remove_request_interval(root, req);
0111     } else if (s & (RQ_NET_MASK & ~RQ_NET_DONE) && req->i.size != 0)
0112         drbd_err(device, "drbd_req_destroy: Logic BUG: interval empty, but: rq_state=0x%x, sect=%llu, size=%u\n",
0113             s, (unsigned long long)req->i.sector, req->i.size);
0114 
0115     /* if it was a write, we may have to set the corresponding
0116      * bit(s) out-of-sync first. If it had a local part, we need to
0117      * release the reference to the activity log. */
0118     if (s & RQ_WRITE) {
0119         /* Set out-of-sync unless both OK flags are set
0120          * (local only or remote failed).
0121          * Other places where we set out-of-sync:
0122          * READ with local io-error */
0123 
0124         /* There is a special case:
0125          * we may notice late that IO was suspended,
0126          * and postpone, or schedule for retry, a write,
0127          * before it even was submitted or sent.
0128          * In that case we do not want to touch the bitmap at all.
0129          */
0130         if ((s & (RQ_POSTPONED|RQ_LOCAL_MASK|RQ_NET_MASK)) != RQ_POSTPONED) {
0131             if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
0132                 drbd_set_out_of_sync(device, req->i.sector, req->i.size);
0133 
0134             if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
0135                 drbd_set_in_sync(device, req->i.sector, req->i.size);
0136         }
0137 
0138         /* one might be tempted to move the drbd_al_complete_io
0139          * to the local io completion callback drbd_request_endio.
0140          * but, if this was a mirror write, we may only
0141          * drbd_al_complete_io after this is RQ_NET_DONE,
0142          * otherwise the extent could be dropped from the al
0143          * before it has actually been written on the peer.
0144          * if we crash before our peer knows about the request,
0145          * but after the extent has been dropped from the al,
0146          * we would forget to resync the corresponding extent.
0147          */
0148         if (s & RQ_IN_ACT_LOG) {
0149             if (get_ldev_if_state(device, D_FAILED)) {
0150                 drbd_al_complete_io(device, &req->i);
0151                 put_ldev(device);
0152             } else if (__ratelimit(&drbd_ratelimit_state)) {
0153                 drbd_warn(device, "Should have called drbd_al_complete_io(, %llu, %u), "
0154                      "but my Disk seems to have failed :(\n",
0155                      (unsigned long long) req->i.sector, req->i.size);
0156             }
0157         }
0158     }
0159 
0160     mempool_free(req, &drbd_request_mempool);
0161 }
0162 
0163 static void wake_all_senders(struct drbd_connection *connection)
0164 {
0165     wake_up(&connection->sender_work.q_wait);
0166 }
0167 
0168 /* must hold resource->req_lock */
0169 void start_new_tl_epoch(struct drbd_connection *connection)
0170 {
0171     /* no point closing an epoch, if it is empty, anyways. */
0172     if (connection->current_tle_writes == 0)
0173         return;
0174 
0175     connection->current_tle_writes = 0;
0176     atomic_inc(&connection->current_tle_nr);
0177     wake_all_senders(connection);
0178 }
0179 
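     /* Complete the master bio towards the upper layers: translate a non-zero
      * error from the bio_and_error into a blk_status_t, end the bio, and drop
      * the corresponding application bio count via dec_ap_bio(). */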
0180 void complete_master_bio(struct drbd_device *device,
0181         struct bio_and_error *m)
0182 {
0183     if (unlikely(m->error))
0184         m->bio->bi_status = errno_to_blk_status(m->error);
0185     bio_endio(m->bio);
0186     dec_ap_bio(device);
0187 }
0188 
0189 
0190 /* Helper for __req_mod().
0191  * Set m->bio to the master bio, if it is fit to be completed,
0192  * or leave it alone (it is initialized to NULL in __req_mod),
0193  * if it has already been completed, or cannot be completed yet.
0194  * If m->bio is set, the error status to be returned is placed in m->error.
0195  */
0196 static
0197 void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
0198 {
0199     const unsigned s = req->rq_state;
0200     struct drbd_device *device = req->device;
0201     int error, ok;
0202 
0203     /* we must not complete the master bio, while it is
0204      *  still being processed by _drbd_send_zc_bio (drbd_send_dblock)
0205      *  not yet acknowledged by the peer
0206      *  not yet completed by the local io subsystem
0207      * these flags may get cleared in any order by
0208      *  the worker,
0209      *  the receiver,
0210      *  the bio_endio completion callbacks.
0211      */
0212     if ((s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) ||
0213         (s & RQ_NET_QUEUED) || (s & RQ_NET_PENDING) ||
0214         (s & RQ_COMPLETION_SUSP)) {
0215         drbd_err(device, "drbd_req_complete: Logic BUG rq_state = 0x%x\n", s);
0216         return;
0217     }
0218 
0219     if (!req->master_bio) {
0220         drbd_err(device, "drbd_req_complete: Logic BUG, master_bio == NULL!\n");
0221         return;
0222     }
0223 
0224     /*
0225      * figure out whether to report success or failure.
0226      *
0227      * report success when at least one of the operations succeeded.
0228      * or, to put it the other way,
0229      * only report failure, when both operations failed.
0230      *
0231      * what to do about the failures is handled elsewhere.
0232      * what we need to do here is just: complete the master_bio.
0233      *
0234      * local completion error, if any, has been stored as ERR_PTR
0235      * in private_bio within drbd_request_endio.
0236      */
0237     ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
0238     error = PTR_ERR(req->private_bio);
0239 
0240     /* Before we can signal completion to the upper layers,
0241      * we may need to close the current transfer log epoch.
0242      * We are within the request lock, so we can simply compare
0243      * the request epoch number with the current transfer log
0244      * epoch number.  If they match, increase the current_tle_nr,
0245      * and reset the transfer log epoch write_cnt.
0246      */
0247     if (op_is_write(bio_op(req->master_bio)) &&
0248         req->epoch == atomic_read(&first_peer_device(device)->connection->current_tle_nr))
0249         start_new_tl_epoch(first_peer_device(device)->connection);
0250 
0251     /* Update disk stats */
0252     bio_end_io_acct(req->master_bio, req->start_jif);
0253 
0254     /* If READ failed,
0255      * have it be pushed back to the retry work queue,
0256      * so it will re-enter __drbd_make_request(),
0257      * and be re-assigned to a suitable local or remote path,
0258      * or failed if we do not have access to good data anymore.
0259      *
0260      * Unless it was failed early by __drbd_make_request(),
0261      * because no path was available, in which case
0262      * it was not even added to the transfer_log.
0263      *
0264      * read-ahead may fail, and will not be retried.
0265      *
0266      * WRITE should have used all available paths already.
0267      */
0268     if (!ok &&
0269         bio_op(req->master_bio) == REQ_OP_READ &&
0270         !(req->master_bio->bi_opf & REQ_RAHEAD) &&
0271         !list_empty(&req->tl_requests))
0272         req->rq_state |= RQ_POSTPONED;
0273 
0274     if (!(req->rq_state & RQ_POSTPONED)) {
0275         m->error = ok ? 0 : (error ?: -EIO);
0276         m->bio = req->master_bio;
0277         req->master_bio = NULL;
0278         /* We leave it in the tree, to be able to verify later
0279          * write-acks in protocol != C during resync.
0280          * But we mark it as "complete", so it won't be counted as
0281          * conflict in a multi-primary setup. */
0282         req->i.completed = true;
0283     }
0284 
0285     if (req->i.waiting)
0286         wake_up(&device->misc_wait);
0287 
0288     /* Either we are about to complete to upper layers,
0289      * or we will restart this request.
0290      * In either case, the request object will be destroyed soon,
0291      * so better remove it from all lists. */
0292     list_del_init(&req->req_pending_master_completion);
0293 }
0294 
0295 /* still holds resource->req_lock */
0296 static void drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
0297 {
0298     struct drbd_device *device = req->device;
0299     D_ASSERT(device, m || (req->rq_state & RQ_POSTPONED));
0300 
0301     if (!put)
0302         return;
0303 
0304     if (!atomic_sub_and_test(put, &req->completion_ref))
0305         return;
0306 
0307     drbd_req_complete(req, m);
0308 
0309     /* local completion may still come in later,
0310      * we need to keep the req object around. */
0311     if (req->rq_state & RQ_LOCAL_ABORTED)
0312         return;
0313 
0314     if (req->rq_state & RQ_POSTPONED) {
0315         /* don't destroy the req object just yet,
0316          * but queue it for retry */
0317         drbd_restart_request(req);
0318         return;
0319     }
0320 
0321     kref_put(&req->kref, drbd_req_destroy);
0322 }
0323 
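     /* The connection caches pointers to the oldest request in the transfer log
      * that is still in a particular state: req_next (still RQ_NET_QUEUED),
      * req_ack_pending (RQ_NET_SENT and RQ_NET_PENDING) and req_not_net_done
      * (RQ_NET_SENT but not yet RQ_NET_DONE).  The set_if_null_* helpers
      * initialize them, the advance_* helpers move them forward along
      * ->transfer_log once the referenced request leaves that state. */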
0324 static void set_if_null_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
0325 {
0326     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
0327     if (!connection)
0328         return;
0329     if (connection->req_next == NULL)
0330         connection->req_next = req;
0331 }
0332 
0333 static void advance_conn_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
0334 {
0335     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
0336     struct drbd_request *iter = req;
0337     if (!connection)
0338         return;
0339     if (connection->req_next != req)
0340         return;
0341 
0342     req = NULL;
0343     list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
0344         const unsigned int s = iter->rq_state;
0345 
0346         if (s & RQ_NET_QUEUED) {
0347             req = iter;
0348             break;
0349         }
0350     }
0351     connection->req_next = req;
0352 }
0353 
0354 static void set_if_null_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
0355 {
0356     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
0357     if (!connection)
0358         return;
0359     if (connection->req_ack_pending == NULL)
0360         connection->req_ack_pending = req;
0361 }
0362 
0363 static void advance_conn_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
0364 {
0365     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
0366     struct drbd_request *iter = req;
0367     if (!connection)
0368         return;
0369     if (connection->req_ack_pending != req)
0370         return;
0371 
0372     req = NULL;
0373     list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
0374         const unsigned int s = iter->rq_state;
0375 
0376         if ((s & RQ_NET_SENT) && (s & RQ_NET_PENDING)) {
0377             req = iter;
0378             break;
0379         }
0380     }
0381     connection->req_ack_pending = req;
0382 }
0383 
0384 static void set_if_null_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
0385 {
0386     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
0387     if (!connection)
0388         return;
0389     if (connection->req_not_net_done == NULL)
0390         connection->req_not_net_done = req;
0391 }
0392 
0393 static void advance_conn_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
0394 {
0395     struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
0396     struct drbd_request *iter = req;
0397     if (!connection)
0398         return;
0399     if (connection->req_not_net_done != req)
0400         return;
0401 
0402     req = NULL;
0403     list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
0404         const unsigned int s = iter->rq_state;
0405 
0406         if ((s & RQ_NET_SENT) && !(s & RQ_NET_DONE)) {
0407             req = iter;
0408             break;
0409         }
0410     }
0411     connection->req_not_net_done = req;
0412 }
0413 
0414 /* I'd like this to be the only place that manipulates
0415  * req->completion_ref and req->kref. */
0416 static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
0417         int clear, int set)
0418 {
0419     struct drbd_device *device = req->device;
0420     struct drbd_peer_device *peer_device = first_peer_device(device);
0421     unsigned s = req->rq_state;
0422     int c_put = 0;
0423 
0424     if (drbd_suspended(device) && !((s | clear) & RQ_COMPLETION_SUSP))
0425         set |= RQ_COMPLETION_SUSP;
0426 
0427     /* apply */
0428 
0429     req->rq_state &= ~clear;
0430     req->rq_state |= set;
0431 
0432     /* no change? */
0433     if (req->rq_state == s)
0434         return;
0435 
0436     /* intent: get references */
0437 
0438     kref_get(&req->kref);
0439 
0440     if (!(s & RQ_LOCAL_PENDING) && (set & RQ_LOCAL_PENDING))
0441         atomic_inc(&req->completion_ref);
0442 
0443     if (!(s & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) {
0444         inc_ap_pending(device);
0445         atomic_inc(&req->completion_ref);
0446     }
0447 
0448     if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) {
0449         atomic_inc(&req->completion_ref);
0450         set_if_null_req_next(peer_device, req);
0451     }
0452 
0453     if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
0454         kref_get(&req->kref); /* wait for the DONE */
0455 
0456     if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) {
0457         /* potentially already completed in the ack_receiver thread */
0458         if (!(s & RQ_NET_DONE)) {
0459             atomic_add(req->i.size >> 9, &device->ap_in_flight);
0460             set_if_null_req_not_net_done(peer_device, req);
0461         }
0462         if (req->rq_state & RQ_NET_PENDING)
0463             set_if_null_req_ack_pending(peer_device, req);
0464     }
0465 
0466     if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP))
0467         atomic_inc(&req->completion_ref);
0468 
0469     /* progress: put references */
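         /* Each ++c_put below releases one completion_ref; they are all dropped
          * in the single drbd_req_put_completion_ref(req, m, c_put) call at the
          * end of this function. */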
0470 
0471     if ((s & RQ_COMPLETION_SUSP) && (clear & RQ_COMPLETION_SUSP))
0472         ++c_put;
0473 
0474     if (!(s & RQ_LOCAL_ABORTED) && (set & RQ_LOCAL_ABORTED)) {
0475         D_ASSERT(device, req->rq_state & RQ_LOCAL_PENDING);
0476         ++c_put;
0477     }
0478 
0479     if ((s & RQ_LOCAL_PENDING) && (clear & RQ_LOCAL_PENDING)) {
0480         if (req->rq_state & RQ_LOCAL_ABORTED)
0481             kref_put(&req->kref, drbd_req_destroy);
0482         else
0483             ++c_put;
0484         list_del_init(&req->req_pending_local);
0485     }
0486 
0487     if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
0488         dec_ap_pending(device);
0489         ++c_put;
0490         req->acked_jif = jiffies;
0491         advance_conn_req_ack_pending(peer_device, req);
0492     }
0493 
0494     if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) {
0495         ++c_put;
0496         advance_conn_req_next(peer_device, req);
0497     }
0498 
0499     if (!(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
0500         if (s & RQ_NET_SENT)
0501             atomic_sub(req->i.size >> 9, &device->ap_in_flight);
0502         if (s & RQ_EXP_BARR_ACK)
0503             kref_put(&req->kref, drbd_req_destroy);
0504         req->net_done_jif = jiffies;
0505 
0506         /* in ahead/behind mode, or just in case,
0507          * before we finally destroy this request,
0508          * the caching pointers must not reference it anymore */
0509         advance_conn_req_next(peer_device, req);
0510         advance_conn_req_ack_pending(peer_device, req);
0511         advance_conn_req_not_net_done(peer_device, req);
0512     }
0513 
0514     /* potentially complete and destroy */
0515 
0516     /* If we made progress, retry conflicting peer requests, if any. */
0517     if (req->i.waiting)
0518         wake_up(&device->misc_wait);
0519 
0520     drbd_req_put_completion_ref(req, m, c_put);
0521     kref_put(&req->kref, drbd_req_destroy);
0522 }
0523 
0524 static void drbd_report_io_error(struct drbd_device *device, struct drbd_request *req)
0525 {
0526     if (!__ratelimit(&drbd_ratelimit_state))
0527         return;
0528 
0529     drbd_warn(device, "local %s IO error sector %llu+%u on %pg\n",
0530             (req->rq_state & RQ_WRITE) ? "WRITE" : "READ",
0531             (unsigned long long)req->i.sector,
0532             req->i.size >> 9,
0533             device->ldev->backing_bdev);
0534 }
0535 
0536 /* Helper for HANDED_OVER_TO_NETWORK.
0537  * Is this a protocol A write (neither WRITE_ACK nor RECEIVE_ACK expected)?
0538  * Is it also still "PENDING"?
0539  * --> If so, clear PENDING and set NET_OK below.
0540  * If it is a protocol A write, but not RQ_PENDING anymore, neg-ack was faster
0541  * (and we must not set RQ_NET_OK) */
0542 static inline bool is_pending_write_protocol_A(struct drbd_request *req)
0543 {
0544     return (req->rq_state &
0545            (RQ_WRITE|RQ_NET_PENDING|RQ_EXP_WRITE_ACK|RQ_EXP_RECEIVE_ACK))
0546         == (RQ_WRITE|RQ_NET_PENDING);
0547 }
0548 
0549 /* obviously this could be coded as many single functions
0550  * instead of one huge switch,
0551  * or by putting the code directly in the respective locations
0552  * (as it has been before).
0553  *
0554  * but having it this way
0555  *  enforces that it is all in this one place, where it is easier to audit,
0556  *  it makes it obvious that whatever "event" "happens" to a request should
0557  *  happen "atomically" within the req_lock,
0558  *  and it enforces that we have to think in a very structured manner
0559  *  about the "events" that may happen to a request during its life time ...
0560  */
0561 int __req_mod(struct drbd_request *req, enum drbd_req_event what,
0562         struct bio_and_error *m)
0563 {
0564     struct drbd_device *const device = req->device;
0565     struct drbd_peer_device *const peer_device = first_peer_device(device);
0566     struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
0567     struct net_conf *nc;
0568     int p, rv = 0;
0569 
0570     if (m)
0571         m->bio = NULL;
0572 
0573     switch (what) {
0574     default:
0575         drbd_err(device, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
0576         break;
0577 
0578     /* does not happen...
0579      * initialization done in drbd_req_new
0580     case CREATED:
0581         break;
0582         */
0583 
0584     case TO_BE_SENT: /* via network */
0585         /* reached via __drbd_make_request
0586          * and from w_read_retry_remote */
0587         D_ASSERT(device, !(req->rq_state & RQ_NET_MASK));
0588         rcu_read_lock();
0589         nc = rcu_dereference(connection->net_conf);
0590         p = nc->wire_protocol;
0591         rcu_read_unlock();
0592         req->rq_state |=
0593             p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
0594             p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
0595         mod_rq_state(req, m, 0, RQ_NET_PENDING);
0596         break;
0597 
0598     case TO_BE_SUBMITTED: /* locally */
0599         /* reached via __drbd_make_request */
0600         D_ASSERT(device, !(req->rq_state & RQ_LOCAL_MASK));
0601         mod_rq_state(req, m, 0, RQ_LOCAL_PENDING);
0602         break;
0603 
0604     case COMPLETED_OK:
0605         if (req->rq_state & RQ_WRITE)
0606             device->writ_cnt += req->i.size >> 9;
0607         else
0608             device->read_cnt += req->i.size >> 9;
0609 
0610         mod_rq_state(req, m, RQ_LOCAL_PENDING,
0611                 RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
0612         break;
0613 
0614     case ABORT_DISK_IO:
0615         mod_rq_state(req, m, 0, RQ_LOCAL_ABORTED);
0616         break;
0617 
0618     case WRITE_COMPLETED_WITH_ERROR:
0619         drbd_report_io_error(device, req);
0620         __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
0621         mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
0622         break;
0623 
0624     case READ_COMPLETED_WITH_ERROR:
0625         drbd_set_out_of_sync(device, req->i.sector, req->i.size);
0626         drbd_report_io_error(device, req);
0627         __drbd_chk_io_error(device, DRBD_READ_ERROR);
0628         fallthrough;
0629     case READ_AHEAD_COMPLETED_WITH_ERROR:
0630         /* it is legal to fail read-ahead, no __drbd_chk_io_error in that case. */
0631         mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
0632         break;
0633 
0634     case DISCARD_COMPLETED_NOTSUPP:
0635     case DISCARD_COMPLETED_WITH_ERROR:
0636         /* I'd rather not detach from local disk just because it
0637          * failed a REQ_OP_DISCARD. */
0638         mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
0639         break;
0640 
0641     case QUEUE_FOR_NET_READ:
0642         /* READ, and
0643          * no local disk,
0644          * or target area marked as invalid,
0645          * or just got an io-error. */
0646         /* from __drbd_make_request
0647          * or from bio_endio during read io-error recovery */
0648 
0649         /* So we can verify the handle in the answer packet.
0650          * Corresponding drbd_remove_request_interval is in
0651          * drbd_req_complete() */
0652         D_ASSERT(device, drbd_interval_empty(&req->i));
0653         drbd_insert_interval(&device->read_requests, &req->i);
0654 
0655         set_bit(UNPLUG_REMOTE, &device->flags);
0656 
0657         D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
0658         D_ASSERT(device, (req->rq_state & RQ_LOCAL_MASK) == 0);
0659         mod_rq_state(req, m, 0, RQ_NET_QUEUED);
0660         req->w.cb = w_send_read_req;
0661         drbd_queue_work(&connection->sender_work,
0662                 &req->w);
0663         break;
0664 
0665     case QUEUE_FOR_NET_WRITE:
0666         /* assert something? */
0667         /* from __drbd_make_request only */
0668 
0669         /* Corresponding drbd_remove_request_interval is in
0670          * drbd_req_complete() */
0671         D_ASSERT(device, drbd_interval_empty(&req->i));
0672         drbd_insert_interval(&device->write_requests, &req->i);
0673 
0674         /* NOTE
0675          * In case the req ended up on the transfer log before being
0676          * queued on the worker, it could lead to this request being
0677          * missed during cleanup after connection loss.
0678          * So we have to do both operations here,
0679          * within the same lock that protects the transfer log.
0680          *
0681          * _req_add_to_epoch(req); this has to be after the
0682          * _maybe_start_new_epoch(req); which happened in
0683          * __drbd_make_request, because we now may set the bit
0684          * again ourselves to close the current epoch.
0685          *
0686          * Add req to the (now) current epoch (barrier). */
0687 
0688         /* otherwise we may lose an unplug, which may cause some remote
0689          * io-scheduler timeout to expire, increasing maximum latency,
0690          * hurting performance. */
0691         set_bit(UNPLUG_REMOTE, &device->flags);
0692 
0693         /* queue work item to send data */
0694         D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
0695         mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
0696         req->w.cb =  w_send_dblock;
0697         drbd_queue_work(&connection->sender_work,
0698                 &req->w);
0699 
0700         /* close the epoch, in case it outgrew the limit */
0701         rcu_read_lock();
0702         nc = rcu_dereference(connection->net_conf);
0703         p = nc->max_epoch_size;
0704         rcu_read_unlock();
0705         if (connection->current_tle_writes >= p)
0706             start_new_tl_epoch(connection);
0707 
0708         break;
0709 
0710     case QUEUE_FOR_SEND_OOS:
0711         mod_rq_state(req, m, 0, RQ_NET_QUEUED);
0712         req->w.cb =  w_send_out_of_sync;
0713         drbd_queue_work(&connection->sender_work,
0714                 &req->w);
0715         break;
0716 
0717     case READ_RETRY_REMOTE_CANCELED:
0718     case SEND_CANCELED:
0719     case SEND_FAILED:
0720         /* real cleanup will be done from tl_clear.  just update flags
0721          * so it is no longer marked as on the worker queue */
0722         mod_rq_state(req, m, RQ_NET_QUEUED, 0);
0723         break;
0724 
0725     case HANDED_OVER_TO_NETWORK:
0726         /* assert something? */
0727         if (is_pending_write_protocol_A(req))
0728             /* this is what is dangerous about protocol A:
0729              * pretend it was successfully written on the peer. */
0730             mod_rq_state(req, m, RQ_NET_QUEUED|RQ_NET_PENDING,
0731                         RQ_NET_SENT|RQ_NET_OK);
0732         else
0733             mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
0734         /* It is still not yet RQ_NET_DONE until the
0735          * corresponding epoch barrier got acked as well,
0736          * so we know what to dirty on connection loss. */
0737         break;
0738 
0739     case OOS_HANDED_TO_NETWORK:
0740         /* Was not set PENDING, no longer QUEUED, so is now DONE
0741          * as far as this connection is concerned. */
0742         mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_DONE);
0743         break;
0744 
0745     case CONNECTION_LOST_WHILE_PENDING:
0746         /* transfer log cleanup after connection loss */
0747         mod_rq_state(req, m,
0748                 RQ_NET_OK|RQ_NET_PENDING|RQ_COMPLETION_SUSP,
0749                 RQ_NET_DONE);
0750         break;
0751 
0752     case CONFLICT_RESOLVED:
0753         /* for superseded conflicting writes of multiple primaries,
0754          * there is no need to keep anything in the tl, potential
0755          * node crashes are covered by the activity log.
0756          *
0757          * If this request had been marked as RQ_POSTPONED before,
0758          * it will actually not be completed, but "restarted",
0759          * resubmitted from the retry worker context. */
0760         D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
0761         D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
0762         mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_DONE|RQ_NET_OK);
0763         break;
0764 
0765     case WRITE_ACKED_BY_PEER_AND_SIS:
0766         req->rq_state |= RQ_NET_SIS;
0767         fallthrough;
0768     case WRITE_ACKED_BY_PEER:
0769         /* Normal operation protocol C: successfully written on peer.
0770          * During resync, even in protocol != C,
0771          * we requested an explicit write ack anyways.
0772          * Which means we cannot even assert anything here.
0773          * Nothing more to do here.
0774          * We want to keep the tl in place for all protocols, to cater
0775          * for volatile write-back caches on lower level devices. */
0776         goto ack_common;
0777     case RECV_ACKED_BY_PEER:
0778         D_ASSERT(device, req->rq_state & RQ_EXP_RECEIVE_ACK);
0779         /* protocol B; pretends to be successfully written on peer.
0780          * see also notes above in HANDED_OVER_TO_NETWORK about
0781          * protocol != C */
0782     ack_common:
0783         mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
0784         break;
0785 
0786     case POSTPONE_WRITE:
0787         D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
0788         /* If this node has already detected the write conflict, the
0789          * worker will be waiting on misc_wait.  Wake it up once this
0790          * request has completed locally.
0791          */
0792         D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
0793         req->rq_state |= RQ_POSTPONED;
0794         if (req->i.waiting)
0795             wake_up(&device->misc_wait);
0796         /* Do not clear RQ_NET_PENDING. This request will make further
0797          * progress via restart_conflicting_writes() or
0798          * fail_postponed_requests(). Hopefully. */
0799         break;
0800 
0801     case NEG_ACKED:
0802         mod_rq_state(req, m, RQ_NET_OK|RQ_NET_PENDING, 0);
0803         break;
0804 
0805     case FAIL_FROZEN_DISK_IO:
0806         if (!(req->rq_state & RQ_LOCAL_COMPLETED))
0807             break;
0808         mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
0809         break;
0810 
0811     case RESTART_FROZEN_DISK_IO:
0812         if (!(req->rq_state & RQ_LOCAL_COMPLETED))
0813             break;
0814 
0815         mod_rq_state(req, m,
0816                 RQ_COMPLETION_SUSP|RQ_LOCAL_COMPLETED,
0817                 RQ_LOCAL_PENDING);
0818 
0819         rv = MR_READ;
0820         if (bio_data_dir(req->master_bio) == WRITE)
0821             rv = MR_WRITE;
0822 
0823         get_ldev(device); /* always succeeds in this call path */
0824         req->w.cb = w_restart_disk_io;
0825         drbd_queue_work(&connection->sender_work,
0826                 &req->w);
0827         break;
0828 
0829     case RESEND:
0830         /* Simply complete (local only) READs. */
0831         if (!(req->rq_state & RQ_WRITE) && !req->w.cb) {
0832             mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
0833             break;
0834         }
0835 
0836         /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
0837            before the connection loss (B&C only); only P_BARRIER_ACK
0838            (or the local completion?) was missing when we suspended.
0839            Throwing them out of the TL here by pretending we got a BARRIER_ACK.
0840            During connection handshake, we ensure that the peer was not rebooted. */
0841         if (!(req->rq_state & RQ_NET_OK)) {
0842             /* FIXME could this possibly be a req->dw.cb == w_send_out_of_sync?
0843              * in that case we must not set RQ_NET_PENDING. */
0844 
0845             mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
0846             if (req->w.cb) {
0847                 /* w.cb expected to be w_send_dblock, or w_send_read_req */
0848                 drbd_queue_work(&connection->sender_work,
0849                         &req->w);
0850                 rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
0851             } /* else: FIXME can this happen? */
0852             break;
0853         }
0854         fallthrough;    /* to BARRIER_ACKED */
0855 
0856     case BARRIER_ACKED:
0857         /* barrier ack for READ requests does not make sense */
0858         if (!(req->rq_state & RQ_WRITE))
0859             break;
0860 
0861         if (req->rq_state & RQ_NET_PENDING) {
0862             /* barrier came in before all requests were acked.
0863              * this is bad, because if the connection is lost now,
0864              * we won't be able to clean them up... */
0865             drbd_err(device, "FIXME (BARRIER_ACKED but pending)\n");
0866         }
0867         /* Allowed to complete requests, even while suspended.
0868          * As this is called for all requests within a matching epoch,
0869          * we need to filter, and only set RQ_NET_DONE for those that
0870          * have actually been on the wire. */
0871         mod_rq_state(req, m, RQ_COMPLETION_SUSP,
0872                 (req->rq_state & RQ_NET_MASK) ? RQ_NET_DONE : 0);
0873         break;
0874 
0875     case DATA_RECEIVED:
0876         D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
0877         mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE);
0878         break;
0879 
0880     case QUEUE_AS_DRBD_BARRIER:
0881         start_new_tl_epoch(connection);
0882         mod_rq_state(req, m, 0, RQ_NET_OK|RQ_NET_DONE);
0883         break;
0884     }
0885 
0886     return rv;
0887 }
0888 
0889 /* we may do a local read if:
0890  * - we are consistent (of course),
0891  * - or we are generally inconsistent,
0892  *   BUT we are still/already IN SYNC for this area.
0893  *   since size may be bigger than BM_BLOCK_SIZE,
0894  *   we may need to check several bits.
0895  */
0896 static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size)
0897 {
0898     unsigned long sbnr, ebnr;
0899     sector_t esector, nr_sectors;
0900 
0901     if (device->state.disk == D_UP_TO_DATE)
0902         return true;
0903     if (device->state.disk != D_INCONSISTENT)
0904         return false;
0905     esector = sector + (size >> 9) - 1;
0906     nr_sectors = get_capacity(device->vdisk);
0907     D_ASSERT(device, sector  < nr_sectors);
0908     D_ASSERT(device, esector < nr_sectors);
0909 
0910     sbnr = BM_SECT_TO_BIT(sector);
0911     ebnr = BM_SECT_TO_BIT(esector);
0912 
0913     return drbd_bm_count_bits(device, sbnr, ebnr) == 0;
0914 }
0915 
0916 static bool remote_due_to_read_balancing(struct drbd_device *device, sector_t sector,
0917         enum drbd_read_balancing rbm)
0918 {
0919     int stripe_shift;
0920 
0921     switch (rbm) {
0922     case RB_CONGESTED_REMOTE:
0923         return false;
0924     case RB_LEAST_PENDING:
0925         return atomic_read(&device->local_cnt) >
0926             atomic_read(&device->ap_pending_cnt) + atomic_read(&device->rs_pending_cnt);
0927     case RB_32K_STRIPING:  /* stripe_shift = 15 */
0928     case RB_64K_STRIPING:
0929     case RB_128K_STRIPING:
0930     case RB_256K_STRIPING:
0931     case RB_512K_STRIPING:
0932     case RB_1M_STRIPING:   /* stripe_shift = 20 */
0933         stripe_shift = (rbm - RB_32K_STRIPING + 15);
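             /* sector is in 512-byte units, so the stripe index is
              * sector >> (stripe_shift - 9); requests in odd-numbered stripes
              * go to the peer, even-numbered ones stay local. */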
0934         return (sector >> (stripe_shift - 9)) & 1;
0935     case RB_ROUND_ROBIN:
0936         return test_and_change_bit(READ_BALANCE_RR, &device->flags);
0937     case RB_PREFER_REMOTE:
0938         return true;
0939     case RB_PREFER_LOCAL:
0940     default:
0941         return false;
0942     }
0943 }
0944 
0945 /*
0946  * complete_conflicting_writes  -  wait for any conflicting write requests
0947  *
0948  * The write_requests tree contains all active write requests which we
0949  * currently know about.  Wait for any requests to complete which conflict with
0950  * the new one.
0951  *
0952  * Only way out: remove the conflicting intervals from the tree.
0953  */
0954 static void complete_conflicting_writes(struct drbd_request *req)
0955 {
0956     DEFINE_WAIT(wait);
0957     struct drbd_device *device = req->device;
0958     struct drbd_interval *i;
0959     sector_t sector = req->i.sector;
0960     int size = req->i.size;
0961 
0962     for (;;) {
0963         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
0964             /* Ignore, if already completed to upper layers. */
0965             if (i->completed)
0966                 continue;
0967             /* Handle the first found overlap.  After the schedule
0968              * we have to restart the tree walk. */
0969             break;
0970         }
0971         if (!i) /* if any */
0972             break;
0973 
0974         /* Indicate to wake up device->misc_wait on progress.  */
0975         prepare_to_wait(&device->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
0976         i->waiting = true;
0977         spin_unlock_irq(&device->resource->req_lock);
0978         schedule();
0979         spin_lock_irq(&device->resource->req_lock);
0980     }
0981     finish_wait(&device->misc_wait, &wait);
0982 }
0983 
0984 /* called within req_lock */
0985 static void maybe_pull_ahead(struct drbd_device *device)
0986 {
0987     struct drbd_connection *connection = first_peer_device(device)->connection;
0988     struct net_conf *nc;
0989     bool congested = false;
0990     enum drbd_on_congestion on_congestion;
0991 
0992     rcu_read_lock();
0993     nc = rcu_dereference(connection->net_conf);
0994     on_congestion = nc ? nc->on_congestion : OC_BLOCK;
0995     rcu_read_unlock();
0996     if (on_congestion == OC_BLOCK ||
0997         connection->agreed_pro_version < 96)
0998         return;
0999 
1000     if (on_congestion == OC_PULL_AHEAD && device->state.conn == C_AHEAD)
1001         return; /* nothing to do ... */
1002 
1003     /* If I don't even have good local storage, we can not reasonably try
1004      * to pull ahead of the peer. We also need the local reference to make
1005      * sure device->act_log is there.
1006      */
1007     if (!get_ldev_if_state(device, D_UP_TO_DATE))
1008         return;
1009 
1010     if (nc->cong_fill &&
1011         atomic_read(&device->ap_in_flight) >= nc->cong_fill) {
1012         drbd_info(device, "Congestion-fill threshold reached\n");
1013         congested = true;
1014     }
1015 
1016     if (device->act_log->used >= nc->cong_extents) {
1017         drbd_info(device, "Congestion-extents threshold reached\n");
1018         congested = true;
1019     }
1020 
1021     if (congested) {
1022         /* start a new epoch for non-mirrored writes */
1023         start_new_tl_epoch(first_peer_device(device)->connection);
1024 
1025         if (on_congestion == OC_PULL_AHEAD)
1026             _drbd_set_state(_NS(device, conn, C_AHEAD), 0, NULL);
1027         else  /*nc->on_congestion == OC_DISCONNECT */
1028             _drbd_set_state(_NS(device, conn, C_DISCONNECTING), 0, NULL);
1029     }
1030     put_ldev(device);
1031 }
1032 
1033 /* If this returns false, and req->private_bio is still set,
1034  * this should be submitted locally.
1035  *
1036  * If it returns false, but req->private_bio is not set,
1037  * we do not have access to good data :(
1038  *
1039  * Otherwise, this destroys req->private_bio, if any,
1040  * and returns true.
1041  */
1042 static bool do_remote_read(struct drbd_request *req)
1043 {
1044     struct drbd_device *device = req->device;
1045     enum drbd_read_balancing rbm;
1046 
1047     if (req->private_bio) {
1048         if (!drbd_may_do_local_read(device,
1049                     req->i.sector, req->i.size)) {
1050             bio_put(req->private_bio);
1051             req->private_bio = NULL;
1052             put_ldev(device);
1053         }
1054     }
1055 
1056     if (device->state.pdsk != D_UP_TO_DATE)
1057         return false;
1058 
1059     if (req->private_bio == NULL)
1060         return true;
1061 
1062     /* TODO: improve read balancing decisions, take into account drbd
1063      * protocol, pending requests etc. */
1064 
1065     rcu_read_lock();
1066     rbm = rcu_dereference(device->ldev->disk_conf)->read_balancing;
1067     rcu_read_unlock();
1068 
1069     if (rbm == RB_PREFER_LOCAL && req->private_bio)
1070         return false; /* submit locally */
1071 
1072     if (remote_due_to_read_balancing(device, req->i.sector, rbm)) {
1073         if (req->private_bio) {
1074             bio_put(req->private_bio);
1075             req->private_bio = NULL;
1076             put_ldev(device);
1077         }
1078         return true;
1079     }
1080 
1081     return false;
1082 }
1083 
1084 bool drbd_should_do_remote(union drbd_dev_state s)
1085 {
1086     return s.pdsk == D_UP_TO_DATE ||
1087         (s.pdsk >= D_INCONSISTENT &&
1088          s.conn >= C_WF_BITMAP_T &&
1089          s.conn < C_AHEAD);
1090     /* Before proto 96 that was >= CONNECTED instead of >= C_WF_BITMAP_T.
1091        That is equivalent since before 96 IO was frozen in the C_WF_BITMAP*
1092        states. */
1093 }
1094 
1095 static bool drbd_should_send_out_of_sync(union drbd_dev_state s)
1096 {
1097     return s.conn == C_AHEAD || s.conn == C_WF_BITMAP_S;
1098     /* pdsk = D_INCONSISTENT as a consequence. Protocol 96 check not necessary
1099        since we enter state C_AHEAD only if proto >= 96 */
1100 }
1101 
1102 /* returns number of connections (== 1, for drbd 8.4)
1103  * expected to actually write this data,
1104  * which does NOT include those that we are L_AHEAD for. */
1105 static int drbd_process_write_request(struct drbd_request *req)
1106 {
1107     struct drbd_device *device = req->device;
1108     int remote, send_oos;
1109 
1110     remote = drbd_should_do_remote(device->state);
1111     send_oos = drbd_should_send_out_of_sync(device->state);
1112 
1113     /* Need to replicate writes.  Unless it is an empty flush,
1114      * which is better mapped to a DRBD P_BARRIER packet,
1115      * also for drbd wire protocol compatibility reasons.
1116      * If this was a flush, just start a new epoch.
1117      * Unless the current epoch was empty anyways, or we are not currently
1118      * replicating, in which case there is no point. */
1119     if (unlikely(req->i.size == 0)) {
1120         /* The only size==0 bios we expect are empty flushes. */
1121         D_ASSERT(device, req->master_bio->bi_opf & REQ_PREFLUSH);
1122         if (remote)
1123             _req_mod(req, QUEUE_AS_DRBD_BARRIER);
1124         return remote;
1125     }
1126 
1127     if (!remote && !send_oos)
1128         return 0;
1129 
1130     D_ASSERT(device, !(remote && send_oos));
1131 
1132     if (remote) {
1133         _req_mod(req, TO_BE_SENT);
1134         _req_mod(req, QUEUE_FOR_NET_WRITE);
1135     } else if (drbd_set_out_of_sync(device, req->i.sector, req->i.size))
1136         _req_mod(req, QUEUE_FOR_SEND_OOS);
1137 
1138     return remote;
1139 }
1140 
1141 static void drbd_process_discard_or_zeroes_req(struct drbd_request *req, int flags)
1142 {
1143     int err = drbd_issue_discard_or_zero_out(req->device,
1144                 req->i.sector, req->i.size >> 9, flags);
1145     if (err)
1146         req->private_bio->bi_status = BLK_STS_IOERR;
1147     bio_endio(req->private_bio);
1148 }
1149 
1150 static void
1151 drbd_submit_req_private_bio(struct drbd_request *req)
1152 {
1153     struct drbd_device *device = req->device;
1154     struct bio *bio = req->private_bio;
1155     unsigned int type;
1156 
1157     if (bio_op(bio) != REQ_OP_READ)
1158         type = DRBD_FAULT_DT_WR;
1159     else if (bio->bi_opf & REQ_RAHEAD)
1160         type = DRBD_FAULT_DT_RA;
1161     else
1162         type = DRBD_FAULT_DT_RD;
1163 
1164     /* State may have changed since we grabbed our reference on the
1165      * ->ldev member. Double check, and short-circuit to endio.
1166      * In case the last activity log transaction failed to get on
1167      * stable storage, and this is a WRITE, we may not even submit
1168      * this bio. */
1169     if (get_ldev(device)) {
1170         if (drbd_insert_fault(device, type))
1171             bio_io_error(bio);
1172         else if (bio_op(bio) == REQ_OP_WRITE_ZEROES)
1173             drbd_process_discard_or_zeroes_req(req, EE_ZEROOUT |
1174                 ((bio->bi_opf & REQ_NOUNMAP) ? 0 : EE_TRIM));
1175         else if (bio_op(bio) == REQ_OP_DISCARD)
1176             drbd_process_discard_or_zeroes_req(req, EE_TRIM);
1177         else
1178             submit_bio_noacct(bio);
1179         put_ldev(device);
1180     } else
1181         bio_io_error(bio);
1182 }
1183 
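     /* Queue a write for the submitter thread: add it to device->submit.writes
      * and the pending-master-completion list under the req_lock, then kick the
      * submit workqueue, whose work function is do_submit(). */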
1184 static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req)
1185 {
1186     spin_lock_irq(&device->resource->req_lock);
1187     list_add_tail(&req->tl_requests, &device->submit.writes);
1188     list_add_tail(&req->req_pending_master_completion,
1189             &device->pending_master_completion[1 /* WRITE */]);
1190     spin_unlock_irq(&device->resource->req_lock);
1191     queue_work(device->submit.wq, &device->submit.worker);
1192     /* do_submit() may sleep internally on al_wait, too */
1193     wake_up(&device->al_wait);
1194 }
1195 
1196 /* returns the new drbd_request pointer, if the caller is expected to
1197  * drbd_send_and_submit() it (to save latency), or NULL if we queued the
1198  * request on the submitter thread.
1199  * Returns ERR_PTR(-ENOMEM) if we cannot allocate a drbd_request.
1200  */
1201 static struct drbd_request *
1202 drbd_request_prepare(struct drbd_device *device, struct bio *bio)
1203 {
1204     const int rw = bio_data_dir(bio);
1205     struct drbd_request *req;
1206 
1207     /* allocate outside of all locks; */
1208     req = drbd_req_new(device, bio);
1209     if (!req) {
1210         dec_ap_bio(device);
1211         /* only pass the error to the upper layers.
1212          * if user cannot handle io errors, that's not our business. */
1213         drbd_err(device, "could not kmalloc() req\n");
1214         bio->bi_status = BLK_STS_RESOURCE;
1215         bio_endio(bio);
1216         return ERR_PTR(-ENOMEM);
1217     }
1218 
1219     /* Update disk stats */
1220     req->start_jif = bio_start_io_acct(req->master_bio);
1221 
1222     if (!get_ldev(device)) {
1223         bio_put(req->private_bio);
1224         req->private_bio = NULL;
1225     }
1226 
1227     /* process discards always from our submitter thread */
1228     if (bio_op(bio) == REQ_OP_WRITE_ZEROES ||
1229         bio_op(bio) == REQ_OP_DISCARD)
1230         goto queue_for_submitter_thread;
1231 
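         /* Fast path for writes: if drbd_al_begin_io_fastpath() succeeds, the
          * corresponding activity log extent is already active, so mark the
          * request RQ_IN_ACT_LOG and hand it back for immediate submission;
          * otherwise defer it to the submitter thread below. */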
1232     if (rw == WRITE && req->private_bio && req->i.size
1233     && !test_bit(AL_SUSPENDED, &device->flags)) {
1234         if (!drbd_al_begin_io_fastpath(device, &req->i))
1235             goto queue_for_submitter_thread;
1236         req->rq_state |= RQ_IN_ACT_LOG;
1237         req->in_actlog_jif = jiffies;
1238     }
1239     return req;
1240 
1241  queue_for_submitter_thread:
1242     atomic_inc(&device->ap_actlog_cnt);
1243     drbd_queue_write(device, req);
1244     return NULL;
1245 }
1246 
1247 /* Require at least one path to current data.
1248  * We don't want to allow writes on C_STANDALONE D_INCONSISTENT:
1249  * We would not allow to read what was written,
1250  * we would not have bumped the data generation uuids,
1251  * we would cause data divergence for all the wrong reasons.
1252  *
1253  * If we don't see at least one D_UP_TO_DATE, we will fail this request,
1254  * which either returns EIO, or, if OND_SUSPEND_IO is set, suspends IO,
1255  * and queues for retry later.
1256  */
1257 static bool may_do_writes(struct drbd_device *device)
1258 {
1259     const union drbd_dev_state s = device->state;
1260     return s.disk == D_UP_TO_DATE || s.pdsk == D_UP_TO_DATE;
1261 }
1262 
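     /* Per-plug context handed to blk_check_plugged(): remembers the most recent
      * request issued while the block layer plug was active, so that drbd_unplug()
      * can tag it with RQ_UNPLUG and have it followed by P_UNPLUG_REMOTE. */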
1263 struct drbd_plug_cb {
1264     struct blk_plug_cb cb;
1265     struct drbd_request *most_recent_req;
1266     /* do we need more? */
1267 };
1268 
1269 static void drbd_unplug(struct blk_plug_cb *cb, bool from_schedule)
1270 {
1271     struct drbd_plug_cb *plug = container_of(cb, struct drbd_plug_cb, cb);
1272     struct drbd_resource *resource = plug->cb.data;
1273     struct drbd_request *req = plug->most_recent_req;
1274 
1275     kfree(cb);
1276     if (!req)
1277         return;
1278 
1279     spin_lock_irq(&resource->req_lock);
1280     /* In case the sender did not process it yet, raise the flag to
1281      * have it followed with P_UNPLUG_REMOTE just after. */
1282     req->rq_state |= RQ_UNPLUG;
1283     /* but also queue a generic unplug */
1284     drbd_queue_unplug(req->device);
1285     kref_put(&req->kref, drbd_req_destroy);
1286     spin_unlock_irq(&resource->req_lock);
1287 }
1288 
1289 static struct drbd_plug_cb* drbd_check_plugged(struct drbd_resource *resource)
1290 {
1291     /* A lot of text to say
1292      * return (struct drbd_plug_cb*)blk_check_plugged(); */
1293     struct drbd_plug_cb *plug;
1294     struct blk_plug_cb *cb = blk_check_plugged(drbd_unplug, resource, sizeof(*plug));
1295 
1296     if (cb)
1297         plug = container_of(cb, struct drbd_plug_cb, cb);
1298     else
1299         plug = NULL;
1300     return plug;
1301 }
1302 
1303 static void drbd_update_plug(struct drbd_plug_cb *plug, struct drbd_request *req)
1304 {
1305     struct drbd_request *tmp = plug->most_recent_req;
1306     /* Will be sent to some peer.
1307      * Remember to tag it with UNPLUG_REMOTE on unplug */
1308     kref_get(&req->kref);
1309     plug->most_recent_req = req;
1310     if (tmp)
1311         kref_put(&tmp->kref, drbd_req_destroy);
1312 }
1313 
1314 static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req)
1315 {
1316     struct drbd_resource *resource = device->resource;
1317     const int rw = bio_data_dir(req->master_bio);
1318     struct bio_and_error m = { NULL, };
1319     bool no_remote = false;
1320     bool submit_private_bio = false;
1321 
1322     spin_lock_irq(&resource->req_lock);
1323     if (rw == WRITE) {
1324         /* This may temporarily give up the req_lock,
1325          * but will re-acquire it before it returns here.
1326          * Needs to be before the check on drbd_suspended() */
1327         complete_conflicting_writes(req);
1328         /* no more giving up req_lock from now on! */
1329 
1330         /* check for congestion, and potentially stop sending
1331          * full data updates, but start sending "dirty bits" only. */
1332         maybe_pull_ahead(device);
1333     }
1334 
1335 
1336     if (drbd_suspended(device)) {
1337         /* push back and retry: */
1338         req->rq_state |= RQ_POSTPONED;
1339         if (req->private_bio) {
1340             bio_put(req->private_bio);
1341             req->private_bio = NULL;
1342             put_ldev(device);
1343         }
1344         goto out;
1345     }
1346 
1347     /* We fail READ early, if we can not serve it.
1348      * We must do this before req is registered on any lists.
1349      * Otherwise, drbd_req_complete() will queue failed READ for retry. */
1350     if (rw != WRITE) {
1351         if (!do_remote_read(req) && !req->private_bio)
1352             goto nodata;
1353     }
1354 
1355     /* which transfer log epoch does this belong to? */
1356     req->epoch = atomic_read(&first_peer_device(device)->connection->current_tle_nr);
1357 
1358     /* no point in adding empty flushes to the transfer log,
1359      * they are mapped to drbd barriers already. */
1360     if (likely(req->i.size!=0)) {
1361         if (rw == WRITE)
1362             first_peer_device(device)->connection->current_tle_writes++;
1363 
1364         list_add_tail(&req->tl_requests, &first_peer_device(device)->connection->transfer_log);
1365     }
1366 
1367     if (rw == WRITE) {
1368         if (req->private_bio && !may_do_writes(device)) {
1369             bio_put(req->private_bio);
1370             req->private_bio = NULL;
1371             put_ldev(device);
1372             goto nodata;
1373         }
1374         if (!drbd_process_write_request(req))
1375             no_remote = true;
1376     } else {
1377         /* We either have a private_bio, or we can read from remote.
1378          * Otherwise we had done the goto nodata above. */
1379         if (req->private_bio == NULL) {
1380             _req_mod(req, TO_BE_SENT);
1381             _req_mod(req, QUEUE_FOR_NET_READ);
1382         } else
1383             no_remote = true;
1384     }
1385 
1386     if (no_remote == false) {
1387         struct drbd_plug_cb *plug = drbd_check_plugged(resource);
1388         if (plug)
1389             drbd_update_plug(plug, req);
1390     }
1391 
1392     /* If it took the fast path in drbd_request_prepare, add it here.
1393      * The slow path has added it already. */
1394     if (list_empty(&req->req_pending_master_completion))
1395         list_add_tail(&req->req_pending_master_completion,
1396             &device->pending_master_completion[rw == WRITE]);
1397     if (req->private_bio) {
1398         /* needs to be marked within the same spinlock */
1399         req->pre_submit_jif = jiffies;
1400         list_add_tail(&req->req_pending_local,
1401             &device->pending_completion[rw == WRITE]);
1402         _req_mod(req, TO_BE_SUBMITTED);
1403         /* but we need to give up the spinlock to submit */
1404         submit_private_bio = true;
1405     } else if (no_remote) {
1406 nodata:
1407         if (__ratelimit(&drbd_ratelimit_state))
1408             drbd_err(device, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
1409                     (unsigned long long)req->i.sector, req->i.size >> 9);
1410         /* A write may have been queued for send_oos, however.
1411          * So we can not simply free it, we must go through drbd_req_put_completion_ref() */
1412     }
1413 
1414 out:
1415     drbd_req_put_completion_ref(req, &m, 1);
1416     spin_unlock_irq(&resource->req_lock);
1417 
1418     /* Even though above is a kref_put(), this is safe.
1419      * As long as we still need to submit our private bio,
1420      * we hold a completion ref, and the request cannot disappear.
1421      * If however this request did not even have a private bio to submit
1422      * (e.g. remote read), req may already be invalid now.
1423      * That's why we cannot check on req->private_bio. */
1424     if (submit_private_bio)
1425         drbd_submit_req_private_bio(req);
1426     if (m.bio)
1427         complete_master_bio(device, &m);
1428 }
1429 
1430 void __drbd_make_request(struct drbd_device *device, struct bio *bio)
1431 {
1432     struct drbd_request *req = drbd_request_prepare(device, bio);
1433     if (IS_ERR_OR_NULL(req))
1434         return;
1435     drbd_send_and_submit(device, req);
1436 }
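/* Editorial note: drbd_request_prepare() (defined earlier in this file)
 * returns the request only when it can be sent and submitted right away; a
 * NULL or ERR_PTR return presumably means it was either handed off to the
 * submitter work queue (the slow activity-log path) or has already failed and
 * been completed, so there is nothing left to do here. */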
1437 
1438 static void submit_fast_path(struct drbd_device *device, struct list_head *incoming)
1439 {
1440     struct blk_plug plug;
1441     struct drbd_request *req, *tmp;
1442 
1443     blk_start_plug(&plug);
1444     list_for_each_entry_safe(req, tmp, incoming, tl_requests) {
1445         const int rw = bio_data_dir(req->master_bio);
1446 
1447         if (rw == WRITE /* rw != WRITE should not even end up here! */
1448         && req->private_bio && req->i.size
1449         && !test_bit(AL_SUSPENDED, &device->flags)) {
1450             if (!drbd_al_begin_io_fastpath(device, &req->i))
1451                 continue;
1452 
1453             req->rq_state |= RQ_IN_ACT_LOG;
1454             req->in_actlog_jif = jiffies;
1455             atomic_dec(&device->ap_actlog_cnt);
1456         }
1457 
1458         list_del_init(&req->tl_requests);
1459         drbd_send_and_submit(device, req);
1460     }
1461     blk_finish_plug(&plug);
1462 }
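/* Editorial note: this "fast path" handles writes whose activity-log extent
 * is already hot (drbd_al_begin_io_fastpath() succeeds) as well as requests
 * that need no activity-log reference at all; a write whose fastpath attempt
 * fails stays on the incoming list for the transaction-based slow path in
 * do_submit() below. */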
1463 
1464 static bool prepare_al_transaction_nonblock(struct drbd_device *device,
1465                         struct list_head *incoming,
1466                         struct list_head *pending,
1467                         struct list_head *later)
1468 {
1469     struct drbd_request *req;
1470     int wake = 0;
1471     int err;
1472 
1473     spin_lock_irq(&device->al_lock);
1474     while ((req = list_first_entry_or_null(incoming, struct drbd_request, tl_requests))) {
1475         err = drbd_al_begin_io_nonblock(device, &req->i);
1476         if (err == -ENOBUFS)
1477             break;
1478         if (err == -EBUSY)
1479             wake = 1;
1480         if (err)
1481             list_move_tail(&req->tl_requests, later);
1482         else
1483             list_move_tail(&req->tl_requests, pending);
1484     }
1485     spin_unlock_irq(&device->al_lock);
1486     if (wake)
1487         wake_up(&device->al_wait);
1488     return !list_empty(pending);
1489 }
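/* Editorial note on the error codes handled above: -ENOBUFS means the current
 * activity-log transaction has no free update slots left, so we stop and let
 * the caller commit first; -EBUSY means the extent is currently in use (e.g.
 * by resync), so the request is parked on "later" and al_wait is woken; any
 * other error parks the request as well, while success moves it to "pending".
 * The return value is true iff at least one request became pending. */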
1490 
1491 static void send_and_submit_pending(struct drbd_device *device, struct list_head *pending)
1492 {
1493     struct blk_plug plug;
1494     struct drbd_request *req;
1495 
1496     blk_start_plug(&plug);
1497     while ((req = list_first_entry_or_null(pending, struct drbd_request, tl_requests))) {
1498         req->rq_state |= RQ_IN_ACT_LOG;
1499         req->in_actlog_jif = jiffies;
1500         atomic_dec(&device->ap_actlog_cnt);
1501         list_del_init(&req->tl_requests);
1502         drbd_send_and_submit(device, req);
1503     }
1504     blk_finish_plug(&plug);
1505 }
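/* Editorial note: the caller is expected to have committed the activity-log
 * transaction covering these requests (see drbd_al_begin_io_commit() in
 * do_submit() below) before this runs, which is why RQ_IN_ACT_LOG can be set
 * unconditionally here. */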
1506 
1507 void do_submit(struct work_struct *ws)
1508 {
1509     struct drbd_device *device = container_of(ws, struct drbd_device, submit.worker);
1510     LIST_HEAD(incoming);    /* from drbd_make_request() */
1511     LIST_HEAD(pending); /* to be submitted after next AL-transaction commit */
1512     LIST_HEAD(busy);    /* blocked by resync requests */
1513 
1514     /* grab new incoming requests */
1515     spin_lock_irq(&device->resource->req_lock);
1516     list_splice_tail_init(&device->submit.writes, &incoming);
1517     spin_unlock_irq(&device->resource->req_lock);
1518 
1519     for (;;) {
1520         DEFINE_WAIT(wait);
1521 
1522         /* move used-to-be-busy back to front of incoming */
1523         list_splice_init(&busy, &incoming);
1524         submit_fast_path(device, &incoming);
1525         if (list_empty(&incoming))
1526             break;
1527 
1528         for (;;) {
1529             prepare_to_wait(&device->al_wait, &wait, TASK_UNINTERRUPTIBLE);
1530 
1531             list_splice_init(&busy, &incoming);
1532             prepare_al_transaction_nonblock(device, &incoming, &pending, &busy);
1533             if (!list_empty(&pending))
1534                 break;
1535 
1536             schedule();
1537 
1538             /* If all currently "hot" activity log extents are kept busy by
1539              * incoming requests, we still must not totally starve new
1540              * requests to "cold" extents.
1541              * Something left on &incoming means there had not been
1542              * enough update slots available, and the activity log
1543              * has been marked as "starving".
1544              *
1545              * Try again now, without looking for new requests,
1546              * effectively blocking all new requests until we made
1547              * at least _some_ progress with what we currently have.
1548              */
1549             if (!list_empty(&incoming))
1550                 continue;
1551 
1552             /* Nothing moved to pending, but nothing left
1553              * on incoming: all moved to busy!
1554              * Grab new and iterate. */
1555             spin_lock_irq(&device->resource->req_lock);
1556             list_splice_tail_init(&device->submit.writes, &incoming);
1557             spin_unlock_irq(&device->resource->req_lock);
1558         }
1559         finish_wait(&device->al_wait, &wait);
1560 
1561         /* If the transaction was full, before all incoming requests
1562          * had been processed, skip ahead to commit, and iterate
1563          * without splicing in more incoming requests from upper layers.
1564          *
1565          * Else, if all incoming have been processed,
1566          * they have become either "pending" (to be submitted after
1567          * next transaction commit) or "busy" (blocked by resync).
1568          *
1569          * Maybe more was queued, while we prepared the transaction?
1570          * Try to stuff those into this transaction as well.
1571          * Be strictly non-blocking here,
1572          * we already have something to commit.
1573          *
1574          * Commit if we don't make any more progress.
1575          */
1576 
1577         while (list_empty(&incoming)) {
1578             LIST_HEAD(more_pending);
1579             LIST_HEAD(more_incoming);
1580             bool made_progress;
1581 
1582             /* It is ok to look outside the lock,
1583              * it's only an optimization anyway. */
1584             if (list_empty(&device->submit.writes))
1585                 break;
1586 
1587             spin_lock_irq(&device->resource->req_lock);
1588             list_splice_tail_init(&device->submit.writes, &more_incoming);
1589             spin_unlock_irq(&device->resource->req_lock);
1590 
1591             if (list_empty(&more_incoming))
1592                 break;
1593 
1594             made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending, &busy);
1595 
1596             list_splice_tail_init(&more_pending, &pending);
1597             list_splice_tail_init(&more_incoming, &incoming);
1598             if (!made_progress)
1599                 break;
1600         }
1601 
1602         drbd_al_begin_io_commit(device);
1603         send_and_submit_pending(device, &pending);
1604     }
1605 }
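/* Editorial summary of do_submit() above:
 *  1. try the fast path for everything that came in;
 *  2. otherwise sleep on al_wait until at least one request fits into a new
 *     activity-log transaction ("pending");
 *  3. opportunistically stuff freshly queued writes into that same
 *     transaction as long as progress is being made;
 *  4. commit the transaction and submit all pending requests;
 * then start over while work keeps arriving. */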
1606 
1607 void drbd_submit_bio(struct bio *bio)
1608 {
1609     struct drbd_device *device = bio->bi_bdev->bd_disk->private_data;
1610 
1611     bio = bio_split_to_limits(bio);
1612 
1613     /*
1614      * what we "blindly" assume:
1615      */
1616     D_ASSERT(device, IS_ALIGNED(bio->bi_iter.bi_size, 512));
1617 
1618     inc_ap_bio(device);
1619     __drbd_make_request(device, bio);
1620 }
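/* Illustrative sketch (editorial assumption, not part of this file):
 * drbd_submit_bio() is the bio submission entry point, so it is presumably
 * registered with the block layer in drbd_main.c via a
 * block_device_operations table, roughly:
 *
 *	static const struct block_device_operations drbd_ops = {
 *		.owner      = THIS_MODULE,
 *		.submit_bio = drbd_submit_bio,
 *	};
 *
 * (the table name and any further members are assumptions); the block layer
 * then calls drbd_submit_bio() for every bio aimed at a drbd device. */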
1621 
1622 static bool net_timeout_reached(struct drbd_request *net_req,
1623         struct drbd_connection *connection,
1624         unsigned long now, unsigned long ent,
1625         unsigned int ko_count, unsigned int timeout)
1626 {
1627     struct drbd_device *device = net_req->device;
1628 
1629     if (!time_after(now, net_req->pre_send_jif + ent))
1630         return false;
1631 
1632     if (time_in_range(now, connection->last_reconnect_jif, connection->last_reconnect_jif + ent))
1633         return false;
1634 
1635     if (net_req->rq_state & RQ_NET_PENDING) {
1636         drbd_warn(device, "Remote failed to finish a request within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
1637             jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
1638         return true;
1639     }
1640 
1641     /* We received an ACK already (or are using protocol A),
1642      * but are waiting for the epoch closing barrier ack.
1643      * Check if we sent the barrier already.  We should not blame the peer
1644      * for being unresponsive, if we did not even ask it yet. */
1645     if (net_req->epoch == connection->send.current_epoch_nr) {
1646         drbd_warn(device,
1647             "We did not send a P_BARRIER for %ums > ko-count (%u) * timeout (%u * 0.1s); drbd kernel thread blocked?\n",
1648             jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
1649         return false;
1650     }
1651 
1652     /* Worst case: we may have been blocked for whatever reason, then
1653      * suddenly are able to send a lot of requests (and epoch separating
1654      * barriers) in quick succession.
1655      * The timestamp of the net_req may be much too old and not correspond
1656      * to the sending time of the relevant unack'ed barrier packet, and so
1657      * would trigger a spurious timeout.  Conversely, the latest barrier
1658      * packet may have too recent a timestamp to trigger the timeout, so we
1659      * could potentially miss one.  Right now we don't have a place to
1660      * conveniently store these timestamps.
1661      * But in this particular situation, the application requests are still
1662      * completed to upper layers, so DRBD should still "feel" responsive.
1663      * No need yet to kill this connection, it may still recover.
1664      * If not, eventually we will have queued enough into the network for
1665      * us to block. From that point of view, the timestamp of the last sent
1666      * barrier packet is relevant enough.
1667      */
1668     if (time_after(now, connection->send.last_sent_barrier_jif + ent)) {
1669         drbd_warn(device, "Remote failed to answer a P_BARRIER (sent at %lu jif; now=%lu jif) within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
1670             connection->send.last_sent_barrier_jif, now,
1671             jiffies_to_msecs(now - connection->send.last_sent_barrier_jif), ko_count, timeout);
1672         return true;
1673     }
1674     return false;
1675 }
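/* Editorial summary of the cases above: the peer is only blamed (and the
 * connection timed out) if a data request has been pending unacknowledged for
 * longer than ko-count * timeout, or if the barrier closing its epoch was
 * sent at least that long ago and is still unanswered.  If we have not even
 * sent that barrier yet, the local sender thread is suspected instead and no
 * timeout is signalled; a grace period after a reconnect is excluded as well. */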
1676 
1677 /* A request is considered timed out, if
1678  * - we have some effective timeout from the configuration,
1679  *   with some state restrictions applied,
1680  * - the oldest request is waiting for a response from the network
1681  *   or from the local disk, respectively,
1682  * - the oldest request is in fact older than the effective timeout,
1683  * - the connection was established (or the disk was attached, respectively)
1684  *   for longer than the timeout already.
1685  * Note that for 32bit jiffies and very stable connections/disks,
1686  * we may have a wrap around, which is caught by
1687  *   !time_in_range(now, last_..._jif, last_..._jif + timeout).
1688  *
1689  * Side effect: once per 32bit wrap-around interval, which means every
1690  * ~198 days with 250 HZ, we have a window where the timeout would need
1691  * to expire twice (worst case) to become effective. Good enough.
1692  */
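/* Worked example (editorial; assuming what are believed to be the defaults of
 * timeout = 60, i.e. 6.0 seconds in units of 0.1s, and ko-count = 7):
 *
 *	ent = timeout * HZ/10 * ko_count  =  60 * HZ/10 * 7   ->  42 seconds
 *	dt  = disk_timeout * HZ / 10                          (0 = disabled)
 *	et  = min_not_zero(dt, ent)
 *
 * so an unanswered peer request or barrier would be declared timed out after
 * 42 seconds, and the timer re-arms itself for the nearest deadline that has
 * not expired yet. */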
1693 
1694 void request_timer_fn(struct timer_list *t)
1695 {
1696     struct drbd_device *device = from_timer(device, t, request_timer);
1697     struct drbd_connection *connection = first_peer_device(device)->connection;
1698     struct drbd_request *req_read, *req_write, *req_peer; /* oldest request */
1699     struct net_conf *nc;
1700     unsigned long oldest_submit_jif;
1701     unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
1702     unsigned long now;
1703     unsigned int ko_count = 0, timeout = 0;
1704 
1705     rcu_read_lock();
1706     nc = rcu_dereference(connection->net_conf);
1707     if (nc && device->state.conn >= C_WF_REPORT_PARAMS) {
1708         ko_count = nc->ko_count;
1709         timeout = nc->timeout;
1710     }
1711 
1712     if (get_ldev(device)) { /* implicit state.disk >= D_INCONSISTENT */
1713         dt = rcu_dereference(device->ldev->disk_conf)->disk_timeout * HZ / 10;
1714         put_ldev(device);
1715     }
1716     rcu_read_unlock();
1717 
1718 
1719     ent = timeout * HZ/10 * ko_count;
1720     et = min_not_zero(dt, ent);
1721 
1722     if (!et)
1723         return; /* Recurring timer stopped */
1724 
1725     now = jiffies;
1726     nt = now + et;
1727 
1728     spin_lock_irq(&device->resource->req_lock);
1729     req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local);
1730     req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local);
1731 
1732     /* maybe the oldest request waiting for the peer is in fact still
1733      * blocking in tcp sendmsg.  That's ok, though, that's handled via the
1734      * socket send timeout, requesting a ping, and bumping ko-count in
1735      * we_should_drop_the_connection().
1736      */
1737 
1738     /* check the oldest request we did successfully send,
1739      * but which is still waiting for an ACK. */
1740     req_peer = connection->req_ack_pending;
1741 
1742     /* if we don't have such a request (e.g. protocol A),
1743      * check the oldest request which is still waiting on its epoch
1744      * closing barrier ack. */
1745     if (!req_peer)
1746         req_peer = connection->req_not_net_done;
1747 
1748     /* evaluate the oldest peer request only in one timer! */
1749     if (req_peer && req_peer->device != device)
1750         req_peer = NULL;
1751 
1752     /* do we have something to evaluate? */
1753     if (req_peer == NULL && req_write == NULL && req_read == NULL)
1754         goto out;
1755 
1756     oldest_submit_jif =
1757         (req_write && req_read)
1758         ? ( time_before(req_write->pre_submit_jif, req_read->pre_submit_jif)
1759           ? req_write->pre_submit_jif : req_read->pre_submit_jif )
1760         : req_write ? req_write->pre_submit_jif
1761         : req_read ? req_read->pre_submit_jif : now;
1762 
1763     if (ent && req_peer && net_timeout_reached(req_peer, connection, now, ent, ko_count, timeout))
1764         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_VERBOSE | CS_HARD);
1765 
1766     if (dt && oldest_submit_jif != now &&
1767          time_after(now, oldest_submit_jif + dt) &&
1768         !time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + dt)) {
1769         drbd_warn(device, "Local backing device failed to meet the disk-timeout\n");
1770         __drbd_chk_io_error(device, DRBD_FORCE_DETACH);
1771     }
1772 
1773     /* Reschedule timer for the nearest not already expired timeout.
1774      * Fall back to now + min(effective network timeout, disk timeout). */
1775     ent = (ent && req_peer && time_before(now, req_peer->pre_send_jif + ent))
1776         ? req_peer->pre_send_jif + ent : now + et;
1777     dt = (dt && oldest_submit_jif != now && time_before(now, oldest_submit_jif + dt))
1778         ? oldest_submit_jif + dt : now + et;
1779     nt = time_before(ent, dt) ? ent : dt;
1780 out:
1781     spin_unlock_irq(&device->resource->req_lock);
1782     mod_timer(&device->request_timer, nt);
1783 }
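/* Illustrative sketch (editorial assumption, not from this file): given the
 * from_timer() pattern used above, device->request_timer is presumably set up
 * once per device, e.g. in drbd_main.c, along the lines of
 *
 *	timer_setup(&device->request_timer, request_timer_fn, 0);
 *
 * and is then (re)armed with mod_timer(), as done at the end of this
 * function, whenever a network or disk timeout is configured. */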