Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-or-later
0002 /*
0003    drbd_receiver.c
0004 
0005    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
0006 
0007    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
0008    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
0009    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
0010 
0011  */
0012 
0013 
0014 #include <linux/module.h>
0015 
0016 #include <linux/uaccess.h>
0017 #include <net/sock.h>
0018 
0019 #include <linux/drbd.h>
0020 #include <linux/fs.h>
0021 #include <linux/file.h>
0022 #include <linux/in.h>
0023 #include <linux/mm.h>
0024 #include <linux/memcontrol.h>
0025 #include <linux/mm_inline.h>
0026 #include <linux/slab.h>
0027 #include <uapi/linux/sched/types.h>
0028 #include <linux/sched/signal.h>
0029 #include <linux/pkt_sched.h>
0030 #define __KERNEL_SYSCALLS__
0031 #include <linux/unistd.h>
0032 #include <linux/vmalloc.h>
0033 #include <linux/random.h>
0034 #include <linux/string.h>
0035 #include <linux/scatterlist.h>
0036 #include <linux/part_stat.h>
0037 #include "drbd_int.h"
0038 #include "drbd_protocol.h"
0039 #include "drbd_req.h"
0040 #include "drbd_vli.h"
0041 
0042 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES) /* bitwise OR of DRBD_FF_* feature flags; presumably the set offered in drbd_do_features() -- TODO confirm */
0043 
0044 struct packet_info { /* header description; decode_header() is handed one of these and receive_first_packet() reads .cmd from it */
0045     enum drbd_packet cmd; /* packet type */
0046     unsigned int size; /* presumably the payload size in bytes -- TODO confirm */
0047     unsigned int vnr; /* presumably the volume number the packet refers to -- TODO confirm */
0048     void *data; /* presumably points at the payload -- TODO confirm */
0049 };
0050 
0051 enum finish_epoch { /* result type of drbd_may_finish_epoch() */
0052     FE_STILL_LIVE, /* presumably: the epoch is still in use -- TODO confirm */
0053     FE_DESTROYED, /* presumably: the epoch object was freed -- TODO confirm */
0054     FE_RECYCLED, /* presumably: the epoch object was reset for reuse -- TODO confirm */
0055 };
0056 
0057 static int drbd_do_features(struct drbd_connection *connection); /* forward declarations; all static, so each must be defined in this translation unit */
0058 static int drbd_do_auth(struct drbd_connection *connection);
0059 static int drbd_disconnected(struct drbd_peer_device *);
0060 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
0061 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
0062 static int e_end_block(struct drbd_work *, int);
0063 
0064 
0065 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) /* flags passed to alloc_page() in __drbd_alloc_pages(); chosen so the allocation does not cause arbitrary write-out */
0066 
0067 /*
0068  * some helper functions to deal with singly linked page lists,
0069  * page->private being our "next" pointer.
0070  */
0071 
0072 /* If at least n pages are linked at head, get n pages off.
0073  * Otherwise, don't modify head, and return NULL.
0074  * Locking is the responsibility of the caller.
0075  */
0076 static struct page *page_chain_del(struct page **head, int n)
0077 {
0078     struct page *page;
0079     struct page *tmp;
0080 
0081     BUG_ON(!n);
0082     BUG_ON(!head);
0083 
0084     page = *head;
0085 
0086     if (!page)
0087         return NULL;
0088 
0089     while (page) {
0090         tmp = page_chain_next(page);
0091         if (--n == 0)
0092             break; /* found sufficient pages */
0093         if (tmp == NULL)
0094             /* insufficient pages, don't use any of them. */
0095             return NULL;
0096         page = tmp;
0097     }
0098 
0099     /* add end of list marker for the returned list */
0100     set_page_private(page, 0);
0101     /* actual return value, and adjustment of head */
0102     page = *head;
0103     *head = tmp;
0104     return page;
0105 }
0106 
0107 /* may be used outside of locks to find the tail of a (usually short)
0108  * "private" page chain, before adding it back to a global chain head
0109  * with page_chain_add() under a spinlock. */
0110 static struct page *page_chain_tail(struct page *page, int *len)
0111 {
0112     struct page *tmp;
0113     int i = 1;
0114     while ((tmp = page_chain_next(page))) {
0115         ++i;
0116         page = tmp;
0117     }
0118     if (len)
0119         *len = i;
0120     return page;
0121 }
0122 
0123 static int page_chain_free(struct page *page)
0124 {
0125     struct page *tmp;
0126     int i = 0;
0127     page_chain_for_each_safe(page, tmp) {
0128         put_page(page);
0129         ++i;
0130     }
0131     return i;
0132 }
0133 
0134 static void page_chain_add(struct page **head,
0135         struct page *chain_first, struct page *chain_last)
0136 {
0137 #if 1
0138     struct page *tmp;
0139     tmp = page_chain_tail(chain_first, NULL);
0140     BUG_ON(tmp != chain_last);
0141 #endif
0142 
0143     /* add chain to head */
0144     set_page_private(chain_last, (unsigned long)*head);
0145     *head = chain_first;
0146 }
0147 
0148 static struct page *__drbd_alloc_pages(struct drbd_device *device,
0149                        unsigned int number)
0150 {
0151     struct page *page = NULL;
0152     struct page *tmp = NULL;
0153     unsigned int i = 0;
0154 
0155     /* Yes, testing drbd_pp_vacant outside the lock is racy.
0156      * So what. It saves a spin_lock. */
0157     if (drbd_pp_vacant >= number) {
0158         spin_lock(&drbd_pp_lock);
0159         page = page_chain_del(&drbd_pp_pool, number);
0160         if (page)
0161             drbd_pp_vacant -= number;
0162         spin_unlock(&drbd_pp_lock);
0163         if (page)
0164             return page;
0165     }
0166 
0167     /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
0168      * "criss-cross" setup, that might cause write-out on some other DRBD,
0169      * which in turn might block on the other node at this very place.  */
0170     for (i = 0; i < number; i++) {
0171         tmp = alloc_page(GFP_TRY);
0172         if (!tmp)
0173             break;
0174         set_page_private(tmp, (unsigned long)page);
0175         page = tmp;
0176     }
0177 
0178     if (i == number)
0179         return page;
0180 
0181     /* Not enough pages immediately available this time.
0182      * No need to jump around here, drbd_alloc_pages will retry this
0183      * function "soon". */
0184     if (page) {
0185         tmp = page_chain_tail(page, NULL);
0186         spin_lock(&drbd_pp_lock);
0187         page_chain_add(&drbd_pp_pool, page, tmp);
0188         drbd_pp_vacant += i;
0189         spin_unlock(&drbd_pp_lock);
0190     }
0191     return NULL;
0192 }
0193 
0194 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
0195                        struct list_head *to_be_freed)
0196 {
0197     struct drbd_peer_request *peer_req, *tmp;
0198 
0199     /* The EEs are always appended to the end of the list. Since
0200        they are sent in order over the wire, they have to finish
0201        in order. As soon as we see the first not finished we can
0202        stop to examine the list... */
0203 
0204     list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
0205         if (drbd_peer_req_has_active_page(peer_req))
0206             break;
0207         list_move(&peer_req->w.list, to_be_freed);
0208     }
0209 }
0210 
0211 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
0212 {
0213     LIST_HEAD(reclaimed);
0214     struct drbd_peer_request *peer_req, *t;
0215 
0216     spin_lock_irq(&device->resource->req_lock);
0217     reclaim_finished_net_peer_reqs(device, &reclaimed);
0218     spin_unlock_irq(&device->resource->req_lock);
0219     list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
0220         drbd_free_net_peer_req(device, peer_req);
0221 }
0222 
0223 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
0224 {
0225     struct drbd_peer_device *peer_device;
0226     int vnr;
0227 
0228     rcu_read_lock();
0229     idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
0230         struct drbd_device *device = peer_device->device;
0231         if (!atomic_read(&device->pp_in_use_by_net))
0232             continue;
0233 
0234         kref_get(&device->kref);
0235         rcu_read_unlock();
0236         drbd_reclaim_net_peer_reqs(device);
0237         kref_put(&device->kref, drbd_destroy_device);
0238         rcu_read_lock();
0239     }
0240     rcu_read_unlock();
0241 }
0242 
0243 /**
0244  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
0245  * @peer_device:    DRBD device.
0246  * @number:     number of pages requested
0247  * @retry:      whether to retry, if not enough pages are available right now
0248  *
0249  * Tries to allocate number pages, first from our own page pool, then from
0250  * the kernel.
0251  * Possibly retry until DRBD frees sufficient pages somewhere else.
0252  *
0253  * If this allocation would exceed the max_buffers setting, we throttle
0254  * allocation (schedule_timeout) to give the system some room to breathe.
0255  *
0256  * We do not use max-buffers as hard limit, because it could lead to
0257  * congestion and further to a distributed deadlock during online-verify or
0258  * (checksum based) resync, if the max-buffers, socket buffer sizes and
0259  * resync-rate settings are mis-configured.
0260  *
0261  * Returns a page chain linked via page->private.
0262  */
0263 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
0264                   bool retry)
0265 {
0266     struct drbd_device *device = peer_device->device;
0267     struct page *page = NULL;
0268     struct net_conf *nc;
0269     DEFINE_WAIT(wait);
0270     unsigned int mxb;
0271 
0272     rcu_read_lock();
0273     nc = rcu_dereference(peer_device->connection->net_conf);
0274     mxb = nc ? nc->max_buffers : 1000000;
0275     rcu_read_unlock();
0276 
0277     if (atomic_read(&device->pp_in_use) < mxb)
0278         page = __drbd_alloc_pages(device, number);
0279 
0280     /* Try to keep the fast path fast, but occasionally we need
0281      * to reclaim the pages we lended to the network stack. */
0282     if (page && atomic_read(&device->pp_in_use_by_net) > 512)
0283         drbd_reclaim_net_peer_reqs(device);
0284 
0285     while (page == NULL) {
0286         prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
0287 
0288         drbd_reclaim_net_peer_reqs(device);
0289 
0290         if (atomic_read(&device->pp_in_use) < mxb) {
0291             page = __drbd_alloc_pages(device, number);
0292             if (page)
0293                 break;
0294         }
0295 
0296         if (!retry)
0297             break;
0298 
0299         if (signal_pending(current)) {
0300             drbd_warn(device, "drbd_alloc_pages interrupted!\n");
0301             break;
0302         }
0303 
0304         if (schedule_timeout(HZ/10) == 0)
0305             mxb = UINT_MAX;
0306     }
0307     finish_wait(&drbd_pp_wait, &wait);
0308 
0309     if (page)
0310         atomic_add(number, &device->pp_in_use);
0311     return page;
0312 }
0313 
0314 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
0315  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
0316  * Either links the page chain back to the global pool,
0317  * or returns all pages to the system. */
0318 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
0319 {
0320     atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
0321     int i;
0322 
0323     if (page == NULL)
0324         return;
0325 
0326     if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
0327         i = page_chain_free(page);
0328     else {
0329         struct page *tmp;
0330         tmp = page_chain_tail(page, &i);
0331         spin_lock(&drbd_pp_lock);
0332         page_chain_add(&drbd_pp_pool, page, tmp);
0333         drbd_pp_vacant += i;
0334         spin_unlock(&drbd_pp_lock);
0335     }
0336     i = atomic_sub_return(i, a);
0337     if (i < 0)
0338         drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
0339             is_net ? "pp_in_use_by_net" : "pp_in_use", i);
0340     wake_up(&drbd_pp_wait);
0341 }
0342 
0343 /*
0344 You need to hold the req_lock:
0345  _drbd_wait_ee_list_empty()
0346 
0347 You must not have the req_lock:
0348  drbd_free_peer_req()
0349  drbd_alloc_peer_req()
0350  drbd_free_peer_reqs()
0351  drbd_ee_fix_bhs()
0352  drbd_finish_peer_reqs()
0353  drbd_clear_done_ee()
0354  drbd_wait_ee_list_empty()
0355 */
0356 
0357 /* normal: payload_size == request size (bi_size)
0358  * w_same: payload_size == logical_block_size
0359  * trim: payload_size == 0 */
0360 struct drbd_peer_request *
0361 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
0362             unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
0363 {
0364     struct drbd_device *device = peer_device->device;
0365     struct drbd_peer_request *peer_req;
0366     struct page *page = NULL;
0367     unsigned int nr_pages = PFN_UP(payload_size);
0368 
0369     if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
0370         return NULL;
0371 
0372     peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
0373     if (!peer_req) {
0374         if (!(gfp_mask & __GFP_NOWARN))
0375             drbd_err(device, "%s: allocation failed\n", __func__);
0376         return NULL;
0377     }
0378 
0379     if (nr_pages) {
0380         page = drbd_alloc_pages(peer_device, nr_pages,
0381                     gfpflags_allow_blocking(gfp_mask));
0382         if (!page)
0383             goto fail;
0384     }
0385 
0386     memset(peer_req, 0, sizeof(*peer_req));
0387     INIT_LIST_HEAD(&peer_req->w.list);
0388     drbd_clear_interval(&peer_req->i);
0389     peer_req->i.size = request_size;
0390     peer_req->i.sector = sector;
0391     peer_req->submit_jif = jiffies;
0392     peer_req->peer_device = peer_device;
0393     peer_req->pages = page;
0394     /*
0395      * The block_id is opaque to the receiver.  It is not endianness
0396      * converted, and sent back to the sender unchanged.
0397      */
0398     peer_req->block_id = id;
0399 
0400     return peer_req;
0401 
0402  fail:
0403     mempool_free(peer_req, &drbd_ee_mempool);
0404     return NULL;
0405 }
0406 
0407 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
0408                int is_net)
0409 {
0410     might_sleep();
0411     if (peer_req->flags & EE_HAS_DIGEST)
0412         kfree(peer_req->digest);
0413     drbd_free_pages(device, peer_req->pages, is_net);
0414     D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
0415     D_ASSERT(device, drbd_interval_empty(&peer_req->i));
0416     if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
0417         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
0418         drbd_al_complete_io(device, &peer_req->i);
0419     }
0420     mempool_free(peer_req, &drbd_ee_mempool);
0421 }
0422 
0423 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
0424 {
0425     LIST_HEAD(work_list);
0426     struct drbd_peer_request *peer_req, *t;
0427     int count = 0;
0428     int is_net = list == &device->net_ee;
0429 
0430     spin_lock_irq(&device->resource->req_lock);
0431     list_splice_init(list, &work_list);
0432     spin_unlock_irq(&device->resource->req_lock);
0433 
0434     list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
0435         __drbd_free_peer_req(device, peer_req, is_net);
0436         count++;
0437     }
0438     return count;
0439 }
0440 
0441 /*
0442  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
0443  */
0444 static int drbd_finish_peer_reqs(struct drbd_device *device)
0445 {
0446     LIST_HEAD(work_list);
0447     LIST_HEAD(reclaimed);
0448     struct drbd_peer_request *peer_req, *t;
0449     int err = 0;
0450 
0451     spin_lock_irq(&device->resource->req_lock);
0452     reclaim_finished_net_peer_reqs(device, &reclaimed);
0453     list_splice_init(&device->done_ee, &work_list);
0454     spin_unlock_irq(&device->resource->req_lock);
0455 
0456     list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
0457         drbd_free_net_peer_req(device, peer_req);
0458 
0459     /* possible callbacks here:
0460      * e_end_block, and e_end_resync_block, e_send_superseded.
0461      * all ignore the last argument.
0462      */
0463     list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
0464         int err2;
0465 
0466         /* list_del not necessary, next/prev members not touched */
0467         err2 = peer_req->w.cb(&peer_req->w, !!err);
0468         if (!err)
0469             err = err2;
0470         drbd_free_peer_req(device, peer_req);
0471     }
0472     wake_up(&device->ee_wait);
0473 
0474     return err;
0475 }
0476 
0477 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
0478                      struct list_head *head)
0479 {
0480     DEFINE_WAIT(wait);
0481 
0482     /* avoids spin_lock/unlock
0483      * and calling prepare_to_wait in the fast path */
0484     while (!list_empty(head)) {
0485         prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
0486         spin_unlock_irq(&device->resource->req_lock);
0487         io_schedule();
0488         finish_wait(&device->ee_wait, &wait);
0489         spin_lock_irq(&device->resource->req_lock);
0490     }
0491 }
0492 
0493 static void drbd_wait_ee_list_empty(struct drbd_device *device,
0494                     struct list_head *head)
0495 {
0496     spin_lock_irq(&device->resource->req_lock);
0497     _drbd_wait_ee_list_empty(device, head);
0498     spin_unlock_irq(&device->resource->req_lock);
0499 }
0500 
0501 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
0502 {
0503     struct kvec iov = {
0504         .iov_base = buf,
0505         .iov_len = size,
0506     };
0507     struct msghdr msg = {
0508         .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
0509     };
0510     iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, size);
0511     return sock_recvmsg(sock, &msg, msg.msg_flags);
0512 }
0513 
0514 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
0515 {
0516     int rv;
0517 
0518     rv = drbd_recv_short(connection->data.socket, buf, size, 0);
0519 
0520     if (rv < 0) {
0521         if (rv == -ECONNRESET)
0522             drbd_info(connection, "sock was reset by peer\n");
0523         else if (rv != -ERESTARTSYS)
0524             drbd_err(connection, "sock_recvmsg returned %d\n", rv);
0525     } else if (rv == 0) {
0526         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
0527             long t;
0528             rcu_read_lock();
0529             t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
0530             rcu_read_unlock();
0531 
0532             t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
0533 
0534             if (t)
0535                 goto out;
0536         }
0537         drbd_info(connection, "sock was shut down by peer\n");
0538     }
0539 
0540     if (rv != size)
0541         conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
0542 
0543 out:
0544     return rv;
0545 }
0546 
0547 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
0548 {
0549     int err;
0550 
0551     err = drbd_recv(connection, buf, size);
0552     if (err != size) {
0553         if (err >= 0)
0554             err = -EIO;
0555     } else
0556         err = 0;
0557     return err;
0558 }
0559 
0560 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
0561 {
0562     int err;
0563 
0564     err = drbd_recv_all(connection, buf, size);
0565     if (err && !signal_pending(current))
0566         drbd_warn(connection, "short read (expected size %d)\n", (int)size);
0567     return err;
0568 }
0569 
0570 /* quoting tcp(7):
0571  *   On individual connections, the socket buffer size must be set prior to the
0572  *   listen(2) or connect(2) calls in order to have it take effect.
0573  * This is our wrapper to do so.
0574  */
0575 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
0576         unsigned int rcv)
0577 {
0578     /* open coded SO_SNDBUF, SO_RCVBUF */
0579     if (snd) {
0580         sock->sk->sk_sndbuf = snd;
0581         sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
0582     }
0583     if (rcv) {
0584         sock->sk->sk_rcvbuf = rcv;
0585         sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
0586     }
0587 }
0588 
/*
 * Actively open one TCP connection to the peer.
 *
 * Creates a kernel socket, binds it to the configured local address (with
 * the port forced to 0) and connects it to the configured peer address.
 * Send and receive timeouts are set to connect_int seconds.
 *
 * Returns the connected socket, or NULL if there is no net_conf or any
 * step failed.  Errors outside the "timeout / signal / peer not reachable"
 * class are logged; if such an error happens before the connect() call,
 * the connection is additionally moved to C_DISCONNECTING.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
    const char *what;
    struct socket *sock;
    struct sockaddr_in6 src_in6;
    struct sockaddr_in6 peer_in6;
    struct net_conf *nc;
    int err, peer_addr_len, my_addr_len;
    int sndbuf_size, rcvbuf_size, connect_int;
    int disconnect_on_error = 1;

    /* snapshot the settings we need; net_conf is only valid under RCU */
    rcu_read_lock();
    nc = rcu_dereference(connection->net_conf);
    if (!nc) {
        rcu_read_unlock();
        return NULL;
    }
    sndbuf_size = nc->sndbuf_size;
    rcvbuf_size = nc->rcvbuf_size;
    connect_int = nc->connect_int;
    rcu_read_unlock();

    my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
    memcpy(&src_in6, &connection->my_addr, my_addr_len);

    if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
        src_in6.sin6_port = 0;
    else
        ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

    /* clamp to the size of the buffer we are actually copying into */
    peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
    memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

    what = "sock_create_kern";
    err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
                   SOCK_STREAM, IPPROTO_TCP, &sock);
    if (err < 0) {
        sock = NULL;
        goto out;
    }

    sock->sk->sk_rcvtimeo =
    sock->sk->sk_sndtimeo = connect_int * HZ;
    drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

    /* explicitly bind to the configured IP as source IP
     * for the outgoing connections.
     * This is needed for multihomed hosts and to be
     * able to use lo: interfaces for drbd.
     * Make sure to use 0 as port number, so linux selects
     * a free one dynamically.
     */
    what = "bind before connect";
    err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
    if (err < 0)
        goto out;

    /* connect may fail, peer not yet available.
     * stay C_WF_CONNECTION, don't go Disconnecting! */
    disconnect_on_error = 0;
    what = "connect";
    err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
    if (err < 0) {
        if (sock) {
            sock_release(sock);
            sock = NULL;
        }
        switch (-err) {
            /* timeout, busy, signal pending */
        case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
        case EINTR: case ERESTARTSYS:
            /* peer not (yet) available, network problem */
        case ECONNREFUSED: case ENETUNREACH:
        case EHOSTDOWN:    case EHOSTUNREACH:
            disconnect_on_error = 0;
            break;
        default:
            drbd_err(connection, "%s failed, err = %d\n", what, err);
        }
        if (disconnect_on_error)
            conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
    }

    return sock;
}
0676 
0677 struct accept_wait_data { /* state shared between the listen-socket setup, its state-change callback and drbd_wait_for_connect() */
0678     struct drbd_connection *connection; /* set by conn_connect() when it initialises this struct */
0679     struct socket *s_listen; /* listening socket; stored by prepare_listen_socket(), accepted on in drbd_wait_for_connect() */
0680     struct completion door_bell; /* completed by drbd_incoming_connection() when a socket reaches TCP_ESTABLISHED */
0681     void (*original_sk_state_change)(struct sock *sk); /* saved sk_state_change; chained to by the callback and restored by unregister_state_change() */
0682 
0683 };
0684 
0685 static void drbd_incoming_connection(struct sock *sk)
0686 {
0687     struct accept_wait_data *ad = sk->sk_user_data;
0688     void (*state_change)(struct sock *sk);
0689 
0690     state_change = ad->original_sk_state_change;
0691     if (sk->sk_state == TCP_ESTABLISHED)
0692         complete(&ad->door_bell);
0693     state_change(sk);
0694 }
0695 
0696 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
0697 {
0698     int err, sndbuf_size, rcvbuf_size, my_addr_len;
0699     struct sockaddr_in6 my_addr;
0700     struct socket *s_listen;
0701     struct net_conf *nc;
0702     const char *what;
0703 
0704     rcu_read_lock();
0705     nc = rcu_dereference(connection->net_conf);
0706     if (!nc) {
0707         rcu_read_unlock();
0708         return -EIO;
0709     }
0710     sndbuf_size = nc->sndbuf_size;
0711     rcvbuf_size = nc->rcvbuf_size;
0712     rcu_read_unlock();
0713 
0714     my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
0715     memcpy(&my_addr, &connection->my_addr, my_addr_len);
0716 
0717     what = "sock_create_kern";
0718     err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
0719                    SOCK_STREAM, IPPROTO_TCP, &s_listen);
0720     if (err) {
0721         s_listen = NULL;
0722         goto out;
0723     }
0724 
0725     s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
0726     drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
0727 
0728     what = "bind before listen";
0729     err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
0730     if (err < 0)
0731         goto out;
0732 
0733     ad->s_listen = s_listen;
0734     write_lock_bh(&s_listen->sk->sk_callback_lock);
0735     ad->original_sk_state_change = s_listen->sk->sk_state_change;
0736     s_listen->sk->sk_state_change = drbd_incoming_connection;
0737     s_listen->sk->sk_user_data = ad;
0738     write_unlock_bh(&s_listen->sk->sk_callback_lock);
0739 
0740     what = "listen";
0741     err = s_listen->ops->listen(s_listen, 5);
0742     if (err < 0)
0743         goto out;
0744 
0745     return 0;
0746 out:
0747     if (s_listen)
0748         sock_release(s_listen);
0749     if (err < 0) {
0750         if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
0751             drbd_err(connection, "%s failed, err = %d\n", what, err);
0752             conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
0753         }
0754     }
0755 
0756     return -EIO;
0757 }
0758 
0759 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
0760 {
0761     write_lock_bh(&sk->sk_callback_lock);
0762     sk->sk_state_change = ad->original_sk_state_change;
0763     sk->sk_user_data = NULL;
0764     write_unlock_bh(&sk->sk_callback_lock);
0765 }
0766 
0767 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
0768 {
0769     int timeo, connect_int, err = 0;
0770     struct socket *s_estab = NULL;
0771     struct net_conf *nc;
0772 
0773     rcu_read_lock();
0774     nc = rcu_dereference(connection->net_conf);
0775     if (!nc) {
0776         rcu_read_unlock();
0777         return NULL;
0778     }
0779     connect_int = nc->connect_int;
0780     rcu_read_unlock();
0781 
0782     timeo = connect_int * HZ;
0783     /* 28.5% random jitter */
0784     timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
0785 
0786     err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
0787     if (err <= 0)
0788         return NULL;
0789 
0790     err = kernel_accept(ad->s_listen, &s_estab, 0);
0791     if (err < 0) {
0792         if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
0793             drbd_err(connection, "accept failed, err = %d\n", err);
0794             conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
0795         }
0796     }
0797 
0798     if (s_estab)
0799         unregister_state_change(s_estab->sk, ad);
0800 
0801     return s_estab;
0802 }
0803 
0804 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
0805 
0806 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
0807                  enum drbd_packet cmd)
0808 {
0809     if (!conn_prepare_command(connection, sock))
0810         return -EIO;
0811     return conn_send_command(connection, sock, cmd, 0, NULL, 0);
0812 }
0813 
0814 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
0815 {
0816     unsigned int header_size = drbd_header_size(connection);
0817     struct packet_info pi;
0818     struct net_conf *nc;
0819     int err;
0820 
0821     rcu_read_lock();
0822     nc = rcu_dereference(connection->net_conf);
0823     if (!nc) {
0824         rcu_read_unlock();
0825         return -EIO;
0826     }
0827     sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
0828     rcu_read_unlock();
0829 
0830     err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
0831     if (err != header_size) {
0832         if (err >= 0)
0833             err = -EIO;
0834         return err;
0835     }
0836     err = decode_header(connection, connection->data.rbuf, &pi);
0837     if (err)
0838         return err;
0839     return pi.cmd;
0840 }
0841 
0842 /**
0843  * drbd_socket_okay() - Free the socket if its connection is not okay
0844  * @sock:   pointer to the pointer to the socket.
0845  */
0846 static bool drbd_socket_okay(struct socket **sock)
0847 {
0848     int rr;
0849     char tb[4];
0850 
0851     if (!*sock)
0852         return false;
0853 
0854     rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
0855 
0856     if (rr > 0 || rr == -EAGAIN) {
0857         return true;
0858     } else {
0859         sock_release(*sock);
0860         *sock = NULL;
0861         return false;
0862     }
0863 }
0864 
0865 static bool connection_established(struct drbd_connection *connection,
0866                    struct socket **sock1,
0867                    struct socket **sock2)
0868 {
0869     struct net_conf *nc;
0870     int timeout;
0871     bool ok;
0872 
0873     if (!*sock1 || !*sock2)
0874         return false;
0875 
0876     rcu_read_lock();
0877     nc = rcu_dereference(connection->net_conf);
0878     timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
0879     rcu_read_unlock();
0880     schedule_timeout_interruptible(timeout);
0881 
0882     ok = drbd_socket_okay(sock1);
0883     ok = drbd_socket_okay(sock2) && ok;
0884 
0885     return ok;
0886 }
0887 
0888 /* Gets called if a connection is established, or if a new minor gets created
0889    in a connection */
0890 int drbd_connected(struct drbd_peer_device *peer_device)
0891 {
0892     struct drbd_device *device = peer_device->device;
0893     int err;
0894 
0895     atomic_set(&device->packet_seq, 0);
0896     device->peer_seq = 0;
0897 
0898     device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
0899         &peer_device->connection->cstate_mutex :
0900         &device->own_state_mutex;
0901 
0902     err = drbd_send_sync_param(peer_device);
0903     if (!err)
0904         err = drbd_send_sizes(peer_device, 0, 0);
0905     if (!err)
0906         err = drbd_send_uuids(peer_device);
0907     if (!err)
0908         err = drbd_send_current_state(peer_device);
0909     clear_bit(USE_DEGR_WFC_T, &device->flags);
0910     clear_bit(RESIZE_PENDING, &device->flags);
0911     atomic_set(&device->ap_in_flight, 0);
0912     mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
0913     return err;
0914 }
0915 
/*
 * conn_connect() - establish the data and meta sockets to the peer and run
 * the initial handshake (features, optional authentication, protocol,
 * per-volume parameters).
 *
 * Both sides actively try to connect and also listen; the first packet
 * received on an accepted socket (P_INITIAL_DATA / P_INITIAL_META) tells
 * which role that socket plays.
 *
 * return values:
 *   1 yes, we have a valid connection
 *     (this is the positive value handed back by drbd_do_features())
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 *     (returned when the state change to C_WF_CONNECTION is refused)
 */
static int conn_connect(struct drbd_connection *connection)
{
    struct drbd_socket sock, msock;
    struct drbd_peer_device *peer_device;
    struct net_conf *nc;
    int vnr, timeout, h;
    bool discard_my_data, ok;
    enum drbd_state_rv rv;
    struct accept_wait_data ad = {
        .connection = connection,
        .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
    };

    clear_bit(DISCONNECT_SENT, &connection->flags);
    if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
        return -2;

    /* Local socket descriptors; they borrow the connection's buffers and
     * are only copied into the connection once both are established. */
    mutex_init(&sock.mutex);
    sock.sbuf = connection->data.sbuf;
    sock.rbuf = connection->data.rbuf;
    sock.socket = NULL;
    mutex_init(&msock.mutex);
    msock.sbuf = connection->meta.sbuf;
    msock.rbuf = connection->meta.rbuf;
    msock.socket = NULL;

    /* Assume that the peer only understands protocol 80 until we know better.  */
    connection->agreed_pro_version = 80;

    if (prepare_listen_socket(connection, &ad))
        return 0;

    do {
        struct socket *s;

        /* Active side: the first outgoing socket becomes the data
         * socket, the second one the meta socket. */
        s = drbd_try_connect(connection);
        if (s) {
            if (!sock.socket) {
                sock.socket = s;
                send_first_packet(connection, &sock, P_INITIAL_DATA);
            } else if (!msock.socket) {
                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
                msock.socket = s;
                send_first_packet(connection, &msock, P_INITIAL_META);
            } else {
                drbd_err(connection, "Logic error in conn_connect()\n");
                goto out_release_sockets;
            }
        }

        if (connection_established(connection, &sock.socket, &msock.socket))
            break;

retry:
        /* Passive side: accept an incoming socket and let its first
         * packet decide whether it is the data or the meta socket. */
        s = drbd_wait_for_connect(connection, &ad);
        if (s) {
            int fp = receive_first_packet(connection, s);
            drbd_socket_okay(&sock.socket);
            drbd_socket_okay(&msock.socket);
            switch (fp) {
            case P_INITIAL_DATA:
                if (sock.socket) {
                    /* Both sides connected at the same time:
                     * prefer the incoming socket over ours. */
                    drbd_warn(connection, "initial packet S crossed\n");
                    sock_release(sock.socket);
                    sock.socket = s;
                    goto randomize;
                }
                sock.socket = s;
                break;
            case P_INITIAL_META:
                set_bit(RESOLVE_CONFLICTS, &connection->flags);
                if (msock.socket) {
                    drbd_warn(connection, "initial packet M crossed\n");
                    sock_release(msock.socket);
                    msock.socket = s;
                    goto randomize;
                }
                msock.socket = s;
                break;
            default:
                drbd_warn(connection, "Error receiving initial packet\n");
                sock_release(s);
randomize:
                /* Coin flip: either wait for another incoming
                 * connection right away, or go around the outer
                 * loop and try an active connect again. */
                if (prandom_u32() & 1)
                    goto retry;
            }
        }

        if (connection->cstate <= C_DISCONNECTING)
            goto out_release_sockets;
        if (signal_pending(current)) {
            flush_signals(current);
            smp_rmb();
            if (get_t_state(&connection->receiver) == EXITING)
                goto out_release_sockets;
        }

        ok = connection_established(connection, &sock.socket, &msock.socket);
    } while (!ok);

    /* Both sockets are up; the listen socket is no longer needed. */
    if (ad.s_listen)
        sock_release(ad.s_listen);

    sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
    msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

    sock.socket->sk->sk_allocation = GFP_NOIO;
    msock.socket->sk->sk_allocation = GFP_NOIO;

    sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
    msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

    /* NOT YET ...
     * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
     * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
     * first set it to the P_CONNECTION_FEATURES timeout,
     * which we set to 4x the configured ping_timeout. */
    rcu_read_lock();
    nc = rcu_dereference(connection->net_conf);

    sock.socket->sk->sk_sndtimeo =
    sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

    msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
    timeout = nc->timeout * HZ / 10;
    /* Sample the single-shot flag while we hold the RCU read lock;
     * it is applied to every volume further down. */
    discard_my_data = nc->discard_my_data;
    rcu_read_unlock();

    msock.socket->sk->sk_sndtimeo = timeout;

    /* we don't want delays.
     * we use TCP_CORK where appropriate, though */
    tcp_sock_set_nodelay(sock.socket->sk);
    tcp_sock_set_nodelay(msock.socket->sk);

    /* From here on the sockets belong to the connection; the early
     * returns below do not release them in this function. */
    connection->data.socket = sock.socket;
    connection->meta.socket = msock.socket;
    connection->last_received = jiffies;

    h = drbd_do_features(connection);
    if (h <= 0)
        return h;

    if (connection->cram_hmac_tfm) {
        /* drbd_request_state(device, NS(conn, WFAuth)); */
        switch (drbd_do_auth(connection)) {
        case -1:
            drbd_err(connection, "Authentication of peer failed\n");
            return -1;
        case 0:
            drbd_err(connection, "Authentication of peer failed, trying again.\n");
            return 0;
        }
    }

    /* Handshake timeouts are over: switch the data socket to the
     * configured send timeout and an unbounded receive timeout. */
    connection->data.socket->sk->sk_sndtimeo = timeout;
    connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

    if (drbd_send_protocol(connection) == -EOPNOTSUPP)
        return -1;

    /* Prevent a race between resync-handshake and
     * being promoted to Primary.
     *
     * Grab and release the state mutex, so we know that any current
     * drbd_set_role() is finished, and any incoming drbd_set_role
     * will see the STATE_SENT flag, and wait for it to be cleared.
     */
    idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
        mutex_lock(peer_device->device->state_mutex);

    /* avoid a race with conn_request_state( C_DISCONNECTING ) */
    spin_lock_irq(&connection->resource->req_lock);
    set_bit(STATE_SENT, &connection->flags);
    spin_unlock_irq(&connection->resource->req_lock);

    idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
        mutex_unlock(peer_device->device->state_mutex);

    /* drbd_connected() sends packets, so the RCU read lock is dropped
     * around it; the kref keeps the device alive meanwhile. */
    rcu_read_lock();
    idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
        struct drbd_device *device = peer_device->device;
        kref_get(&device->kref);
        rcu_read_unlock();

        if (discard_my_data)
            set_bit(DISCARD_MY_DATA, &device->flags);
        else
            clear_bit(DISCARD_MY_DATA, &device->flags);

        /* NOTE(review): the return value of drbd_connected() is
         * not checked here. */
        drbd_connected(peer_device);
        kref_put(&device->kref, drbd_destroy_device);
        rcu_read_lock();
    }
    rcu_read_unlock();

    rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
    if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
        clear_bit(STATE_SENT, &connection->flags);
        return 0;
    }

    drbd_thread_start(&connection->ack_receiver);
    /* opencoded create_singlethread_workqueue(),
     * to be able to use format string arguments */
    connection->ack_sender =
        alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
    if (!connection->ack_sender) {
        drbd_err(connection, "Failed to create workqueue ack_sender\n");
        return 0;
    }

    mutex_lock(&connection->resource->conf_update);
    /* The discard_my_data flag is a single-shot modifier to the next
     * connection attempt, the handshake of which is now well underway.
     * No need for rcu style copying of the whole struct
     * just to clear a single value. */
    connection->net_conf->discard_my_data = 0;
    mutex_unlock(&connection->resource->conf_update);

    return h;

out_release_sockets:
    /* Only reached from inside the connect loop, i.e. before the
     * sockets were handed over to the connection. */
    if (ad.s_listen)
        sock_release(ad.s_listen);
    if (sock.socket)
        sock_release(sock.socket);
    if (msock.socket)
        sock_release(msock.socket);
    return -1;
}
1155 
1156 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1157 {
1158     unsigned int header_size = drbd_header_size(connection);
1159 
1160     if (header_size == sizeof(struct p_header100) &&
1161         *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1162         struct p_header100 *h = header;
1163         if (h->pad != 0) {
1164             drbd_err(connection, "Header padding is not zero\n");
1165             return -EINVAL;
1166         }
1167         pi->vnr = be16_to_cpu(h->volume);
1168         pi->cmd = be16_to_cpu(h->command);
1169         pi->size = be32_to_cpu(h->length);
1170     } else if (header_size == sizeof(struct p_header95) &&
1171            *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1172         struct p_header95 *h = header;
1173         pi->cmd = be16_to_cpu(h->command);
1174         pi->size = be32_to_cpu(h->length);
1175         pi->vnr = 0;
1176     } else if (header_size == sizeof(struct p_header80) &&
1177            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1178         struct p_header80 *h = header;
1179         pi->cmd = be16_to_cpu(h->command);
1180         pi->size = be16_to_cpu(h->length);
1181         pi->vnr = 0;
1182     } else {
1183         drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1184              be32_to_cpu(*(__be32 *)header),
1185              connection->agreed_pro_version);
1186         return -EINVAL;
1187     }
1188     pi->data = header + header_size;
1189     return 0;
1190 }
1191 
1192 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1193 {
1194     if (current->plug == &connection->receiver_plug) {
1195         blk_finish_plug(&connection->receiver_plug);
1196         blk_start_plug(&connection->receiver_plug);
1197     } /* else: maybe just schedule() ?? */
1198 }
1199 
1200 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1201 {
1202     void *buffer = connection->data.rbuf;
1203     int err;
1204 
1205     err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1206     if (err)
1207         return err;
1208 
1209     err = decode_header(connection, buffer, pi);
1210     connection->last_received = jiffies;
1211 
1212     return err;
1213 }
1214 
1215 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1216 {
1217     void *buffer = connection->data.rbuf;
1218     unsigned int size = drbd_header_size(connection);
1219     int err;
1220 
1221     err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1222     if (err != size) {
1223         /* If we have nothing in the receive buffer now, to reduce
1224          * application latency, try to drain the backend queues as
1225          * quickly as possible, and let remote TCP know what we have
1226          * received so far. */
1227         if (err == -EAGAIN) {
1228             tcp_sock_set_quickack(connection->data.socket->sk, 2);
1229             drbd_unplug_all_devices(connection);
1230         }
1231         if (err > 0) {
1232             buffer += err;
1233             size -= err;
1234         }
1235         err = drbd_recv_all_warn(connection, buffer, size);
1236         if (err)
1237             return err;
1238     }
1239 
1240     err = decode_header(connection, connection->data.rbuf, pi);
1241     connection->last_received = jiffies;
1242 
1243     return err;
1244 }
/* This is blkdev_issue_flush, but asynchronous.
 * We want to submit to all component volumes in parallel,
 * then wait for all completions.
 */
/* Shared by all flush bios submitted by one drbd_flush() call. */
struct issue_flush_context {
    /* Starts at 1 in drbd_flush(), incremented per submitted flush bio,
     * decremented by each completion and once by drbd_flush() itself. */
    atomic_t pending;
    /* 0, or a negative errno recorded by a failed flush or failed
     * allocation; later failures overwrite earlier ones. */
    int error;
    /* Completed by whoever drops @pending to zero. */
    struct completion done;
};
/* Per-bio private data, allocated in submit_one_flush() and freed in
 * one_flush_endio(). */
struct one_flush_context {
    struct drbd_device *device;
    struct issue_flush_context *ctx;
};
1258 
1259 static void one_flush_endio(struct bio *bio)
1260 {
1261     struct one_flush_context *octx = bio->bi_private;
1262     struct drbd_device *device = octx->device;
1263     struct issue_flush_context *ctx = octx->ctx;
1264 
1265     if (bio->bi_status) {
1266         ctx->error = blk_status_to_errno(bio->bi_status);
1267         drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1268     }
1269     kfree(octx);
1270     bio_put(bio);
1271 
1272     clear_bit(FLUSH_PENDING, &device->flags);
1273     put_ldev(device);
1274     kref_put(&device->kref, drbd_destroy_device);
1275 
1276     if (atomic_dec_and_test(&ctx->pending))
1277         complete(&ctx->done);
1278 }
1279 
1280 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1281 {
1282     struct bio *bio = bio_alloc(device->ldev->backing_bdev, 0,
1283                     REQ_OP_FLUSH | REQ_PREFLUSH, GFP_NOIO);
1284     struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1285 
1286     if (!octx) {
1287         drbd_warn(device, "Could not allocate a octx, CANNOT ISSUE FLUSH\n");
1288         /* FIXME: what else can I do now?  disconnecting or detaching
1289          * really does not help to improve the state of the world, either.
1290          */
1291         bio_put(bio);
1292 
1293         ctx->error = -ENOMEM;
1294         put_ldev(device);
1295         kref_put(&device->kref, drbd_destroy_device);
1296         return;
1297     }
1298 
1299     octx->device = device;
1300     octx->ctx = ctx;
1301     bio->bi_private = octx;
1302     bio->bi_end_io = one_flush_endio;
1303 
1304     device->flush_jif = jiffies;
1305     set_bit(FLUSH_PENDING, &device->flags);
1306     atomic_inc(&ctx->pending);
1307     submit_bio(bio);
1308 }
1309 
1310 static void drbd_flush(struct drbd_connection *connection)
1311 {
1312     if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1313         struct drbd_peer_device *peer_device;
1314         struct issue_flush_context ctx;
1315         int vnr;
1316 
1317         atomic_set(&ctx.pending, 1);
1318         ctx.error = 0;
1319         init_completion(&ctx.done);
1320 
1321         rcu_read_lock();
1322         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1323             struct drbd_device *device = peer_device->device;
1324 
1325             if (!get_ldev(device))
1326                 continue;
1327             kref_get(&device->kref);
1328             rcu_read_unlock();
1329 
1330             submit_one_flush(device, &ctx);
1331 
1332             rcu_read_lock();
1333         }
1334         rcu_read_unlock();
1335 
1336         /* Do we want to add a timeout,
1337          * if disk-timeout is set? */
1338         if (!atomic_dec_and_test(&ctx.pending))
1339             wait_for_completion(&ctx.done);
1340 
1341         if (ctx.error) {
1342             /* would rather check on EOPNOTSUPP, but that is not reliable.
1343              * don't try again for ANY return value != 0
1344              * if (rv == -EOPNOTSUPP) */
1345             /* Any error is already reported by bio_endio callback. */
1346             drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1347         }
1348     }
1349 }
1350 
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection: DRBD connection.
 * @epoch:  Epoch object.
 * @ev:     Epoch event.
 *
 * An epoch is finished once it is non-empty, has no active requests left,
 * and either its barrier number is known or @ev carries EV_CLEANUP.
 * Finishing an epoch that is not the connection's current epoch frees it
 * and continues with the following epoch in the list, using
 * EV_BECAME_LAST as the event.  Finishing the current epoch resets it for
 * reuse.
 *
 * Unless EV_CLEANUP is set, a barrier ack is sent for each finished epoch;
 * the epoch_lock is dropped while sending.
 *
 * Return: FE_STILL_LIVE if the epoch passed in was not finished,
 * FE_DESTROYED if it was freed, FE_RECYCLED if it was reset for reuse.
 * Only the outcome for the first finished epoch is reported.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
                           struct drbd_epoch *epoch,
                           enum epoch_event ev)
{
    int epoch_size;
    struct drbd_epoch *next_epoch;
    enum finish_epoch rv = FE_STILL_LIVE;

    spin_lock(&connection->epoch_lock);
    do {
        next_epoch = NULL;

        /* Sampled before the event is applied. */
        epoch_size = atomic_read(&epoch->epoch_size);

        switch (ev & ~EV_CLEANUP) {
        case EV_PUT:
            atomic_dec(&epoch->active);
            break;
        case EV_GOT_BARRIER_NR:
            set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
            break;
        case EV_BECAME_LAST:
            /* nothing to do */
            break;
        }

        if (epoch_size != 0 &&
            atomic_read(&epoch->active) == 0 &&
            (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
            if (!(ev & EV_CLEANUP)) {
                /* Do not hold the spinlock across the send. */
                spin_unlock(&connection->epoch_lock);
                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
                spin_lock(&connection->epoch_lock);
            }
#if 0
            /* FIXME: dec unacked on connection, once we have
             * something to count pending connection packets in. */
            if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                dec_unacked(epoch->connection);
#endif

            if (connection->current_epoch != epoch) {
                /* Pick the successor before unlinking and freeing. */
                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                list_del(&epoch->list);
                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                connection->epochs--;
                kfree(epoch);

                if (rv == FE_STILL_LIVE)
                    rv = FE_DESTROYED;
            } else {
                /* The current epoch is never freed, only reset. */
                epoch->flags = 0;
                atomic_set(&epoch->epoch_size, 0);
                /* atomic_set(&epoch->active, 0); is already zero */
                if (rv == FE_STILL_LIVE)
                    rv = FE_RECYCLED;
            }
        }

        if (!next_epoch)
            break;

        epoch = next_epoch;
    } while (1);

    spin_unlock(&connection->epoch_lock);

    return rv;
}
1426 
1427 static enum write_ordering_e
1428 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1429 {
1430     struct disk_conf *dc;
1431 
1432     dc = rcu_dereference(bdev->disk_conf);
1433 
1434     if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1435         wo = WO_DRAIN_IO;
1436     if (wo == WO_DRAIN_IO && !dc->disk_drain)
1437         wo = WO_NONE;
1438 
1439     return wo;
1440 }
1441 
1442 /*
1443  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1444  * @wo:     Write ordering method to try.
1445  */
1446 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1447                   enum write_ordering_e wo)
1448 {
1449     struct drbd_device *device;
1450     enum write_ordering_e pwo;
1451     int vnr;
1452     static char *write_ordering_str[] = {
1453         [WO_NONE] = "none",
1454         [WO_DRAIN_IO] = "drain",
1455         [WO_BDEV_FLUSH] = "flush",
1456     };
1457 
1458     pwo = resource->write_ordering;
1459     if (wo != WO_BDEV_FLUSH)
1460         wo = min(pwo, wo);
1461     rcu_read_lock();
1462     idr_for_each_entry(&resource->devices, device, vnr) {
1463         if (get_ldev(device)) {
1464             wo = max_allowed_wo(device->ldev, wo);
1465             if (device->ldev == bdev)
1466                 bdev = NULL;
1467             put_ldev(device);
1468         }
1469     }
1470 
1471     if (bdev)
1472         wo = max_allowed_wo(bdev, wo);
1473 
1474     rcu_read_unlock();
1475 
1476     resource->write_ordering = wo;
1477     if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1478         drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1479 }
1480 
1481 /*
1482  * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1483  * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1484  * will directly go to fallback mode, submitting normal writes, and
1485  * never even try to UNMAP.
1486  *
1487  * And dm-thin does not do this (yet), mostly because in general it has
1488  * to assume that "skip_block_zeroing" is set.  See also:
1489  * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1490  * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1491  *
1492  * We *may* ignore the discard-zeroes-data setting, if so configured.
1493  *
1494  * Assumption is that this "discard_zeroes_data=0" is only because the backend
1495  * may ignore partial unaligned discards.
1496  *
1497  * LVM/DM thin as of at least
1498  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1499  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1500  *   Driver version:  4.29.0
1501  * still behaves this way.
1502  *
1503  * For unaligned (wrt. alignment and granularity) or too small discards,
1504  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1505  * but discard all the aligned full chunks.
1506  *
1507  * At least for LVM/DM thin, with skip_block_zeroing=false,
1508  * the result is effectively "discard_zeroes_data=1".
1509  */
/*
 * drbd_issue_discard_or_zero_out() - discard and/or zero out a sector
 * range on the local backing device.
 * @device:     DRBD device; its ldev->backing_bdev is operated on.
 * @start:      first sector of the range.
 * @nr_sectors: length of the range in sectors.
 * @flags:      EE_TRIM|EE_ZEROOUT
 *
 * With EE_ZEROOUT set, or EE_TRIM not set, the whole range is zeroed out.
 * Otherwise an unaligned head is zeroed out, all granularity-aligned full
 * chunks are discarded, and whatever remains at the end is zeroed out.
 *
 * Returns 0 if every issued operation succeeded, 1 if any of them failed.
 */
int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
{
    struct block_device *bdev = device->ldev->backing_bdev;
    sector_t tmp, nr;
    unsigned int max_discard_sectors, granularity;
    int alignment;
    int err = 0;

    if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
        goto zero_out;

    /* Zero-sector (unknown) and one-sector granularities are the same.  */
    granularity = max(bdev_discard_granularity(bdev) >> 9, 1U);
    alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;

    /* Per-request limit: capped at 1U << 22 sectors and rounded down
     * to a multiple of the granularity. */
    max_discard_sectors = min(bdev_max_discard_sectors(bdev), (1U << 22));
    max_discard_sectors -= max_discard_sectors % granularity;
    if (unlikely(!max_discard_sectors))
        goto zero_out;

    /* Too small to contain even one full chunk. */
    if (nr_sectors < granularity)
        goto zero_out;

    /* sector_div() divides tmp in place and returns the remainder. */
    tmp = start;
    if (sector_div(tmp, granularity) != alignment) {
        /* An unaligned range shorter than two chunks is zeroed out
         * as a whole rather than split. */
        if (nr_sectors < 2*granularity)
            goto zero_out;
        /* start + gran - (start + gran - align) % gran */
        tmp = start + granularity - alignment;
        tmp = start + granularity - sector_div(tmp, granularity);

        /* Zero out the unaligned head [start, tmp). */
        nr = tmp - start;
        /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
         * layers are below us, some may have smaller granularity */
        err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
        nr_sectors -= nr;
        start = tmp;
    }
    /* Discard in pieces of the maximum size ... */
    while (nr_sectors >= max_discard_sectors) {
        err |= blkdev_issue_discard(bdev, start, max_discard_sectors,
                        GFP_NOIO);
        nr_sectors -= max_discard_sectors;
        start += max_discard_sectors;
    }
    /* ... then the remaining full chunks, if any. */
    if (nr_sectors) {
        /* max_discard_sectors is unsigned int (and a multiple of
         * granularity, we made sure of that above already);
         * nr is < max_discard_sectors;
         * I don't need sector_div here, even though nr is sector_t */
        nr = nr_sectors;
        nr -= (unsigned int)nr % granularity;
        if (nr) {
            err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO);
            nr_sectors -= nr;
            start += nr;
        }
    }
 zero_out:
    /* Whatever is left: the whole range if we jumped here directly,
     * otherwise the trailing partial chunk.  Unmapping is only
     * permitted when the request was a TRIM. */
    if (nr_sectors) {
        err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
                (flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
    }
    return err != 0;
}
1575 
1576 static bool can_do_reliable_discards(struct drbd_device *device)
1577 {
1578     struct disk_conf *dc;
1579     bool can_do;
1580 
1581     if (!bdev_max_discard_sectors(device->ldev->backing_bdev))
1582         return false;
1583 
1584     rcu_read_lock();
1585     dc = rcu_dereference(device->ldev->disk_conf);
1586     can_do = dc->discard_zeroes_if_aligned;
1587     rcu_read_unlock();
1588     return can_do;
1589 }
1590 
1591 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1592 {
1593     /* If the backend cannot discard, or does not guarantee
1594      * read-back zeroes in discarded ranges, we fall back to
1595      * zero-out.  Unless configuration specifically requested
1596      * otherwise. */
1597     if (!can_do_reliable_discards(device))
1598         peer_req->flags |= EE_ZEROOUT;
1599 
1600     if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1601         peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1602         peer_req->flags |= EE_WAS_ERROR;
1603     drbd_endio_write_sec_final(peer_req);
1604 }
1605 
/**
 * drbd_submit_peer_request()
 * @device: DRBD device.
 * @peer_req:   peer request
 * @opf:        operation and flags passed to bio_alloc() for every bio.
 * @fault_type: passed through to drbd_submit_bio_noacct().
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Requests flagged EE_TRIM or EE_ZEROOUT are not turned into bios here;
 * they are handed to drbd_issue_peer_discard_or_zero_out() instead.
 *
 * Returns 0 if all bios have been submitted.
 *
 * NOTE(review): earlier descriptions of this function list -ENOMEM (could
 * not allocate enough bios) and -ENOSPC (could not bio_add_page a single
 * page to an empty bio, observed on certain Xen deployments) as possible
 * results.  Every return statement in the code below returns 0, and a
 * failing bio_add_page() simply starts another bio.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
                 struct drbd_peer_request *peer_req,
                 const blk_opf_t opf, const int fault_type)
{
    struct bio *bios = NULL;
    struct bio *bio;
    struct page *page = peer_req->pages;
    sector_t sector = peer_req->i.sector;
    unsigned int data_size = peer_req->i.size;
    unsigned int n_bios = 0;
    unsigned int nr_pages = PFN_UP(data_size);

    /* TRIM/DISCARD: for now, always use the synchronous helper
     * drbd_issue_peer_discard_or_zero_out().
     * It's synchronous, but it does the right thing wrt. bio splitting.
     * Correctness first, performance later.  Next step is to code an
     * asynchronous variant of the same.
     */
    if (peer_req->flags & (EE_TRIM | EE_ZEROOUT)) {
        /* wait for all pending IO completions, before we start
         * zeroing things out. */
        conn_wait_active_ee_empty(peer_req->peer_device->connection);
        /* add it to the active list now,
         * so we can find it to present it in debugfs */
        peer_req->submit_jif = jiffies;
        peer_req->flags |= EE_SUBMITTED;

        /* If this was a resync request from receive_rs_deallocated(),
         * it is already on the sync_ee list */
        if (list_empty(&peer_req->w.list)) {
            spin_lock_irq(&device->resource->req_lock);
            list_add_tail(&peer_req->w.list, &device->active_ee);
            spin_unlock_irq(&device->resource->req_lock);
        }

        drbd_issue_peer_discard_or_zero_out(device, peer_req);
        return 0;
    }

    /* In most cases, we will only need one bio.  But in case the lower
     * level restrictions happen to be different at this offset on this
     * side than those of the sending peer, we may need to submit the
     * request in more than one bio.
     *
     * Plain bio_alloc is good enough here, this is no DRBD internally
     * generated bio, but a bio allocated on behalf of the peer.
     */
next_bio:
    /* Sized for the pages that have not been added to a bio yet. */
    bio = bio_alloc(device->ldev->backing_bdev, nr_pages, opf, GFP_NOIO);
    /* > peer_req->i.sector, unless this is the first bio */
    bio->bi_iter.bi_sector = sector;
    bio->bi_private = peer_req;
    bio->bi_end_io = drbd_peer_request_endio;

    /* Collect the bios on a private list (newest first); nothing is
     * submitted before pending_bios is known. */
    bio->bi_next = bios;
    bios = bio;
    ++n_bios;

    /* presumably continues with the page that did not fit when we get
     * here again via next_bio - confirm against page_chain_for_each() */
    page_chain_for_each(page) {
        unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
        if (!bio_add_page(bio, page, len, 0))
            goto next_bio;
        data_size -= len;
        sector += len >> 9;
        --nr_pages;
    }
    D_ASSERT(device, data_size == 0);
    D_ASSERT(device, page == NULL);

    /* Must be set before the first bio is submitted. */
    atomic_set(&peer_req->pending_bios, n_bios);
    /* for debugfs: update timestamp, mark as submitted */
    peer_req->submit_jif = jiffies;
    peer_req->flags |= EE_SUBMITTED;
    do {
        /* Unlink from the private list before handing it down. */
        bio = bios;
        bios = bios->bi_next;
        bio->bi_next = NULL;

        drbd_submit_bio_noacct(device, fault_type, bio);
    } while (bios);
    return 0;
}
1704 
1705 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1706                          struct drbd_peer_request *peer_req)
1707 {
1708     struct drbd_interval *i = &peer_req->i;
1709 
1710     drbd_remove_interval(&device->write_requests, i);
1711     drbd_clear_interval(i);
1712 
1713     /* Wake up any processes waiting for this peer request to complete.  */
1714     if (i->waiting)
1715         wake_up(&device->misc_wait);
1716 }
1717 
1718 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1719 {
1720     struct drbd_peer_device *peer_device;
1721     int vnr;
1722 
1723     rcu_read_lock();
1724     idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1725         struct drbd_device *device = peer_device->device;
1726 
1727         kref_get(&device->kref);
1728         rcu_read_unlock();
1729         drbd_wait_ee_list_empty(device, &device->active_ee);
1730         kref_put(&device->kref, drbd_destroy_device);
1731         rcu_read_lock();
1732     }
1733     rcu_read_unlock();
1734 }
1735 
/*
 * receive_Barrier() - handle a barrier packet from the peer.
 *
 * Stores the barrier number in the current epoch and applies
 * EV_GOT_BARRIER_NR to it.  Depending on the write ordering method, then
 * either just allocates a new epoch (WO_NONE), or first waits for all
 * active peer requests and flushes the backing devices.  A newly
 * allocated epoch becomes the current epoch only if the current one still
 * contains entries.
 *
 * Returns 0, or -EIO for an unknown write ordering method.
 */
static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
    int rv;
    struct p_barrier *p = pi->data;
    struct drbd_epoch *epoch;

    /* FIXME these are unacked on connection,
     * not a specific (peer)device.
     */
    connection->current_epoch->barrier_nr = p->barrier;
    connection->current_epoch->connection = connection;
    rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

    /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
     * the activity log, which means it would not be resynced in case the
     * R_PRIMARY crashes now.
     * Therefore we must send the barrier_ack after the barrier request was
     * completed. */
    /* Leaving this switch via "break" means: epoch was allocated, go on
     * and install it below.  All other exits return directly. */
    switch (connection->resource->write_ordering) {
    case WO_NONE:
        /* The current epoch was finished and reset; reuse it. */
        if (rv == FE_RECYCLED)
            return 0;

        /* receiver context, in the writeout path of the other node.
         * avoid potential distributed deadlock */
        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
        if (epoch)
            break;
        else
            drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
        /* Without a new epoch, wait and flush like the stricter modes. */
        fallthrough;

    case WO_BDEV_FLUSH:
    case WO_DRAIN_IO:
        conn_wait_active_ee_empty(connection);
        drbd_flush(connection);

        /* Only a non-empty current epoch needs a successor. */
        if (atomic_read(&connection->current_epoch->epoch_size)) {
            epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
            if (epoch)
                break;
        }

        return 0;
    default:
        drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
             connection->resource->write_ordering);
        return -EIO;
    }

    epoch->flags = 0;
    atomic_set(&epoch->epoch_size, 0);
    atomic_set(&epoch->active, 0);

    /* Re-check under the lock; the current epoch may have been emptied
     * since it was last looked at. */
    spin_lock(&connection->epoch_lock);
    if (atomic_read(&connection->current_epoch->epoch_size)) {
        list_add(&epoch->list, &connection->current_epoch->list);
        connection->current_epoch = epoch;
        connection->epochs++;
    } else {
        /* The current_epoch got recycled while we allocated this one... */
        kfree(epoch);
    }
    spin_unlock(&connection->epoch_lock);

    return 0;
}
1803 
1804 /* quick wrapper in case payload size != request_size (write same) */
1805 static void drbd_csum_ee_size(struct crypto_shash *h,
1806                   struct drbd_peer_request *r, void *d,
1807                   unsigned int payload_size)
1808 {
1809     unsigned int tmp = r->i.size;
1810     r->i.size = payload_size;
1811     drbd_csum_ee(h, r, d);
1812     r->i.size = tmp;
1813 }
1814 
1815 /* used from receive_RSDataReply (recv_resync_read)
1816  * and from receive_Data.
1817  * data_size: actual payload ("data in")
1818  *  for normal writes that is bi_size.
1819  *  for discards, that is zero.
1820  *  for write same, it is logical_block_size.
1821  * both trim and write same have the bi_size ("data len to be affected")
1822  * as extra argument in the packet header.
1823  */
1824 static struct drbd_peer_request *
1825 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1826           struct packet_info *pi) __must_hold(local)
1827 {
1828     struct drbd_device *device = peer_device->device;
1829     const sector_t capacity = get_capacity(device->vdisk);
1830     struct drbd_peer_request *peer_req;
1831     struct page *page;
1832     int digest_size, err;
1833     unsigned int data_size = pi->size, ds;
1834     void *dig_in = peer_device->connection->int_dig_in;
1835     void *dig_vv = peer_device->connection->int_dig_vv;
1836     unsigned long *data;
1837     struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1838     struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1839 
1840     digest_size = 0;
1841     if (!trim && peer_device->connection->peer_integrity_tfm) {
1842         digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1843         /*
1844          * FIXME: Receive the incoming digest into the receive buffer
1845          *    here, together with its struct p_data?
1846          */
1847         err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1848         if (err)
1849             return NULL;
1850         data_size -= digest_size;
1851     }
1852 
1853     /* assume request_size == data_size, but special case trim. */
1854     ds = data_size;
1855     if (trim) {
1856         if (!expect(data_size == 0))
1857             return NULL;
1858         ds = be32_to_cpu(trim->size);
1859     } else if (zeroes) {
1860         if (!expect(data_size == 0))
1861             return NULL;
1862         ds = be32_to_cpu(zeroes->size);
1863     }
1864 
1865     if (!expect(IS_ALIGNED(ds, 512)))
1866         return NULL;
1867     if (trim || zeroes) {
1868         if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1869             return NULL;
1870     } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1871         return NULL;
1872 
1873     /* even though we trust out peer,
1874      * we sometimes have to double check. */
1875     if (sector + (ds>>9) > capacity) {
1876         drbd_err(device, "request from peer beyond end of local disk: "
1877             "capacity: %llus < sector: %llus + size: %u\n",
1878             (unsigned long long)capacity,
1879             (unsigned long long)sector, ds);
1880         return NULL;
1881     }
1882 
1883     /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1884      * "criss-cross" setup, that might cause write-out on some other DRBD,
1885      * which in turn might block on the other node at this very place.  */
1886     peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1887     if (!peer_req)
1888         return NULL;
1889 
1890     peer_req->flags |= EE_WRITE;
1891     if (trim) {
1892         peer_req->flags |= EE_TRIM;
1893         return peer_req;
1894     }
1895     if (zeroes) {
1896         peer_req->flags |= EE_ZEROOUT;
1897         return peer_req;
1898     }
1899 
1900     /* receive payload size bytes into page chain */
1901     ds = data_size;
1902     page = peer_req->pages;
1903     page_chain_for_each(page) {
1904         unsigned len = min_t(int, ds, PAGE_SIZE);
1905         data = kmap(page);
1906         err = drbd_recv_all_warn(peer_device->connection, data, len);
1907         if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1908             drbd_err(device, "Fault injection: Corrupting data on receive\n");
1909             data[0] = data[0] ^ (unsigned long)-1;
1910         }
1911         kunmap(page);
1912         if (err) {
1913             drbd_free_peer_req(device, peer_req);
1914             return NULL;
1915         }
1916         ds -= len;
1917     }
1918 
1919     if (digest_size) {
1920         drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1921         if (memcmp(dig_in, dig_vv, digest_size)) {
1922             drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1923                 (unsigned long long)sector, data_size);
1924             drbd_free_peer_req(device, peer_req);
1925             return NULL;
1926         }
1927     }
1928     device->recv_cnt += data_size >> 9;
1929     return peer_req;
1930 }
1931 
1932 /* drbd_drain_block() just takes a data block
1933  * out of the socket input buffer, and discards it.
1934  */
1935 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1936 {
1937     struct page *page;
1938     int err = 0;
1939     void *data;
1940 
1941     if (!data_size)
1942         return 0;
1943 
1944     page = drbd_alloc_pages(peer_device, 1, 1);
1945 
1946     data = kmap(page);
1947     while (data_size) {
1948         unsigned int len = min_t(int, data_size, PAGE_SIZE);
1949 
1950         err = drbd_recv_all_warn(peer_device->connection, data, len);
1951         if (err)
1952             break;
1953         data_size -= len;
1954     }
1955     kunmap(page);
1956     drbd_free_pages(peer_device->device, page, 0);
1957     return err;
1958 }
1959 
1960 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1961                sector_t sector, int data_size)
1962 {
1963     struct bio_vec bvec;
1964     struct bvec_iter iter;
1965     struct bio *bio;
1966     int digest_size, err, expect;
1967     void *dig_in = peer_device->connection->int_dig_in;
1968     void *dig_vv = peer_device->connection->int_dig_vv;
1969 
1970     digest_size = 0;
1971     if (peer_device->connection->peer_integrity_tfm) {
1972         digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1973         err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1974         if (err)
1975             return err;
1976         data_size -= digest_size;
1977     }
1978 
1979     /* optimistically update recv_cnt.  if receiving fails below,
1980      * we disconnect anyways, and counters will be reset. */
1981     peer_device->device->recv_cnt += data_size>>9;
1982 
1983     bio = req->master_bio;
1984     D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1985 
1986     bio_for_each_segment(bvec, bio, iter) {
1987         void *mapped = bvec_kmap_local(&bvec);
1988         expect = min_t(int, data_size, bvec.bv_len);
1989         err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1990         kunmap_local(mapped);
1991         if (err)
1992             return err;
1993         data_size -= expect;
1994     }
1995 
1996     if (digest_size) {
1997         drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1998         if (memcmp(dig_in, dig_vv, digest_size)) {
1999             drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
2000             return -EINVAL;
2001         }
2002     }
2003 
2004     D_ASSERT(peer_device->device, data_size == 0);
2005     return 0;
2006 }
2007 
2008 /*
2009  * e_end_resync_block() is called in ack_sender context via
2010  * drbd_finish_peer_reqs().
2011  */
2012 static int e_end_resync_block(struct drbd_work *w, int unused)
2013 {
2014     struct drbd_peer_request *peer_req =
2015         container_of(w, struct drbd_peer_request, w);
2016     struct drbd_peer_device *peer_device = peer_req->peer_device;
2017     struct drbd_device *device = peer_device->device;
2018     sector_t sector = peer_req->i.sector;
2019     int err;
2020 
2021     D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2022 
2023     if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2024         drbd_set_in_sync(device, sector, peer_req->i.size);
2025         err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2026     } else {
2027         /* Record failure to sync */
2028         drbd_rs_failed_io(device, sector, peer_req->i.size);
2029 
2030         err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2031     }
2032     dec_unacked(device);
2033 
2034     return err;
2035 }
2036 
2037 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2038                 struct packet_info *pi) __releases(local)
2039 {
2040     struct drbd_device *device = peer_device->device;
2041     struct drbd_peer_request *peer_req;
2042 
2043     peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2044     if (!peer_req)
2045         goto fail;
2046 
2047     dec_rs_pending(device);
2048 
2049     inc_unacked(device);
2050     /* corresponding dec_unacked() in e_end_resync_block()
2051      * respective _drbd_clear_done_ee */
2052 
2053     peer_req->w.cb = e_end_resync_block;
2054     peer_req->submit_jif = jiffies;
2055 
2056     spin_lock_irq(&device->resource->req_lock);
2057     list_add_tail(&peer_req->w.list, &device->sync_ee);
2058     spin_unlock_irq(&device->resource->req_lock);
2059 
2060     atomic_add(pi->size >> 9, &device->rs_sect_ev);
2061     if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE,
2062                      DRBD_FAULT_RS_WR) == 0)
2063         return 0;
2064 
2065     /* don't care for the reason here */
2066     drbd_err(device, "submit failed, triggering re-connect\n");
2067     spin_lock_irq(&device->resource->req_lock);
2068     list_del(&peer_req->w.list);
2069     spin_unlock_irq(&device->resource->req_lock);
2070 
2071     drbd_free_peer_req(device, peer_req);
2072 fail:
2073     put_ldev(device);
2074     return -EIO;
2075 }
2076 
2077 static struct drbd_request *
2078 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2079          sector_t sector, bool missing_ok, const char *func)
2080 {
2081     struct drbd_request *req;
2082 
2083     /* Request object according to our peer */
2084     req = (struct drbd_request *)(unsigned long)id;
2085     if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2086         return req;
2087     if (!missing_ok) {
2088         drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2089             (unsigned long)id, (unsigned long long)sector);
2090     }
2091     return NULL;
2092 }
2093 
2094 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2095 {
2096     struct drbd_peer_device *peer_device;
2097     struct drbd_device *device;
2098     struct drbd_request *req;
2099     sector_t sector;
2100     int err;
2101     struct p_data *p = pi->data;
2102 
2103     peer_device = conn_peer_device(connection, pi->vnr);
2104     if (!peer_device)
2105         return -EIO;
2106     device = peer_device->device;
2107 
2108     sector = be64_to_cpu(p->sector);
2109 
2110     spin_lock_irq(&device->resource->req_lock);
2111     req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2112     spin_unlock_irq(&device->resource->req_lock);
2113     if (unlikely(!req))
2114         return -EIO;
2115 
2116     /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2117      * special casing it there for the various failure cases.
2118      * still no race with drbd_fail_pending_reads */
2119     err = recv_dless_read(peer_device, req, sector, pi->size);
2120     if (!err)
2121         req_mod(req, DATA_RECEIVED);
2122     /* else: nothing. handled from drbd_disconnect...
2123      * I don't think we may complete this just yet
2124      * in case we are "on-disconnect: freeze" */
2125 
2126     return err;
2127 }
2128 
2129 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2130 {
2131     struct drbd_peer_device *peer_device;
2132     struct drbd_device *device;
2133     sector_t sector;
2134     int err;
2135     struct p_data *p = pi->data;
2136 
2137     peer_device = conn_peer_device(connection, pi->vnr);
2138     if (!peer_device)
2139         return -EIO;
2140     device = peer_device->device;
2141 
2142     sector = be64_to_cpu(p->sector);
2143     D_ASSERT(device, p->block_id == ID_SYNCER);
2144 
2145     if (get_ldev(device)) {
2146         /* data is submitted to disk within recv_resync_read.
2147          * corresponding put_ldev done below on error,
2148          * or in drbd_peer_request_endio. */
2149         err = recv_resync_read(peer_device, sector, pi);
2150     } else {
2151         if (__ratelimit(&drbd_ratelimit_state))
2152             drbd_err(device, "Can not write resync data to local disk.\n");
2153 
2154         err = drbd_drain_block(peer_device, pi->size);
2155 
2156         drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2157     }
2158 
2159     atomic_add(pi->size >> 9, &device->rs_sect_in);
2160 
2161     return err;
2162 }
2163 
2164 static void restart_conflicting_writes(struct drbd_device *device,
2165                        sector_t sector, int size)
2166 {
2167     struct drbd_interval *i;
2168     struct drbd_request *req;
2169 
2170     drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2171         if (!i->local)
2172             continue;
2173         req = container_of(i, struct drbd_request, i);
2174         if (req->rq_state & RQ_LOCAL_PENDING ||
2175             !(req->rq_state & RQ_POSTPONED))
2176             continue;
2177         /* as it is RQ_POSTPONED, this will cause it to
2178          * be queued on the retry workqueue. */
2179         __req_mod(req, CONFLICT_RESOLVED, NULL);
2180     }
2181 }
2182 
2183 /*
2184  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2185  */
2186 static int e_end_block(struct drbd_work *w, int cancel)
2187 {
2188     struct drbd_peer_request *peer_req =
2189         container_of(w, struct drbd_peer_request, w);
2190     struct drbd_peer_device *peer_device = peer_req->peer_device;
2191     struct drbd_device *device = peer_device->device;
2192     sector_t sector = peer_req->i.sector;
2193     int err = 0, pcmd;
2194 
2195     if (peer_req->flags & EE_SEND_WRITE_ACK) {
2196         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2197             pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2198                 device->state.conn <= C_PAUSED_SYNC_T &&
2199                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2200                 P_RS_WRITE_ACK : P_WRITE_ACK;
2201             err = drbd_send_ack(peer_device, pcmd, peer_req);
2202             if (pcmd == P_RS_WRITE_ACK)
2203                 drbd_set_in_sync(device, sector, peer_req->i.size);
2204         } else {
2205             err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2206             /* we expect it to be marked out of sync anyways...
2207              * maybe assert this?  */
2208         }
2209         dec_unacked(device);
2210     }
2211 
2212     /* we delete from the conflict detection hash _after_ we sent out the
2213      * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2214     if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2215         spin_lock_irq(&device->resource->req_lock);
2216         D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2217         drbd_remove_epoch_entry_interval(device, peer_req);
2218         if (peer_req->flags & EE_RESTART_REQUESTS)
2219             restart_conflicting_writes(device, sector, peer_req->i.size);
2220         spin_unlock_irq(&device->resource->req_lock);
2221     } else
2222         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2223 
2224     drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2225 
2226     return err;
2227 }
2228 
2229 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2230 {
2231     struct drbd_peer_request *peer_req =
2232         container_of(w, struct drbd_peer_request, w);
2233     struct drbd_peer_device *peer_device = peer_req->peer_device;
2234     int err;
2235 
2236     err = drbd_send_ack(peer_device, ack, peer_req);
2237     dec_unacked(peer_device->device);
2238 
2239     return err;
2240 }
2241 
2242 static int e_send_superseded(struct drbd_work *w, int unused)
2243 {
2244     return e_send_ack(w, P_SUPERSEDED);
2245 }
2246 
2247 static int e_send_retry_write(struct drbd_work *w, int unused)
2248 {
2249     struct drbd_peer_request *peer_req =
2250         container_of(w, struct drbd_peer_request, w);
2251     struct drbd_connection *connection = peer_req->peer_device->connection;
2252 
2253     return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2254                  P_RETRY_WRITE : P_SUPERSEDED);
2255 }
2256 
/*
 * Return true if sequence number @a is logically newer than @b.
 *
 * We assume 32-bit wrap-around here: the difference is computed in
 * unsigned arithmetic (which wraps by definition) and only then
 * interpreted as a signed distance.  Subtracting the two values as signed
 * integers would overflow for operands that are far apart.
 *
 * For 24-bit wrap-around, we would have to shift:
 *  a <<= 8; b <<= 8;
 */
static bool seq_greater(u32 a, u32 b)
{
	return (s32)(a - b) > 0;
}
2266 
2267 static u32 seq_max(u32 a, u32 b)
2268 {
2269     return seq_greater(a, b) ? a : b;
2270 }
2271 
2272 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2273 {
2274     struct drbd_device *device = peer_device->device;
2275     unsigned int newest_peer_seq;
2276 
2277     if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2278         spin_lock(&device->peer_seq_lock);
2279         newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2280         device->peer_seq = newest_peer_seq;
2281         spin_unlock(&device->peer_seq_lock);
2282         /* wake up only if we actually changed device->peer_seq */
2283         if (peer_seq == newest_peer_seq)
2284             wake_up(&device->seq_wait);
2285     }
2286 }
2287 
/*
 * Return non-zero if the range starting at sector @s1 intersects the range
 * starting at sector @s2.  @l1 and @l2 are shifted right by 9 before being
 * added to the start sectors.
 */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	return (s1 + (l1 >> 9) > s2) && (s1 < s2 + (l2 >> 9));
}
2292 
2293 /* maybe change sync_ee into interval trees as well? */
2294 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2295 {
2296     struct drbd_peer_request *rs_req;
2297     bool rv = false;
2298 
2299     spin_lock_irq(&device->resource->req_lock);
2300     list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2301         if (overlaps(peer_req->i.sector, peer_req->i.size,
2302                  rs_req->i.sector, rs_req->i.size)) {
2303             rv = true;
2304             break;
2305         }
2306     }
2307     spin_unlock_irq(&device->resource->req_lock);
2308 
2309     return rv;
2310 }
2311 
2312 /* Called from receive_Data.
2313  * Synchronize packets on sock with packets on msock.
2314  *
2315  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2316  * packet traveling on msock, they are still processed in the order they have
2317  * been sent.
2318  *
2319  * Note: we don't care for Ack packets overtaking P_DATA packets.
2320  *
2321  * In case packet_seq is larger than device->peer_seq number, there are
2322  * outstanding packets on the msock. We wait for them to arrive.
2323  * In case we are the logically next packet, we update device->peer_seq
2324  * ourselves. Correctly handles 32bit wrap around.
2325  *
2326  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2327  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2328  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2329  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2330  *
2331  * returns 0 if we may process the packet,
2332  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2333 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2334 {
2335     struct drbd_device *device = peer_device->device;
2336     DEFINE_WAIT(wait);
2337     long timeout;
2338     int ret = 0, tp;
2339 
2340     if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2341         return 0;
2342 
2343     spin_lock(&device->peer_seq_lock);
2344     for (;;) {
2345         if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2346             device->peer_seq = seq_max(device->peer_seq, peer_seq);
2347             break;
2348         }
2349 
2350         if (signal_pending(current)) {
2351             ret = -ERESTARTSYS;
2352             break;
2353         }
2354 
2355         rcu_read_lock();
2356         tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2357         rcu_read_unlock();
2358 
2359         if (!tp)
2360             break;
2361 
2362         /* Only need to wait if two_primaries is enabled */
2363         prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2364         spin_unlock(&device->peer_seq_lock);
2365         rcu_read_lock();
2366         timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2367         rcu_read_unlock();
2368         timeout = schedule_timeout(timeout);
2369         spin_lock(&device->peer_seq_lock);
2370         if (!timeout) {
2371             ret = -ETIMEDOUT;
2372             drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2373             break;
2374         }
2375     }
2376     spin_unlock(&device->peer_seq_lock);
2377     finish_wait(&device->seq_wait, &wait);
2378     return ret;
2379 }
2380 
2381 /* see also bio_flags_to_wire()
2382  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2383  * flags and back. We may replicate to other kernel versions. */
2384 static blk_opf_t wire_flags_to_bio_flags(u32 dpf)
2385 {
2386     return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2387         (dpf & DP_FUA ? REQ_FUA : 0) |
2388         (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2389 }
2390 
2391 static enum req_op wire_flags_to_bio_op(u32 dpf)
2392 {
2393     if (dpf & DP_ZEROES)
2394         return REQ_OP_WRITE_ZEROES;
2395     if (dpf & DP_DISCARD)
2396         return REQ_OP_DISCARD;
2397     else
2398         return REQ_OP_WRITE;
2399 }
2400 
2401 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2402                     unsigned int size)
2403 {
2404     struct drbd_interval *i;
2405 
2406     repeat:
2407     drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2408         struct drbd_request *req;
2409         struct bio_and_error m;
2410 
2411         if (!i->local)
2412             continue;
2413         req = container_of(i, struct drbd_request, i);
2414         if (!(req->rq_state & RQ_POSTPONED))
2415             continue;
2416         req->rq_state &= ~RQ_POSTPONED;
2417         __req_mod(req, NEG_ACKED, &m);
2418         spin_unlock_irq(&device->resource->req_lock);
2419         if (m.bio)
2420             complete_master_bio(device, &m);
2421         spin_lock_irq(&device->resource->req_lock);
2422         goto repeat;
2423     }
2424 }
2425 
2426 static int handle_write_conflicts(struct drbd_device *device,
2427                   struct drbd_peer_request *peer_req)
2428 {
2429     struct drbd_connection *connection = peer_req->peer_device->connection;
2430     bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2431     sector_t sector = peer_req->i.sector;
2432     const unsigned int size = peer_req->i.size;
2433     struct drbd_interval *i;
2434     bool equal;
2435     int err;
2436 
2437     /*
2438      * Inserting the peer request into the write_requests tree will prevent
2439      * new conflicting local requests from being added.
2440      */
2441     drbd_insert_interval(&device->write_requests, &peer_req->i);
2442 
2443     repeat:
2444     drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2445         if (i == &peer_req->i)
2446             continue;
2447         if (i->completed)
2448             continue;
2449 
2450         if (!i->local) {
2451             /*
2452              * Our peer has sent a conflicting remote request; this
2453              * should not happen in a two-node setup.  Wait for the
2454              * earlier peer request to complete.
2455              */
2456             err = drbd_wait_misc(device, i);
2457             if (err)
2458                 goto out;
2459             goto repeat;
2460         }
2461 
2462         equal = i->sector == sector && i->size == size;
2463         if (resolve_conflicts) {
2464             /*
2465              * If the peer request is fully contained within the
2466              * overlapping request, it can be considered overwritten
2467              * and thus superseded; otherwise, it will be retried
2468              * once all overlapping requests have completed.
2469              */
2470             bool superseded = i->sector <= sector && i->sector +
2471                        (i->size >> 9) >= sector + (size >> 9);
2472 
2473             if (!equal)
2474                 drbd_alert(device, "Concurrent writes detected: "
2475                            "local=%llus +%u, remote=%llus +%u, "
2476                            "assuming %s came first\n",
2477                       (unsigned long long)i->sector, i->size,
2478                       (unsigned long long)sector, size,
2479                       superseded ? "local" : "remote");
2480 
2481             peer_req->w.cb = superseded ? e_send_superseded :
2482                            e_send_retry_write;
2483             list_add_tail(&peer_req->w.list, &device->done_ee);
2484             queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2485 
2486             err = -ENOENT;
2487             goto out;
2488         } else {
2489             struct drbd_request *req =
2490                 container_of(i, struct drbd_request, i);
2491 
2492             if (!equal)
2493                 drbd_alert(device, "Concurrent writes detected: "
2494                            "local=%llus +%u, remote=%llus +%u\n",
2495                       (unsigned long long)i->sector, i->size,
2496                       (unsigned long long)sector, size);
2497 
2498             if (req->rq_state & RQ_LOCAL_PENDING ||
2499                 !(req->rq_state & RQ_POSTPONED)) {
2500                 /*
2501                  * Wait for the node with the discard flag to
2502                  * decide if this request has been superseded
2503                  * or needs to be retried.
2504                  * Requests that have been superseded will
2505                  * disappear from the write_requests tree.
2506                  *
2507                  * In addition, wait for the conflicting
2508                  * request to finish locally before submitting
2509                  * the conflicting peer request.
2510                  */
2511                 err = drbd_wait_misc(device, &req->i);
2512                 if (err) {
2513                     _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2514                     fail_postponed_requests(device, sector, size);
2515                     goto out;
2516                 }
2517                 goto repeat;
2518             }
2519             /*
2520              * Remember to restart the conflicting requests after
2521              * the new peer request has completed.
2522              */
2523             peer_req->flags |= EE_RESTART_REQUESTS;
2524         }
2525     }
2526     err = 0;
2527 
2528     out:
2529     if (err)
2530         drbd_remove_epoch_entry_interval(device, peer_req);
2531     return err;
2532 }
2533 
2534 /* mirrored write */
2535 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2536 {
2537     struct drbd_peer_device *peer_device;
2538     struct drbd_device *device;
2539     struct net_conf *nc;
2540     sector_t sector;
2541     struct drbd_peer_request *peer_req;
2542     struct p_data *p = pi->data;
2543     u32 peer_seq = be32_to_cpu(p->seq_num);
2544     enum req_op op;
2545     blk_opf_t op_flags;
2546     u32 dp_flags;
2547     int err, tp;
2548 
2549     peer_device = conn_peer_device(connection, pi->vnr);
2550     if (!peer_device)
2551         return -EIO;
2552     device = peer_device->device;
2553 
2554     if (!get_ldev(device)) {
2555         int err2;
2556 
2557         err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2558         drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2559         atomic_inc(&connection->current_epoch->epoch_size);
2560         err2 = drbd_drain_block(peer_device, pi->size);
2561         if (!err)
2562             err = err2;
2563         return err;
2564     }
2565 
2566     /*
2567      * Corresponding put_ldev done either below (on various errors), or in
2568      * drbd_peer_request_endio, if we successfully submit the data at the
2569      * end of this function.
2570      */
2571 
2572     sector = be64_to_cpu(p->sector);
2573     peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2574     if (!peer_req) {
2575         put_ldev(device);
2576         return -EIO;
2577     }
2578 
2579     peer_req->w.cb = e_end_block;
2580     peer_req->submit_jif = jiffies;
2581     peer_req->flags |= EE_APPLICATION;
2582 
2583     dp_flags = be32_to_cpu(p->dp_flags);
2584     op = wire_flags_to_bio_op(dp_flags);
2585     op_flags = wire_flags_to_bio_flags(dp_flags);
2586     if (pi->cmd == P_TRIM) {
2587         D_ASSERT(peer_device, peer_req->i.size > 0);
2588         D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2589         D_ASSERT(peer_device, peer_req->pages == NULL);
2590         /* need to play safe: an older DRBD sender
2591          * may mean zero-out while sending P_TRIM. */
2592         if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2593             peer_req->flags |= EE_ZEROOUT;
2594     } else if (pi->cmd == P_ZEROES) {
2595         D_ASSERT(peer_device, peer_req->i.size > 0);
2596         D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2597         D_ASSERT(peer_device, peer_req->pages == NULL);
2598         /* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2599         if (dp_flags & DP_DISCARD)
2600             peer_req->flags |= EE_TRIM;
2601     } else if (peer_req->pages == NULL) {
2602         D_ASSERT(device, peer_req->i.size == 0);
2603         D_ASSERT(device, dp_flags & DP_FLUSH);
2604     }
2605 
2606     if (dp_flags & DP_MAY_SET_IN_SYNC)
2607         peer_req->flags |= EE_MAY_SET_IN_SYNC;
2608 
2609     spin_lock(&connection->epoch_lock);
2610     peer_req->epoch = connection->current_epoch;
2611     atomic_inc(&peer_req->epoch->epoch_size);
2612     atomic_inc(&peer_req->epoch->active);
2613     spin_unlock(&connection->epoch_lock);
2614 
2615     rcu_read_lock();
2616     nc = rcu_dereference(peer_device->connection->net_conf);
2617     tp = nc->two_primaries;
2618     if (peer_device->connection->agreed_pro_version < 100) {
2619         switch (nc->wire_protocol) {
2620         case DRBD_PROT_C:
2621             dp_flags |= DP_SEND_WRITE_ACK;
2622             break;
2623         case DRBD_PROT_B:
2624             dp_flags |= DP_SEND_RECEIVE_ACK;
2625             break;
2626         }
2627     }
2628     rcu_read_unlock();
2629 
2630     if (dp_flags & DP_SEND_WRITE_ACK) {
2631         peer_req->flags |= EE_SEND_WRITE_ACK;
2632         inc_unacked(device);
2633         /* corresponding dec_unacked() in e_end_block()
2634          * respective _drbd_clear_done_ee */
2635     }
2636 
2637     if (dp_flags & DP_SEND_RECEIVE_ACK) {
2638         /* I really don't like it that the receiver thread
2639          * sends on the msock, but anyways */
2640         drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2641     }
2642 
2643     if (tp) {
2644         /* two primaries implies protocol C */
2645         D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2646         peer_req->flags |= EE_IN_INTERVAL_TREE;
2647         err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2648         if (err)
2649             goto out_interrupted;
2650         spin_lock_irq(&device->resource->req_lock);
2651         err = handle_write_conflicts(device, peer_req);
2652         if (err) {
2653             spin_unlock_irq(&device->resource->req_lock);
2654             if (err == -ENOENT) {
2655                 put_ldev(device);
2656                 return 0;
2657             }
2658             goto out_interrupted;
2659         }
2660     } else {
2661         update_peer_seq(peer_device, peer_seq);
2662         spin_lock_irq(&device->resource->req_lock);
2663     }
2664     /* TRIM and is processed synchronously,
2665      * we wait for all pending requests, respectively wait for
2666      * active_ee to become empty in drbd_submit_peer_request();
2667      * better not add ourselves here. */
2668     if ((peer_req->flags & (EE_TRIM | EE_ZEROOUT)) == 0)
2669         list_add_tail(&peer_req->w.list, &device->active_ee);
2670     spin_unlock_irq(&device->resource->req_lock);
2671 
2672     if (device->state.conn == C_SYNC_TARGET)
2673         wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2674 
2675     if (device->state.pdsk < D_INCONSISTENT) {
2676         /* In case we have the only disk of the cluster, */
2677         drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2678         peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2679         drbd_al_begin_io(device, &peer_req->i);
2680         peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2681     }
2682 
2683     err = drbd_submit_peer_request(device, peer_req, op | op_flags,
2684                        DRBD_FAULT_DT_WR);
2685     if (!err)
2686         return 0;
2687 
2688     /* don't care for the reason here */
2689     drbd_err(device, "submit failed, triggering re-connect\n");
2690     spin_lock_irq(&device->resource->req_lock);
2691     list_del(&peer_req->w.list);
2692     drbd_remove_epoch_entry_interval(device, peer_req);
2693     spin_unlock_irq(&device->resource->req_lock);
2694     if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2695         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2696         drbd_al_complete_io(device, &peer_req->i);
2697     }
2698 
2699 out_interrupted:
2700     drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2701     put_ldev(device);
2702     drbd_free_peer_req(device, peer_req);
2703     return err;
2704 }
2705 
2706 /* We may throttle resync, if the lower device seems to be busy,
2707  * and current sync rate is above c_min_rate.
2708  *
2709  * To decide whether or not the lower device is busy, we use a scheme similar
2710  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2711  * (more than 64 sectors) of activity we cannot account for with our own resync
2712  * activity, it obviously is "busy".
2713  *
2714  * The current sync rate used here uses only the most recent two step marks,
2715  * to have a short time average so we can react faster.
2716  */
2717 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2718         bool throttle_if_app_is_waiting)
2719 {
2720     struct lc_element *tmp;
2721     bool throttle = drbd_rs_c_min_rate_throttle(device);
2722 
2723     if (!throttle || throttle_if_app_is_waiting)
2724         return throttle;
2725 
2726     spin_lock_irq(&device->al_lock);
2727     tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2728     if (tmp) {
2729         struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2730         if (test_bit(BME_PRIORITY, &bm_ext->flags))
2731             throttle = false;
2732         /* Do not slow down if app IO is already waiting for this extent,
2733          * and our progress is necessary for application IO to complete. */
2734     }
2735     spin_unlock_irq(&device->al_lock);
2736 
2737     return throttle;
2738 }
2739 
2740 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2741 {
2742     struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
2743     unsigned long db, dt, dbdt;
2744     unsigned int c_min_rate;
2745     int curr_events;
2746 
2747     rcu_read_lock();
2748     c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2749     rcu_read_unlock();
2750 
2751     /* feature disabled? */
2752     if (c_min_rate == 0)
2753         return false;
2754 
2755     curr_events = (int)part_stat_read_accum(disk->part0, sectors) -
2756             atomic_read(&device->rs_sect_ev);
2757 
2758     if (atomic_read(&device->ap_actlog_cnt)
2759         || curr_events - device->rs_last_events > 64) {
2760         unsigned long rs_left;
2761         int i;
2762 
2763         device->rs_last_events = curr_events;
2764 
2765         /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2766          * approx. */
2767         i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2768 
2769         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2770             rs_left = device->ov_left;
2771         else
2772             rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2773 
2774         dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2775         if (!dt)
2776             dt++;
2777         db = device->rs_mark_left[i] - rs_left;
2778         dbdt = Bit2KB(db/dt);
2779 
2780         if (dbdt > c_min_rate)
2781             return true;
2782     }
2783     return false;
2784 }
2785 
2786 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2787 {
2788     struct drbd_peer_device *peer_device;
2789     struct drbd_device *device;
2790     sector_t sector;
2791     sector_t capacity;
2792     struct drbd_peer_request *peer_req;
2793     struct digest_info *di = NULL;
2794     int size, verb;
2795     unsigned int fault_type;
2796     struct p_block_req *p = pi->data;
2797 
2798     peer_device = conn_peer_device(connection, pi->vnr);
2799     if (!peer_device)
2800         return -EIO;
2801     device = peer_device->device;
2802     capacity = get_capacity(device->vdisk);
2803 
2804     sector = be64_to_cpu(p->sector);
2805     size   = be32_to_cpu(p->blksize);
2806 
2807     if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2808         drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2809                 (unsigned long long)sector, size);
2810         return -EINVAL;
2811     }
2812     if (sector + (size>>9) > capacity) {
2813         drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2814                 (unsigned long long)sector, size);
2815         return -EINVAL;
2816     }
2817 
2818     if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2819         verb = 1;
2820         switch (pi->cmd) {
2821         case P_DATA_REQUEST:
2822             drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2823             break;
2824         case P_RS_THIN_REQ:
2825         case P_RS_DATA_REQUEST:
2826         case P_CSUM_RS_REQUEST:
2827         case P_OV_REQUEST:
2828             drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2829             break;
2830         case P_OV_REPLY:
2831             verb = 0;
2832             dec_rs_pending(device);
2833             drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2834             break;
2835         default:
2836             BUG();
2837         }
2838         if (verb && __ratelimit(&drbd_ratelimit_state))
2839             drbd_err(device, "Can not satisfy peer's read request, "
2840                 "no local data.\n");
2841 
2842         /* drain possibly payload */
2843         return drbd_drain_block(peer_device, pi->size);
2844     }
2845 
2846     /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2847      * "criss-cross" setup, that might cause write-out on some other DRBD,
2848      * which in turn might block on the other node at this very place.  */
2849     peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2850             size, GFP_NOIO);
2851     if (!peer_req) {
2852         put_ldev(device);
2853         return -ENOMEM;
2854     }
2855 
2856     switch (pi->cmd) {
2857     case P_DATA_REQUEST:
2858         peer_req->w.cb = w_e_end_data_req;
2859         fault_type = DRBD_FAULT_DT_RD;
2860         /* application IO, don't drbd_rs_begin_io */
2861         peer_req->flags |= EE_APPLICATION;
2862         goto submit;
2863 
2864     case P_RS_THIN_REQ:
2865         /* If at some point in the future we have a smart way to
2866            find out if this data block is completely deallocated,
2867            then we would do something smarter here than reading
2868            the block... */
2869         peer_req->flags |= EE_RS_THIN_REQ;
2870         fallthrough;
2871     case P_RS_DATA_REQUEST:
2872         peer_req->w.cb = w_e_end_rsdata_req;
2873         fault_type = DRBD_FAULT_RS_RD;
2874         /* used in the sector offset progress display */
2875         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2876         break;
2877 
2878     case P_OV_REPLY:
2879     case P_CSUM_RS_REQUEST:
2880         fault_type = DRBD_FAULT_RS_RD;
2881         di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2882         if (!di)
2883             goto out_free_e;
2884 
2885         di->digest_size = pi->size;
2886         di->digest = (((char *)di)+sizeof(struct digest_info));
2887 
2888         peer_req->digest = di;
2889         peer_req->flags |= EE_HAS_DIGEST;
2890 
2891         if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2892             goto out_free_e;
2893 
2894         if (pi->cmd == P_CSUM_RS_REQUEST) {
2895             D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2896             peer_req->w.cb = w_e_end_csum_rs_req;
2897             /* used in the sector offset progress display */
2898             device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2899             /* remember to report stats in drbd_resync_finished */
2900             device->use_csums = true;
2901         } else if (pi->cmd == P_OV_REPLY) {
2902             /* track progress, we may need to throttle */
2903             atomic_add(size >> 9, &device->rs_sect_in);
2904             peer_req->w.cb = w_e_end_ov_reply;
2905             dec_rs_pending(device);
2906             /* drbd_rs_begin_io done when we sent this request,
2907              * but accounting still needs to be done. */
2908             goto submit_for_resync;
2909         }
2910         break;
2911 
2912     case P_OV_REQUEST:
2913         if (device->ov_start_sector == ~(sector_t)0 &&
2914             peer_device->connection->agreed_pro_version >= 90) {
2915             unsigned long now = jiffies;
2916             int i;
2917             device->ov_start_sector = sector;
2918             device->ov_position = sector;
2919             device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2920             device->rs_total = device->ov_left;
2921             for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2922                 device->rs_mark_left[i] = device->ov_left;
2923                 device->rs_mark_time[i] = now;
2924             }
2925             drbd_info(device, "Online Verify start sector: %llu\n",
2926                     (unsigned long long)sector);
2927         }
2928         peer_req->w.cb = w_e_end_ov_req;
2929         fault_type = DRBD_FAULT_RS_RD;
2930         break;
2931 
2932     default:
2933         BUG();
2934     }
2935 
2936     /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2937      * wrt the receiver, but it is not as straightforward as it may seem.
2938      * Various places in the resync start and stop logic assume resync
2939      * requests are processed in order, requeuing this on the worker thread
2940      * introduces a bunch of new code for synchronization between threads.
2941      *
2942      * Unlimited throttling before drbd_rs_begin_io may stall the resync
2943      * "forever", throttling after drbd_rs_begin_io will lock that extent
2944      * for application writes for the same time.  For now, just throttle
2945      * here, where the rest of the code expects the receiver to sleep for
2946      * a while, anyways.
2947      */
2948 
2949     /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2950      * this defers syncer requests for some time, before letting at least
2951      * on request through.  The resync controller on the receiving side
2952      * will adapt to the incoming rate accordingly.
2953      *
2954      * We cannot throttle here if remote is Primary/SyncTarget:
2955      * we would also throttle its application reads.
2956      * In that case, throttling is done on the SyncTarget only.
2957      */
2958 
2959     /* Even though this may be a resync request, we do add to "read_ee";
2960      * "sync_ee" is only used for resync WRITEs.
2961      * Add to list early, so debugfs can find this request
2962      * even if we have to sleep below. */
2963     spin_lock_irq(&device->resource->req_lock);
2964     list_add_tail(&peer_req->w.list, &device->read_ee);
2965     spin_unlock_irq(&device->resource->req_lock);
2966 
2967     update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2968     if (device->state.peer != R_PRIMARY
2969     && drbd_rs_should_slow_down(device, sector, false))
2970         schedule_timeout_uninterruptible(HZ/10);
2971     update_receiver_timing_details(connection, drbd_rs_begin_io);
2972     if (drbd_rs_begin_io(device, sector))
2973         goto out_free_e;
2974 
2975 submit_for_resync:
2976     atomic_add(size >> 9, &device->rs_sect_ev);
2977 
2978 submit:
2979     update_receiver_timing_details(connection, drbd_submit_peer_request);
2980     inc_unacked(device);
2981     if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ,
2982                      fault_type) == 0)
2983         return 0;
2984 
2985     /* don't care for the reason here */
2986     drbd_err(device, "submit failed, triggering re-connect\n");
2987 
2988 out_free_e:
2989     spin_lock_irq(&device->resource->req_lock);
2990     list_del(&peer_req->w.list);
2991     spin_unlock_irq(&device->resource->req_lock);
2992     /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2993 
2994     put_ldev(device);
2995     drbd_free_peer_req(device, peer_req);
2996     return -EIO;
2997 }
2998 
2999 /*
3000  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
3001  */
3002 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
3003 {
3004     struct drbd_device *device = peer_device->device;
3005     int self, peer, rv = -100;
3006     unsigned long ch_self, ch_peer;
3007     enum drbd_after_sb_p after_sb_0p;
3008 
3009     self = device->ldev->md.uuid[UI_BITMAP] & 1;
3010     peer = device->p_uuid[UI_BITMAP] & 1;
3011 
3012     ch_peer = device->p_uuid[UI_SIZE];
3013     ch_self = device->comm_bm_set;
3014 
3015     rcu_read_lock();
3016     after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
3017     rcu_read_unlock();
3018     switch (after_sb_0p) {
3019     case ASB_CONSENSUS:
3020     case ASB_DISCARD_SECONDARY:
3021     case ASB_CALL_HELPER:
3022     case ASB_VIOLENTLY:
3023         drbd_err(device, "Configuration error.\n");
3024         break;
3025     case ASB_DISCONNECT:
3026         break;
3027     case ASB_DISCARD_YOUNGER_PRI:
3028         if (self == 0 && peer == 1) {
3029             rv = -1;
3030             break;
3031         }
3032         if (self == 1 && peer == 0) {
3033             rv =  1;
3034             break;
3035         }
3036         fallthrough;    /* to one of the other strategies */
3037     case ASB_DISCARD_OLDER_PRI:
3038         if (self == 0 && peer == 1) {
3039             rv = 1;
3040             break;
3041         }
3042         if (self == 1 && peer == 0) {
3043             rv = -1;
3044             break;
3045         }
3046         /* Else fall through to one of the other strategies... */
3047         drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3048              "Using discard-least-changes instead\n");
3049         fallthrough;
3050     case ASB_DISCARD_ZERO_CHG:
3051         if (ch_peer == 0 && ch_self == 0) {
3052             rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3053                 ? -1 : 1;
3054             break;
3055         } else {
3056             if (ch_peer == 0) { rv =  1; break; }
3057             if (ch_self == 0) { rv = -1; break; }
3058         }
3059         if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3060             break;
3061         fallthrough;
3062     case ASB_DISCARD_LEAST_CHG:
3063         if  (ch_self < ch_peer)
3064             rv = -1;
3065         else if (ch_self > ch_peer)
3066             rv =  1;
3067         else /* ( ch_self == ch_peer ) */
3068              /* Well, then use something else. */
3069             rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3070                 ? -1 : 1;
3071         break;
3072     case ASB_DISCARD_LOCAL:
3073         rv = -1;
3074         break;
3075     case ASB_DISCARD_REMOTE:
3076         rv =  1;
3077     }
3078 
3079     return rv;
3080 }
3081 
3082 /*
3083  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3084  */
3085 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3086 {
3087     struct drbd_device *device = peer_device->device;
3088     int hg, rv = -100;
3089     enum drbd_after_sb_p after_sb_1p;
3090 
3091     rcu_read_lock();
3092     after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3093     rcu_read_unlock();
3094     switch (after_sb_1p) {
3095     case ASB_DISCARD_YOUNGER_PRI:
3096     case ASB_DISCARD_OLDER_PRI:
3097     case ASB_DISCARD_LEAST_CHG:
3098     case ASB_DISCARD_LOCAL:
3099     case ASB_DISCARD_REMOTE:
3100     case ASB_DISCARD_ZERO_CHG:
3101         drbd_err(device, "Configuration error.\n");
3102         break;
3103     case ASB_DISCONNECT:
3104         break;
3105     case ASB_CONSENSUS:
3106         hg = drbd_asb_recover_0p(peer_device);
3107         if (hg == -1 && device->state.role == R_SECONDARY)
3108             rv = hg;
3109         if (hg == 1  && device->state.role == R_PRIMARY)
3110             rv = hg;
3111         break;
3112     case ASB_VIOLENTLY:
3113         rv = drbd_asb_recover_0p(peer_device);
3114         break;
3115     case ASB_DISCARD_SECONDARY:
3116         return device->state.role == R_PRIMARY ? 1 : -1;
3117     case ASB_CALL_HELPER:
3118         hg = drbd_asb_recover_0p(peer_device);
3119         if (hg == -1 && device->state.role == R_PRIMARY) {
3120             enum drbd_state_rv rv2;
3121 
3122              /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3123               * we might be here in C_WF_REPORT_PARAMS which is transient.
3124               * we do not need to wait for the after state change work either. */
3125             rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3126             if (rv2 != SS_SUCCESS) {
3127                 drbd_khelper(device, "pri-lost-after-sb");
3128             } else {
3129                 drbd_warn(device, "Successfully gave up primary role.\n");
3130                 rv = hg;
3131             }
3132         } else
3133             rv = hg;
3134     }
3135 
3136     return rv;
3137 }
3138 
3139 /*
3140  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3141  */
3142 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3143 {
3144     struct drbd_device *device = peer_device->device;
3145     int hg, rv = -100;
3146     enum drbd_after_sb_p after_sb_2p;
3147 
3148     rcu_read_lock();
3149     after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3150     rcu_read_unlock();
3151     switch (after_sb_2p) {
3152     case ASB_DISCARD_YOUNGER_PRI:
3153     case ASB_DISCARD_OLDER_PRI:
3154     case ASB_DISCARD_LEAST_CHG:
3155     case ASB_DISCARD_LOCAL:
3156     case ASB_DISCARD_REMOTE:
3157     case ASB_CONSENSUS:
3158     case ASB_DISCARD_SECONDARY:
3159     case ASB_DISCARD_ZERO_CHG:
3160         drbd_err(device, "Configuration error.\n");
3161         break;
3162     case ASB_VIOLENTLY:
3163         rv = drbd_asb_recover_0p(peer_device);
3164         break;
3165     case ASB_DISCONNECT:
3166         break;
3167     case ASB_CALL_HELPER:
3168         hg = drbd_asb_recover_0p(peer_device);
3169         if (hg == -1) {
3170             enum drbd_state_rv rv2;
3171 
3172              /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3173               * we might be here in C_WF_REPORT_PARAMS which is transient.
3174               * we do not need to wait for the after state change work either. */
3175             rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3176             if (rv2 != SS_SUCCESS) {
3177                 drbd_khelper(device, "pri-lost-after-sb");
3178             } else {
3179                 drbd_warn(device, "Successfully gave up primary role.\n");
3180                 rv = hg;
3181             }
3182         } else
3183             rv = hg;
3184     }
3185 
3186     return rv;
3187 }
3188 
3189 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3190                u64 bits, u64 flags)
3191 {
3192     if (!uuid) {
3193         drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3194         return;
3195     }
3196     drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3197          text,
3198          (unsigned long long)uuid[UI_CURRENT],
3199          (unsigned long long)uuid[UI_BITMAP],
3200          (unsigned long long)uuid[UI_HISTORY_START],
3201          (unsigned long long)uuid[UI_HISTORY_END],
3202          (unsigned long long)bits,
3203          (unsigned long long)flags);
3204 }
3205 
3206 /*
3207   100   after split brain try auto recover
3208     2   C_SYNC_SOURCE set BitMap
3209     1   C_SYNC_SOURCE use BitMap
3210     0   no Sync
3211    -1   C_SYNC_TARGET use BitMap
3212    -2   C_SYNC_TARGET set BitMap
3213  -100   after split brain, disconnect
3214 -1000   unrelated data
3215 -1091   requires proto 91
3216 -1096   requires proto 96
3217  */
3218 
/*
 * drbd_uuid_compare() - compare the local UUID set with the peer's UUID set
 * @device:    device whose ldev->md.uuid[] (local) and p_uuid[] (peer)
 *             are compared
 * @peer_role: role of the peer; only consulted by rule 41
 * @rule_nr:   out parameter, set to the number of the rule evaluated last
 *
 * Every comparison masks out the lowest bit of the UUIDs involved.  The
 * rules are tried in ascending order and the first one that matches
 * determines the return value; the meaning of the result codes is listed
 * in the table preceding this function.  In addition, rule 41 may return
 * -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8)) if DRBD_FF_WSAME was
 * not agreed upon (presumably decoded by the caller - TODO confirm).
 *
 * Side effects: rules 34 and 71 rewrite entries of device->ldev->md.uuid[],
 * rules 35 and 51 rewrite entries of device->p_uuid[].
 */
3219 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3220 {
3221     struct drbd_peer_device *const peer_device = first_peer_device(device);
    /* NOTE(review): connection is NULL if there is no peer device, yet it is
     * dereferenced unconditionally further down - confirm callers guarantee
     * a peer device here. */
3222     struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3223     u64 self, peer;
3224     int i, j;
3225 
3226     self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3227     peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3228 
    /* rule 10: both current UUIDs are UUID_JUST_CREATED */
3229     *rule_nr = 10;
3230     if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3231         return 0;
3232 
    /* rule 20: only our current UUID is just created (or zero) */
3233     *rule_nr = 20;
3234     if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3235          peer != UUID_JUST_CREATED)
3236         return -2;
3237 
    /* rule 30: only the peer's current UUID is just created (or zero) */
3238     *rule_nr = 30;
3239     if (self != UUID_JUST_CREATED &&
3240         (peer == UUID_JUST_CREATED || peer == (u64)0))
3241         return 2;
3242 
    /* rules 34..41: the current UUIDs are equal */
3243     if (self == peer) {
3244         int rct, dc; /* roles at crash time */
3245 
        /* peer has no bitmap UUID, but we do */
3246         if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3247 
3248             if (connection->agreed_pro_version < 91)
3249                 return -1091;
3250 
3251             if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3252                 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3253                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3254                 drbd_uuid_move_history(device);
3255                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3256                 device->ldev->md.uuid[UI_BITMAP] = 0;
3257 
3258                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3259                            device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3260                 *rule_nr = 34;
3261             } else {
3262                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3263                 *rule_nr = 36;
3264             }
3265 
3266             return 1;
3267         }
3268 
        /* we have no bitmap UUID, but the peer does */
3269         if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3270 
3271             if (connection->agreed_pro_version < 91)
3272                 return -1091;
3273 
3274             if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3275                 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3276                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3277 
3278                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3279                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3280                 device->p_uuid[UI_BITMAP] = 0UL;
3281 
3282                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3283                 *rule_nr = 35;
3284             } else {
3285                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3286                 *rule_nr = 37;
3287             }
3288 
3289             return -1;
3290         }
3291 
3292         /* Common power [off|failure] */
3293         rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3294             (device->p_uuid[UI_FLAGS] & 2);
3295         /* lowest bit is set when we were primary,
3296          * next bit (weight 2) is set when peer was primary */
3297         *rule_nr = 40;
3298 
3299         /* Neither has the "crashed primary" flag set,
3300          * only a replication link hiccup. */
3301         if (rct == 0)
3302             return 0;
3303 
3304         /* Current UUID equal and no bitmap uuid; does not necessarily
3305          * mean this was a "simultaneous hard crash", maybe IO was
3306          * frozen, so no UUID-bump happened.
3307          * This is a protocol change, overload DRBD_FF_WSAME as flag
3308          * for "new-enough" peer DRBD version. */
3309         if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3310             *rule_nr = 41;
3311             if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3312                 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3313                 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3314             }
3315             if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3316                 /* At least one has the "crashed primary" bit set,
3317                  * both are primary now, but neither has rotated its UUIDs?
3318                  * "Can not happen." */
3319                 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3320                 return -100;
3321             }
3322             if (device->state.role == R_PRIMARY)
3323                 return 1;
3324             return -1;
3325         }
3326 
3327         /* Both are secondary.
3328          * Really looks like recovery from simultaneous hard crash.
3329          * Check which had been primary before, and arbitrate. */
3330         switch (rct) {
3331         case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3332         case 1: /*  self_pri && !peer_pri */ return 1;
3333         case 2: /* !self_pri &&  peer_pri */ return -1;
3334         case 3: /*  self_pri &&  peer_pri */
3335             dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3336             return dc ? -1 : 1;
3337         }
3338     }
3339 
    /* rule 50: our current UUID equals the peer's bitmap UUID */
3340     *rule_nr = 50;
3341     peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3342     if (self == peer)
3343         return -1;
3344 
    /* rule 51: our current UUID equals the peer's first history UUID */
3345     *rule_nr = 51;
3346     peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3347     if (self == peer) {
3348         if (connection->agreed_pro_version < 96 ?
3349             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3350             (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3351             peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3352             /* The last P_SYNC_UUID did not get through. Undo the last start of
3353                resync as sync source modifications of the peer's UUIDs. */
3354 
3355             if (connection->agreed_pro_version < 91)
3356                 return -1091;
3357 
3358             device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3359             device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3360 
3361             drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3362             drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3363 
3364             return -1;
3365         }
3366     }
3367 
    /* rule 60: our current UUID appears in the peer's history */
3368     *rule_nr = 60;
3369     self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3370     for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3371         peer = device->p_uuid[i] & ~((u64)1);
3372         if (self == peer)
3373             return -2;
3374     }
3375 
    /* rule 70: our bitmap UUID equals the peer's current UUID */
3376     *rule_nr = 70;
3377     self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3378     peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3379     if (self == peer)
3380         return 1;
3381 
    /* rule 71: our first history UUID equals the peer's current UUID */
3382     *rule_nr = 71;
3383     self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3384     if (self == peer) {
3385         if (connection->agreed_pro_version < 96 ?
3386             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3387             (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3388             self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3389             /* The last P_SYNC_UUID did not get through. Undo the last start of
3390                resync as sync source modifications of our UUIDs. */
3391 
3392             if (connection->agreed_pro_version < 91)
3393                 return -1091;
3394 
3395             __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3396             __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3397 
3398             drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3399             drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3400                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3401 
3402             return 1;
3403         }
3404     }
3405 
3406 
    /* rule 80: the peer's current UUID appears in our history */
3407     *rule_nr = 80;
3408     peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3409     for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3410         self = device->ldev->md.uuid[i] & ~((u64)1);
3411         if (self == peer)
3412             return 2;
3413     }
3414 
    /* rule 90: both bitmap UUIDs are equal and non-zero */
3415     *rule_nr = 90;
3416     self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3417     peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3418     if (self == peer && self != ((u64)0))
3419         return 100;
3420 
    /* rule 100: any of our history UUIDs equals any of the peer's */
3421     *rule_nr = 100;
3422     for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3423         self = device->ldev->md.uuid[i] & ~((u64)1);
3424         for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3425             peer = device->p_uuid[j] & ~((u64)1);
3426             if (self == peer)
3427                 return -100;
3428         }
3429     }
3430 
    /* no rule matched */
3431     return -1000;
3432 }
3433 
3434 /* drbd_sync_handshake() returns the new conn state on success, or
3435    CONN_MASK (-1) on failure.
3436  */
3437 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3438                        enum drbd_role peer_role,
3439                        enum drbd_disk_state peer_disk) __must_hold(local)
3440 {
3441     struct drbd_device *device = peer_device->device;
3442     enum drbd_conns rv = C_MASK;
3443     enum drbd_disk_state mydisk;
3444     struct net_conf *nc;
3445     int hg, rule_nr, rr_conflict, tentative, always_asbp;
3446 
3447     mydisk = device->state.disk;
3448     if (mydisk == D_NEGOTIATING)
3449         mydisk = device->new_state_tmp.disk;
3450 
3451     drbd_info(device, "drbd_sync_handshake:\n");
3452 
3453     spin_lock_irq(&device->ldev->md.uuid_lock);
3454     drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3455     drbd_uuid_dump(device, "peer", device->p_uuid,
3456                device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3457 
3458     hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3459     spin_unlock_irq(&device->ldev->md.uuid_lock);
3460 
3461     drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3462 
3463     if (hg == -1000) {
3464         drbd_alert(device, "Unrelated data, aborting!\n");
3465         return C_MASK;
3466     }
3467     if (hg < -0x10000) {
3468         int proto, fflags;
3469         hg = -hg;
3470         proto = hg & 0xff;
3471         fflags = (hg >> 8) & 0xff;
3472         drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3473                     proto, fflags);
3474         return C_MASK;
3475     }
3476     if (hg < -1000) {
3477         drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3478         return C_MASK;
3479     }
3480 
3481     if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3482         (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3483         int f = (hg == -100) || abs(hg) == 2;
3484         hg = mydisk > D_INCONSISTENT ? 1 : -1;
3485         if (f)
3486             hg = hg*2;
3487         drbd_info(device, "Becoming sync %s due to disk states.\n",
3488              hg > 0 ? "source" : "target");
3489     }
3490 
3491     if (abs(hg) == 100)
3492         drbd_khelper(device, "initial-split-brain");
3493 
3494     rcu_read_lock();
3495     nc = rcu_dereference(peer_device->connection->net_conf);
3496     always_asbp = nc->always_asbp;
3497     rr_conflict = nc->rr_conflict;
3498     tentative = nc->tentative;
3499     rcu_read_unlock();
3500 
3501     if (hg == 100 || (hg == -100 && always_asbp)) {
3502         int pcount = (device->state.role == R_PRIMARY)
3503                + (peer_role == R_PRIMARY);
3504         int forced = (hg == -100);
3505 
3506         switch (pcount) {
3507         case 0:
3508             hg = drbd_asb_recover_0p(peer_device);
3509             break;
3510         case 1:
3511             hg = drbd_asb_recover_1p(peer_device);
3512             break;
3513         case 2:
3514             hg = drbd_asb_recover_2p(peer_device);
3515             break;
3516         }
3517         if (abs(hg) < 100) {
3518             drbd_warn(device, "Split-Brain detected, %d primaries, "
3519                  "automatically solved. Sync from %s node\n",
3520                  pcount, (hg < 0) ? "peer" : "this");
3521             if (forced) {
3522                 drbd_warn(device, "Doing a full sync, since"
3523                      " UUIDs where ambiguous.\n");
3524                 hg = hg*2;
3525             }
3526         }
3527     }
3528 
3529     if (hg == -100) {
3530         if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3531             hg = -1;
3532         if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3533             hg = 1;
3534 
3535         if (abs(hg) < 100)
3536             drbd_warn(device, "Split-Brain detected, manually solved. "
3537                  "Sync from %s node\n",
3538                  (hg < 0) ? "peer" : "this");
3539     }
3540 
3541     if (hg == -100) {
3542         /* FIXME this log message is not correct if we end up here
3543          * after an attempted attach on a diskless node.
3544          * We just refuse to attach -- well, we drop the "connection"
3545          * to that disk, in a way... */
3546         drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3547         drbd_khelper(device, "split-brain");
3548         return C_MASK;
3549     }
3550 
3551     if (hg > 0 && mydisk <= D_INCONSISTENT) {
3552         drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3553         return C_MASK;
3554     }
3555 
3556     if (hg < 0 && /* by intention we do not use mydisk here. */
3557         device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3558         switch (rr_conflict) {
3559         case ASB_CALL_HELPER:
3560             drbd_khelper(device, "pri-lost");
3561             fallthrough;
3562         case ASB_DISCONNECT:
3563             drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3564             return C_MASK;
3565         case ASB_VIOLENTLY:
3566             drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3567                  "assumption\n");
3568         }
3569     }
3570 
3571     if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3572         if (hg == 0)
3573             drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3574         else
3575             drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3576                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3577                  abs(hg) >= 2 ? "full" : "bit-map based");
3578         return C_MASK;
3579     }
3580 
3581     if (abs(hg) >= 2) {
3582         drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3583         if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3584                     BM_LOCKED_SET_ALLOWED))
3585             return C_MASK;
3586     }
3587 
3588     if (hg > 0) { /* become sync source. */
3589         rv = C_WF_BITMAP_S;
3590     } else if (hg < 0) { /* become sync target */
3591         rv = C_WF_BITMAP_T;
3592     } else {
3593         rv = C_CONNECTED;
3594         if (drbd_bm_total_weight(device)) {
3595             drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3596                  drbd_bm_total_weight(device));
3597         }
3598     }
3599 
3600     return rv;
3601 }
3602 
3603 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3604 {
3605     /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3606     if (peer == ASB_DISCARD_REMOTE)
3607         return ASB_DISCARD_LOCAL;
3608 
3609     /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3610     if (peer == ASB_DISCARD_LOCAL)
3611         return ASB_DISCARD_REMOTE;
3612 
3613     /* everything else is valid if they are equal on both sides. */
3614     return peer;
3615 }
3616 
3617 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3618 {
3619     struct p_protocol *p = pi->data;
3620     enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3621     int p_proto, p_discard_my_data, p_two_primaries, cf;
3622     struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3623     char integrity_alg[SHARED_SECRET_MAX] = "";
3624     struct crypto_shash *peer_integrity_tfm = NULL;
3625     void *int_dig_in = NULL, *int_dig_vv = NULL;
3626 
3627     p_proto     = be32_to_cpu(p->protocol);
3628     p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3629     p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3630     p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3631     p_two_primaries = be32_to_cpu(p->two_primaries);
3632     cf      = be32_to_cpu(p->conn_flags);
3633     p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3634 
3635     if (connection->agreed_pro_version >= 87) {
3636         int err;
3637 
3638         if (pi->size > sizeof(integrity_alg))
3639             return -EIO;
3640         err = drbd_recv_all(connection, integrity_alg, pi->size);
3641         if (err)
3642             return err;
3643         integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3644     }
3645 
3646     if (pi->cmd != P_PROTOCOL_UPDATE) {
3647         clear_bit(CONN_DRY_RUN, &connection->flags);
3648 
3649         if (cf & CF_DRY_RUN)
3650             set_bit(CONN_DRY_RUN, &connection->flags);
3651 
3652         rcu_read_lock();
3653         nc = rcu_dereference(connection->net_conf);
3654 
3655         if (p_proto != nc->wire_protocol) {
3656             drbd_err(connection, "incompatible %s settings\n", "protocol");
3657             goto disconnect_rcu_unlock;
3658         }
3659 
3660         if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3661             drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3662             goto disconnect_rcu_unlock;
3663         }
3664 
3665         if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3666             drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3667             goto disconnect_rcu_unlock;
3668         }
3669 
3670         if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3671             drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3672             goto disconnect_rcu_unlock;
3673         }
3674 
3675         if (p_discard_my_data && nc->discard_my_data) {
3676             drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3677             goto disconnect_rcu_unlock;
3678         }
3679 
3680         if (p_two_primaries != nc->two_primaries) {
3681             drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3682             goto disconnect_rcu_unlock;
3683         }
3684 
3685         if (strcmp(integrity_alg, nc->integrity_alg)) {
3686             drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3687             goto disconnect_rcu_unlock;
3688         }
3689 
3690         rcu_read_unlock();
3691     }
3692 
3693     if (integrity_alg[0]) {
3694         int hash_size;
3695 
3696         /*
3697          * We can only change the peer data integrity algorithm
3698          * here.  Changing our own data integrity algorithm
3699          * requires that we send a P_PROTOCOL_UPDATE packet at
3700          * the same time; otherwise, the peer has no way to
3701          * tell between which packets the algorithm should
3702          * change.
3703          */
3704 
3705         peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3706         if (IS_ERR(peer_integrity_tfm)) {
3707             peer_integrity_tfm = NULL;
3708             drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3709                  integrity_alg);
3710             goto disconnect;
3711         }
3712 
3713         hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3714         int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3715         int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3716         if (!(int_dig_in && int_dig_vv)) {
3717             drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3718             goto disconnect;
3719         }
3720     }
3721 
3722     new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3723     if (!new_net_conf)
3724         goto disconnect;
3725 
3726     mutex_lock(&connection->data.mutex);
3727     mutex_lock(&connection->resource->conf_update);
3728     old_net_conf = connection->net_conf;
3729     *new_net_conf = *old_net_conf;
3730 
3731     new_net_conf->wire_protocol = p_proto;
3732     new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3733     new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3734     new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3735     new_net_conf->two_primaries = p_two_primaries;
3736 
3737     rcu_assign_pointer(connection->net_conf, new_net_conf);
3738     mutex_unlock(&connection->resource->conf_update);
3739     mutex_unlock(&connection->data.mutex);
3740 
3741     crypto_free_shash(connection->peer_integrity_tfm);
3742     kfree(connection->int_dig_in);
3743     kfree(connection->int_dig_vv);
3744     connection->peer_integrity_tfm = peer_integrity_tfm;
3745     connection->int_dig_in = int_dig_in;
3746     connection->int_dig_vv = int_dig_vv;
3747 
3748     if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3749         drbd_info(connection, "peer data-integrity-alg: %s\n",
3750               integrity_alg[0] ? integrity_alg : "(none)");
3751 
3752     kvfree_rcu(old_net_conf);
3753     return 0;
3754 
3755 disconnect_rcu_unlock:
3756     rcu_read_unlock();
3757 disconnect:
3758     crypto_free_shash(peer_integrity_tfm);
3759     kfree(int_dig_in);
3760     kfree(int_dig_vv);
3761     conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3762     return -EIO;
3763 }
3764 
3765 /* helper function
3766  * input: alg name, feature name
3767  * return: NULL (alg name was "")
3768  *         ERR_PTR(error) if something goes wrong
3769  *         or the crypto hash ptr, if it worked out ok. */
3770 static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3771         const struct drbd_device *device,
3772         const char *alg, const char *name)
3773 {
3774     struct crypto_shash *tfm;
3775 
3776     if (!alg[0])
3777         return NULL;
3778 
3779     tfm = crypto_alloc_shash(alg, 0, 0);
3780     if (IS_ERR(tfm)) {
3781         drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3782             alg, name, PTR_ERR(tfm));
3783         return tfm;
3784     }
3785     return tfm;
3786 }
3787 
3788 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3789 {
3790     void *buffer = connection->data.rbuf;
3791     int size = pi->size;
3792 
3793     while (size) {
3794         int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3795         s = drbd_recv(connection, buffer, s);
3796         if (s <= 0) {
3797             if (s < 0)
3798                 return s;
3799             break;
3800         }
3801         size -= s;
3802     }
3803     if (size)
3804         return -EIO;
3805     return 0;
3806 }
3807 
3808 /*
3809  * config_unknown_volume  -  device configuration command for unknown volume
3810  *
3811  * When a device is added to an existing connection, the node on which the
3812  * device is added first will send configuration commands to its peer but the
3813  * peer will not know about the device yet.  It will warn and ignore these
3814  * commands.  Once the device is added on the second node, the second node will
3815  * send the same device configuration commands, but in the other direction.
3816  *
3817  * (We can also end up here if drbd is misconfigured.)
3818  */
3819 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3820 {
3821     drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3822           cmdname(pi->cmd), pi->vnr);
3823     return ignore_remaining_packet(connection, pi);
3824 }
3825 
3826 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3827 {
3828     struct drbd_peer_device *peer_device;
3829     struct drbd_device *device;
3830     struct p_rs_param_95 *p;
3831     unsigned int header_size, data_size, exp_max_sz;
3832     struct crypto_shash *verify_tfm = NULL;
3833     struct crypto_shash *csums_tfm = NULL;
3834     struct net_conf *old_net_conf, *new_net_conf = NULL;
3835     struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3836     const int apv = connection->agreed_pro_version;
3837     struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3838     unsigned int fifo_size = 0;
3839     int err;
3840 
3841     peer_device = conn_peer_device(connection, pi->vnr);
3842     if (!peer_device)
3843         return config_unknown_volume(connection, pi);
3844     device = peer_device->device;
3845 
3846     exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3847             : apv == 88 ? sizeof(struct p_rs_param)
3848                     + SHARED_SECRET_MAX
3849             : apv <= 94 ? sizeof(struct p_rs_param_89)
3850             : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3851 
3852     if (pi->size > exp_max_sz) {
3853         drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3854             pi->size, exp_max_sz);
3855         return -EIO;
3856     }
3857 
3858     if (apv <= 88) {
3859         header_size = sizeof(struct p_rs_param);
3860         data_size = pi->size - header_size;
3861     } else if (apv <= 94) {
3862         header_size = sizeof(struct p_rs_param_89);
3863         data_size = pi->size - header_size;
3864         D_ASSERT(device, data_size == 0);
3865     } else {
3866         header_size = sizeof(struct p_rs_param_95);
3867         data_size = pi->size - header_size;
3868         D_ASSERT(device, data_size == 0);
3869     }
3870 
3871     /* initialize verify_alg and csums_alg */
3872     p = pi->data;
3873     BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX);
3874     memset(&p->algs, 0, sizeof(p->algs));
3875 
3876     err = drbd_recv_all(peer_device->connection, p, header_size);
3877     if (err)
3878         return err;
3879 
3880     mutex_lock(&connection->resource->conf_update);
3881     old_net_conf = peer_device->connection->net_conf;
3882     if (get_ldev(device)) {
3883         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3884         if (!new_disk_conf) {
3885             put_ldev(device);
3886             mutex_unlock(&connection->resource->conf_update);
3887             drbd_err(device, "Allocation of new disk_conf failed\n");
3888             return -ENOMEM;
3889         }
3890 
3891         old_disk_conf = device->ldev->disk_conf;
3892         *new_disk_conf = *old_disk_conf;
3893 
3894         new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3895     }
3896 
3897     if (apv >= 88) {
3898         if (apv == 88) {
3899             if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3900                 drbd_err(device, "verify-alg of wrong size, "
3901                     "peer wants %u, accepting only up to %u byte\n",
3902                     data_size, SHARED_SECRET_MAX);
3903                 goto reconnect;
3904             }
3905 
3906             err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3907             if (err)
3908                 goto reconnect;
3909             /* we expect NUL terminated string */
3910             /* but just in case someone tries to be evil */
3911             D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3912             p->verify_alg[data_size-1] = 0;
3913 
3914         } else /* apv >= 89 */ {
3915             /* we still expect NUL terminated strings */
3916             /* but just in case someone tries to be evil */
3917             D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3918             D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3919             p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3920             p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3921         }
3922 
3923         if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3924             if (device->state.conn == C_WF_REPORT_PARAMS) {
3925                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3926                     old_net_conf->verify_alg, p->verify_alg);
3927                 goto disconnect;
3928             }
3929             verify_tfm = drbd_crypto_alloc_digest_safe(device,
3930                     p->verify_alg, "verify-alg");
3931             if (IS_ERR(verify_tfm)) {
3932                 verify_tfm = NULL;
3933                 goto disconnect;
3934             }
3935         }
3936 
3937         if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3938             if (device->state.conn == C_WF_REPORT_PARAMS) {
3939                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3940                     old_net_conf->csums_alg, p->csums_alg);
3941                 goto disconnect;
3942             }
3943             csums_tfm = drbd_crypto_alloc_digest_safe(device,
3944                     p->csums_alg, "csums-alg");
3945             if (IS_ERR(csums_tfm)) {
3946                 csums_tfm = NULL;
3947                 goto disconnect;
3948             }
3949         }
3950 
3951         if (apv > 94 && new_disk_conf) {
3952             new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3953             new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3954             new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3955             new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3956 
3957             fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3958             if (fifo_size != device->rs_plan_s->size) {
3959                 new_plan = fifo_alloc(fifo_size);
3960                 if (!new_plan) {
3961                     drbd_err(device, "kmalloc of fifo_buffer failed");
3962                     put_ldev(device);
3963                     goto disconnect;
3964                 }
3965             }
3966         }
3967 
3968         if (verify_tfm || csums_tfm) {
3969             new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3970             if (!new_net_conf)
3971                 goto disconnect;
3972 
3973             *new_net_conf = *old_net_conf;
3974 
3975             if (verify_tfm) {
3976                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3977                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3978                 crypto_free_shash(peer_device->connection->verify_tfm);
3979                 peer_device->connection->verify_tfm = verify_tfm;
3980                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3981             }
3982             if (csums_tfm) {
3983                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3984                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3985                 crypto_free_shash(peer_device->connection->csums_tfm);
3986                 peer_device->connection->csums_tfm = csums_tfm;
3987                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3988             }
3989             rcu_assign_pointer(connection->net_conf, new_net_conf);
3990         }
3991     }
3992 
3993     if (new_disk_conf) {
3994         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3995         put_ldev(device);
3996     }
3997 
3998     if (new_plan) {
3999         old_plan = device->rs_plan_s;
4000         rcu_assign_pointer(device->rs_plan_s, new_plan);
4001     }
4002 
4003     mutex_unlock(&connection->resource->conf_update);
4004     synchronize_rcu();
4005     if (new_net_conf)
4006         kfree(old_net_conf);
4007     kfree(old_disk_conf);
4008     kfree(old_plan);
4009 
4010     return 0;
4011 
4012 reconnect:
4013     if (new_disk_conf) {
4014         put_ldev(device);
4015         kfree(new_disk_conf);
4016     }
4017     mutex_unlock(&connection->resource->conf_update);
4018     return -EIO;
4019 
4020 disconnect:
4021     kfree(new_plan);
4022     if (new_disk_conf) {
4023         put_ldev(device);
4024         kfree(new_disk_conf);
4025     }
4026     mutex_unlock(&connection->resource->conf_update);
4027     /* just for completeness: actually not needed,
4028      * as this is not reached if csums_tfm was ok. */
4029     crypto_free_shash(csums_tfm);
4030     /* but free the verify_tfm again, if csums_tfm did not work out */
4031     crypto_free_shash(verify_tfm);
4032     conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4033     return -EIO;
4034 }
4035 
4036 /* warn if the arguments differ by more than 12.5% */
4037 static void warn_if_differ_considerably(struct drbd_device *device,
4038     const char *s, sector_t a, sector_t b)
4039 {
4040     sector_t d;
4041     if (a == 0 || b == 0)
4042         return;
4043     d = (a > b) ? (a - b) : (b - a);
4044     if (d > (a>>3) || d > (b>>3))
4045         drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4046              (unsigned long long)a, (unsigned long long)b);
4047 }
4048 
4049 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4050 {
4051     struct drbd_peer_device *peer_device;
4052     struct drbd_device *device;
4053     struct p_sizes *p = pi->data;
4054     struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4055     enum determine_dev_size dd = DS_UNCHANGED;
4056     sector_t p_size, p_usize, p_csize, my_usize;
4057     sector_t new_size, cur_size;
4058     int ldsc = 0; /* local disk size changed */
4059     enum dds_flags ddsf;
4060 
4061     peer_device = conn_peer_device(connection, pi->vnr);
4062     if (!peer_device)
4063         return config_unknown_volume(connection, pi);
4064     device = peer_device->device;
4065     cur_size = get_capacity(device->vdisk);
4066 
4067     p_size = be64_to_cpu(p->d_size);
4068     p_usize = be64_to_cpu(p->u_size);
4069     p_csize = be64_to_cpu(p->c_size);
4070 
4071     /* just store the peer's disk size for now.
4072      * we still need to figure out whether we accept that. */
4073     device->p_size = p_size;
4074 
4075     if (get_ldev(device)) {
4076         rcu_read_lock();
4077         my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4078         rcu_read_unlock();
4079 
4080         warn_if_differ_considerably(device, "lower level device sizes",
4081                p_size, drbd_get_max_capacity(device->ldev));
4082         warn_if_differ_considerably(device, "user requested size",
4083                         p_usize, my_usize);
4084 
4085         /* if this is the first connect, or an otherwise expected
4086          * param exchange, choose the minimum */
4087         if (device->state.conn == C_WF_REPORT_PARAMS)
4088             p_usize = min_not_zero(my_usize, p_usize);
4089 
4090         /* Never shrink a device with usable data during connect,
4091          * or "attach" on the peer.
4092          * But allow online shrinking if we are connected. */
4093         new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4094         if (new_size < cur_size &&
4095             device->state.disk >= D_OUTDATED &&
4096             (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
4097             drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4098                     (unsigned long long)new_size, (unsigned long long)cur_size);
4099             conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4100             put_ldev(device);
4101             return -EIO;
4102         }
4103 
4104         if (my_usize != p_usize) {
4105             struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4106 
4107             new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4108             if (!new_disk_conf) {
4109                 put_ldev(device);
4110                 return -ENOMEM;
4111             }
4112 
4113             mutex_lock(&connection->resource->conf_update);
4114             old_disk_conf = device->ldev->disk_conf;
4115             *new_disk_conf = *old_disk_conf;
4116             new_disk_conf->disk_size = p_usize;
4117 
4118             rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4119             mutex_unlock(&connection->resource->conf_update);
4120             kvfree_rcu(old_disk_conf);
4121 
4122             drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
4123                  (unsigned long)p_usize, (unsigned long)my_usize);
4124         }
4125 
4126         put_ldev(device);
4127     }
4128 
4129     device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4130     /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4131        In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4132        drbd_reconsider_queue_parameters(), we can be sure that after
4133        drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4134 
4135     ddsf = be16_to_cpu(p->dds_flags);
4136     if (get_ldev(device)) {
4137         drbd_reconsider_queue_parameters(device, device->ldev, o);
4138         dd = drbd_determine_dev_size(device, ddsf, NULL);
4139         put_ldev(device);
4140         if (dd == DS_ERROR)
4141             return -EIO;
4142         drbd_md_sync(device);
4143     } else {
4144         /*
4145          * I am diskless, need to accept the peer's *current* size.
4146          * I must NOT accept the peers backing disk size,
4147          * it may have been larger than mine all along...
4148          *
4149          * At this point, the peer knows more about my disk, or at
4150          * least about what we last agreed upon, than myself.
4151          * So if his c_size is less than his d_size, the most likely
4152          * reason is that *my* d_size was smaller last time we checked.
4153          *
4154          * However, if he sends a zero current size,
4155          * take his (user-capped or) backing disk size anyways.
4156          *
4157          * Unless of course he does not have a disk himself.
4158          * In which case we ignore this completely.
4159          */
4160         sector_t new_size = p_csize ?: p_usize ?: p_size;
4161         drbd_reconsider_queue_parameters(device, NULL, o);
4162         if (new_size == 0) {
4163             /* Ignore, peer does not know nothing. */
4164         } else if (new_size == cur_size) {
4165             /* nothing to do */
4166         } else if (cur_size != 0 && p_size == 0) {
4167             drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4168                     (unsigned long long)new_size, (unsigned long long)cur_size);
4169         } else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4170             drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4171                     (unsigned long long)new_size, (unsigned long long)cur_size);
4172             conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4173             return -EIO;
4174         } else {
4175             /* I believe the peer, if
4176              *  - I don't have a current size myself
4177              *  - we agree on the size anyways
4178              *  - I do have a current size, am Secondary,
4179              *    and he has the only disk
4180              *  - I do have a current size, am Primary,
4181              *    and he has the only disk,
4182              *    which is larger than my current size
4183              */
4184             drbd_set_my_capacity(device, new_size);
4185         }
4186     }
4187 
4188     if (get_ldev(device)) {
4189         if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4190             device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4191             ldsc = 1;
4192         }
4193 
4194         put_ldev(device);
4195     }
4196 
4197     if (device->state.conn > C_WF_REPORT_PARAMS) {
4198         if (be64_to_cpu(p->c_size) != get_capacity(device->vdisk) ||
4199             ldsc) {
4200             /* we have different sizes, probably peer
4201              * needs to know my new size... */
4202             drbd_send_sizes(peer_device, 0, ddsf);
4203         }
4204         if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4205             (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4206             if (device->state.pdsk >= D_INCONSISTENT &&
4207                 device->state.disk >= D_INCONSISTENT) {
4208                 if (ddsf & DDSF_NO_RESYNC)
4209                     drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4210                 else
4211                     resync_after_online_grow(device);
4212             } else
4213                 set_bit(RESYNC_AFTER_NEG, &device->flags);
4214         }
4215     }
4216 
4217     return 0;
4218 }
4219 
4220 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4221 {
4222     struct drbd_peer_device *peer_device;
4223     struct drbd_device *device;
4224     struct p_uuids *p = pi->data;
4225     u64 *p_uuid;
4226     int i, updated_uuids = 0;
4227 
4228     peer_device = conn_peer_device(connection, pi->vnr);
4229     if (!peer_device)
4230         return config_unknown_volume(connection, pi);
4231     device = peer_device->device;
4232 
4233     p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4234     if (!p_uuid)
4235         return false;
4236 
4237     for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4238         p_uuid[i] = be64_to_cpu(p->uuid[i]);
4239 
4240     kfree(device->p_uuid);
4241     device->p_uuid = p_uuid;
4242 
4243     if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4244         device->state.disk < D_INCONSISTENT &&
4245         device->state.role == R_PRIMARY &&
4246         (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4247         drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4248             (unsigned long long)device->ed_uuid);
4249         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4250         return -EIO;
4251     }
4252 
4253     if (get_ldev(device)) {
4254         int skip_initial_sync =
4255             device->state.conn == C_CONNECTED &&
4256             peer_device->connection->agreed_pro_version >= 90 &&
4257             device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4258             (p_uuid[UI_FLAGS] & 8);
4259         if (skip_initial_sync) {
4260             drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4261             drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4262                     "clear_n_write from receive_uuids",
4263                     BM_LOCKED_TEST_ALLOWED);
4264             _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4265             _drbd_uuid_set(device, UI_BITMAP, 0);
4266             _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4267                     CS_VERBOSE, NULL);
4268             drbd_md_sync(device);
4269             updated_uuids = 1;
4270         }
4271         put_ldev(device);
4272     } else if (device->state.disk < D_INCONSISTENT &&
4273            device->state.role == R_PRIMARY) {
4274         /* I am a diskless primary, the peer just created a new current UUID
4275            for me. */
4276         updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4277     }
4278 
4279     /* Before we test for the disk state, we should wait until an eventually
4280        ongoing cluster wide state change is finished. That is important if
4281        we are primary and are detaching from our disk. We need to see the
4282        new disk state... */
4283     mutex_lock(device->state_mutex);
4284     mutex_unlock(device->state_mutex);
4285     if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4286         updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4287 
4288     if (updated_uuids)
4289         drbd_print_uuids(device, "receiver updated UUIDs to");
4290 
4291     return 0;
4292 }
4293 
4294 /**
4295  * convert_state() - Converts the peer's view of the cluster state to our point of view
4296  * @ps:     The state as seen by the peer.
4297  */
4298 static union drbd_state convert_state(union drbd_state ps)
4299 {
4300     union drbd_state ms;
4301 
4302     static enum drbd_conns c_tab[] = {
4303         [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4304         [C_CONNECTED] = C_CONNECTED,
4305 
4306         [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4307         [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4308         [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4309         [C_VERIFY_S]       = C_VERIFY_T,
4310         [C_MASK]   = C_MASK,
4311     };
4312 
4313     ms.i = ps.i;
4314 
4315     ms.conn = c_tab[ps.conn];
4316     ms.peer = ps.role;
4317     ms.role = ps.peer;
4318     ms.pdsk = ps.disk;
4319     ms.disk = ps.pdsk;
4320     ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4321 
4322     return ms;
4323 }
4324 
4325 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4326 {
4327     struct drbd_peer_device *peer_device;
4328     struct drbd_device *device;
4329     struct p_req_state *p = pi->data;
4330     union drbd_state mask, val;
4331     enum drbd_state_rv rv;
4332 
4333     peer_device = conn_peer_device(connection, pi->vnr);
4334     if (!peer_device)
4335         return -EIO;
4336     device = peer_device->device;
4337 
4338     mask.i = be32_to_cpu(p->mask);
4339     val.i = be32_to_cpu(p->val);
4340 
4341     if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4342         mutex_is_locked(device->state_mutex)) {
4343         drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4344         return 0;
4345     }
4346 
4347     mask = convert_state(mask);
4348     val = convert_state(val);
4349 
4350     rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4351     drbd_send_sr_reply(peer_device, rv);
4352 
4353     drbd_md_sync(device);
4354 
4355     return 0;
4356 }
4357 
4358 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4359 {
4360     struct p_req_state *p = pi->data;
4361     union drbd_state mask, val;
4362     enum drbd_state_rv rv;
4363 
4364     mask.i = be32_to_cpu(p->mask);
4365     val.i = be32_to_cpu(p->val);
4366 
4367     if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4368         mutex_is_locked(&connection->cstate_mutex)) {
4369         conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4370         return 0;
4371     }
4372 
4373     mask = convert_state(mask);
4374     val = convert_state(val);
4375 
4376     rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4377     conn_send_sr_reply(connection, rv);
4378 
4379     return 0;
4380 }
4381 
4382 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4383 {
4384     struct drbd_peer_device *peer_device;
4385     struct drbd_device *device;
4386     struct p_state *p = pi->data;
4387     union drbd_state os, ns, peer_state;
4388     enum drbd_disk_state real_peer_disk;
4389     enum chg_state_flags cs_flags;
4390     int rv;
4391 
4392     peer_device = conn_peer_device(connection, pi->vnr);
4393     if (!peer_device)
4394         return config_unknown_volume(connection, pi);
4395     device = peer_device->device;
4396 
4397     peer_state.i = be32_to_cpu(p->state);
4398 
4399     real_peer_disk = peer_state.disk;
4400     if (peer_state.disk == D_NEGOTIATING) {
4401         real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4402         drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4403     }
4404 
4405     spin_lock_irq(&device->resource->req_lock);
4406  retry:
4407     os = ns = drbd_read_state(device);
4408     spin_unlock_irq(&device->resource->req_lock);
4409 
4410     /* If some other part of the code (ack_receiver thread, timeout)
4411      * already decided to close the connection again,
4412      * we must not "re-establish" it here. */
4413     if (os.conn <= C_TEAR_DOWN)
4414         return -ECONNRESET;
4415 
4416     /* If this is the "end of sync" confirmation, usually the peer disk
4417      * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4418      * set) resync started in PausedSyncT, or if the timing of pause-/
4419      * unpause-sync events has been "just right", the peer disk may
4420      * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4421      */
4422     if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4423         real_peer_disk == D_UP_TO_DATE &&
4424         os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4425         /* If we are (becoming) SyncSource, but peer is still in sync
4426          * preparation, ignore its uptodate-ness to avoid flapping, it
4427          * will change to inconsistent once the peer reaches active
4428          * syncing states.
4429          * It may have changed syncer-paused flags, however, so we
4430          * cannot ignore this completely. */
4431         if (peer_state.conn > C_CONNECTED &&
4432             peer_state.conn < C_SYNC_SOURCE)
4433             real_peer_disk = D_INCONSISTENT;
4434 
4435         /* if peer_state changes to connected at the same time,
4436          * it explicitly notifies us that it finished resync.
4437          * Maybe we should finish it up, too? */
4438         else if (os.conn >= C_SYNC_SOURCE &&
4439              peer_state.conn == C_CONNECTED) {
4440             if (drbd_bm_total_weight(device) <= device->rs_failed)
4441                 drbd_resync_finished(device);
4442             return 0;
4443         }
4444     }
4445 
4446     /* explicit verify finished notification, stop sector reached. */
4447     if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4448         peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4449         ov_out_of_sync_print(device);
4450         drbd_resync_finished(device);
4451         return 0;
4452     }
4453 
4454     /* peer says his disk is inconsistent, while we think it is uptodate,
4455      * and this happens while the peer still thinks we have a sync going on,
4456      * but we think we are already done with the sync.
4457      * We ignore this to avoid flapping pdsk.
4458      * This should not happen, if the peer is a recent version of drbd. */
4459     if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4460         os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4461         real_peer_disk = D_UP_TO_DATE;
4462 
4463     if (ns.conn == C_WF_REPORT_PARAMS)
4464         ns.conn = C_CONNECTED;
4465 
4466     if (peer_state.conn == C_AHEAD)
4467         ns.conn = C_BEHIND;
4468 
4469     /* TODO:
4470      * if (primary and diskless and peer uuid != effective uuid)
4471      *     abort attach on peer;
4472      *
4473      * If this node does not have good data, was already connected, but
4474      * the peer did a late attach only now, trying to "negotiate" with me,
4475      * AND I am currently Primary, possibly frozen, with some specific
4476      * "effective" uuid, this should never be reached, really, because
4477      * we first send the uuids, then the current state.
4478      *
4479      * In this scenario, we already dropped the connection hard
4480      * when we received the unsuitable uuids (receive_uuids().
4481      *
4482      * Should we want to change this, that is: not drop the connection in
4483      * receive_uuids() already, then we would need to add a branch here
4484      * that aborts the attach of "unsuitable uuids" on the peer in case
4485      * this node is currently Diskless Primary.
4486      */
4487 
4488     if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4489         get_ldev_if_state(device, D_NEGOTIATING)) {
4490         int cr; /* consider resync */
4491 
4492         /* if we established a new connection */
4493         cr  = (os.conn < C_CONNECTED);
4494         /* if we had an established connection
4495          * and one of the nodes newly attaches a disk */
4496         cr |= (os.conn == C_CONNECTED &&
4497                (peer_state.disk == D_NEGOTIATING ||
4498             os.disk == D_NEGOTIATING));
4499         /* if we have both been inconsistent, and the peer has been
4500          * forced to be UpToDate with --force */
4501         cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4502         /* if we had been plain connected, and the admin requested to
4503          * start a sync by "invalidate" or "invalidate-remote" */
4504         cr |= (os.conn == C_CONNECTED &&
4505                 (peer_state.conn >= C_STARTING_SYNC_S &&
4506                  peer_state.conn <= C_WF_BITMAP_T));
4507 
4508         if (cr)
4509             ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4510 
4511         put_ldev(device);
4512         if (ns.conn == C_MASK) {
4513             ns.conn = C_CONNECTED;
4514             if (device->state.disk == D_NEGOTIATING) {
4515                 drbd_force_state(device, NS(disk, D_FAILED));
4516             } else if (peer_state.disk == D_NEGOTIATING) {
4517                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4518                 peer_state.disk = D_DISKLESS;
4519                 real_peer_disk = D_DISKLESS;
4520             } else {
4521                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4522                     return -EIO;
4523                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4524                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4525                 return -EIO;
4526             }
4527         }
4528     }
4529 
4530     spin_lock_irq(&device->resource->req_lock);
4531     if (os.i != drbd_read_state(device).i)
4532         goto retry;
4533     clear_bit(CONSIDER_RESYNC, &device->flags);
4534     ns.peer = peer_state.role;
4535     ns.pdsk = real_peer_disk;
4536     ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4537     if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4538         ns.disk = device->new_state_tmp.disk;
4539     cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4540     if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4541         test_bit(NEW_CUR_UUID, &device->flags)) {
4542         /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4543            for temporal network outages! */
4544         spin_unlock_irq(&device->resource->req_lock);
4545         drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4546         tl_clear(peer_device->connection);
4547         drbd_uuid_new_current(device);
4548         clear_bit(NEW_CUR_UUID, &device->flags);
4549         conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4550         return -EIO;
4551     }
4552     rv = _drbd_set_state(device, ns, cs_flags, NULL);
4553     ns = drbd_read_state(device);
4554     spin_unlock_irq(&device->resource->req_lock);
4555 
4556     if (rv < SS_SUCCESS) {
4557         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4558         return -EIO;
4559     }
4560 
4561     if (os.conn > C_WF_REPORT_PARAMS) {
4562         if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4563             peer_state.disk != D_NEGOTIATING ) {
4564             /* we want resync, peer has not yet decided to sync... */
4565             /* Nowadays only used when forcing a node into primary role and
4566                setting its disk to UpToDate with that */
4567             drbd_send_uuids(peer_device);
4568             drbd_send_current_state(peer_device);
4569         }
4570     }
4571 
4572     clear_bit(DISCARD_MY_DATA, &device->flags);
4573 
4574     drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4575 
4576     return 0;
4577 }
4578 
4579 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4580 {
4581     struct drbd_peer_device *peer_device;
4582     struct drbd_device *device;
4583     struct p_rs_uuid *p = pi->data;
4584 
4585     peer_device = conn_peer_device(connection, pi->vnr);
4586     if (!peer_device)
4587         return -EIO;
4588     device = peer_device->device;
4589 
4590     wait_event(device->misc_wait,
4591            device->state.conn == C_WF_SYNC_UUID ||
4592            device->state.conn == C_BEHIND ||
4593            device->state.conn < C_CONNECTED ||
4594            device->state.disk < D_NEGOTIATING);
4595 
4596     /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4597 
4598     /* Here the _drbd_uuid_ functions are right, current should
4599        _not_ be rotated into the history */
4600     if (get_ldev_if_state(device, D_NEGOTIATING)) {
4601         _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4602         _drbd_uuid_set(device, UI_BITMAP, 0UL);
4603 
4604         drbd_print_uuids(device, "updated sync uuid");
4605         drbd_start_resync(device, C_SYNC_TARGET);
4606 
4607         put_ldev(device);
4608     } else
4609         drbd_err(device, "Ignoring SyncUUID packet!\n");
4610 
4611     return 0;
4612 }
4613 
4614 /*
4615  * receive_bitmap_plain
4616  *
4617  * Return 0 when done, 1 when another iteration is needed, and a negative error
4618  * code upon failure.
4619  */
4620 static int
4621 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4622              unsigned long *p, struct bm_xfer_ctx *c)
4623 {
4624     unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4625                  drbd_header_size(peer_device->connection);
4626     unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4627                        c->bm_words - c->word_offset);
4628     unsigned int want = num_words * sizeof(*p);
4629     int err;
4630 
4631     if (want != size) {
4632         drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4633         return -EIO;
4634     }
4635     if (want == 0)
4636         return 0;
4637     err = drbd_recv_all(peer_device->connection, p, want);
4638     if (err)
4639         return err;
4640 
4641     drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4642 
4643     c->word_offset += num_words;
4644     c->bit_offset = c->word_offset * BITS_PER_LONG;
4645     if (c->bit_offset > c->bm_bits)
4646         c->bit_offset = c->bm_bits;
4647 
4648     return 1;
4649 }
4650 
4651 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4652 {
4653     return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4654 }
4655 
4656 static int dcbp_get_start(struct p_compressed_bm *p)
4657 {
4658     return (p->encoding & 0x80) != 0;
4659 }
4660 
4661 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4662 {
4663     return (p->encoding >> 4) & 0x7;
4664 }
4665 
4666 /*
4667  * recv_bm_rle_bits
4668  *
4669  * Return 0 when done, 1 when another iteration is needed, and a negative error
4670  * code upon failure.
4671  */
4672 static int
4673 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4674         struct p_compressed_bm *p,
4675          struct bm_xfer_ctx *c,
4676          unsigned int len)
4677 {
4678     struct bitstream bs;
4679     u64 look_ahead;
4680     u64 rl;
4681     u64 tmp;
4682     unsigned long s = c->bit_offset;
4683     unsigned long e;
4684     int toggle = dcbp_get_start(p);
4685     int have;
4686     int bits;
4687 
4688     bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4689 
4690     bits = bitstream_get_bits(&bs, &look_ahead, 64);
4691     if (bits < 0)
4692         return -EIO;
4693 
4694     for (have = bits; have > 0; s += rl, toggle = !toggle) {
4695         bits = vli_decode_bits(&rl, look_ahead);
4696         if (bits <= 0)
4697             return -EIO;
4698 
4699         if (toggle) {
4700             e = s + rl -1;
4701             if (e >= c->bm_bits) {
4702                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4703                 return -EIO;
4704             }
4705             _drbd_bm_set_bits(peer_device->device, s, e);
4706         }
4707 
4708         if (have < bits) {
4709             drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4710                 have, bits, look_ahead,
4711                 (unsigned int)(bs.cur.b - p->code),
4712                 (unsigned int)bs.buf_len);
4713             return -EIO;
4714         }
4715         /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4716         if (likely(bits < 64))
4717             look_ahead >>= bits;
4718         else
4719             look_ahead = 0;
4720         have -= bits;
4721 
4722         bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4723         if (bits < 0)
4724             return -EIO;
4725         look_ahead |= tmp << have;
4726         have += bits;
4727     }
4728 
4729     c->bit_offset = s;
4730     bm_xfer_ctx_bit_to_word_offset(c);
4731 
4732     return (s != c->bm_bits);
4733 }
4734 
4735 /*
4736  * decode_bitmap_c
4737  *
4738  * Return 0 when done, 1 when another iteration is needed, and a negative error
4739  * code upon failure.
4740  */
4741 static int
4742 decode_bitmap_c(struct drbd_peer_device *peer_device,
4743         struct p_compressed_bm *p,
4744         struct bm_xfer_ctx *c,
4745         unsigned int len)
4746 {
4747     if (dcbp_get_code(p) == RLE_VLI_Bits)
4748         return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4749 
4750     /* other variants had been implemented for evaluation,
4751      * but have been dropped as this one turned out to be "best"
4752      * during all our tests. */
4753 
4754     drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4755     conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4756     return -EIO;
4757 }
4758 
4759 void INFO_bm_xfer_stats(struct drbd_device *device,
4760         const char *direction, struct bm_xfer_ctx *c)
4761 {
4762     /* what would it take to transfer it "plaintext" */
4763     unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4764     unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4765     unsigned int plain =
4766         header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4767         c->bm_words * sizeof(unsigned long);
4768     unsigned int total = c->bytes[0] + c->bytes[1];
4769     unsigned int r;
4770 
4771     /* total can not be zero. but just in case: */
4772     if (total == 0)
4773         return;
4774 
4775     /* don't report if not compressed */
4776     if (total >= plain)
4777         return;
4778 
4779     /* total < plain. check for overflow, still */
4780     r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4781                             : (1000 * total / plain);
4782 
4783     if (r > 1000)
4784         r = 1000;
4785 
4786     r = 1000 - r;
4787     drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4788          "total %u; compression: %u.%u%%\n",
4789             direction,
4790             c->bytes[1], c->packets[1],
4791             c->bytes[0], c->packets[0],
4792             total, r/10, r % 10);
4793 }
4794 
4795 /* Since we are processing the bitfield from lower addresses to higher,
4796    it does not matter if the process it in 32 bit chunks or 64 bit
4797    chunks as long as it is little endian. (Understand it as byte stream,
4798    beginning with the lowest byte...) If we would use big endian
4799    we would need to process it from the highest address to the lowest,
4800    in order to be agnostic to the 32 vs 64 bits issue.
4801 
4802    returns 0 on failure, 1 if we successfully received it. */
4803 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4804 {
4805     struct drbd_peer_device *peer_device;
4806     struct drbd_device *device;
4807     struct bm_xfer_ctx c;
4808     int err;
4809 
4810     peer_device = conn_peer_device(connection, pi->vnr);
4811     if (!peer_device)
4812         return -EIO;
4813     device = peer_device->device;
4814 
4815     drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4816     /* you are supposed to send additional out-of-sync information
4817      * if you actually set bits during this phase */
4818 
4819     c = (struct bm_xfer_ctx) {
4820         .bm_bits = drbd_bm_bits(device),
4821         .bm_words = drbd_bm_words(device),
4822     };
4823 
4824     for(;;) {
4825         if (pi->cmd == P_BITMAP)
4826             err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4827         else if (pi->cmd == P_COMPRESSED_BITMAP) {
4828             /* MAYBE: sanity check that we speak proto >= 90,
4829              * and the feature is enabled! */
4830             struct p_compressed_bm *p = pi->data;
4831 
4832             if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4833                 drbd_err(device, "ReportCBitmap packet too large\n");
4834                 err = -EIO;
4835                 goto out;
4836             }
4837             if (pi->size <= sizeof(*p)) {
4838                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4839                 err = -EIO;
4840                 goto out;
4841             }
4842             err = drbd_recv_all(peer_device->connection, p, pi->size);
4843             if (err)
4844                    goto out;
4845             err = decode_bitmap_c(peer_device, p, &c, pi->size);
4846         } else {
4847             drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4848             err = -EIO;
4849             goto out;
4850         }
4851 
4852         c.packets[pi->cmd == P_BITMAP]++;
4853         c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4854 
4855         if (err <= 0) {
4856             if (err < 0)
4857                 goto out;
4858             break;
4859         }
4860         err = drbd_recv_header(peer_device->connection, pi);
4861         if (err)
4862             goto out;
4863     }
4864 
4865     INFO_bm_xfer_stats(device, "receive", &c);
4866 
4867     if (device->state.conn == C_WF_BITMAP_T) {
4868         enum drbd_state_rv rv;
4869 
4870         err = drbd_send_bitmap(device);
4871         if (err)
4872             goto out;
4873         /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4874         rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4875         D_ASSERT(device, rv == SS_SUCCESS);
4876     } else if (device->state.conn != C_WF_BITMAP_S) {
4877         /* admin may have requested C_DISCONNECTING,
4878          * other threads may have noticed network errors */
4879         drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4880             drbd_conn_str(device->state.conn));
4881     }
4882     err = 0;
4883 
4884  out:
4885     drbd_bm_unlock(device);
4886     if (!err && device->state.conn == C_WF_BITMAP_S)
4887         drbd_start_resync(device, C_SYNC_SOURCE);
4888     return err;
4889 }
4890 
4891 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4892 {
4893     drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4894          pi->cmd, pi->size);
4895 
4896     return ignore_remaining_packet(connection, pi);
4897 }
4898 
4899 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4900 {
4901     /* Make sure we've acked all the TCP data associated
4902      * with the data requests being unplugged */
4903     tcp_sock_set_quickack(connection->data.socket->sk, 2);
4904     return 0;
4905 }
4906 
4907 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4908 {
4909     struct drbd_peer_device *peer_device;
4910     struct drbd_device *device;
4911     struct p_block_desc *p = pi->data;
4912 
4913     peer_device = conn_peer_device(connection, pi->vnr);
4914     if (!peer_device)
4915         return -EIO;
4916     device = peer_device->device;
4917 
4918     switch (device->state.conn) {
4919     case C_WF_SYNC_UUID:
4920     case C_WF_BITMAP_T:
4921     case C_BEHIND:
4922             break;
4923     default:
4924         drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4925                 drbd_conn_str(device->state.conn));
4926     }
4927 
4928     drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4929 
4930     return 0;
4931 }
4932 
4933 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4934 {
4935     struct drbd_peer_device *peer_device;
4936     struct p_block_desc *p = pi->data;
4937     struct drbd_device *device;
4938     sector_t sector;
4939     int size, err = 0;
4940 
4941     peer_device = conn_peer_device(connection, pi->vnr);
4942     if (!peer_device)
4943         return -EIO;
4944     device = peer_device->device;
4945 
4946     sector = be64_to_cpu(p->sector);
4947     size = be32_to_cpu(p->blksize);
4948 
4949     dec_rs_pending(device);
4950 
4951     if (get_ldev(device)) {
4952         struct drbd_peer_request *peer_req;
4953         const enum req_op op = REQ_OP_WRITE_ZEROES;
4954 
4955         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4956                            size, 0, GFP_NOIO);
4957         if (!peer_req) {
4958             put_ldev(device);
4959             return -ENOMEM;
4960         }
4961 
4962         peer_req->w.cb = e_end_resync_block;
4963         peer_req->submit_jif = jiffies;
4964         peer_req->flags |= EE_TRIM;
4965 
4966         spin_lock_irq(&device->resource->req_lock);
4967         list_add_tail(&peer_req->w.list, &device->sync_ee);
4968         spin_unlock_irq(&device->resource->req_lock);
4969 
4970         atomic_add(pi->size >> 9, &device->rs_sect_ev);
4971         err = drbd_submit_peer_request(device, peer_req, op,
4972                            DRBD_FAULT_RS_WR);
4973 
4974         if (err) {
4975             spin_lock_irq(&device->resource->req_lock);
4976             list_del(&peer_req->w.list);
4977             spin_unlock_irq(&device->resource->req_lock);
4978 
4979             drbd_free_peer_req(device, peer_req);
4980             put_ldev(device);
4981             err = 0;
4982             goto fail;
4983         }
4984 
4985         inc_unacked(device);
4986 
4987         /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4988            as well as drbd_rs_complete_io() */
4989     } else {
4990     fail:
4991         drbd_rs_complete_io(device, sector);
4992         drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4993     }
4994 
4995     atomic_add(size >> 9, &device->rs_sect_in);
4996 
4997     return err;
4998 }
4999 
/*
 * One entry of the data-socket dispatch table used by drbdd().
 * Field order matters: the table is filled with positional initializers.
 */
struct data_cmd {
    /* non-zero if the packet may be longer than its fixed sub-header */
    int expect_payload;
    /* number of sub-header bytes drbdd() reads before calling fn */
    unsigned int pkt_size;
    /* packet handler; a non-zero return value is treated as an error */
    int (*fn)(struct drbd_connection *connection, struct packet_info *pi);
};
5005 
5006 static struct data_cmd drbd_cmd_handler[] = {
5007     [P_DATA]        = { 1, sizeof(struct p_data), receive_Data },
5008     [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
5009     [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
5010     [P_BARRIER]     = { 0, sizeof(struct p_barrier), receive_Barrier } ,
5011     [P_BITMAP]      = { 1, 0, receive_bitmap } ,
5012     [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
5013     [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
5014     [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
5015     [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5016     [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
5017     [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
5018     [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
5019     [P_UUIDS]       = { 0, sizeof(struct p_uuids), receive_uuids },
5020     [P_SIZES]       = { 0, sizeof(struct p_sizes), receive_sizes },
5021     [P_STATE]       = { 0, sizeof(struct p_state), receive_state },
5022     [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
5023     [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
5024     [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
5025     [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
5026     [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
5027     [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
5028     [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
5029     [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
5030     [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
5031     [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
5032     [P_TRIM]        = { 0, sizeof(struct p_trim), receive_Data },
5033     [P_ZEROES]      = { 0, sizeof(struct p_trim), receive_Data },
5034     [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
5035 };
5036 
5037 static void drbdd(struct drbd_connection *connection)
5038 {
5039     struct packet_info pi;
5040     size_t shs; /* sub header size */
5041     int err;
5042 
5043     while (get_t_state(&connection->receiver) == RUNNING) {
5044         struct data_cmd const *cmd;
5045 
5046         drbd_thread_current_set_cpu(&connection->receiver);
5047         update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
5048         if (drbd_recv_header_maybe_unplug(connection, &pi))
5049             goto err_out;
5050 
5051         cmd = &drbd_cmd_handler[pi.cmd];
5052         if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
5053             drbd_err(connection, "Unexpected data packet %s (0x%04x)",
5054                  cmdname(pi.cmd), pi.cmd);
5055             goto err_out;
5056         }
5057 
5058         shs = cmd->pkt_size;
5059         if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
5060             shs += sizeof(struct o_qlim);
5061         if (pi.size > shs && !cmd->expect_payload) {
5062             drbd_err(connection, "No payload expected %s l:%d\n",
5063                  cmdname(pi.cmd), pi.size);
5064             goto err_out;
5065         }
5066         if (pi.size < shs) {
5067             drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5068                  cmdname(pi.cmd), (int)shs, pi.size);
5069             goto err_out;
5070         }
5071 
5072         if (shs) {
5073             update_receiver_timing_details(connection, drbd_recv_all_warn);
5074             err = drbd_recv_all_warn(connection, pi.data, shs);
5075             if (err)
5076                 goto err_out;
5077             pi.size -= shs;
5078         }
5079 
5080         update_receiver_timing_details(connection, cmd->fn);
5081         err = cmd->fn(connection, &pi);
5082         if (err) {
5083             drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5084                  cmdname(pi.cmd), err, pi.size);
5085             goto err_out;
5086         }
5087     }
5088     return;
5089 
5090     err_out:
5091     conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5092 }
5093 
5094 static void conn_disconnect(struct drbd_connection *connection)
5095 {
5096     struct drbd_peer_device *peer_device;
5097     enum drbd_conns oc;
5098     int vnr;
5099 
5100     if (connection->cstate == C_STANDALONE)
5101         return;
5102 
5103     /* We are about to start the cleanup after connection loss.
5104      * Make sure drbd_make_request knows about that.
5105      * Usually we should be in some network failure state already,
5106      * but just in case we are not, we fix it up here.
5107      */
5108     conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5109 
5110     /* ack_receiver does not clean up anything. it must not interfere, either */
5111     drbd_thread_stop(&connection->ack_receiver);
5112     if (connection->ack_sender) {
5113         destroy_workqueue(connection->ack_sender);
5114         connection->ack_sender = NULL;
5115     }
5116     drbd_free_sock(connection);
5117 
5118     rcu_read_lock();
5119     idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5120         struct drbd_device *device = peer_device->device;
5121         kref_get(&device->kref);
5122         rcu_read_unlock();
5123         drbd_disconnected(peer_device);
5124         kref_put(&device->kref, drbd_destroy_device);
5125         rcu_read_lock();
5126     }
5127     rcu_read_unlock();
5128 
5129     if (!list_empty(&connection->current_epoch->list))
5130         drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5131     /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5132     atomic_set(&connection->current_epoch->epoch_size, 0);
5133     connection->send.seen_any_write_yet = false;
5134 
5135     drbd_info(connection, "Connection closed\n");
5136 
5137     if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5138         conn_try_outdate_peer_async(connection);
5139 
5140     spin_lock_irq(&connection->resource->req_lock);
5141     oc = connection->cstate;
5142     if (oc >= C_UNCONNECTED)
5143         _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5144 
5145     spin_unlock_irq(&connection->resource->req_lock);
5146 
5147     if (oc == C_DISCONNECTING)
5148         conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5149 }
5150 
5151 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5152 {
5153     struct drbd_device *device = peer_device->device;
5154     unsigned int i;
5155 
5156     /* wait for current activity to cease. */
5157     spin_lock_irq(&device->resource->req_lock);
5158     _drbd_wait_ee_list_empty(device, &device->active_ee);
5159     _drbd_wait_ee_list_empty(device, &device->sync_ee);
5160     _drbd_wait_ee_list_empty(device, &device->read_ee);
5161     spin_unlock_irq(&device->resource->req_lock);
5162 
5163     /* We do not have data structures that would allow us to
5164      * get the rs_pending_cnt down to 0 again.
5165      *  * On C_SYNC_TARGET we do not have any data structures describing
5166      *    the pending RSDataRequest's we have sent.
5167      *  * On C_SYNC_SOURCE there is no data structure that tracks
5168      *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5169      *  And no, it is not the sum of the reference counts in the
5170      *  resync_LRU. The resync_LRU tracks the whole operation including
5171      *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5172      *  on the fly. */
5173     drbd_rs_cancel_all(device);
5174     device->rs_total = 0;
5175     device->rs_failed = 0;
5176     atomic_set(&device->rs_pending_cnt, 0);
5177     wake_up(&device->misc_wait);
5178 
5179     del_timer_sync(&device->resync_timer);
5180     resync_timer_fn(&device->resync_timer);
5181 
5182     /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5183      * w_make_resync_request etc. which may still be on the worker queue
5184      * to be "canceled" */
5185     drbd_flush_workqueue(&peer_device->connection->sender_work);
5186 
5187     drbd_finish_peer_reqs(device);
5188 
5189     /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5190        might have issued a work again. The one before drbd_finish_peer_reqs() is
5191        necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5192     drbd_flush_workqueue(&peer_device->connection->sender_work);
5193 
5194     /* need to do it again, drbd_finish_peer_reqs() may have populated it
5195      * again via drbd_try_clear_on_disk_bm(). */
5196     drbd_rs_cancel_all(device);
5197 
5198     kfree(device->p_uuid);
5199     device->p_uuid = NULL;
5200 
5201     if (!drbd_suspended(device))
5202         tl_clear(peer_device->connection);
5203 
5204     drbd_md_sync(device);
5205 
5206     if (get_ldev(device)) {
5207         drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5208                 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5209         put_ldev(device);
5210     }
5211 
5212     /* tcp_close and release of sendpage pages can be deferred.  I don't
5213      * want to use SO_LINGER, because apparently it can be deferred for
5214      * more than 20 seconds (longest time I checked).
5215      *
5216      * Actually we don't care for exactly when the network stack does its
5217      * put_page(), but release our reference on these pages right here.
5218      */
5219     i = drbd_free_peer_reqs(device, &device->net_ee);
5220     if (i)
5221         drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5222     i = atomic_read(&device->pp_in_use_by_net);
5223     if (i)
5224         drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5225     i = atomic_read(&device->pp_in_use);
5226     if (i)
5227         drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5228 
5229     D_ASSERT(device, list_empty(&device->read_ee));
5230     D_ASSERT(device, list_empty(&device->active_ee));
5231     D_ASSERT(device, list_empty(&device->sync_ee));
5232     D_ASSERT(device, list_empty(&device->done_ee));
5233 
5234     return 0;
5235 }
5236 
5237 /*
5238  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5239  * we can agree on is stored in agreed_pro_version.
5240  *
5241  * feature flags and the reserved array should be enough room for future
5242  * enhancements of the handshake protocol, and possible plugins...
5243  *
5244  * for now, they are expected to be zero, but ignored.
5245  */
5246 static int drbd_send_features(struct drbd_connection *connection)
5247 {
5248     struct drbd_socket *sock;
5249     struct p_connection_features *p;
5250 
5251     sock = &connection->data;
5252     p = conn_prepare_command(connection, sock);
5253     if (!p)
5254         return -EIO;
5255     memset(p, 0, sizeof(*p));
5256     p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5257     p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5258     p->feature_flags = cpu_to_be32(PRO_FEATURES);
5259     return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5260 }
5261 
5262 /*
5263  * return values:
5264  *   1 yes, we have a valid connection
5265  *   0 oops, did not work out, please try again
5266  *  -1 peer talks different language,
5267  *     no point in trying again, please go standalone.
5268  */
5269 static int drbd_do_features(struct drbd_connection *connection)
5270 {
5271     /* ASSERT current == connection->receiver ... */
5272     struct p_connection_features *p;
5273     const int expect = sizeof(struct p_connection_features);
5274     struct packet_info pi;
5275     int err;
5276 
5277     err = drbd_send_features(connection);
5278     if (err)
5279         return 0;
5280 
5281     err = drbd_recv_header(connection, &pi);
5282     if (err)
5283         return 0;
5284 
5285     if (pi.cmd != P_CONNECTION_FEATURES) {
5286         drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5287              cmdname(pi.cmd), pi.cmd);
5288         return -1;
5289     }
5290 
5291     if (pi.size != expect) {
5292         drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5293              expect, pi.size);
5294         return -1;
5295     }
5296 
5297     p = pi.data;
5298     err = drbd_recv_all_warn(connection, p, expect);
5299     if (err)
5300         return 0;
5301 
5302     p->protocol_min = be32_to_cpu(p->protocol_min);
5303     p->protocol_max = be32_to_cpu(p->protocol_max);
5304     if (p->protocol_max == 0)
5305         p->protocol_max = p->protocol_min;
5306 
5307     if (PRO_VERSION_MAX < p->protocol_min ||
5308         PRO_VERSION_MIN > p->protocol_max)
5309         goto incompat;
5310 
5311     connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5312     connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5313 
5314     drbd_info(connection, "Handshake successful: "
5315          "Agreed network protocol version %d\n", connection->agreed_pro_version);
5316 
5317     drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5318           connection->agreed_features,
5319           connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5320           connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5321           connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5322           connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5323           connection->agreed_features ? "" : " none");
5324 
5325     return 1;
5326 
5327  incompat:
5328     drbd_err(connection, "incompatible DRBD dialects: "
5329         "I support %d-%d, peer supports %d-%d\n",
5330         PRO_VERSION_MIN, PRO_VERSION_MAX,
5331         p->protocol_min, p->protocol_max);
5332     return -1;
5333 }
5334 
5335 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Variant used when the kernel has no HMAC support: peer authentication
 * cannot be done, so log why and report a permanent failure (-1).
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
    drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
    drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");

    return -1;
}
5342 #else
5343 #define CHALLENGE_LEN 64
5344 
5345 /* Return value:
5346     1 - auth succeeded,
5347     0 - failed, try again (network error),
5348     -1 - auth failed, don't try again.
5349 */
5350 
5351 static int drbd_do_auth(struct drbd_connection *connection)
5352 {
5353     struct drbd_socket *sock;
5354     char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5355     char *response = NULL;
5356     char *right_response = NULL;
5357     char *peers_ch = NULL;
5358     unsigned int key_len;
5359     char secret[SHARED_SECRET_MAX]; /* 64 byte */
5360     unsigned int resp_size;
5361     struct shash_desc *desc;
5362     struct packet_info pi;
5363     struct net_conf *nc;
5364     int err, rv;
5365 
5366     /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5367 
5368     rcu_read_lock();
5369     nc = rcu_dereference(connection->net_conf);
5370     key_len = strlen(nc->shared_secret);
5371     memcpy(secret, nc->shared_secret, key_len);
5372     rcu_read_unlock();
5373 
5374     desc = kmalloc(sizeof(struct shash_desc) +
5375                crypto_shash_descsize(connection->cram_hmac_tfm),
5376                GFP_KERNEL);
5377     if (!desc) {
5378         rv = -1;
5379         goto fail;
5380     }
5381     desc->tfm = connection->cram_hmac_tfm;
5382 
5383     rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5384     if (rv) {
5385         drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5386         rv = -1;
5387         goto fail;
5388     }
5389 
5390     get_random_bytes(my_challenge, CHALLENGE_LEN);
5391 
5392     sock = &connection->data;
5393     if (!conn_prepare_command(connection, sock)) {
5394         rv = 0;
5395         goto fail;
5396     }
5397     rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5398                 my_challenge, CHALLENGE_LEN);
5399     if (!rv)
5400         goto fail;
5401 
5402     err = drbd_recv_header(connection, &pi);
5403     if (err) {
5404         rv = 0;
5405         goto fail;
5406     }
5407 
5408     if (pi.cmd != P_AUTH_CHALLENGE) {
5409         drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5410              cmdname(pi.cmd), pi.cmd);
5411         rv = -1;
5412         goto fail;
5413     }
5414 
5415     if (pi.size > CHALLENGE_LEN * 2) {
5416         drbd_err(connection, "expected AuthChallenge payload too big.\n");
5417         rv = -1;
5418         goto fail;
5419     }
5420 
5421     if (pi.size < CHALLENGE_LEN) {
5422         drbd_err(connection, "AuthChallenge payload too small.\n");
5423         rv = -1;
5424         goto fail;
5425     }
5426 
5427     peers_ch = kmalloc(pi.size, GFP_NOIO);
5428     if (!peers_ch) {
5429         rv = -1;
5430         goto fail;
5431     }
5432 
5433     err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5434     if (err) {
5435         rv = 0;
5436         goto fail;
5437     }
5438 
5439     if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5440         drbd_err(connection, "Peer presented the same challenge!\n");
5441         rv = -1;
5442         goto fail;
5443     }
5444 
5445     resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5446     response = kmalloc(resp_size, GFP_NOIO);
5447     if (!response) {
5448         rv = -1;
5449         goto fail;
5450     }
5451 
5452     rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5453     if (rv) {
5454         drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5455         rv = -1;
5456         goto fail;
5457     }
5458 
5459     if (!conn_prepare_command(connection, sock)) {
5460         rv = 0;
5461         goto fail;
5462     }
5463     rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5464                 response, resp_size);
5465     if (!rv)
5466         goto fail;
5467 
5468     err = drbd_recv_header(connection, &pi);
5469     if (err) {
5470         rv = 0;
5471         goto fail;
5472     }
5473 
5474     if (pi.cmd != P_AUTH_RESPONSE) {
5475         drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5476              cmdname(pi.cmd), pi.cmd);
5477         rv = 0;
5478         goto fail;
5479     }
5480 
5481     if (pi.size != resp_size) {
5482         drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5483         rv = 0;
5484         goto fail;
5485     }
5486 
5487     err = drbd_recv_all_warn(connection, response , resp_size);
5488     if (err) {
5489         rv = 0;
5490         goto fail;
5491     }
5492 
5493     right_response = kmalloc(resp_size, GFP_NOIO);
5494     if (!right_response) {
5495         rv = -1;
5496         goto fail;
5497     }
5498 
5499     rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5500                  right_response);
5501     if (rv) {
5502         drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5503         rv = -1;
5504         goto fail;
5505     }
5506 
5507     rv = !memcmp(response, right_response, resp_size);
5508 
5509     if (rv)
5510         drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5511              resp_size);
5512     else
5513         rv = -1;
5514 
5515  fail:
5516     kfree(peers_ch);
5517     kfree(response);
5518     kfree(right_response);
5519     if (desc) {
5520         shash_desc_zero(desc);
5521         kfree(desc);
5522     }
5523 
5524     return rv;
5525 }
5526 #endif
5527 
5528 int drbd_receiver(struct drbd_thread *thi)
5529 {
5530     struct drbd_connection *connection = thi->connection;
5531     int h;
5532 
5533     drbd_info(connection, "receiver (re)started\n");
5534 
5535     do {
5536         h = conn_connect(connection);
5537         if (h == 0) {
5538             conn_disconnect(connection);
5539             schedule_timeout_interruptible(HZ);
5540         }
5541         if (h == -1) {
5542             drbd_warn(connection, "Discarding network configuration.\n");
5543             conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5544         }
5545     } while (h == 0);
5546 
5547     if (h > 0) {
5548         blk_start_plug(&connection->receiver_plug);
5549         drbdd(connection);
5550         blk_finish_plug(&connection->receiver_plug);
5551     }
5552 
5553     conn_disconnect(connection);
5554 
5555     drbd_info(connection, "receiver terminated\n");
5556     return 0;
5557 }
5558 
5559 /* ********* acknowledge sender ******** */
5560 
5561 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5562 {
5563     struct p_req_state_reply *p = pi->data;
5564     int retcode = be32_to_cpu(p->retcode);
5565 
5566     if (retcode >= SS_SUCCESS) {
5567         set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5568     } else {
5569         set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5570         drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5571              drbd_set_st_err_str(retcode), retcode);
5572     }
5573     wake_up(&connection->ping_wait);
5574 
5575     return 0;
5576 }
5577 
5578 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5579 {
5580     struct drbd_peer_device *peer_device;
5581     struct drbd_device *device;
5582     struct p_req_state_reply *p = pi->data;
5583     int retcode = be32_to_cpu(p->retcode);
5584 
5585     peer_device = conn_peer_device(connection, pi->vnr);
5586     if (!peer_device)
5587         return -EIO;
5588     device = peer_device->device;
5589 
5590     if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5591         D_ASSERT(device, connection->agreed_pro_version < 100);
5592         return got_conn_RqSReply(connection, pi);
5593     }
5594 
5595     if (retcode >= SS_SUCCESS) {
5596         set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5597     } else {
5598         set_bit(CL_ST_CHG_FAIL, &device->flags);
5599         drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5600             drbd_set_st_err_str(retcode), retcode);
5601     }
5602     wake_up(&device->state_wait);
5603 
5604     return 0;
5605 }
5606 
/* Answer a ping from the peer; returns the result of sending the ping ack. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
    return drbd_send_ping_ack(connection);
}
5612 
5613 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5614 {
5615     /* restore idle timeout */
5616     connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5617     if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5618         wake_up(&connection->ping_wait);
5619 
5620     return 0;
5621 }
5622 
5623 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5624 {
5625     struct drbd_peer_device *peer_device;
5626     struct drbd_device *device;
5627     struct p_block_ack *p = pi->data;
5628     sector_t sector = be64_to_cpu(p->sector);
5629     int blksize = be32_to_cpu(p->blksize);
5630 
5631     peer_device = conn_peer_device(connection, pi->vnr);
5632     if (!peer_device)
5633         return -EIO;
5634     device = peer_device->device;
5635 
5636     D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5637 
5638     update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5639 
5640     if (get_ldev(device)) {
5641         drbd_rs_complete_io(device, sector);
5642         drbd_set_in_sync(device, sector, blksize);
5643         /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5644         device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5645         put_ldev(device);
5646     }
5647     dec_rs_pending(device);
5648     atomic_add(blksize >> 9, &device->rs_sect_in);
5649 
5650     return 0;
5651 }
5652 
5653 static int
5654 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5655                   struct rb_root *root, const char *func,
5656                   enum drbd_req_event what, bool missing_ok)
5657 {
5658     struct drbd_request *req;
5659     struct bio_and_error m;
5660 
5661     spin_lock_irq(&device->resource->req_lock);
5662     req = find_request(device, root, id, sector, missing_ok, func);
5663     if (unlikely(!req)) {
5664         spin_unlock_irq(&device->resource->req_lock);
5665         return -EIO;
5666     }
5667     __req_mod(req, what, &m);
5668     spin_unlock_irq(&device->resource->req_lock);
5669 
5670     if (m.bio)
5671         complete_master_bio(device, &m);
5672     return 0;
5673 }
5674 
5675 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5676 {
5677     struct drbd_peer_device *peer_device;
5678     struct drbd_device *device;
5679     struct p_block_ack *p = pi->data;
5680     sector_t sector = be64_to_cpu(p->sector);
5681     int blksize = be32_to_cpu(p->blksize);
5682     enum drbd_req_event what;
5683 
5684     peer_device = conn_peer_device(connection, pi->vnr);
5685     if (!peer_device)
5686         return -EIO;
5687     device = peer_device->device;
5688 
5689     update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5690 
5691     if (p->block_id == ID_SYNCER) {
5692         drbd_set_in_sync(device, sector, blksize);
5693         dec_rs_pending(device);
5694         return 0;
5695     }
5696     switch (pi->cmd) {
5697     case P_RS_WRITE_ACK:
5698         what = WRITE_ACKED_BY_PEER_AND_SIS;
5699         break;
5700     case P_WRITE_ACK:
5701         what = WRITE_ACKED_BY_PEER;
5702         break;
5703     case P_RECV_ACK:
5704         what = RECV_ACKED_BY_PEER;
5705         break;
5706     case P_SUPERSEDED:
5707         what = CONFLICT_RESOLVED;
5708         break;
5709     case P_RETRY_WRITE:
5710         what = POSTPONE_WRITE;
5711         break;
5712     default:
5713         BUG();
5714     }
5715 
5716     return validate_req_change_req_state(device, p->block_id, sector,
5717                          &device->write_requests, __func__,
5718                          what, false);
5719 }
5720 
5721 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5722 {
5723     struct drbd_peer_device *peer_device;
5724     struct drbd_device *device;
5725     struct p_block_ack *p = pi->data;
5726     sector_t sector = be64_to_cpu(p->sector);
5727     int size = be32_to_cpu(p->blksize);
5728     int err;
5729 
5730     peer_device = conn_peer_device(connection, pi->vnr);
5731     if (!peer_device)
5732         return -EIO;
5733     device = peer_device->device;
5734 
5735     update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5736 
5737     if (p->block_id == ID_SYNCER) {
5738         dec_rs_pending(device);
5739         drbd_rs_failed_io(device, sector, size);
5740         return 0;
5741     }
5742 
5743     err = validate_req_change_req_state(device, p->block_id, sector,
5744                         &device->write_requests, __func__,
5745                         NEG_ACKED, true);
5746     if (err) {
5747         /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5748            The master bio might already be completed, therefore the
5749            request is no longer in the collision hash. */
5750         /* In Protocol B we might already have got a P_RECV_ACK
5751            but then get a P_NEG_ACK afterwards. */
5752         drbd_set_out_of_sync(device, sector, size);
5753     }
5754     return 0;
5755 }
5756 
5757 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5758 {
5759     struct drbd_peer_device *peer_device;
5760     struct drbd_device *device;
5761     struct p_block_ack *p = pi->data;
5762     sector_t sector = be64_to_cpu(p->sector);
5763 
5764     peer_device = conn_peer_device(connection, pi->vnr);
5765     if (!peer_device)
5766         return -EIO;
5767     device = peer_device->device;
5768 
5769     update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5770 
5771     drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5772         (unsigned long long)sector, be32_to_cpu(p->blksize));
5773 
5774     return validate_req_change_req_state(device, p->block_id, sector,
5775                          &device->read_requests, __func__,
5776                          NEG_ACKED, false);
5777 }
5778 
5779 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5780 {
5781     struct drbd_peer_device *peer_device;
5782     struct drbd_device *device;
5783     sector_t sector;
5784     int size;
5785     struct p_block_ack *p = pi->data;
5786 
5787     peer_device = conn_peer_device(connection, pi->vnr);
5788     if (!peer_device)
5789         return -EIO;
5790     device = peer_device->device;
5791 
5792     sector = be64_to_cpu(p->sector);
5793     size = be32_to_cpu(p->blksize);
5794 
5795     update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5796 
5797     dec_rs_pending(device);
5798 
5799     if (get_ldev_if_state(device, D_FAILED)) {
5800         drbd_rs_complete_io(device, sector);
5801         switch (pi->cmd) {
5802         case P_NEG_RS_DREPLY:
5803             drbd_rs_failed_io(device, sector, size);
5804             break;
5805         case P_RS_CANCEL:
5806             break;
5807         default:
5808             BUG();
5809         }
5810         put_ldev(device);
5811     }
5812 
5813     return 0;
5814 }
5815 
5816 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5817 {
5818     struct p_barrier_ack *p = pi->data;
5819     struct drbd_peer_device *peer_device;
5820     int vnr;
5821 
5822     tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5823 
5824     rcu_read_lock();
5825     idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5826         struct drbd_device *device = peer_device->device;
5827 
5828         if (device->state.conn == C_AHEAD &&
5829             atomic_read(&device->ap_in_flight) == 0 &&
5830             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5831             device->start_resync_timer.expires = jiffies + HZ;
5832             add_timer(&device->start_resync_timer);
5833         }
5834     }
5835     rcu_read_unlock();
5836 
5837     return 0;
5838 }
5839 
5840 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5841 {
5842     struct drbd_peer_device *peer_device;
5843     struct drbd_device *device;
5844     struct p_block_ack *p = pi->data;
5845     struct drbd_device_work *dw;
5846     sector_t sector;
5847     int size;
5848 
5849     peer_device = conn_peer_device(connection, pi->vnr);
5850     if (!peer_device)
5851         return -EIO;
5852     device = peer_device->device;
5853 
5854     sector = be64_to_cpu(p->sector);
5855     size = be32_to_cpu(p->blksize);
5856 
5857     update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5858 
5859     if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5860         drbd_ov_out_of_sync_found(device, sector, size);
5861     else
5862         ov_out_of_sync_print(device);
5863 
5864     if (!get_ldev(device))
5865         return 0;
5866 
5867     drbd_rs_complete_io(device, sector);
5868     dec_rs_pending(device);
5869 
5870     --device->ov_left;
5871 
5872     /* let's advance progress step marks only for every other megabyte */
5873     if ((device->ov_left & 0x200) == 0x200)
5874         drbd_advance_rs_marks(device, device->ov_left);
5875 
5876     if (device->ov_left == 0) {
5877         dw = kmalloc(sizeof(*dw), GFP_NOIO);
5878         if (dw) {
5879             dw->w.cb = w_ov_finished;
5880             dw->device = device;
5881             drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5882         } else {
5883             drbd_err(device, "kmalloc(dw) failed.");
5884             ov_out_of_sync_print(device);
5885             drbd_resync_finished(device);
5886         }
5887     }
5888     put_ldev(device);
5889     return 0;
5890 }
5891 
/*
 * Meta-socket handler that accepts a packet and does nothing with it.
 * Always returns 0, so the receive loop treats the packet as handled.
 */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
    return 0;
}
5896 
/* One entry of the meta-socket dispatch table (ack_receiver_tbl). */
struct meta_sock_cmd {
    /* payload size in bytes that must follow the packet header */
    size_t pkt_size;
    /* handler for the packet; a non-zero return makes the receiver reconnect */
    int (*fn)(struct drbd_connection *connection, struct packet_info *);
};
5901 
5902 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5903 {
5904     long t;
5905     struct net_conf *nc;
5906 
5907     rcu_read_lock();
5908     nc = rcu_dereference(connection->net_conf);
5909     t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5910     rcu_read_unlock();
5911 
5912     t *= HZ;
5913     if (ping_timeout)
5914         t /= 10;
5915 
5916     connection->meta.socket->sk->sk_rcvtimeo = t;
5917 }
5918 
5919 static void set_ping_timeout(struct drbd_connection *connection)
5920 {
5921     set_rcvtimeo(connection, 1);
5922 }
5923 
5924 static void set_idle_timeout(struct drbd_connection *connection)
5925 {
5926     set_rcvtimeo(connection, 0);
5927 }
5928 
/*
 * Dispatch table for packets arriving on the meta socket, indexed by packet
 * command.  Each entry gives the exact payload size expected after the header
 * and the handler to call.  Commands without an entry have a NULL handler and
 * are rejected by drbd_ack_receiver() as unexpected.
 */
static struct meta_sock_cmd ack_receiver_tbl[] = {
    [P_PING]        = { 0, got_Ping },
    [P_PING_ACK]        = { 0, got_PingAck },
    [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
    [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
    [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
    [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
    [P_NEG_ACK]     = { sizeof(struct p_block_ack), got_NegAck },
    [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
    [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
    [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
    [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
    [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
    [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
    /* payload is received but ignored */
    [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
    /* shares the handler with P_NEG_RS_DREPLY; the handler checks pi->cmd */
    [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
    [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
    [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
};
5948 
5949 int drbd_ack_receiver(struct drbd_thread *thi)
5950 {
5951     struct drbd_connection *connection = thi->connection;
5952     struct meta_sock_cmd *cmd = NULL;
5953     struct packet_info pi;
5954     unsigned long pre_recv_jif;
5955     int rv;
5956     void *buf    = connection->meta.rbuf;
5957     int received = 0;
5958     unsigned int header_size = drbd_header_size(connection);
5959     int expect   = header_size;
5960     bool ping_timeout_active = false;
5961 
5962     sched_set_fifo_low(current);
5963 
5964     while (get_t_state(thi) == RUNNING) {
5965         drbd_thread_current_set_cpu(thi);
5966 
5967         conn_reclaim_net_peer_reqs(connection);
5968 
5969         if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5970             if (drbd_send_ping(connection)) {
5971                 drbd_err(connection, "drbd_send_ping has failed\n");
5972                 goto reconnect;
5973             }
5974             set_ping_timeout(connection);
5975             ping_timeout_active = true;
5976         }
5977 
5978         pre_recv_jif = jiffies;
5979         rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5980 
5981         /* Note:
5982          * -EINTR    (on meta) we got a signal
5983          * -EAGAIN   (on meta) rcvtimeo expired
5984          * -ECONNRESET   other side closed the connection
5985          * -ERESTARTSYS  (on data) we got a signal
5986          * rv <  0   other than above: unexpected error!
5987          * rv == expected: full header or command
5988          * rv <  expected: "woken" by signal during receive
5989          * rv == 0   : "connection shut down by peer"
5990          */
5991         if (likely(rv > 0)) {
5992             received += rv;
5993             buf  += rv;
5994         } else if (rv == 0) {
5995             if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5996                 long t;
5997                 rcu_read_lock();
5998                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5999                 rcu_read_unlock();
6000 
6001                 t = wait_event_timeout(connection->ping_wait,
6002                                connection->cstate < C_WF_REPORT_PARAMS,
6003                                t);
6004                 if (t)
6005                     break;
6006             }
6007             drbd_err(connection, "meta connection shut down by peer.\n");
6008             goto reconnect;
6009         } else if (rv == -EAGAIN) {
6010             /* If the data socket received something meanwhile,
6011              * that is good enough: peer is still alive. */
6012             if (time_after(connection->last_received, pre_recv_jif))
6013                 continue;
6014             if (ping_timeout_active) {
6015                 drbd_err(connection, "PingAck did not arrive in time.\n");
6016                 goto reconnect;
6017             }
6018             set_bit(SEND_PING, &connection->flags);
6019             continue;
6020         } else if (rv == -EINTR) {
6021             /* maybe drbd_thread_stop(): the while condition will notice.
6022              * maybe woken for send_ping: we'll send a ping above,
6023              * and change the rcvtimeo */
6024             flush_signals(current);
6025             continue;
6026         } else {
6027             drbd_err(connection, "sock_recvmsg returned %d\n", rv);
6028             goto reconnect;
6029         }
6030 
6031         if (received == expect && cmd == NULL) {
6032             if (decode_header(connection, connection->meta.rbuf, &pi))
6033                 goto reconnect;
6034             cmd = &ack_receiver_tbl[pi.cmd];
6035             if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
6036                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
6037                      cmdname(pi.cmd), pi.cmd);
6038                 goto disconnect;
6039             }
6040             expect = header_size + cmd->pkt_size;
6041             if (pi.size != expect - header_size) {
6042                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
6043                     pi.cmd, pi.size);
6044                 goto reconnect;
6045             }
6046         }
6047         if (received == expect) {
6048             bool err;
6049 
6050             err = cmd->fn(connection, &pi);
6051             if (err) {
6052                 drbd_err(connection, "%ps failed\n", cmd->fn);
6053                 goto reconnect;
6054             }
6055 
6056             connection->last_received = jiffies;
6057 
6058             if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
6059                 set_idle_timeout(connection);
6060                 ping_timeout_active = false;
6061             }
6062 
6063             buf  = connection->meta.rbuf;
6064             received = 0;
6065             expect   = header_size;
6066             cmd  = NULL;
6067         }
6068     }
6069 
6070     if (0) {
6071 reconnect:
6072         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6073         conn_md_sync(connection);
6074     }
6075     if (0) {
6076 disconnect:
6077         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6078     }
6079 
6080     drbd_info(connection, "ack_receiver terminated\n");
6081 
6082     return 0;
6083 }
6084 
6085 void drbd_send_acks_wf(struct work_struct *ws)
6086 {
6087     struct drbd_peer_device *peer_device =
6088         container_of(ws, struct drbd_peer_device, send_acks_work);
6089     struct drbd_connection *connection = peer_device->connection;
6090     struct drbd_device *device = peer_device->device;
6091     struct net_conf *nc;
6092     int tcp_cork, err;
6093 
6094     rcu_read_lock();
6095     nc = rcu_dereference(connection->net_conf);
6096     tcp_cork = nc->tcp_cork;
6097     rcu_read_unlock();
6098 
6099     if (tcp_cork)
6100         tcp_sock_set_cork(connection->meta.socket->sk, true);
6101 
6102     err = drbd_finish_peer_reqs(device);
6103     kref_put(&device->kref, drbd_destroy_device);
6104     /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6105        struct work_struct send_acks_work alive, which is in the peer_device object */
6106 
6107     if (err) {
6108         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6109         return;
6110     }
6111 
6112     if (tcp_cork)
6113         tcp_sock_set_cork(connection->meta.socket->sk, false);
6114 
6115     return;
6116 }