0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045 #include <asm/ioctls.h>
0046 #include <net/sock.h>
0047 #include <net/tcp.h>
0048 #include <linux/pagemap.h>
0049 #include <linux/file.h>
0050 #include <linux/mutex.h>
0051 #include <linux/sctp.h>
0052 #include <linux/slab.h>
0053 #include <net/sctp/sctp.h>
0054 #include <net/ipv6.h>
0055
0056 #include <trace/events/dlm.h>
0057
0058 #include "dlm_internal.h"
0059 #include "lowcomms.h"
0060 #include "midcomms.h"
0061 #include "memory.h"
0062 #include "config.h"
0063
/* receive buffer space hint, bytes — NOTE(review): consumer not visible in this chunk */
#define NEEDED_RMEM (4*1024*1024)

/* batching limit for the send worker — NOTE(review): user not visible in this chunk */
#define MAX_SEND_MSG_COUNT 25
/* how long shutdown_connection() waits for the peer to complete the close */
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
0069
/*
 * Per-peer connection state.  One instance exists per remote nodeid,
 * hashed into connection_hash; when both peers connect simultaneously the
 * accepted inbound socket lands on a second "othercon" instance (see
 * accept_from_sock()).
 */
struct connection {
	struct socket *sock;	/* NULL when not connected */
	uint32_t nodeid;	/* peer node this connection talks to */
	struct mutex sock_mutex;	/* guards sock setup/teardown and receive */
	unsigned long flags;	/* CF_* bit numbers below, used with {test,set,clear}_bit */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5	/* this is the accepted "incoming" twin */
#define CF_CLOSE 6
#define CF_APP_LIMITED 7	/* NOTE(review): set by send path (not in this chunk) */
#define CF_CLOSING 8	/* close_connection() in progress */
#define CF_SHUTDOWN 9	/* graceful shutdown in progress */
#define CF_CONNECTED 10
#define CF_RECONNECT 11
#define CF_DELAY_CONNECT 12	/* peer refused; back off before retry */
#define CF_EOF 13	/* got EOF while writequeue entries were pending */
	struct list_head writequeue;	/* pending outgoing entries */
	spinlock_t writequeue_lock;	/* guards writequeue and entry fields */
	atomic_t writequeue_cnt;	/* number of queued entries */
	int retries;
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;	/* connection_hash linkage */
	struct connection *othercon;	/* accepted twin, if any */
	struct connection *sendcon;	/* on an othercon: points back at the outgoing twin */
	struct work_struct rwork;	/* receive work (process_recv_sockets) */
	struct work_struct swork;	/* send work (process_send_sockets) */
	wait_queue_head_t shutdown_wait;	/* woken when CF_SHUTDOWN clears */
	unsigned char *rx_buf;	/* receive staging buffer */
	int rx_buflen;	/* size of rx_buf */
	int rx_leftover;	/* unparsed bytes kept at the front of rx_buf */
	struct rcu_head rcu;
};
/* a socket's sk_user_data points back at its connection */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
0104
/* the single listening socket, serviced by its own receive work item */
struct listen_connection {
	struct socket *sock;
	struct work_struct rwork;	/* runs accept_from_sock (queued by data_ready) */
};
0109
/* free space left in an entry's page / length of the committed-but-unsent region */
#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
#define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
0112
0113
/* one page worth of outgoing data queued on a connection */
struct writequeue_entry {
	struct list_head list;	/* on con->writequeue */
	struct page *page;	/* backing page messages are composed into */
	int offset;	/* start of the not-yet-sent data in the page */
	int len;	/* committed-but-unsent byte count */
	int end;	/* end of space handed out so far within the page */
	int users;	/* writers still composing into this entry */
	bool dirty;	/* some of this entry has already been sent */
	struct connection *con;
	struct list_head msgs;	/* dlm_msg objects carried by this entry */
	struct kref ref;
};
0126
/*
 * One committed message inside a writequeue entry.  Reference counted;
 * holds a reference on its entry (dropped in dlm_msg_release()).
 */
struct dlm_msg {
	struct writequeue_entry *entry;
	struct dlm_msg *orig_msg;	/* set on a retransmit copy (see resend_msg) */
	bool retransmit;	/* a resend copy of this msg is currently queued */
	void *ppc;	/* payload pointer within entry->page */
	int len;
	int idx;	/* srcu index held from new_msg until commit */

	struct list_head list;	/* on entry->msgs */
	struct kref ref;
};
0138
/* address bookkeeping for one cluster node, linked on dlm_node_addrs */
struct dlm_node_addr {
	struct list_head list;
	int nodeid;
	int mark;	/* socket mark applied via sock_set_mark() */
	int addr_count;	/* valid slots in addr[] */
	int curr_addr_index;	/* next address used when try_new_addr rotates */
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
0147
/* transport-specific operations (e.g. TCP vs SCTP) */
struct dlm_proto_ops {
	bool try_new_addr;	/* rotate through the peer's addresses on connect */
	const char *name;
	int proto;

	int (*connect)(struct connection *con, struct socket *sock,
		       struct sockaddr *addr, int addr_len);
	void (*sockopts)(struct socket *sock);
	int (*bind)(struct socket *sock);
	int (*listen_validate)(void);
	void (*listen_sockopts)(struct socket *sock);
	int (*listen_bind)(struct socket *sock);
	/* optional graceful shutdown, e.g. TCP half-close (may be NULL) */
	void (*shutdown_action)(struct connection *con);
	/* optional: true if an EOF should set CF_EOF instead of closing
	 * immediately (see receive_from_sock())
	 */
	bool (*eof_condition)(struct connection *con);
};
0165
/* the listening socket's original sk_* callbacks, saved by
 * save_listen_callbacks() so restore_callbacks() can reinstate them
 * when a socket is torn down
 */
static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

/* per-node address list, guarded by dlm_node_addrs_spin */
static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

static struct listen_connection listen_con;
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
int dlm_allow_conn;	/* gate for accepting new incoming connections */

/* work queues servicing connection rwork/swork items */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

/* nodeid -> connection hash; insertions under connections_lock,
 * lookups under RCU with readers in connections_srcu sections
 */
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

static const struct dlm_proto_ops *dlm_proto_ops;

static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
0193
0194 static void writequeue_entry_ctor(void *data)
0195 {
0196 struct writequeue_entry *entry = data;
0197
0198 INIT_LIST_HEAD(&entry->msgs);
0199 }
0200
/* create the slab cache for writequeue entries (ctor pre-inits the msgs list) */
struct kmem_cache *dlm_lowcomms_writequeue_cache_create(void)
{
	return kmem_cache_create("dlm_writequeue", sizeof(struct writequeue_entry),
				 0, 0, writequeue_entry_ctor);
}
0206
/* create the slab cache for dlm_msg objects */
struct kmem_cache *dlm_lowcomms_msg_cache_create(void)
{
	return kmem_cache_create("dlm_msg", sizeof(struct dlm_msg), 0, 0, NULL);
}
0211
0212
/*
 * Return the first writequeue entry ready to transmit, or NULL.
 * Caller holds con->writequeue_lock.  Because the queue must be sent in
 * order, a head entry that is not ready blocks everything behind it.
 */
static struct writequeue_entry *con_next_wq(struct connection *con)
{
	struct writequeue_entry *e;

	if (list_empty(&con->writequeue))
		return NULL;

	e = list_first_entry(&con->writequeue, struct writequeue_entry,
			     list);

	/* users != 0 means writers are still composing into this entry;
	 * len == 0 means nothing has been committed yet — wait either way
	 */
	if (e->users || e->len == 0)
		return NULL;

	return e;
}
0230
/*
 * Look up the connection for @nodeid in hash bucket @r (nodeid_hash()).
 * Traverses the bucket with the RCU-safe iterator; callers run inside an
 * RCU/SRCU read section or under connections_lock.
 */
static struct connection *__find_con(int nodeid, int r)
{
	struct connection *con;

	hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
		if (con->nodeid == nodeid)
			return con;
	}

	return NULL;
}
0242
/*
 * TCP eof_condition hook: nonzero while writequeue entries are still
 * pending, which makes receive_from_sock() set CF_EOF instead of closing
 * the connection immediately on EOF.
 */
static bool tcp_eof_condition(struct connection *con)
{
	return atomic_read(&con->writequeue_cnt);
}
0247
/*
 * Initialize an already-allocated connection for @nodeid.
 * Returns 0, or -ENOMEM if the receive buffer cannot be allocated.
 */
static int dlm_con_init(struct connection *con, int nodeid)
{
	con->rx_buflen = dlm_config.ci_buffer_size;
	con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
	if (!con->rx_buf)
		return -ENOMEM;

	con->nodeid = nodeid;
	mutex_init(&con->sock_mutex);
	INIT_LIST_HEAD(&con->writequeue);
	spin_lock_init(&con->writequeue_lock);
	atomic_set(&con->writequeue_cnt, 0);
	INIT_WORK(&con->swork, process_send_sockets);
	INIT_WORK(&con->rwork, process_recv_sockets);
	init_waitqueue_head(&con->shutdown_wait);

	return 0;
}
0266
0267
0268
0269
0270
/*
 * Look up — and, if @alloc is non-zero, create on demand — the connection
 * for @nodeid.  Callers hold connections_srcu (see the call sites).
 *
 * Allocation is done outside connections_lock; the lock is only taken to
 * resolve the race where two contexts allocate for the same nodeid at
 * once: the loser frees its copy and returns the winner's.
 */
static struct connection *nodeid2con(int nodeid, gfp_t alloc)
{
	struct connection *con, *tmp;
	int r, ret;

	r = nodeid_hash(nodeid);
	con = __find_con(nodeid, r);
	if (con || !alloc)
		return con;

	con = kzalloc(sizeof(*con), alloc);
	if (!con)
		return NULL;

	ret = dlm_con_init(con, nodeid);
	if (ret) {
		kfree(con);
		return NULL;
	}

	spin_lock(&connections_lock);
	/* recheck under the lock: someone may have inserted this nodeid
	 * while we were allocating — if so, back ours out and use theirs
	 */
	tmp = __find_con(nodeid, r);
	if (tmp) {
		spin_unlock(&connections_lock);
		kfree(con->rx_buf);
		kfree(con);
		return tmp;
	}

	hlist_add_head_rcu(&con->list, &connection_hash[r]);
	spin_unlock(&connections_lock);

	return con;
}
0311
0312
0313 static void foreach_conn(void (*conn_func)(struct connection *c))
0314 {
0315 int i;
0316 struct connection *con;
0317
0318 for (i = 0; i < CONN_HASH_SIZE; i++) {
0319 hlist_for_each_entry_rcu(con, &connection_hash[i], list)
0320 conn_func(con);
0321 }
0322 }
0323
0324 static struct dlm_node_addr *find_node_addr(int nodeid)
0325 {
0326 struct dlm_node_addr *na;
0327
0328 list_for_each_entry(na, &dlm_node_addrs, list) {
0329 if (na->nodeid == nodeid)
0330 return na;
0331 }
0332 return NULL;
0333 }
0334
0335 static int addr_compare(const struct sockaddr_storage *x,
0336 const struct sockaddr_storage *y)
0337 {
0338 switch (x->ss_family) {
0339 case AF_INET: {
0340 struct sockaddr_in *sinx = (struct sockaddr_in *)x;
0341 struct sockaddr_in *siny = (struct sockaddr_in *)y;
0342 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
0343 return 0;
0344 if (sinx->sin_port != siny->sin_port)
0345 return 0;
0346 break;
0347 }
0348 case AF_INET6: {
0349 struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
0350 struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
0351 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
0352 return 0;
0353 if (sinx->sin6_port != siny->sin6_port)
0354 return 0;
0355 break;
0356 }
0357 default:
0358 return 0;
0359 }
0360 return 1;
0361 }
0362
0363 static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
0364 struct sockaddr *sa_out, bool try_new_addr,
0365 unsigned int *mark)
0366 {
0367 struct sockaddr_storage sas;
0368 struct dlm_node_addr *na;
0369
0370 if (!dlm_local_count)
0371 return -1;
0372
0373 spin_lock(&dlm_node_addrs_spin);
0374 na = find_node_addr(nodeid);
0375 if (na && na->addr_count) {
0376 memcpy(&sas, na->addr[na->curr_addr_index],
0377 sizeof(struct sockaddr_storage));
0378
0379 if (try_new_addr) {
0380 na->curr_addr_index++;
0381 if (na->curr_addr_index == na->addr_count)
0382 na->curr_addr_index = 0;
0383 }
0384 }
0385 spin_unlock(&dlm_node_addrs_spin);
0386
0387 if (!na)
0388 return -EEXIST;
0389
0390 if (!na->addr_count)
0391 return -ENOENT;
0392
0393 *mark = na->mark;
0394
0395 if (sas_out)
0396 memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
0397
0398 if (!sa_out)
0399 return 0;
0400
0401 if (dlm_local_addr[0]->ss_family == AF_INET) {
0402 struct sockaddr_in *in4 = (struct sockaddr_in *) &sas;
0403 struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
0404 ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
0405 } else {
0406 struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &sas;
0407 struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
0408 ret6->sin6_addr = in6->sin6_addr;
0409 }
0410
0411 return 0;
0412 }
0413
0414 static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
0415 unsigned int *mark)
0416 {
0417 struct dlm_node_addr *na;
0418 int rv = -EEXIST;
0419 int addr_i;
0420
0421 spin_lock(&dlm_node_addrs_spin);
0422 list_for_each_entry(na, &dlm_node_addrs, list) {
0423 if (!na->addr_count)
0424 continue;
0425
0426 for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
0427 if (addr_compare(na->addr[addr_i], addr)) {
0428 *nodeid = na->nodeid;
0429 *mark = na->mark;
0430 rv = 0;
0431 goto unlock;
0432 }
0433 }
0434 }
0435 unlock:
0436 spin_unlock(&dlm_node_addrs_spin);
0437 return rv;
0438 }
0439
0440
0441 static bool dlm_lowcomms_na_has_addr(const struct dlm_node_addr *na,
0442 const struct sockaddr_storage *addr)
0443 {
0444 int i;
0445
0446 for (i = 0; i < na->addr_count; i++) {
0447 if (addr_compare(na->addr[i], addr))
0448 return true;
0449 }
0450
0451 return false;
0452 }
0453
0454 int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
0455 {
0456 struct sockaddr_storage *new_addr;
0457 struct dlm_node_addr *new_node, *na;
0458 bool ret;
0459
0460 new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
0461 if (!new_node)
0462 return -ENOMEM;
0463
0464 new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
0465 if (!new_addr) {
0466 kfree(new_node);
0467 return -ENOMEM;
0468 }
0469
0470 memcpy(new_addr, addr, len);
0471
0472 spin_lock(&dlm_node_addrs_spin);
0473 na = find_node_addr(nodeid);
0474 if (!na) {
0475 new_node->nodeid = nodeid;
0476 new_node->addr[0] = new_addr;
0477 new_node->addr_count = 1;
0478 new_node->mark = dlm_config.ci_mark;
0479 list_add(&new_node->list, &dlm_node_addrs);
0480 spin_unlock(&dlm_node_addrs_spin);
0481 return 0;
0482 }
0483
0484 ret = dlm_lowcomms_na_has_addr(na, addr);
0485 if (ret) {
0486 spin_unlock(&dlm_node_addrs_spin);
0487 kfree(new_addr);
0488 kfree(new_node);
0489 return -EEXIST;
0490 }
0491
0492 if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
0493 spin_unlock(&dlm_node_addrs_spin);
0494 kfree(new_addr);
0495 kfree(new_node);
0496 return -ENOSPC;
0497 }
0498
0499 na->addr[na->addr_count++] = new_addr;
0500 spin_unlock(&dlm_node_addrs_spin);
0501 kfree(new_node);
0502 return 0;
0503 }
0504
0505
0506 static void lowcomms_data_ready(struct sock *sk)
0507 {
0508 struct connection *con;
0509
0510 con = sock2con(sk);
0511 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
0512 queue_work(recv_workqueue, &con->rwork);
0513 }
0514
/* incoming connection pending on the listening socket: queue the accept
 * work, unless new connections are currently disallowed
 */
static void lowcomms_listen_data_ready(struct sock *sk)
{
	if (!dlm_allow_conn)
		return;

	queue_work(recv_workqueue, &listen_con.rwork);
}
0522
static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con;

	con = sock2con(sk);
	if (!con)
		return;

	/* the first writable event after a connect attempt marks the
	 * connection established
	 */
	if (!test_and_set_bit(CF_CONNECTED, &con->flags)) {
		log_print("connected to node %d", con->nodeid);
		queue_work(send_workqueue, &con->swork);
		return;
	}

	clear_bit(SOCK_NOSPACE, &con->sock->flags);

	/* undo the nospace bookkeeping paired with CF_APP_LIMITED —
	 * NOTE(review): the setter (send path) is not visible in this chunk
	 */
	if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
		con->sock->sk->sk_write_pending--;
		clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
	}

	queue_work(send_workqueue, &con->swork);
}
0546
0547 static inline void lowcomms_connect_sock(struct connection *con)
0548 {
0549 if (test_bit(CF_CLOSE, &con->flags))
0550 return;
0551 queue_work(send_workqueue, &con->swork);
0552 cond_resched();
0553 }
0554
/*
 * Socket state change on a per-node socket: forward a receive-side
 * shutdown to the data-ready path (so the EOF is consumed), and treat a
 * newly-established state like writable space becoming available.
 */
static void lowcomms_state_change(struct sock *sk)
{
	if (sk->sk_shutdown) {
		if (sk->sk_shutdown == RCV_SHUTDOWN)
			lowcomms_data_ready(sk);
	} else if (sk->sk_state == TCP_ESTABLISHED) {
		lowcomms_write_space(sk);
	}
}
0569
/*
 * Schedule a connect attempt towards @nodeid (a no-op for our own node).
 * Returns 0 on success or -ENOMEM if the connection cannot be created.
 * The srcu read section keeps the connection valid while it is queued.
 */
int dlm_lowcomms_connect_node(int nodeid)
{
	struct connection *con;
	int idx;

	if (nodeid == dlm_our_nodeid())
		return 0;

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, GFP_NOFS);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return -ENOMEM;
	}

	lowcomms_connect_sock(con);
	srcu_read_unlock(&connections_srcu, idx);

	return 0;
}
0590
0591 int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
0592 {
0593 struct dlm_node_addr *na;
0594
0595 spin_lock(&dlm_node_addrs_spin);
0596 na = find_node_addr(nodeid);
0597 if (!na) {
0598 spin_unlock(&dlm_node_addrs_spin);
0599 return -ENOENT;
0600 }
0601
0602 na->mark = mark;
0603 spin_unlock(&dlm_node_addrs_spin);
0604
0605 return 0;
0606 }
0607
/*
 * sk_error_report hook for per-node sockets: log the error (rate
 * limited), then flag the sending connection for reconnect and chain to
 * the saved original report callback.
 */
static void lowcomms_error_report(struct sock *sk)
{
	struct connection *con;
	void (*orig_report)(struct sock *) = NULL;
	struct inet_sock *inet;

	con = sock2con(sk);
	if (con == NULL)
		goto out;

	orig_report = listen_sock.sk_error_report;

	inet = inet_sk(sk);
	switch (sk->sk_family) {
	case AF_INET:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI4, dport %d, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &inet->inet_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#if IS_ENABLED(CONFIG_IPV6)
	case AF_INET6:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "sending to node %d at %pI6c, "
				   "dport %d, sk_err=%d/%d\n", dlm_our_nodeid(),
				   con->nodeid, &sk->sk_v6_daddr,
				   ntohs(inet->inet_dport), sk->sk_err,
				   sk->sk_err_soft);
		break;
#endif
	default:
		printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
				   "invalid socket family %d set, "
				   "sk_err=%d/%d\n", dlm_our_nodeid(),
				   sk->sk_family, sk->sk_err, sk->sk_err_soft);
		goto out;
	}

	/* reconnect handling lives on the sending connection, so redirect
	 * errors seen on an othercon to its sendcon twin
	 */
	if (test_bit(CF_IS_OTHERCON, &con->flags))
		con = con->sendcon;

	switch (sk->sk_err) {
	case ECONNREFUSED:
		/* peer not listening yet: delay the next attempt */
		set_bit(CF_DELAY_CONNECT, &con->flags);
		break;
	default:
		break;
	}

	if (!test_and_set_bit(CF_RECONNECT, &con->flags))
		queue_work(send_workqueue, &con->swork);

out:
	if (orig_report)
		orig_report(sk);
}
0667
0668
0669 static void save_listen_callbacks(struct socket *sock)
0670 {
0671 struct sock *sk = sock->sk;
0672
0673 listen_sock.sk_data_ready = sk->sk_data_ready;
0674 listen_sock.sk_state_change = sk->sk_state_change;
0675 listen_sock.sk_write_space = sk->sk_write_space;
0676 listen_sock.sk_error_report = sk->sk_error_report;
0677 }
0678
0679 static void restore_callbacks(struct socket *sock)
0680 {
0681 struct sock *sk = sock->sk;
0682
0683 lock_sock(sk);
0684 sk->sk_user_data = NULL;
0685 sk->sk_data_ready = listen_sock.sk_data_ready;
0686 sk->sk_state_change = listen_sock.sk_state_change;
0687 sk->sk_write_space = listen_sock.sk_write_space;
0688 sk->sk_error_report = listen_sock.sk_error_report;
0689 release_sock(sk);
0690 }
0691
/*
 * Attach the listening socket to its listen_connection: save the stock
 * callbacks, then install our data_ready hook which queues accept work.
 */
static void add_listen_sock(struct socket *sock, struct listen_connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	save_listen_callbacks(sock);
	con->sock = sock;

	sk->sk_user_data = con;
	sk->sk_allocation = GFP_NOFS;
	/* Install a data_ready callback */
	sk->sk_data_ready = lowcomms_listen_data_ready;
	release_sock(sk);
}
0706
0707
/* bind a per-node socket to its connection and install our sk_* hooks */
static void add_sock(struct socket *sock, struct connection *con)
{
	struct sock *sk = sock->sk;

	lock_sock(sk);
	con->sock = sock;

	sk->sk_user_data = con;
	/* install lowcomms callbacks for data, space, state and errors */
	sk->sk_data_ready = lowcomms_data_ready;
	sk->sk_write_space = lowcomms_write_space;
	sk->sk_state_change = lowcomms_state_change;
	sk->sk_allocation = GFP_NOFS;
	sk->sk_error_report = lowcomms_error_report;
	release_sock(sk);
}
0724
0725
0726
/*
 * Stamp @saddr with the local address family and @port, and report the
 * real sockaddr length in *addr_len.  The tail of the sockaddr_storage
 * beyond that length is zeroed so comparisons see deterministic bytes.
 */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr[0]->ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;

		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
		memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;

		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
	memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
}
0743
0744 static void dlm_page_release(struct kref *kref)
0745 {
0746 struct writequeue_entry *e = container_of(kref, struct writequeue_entry,
0747 ref);
0748
0749 __free_page(e->page);
0750 dlm_free_writequeue(e);
0751 }
0752
0753 static void dlm_msg_release(struct kref *kref)
0754 {
0755 struct dlm_msg *msg = container_of(kref, struct dlm_msg, ref);
0756
0757 kref_put(&msg->entry->ref, dlm_page_release);
0758 dlm_free_msg(msg);
0759 }
0760
/*
 * Unlink @e from its connection's writequeue and drop its references;
 * called with con->writequeue_lock held (see close_connection()).
 * Messages still attached are released; a retransmit copy also clears
 * its original's retransmit flag and drops the extra reference taken in
 * dlm_lowcomms_resend_msg().
 */
static void free_entry(struct writequeue_entry *e)
{
	struct dlm_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &e->msgs, list) {
		if (msg->orig_msg) {
			msg->orig_msg->retransmit = false;
			kref_put(&msg->orig_msg->ref, dlm_msg_release);
		}

		list_del(&msg->list);
		kref_put(&msg->ref, dlm_msg_release);
	}

	list_del(&e->list);
	atomic_dec(&e->con->writequeue_cnt);
	kref_put(&e->ref, dlm_page_release);
}
0779
0780 static void dlm_close_sock(struct socket **sock)
0781 {
0782 if (*sock) {
0783 restore_callbacks(*sock);
0784 sock_release(*sock);
0785 *sock = NULL;
0786 }
0787 }
0788
0789
/*
 * Close @con's socket and reset its transient state.  @and_other also
 * closes the othercon twin; @tx/@rx select which workers to cancel.
 */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);
	struct writequeue_entry *e;

	/* only the first closer cancels the workers; a nested/concurrent
	 * close must not cancel_work_sync() work it may be running from
	 */
	if (tx && !closing && cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (rx && !closing && cancel_work_sync(&con->rwork)) {
		log_print("canceled rwork for node %d", con->nodeid);
		clear_bit(CF_READ_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	dlm_close_sock(&con->sock);

	if (con->othercon && and_other) {
		/* will only recurse once: othercon has no othercon */
		close_connection(con->othercon, false, tx, rx);
	}

	/* a head entry that is partially sent (dirty) cannot be replayed
	 * on a fresh socket without breaking message framing, so drop it
	 */
	spin_lock(&con->writequeue_lock);
	if (!list_empty(&con->writequeue)) {
		e = list_first_entry(&con->writequeue, struct writequeue_entry,
				     list);
		if (e->dirty)
			free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);

	/* reset receive state and retry/flag state for the next connect */
	con->rx_leftover = 0;
	con->retries = 0;
	clear_bit(CF_APP_LIMITED, &con->flags);
	clear_bit(CF_CONNECTED, &con->flags);
	clear_bit(CF_DELAY_CONNECT, &con->flags);
	clear_bit(CF_RECONNECT, &con->flags);
	clear_bit(CF_EOF, &con->flags);
	mutex_unlock(&con->sock_mutex);
	clear_bit(CF_CLOSING, &con->flags);
}
0843
/*
 * Gracefully shut down @con's socket for sending, then wait (bounded by
 * DLM_SHUTDOWN_WAIT_TIMEOUT) until the receive path sees the peer close
 * and clears CF_SHUTDOWN (see receive_from_sock()).  On shutdown error
 * or timeout, fall back to a hard close.
 */
static void shutdown_connection(struct connection *con)
{
	int ret;

	/* let any queued sends drain before half-closing */
	flush_work(&con->swork);

	mutex_lock(&con->sock_mutex);
	/* nothing to shut down if the socket is already gone */
	if (!con->sock) {
		mutex_unlock(&con->sock_mutex);
		return;
	}

	set_bit(CF_SHUTDOWN, &con->flags);
	ret = kernel_sock_shutdown(con->sock, SHUT_WR);
	mutex_unlock(&con->sock_mutex);
	if (ret) {
		log_print("Connection %p failed to shutdown: %d will force close",
			  con, ret);
		goto force_close;
	} else {
		ret = wait_event_timeout(con->shutdown_wait,
					 !test_bit(CF_SHUTDOWN, &con->flags),
					 DLM_SHUTDOWN_WAIT_TIMEOUT);
		if (ret == 0) {
			log_print("Connection %p shutdown timed out, will force close",
				  con);
			goto force_close;
		}
	}

	return;

force_close:
	clear_bit(CF_SHUTDOWN, &con->flags);
	close_connection(con, false, true, true);
}
0881
0882 static void dlm_tcp_shutdown(struct connection *con)
0883 {
0884 if (con->othercon)
0885 shutdown_connection(con->othercon);
0886 shutdown_connection(con);
0887 }
0888
0889 static int con_realloc_receive_buf(struct connection *con, int newlen)
0890 {
0891 unsigned char *newbuf;
0892
0893 newbuf = kmalloc(newlen, GFP_NOFS);
0894 if (!newbuf)
0895 return -ENOMEM;
0896
0897
0898 if (con->rx_leftover)
0899 memmove(newbuf, con->rx_buf, con->rx_leftover);
0900
0901
0902 kfree(con->rx_buf);
0903 con->rx_buflen = newlen;
0904 con->rx_buf = newbuf;
0905
0906 return 0;
0907 }
0908
0909
/*
 * Drain @con's socket: receive into rx_buf behind any leftover bytes,
 * hand complete messages to dlm_process_incoming_buffer(), and keep a
 * trailing partial message for the next pass.  Returns 0 when the socket
 * would block (all data consumed), -EAGAIN to reschedule, negative on
 * error/EOF (connection closed or CF_EOF set).
 */
static int receive_from_sock(struct connection *con)
{
	struct msghdr msg;
	struct kvec iov;
	int ret, buflen;

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	/* resize the buffer if the configured size changed, but only when
	 * the leftover bytes still fit into the new size
	 */
	buflen = dlm_config.ci_buffer_size;
	if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
		ret = con_realloc_receive_buf(con, buflen);
		if (ret < 0)
			goto out_resched;
	}

	for (;;) {
		/* receive behind the unparsed leftover of the previous read */
		iov.iov_base = con->rx_buf + con->rx_leftover;
		iov.iov_len = con->rx_buflen - con->rx_leftover;

		memset(&msg, 0, sizeof(msg));
		msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
		ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
				     msg.msg_flags);
		trace_dlm_recv(con->nodeid, ret);
		if (ret == -EAGAIN)
			break;
		else if (ret <= 0)
			goto out_close;

		/* parse everything buffered so far; ret becomes the number
		 * of bytes actually consumed by the parser
		 */
		buflen = ret + con->rx_leftover;
		ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
		if (ret < 0)
			goto out_close;

		/* slide any trailing partial message to the buffer start
		 * for the next receive
		 */
		con->rx_leftover = buflen - ret;
		if (con->rx_leftover) {
			memmove(con->rx_buf, con->rx_buf + ret,
				con->rx_leftover);
		}
	}

	dlm_midcomms_receive_done(con->nodeid);
	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	if (ret == 0) {
		log_print("connection %p got EOF from %d",
			  con, con->nodeid);
		/* transports may defer the close while sends are pending
		 * (see tcp_eof_condition); otherwise close the receive
		 * side and wake a waiting shutdown_connection()
		 */
		if (dlm_proto_ops->eof_condition &&
		    dlm_proto_ops->eof_condition(con)) {
			set_bit(CF_EOF, &con->flags);
			mutex_unlock(&con->sock_mutex);
		} else {
			mutex_unlock(&con->sock_mutex);
			close_connection(con, false, true, false);

			/* handling for tcp shutdown */
			clear_bit(CF_SHUTDOWN, &con->flags);
			wake_up(&con->shutdown_wait);
		}

		/* signal the caller that this connection saw EOF */
		ret = -1;
	} else {
		mutex_unlock(&con->sock_mutex);
	}
	return ret;
}
1000
1001
/*
 * Accept one pending connection on the listening socket, validate that
 * the peer address belongs to a configured cluster node, and attach the
 * new socket to that node's connection (or to its "othercon" twin when
 * an outgoing socket already exists — the simultaneous-connect case).
 */
static int accept_from_sock(struct listen_connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len, idx;
	int nodeid;
	struct connection *newcon;
	struct connection *addcon;
	unsigned int mark;

	if (!con->sock)
		return -ENOTCONN;

	result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
	if (len < 0) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Reject connections from addresses that are not cluster members */
	make_sockaddr(&peeraddr, 0, &len);
	if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
		switch (peeraddr.ss_family) {
		case AF_INET: {
			struct sockaddr_in *sin = (struct sockaddr_in *)&peeraddr;

			log_print("connect from non cluster IPv4 node %pI4",
				  &sin->sin_addr);
			break;
		}
#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&peeraddr;

			log_print("connect from non cluster IPv6 node %pI6c",
				  &sin6->sin6_addr);
			break;
		}
#endif
		default:
			log_print("invalid family from non cluster node");
			break;
		}

		sock_release(newsock);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* If both nodes initiated a connection at the same time the
	 * connections cross on the wire; store the incoming one in
	 * "othercon" so neither socket is dropped.
	 */
	idx = srcu_read_lock(&connections_srcu);
	newcon = nodeid2con(nodeid, GFP_NOFS);
	if (!newcon) {
		srcu_read_unlock(&connections_srcu, idx);
		result = -ENOMEM;
		goto accept_err;
	}

	sock_set_mark(newsock->sk, mark);

	mutex_lock(&newcon->sock_mutex);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			/* first collision: create the othercon twin lazily */
			othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
			if (!othercon) {
				log_print("failed to allocate incoming socket");
				mutex_unlock(&newcon->sock_mutex);
				srcu_read_unlock(&connections_srcu, idx);
				result = -ENOMEM;
				goto accept_err;
			}

			result = dlm_con_init(othercon, nodeid);
			if (result < 0) {
				kfree(othercon);
				mutex_unlock(&newcon->sock_mutex);
				srcu_read_unlock(&connections_srcu, idx);
				goto accept_err;
			}

			/* subclass avoids lockdep false positives on the
			 * nested othercon->sock_mutex below
			 */
			lockdep_set_subclass(&othercon->sock_mutex, 1);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
			newcon->othercon = othercon;
			othercon->sendcon = newcon;
		} else {
			/* close the old incoming socket; the new one replaces it */
			close_connection(othercon, false, true, false);
		}

		mutex_lock(&othercon->sock_mutex);
		add_sock(newsock, othercon);
		addcon = othercon;
		mutex_unlock(&othercon->sock_mutex);
	}
	else {
		/* no outgoing socket yet: the accepted socket becomes the
		 * main one for this node
		 */
		add_sock(newsock, newcon);
		addcon = newcon;
	}

	set_bit(CF_CONNECTED, &addcon->flags);
	mutex_unlock(&newcon->sock_mutex);

	/* kick the receive worker in case data arrived between the accept
	 * and the callbacks being installed
	 */
	if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
		queue_work(recv_workqueue, &addcon->rwork);

	srcu_read_unlock(&connections_srcu, idx);

	return 0;

accept_err:
	if (newsock)
		sock_release(newsock);

	if (result != -EAGAIN)
		log_print("error accepting connection from node: %d", result);
	return result;
}
1141
1142
1143
1144
1145
1146
1147
1148
1149 static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1150 {
1151 e->offset += completed;
1152 e->len -= completed;
1153
1154 e->dirty = true;
1155
1156 if (e->len == 0 && e->users == 0)
1157 free_entry(e);
1158 }
1159
1160
1161
1162
1163 static int sctp_bind_addrs(struct socket *sock, uint16_t port)
1164 {
1165 struct sockaddr_storage localaddr;
1166 struct sockaddr *addr = (struct sockaddr *)&localaddr;
1167 int i, addr_len, result = 0;
1168
1169 for (i = 0; i < dlm_local_count; i++) {
1170 memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1171 make_sockaddr(&localaddr, port, &addr_len);
1172
1173 if (!i)
1174 result = kernel_bind(sock, addr, addr_len);
1175 else
1176 result = sock_bind_add(sock->sk, addr, addr_len);
1177
1178 if (result < 0) {
1179 log_print("Can't bind to %d addr number %d, %d.\n",
1180 port, i + 1, result);
1181 break;
1182 }
1183 }
1184 return result;
1185 }
1186
1187
1188 static void init_local(void)
1189 {
1190 struct sockaddr_storage sas, *addr;
1191 int i;
1192
1193 dlm_local_count = 0;
1194 for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1195 if (dlm_our_addr(&sas, i))
1196 break;
1197
1198 addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
1199 if (!addr)
1200 break;
1201 dlm_local_addr[dlm_local_count++] = addr;
1202 }
1203 }
1204
1205 static void deinit_local(void)
1206 {
1207 int i;
1208
1209 for (i = 0; i < dlm_local_count; i++)
1210 kfree(dlm_local_addr[i]);
1211 }
1212
1213 static struct writequeue_entry *new_writequeue_entry(struct connection *con)
1214 {
1215 struct writequeue_entry *entry;
1216
1217 entry = dlm_allocate_writequeue();
1218 if (!entry)
1219 return NULL;
1220
1221 entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO);
1222 if (!entry->page) {
1223 dlm_free_writequeue(entry);
1224 return NULL;
1225 }
1226
1227 entry->offset = 0;
1228 entry->len = 0;
1229 entry->end = 0;
1230 entry->dirty = false;
1231 entry->con = con;
1232 entry->users = 1;
1233 kref_init(&entry->ref);
1234 return entry;
1235 }
1236
1237 static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
1238 char **ppc, void (*cb)(void *data),
1239 void *data)
1240 {
1241 struct writequeue_entry *e;
1242
1243 spin_lock(&con->writequeue_lock);
1244 if (!list_empty(&con->writequeue)) {
1245 e = list_last_entry(&con->writequeue, struct writequeue_entry, list);
1246 if (DLM_WQ_REMAIN_BYTES(e) >= len) {
1247 kref_get(&e->ref);
1248
1249 *ppc = page_address(e->page) + e->end;
1250 if (cb)
1251 cb(data);
1252
1253 e->end += len;
1254 e->users++;
1255 goto out;
1256 }
1257 }
1258
1259 e = new_writequeue_entry(con);
1260 if (!e)
1261 goto out;
1262
1263 kref_get(&e->ref);
1264 *ppc = page_address(e->page);
1265 e->end += len;
1266 atomic_inc(&con->writequeue_cnt);
1267 if (cb)
1268 cb(data);
1269
1270 list_add_tail(&e->list, &con->writequeue);
1271
1272 out:
1273 spin_unlock(&con->writequeue_lock);
1274 return e;
1275 };
1276
/*
 * Allocate a dlm_msg of @len bytes backed by @con's writequeue; *ppc
 * receives the payload pointer.  @cb (with @data) is invoked under the
 * writequeue lock at the moment the space is reserved (see new_wq_entry).
 * The returned message owns the entry reference taken by new_wq_entry().
 */
static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len,
						gfp_t allocation, char **ppc,
						void (*cb)(void *data),
						void *data)
{
	struct writequeue_entry *e;
	struct dlm_msg *msg;

	msg = dlm_allocate_msg(allocation);
	if (!msg)
		return NULL;

	kref_init(&msg->ref);

	e = new_wq_entry(con, len, ppc, cb, data);
	if (!e) {
		dlm_free_msg(msg);
		return NULL;
	}

	msg->retransmit = false;
	msg->orig_msg = NULL;
	msg->ppc = *ppc;
	msg->len = len;
	msg->entry = e;

	return msg;
}
1305
1306
1307
1308
/*
 * Public entry point for allocating an outgoing message to @nodeid.
 * @len must be within [sizeof(struct dlm_header), DLM_MAX_SOCKET_BUFSIZE].
 *
 * On success the SRCU read section entered here is deliberately left
 * open — its index is stashed in msg->idx — and is closed by
 * dlm_lowcomms_commit_msg(), keeping the connection alive while the
 * caller fills in the payload (hence the __CHECKER__ exclusion: sparse
 * would flag the unbalanced srcu_read_lock).
 */
#ifndef __CHECKER__
struct dlm_msg *dlm_lowcomms_new_msg(int nodeid, int len, gfp_t allocation,
				     char **ppc, void (*cb)(void *data),
				     void *data)
{
	struct connection *con;
	struct dlm_msg *msg;
	int idx;

	if (len > DLM_MAX_SOCKET_BUFSIZE ||
	    len < sizeof(struct dlm_header)) {
		BUILD_BUG_ON(PAGE_SIZE < DLM_MAX_SOCKET_BUFSIZE);
		log_print("failed to allocate a buffer of size %d", len);
		WARN_ON(1);
		return NULL;
	}

	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, allocation);
	if (!con) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data);
	if (!msg) {
		srcu_read_unlock(&connections_srcu, idx);
		return NULL;
	}

	/* the srcu section stays held until dlm_lowcomms_commit_msg() */
	msg->idx = idx;
	return msg;
}
#endif
1344
1345 static void _dlm_lowcomms_commit_msg(struct dlm_msg *msg)
1346 {
1347 struct writequeue_entry *e = msg->entry;
1348 struct connection *con = e->con;
1349 int users;
1350
1351 spin_lock(&con->writequeue_lock);
1352 kref_get(&msg->ref);
1353 list_add(&msg->list, &e->msgs);
1354
1355 users = --e->users;
1356 if (users)
1357 goto out;
1358
1359 e->len = DLM_WQ_LENGTH_BYTES(e);
1360 spin_unlock(&con->writequeue_lock);
1361
1362 queue_work(send_workqueue, &con->swork);
1363 return;
1364
1365 out:
1366 spin_unlock(&con->writequeue_lock);
1367 return;
1368 }
1369
1370
1371
1372
#ifndef __CHECKER__
/*
 * Commit a message obtained from dlm_lowcomms_new_msg() and close the
 * SRCU read section that dlm_lowcomms_new_msg() left open (msg->idx).
 * Guarded from sparse, which cannot model the cross-function unlock.
 */
void dlm_lowcomms_commit_msg(struct dlm_msg *msg)
{
	_dlm_lowcomms_commit_msg(msg);
	srcu_read_unlock(&connections_srcu, msg->idx);
}
#endif
1380
/* Drop a reference on @msg; the final put frees it via dlm_msg_release(). */
void dlm_lowcomms_put_msg(struct dlm_msg *msg)
{
	kref_put(&msg->ref, dlm_msg_release);
}
1385
1386
/*
 * Queue a retransmission of @msg on the same connection.
 *
 * Returns 1 if the message is already marked for retransmit (nothing to
 * do), -ENOMEM if no resend buffer could be allocated, 0 on success.
 * The resend message pins the original via orig_msg + kref_get so the
 * original payload stays valid until the copy is released.
 */
int dlm_lowcomms_resend_msg(struct dlm_msg *msg)
{
	struct dlm_msg *msg_resend;
	char *ppc;

	if (msg->retransmit)
		return 1;

	/* may run from non-sleepable context, hence GFP_ATOMIC */
	msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len,
					      GFP_ATOMIC, &ppc, NULL, NULL);
	if (!msg_resend)
		return -ENOMEM;

	msg->retransmit = true;
	kref_get(&msg->ref);
	msg_resend->orig_msg = msg;

	memcpy(ppc, msg->ppc, msg->len);
	_dlm_lowcomms_commit_msg(msg_resend);
	dlm_lowcomms_put_msg(msg_resend);

	return 0;
}
1410
1411
/*
 * Send pending writequeue data for @con.
 *
 * Runs under con->sock_mutex. The writequeue_lock is dropped around the
 * actual kernel_sendpage() call and retaken to complete the entry, so the
 * queue may change between iterations; con_next_wq() re-reads the head
 * each time. On EOF request (CF_EOF) with an empty queue the connection
 * is closed and shutdown waiters are woken.
 */
static void send_to_sock(struct connection *con)
{
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset, ret;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = con_next_wq(con);
		if (!e)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = kernel_sendpage(con->sock, e->page, offset, len,
				      msg_flags);
		trace_dlm_send(con->nodeid, ret);
		if (ret == -EAGAIN || ret == 0) {
			if (ret == -EAGAIN &&
			    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
			    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
				/*
				 * Socket buffer full: ask for a write-space
				 * callback so the send worker is requeued
				 * when room becomes available.
				 */
				set_bit(SOCK_NOSPACE, &con->sock->flags);
				con->sock->sk->sk_write_pending++;
			}
			cond_resched();
			goto out;
		} else if (ret < 0)
			goto out;

		/* don't starve other workqueue items on long queues */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);

	/* queue drained; honor a pending EOF/close request */
	if (test_and_clear_bit(CF_EOF, &con->flags)) {
		mutex_unlock(&con->sock_mutex);
		close_connection(con, false, false, true);

		/* let shutdown waiters proceed */
		clear_bit(CF_SHUTDOWN, &con->flags);
		wake_up(&con->shutdown_wait);
	} else {
		mutex_unlock(&con->sock_mutex);
	}

	return;

out:
	mutex_unlock(&con->sock_mutex);
	return;

out_connect:
	/* no socket yet: requeue the send work, which also reconnects */
	mutex_unlock(&con->sock_mutex);
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}
1486
1487 static void clean_one_writequeue(struct connection *con)
1488 {
1489 struct writequeue_entry *e, *safe;
1490
1491 spin_lock(&con->writequeue_lock);
1492 list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1493 free_entry(e);
1494 }
1495 spin_unlock(&con->writequeue_lock);
1496 }
1497
1498
1499
/*
 * Called from recovery when it knows that a node has left the cluster:
 * close its connection (both directions), drop queued data, and forget
 * its stored addresses. Always returns 0.
 */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;
	struct dlm_node_addr *na;
	int idx;

	log_print("closing connection to node %d", nodeid);
	idx = srcu_read_lock(&connections_srcu);
	con = nodeid2con(nodeid, 0);
	if (con) {
		set_bit(CF_CLOSE, &con->flags);
		close_connection(con, true, true, true);
		clean_one_writequeue(con);
		if (con->othercon)
			clean_one_writequeue(con->othercon);
	}
	srcu_read_unlock(&connections_srcu, idx);

	/* drop any remembered addresses for this node */
	spin_lock(&dlm_node_addrs_spin);
	na = find_node_addr(nodeid);
	if (na) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);

	return 0;
}
1530
1531
/* Receive workqueue handler: drain incoming data for one connection. */
static void process_recv_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, rwork);

	/* clear before receiving so a new data callback can requeue us */
	clear_bit(CF_READ_PENDING, &con->flags);
	receive_from_sock(con);
}
1539
/* Listen-socket workqueue handler: accept one pending connection. */
static void process_listen_recv_socket(struct work_struct *work)
{
	accept_from_sock(&listen_con);
}
1544
/*
 * Try to establish an outgoing connection for @con using the active
 * protocol ops. Called with con->sock_mutex held. Transient errors
 * (other than the unreachable/invalid classes listed below) are retried
 * via lowcomms_connect_sock() up to MAX_CONNECT_RETRIES.
 */
static void dlm_connect(struct connection *con)
{
	struct sockaddr_storage addr;
	int result, addr_len;
	struct socket *sock;
	unsigned int mark;

	/* give up after too many attempts */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		return;

	/* some other thread may already have connected us */
	if (con->sock) {
		log_print("node %d already connected.", con->nodeid);
		return;
	}

	memset(&addr, 0, sizeof(addr));
	result = nodeid_to_addr(con->nodeid, &addr, NULL,
				dlm_proto_ops->try_new_addr, &mark);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		return;
	}

	/* create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, dlm_proto_ops->proto, &sock);
	if (result < 0)
		goto socket_err;

	sock_set_mark(sock->sk, mark);
	dlm_proto_ops->sockopts(sock);

	/* install the socket and its callbacks before bind/connect */
	add_sock(sock, con);

	result = dlm_proto_ops->bind(sock);
	if (result < 0)
		goto add_sock_err;

	log_print_ratelimited("connecting to %d", con->nodeid);
	make_sockaddr(&addr, dlm_config.ci_tcp_port, &addr_len);
	result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr,
					addr_len);
	if (result < 0)
		goto add_sock_err;

	return;

add_sock_err:
	dlm_close_sock(&con->sock);

socket_err:
	/*
	 * Errors in this class mean the peer is not reachable or the
	 * address/protocol is wrong; retrying immediately cannot help,
	 * so only other errors are rescheduled.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN &&
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		msleep(1000);
		lowcomms_connect_sock(con);
	}
}
1612
1613
/*
 * Send workqueue handler: handle reconnect requests, (re)establish the
 * socket if needed, then push out any queued data. Never runs on an
 * othercon (receive-only) connection, hence the WARN_ON.
 */
static void process_send_sockets(struct work_struct *work)
{
	struct connection *con = container_of(work, struct connection, swork);

	WARN_ON(test_bit(CF_IS_OTHERCON, &con->flags));

	clear_bit(CF_WRITE_PENDING, &con->flags);

	/* a reconnect request closes the old socket first and triggers
	 * midcomms retransmission of unacked messages
	 */
	if (test_and_clear_bit(CF_RECONNECT, &con->flags)) {
		close_connection(con, false, false, true);
		dlm_midcomms_unack_msg_resend(con->nodeid);
	}

	if (con->sock == NULL) {
		/* back off briefly if a delayed connect was requested */
		if (test_and_clear_bit(CF_DELAY_CONNECT, &con->flags))
			msleep(1000);

		mutex_lock(&con->sock_mutex);
		dlm_connect(con);
		mutex_unlock(&con->sock_mutex);
	}

	if (!list_empty(&con->writequeue))
		send_to_sock(con);
}
1639
/* Tear down the recv/send workqueues; safe to call if only one exists. */
static void work_stop(void)
{
	if (recv_workqueue) {
		destroy_workqueue(recv_workqueue);
		recv_workqueue = NULL;
	}

	if (send_workqueue) {
		destroy_workqueue(send_workqueue);
		send_workqueue = NULL;
	}
}
1652
/*
 * Create the ordered recv/send workqueues (WQ_MEM_RECLAIM: lowcomms may
 * be needed to make forward progress under memory pressure).
 * Returns 0 or -ENOMEM, cleaning up the first queue if the second fails.
 */
static int work_start(void)
{
	recv_workqueue = alloc_ordered_workqueue("dlm_recv", WQ_MEM_RECLAIM);
	if (!recv_workqueue) {
		log_print("can't start dlm_recv");
		return -ENOMEM;
	}

	send_workqueue = alloc_ordered_workqueue("dlm_send", WQ_MEM_RECLAIM);
	if (!send_workqueue) {
		log_print("can't start dlm_send");
		destroy_workqueue(recv_workqueue);
		recv_workqueue = NULL;
		return -ENOMEM;
	}

	return 0;
}
1671
/* Invoke the protocol-specific shutdown action (if any) on @con. */
static void shutdown_conn(struct connection *con)
{
	if (dlm_proto_ops->shutdown_action)
		dlm_proto_ops->shutdown_action(con);
}
1677
/*
 * Begin orderly shutdown of all lowcomms connections: stop accepting new
 * connections, flush in-flight work, close the listening socket, then run
 * each connection's protocol shutdown action under SRCU.
 */
void dlm_lowcomms_shutdown(void)
{
	int idx;

	/* Set all the flags to prevent any activity (dlm_allow_conn gates
	 * new incoming connections)
	 */
	dlm_allow_conn = 0;

	if (recv_workqueue)
		flush_workqueue(recv_workqueue);
	if (send_workqueue)
		flush_workqueue(send_workqueue);

	dlm_close_sock(&listen_con.sock);

	idx = srcu_read_lock(&connections_srcu);
	foreach_conn(shutdown_conn);
	srcu_read_unlock(&connections_srcu, idx);
}
1698
/*
 * Quiesce @con: mark it closed/pending so workers exit, and detach the
 * socket's sk_user_data so callbacks can no longer reach the connection.
 * Recurses once into othercon when @and_other is set.
 */
static void _stop_conn(struct connection *con, bool and_other)
{
	mutex_lock(&con->sock_mutex);
	set_bit(CF_CLOSE, &con->flags);
	/* READ/WRITE_PENDING set here act as "stopped" markers that
	 * work_flush() polls for
	 */
	set_bit(CF_READ_PENDING, &con->flags);
	set_bit(CF_WRITE_PENDING, &con->flags);
	if (con->sock && con->sock->sk) {
		lock_sock(con->sock->sk);
		con->sock->sk->sk_user_data = NULL;
		release_sock(con->sock->sk);
	}
	if (con->othercon && and_other)
		_stop_conn(con->othercon, false);
	mutex_unlock(&con->sock_mutex);
}
1714
/* foreach_conn() callback: stop a connection and its othercon. */
static void stop_conn(struct connection *con)
{
	_stop_conn(con, true);
}
1719
/* SRCU callback: free a connection after the grace period has elapsed. */
static void connection_release(struct rcu_head *rcu)
{
	struct connection *con = container_of(rcu, struct connection, rcu);

	kfree(con->rx_buf);
	kfree(con);
}
1727
/*
 * foreach_conn() callback used at stop time: close the connection,
 * unlink it from the hash, purge its writequeues, and schedule the
 * actual frees via SRCU so concurrent readers stay safe.
 */
static void free_conn(struct connection *con)
{
	close_connection(con, true, true, true);
	spin_lock(&connections_lock);
	hlist_del_rcu(&con->list);
	spin_unlock(&connections_lock);
	if (con->othercon) {
		clean_one_writequeue(con->othercon);
		call_srcu(&connections_srcu, &con->othercon->rcu,
			  connection_release);
	}
	clean_one_writequeue(con);
	call_srcu(&connections_srcu, &con->rcu, connection_release);
}
1742
/*
 * Repeatedly stop all connections and flush the workqueues until every
 * connection's READ/WRITE_PENDING bits (set by _stop_conn) remain set,
 * i.e. no worker has cleared them again — meaning all work has drained.
 * Caller holds the connections_srcu read lock (see dlm_lowcomms_stop).
 */
static void work_flush(void)
{
	int ok;
	int i;
	struct connection *con;

	do {
		ok = 1;
		foreach_conn(stop_conn);
		if (recv_workqueue)
			flush_workqueue(recv_workqueue);
		if (send_workqueue)
			flush_workqueue(send_workqueue);
		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
			hlist_for_each_entry_rcu(con, &connection_hash[i],
						 list) {
				ok &= test_bit(CF_READ_PENDING, &con->flags);
				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
				if (con->othercon) {
					ok &= test_bit(CF_READ_PENDING,
						       &con->othercon->flags);
					ok &= test_bit(CF_WRITE_PENDING,
						       &con->othercon->flags);
				}
			}
		}
	} while (!ok);
}
1771
/*
 * Final lowcomms teardown: quiesce and free every connection, destroy
 * the workqueues, release local addresses, and drop the protocol ops.
 */
void dlm_lowcomms_stop(void)
{
	int idx;

	idx = srcu_read_lock(&connections_srcu);
	work_flush();
	foreach_conn(free_conn);
	srcu_read_unlock(&connections_srcu, idx);
	work_stop();
	deinit_local();

	dlm_proto_ops = NULL;
}
1785
1786 static int dlm_listen_for_all(void)
1787 {
1788 struct socket *sock;
1789 int result;
1790
1791 log_print("Using %s for communications",
1792 dlm_proto_ops->name);
1793
1794 result = dlm_proto_ops->listen_validate();
1795 if (result < 0)
1796 return result;
1797
1798 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1799 SOCK_STREAM, dlm_proto_ops->proto, &sock);
1800 if (result < 0) {
1801 log_print("Can't create comms socket: %d", result);
1802 return result;
1803 }
1804
1805 sock_set_mark(sock->sk, dlm_config.ci_mark);
1806 dlm_proto_ops->listen_sockopts(sock);
1807
1808 result = dlm_proto_ops->listen_bind(sock);
1809 if (result < 0)
1810 goto out;
1811
1812 save_listen_callbacks(sock);
1813 add_listen_sock(sock, &listen_con);
1814
1815 INIT_WORK(&listen_con.rwork, process_listen_recv_socket);
1816 result = sock->ops->listen(sock, 5);
1817 if (result < 0) {
1818 dlm_close_sock(&listen_con.sock);
1819 goto out;
1820 }
1821
1822 return 0;
1823
1824 out:
1825 sock_release(sock);
1826 return result;
1827 }
1828
/*
 * Bind an outgoing TCP socket to the first local address with an
 * ephemeral port (port 0). A bind failure is logged but deliberately
 * not treated as fatal — connect() may still succeed from another
 * source address — so this always returns 0.
 */
static int dlm_tcp_bind(struct socket *sock)
{
	struct sockaddr_storage src_addr;
	int result, addr_len;

	/* Bind to our cluster-known address connecting from */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);

	result = sock->ops->bind(sock, (struct sockaddr *)&src_addr,
				 addr_len);
	if (result < 0) {
		/* This *may* not indicate a critical error */
		log_print("could not bind for connect: %d", result);
	}

	return 0;
}
1849
1850 static int dlm_tcp_connect(struct connection *con, struct socket *sock,
1851 struct sockaddr *addr, int addr_len)
1852 {
1853 int ret;
1854
1855 ret = sock->ops->connect(sock, addr, addr_len, O_NONBLOCK);
1856 switch (ret) {
1857 case -EINPROGRESS:
1858 fallthrough;
1859 case 0:
1860 return 0;
1861 }
1862
1863 return ret;
1864 }
1865
1866 static int dlm_tcp_listen_validate(void)
1867 {
1868
1869 if (dlm_local_count > 1) {
1870 log_print("TCP protocol can't handle multi-homed hosts, try SCTP");
1871 return -EINVAL;
1872 }
1873
1874 return 0;
1875 }
1876
/* Per-socket TCP tuning: disable Nagle so small DLM messages go out now. */
static void dlm_tcp_sockopts(struct socket *sock)
{
	tcp_sock_set_nodelay(sock->sk);
}
1882
/* Listen-socket options: common TCP tuning plus SO_REUSEADDR for restarts. */
static void dlm_tcp_listen_sockopts(struct socket *sock)
{
	dlm_tcp_sockopts(sock);
	sock_set_reuseaddr(sock->sk);
}
1888
/* Bind the TCP listen socket to the first local address and DLM port. */
static int dlm_tcp_listen_bind(struct socket *sock)
{
	int addr_len;

	/* Bind to our port */
	make_sockaddr(dlm_local_addr[0], dlm_config.ci_tcp_port, &addr_len);
	return sock->ops->bind(sock, (struct sockaddr *)dlm_local_addr[0],
			       addr_len);
}
1898
/* Protocol operations when ci_protocol selects TCP. */
static const struct dlm_proto_ops dlm_tcp_ops = {
	.name = "TCP",
	.proto = IPPROTO_TCP,
	.connect = dlm_tcp_connect,
	.sockopts = dlm_tcp_sockopts,
	.bind = dlm_tcp_bind,
	.listen_validate = dlm_tcp_listen_validate,
	.listen_sockopts = dlm_tcp_listen_sockopts,
	.listen_bind = dlm_tcp_listen_bind,
	.shutdown_action = dlm_tcp_shutdown,
	.eof_condition = tcp_eof_condition,
};
1911
/* Bind an outgoing SCTP socket to all local addresses (ephemeral port). */
static int dlm_sctp_bind(struct socket *sock)
{
	return sctp_bind_addrs(sock, 0);
}
1916
/*
 * Connect an SCTP socket synchronously with a bounded wait.
 *
 * NOTE(review): a send timeout of 5 is used to bound the blocking
 * connect (O_NONBLOCK is not passed here, unlike the TCP path), and 0
 * afterwards presumably restores "no timeout" — confirm units/semantics
 * against sock_set_sndtimeo(). On first successful connect the
 * CF_CONNECTED transition is logged once.
 */
static int dlm_sctp_connect(struct connection *con, struct socket *sock,
			    struct sockaddr *addr, int addr_len)
{
	int ret;

	sock_set_sndtimeo(sock->sk, 5);
	ret = sock->ops->connect(sock, addr, addr_len, 0);
	sock_set_sndtimeo(sock->sk, 0);
	if (ret < 0)
		return ret;

	if (!test_and_set_bit(CF_CONNECTED, &con->flags))
		log_print("connected to node %d", con->nodeid);

	return 0;
}
1938
1939 static int dlm_sctp_listen_validate(void)
1940 {
1941 if (!IS_ENABLED(CONFIG_IP_SCTP)) {
1942 log_print("SCTP is not enabled by this kernel");
1943 return -EOPNOTSUPP;
1944 }
1945
1946 request_module("sctp");
1947 return 0;
1948 }
1949
/* Bind the SCTP listen socket to all local addresses on the DLM port. */
static int dlm_sctp_bind_listen(struct socket *sock)
{
	return sctp_bind_addrs(sock, dlm_config.ci_tcp_port);
}
1954
/* SCTP socket tuning: disable Nagle and enlarge the receive buffer. */
static void dlm_sctp_sockopts(struct socket *sock)
{
	sctp_sock_set_nodelay(sock->sk);
	sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
}
1961
/* Protocol operations when ci_protocol selects SCTP (multi-homing capable). */
static const struct dlm_proto_ops dlm_sctp_ops = {
	.name = "SCTP",
	.proto = IPPROTO_SCTP,
	.try_new_addr = true,
	.connect = dlm_sctp_connect,
	.sockopts = dlm_sctp_sockopts,
	.bind = dlm_sctp_bind,
	.listen_validate = dlm_sctp_listen_validate,
	.listen_sockopts = dlm_sctp_sockopts,
	.listen_bind = dlm_sctp_bind_listen,
};
1973
/*
 * Bring up the lowcomms layer: local addresses, workqueues, protocol
 * selection, and the listening socket. Returns 0 or a negative errno;
 * on failure everything brought up so far is unwound in reverse order.
 */
int dlm_lowcomms_start(void)
{
	int error = -EINVAL;
	int i;

	for (i = 0; i < CONN_HASH_SIZE; i++)
		INIT_HLIST_HEAD(&connection_hash[i]);

	init_local();
	if (!dlm_local_count) {
		error = -ENOTCONN;
		log_print("no local IP address has been set");
		goto fail;
	}

	INIT_WORK(&listen_con.rwork, process_listen_recv_socket);

	error = work_start();
	if (error)
		goto fail_local;

	/* allow incoming connections from here on */
	dlm_allow_conn = 1;

	/* Start listening */
	switch (dlm_config.ci_protocol) {
	case DLM_PROTO_TCP:
		dlm_proto_ops = &dlm_tcp_ops;
		break;
	case DLM_PROTO_SCTP:
		dlm_proto_ops = &dlm_sctp_ops;
		break;
	default:
		log_print("Invalid protocol identifier %d set",
			  dlm_config.ci_protocol);
		error = -EINVAL;
		goto fail_proto_ops;
	}

	error = dlm_listen_for_all();
	if (error)
		goto fail_listen;

	return 0;

fail_listen:
	dlm_proto_ops = NULL;
fail_proto_ops:
	dlm_allow_conn = 0;
	dlm_close_sock(&listen_con.sock);
	work_stop();
fail_local:
	deinit_local();
fail:
	return error;
}
2029
/*
 * Module exit: free every remembered node address (mirrors the per-node
 * cleanup done in dlm_lowcomms_close()).
 */
void dlm_lowcomms_exit(void)
{
	struct dlm_node_addr *na, *safe;

	spin_lock(&dlm_node_addrs_spin);
	list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
		list_del(&na->list);
		while (na->addr_count--)
			kfree(na->addr[na->addr_count]);
		kfree(na);
	}
	spin_unlock(&dlm_node_addrs_spin);
}