Back to home page

OSCL-LXR

 
 

    


0001 // SPDX-License-Identifier: GPL-2.0-only
0002 /******************************************************************************
0003 *******************************************************************************
0004 **
0005 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
0006 **  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
0007 **
0008 **
0009 *******************************************************************************
0010 ******************************************************************************/
0011 
0012 /*
0013  * lowcomms.c
0014  *
0015  * This is the "low-level" comms layer.
0016  *
0017  * It is responsible for sending/receiving messages
0018  * from other nodes in the cluster.
0019  *
0020  * Cluster nodes are referred to by their nodeids. nodeids are
0021  * simply 32 bit numbers to the locking module - if they need to
0022  * be expanded for the cluster infrastructure then that is its
0023  * responsibility. It is this layer's
0024  * responsibility to resolve these into IP address or
0025  * whatever it needs for inter-node communication.
0026  *
0027  * The comms level is two kernel threads that deal mainly with
0028  * the receiving of messages from other nodes and passing them
0029  * up to the mid-level comms layer (which understands the
0030  * message format) for execution by the locking core, and
0031  * a send thread which does all the setting up of connections
0032  * to remote nodes and the sending of data. Threads are not allowed
0033  * to send their own data because it may cause them to wait in times
0034  * of high load. Also, this way, the sending thread can collect together
0035  * messages bound for one node and send them in one block.
0036  *
0037  * lowcomms will choose to use either TCP or SCTP as its transport layer
0038  * depending on the configuration variable 'protocol'. This should be set
0039  * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
0040  * cluster-wide mechanism as it must be the same on all nodes of the cluster
0041  * for the DLM to function.
0042  *
0043  */
0044 
0045 #include <asm/ioctls.h>
0046 #include <net/sock.h>
0047 #include <net/tcp.h>
0048 #include <linux/pagemap.h>
0049 #include <linux/file.h>
0050 #include <linux/mutex.h>
0051 #include <linux/sctp.h>
0052 #include <linux/slab.h>
0053 #include <net/sctp/sctp.h>
0054 #include <net/ipv6.h>
0055 
0056 #include <trace/events/dlm.h>
0057 
0058 #include "dlm_internal.h"
0059 #include "lowcomms.h"
0060 #include "midcomms.h"
0061 #include "memory.h"
0062 #include "config.h"
0063 
0064 #define NEEDED_RMEM (4*1024*1024)
0065 
0066 /* Number of messages to send before rescheduling */
0067 #define MAX_SEND_MSG_COUNT 25
0068 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
0069 
0070 struct connection {
0071     struct socket *sock;    /* NULL if not connected */
0072     uint32_t nodeid;    /* So we know who we are in the list */
0073     struct mutex sock_mutex;
0074     unsigned long flags;
0075 #define CF_READ_PENDING 1
0076 #define CF_WRITE_PENDING 2
0077 #define CF_INIT_PENDING 4
0078 #define CF_IS_OTHERCON 5
0079 #define CF_CLOSE 6
0080 #define CF_APP_LIMITED 7
0081 #define CF_CLOSING 8
0082 #define CF_SHUTDOWN 9
0083 #define CF_CONNECTED 10
0084 #define CF_RECONNECT 11
0085 #define CF_DELAY_CONNECT 12
0086 #define CF_EOF 13
0087     struct list_head writequeue;  /* List of outgoing writequeue_entries */
0088     spinlock_t writequeue_lock;
0089     atomic_t writequeue_cnt;
0090     int retries;
0091 #define MAX_CONNECT_RETRIES 3
0092     struct hlist_node list;
0093     struct connection *othercon;
0094     struct connection *sendcon;
0095     struct work_struct rwork; /* Receive workqueue */
0096     struct work_struct swork; /* Send workqueue */
0097     wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
0098     unsigned char *rx_buf;
0099     int rx_buflen;
0100     int rx_leftover;
0101     struct rcu_head rcu;
0102 };
0103 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
0104 
0105 struct listen_connection {
0106     struct socket *sock;
0107     struct work_struct rwork;
0108 };
0109 
0110 #define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
0111 #define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
0112 
0113 /* An entry waiting to be sent */
0114 struct writequeue_entry {
0115     struct list_head list;
0116     struct page *page;
0117     int offset;
0118     int len;
0119     int end;
0120     int users;
0121     bool dirty;
0122     struct connection *con;
0123     struct list_head msgs;
0124     struct kref ref;
0125 };
0126 
0127 struct dlm_msg {
0128     struct writequeue_entry *entry;
0129     struct dlm_msg *orig_msg;
0130     bool retransmit;
0131     void *ppc;
0132     int len;
0133     int idx; /* new()/commit() idx exchange */
0134 
0135     struct list_head list;
0136     struct kref ref;
0137 };
0138 
0139 struct dlm_node_addr {
0140     struct list_head list;
0141     int nodeid;
0142     int mark;
0143     int addr_count;
0144     int curr_addr_index;
0145     struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
0146 };
0147 
0148 struct dlm_proto_ops {
0149     bool try_new_addr;
0150     const char *name;
0151     int proto;
0152 
0153     int (*connect)(struct connection *con, struct socket *sock,
0154                struct sockaddr *addr, int addr_len);
0155     void (*sockopts)(struct socket *sock);
0156     int (*bind)(struct socket *sock);
0157     int (*listen_validate)(void);
0158     void (*listen_sockopts)(struct socket *sock);
0159     int (*listen_bind)(struct socket *sock);
0160     /* What to do to shutdown */
0161     void (*shutdown_action)(struct connection *con);
0162     /* What to do to eof check */
0163     bool (*eof_condition)(struct connection *con);
0164 };
0165 
0166 static struct listen_sock_callbacks {
0167     void (*sk_error_report)(struct sock *);
0168     void (*sk_data_ready)(struct sock *);
0169     void (*sk_state_change)(struct sock *);
0170     void (*sk_write_space)(struct sock *);
0171 } listen_sock;
0172 
0173 static LIST_HEAD(dlm_node_addrs);
0174 static DEFINE_SPINLOCK(dlm_node_addrs_spin);
0175 
0176 static struct listen_connection listen_con;
0177 static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
0178 static int dlm_local_count;
0179 int dlm_allow_conn;
0180 
0181 /* Work queues */
0182 static struct workqueue_struct *recv_workqueue;
0183 static struct workqueue_struct *send_workqueue;
0184 
0185 static struct hlist_head connection_hash[CONN_HASH_SIZE];
0186 static DEFINE_SPINLOCK(connections_lock);
0187 DEFINE_STATIC_SRCU(connections_srcu);
0188 
0189 static const struct dlm_proto_ops *dlm_proto_ops;
0190 
0191 static void process_recv_sockets(struct work_struct *work);
0192 static void process_send_sockets(struct work_struct *work);
0193 
0194 static void writequeue_entry_ctor(void *data)
0195 {
0196     struct writequeue_entry *entry = data;
0197 
0198     INIT_LIST_HEAD(&entry->msgs);
0199 }
0200 
0201 struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
0202 {
0203     return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
0204                  0, 0, writequeue_entry_ctor);
0205 }
0206 
0207 struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
0208 {
0209     return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
0210 }
0211 
0212 /* need to held writequeue_lock */
0213 static struct writequeue_entry *con_next_wq(struct connection *con)
0214 {
0215     struct writequeue_entry *e;
0216 
0217     if (list_empty(&con->writequeue))
0218         return NULL;
0219 
0220     e = list_first_entry(&con->writequeue, struct writequeue_entry,
0221                  list);
0222     /* if len is zero nothing is to send, if there are users filling
0223      * buffers we wait until the users are done so we can send more.
0224      */
0225     if (e->users || e->len == 0)
0226         return NULL;
0227 
0228     return e;
0229 }
0230 
0231 static struct connection *__find_con(int nodeid, int r)
0232 {
0233     struct connection *con;
0234 
0235     hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
0236         if (con->nodeid == nodeid)
0237             return con;
0238     }
0239 
0240     return NULL;
0241 }
0242 
0243 static bool tcp_eof_condition(struct connection *con)
0244 {
0245     return atomic_read(&con->writequeue_cnt);
0246 }
0247 
0248 static int dlm_con_init(struct connection *con, int nodeid)
0249 {
0250     con->rx_buflen = dlm_config.ci_buffer_size;
0251     con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
0252     if (!con->rx_buf)
0253         return -ENOMEM;
0254 
0255     con->nodeid = nodeid;
0256     mutex_init(&con->sock_mutex);
0257     INIT_LIST_HEAD(&con->writequeue);
0258     spin_lock_init(&con->writequeue_lock);
0259     atomic_set(&con->writequeue_cnt, 0);
0260     INIT_WORK(&con->swork, process_send_sockets);
0261     INIT_WORK(&con->rwork, process_recv_sockets);
0262     init_waitqueue_head(&con->shutdown_wait);
0263 
0264     return 0;
0265 }
0266 
0267 /*
0268  * If 'allocation' is zero then we don't attempt to create a new
0269  * connection structure for this node.
0270  */
0271 static struct connection *nodeid2con(int nodeid, gfp_t alloc)
0272 {
0273     struct connection *con, *tmp;
0274     int r, ret;
0275 
0276     r = nodeid_hash(nodeid);
0277     con = __find_con(nodeid, r);
0278     if (con || !alloc)
0279         return con;
0280 
0281     con = kzalloc(sizeof(*con), alloc);
0282     if (!con)
0283         return NULL;
0284 
0285     ret = dlm_con_init(con, nodeid);
0286     if (ret) {
0287         kfree(con);
0288         return NULL;
0289     }
0290 
0291     spin_lock(&connections_lock);
0292     /* Because multiple workqueues/threads calls this function it can
0293      * race on multiple cpu's. Instead of locking hot path __find_con()
0294      * we just check in rare cases of recently added nodes again
0295      * under protection of connections_lock. If this is the case we
0296      * abort our connection creation and return the existing connection.
0297      */
0298     tmp = __find_con(nodeid, r);
0299     if (tmp) {
0300         spin_unlock(&connections_lock);
0301         kfree(con->rx_buf);
0302         kfree(con);
0303         return tmp;
0304     }
0305 
0306     hlist_add_head_rcu(&con->list, &connection_hash[r]);
0307     spin_unlock(&connections_lock);
0308 
0309     return con;
0310 }
0311 
0312 /* Loop round all connections */
0313 static void foreach_conn(void (*conn_func)(struct connection *c))
0314 {
0315     int i;
0316     struct connection *con;
0317 
0318     for (i = 0; i < CONN_HASH_SIZE; i++) {
0319         hlist_for_each_entry_rcu(con, &connection_hash[i], list)
0320             conn_func(con);
0321     }
0322 }
0323 
0324 static struct dlm_node_addr *find_node_addr(int nodeid)
0325 {
0326     struct dlm_node_addr *na;
0327 
0328     list_for_each_entry(na, &dlm_node_addrs, list) {
0329         if (na->nodeid == nodeid)
0330             return na;
0331     }
0332     return NULL;
0333 }
0334 
0335 static int addr_compare(const struct sockaddr_storage *x,
0336             const struct sockaddr_storage *y)
0337 {
0338     switch (x->ss_family) {
0339     case AF_INET: {
0340         struct sockaddr_in *sinx = (struct sockaddr_in *)x;
0341         struct sockaddr_in *siny = (struct sockaddr_in *)y;
0342         if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
0343             return 0;
0344         if (sinx->sin_port != siny->sin_port)
0345             return 0;
0346         break;
0347     }
0348     case AF_INET6: {
0349         struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
0350         struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
0351         if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
0352             return 0;
0353         if (sinx->sin6_port != siny->sin6_port)
0354             return 0;
0355         break;
0356     }
0357     default:
0358         return 0;
0359     }
0360     return 1;
0361 }
0362 
0363 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
0364               struct sockaddr *sa_out, bool try_new_addr,
0365               unsigned int *mark)
0366 {
0367     struct sockaddr_storage sas;
0368     struct dlm_node_addr *na;
0369 
0370     if (!dlm_local_count)
0371         return -1;
0372 
0373     spin_lock(&dlm_node_addrs_spin);
0374     na = find_node_addr(nodeid);
0375     if (na && na->addr_count) {
0376         memcpy(&sas, na->addr[na->curr_addr_index],
0377                sizeof(struct sockaddr_storage));
0378 
0379         if (try_new_addr) {
0380             na->curr_addr_index++;
0381             if (na->curr_addr_index == na->addr_count)
0382                 na->curr_addr_index = 0;
0383         }
0384     }
0385     spin_unlock(&dlm_node_addrs_spin);
0386 
0387     if (!na)
0388         return -EEXIST;
0389 
0390     if (!na->addr_count)
0391         return -ENOENT;
0392 
0393     *mark = na->mark;
0394 
0395     if (sas_out)
0396         memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
0397 
0398     if (!sa_out)
0399         return 0;
0400 
0401     if (dlm_local_addr[0]->ss_family == AF_INET) {
0402         struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
0403         struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
0404         ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
0405     } else {
0406         struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
0407         struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
0408         ret6->sin6_addr = in6->sin6_addr;
0409     }
0410 
0411     return 0;
0412 }
0413 
0414 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
0415               unsigned int *mark)
0416 {
0417     struct dlm_node_addr *na;
0418     int rv = -EEXIST;
0419     int addr_i;
0420 
0421     spin_lock(&dlm_node_addrs_spin);
0422     list_for_each_entry(na, &dlm_node_addrs, list) {
0423         if (!na->addr_count)
0424             continue;
0425 
0426         for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
0427             if (addr_compare(na->addr[addr_i], addr)) {
0428                 *nodeid = na->nodeid;
0429                 *mark = na->mark;
0430                 rv = 0;
0431                 goto unlock;
0432             }
0433         }
0434     }
0435 unlock:
0436     spin_unlock(&dlm_node_addrs_spin);
0437     return rv;
0438 }
0439 
0440 /* caller need to held dlm_node_addrs_spin lock */
0441 static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
0442                      const struct sockaddr_storage *addr)
0443 {
0444     int i;
0445 
0446     for (i = 0; i < na->addr_count; i++) {
0447         if (addr_compare(na->addr[i], addr))
0448             return true;
0449     }
0450 
0451     return false;
0452 }
0453 
0454 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
0455 {
0456     struct sockaddr_storage *new_addr;
0457     struct dlm_node_addr *new_node, *na;
0458     bool ret;
0459 
0460     new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
0461     if (!new_node)
0462         return -ENOMEM;
0463 
0464     new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
0465     if (!new_addr) {
0466         kfree(new_node);
0467         return -ENOMEM;
0468     }
0469 
0470     memcpy(new_addr, addr, len);
0471 
0472     spin_lock(&dlm_node_addrs_spin);
0473     na = find_node_addr(nodeid);
0474     if (!na) {
0475         new_node->nodeid = nodeid;
0476         new_node->addr[0] = new_addr;
0477         new_node->addr_count = 1;
0478         new_node->mark = dlm_config.ci_mark;
0479         list_add(&new_node->list, &dlm_node_addrs);
0480         spin_unlock(&dlm_node_addrs_spin);
0481         return 0;
0482     }
0483 
0484     ret = dlm_lowcomms_na_has_addr(na, addr);
0485     if (ret) {
0486         spin_unlock(&dlm_node_addrs_spin);
0487         kfree(new_addr);
0488         kfree(new_node);
0489         return -EEXIST;
0490     }
0491 
0492     if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
0493         spin_unlock(&dlm_node_addrs_spin);
0494         kfree(new_addr);
0495         kfree(new_node);
0496         return -ENOSPC;
0497     }
0498 
0499     na->addr[na->addr_count++] = new_addr;
0500     spin_unlock(&dlm_node_addrs_spin);
0501     kfree(new_node);
0502     return 0;
0503 }
0504 
0505 /* Data available on socket or listen socket received a connect */
0506 static void lowcomms_data_ready(struct sock *sk)
0507 {
0508     struct connection *con;
0509 
0510     con = sock2con(sk);
0511     if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
0512         queue_work(recv_workqueue, &con->rwork);
0513 }
0514 
0515 static void lowcomms_listen_data_ready(struct sock *sk)
0516 {
0517     if (!dlm_allow_conn)
0518         return;
0519 
0520     queue_work(recv_workqueue, &listen_con.rwork);
0521 }
0522 
0523 static void lowcomms_write_space(struct sock *sk)
0524 {
0525     struct connection *con;
0526 
0527     con = sock2con(sk);
0528     if (!con)
0529         return;
0530 
0531     if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
0532         log_print("connected to node %d", con->nodeid);
0533         queue_work(send_workqueue, &con->swork);
0534         return;
0535     }
0536 
0537     clear_bit(SOCK_NOSPACE, &con->sock->flags);
0538 
0539     if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
0540         con->sock->sk->sk_write_pending--;
0541         clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
0542     }
0543 
0544     queue_work(send_workqueue, &con->swork);
0545 }
0546 
0547 static inline void lowcomms_connect_sock(struct connection *con)
0548 {
0549     if (test_bit(CF_CLOSE, &con->flags))
0550         return;
0551     queue_work(send_workqueue, &con->swork);
0552     cond_resched();
0553 }
0554 
0555 static void lowcomms_state_change(struct sock *sk)
0556 {
0557     /* SCTP layer is not calling sk_data_ready when the connection
0558      * is done, so we catch the signal through here. Also, it
0559      * doesn't switch socket state when entering shutdown, so we
0560      * skip the write in that case.
0561      */
0562     if (sk->sk_shutdown) {
0563         if (sk->sk_shutdown == RCV_SHUTDOWN)
0564             lowcomms_data_ready(sk);
0565     } else if (sk->sk_state == TCP_ESTABLISHED) {
0566         lowcomms_write_space(sk);
0567     }
0568 }
0569 
0570 int dlm_lowcomms_connect_node(int nodeid)
0571 {
0572     struct connection *con;
0573     int idx;
0574 
0575     if (nodeid == dlm_our_nodeid())
0576         return 0;
0577 
0578     idx = srcu_read_lock(&connections_srcu);
0579     con = nodeid2con(nodeid, GFP_NOFS);
0580     if (!con) {
0581         srcu_read_unlock(&connections_srcu, idx);
0582         return -ENOMEM;
0583     }
0584 
0585     lowcomms_connect_sock(con);
0586     srcu_read_unlock(&connections_srcu, idx);
0587 
0588     return 0;
0589 }
0590 
0591 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
0592 {
0593     struct dlm_node_addr *na;
0594 
0595     spin_lock(&dlm_node_addrs_spin);
0596     na = find_node_addr(nodeid);
0597     if (!na) {
0598         spin_unlock(&dlm_node_addrs_spin);
0599         return -ENOENT;
0600     }
0601 
0602     na->mark = mark;
0603     spin_unlock(&dlm_node_addrs_spin);
0604 
0605     return 0;
0606 }
0607 
0608 static void lowcomms_error_report(struct sock *sk)
0609 {
0610     struct connection *con;
0611     void (*orig_report)(struct sock *) = NULL;
0612     struct inet_sock *inet;
0613 
0614     con = sock2con(sk);
0615     if (con == NULL)
0616         goto out;
0617 
0618     orig_report = listen_sock.sk_error_report;
0619 
0620     inet = inet_sk(sk);
0621     switch (sk->sk_family) {
0622     case AF_INET:
0623         printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
0624                    "sending to node %d at %pI4, dport %d, "
0625                    "sk_err=%d/%d\n", dlm_our_nodeid(),
0626                    con->nodeid, &inet->inet_daddr,
0627                    ntohs(inet->inet_dport), sk->sk_err,
0628                    sk->sk_err_soft);
0629         break;
0630 #if IS_ENABLED(CONFIG_IPV6)
0631     case AF_INET6:
0632         printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
0633                    "sending to node %d at %pI6c, "
0634                    "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
0635                    con->nodeid, &sk->sk_v6_daddr,
0636                    ntohs(inet->inet_dport), sk->sk_err,
0637                    sk->sk_err_soft);
0638         break;
0639 #endif
0640     default:
0641         printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
0642                    "invalid socket family %d set, "
0643                    "sk_err=%d/%d\n", dlm_our_nodeid(),
0644                    sk->sk_family, sk->sk_err, sk->sk_err_soft);
0645         goto out;
0646     }
0647 
0648     /* below sendcon only handling */
0649     if (test_bit(CF_IS_OTHERCON, &con->flags))
0650         con = con->sendcon;
0651 
0652     switch (sk->sk_err) {
0653     case ECONNREFUSED:
0654         set_bit(CF_DELAY_CONNECT, &con->flags);
0655         break;
0656     default:
0657         break;
0658     }
0659 
0660     if (!test_and_set_bit(CF_RECONNECT, &con->flags))
0661         queue_work(send_workqueue, &con->swork);
0662 
0663 out:
0664     if (orig_report)
0665         orig_report(sk);
0666 }
0667 
0668 /* Note: sk_callback_lock must be locked before calling this function. */
0669 static void save_listen_callbacks(struct socket *sock)
0670 {
0671     struct sock *sk = sock->sk;
0672 
0673     listen_sock.sk_data_ready = sk->sk_data_ready;
0674     listen_sock.sk_state_change = sk->sk_state_change;
0675     listen_sock.sk_write_space = sk->sk_write_space;
0676     listen_sock.sk_error_report = sk->sk_error_report;
0677 }
0678 
0679 static void restore_callbacks(struct socket *sock)
0680 {
0681     struct sock *sk = sock->sk;
0682 
0683     lock_sock(sk);
0684     sk->sk_user_data = NULL;
0685     sk->sk_data_ready = listen_sock.sk_data_ready;
0686     sk->sk_state_change = listen_sock.sk_state_change;
0687     sk->sk_write_space = listen_sock.sk_write_space;
0688     sk->sk_error_report = listen_sock.sk_error_report;
0689     release_sock(sk);
0690 }
0691 
0692 static void add_listen_sock(struct socket *sock, struct listen_connection *con)
0693 {
0694     struct sock *sk = sock->sk;
0695 
0696     lock_sock(sk);
0697     save_listen_callbacks(sock);
0698     con->sock = sock;
0699 
0700     sk->sk_user_data = con;
0701     sk->sk_allocation = GFP_NOFS;
0702     /* Install a data_ready callback */
0703     sk->sk_data_ready = lowcomms_listen_data_ready;
0704     release_sock(sk);
0705 }
0706 
0707 /* Make a socket active */
0708 static void add_sock(struct socket *sock, struct connection *con)
0709 {
0710     struct sock *sk = sock->sk;
0711 
0712     lock_sock(sk);
0713     con->sock = sock;
0714 
0715     sk->sk_user_data = con;
0716     /* Install a data_ready callback */
0717     sk->sk_data_ready = lowcomms_data_ready;
0718     sk->sk_write_space = lowcomms_write_space;
0719     sk->sk_state_change = lowcomms_state_change;
0720     sk->sk_allocation = GFP_NOFS;
0721     sk->sk_error_report = lowcomms_error_report;
0722     release_sock(sk);
0723 }
0724 
0725 /* Add the port number to an IPv6 or 4 sockaddr and return the address
0726    length */
0727 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
0728               int *addr_len)
0729 {
0730     saddr->ss_family =  dlm_local_addr[0]->ss_family;
0731     if (saddr->ss_family == AF_INET) {
0732         struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
0733         in4_addr->sin_port = cpu_to_be16(port);
0734         *addr_len = sizeof(struct sockaddr_in);
0735         memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
0736     } else {
0737         struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
0738         in6_addr->sin6_port = cpu_to_be16(port);
0739         *addr_len = sizeof(struct sockaddr_in6);
0740     }
0741     memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
0742 }
0743 
0744 static void dlm_page_release(struct kref *kref)
0745 {
0746     struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
0747                           ref);
0748 
0749     __free_page(e->page);
0750     dlm_free_writequeue(e);
0751 }
0752 
0753 static void dlm_msg_release(struct kref *kref)
0754 {
0755     struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
0756 
0757     kref_put(&msg->entry->ref, dlm_page_release);
0758     dlm_free_msg(msg);
0759 }
0760 
0761 static void free_entry(struct writequeue_entry *e)
0762 {
0763     struct dlm_msg *msg, *tmp;
0764 
0765     list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
0766         if (msg->orig_msg) {
0767             msg->orig_msg->retransmit = false;
0768             kref_put(&msg->orig_msg->ref, dlm_msg_release);
0769         }
0770 
0771         list_del(&msg->list);
0772         kref_put(&msg->ref, dlm_msg_release);
0773     }
0774 
0775     list_del(&e->list);
0776     atomic_dec(&e->con->writequeue_cnt);
0777     kref_put(&e->ref, dlm_page_release);
0778 }
0779 
0780 static void dlm_close_sock(struct socket **sock)
0781 {
0782     if (*sock) {
0783         restore_callbacks(*sock);
0784         sock_release(*sock);
0785         *sock = NULL;
0786     }
0787 }
0788 
0789 /* Close a remote connection and tidy up */
0790 static void close_connection(struct connection *con, bool and_other,
0791                  bool tx, bool rx)
0792 {
0793     bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
0794     struct writequeue_entry *e;
0795 
0796     if (tx && !closing && cancel_work_sync(&con->swork)) {
0797         log_print("canceled swork for node %d", con->nodeid);
0798         clear_bit(CF_WRITE_PENDING, &con->flags);
0799     }
0800     if (rx && !closing && cancel_work_sync(&con->rwork)) {
0801         log_print("canceled rwork for node %d", con->nodeid);
0802         clear_bit(CF_READ_PENDING, &con->flags);
0803     }
0804 
0805     mutex_lock(&con->sock_mutex);
0806     dlm_close_sock(&con->sock);
0807 
0808     if (con->othercon && and_other) {
0809         /* Will only re-enter once. */
0810         close_connection(con->othercon, false, tx, rx);
0811     }
0812 
0813     /* if we send a writequeue entry only a half way, we drop the
0814      * whole entry because reconnection and that we not start of the
0815      * middle of a msg which will confuse the other end.
0816      *
0817      * we can always drop messages because retransmits, but what we
0818      * cannot allow is to transmit half messages which may be processed
0819      * at the other side.
0820      *
0821      * our policy is to start on a clean state when disconnects, we don't
0822      * know what's send/received on transport layer in this case.
0823      */
0824     spin_lock(&con->writequeue_lock);
0825     if (!list_empty(&con->writequeue)) {
0826         e = list_first_entry(&con->writequeue, struct writequeue_entry,
0827                      list);
0828         if (e->dirty)
0829             free_entry(e);
0830     }
0831     spin_unlock(&con->writequeue_lock);
0832 
0833     con->rx_leftover = 0;
0834     con->retries = 0;
0835     clear_bit(CF_APP_LIMITED, &con->flags);
0836     clear_bit(CF_CONNECTED, &con->flags);
0837     clear_bit(CF_DELAY_CONNECT, &con->flags);
0838     clear_bit(CF_RECONNECT, &con->flags);
0839     clear_bit(CF_EOF, &con->flags);
0840     mutex_unlock(&con->sock_mutex);
0841     clear_bit(CF_CLOSING, &con->flags);
0842 }
0843 
0844 static void shutdown_connection(struct connection *con)
0845 {
0846     int ret;
0847 
0848     flush_work(&con->swork);
0849 
0850     mutex_lock(&con->sock_mutex);
0851     /* nothing to shutdown */
0852     if (!con->sock) {
0853         mutex_unlock(&con->sock_mutex);
0854         return;
0855     }
0856 
0857     set_bit(CF_SHUTDOWN, &con->flags);
0858     ret = kernel_sock_shutdown(con->sock, SHUT_WR);
0859     mutex_unlock(&con->sock_mutex);
0860     if (ret) {
0861         log_print("Connection %p failed to shutdown: %d will force close",
0862               con, ret);
0863         goto force_close;
0864     } else {
0865         ret = wait_event_timeout(con->shutdown_wait,
0866                      !test_bit(CF_SHUTDOWN, &con->flags),
0867                      DLM_SHUTDOWN_WAIT_TIMEOUT);
0868         if (ret == 0) {
0869             log_print("Connection %p shutdown timed out, will force close",
0870                   con);
0871             goto force_close;
0872         }
0873     }
0874 
0875     return;
0876 
0877 force_close:
0878     clear_bit(CF_SHUTDOWN, &con->flags);
0879     close_connection(con, false, true, true);
0880 }
0881 
0882 static void dlm_tcp_shutdown(struct connection *con)
0883 {
0884     if (con->othercon)
0885         shutdown_connection(con->othercon);
0886     shutdown_connection(con);
0887 }
0888 
0889 static int con_realloc_receive_buf(struct connection *con, int newlen)
0890 {
0891     unsigned char *newbuf;
0892 
0893     newbuf = kmalloc(newlen, GFP_NOFS);
0894     if (!newbuf)
0895         return -ENOMEM;
0896 
0897     /* copy any leftover from last receive */
0898     if (con->rx_leftover)
0899         memmove(newbuf, con->rx_buf, con->rx_leftover);
0900 
0901     /* swap to new buffer space */
0902     kfree(con->rx_buf);
0903     con->rx_buflen = newlen;
0904     con->rx_buf = newbuf;
0905 
0906     return 0;
0907 }
0908 
0909 /* Data received from remote end */
0910 static int receive_from_sock(struct connection *con)
0911 {
0912     struct msghdr msg;
0913     struct kvec iov;
0914     int ret, buflen;
0915 
0916     mutex_lock(&con->sock_mutex);
0917 
0918     if (con->sock == NULL) {
0919         ret = -EAGAIN;
0920         goto out_close;
0921     }
0922 
0923     /* realloc if we get new buffer size to read out */
0924     buflen = dlm_config.ci_buffer_size;
0925     if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
0926         ret = con_realloc_receive_buf(con, buflen);
0927         if (ret < 0)
0928             goto out_resched;
0929     }
0930 
0931     for (;;) {
0932         /* calculate new buffer parameter regarding last receive and
0933          * possible leftover bytes
0934          */
0935         iov.iov_base = con->rx_buf + con->rx_leftover;
0936         iov.iov_len = con->rx_buflen - con->rx_leftover;
0937 
0938         memset(&msg, 0, sizeof(msg));
0939         msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
0940         ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
0941                      msg.msg_flags);
0942         trace_dlm_recv(con->nodeid, ret);
0943         if (ret == -EAGAIN)
0944             break;
0945         else if (ret <= 0)
0946             goto out_close;
0947 
0948         /* new buflen according readed bytes and leftover from last receive */
0949         buflen = ret + con->rx_leftover;
0950         ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
0951         if (ret < 0)
0952             goto out_close;
0953 
0954         /* calculate leftover bytes from process and put it into begin of
0955          * the receive buffer, so next receive we have the full message
0956          * at the start address of the receive buffer.
0957          */
0958         con->rx_leftover = buflen - ret;
0959         if (con->rx_leftover) {
0960             memmove(con->rx_buf, con->rx_buf + ret,
0961                 con->rx_leftover);
0962         }
0963     }
0964 
0965     dlm_midcomms_receive_done(con->nodeid);
0966     mutex_unlock(&con->sock_mutex);
0967     return 0;
0968 
0969 out_resched:
0970     if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
0971         queue_work(recv_workqueue, &con->rwork);
0972     mutex_unlock(&con->sock_mutex);
0973     return -EAGAIN;
0974 
0975 out_close:
0976     if (ret == 0) {
0977         log_print("connection %p got EOF from %d",
0978               con, con->nodeid);
0979 
0980         if (dlm_proto_ops->eof_condition &&
0981             dlm_proto_ops->eof_condition(con)) {
0982             set_bit(CF_EOF, &con->flags);
0983             mutex_unlock(&con->sock_mutex);
0984         } else {
0985             mutex_unlock(&con->sock_mutex);
0986             close_connection(con, false, true, false);
0987 
0988             /* handling for tcp shutdown */
0989             clear_bit(CF_SHUTDOWN, &con->flags);
0990             wake_up(&con->shutdown_wait);
0991         }
0992 
0993         /* signal to breaking receive worker */
0994         ret = -1;
0995     } else {
0996         mutex_unlock(&con->sock_mutex);
0997     }
0998     return ret;
0999 }
1000 
1001 /* Listening socket is busy, accept a connection */
1002 static int accept_from_sock(struct listen_connection *con)
1003 {
1004     int result;
1005     struct sockaddr_storage peeraddr;
1006     struct socket *newsock;
1007     int len, idx;
1008     int nodeid;
1009     struct connection *newcon;
1010     struct connection *addcon;
1011     unsigned int mark;
1012 
1013     if (!con->sock)
1014         return -ENOTCONN;
1015 
1016     result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
1017     if (result < 0)
1018         goto accept_err;
1019 
1020     /* Get the connected socket's peer */
1021     memset(&peeraddr, 0, sizeof(peeraddr));
1022     len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
1023     if (len < 0) {
1024         result = -ECONNABORTED;
1025         goto accept_err;
1026     }
1027 
1028     /* Get the new node's NODEID */
1029     make_sockaddr(&peeraddr, 0, &len);
1030     if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
1031         switch (peeraddr.ss_family) {
1032         case AF_INET: {
1033             struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;
1034 
1035             log_print("connect from non cluster IPv4 node %pI4",
1036                   &sin->sin_addr);
1037             break;
1038         }
1039 #if IS_ENABLED(CONFIG_IPV6)
1040         case AF_INET6: {
1041             struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;
1042 
1043             log_print("connect from non cluster IPv6 node %pI6c",
1044                   &sin6->sin6_addr);
1045             break;
1046         }
1047 #endif
1048         default:
1049             log_print("invalid family from non cluster node");
1050             break;
1051         }
1052 
1053         sock_release(newsock);
1054         return -1;
1055     }
1056 
1057     log_print("got connection from %d", nodeid);
1058 
1059     /*  Check to see if we already have a connection to this node. This
1060      *  could happen if the two nodes initiate a connection at roughly
1061      *  the same time and the connections cross on the wire.
1062      *  In this case we store the incoming one in "othercon"
1063      */
1064     idx = srcu_read_lock(&connections_srcu);
1065     newcon = nodeid2con(nodeid, GFP_NOFS);
1066     if (!newcon) {
1067         srcu_read_unlock(&connections_srcu, idx);
1068         result = -ENOMEM;
1069         goto accept_err;
1070     }
1071 
1072     sock_set_mark(newsock->sk, mark);
1073 
1074     mutex_lock(&newcon->sock_mutex);
1075     if (newcon->sock) {
1076         struct connection *othercon = newcon->othercon;
1077 
1078         if (!othercon) {
1079             othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
1080             if (!othercon) {
1081                 log_print("failed to allocate incoming socket");
1082                 mutex_unlock(&newcon->sock_mutex);
1083                 srcu_read_unlock(&connections_srcu, idx);
1084                 result = -ENOMEM;
1085                 goto accept_err;
1086             }
1087 
1088             result = dlm_con_init(othercon, nodeid);
1089             if (result < 0) {
1090                 kfree(othercon);
1091                 mutex_unlock(&newcon->sock_mutex);
1092                 srcu_read_unlock(&connections_srcu, idx);
1093                 goto accept_err;
1094             }
1095 
1096             lockdep_set_subclass(&othercon->sock_mutex, 1);
1097             set_bit(CF_IS_OTHERCON, &othercon->flags);
1098             newcon->othercon = othercon;
1099             othercon->sendcon = newcon;
1100         } else {
1101             /* close other sock con if we have something new */
1102             close_connection(othercon, false, true, false);
1103         }
1104 
1105         mutex_lock(&othercon->sock_mutex);
1106         add_sock(newsock, othercon);
1107         addcon = othercon;
1108         mutex_unlock(&othercon->sock_mutex);
1109     }
1110     else {
1111         /* accept copies the sk after we've saved the callbacks, so we
1112            don't want to save them a second time or comm errors will
1113            result in calling sk_error_report recursively. */
1114         add_sock(newsock, newcon);
1115         addcon = newcon;
1116     }
1117 
1118     set_bit(CF_CONNECTED, &addcon->flags);
1119     mutex_unlock(&newcon->sock_mutex);
1120 
1121     /*
1122      * Add it to the active queue in case we got data
1123      * between processing the accept adding the socket
1124      * to the read_sockets list
1125      */
1126     if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
1127         queue_work(recv_workqueue, &addcon->rwork);
1128 
1129     srcu_read_unlock(&connections_srcu, idx);
1130 
1131     return 0;
1132 
1133 accept_err:
1134     if (newsock)
1135         sock_release(newsock);
1136 
1137     if (result != -EAGAIN)
1138         log_print("error accepting connection from node: %d", result);
1139     return result;
1140 }
1141 
1142 /*
1143  * writequeue_entry_complete - try to delete and free write queue entry
1144  * @e: write queue entry to try to delete
1145  * @completed: bytes completed
1146  *
1147  * writequeue_lock must be held.
1148  */
1149 static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1150 {
1151     e->offset += completed;
1152     e->len -= completed;
1153     /* signal that page was half way transmitted */
1154     e->dirty = true;
1155 
1156     if (e->len == 0 && e->users == 0)
1157         free_entry(e);
1158 }
1159 
1160 /*
1161  * sctp_bind_addrs - bind a SCTP socket to all our addresses
1162  */
1163 static int sctp_bind_addrs(struct socket *sock, uint16_t port)
1164 {
1165     struct sockaddr_storage localaddr;
1166     struct sockaddr *addr = (struct sockaddr *)&localaddr;
1167     int i, addr_len, result = 0;
1168 
1169     for (i = 0; i < dlm_local_count; i++) {
1170         memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1171         make_sockaddr(&localaddr, port, &addr_len);
1172 
1173         if (!i)
1174             result = kernel_bind(sock, addr, addr_len);
1175         else
1176             result = sock_bind_add(sock->sk, addr, addr_len);
1177 
1178         if (result < 0) {
1179             log_print("Can't bind to %d addr number %d, %d.\n",
1180                   port, i + 1, result);
1181             break;
1182         }
1183     }
1184     return result;
1185 }
1186 
1187 /* Get local addresses */
1188 static void init_local(void)
1189 {
1190     struct sockaddr_storage sas, *addr;
1191     int i;
1192 
1193     dlm_local_count = 0;
1194     for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1195         if (dlm_our_addr(&sas, i))
1196             break;
1197 
1198         addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
1199         if (!addr)
1200             break;
1201         dlm_local_addr[dlm_local_count++] = addr;
1202     }
1203 }
1204 
1205 static void deinit_local(void)
1206 {
1207     int i;
1208 
1209     for (i = 0; i < dlm_local_count; i++)
1210         kfree(dlm_local_addr[i]);
1211 }
1212 
1213 static struct writequeue_entry *new_writequeue_entry(struct connection *con)
1214 {
1215     struct writequeue_entry *entry;
1216 
1217     entry = dlm_allocate_writequeue();
1218     if (!entry)
1219         return NULL;
1220 
1221     entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
1222     if (!entry->page) {
1223         dlm_free_writequeue(entry);
1224         return NULL;
1225     }
1226 
1227     entry->offset = 0;
1228     entry->len = 0;
1229     entry->end = 0;
1230     entry->dirty = false;
1231     entry->con = con;
1232     entry->users = 1;
1233     kref_init(&entry->ref);
1234     return entry;
1235 }
1236 
1237 static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
1238                          char **ppc, void (*cb)(void *data),
1239                          void *data)
1240 {
1241     struct writequeue_entry *e;
1242 
1243     spin_lock(&con->writequeue_lock);
1244     if (!list_empty(&con->writequeue)) {
1245         e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
1246         if (DLM_WQ_REMAIN_BYTES(e) >= len) {
1247             kref_get(&e->ref);
1248 
1249             *ppc = page_address(e->page) + e->end;
1250             if (cb)
1251                 cb(data);
1252 
1253             e->end += len;
1254             e->users++;
1255             goto out;
1256         }
1257     }
1258 
1259     e = new_writequeue_entry(con);
1260     if (!e)
1261         goto out;
1262 
1263     kref_get(&e->ref);
1264     *ppc = page_address(e->page);
1265     e->end += len;
1266     atomic_inc(&con->writequeue_cnt);
1267     if (cb)
1268         cb(data);
1269 
1270     list_add_tail(&e->list, &con->writequeue);
1271 
1272 out:
1273     spin_unlock(&con->writequeue_lock);
1274     return e;
1275 };
1276 
1277 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
1278                         gfp_t allocation, char **ppc,
1279                         void (*cb)(void *data),
1280                         void *data)
1281 {
1282     struct writequeue_entry *e;
1283     struct dlm_msg *msg;
1284 
1285     msg = dlm_allocate_msg(allocation);
1286     if (!msg)
1287         return NULL;
1288 
1289     kref_init(&msg->ref);
1290 
1291     e = new_wq_entry(con, len, ppc, cb, data);
1292     if (!e) {
1293         dlm_free_msg(msg);
1294         return NULL;
1295     }
1296 
1297     msg->retransmit = false;
1298     msg->orig_msg = NULL;
1299     msg->ppc = *ppc;
1300     msg->len = len;
1301     msg->entry = e;
1302 
1303     return msg;
1304 }
1305 
1306 /* avoid false positive for nodes_srcu, unlock happens in
1307  * dlm_lowcomms_commit_msg which is a must call if success
1308  */
1309 #ifndef __CHECKER__
1310 struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
1311                      char **ppc, void (*cb)(void *data),
1312                      void *data)
1313 {
1314     struct connection *con;
1315     struct dlm_msg *msg;
1316     int idx;
1317 
1318     if (len > DLM_MAX_SOCKET_BUFSIZE ||
1319         len < sizeof(struct dlm_header)) {
1320         BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
1321         log_print("failed to allocate a buffer of size %d", len);
1322         WARN_ON(1);
1323         return NULL;
1324     }
1325 
1326     idx = srcu_read_lock(&connections_srcu);
1327     con = nodeid2con(nodeid, allocation);
1328     if (!con) {
1329         srcu_read_unlock(&connections_srcu, idx);
1330         return NULL;
1331     }
1332 
1333     msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
1334     if (!msg) {
1335         srcu_read_unlock(&connections_srcu, idx);
1336         return NULL;
1337     }
1338 
1339     /* we assume if successful commit must called */
1340     msg->idx = idx;
1341     return msg;
1342 }
1343 #endif
1344 
1345 static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1346 {
1347     struct writequeue_entry *e = msg->entry;
1348     struct connection *con = e->con;
1349     int users;
1350 
1351     spin_lock(&con->writequeue_lock);
1352     kref_get(&msg->ref);
1353     list_add(&msg->list, &e->msgs);
1354 
1355     users = --e->users;
1356     if (users)
1357         goto out;
1358 
1359     e->len = DLM_WQ_LENGTH_BYTES(e);
1360     spin_unlock(&con->writequeue_lock);
1361 
1362     queue_work(send_workqueue, &con->swork);
1363     return;
1364 
1365 out:
1366     spin_unlock(&con->writequeue_lock);
1367     return;
1368 }
1369 
1370 /* avoid false positive for nodes_srcu, lock was happen in
1371  * dlm_lowcomms_new_msg
1372  */
1373 #ifndef __CHECKER__
1374 void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1375 {
1376     _dlm_lowcomms_commit_msg(msg);
1377     srcu_read_unlock(&connections_srcu, msg->idx);
1378 }
1379 #endif
1380 
1381 void dlm_lowcomms_put_msg(struct dlm_msg *msg)
1382 {
1383     kref_put(&msg->ref, dlm_msg_release);
1384 }
1385 
1386 /* does not held connections_srcu, usage workqueue only */
1387 int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
1388 {
1389     struct dlm_msg *msg_resend;
1390     char *ppc;
1391 
1392     if (msg->retransmit)
1393         return 1;
1394 
1395     msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
1396                           GFP_ATOMIC, &ppc, NULL, NULL);
1397     if (!msg_resend)
1398         return -ENOMEM;
1399 
1400     msg->retransmit = true;
1401     kref_get(&msg->ref);
1402     msg_resend->orig_msg = msg;
1403 
1404     memcpy(ppc, msg->ppc, msg->len);
1405     _dlm_lowcomms_commit_msg(msg_resend);
1406     dlm_lowcomms_put_msg(msg_resend);
1407 
1408     return 0;
1409 }
1410 
1411 /* Send a message */
1412 static void send_to_sock(struct connection *con)
1413 {
1414     const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1415     struct writequeue_entry *e;
1416     int len, offset, ret;
1417     int count = 0;
1418 
1419     mutex_lock(&con->sock_mutex);
1420     if (con->sock == NULL)
1421         goto out_connect;
1422 
1423     spin_lock(&con->writequeue_lock);
1424     for (;;) {
1425         e = con_next_wq(con);
1426         if (!e)
1427             break;
1428 
1429         len = e->len;
1430         offset = e->offset;
1431         BUG_ON(len == 0 && e->users == 0);
1432         spin_unlock(&con->writequeue_lock);
1433 
1434         ret = kernel_sendpage(con->sock, e->page, offset, len,
1435                       msg_flags);
1436         trace_dlm_send(con->nodeid, ret);
1437         if (ret == -EAGAIN || ret == 0) {
1438             if (ret == -EAGAIN &&
1439                 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
1440                 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1441                 /* Notify TCP that we're limited by the
1442                  * application window size.
1443                  */
1444                 set_bit(SOCK_NOSPACE, &con->sock->flags);
1445                 con->sock->sk->sk_write_pending++;
1446             }
1447             cond_resched();
1448             goto out;
1449         } else if (ret < 0)
1450             goto out;
1451 
1452         /* Don't starve people filling buffers */
1453         if (++count >= MAX_SEND_MSG_COUNT) {
1454             cond_resched();
1455             count = 0;
1456         }
1457 
1458         spin_lock(&con->writequeue_lock);
1459         writequeue_entry_complete(e, ret);
1460     }
1461     spin_unlock(&con->writequeue_lock);
1462 
1463     /* close if we got EOF */
1464     if (test_and_clear_bit(CF_EOF, &con->flags)) {
1465         mutex_unlock(&con->sock_mutex);
1466         close_connection(con, false, false, true);
1467 
1468         /* handling for tcp shutdown */
1469         clear_bit(CF_SHUTDOWN, &con->flags);
1470         wake_up(&con->shutdown_wait);
1471     } else {
1472         mutex_unlock(&con->sock_mutex);
1473     }
1474 
1475     return;
1476 
1477 out:
1478     mutex_unlock(&con->sock_mutex);
1479     return;
1480 
1481 out_connect:
1482     mutex_unlock(&con->sock_mutex);
1483     queue_work(send_workqueue, &con->swork);
1484     cond_resched();
1485 }
1486 
1487 static void clean_one_writequeue(struct connection *con)
1488 {
1489     struct writequeue_entry *e, *safe;
1490 
1491     spin_lock(&con->writequeue_lock);
1492     list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1493         free_entry(e);
1494     }
1495     spin_unlock(&con->writequeue_lock);
1496 }
1497 
1498 /* Called from recovery when it knows that a node has
1499    left the cluster */
1500 int dlm_lowcomms_close(int nodeid)
1501 {
1502     struct connection *con;
1503     struct dlm_node_addr *na;
1504     int idx;
1505 
1506     log_print("closing connection to node %d", nodeid);
1507     idx = srcu_read_lock(&connections_srcu);
1508     con = nodeid2con(nodeid, 0);
1509     if (con) {
1510         set_bit(CF_CLOSE, &con->flags);
1511         close_connection(con, true, true, true);
1512         clean_one_writequeue(con);
1513         if (con->othercon)
1514             clean_one_writequeue(con->othercon);
1515     }
1516     srcu_read_unlock(&connections_srcu, idx);
1517 
1518     spin_lock(&dlm_node_addrs_spin);
1519     na = find_node_addr(nodeid);
1520     if (na) {
1521         list_del(&na->list);
1522         while (na->addr_count--)
1523             kfree(na->addr[na->addr_count]);
1524         kfree(na);
1525     }
1526     spin_unlock(&dlm_node_addrs_spin);
1527 
1528     return 0;
1529 }
1530 
1531 /* Receive workqueue function */
1532 static void process_recv_sockets(struct work_struct *work)
1533 {
1534     struct connection *con = container_of(work, struct connection, rwork);
1535 
1536     clear_bit(CF_READ_PENDING, &con->flags);
1537     receive_from_sock(con);
1538 }
1539 
1540 static void process_listen_recv_socket(struct work_struct *work)
1541 {
1542     accept_from_sock(&listen_con);
1543 }
1544 
1545 static void dlm_connect(struct connection *con)
1546 {
1547     struct sockaddr_storage addr;
1548     int result, addr_len;
1549     struct socket *sock;
1550     unsigned int mark;
1551 
1552     /* Some odd races can cause double-connects, ignore them */
1553     if (con->retries++ > MAX_CONNECT_RETRIES)
1554         return;
1555 
1556     if (con->sock) {
1557         log_print("node %d already connected.", con->nodeid);
1558         return;
1559     }
1560 
1561     memset(&addr, 0, sizeof(addr));
1562     result = nodeid_to_addr(con->nodeid, &addr, NULL,
1563                 dlm_proto_ops->try_new_addr, &mark);
1564     if (result < 0) {
1565         log_print("no address for nodeid %d", con->nodeid);
1566         return;
1567     }
1568 
1569     /* Create a socket to communicate with */
1570     result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1571                   SOCK_STREAM, dlm_proto_ops->proto, &sock);
1572     if (result < 0)
1573         goto socket_err;
1574 
1575     sock_set_mark(sock->sk, mark);
1576     dlm_proto_ops->sockopts(sock);
1577 
1578     add_sock(sock, con);
1579 
1580     result = dlm_proto_ops->bind(sock);
1581     if (result < 0)
1582         goto add_sock_err;
1583 
1584     log_print_ratelimited("connecting to %d", con->nodeid);
1585     make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
1586     result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
1587                     addr_len);
1588     if (result < 0)
1589         goto add_sock_err;
1590 
1591     return;
1592 
1593 add_sock_err:
1594     dlm_close_sock(&con->sock);
1595 
1596 socket_err:
1597     /*
1598      * Some errors are fatal and this list might need adjusting. For other
1599      * errors we try again until the max number of retries is reached.
1600      */
1601     if (result != -EHOSTUNREACH &&
1602         result != -ENETUNREACH &&
1603         result != -ENETDOWN &&
1604         result != -EINVAL &&
1605         result != -EPROTONOSUPPORT) {
1606         log_print("connect %d try %d error %d", con->nodeid,
1607               con->retries, result);
1608         msleep(1000);
1609         lowcomms_connect_sock(con);
1610     }
1611 }
1612 
1613 /* Send workqueue function */
1614 static void process_send_sockets(struct work_struct *work)
1615 {
1616     struct connection *con = container_of(work, struct connection, swork);
1617 
1618     WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));
1619 
1620     clear_bit(CF_WRITE_PENDING, &con->flags);
1621 
1622     if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
1623         close_connection(con, false, false, true);
1624         dlm_midcomms_unack_msg_resend(con->nodeid);
1625     }
1626 
1627     if (con->sock == NULL) {
1628         if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
1629             msleep(1000);
1630 
1631         mutex_lock(&con->sock_mutex);
1632         dlm_connect(con);
1633         mutex_unlock(&con->sock_mutex);
1634     }
1635 
1636     if (!list_empty(&con->writequeue))
1637         send_to_sock(con);
1638 }
1639 
1640 static void work_stop(void)
1641 {
1642     if (recv_workqueue) {
1643         destroy_workqueue(recv_workqueue);
1644         recv_workqueue = NULL;
1645     }
1646 
1647     if (send_workqueue) {
1648         destroy_workqueue(send_workqueue);
1649         send_workqueue = NULL;
1650     }
1651 }
1652 
1653 static int work_start(void)
1654 {
1655     recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
1656     if (!recv_workqueue) {
1657         log_print("can't start dlm_recv");
1658         return -ENOMEM;
1659     }
1660 
1661     send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
1662     if (!send_workqueue) {
1663         log_print("can't start dlm_send");
1664         destroy_workqueue(recv_workqueue);
1665         recv_workqueue = NULL;
1666         return -ENOMEM;
1667     }
1668 
1669     return 0;
1670 }
1671 
1672 static void shutdown_conn(struct connection *con)
1673 {
1674     if (dlm_proto_ops->shutdown_action)
1675         dlm_proto_ops->shutdown_action(con);
1676 }
1677 
1678 void dlm_lowcomms_shutdown(void)
1679 {
1680     int idx;
1681 
1682     /* Set all the flags to prevent any
1683      * socket activity.
1684      */
1685     dlm_allow_conn = 0;
1686 
1687     if (recv_workqueue)
1688         flush_workqueue(recv_workqueue);
1689     if (send_workqueue)
1690         flush_workqueue(send_workqueue);
1691 
1692     dlm_close_sock(&listen_con.sock);
1693 
1694     idx = srcu_read_lock(&connections_srcu);
1695     foreach_conn(shutdown_conn);
1696     srcu_read_unlock(&connections_srcu, idx);
1697 }
1698 
1699 static void _stop_conn(struct connection *con, bool and_other)
1700 {
1701     mutex_lock(&con->sock_mutex);
1702     set_bit(CF_CLOSE, &con->flags);
1703     set_bit(CF_READ_PENDING, &con->flags);
1704     set_bit(CF_WRITE_PENDING, &con->flags);
1705     if (con->sock && con->sock->sk) {
1706         lock_sock(con->sock->sk);
1707         con->sock->sk->sk_user_data = NULL;
1708         release_sock(con->sock->sk);
1709     }
1710     if (con->othercon && and_other)
1711         _stop_conn(con->othercon, false);
1712     mutex_unlock(&con->sock_mutex);
1713 }
1714 
1715 static void stop_conn(struct connection *con)
1716 {
1717     _stop_conn(con, true);
1718 }
1719 
1720 static void connection_release(struct rcu_head *rcu)
1721 {
1722     struct connection *con = container_of(rcu, struct connection, rcu);
1723 
1724     kfree(con->rx_buf);
1725     kfree(con);
1726 }
1727 
1728 static void free_conn(struct connection *con)
1729 {
1730     close_connection(con, true, true, true);
1731     spin_lock(&connections_lock);
1732     hlist_del_rcu(&con->list);
1733     spin_unlock(&connections_lock);
1734     if (con->othercon) {
1735         clean_one_writequeue(con->othercon);
1736         call_srcu(&connections_srcu, &con->othercon->rcu,
1737               connection_release);
1738     }
1739     clean_one_writequeue(con);
1740     call_srcu(&connections_srcu, &con->rcu, connection_release);
1741 }

static void work_flush(void)
{
    int ok;
    int i;
    struct connection *con;

    do {
        ok = 1;
        foreach_conn(stop_conn);
        if (recv_workqueue)
            flush_workqueue(recv_workqueue);
        if (send_workqueue)
            flush_workqueue(send_workqueue);
        /* stop_conn() set CF_READ_PENDING and CF_WRITE_PENDING on
         * every connection; a worker that ran in the meantime will
         * have cleared its bit, so repeat until the bits survive a
         * full flush on all connections.
         */
        for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
            hlist_for_each_entry_rcu(con, &connection_hash[i],
                         list) {
                ok &= test_bit(CF_READ_PENDING, &con->flags);
                ok &= test_bit(CF_WRITE_PENDING, &con->flags);
                if (con->othercon) {
                    ok &= test_bit(CF_READ_PENDING,
                               &con->othercon->flags);
                    ok &= test_bit(CF_WRITE_PENDING,
                               &con->othercon->flags);
                }
            }
        }
    } while (!ok);
}
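/*
 * The loop above only terminates once no work item has run between
 * stop_conn() and the flush.  Sketch of the worker side that makes this
 * converge (assumed shape of the receive worker; rwork is again an
 * assumption):
 *
 *	static void example_process_recv(struct work_struct *work)
 *	{
 *		struct connection *con =
 *			container_of(work, struct connection, rwork);
 *
 *		clear_bit(CF_READ_PENDING, &con->flags);
 *		// ... receive and process pending messages ...
 *	}
 */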

void dlm_lowcomms_stop(void)
{
    int idx;

    idx = srcu_read_lock(&connections_srcu);
    work_flush();
    foreach_conn(free_conn);
    srcu_read_unlock(&connections_srcu, idx);
    work_stop();
    deinit_local();

    dlm_proto_ops = NULL;
}

static int dlm_listen_for_all(void)
{
    struct socket *sock;
    int result;

    log_print("Using %s for communications",
          dlm_proto_ops->name);

    result = dlm_proto_ops->listen_validate();
    if (result < 0)
        return result;

    result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
                  SOCK_STREAM, dlm_proto_ops->proto, &sock);
    if (result < 0) {
        log_print("Can't create comms socket: %d", result);
        return result;
    }

    sock_set_mark(sock->sk, dlm_config.ci_mark);
    dlm_proto_ops->listen_sockopts(sock);

    result = dlm_proto_ops->listen_bind(sock);
    if (result < 0)
        goto out;

    save_listen_callbacks(sock);
    add_listen_sock(sock, &listen_con);

    INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
    result = sock->ops->listen(sock, 5);
    if (result < 0) {
        /* dlm_close_sock() already releases the socket, so do not
         * fall through to the sock_release() below.
         */
        dlm_close_sock(&listen_con.sock);
        return result;
    }

    return 0;

out:
    sock_release(sock);
    return result;
}
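/*
 * Once listening, the saved data-ready callback queues
 * process_listen_recv_socket() (see the INIT_WORK above), which is
 * expected to accept the new peer.  A rough sketch of that accept step,
 * assuming a non-blocking kernel_accept() on the stored listen socket:
 *
 *	static void example_accept(void)
 *	{
 *		struct socket *newsock;
 *
 *		if (kernel_accept(listen_con.sock, &newsock, O_NONBLOCK) < 0)
 *			return;
 *		// ... match peer address to a nodeid, attach callbacks ...
 *	}
 */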

static int dlm_tcp_bind(struct socket *sock)
{
    struct sockaddr_storage src_addr;
    int result, addr_len;

    /* Bind to our cluster-known address when connecting, to
     * avoid routing problems.
     */
    memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
    make_sockaddr(&src_addr, 0, &addr_len);

    result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
                 addr_len);
    if (result < 0) {
        /* This *may* not indicate a critical error */
        log_print("could not bind for connect: %d", result);
    }

    return 0;
}

static int dlm_tcp_connect(struct connection *con, struct socket *sock,
               struct sockaddr *addr, int addr_len)
{
    int ret;

    ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
    switch (ret) {
    case -EINPROGRESS:
        /* Non-blocking connect is in progress; completion is
         * signalled through the socket callbacks.
         */
        fallthrough;
    case 0:
        return 0;
    }

    return ret;
}
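/*
 * With O_NONBLOCK, -EINPROGRESS means the TCP handshake continues in the
 * background and the sk_state_change callback fires when the socket
 * reaches TCP_ESTABLISHED.  Sketch of such a completion hook (assumed
 * shape, not the exact callback in this file):
 *
 *	static void example_state_change(struct sock *sk)
 *	{
 *		struct connection *con = sk->sk_user_data;
 *
 *		if (con && sk->sk_state == TCP_ESTABLISHED)
 *			set_bit(CF_CONNECTED, &con->flags);
 *	}
 */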

static int dlm_tcp_listen_validate(void)
{
    /* We don't support multi-homed hosts */
    if (dlm_local_count > 1) {
        log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
        return -EINVAL;
    }

    return 0;
}

static void dlm_tcp_sockopts(struct socket *sock)
{
    /* Turn off Nagle's algorithm */
    tcp_sock_set_nodelay(sock->sk);
}

static void dlm_tcp_listen_sockopts(struct socket *sock)
{
    dlm_tcp_sockopts(sock);
    sock_set_reuseaddr(sock->sk);
}

static int dlm_tcp_listen_bind(struct socket *sock)
{
    int addr_len;

    /* Bind to our port */
    make_sockaddr(dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
    return sock->ops->bind(sock, (struct sockaddr *)dlm_local_addr[0],
                   addr_len);
}

static const struct dlm_proto_ops dlm_tcp_ops = {
    .name = "TCP",
    .proto = IPPROTO_TCP,
    .connect = dlm_tcp_connect,
    .sockopts = dlm_tcp_sockopts,
    .bind = dlm_tcp_bind,
    .listen_validate = dlm_tcp_listen_validate,
    .listen_sockopts = dlm_tcp_listen_sockopts,
    .listen_bind = dlm_tcp_listen_bind,
    .shutdown_action = dlm_tcp_shutdown,
    .eof_condition = tcp_eof_condition,
};
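/*
 * All transport-specific behaviour is funnelled through dlm_proto_ops so
 * the generic connect path stays protocol-agnostic.  Sketch of how a
 * per-node connect is expected to dispatch through this table
 * (illustrative; the real caller lives elsewhere in this file):
 *
 *	dlm_proto_ops->sockopts(sock);
 *	result = dlm_proto_ops->bind(sock);
 *	if (result < 0)
 *		return result;
 *	result = dlm_proto_ops->connect(con, sock, addr, addr_len);
 */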

static int dlm_sctp_bind(struct socket *sock)
{
    return sctp_bind_addrs(sock, 0);
}

static int dlm_sctp_connect(struct connection *con, struct socket *sock,
                struct sockaddr *addr, int addr_len)
{
    int ret;

    /*
     * Make sock->ops->connect() return within a bounded time, since
     * the O_NONBLOCK argument to connect() does not work here, and
     * restore the default (indefinite) send timeout afterwards.
     */
    sock_set_sndtimeo(sock->sk, 5);
    ret = sock->ops->connect(sock, addr, addr_len, 0);
    sock_set_sndtimeo(sock->sk, 0);
    if (ret < 0)
        return ret;

    if (!test_and_set_bit(CF_CONNECTED, &con->flags))
        log_print("connected to node %d", con->nodeid);

    return 0;
}
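/*
 * sock_set_sndtimeo() takes seconds: a value of 5 bounds the blocking
 * connect at roughly five seconds, and 0 restores the default of
 * MAX_SCHEDULE_TIMEOUT (block indefinitely).  The same bounded-call
 * pattern works for any blocking socket operation, e.g. given a prepared
 * msghdr/kvec pair:
 *
 *	sock_set_sndtimeo(sock->sk, 5);
 *	ret = kernel_sendmsg(sock, &msg, &iov, 1, len);
 *	sock_set_sndtimeo(sock->sk, 0);
 */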

static int dlm_sctp_listen_validate(void)
{
    if (!IS_ENABLED(CONFIG_IP_SCTP)) {
        log_print("SCTP is not enabled by this kernel");
        return -EOPNOTSUPP;
    }

    request_module("sctp");
    return 0;
}

static int dlm_sctp_bind_listen(struct socket *sock)
{
    return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}

static void dlm_sctp_sockopts(struct socket *sock)
{
    /* Turn off Nagle's algorithm */
    sctp_sock_set_nodelay(sock->sk);
    sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}

static const struct dlm_proto_ops dlm_sctp_ops = {
    .name = "SCTP",
    .proto = IPPROTO_SCTP,
    .try_new_addr = true,
    .connect = dlm_sctp_connect,
    .sockopts = dlm_sctp_sockopts,
    .bind = dlm_sctp_bind,
    .listen_validate = dlm_sctp_listen_validate,
    .listen_sockopts = dlm_sctp_sockopts,
    .listen_bind = dlm_sctp_bind_listen,
};
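/*
 * Note the differences from dlm_tcp_ops: SCTP needs no shutdown_action
 * or eof_condition hook, and try_new_addr is set, which presumably lets
 * the connect path retry with a peer's other addresses on multi-homed
 * hosts.  One table is selected per boot, as in dlm_lowcomms_start()
 * below:
 *
 *	switch (dlm_config.ci_protocol) {
 *	case DLM_PROTO_TCP:
 *		dlm_proto_ops = &dlm_tcp_ops;
 *		break;
 *	case DLM_PROTO_SCTP:
 *		dlm_proto_ops = &dlm_sctp_ops;
 *		break;
 *	}
 */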

int dlm_lowcomms_start(void)
{
    int error = -EINVAL;
    int i;

    for (i = 0; i < CONN_HASH_SIZE; i++)
        INIT_HLIST_HEAD(&connection_hash[i]);

    init_local();
    if (!dlm_local_count) {
        error = -ENOTCONN;
        log_print("no local IP address has been set");
        goto fail;
    }

    INIT_WORK(&listen_con.rwork, process_listen_recv_socket);

    error = work_start();
    if (error)
        goto fail_local;

    dlm_allow_conn = 1;

    /* Start listening */
    switch (dlm_config.ci_protocol) {
    case DLM_PROTO_TCP:
        dlm_proto_ops = &dlm_tcp_ops;
        break;
    case DLM_PROTO_SCTP:
        dlm_proto_ops = &dlm_sctp_ops;
        break;
    default:
        log_print("Invalid protocol identifier %d set",
              dlm_config.ci_protocol);
        error = -EINVAL;
        goto fail_proto_ops;
    }

    error = dlm_listen_for_all();
    if (error)
        goto fail_listen;

    return 0;

fail_listen:
    dlm_proto_ops = NULL;
fail_proto_ops:
    dlm_allow_conn = 0;
    dlm_close_sock(&listen_con.sock);
    work_stop();
fail_local:
    deinit_local();
fail:
    return error;
}
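/*
 * Lifecycle sketch for this API, as a caller in the DLM core might use
 * it (error handling shortened; the real callers live in the lockspace
 * code):
 *
 *	ret = dlm_lowcomms_start();
 *	if (ret)
 *		return ret;
 *	// ... cluster runs; messages flow via the work queues ...
 *	dlm_lowcomms_shutdown();	// stop new activity, drain work
 *	dlm_lowcomms_stop();		// tear down and free connections
 */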

void dlm_lowcomms_exit(void)
{
    struct dlm_node_addr *na, *safe;

    spin_lock(&dlm_node_addrs_spin);
    list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
        list_del(&na->list);
        while (na->addr_count--)
            kfree(na->addr[na->addr_count]);
        kfree(na);
    }
    spin_unlock(&dlm_node_addrs_spin);
}