0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089
0090
0091
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102
0103
0104
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123
0124
0125
0126
0127
0128
0129
0130
0131
0132
/* Debug switch: when set to a non-zero value midcomms_shutdown() sleeps for
 * five seconds right after it has sent its FIN message.
 */
#define DLM_DEBUG_FENCE_TERMINATION	0
0134
0135 #include <net/tcp.h>
0136
0137 #include "dlm_internal.h"
0138 #include "lockspace.h"
0139 #include "lowcomms.h"
0140 #include "config.h"
0141 #include "memory.h"
0142 #include "lock.h"
0143 #include "util.h"
0144 #include "midcomms.h"
0145
0146
/* Start value of the per-node seq_send and seq_next counters. */
#define DLM_SEQ_INIT		0

/* Longest time midcomms_shutdown() waits for a node to get closed:
 * three minutes, expressed in jiffies.
 */
#define DLM_SHUTDOWN_TIMEOUT	msecs_to_jiffies(3 * 60 * 1000)
/* Value of midcomms_node::version as long as no version was detected. */
#define DLM_VERSION_NOT_SET	0
0151
0152 struct midcomms_node {
0153 int nodeid;
0154 uint32_t version;
0155 uint32_t seq_send;
0156 uint32_t seq_next;
0157
0158
0159
0160
0161
0162 struct list_head send_queue;
0163 spinlock_t send_queue_lock;
0164 atomic_t send_queue_cnt;
0165 #define DLM_NODE_FLAG_CLOSE 1
0166 #define DLM_NODE_FLAG_STOP_TX 2
0167 #define DLM_NODE_FLAG_STOP_RX 3
0168 #define DLM_NODE_ULP_DELIVERED 4
0169 unsigned long flags;
0170 wait_queue_head_t shutdown_wait;
0171
0172
0173 #define DLM_CLOSED 1
0174 #define DLM_ESTABLISHED 2
0175 #define DLM_FIN_WAIT1 3
0176 #define DLM_FIN_WAIT2 4
0177 #define DLM_CLOSE_WAIT 5
0178 #define DLM_LAST_ACK 6
0179 #define DLM_CLOSING 7
0180 int state;
0181 spinlock_t state_lock;
0182
0183
0184
0185
0186
0187 int users;
0188
0189
0190 void *debugfs;
0191
0192 struct hlist_node hlist;
0193 struct rcu_head rcu;
0194 };
0195
0196 struct dlm_mhandle {
0197 const struct dlm_header *inner_hd;
0198 struct midcomms_node *node;
0199 struct dlm_opts *opts;
0200 struct dlm_msg *msg;
0201 bool committed;
0202 uint32_t seq;
0203
0204 void (*ack_rcv)(struct midcomms_node *node);
0205
0206
0207 int idx;
0208
0209 struct list_head list;
0210 struct rcu_head rcu;
0211 };
0212
0213 static struct hlist_head node_hash[CONN_HASH_SIZE];
0214 static DEFINE_SPINLOCK(nodes_lock);
0215 DEFINE_STATIC_SRCU(nodes_srcu);
0216
0217
0218
0219
0220
0221
0222
0223 static DEFINE_MUTEX(close_lock);
0224
0225 struct kmem_cache *dlm_midcomms_cache_create(void)
0226 {
0227 return kmem_cache_create("dlm_mhandle", sizeof(struct dlm_mhandle),
0228 0, 0, NULL);
0229 }
0230
0231 static inline const char *dlm_state_str(int state)
0232 {
0233 switch (state) {
0234 case DLM_CLOSED:
0235 return "CLOSED";
0236 case DLM_ESTABLISHED:
0237 return "ESTABLISHED";
0238 case DLM_FIN_WAIT1:
0239 return "FIN_WAIT1";
0240 case DLM_FIN_WAIT2:
0241 return "FIN_WAIT2";
0242 case DLM_CLOSE_WAIT:
0243 return "CLOSE_WAIT";
0244 case DLM_LAST_ACK:
0245 return "LAST_ACK";
0246 case DLM_CLOSING:
0247 return "CLOSING";
0248 default:
0249 return "UNKNOWN";
0250 }
0251 }
0252
0253 const char *dlm_midcomms_state(struct midcomms_node *node)
0254 {
0255 return dlm_state_str(node->state);
0256 }
0257
0258 unsigned long dlm_midcomms_flags(struct midcomms_node *node)
0259 {
0260 return node->flags;
0261 }
0262
0263 int dlm_midcomms_send_queue_cnt(struct midcomms_node *node)
0264 {
0265 return atomic_read(&node->send_queue_cnt);
0266 }
0267
0268 uint32_t dlm_midcomms_version(struct midcomms_node *node)
0269 {
0270 return node->version;
0271 }
0272
0273 static struct midcomms_node *__find_node(int nodeid, int r)
0274 {
0275 struct midcomms_node *node;
0276
0277 hlist_for_each_entry_rcu(node, &node_hash[r], hlist) {
0278 if (node->nodeid == nodeid)
0279 return node;
0280 }
0281
0282 return NULL;
0283 }
0284
0285 static void dlm_mhandle_release(struct rcu_head *rcu)
0286 {
0287 struct dlm_mhandle *mh = container_of(rcu, struct dlm_mhandle, rcu);
0288
0289 dlm_lowcomms_put_msg(mh->msg);
0290 dlm_free_mhandle(mh);
0291 }
0292
0293 static void dlm_mhandle_delete(struct midcomms_node *node,
0294 struct dlm_mhandle *mh)
0295 {
0296 list_del_rcu(&mh->list);
0297 atomic_dec(&node->send_queue_cnt);
0298 call_rcu(&mh->rcu, dlm_mhandle_release);
0299 }
0300
0301 static void dlm_send_queue_flush(struct midcomms_node *node)
0302 {
0303 struct dlm_mhandle *mh;
0304
0305 pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
0306
0307 rcu_read_lock();
0308 spin_lock(&node->send_queue_lock);
0309 list_for_each_entry_rcu(mh, &node->send_queue, list) {
0310 dlm_mhandle_delete(node, mh);
0311 }
0312 spin_unlock(&node->send_queue_lock);
0313 rcu_read_unlock();
0314 }
0315
0316 static void midcomms_node_reset(struct midcomms_node *node)
0317 {
0318 pr_debug("reset node %d\n", node->nodeid);
0319
0320 node->seq_next = DLM_SEQ_INIT;
0321 node->seq_send = DLM_SEQ_INIT;
0322 node->version = DLM_VERSION_NOT_SET;
0323 node->flags = 0;
0324
0325 dlm_send_queue_flush(node);
0326 node->state = DLM_CLOSED;
0327 wake_up(&node->shutdown_wait);
0328 }
0329
0330 static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
0331 {
0332 struct midcomms_node *node, *tmp;
0333 int r = nodeid_hash(nodeid);
0334
0335 node = __find_node(nodeid, r);
0336 if (node || !alloc)
0337 return node;
0338
0339 node = kmalloc(sizeof(*node), alloc);
0340 if (!node)
0341 return NULL;
0342
0343 node->nodeid = nodeid;
0344 spin_lock_init(&node->state_lock);
0345 spin_lock_init(&node->send_queue_lock);
0346 atomic_set(&node->send_queue_cnt, 0);
0347 INIT_LIST_HEAD(&node->send_queue);
0348 init_waitqueue_head(&node->shutdown_wait);
0349 node->users = 0;
0350 midcomms_node_reset(node);
0351
0352 spin_lock(&nodes_lock);
0353
0354
0355
0356 tmp = __find_node(nodeid, r);
0357 if (tmp) {
0358 spin_unlock(&nodes_lock);
0359 kfree(node);
0360 return tmp;
0361 }
0362
0363 hlist_add_head_rcu(&node->hlist, &node_hash[r]);
0364 spin_unlock(&nodes_lock);
0365
0366 node->debugfs = dlm_create_debug_comms_file(nodeid, node);
0367 return node;
0368 }
0369
0370 static int dlm_send_ack(int nodeid, uint32_t seq)
0371 {
0372 int mb_len = sizeof(struct dlm_header);
0373 struct dlm_header *m_header;
0374 struct dlm_msg *msg;
0375 char *ppc;
0376
0377 msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_NOFS, &ppc,
0378 NULL, NULL);
0379 if (!msg)
0380 return -ENOMEM;
0381
0382 m_header = (struct dlm_header *)ppc;
0383
0384 m_header->h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
0385 m_header->h_nodeid = cpu_to_le32(dlm_our_nodeid());
0386 m_header->h_length = cpu_to_le16(mb_len);
0387 m_header->h_cmd = DLM_ACK;
0388 m_header->u.h_seq = cpu_to_le32(seq);
0389
0390 dlm_lowcomms_commit_msg(msg);
0391 dlm_lowcomms_put_msg(msg);
0392
0393 return 0;
0394 }
0395
0396 static int dlm_send_fin(struct midcomms_node *node,
0397 void (*ack_rcv)(struct midcomms_node *node))
0398 {
0399 int mb_len = sizeof(struct dlm_header);
0400 struct dlm_header *m_header;
0401 struct dlm_mhandle *mh;
0402 char *ppc;
0403
0404 mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_NOFS, &ppc);
0405 if (!mh)
0406 return -ENOMEM;
0407
0408 mh->ack_rcv = ack_rcv;
0409
0410 m_header = (struct dlm_header *)ppc;
0411
0412 m_header->h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
0413 m_header->h_nodeid = cpu_to_le32(dlm_our_nodeid());
0414 m_header->h_length = cpu_to_le16(mb_len);
0415 m_header->h_cmd = DLM_FIN;
0416
0417 pr_debug("sending fin msg to node %d\n", node->nodeid);
0418 dlm_midcomms_commit_mhandle(mh);
0419 set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
0420
0421 return 0;
0422 }
0423
0424 static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
0425 {
0426 struct dlm_mhandle *mh;
0427
0428 rcu_read_lock();
0429 list_for_each_entry_rcu(mh, &node->send_queue, list) {
0430 if (before(mh->seq, seq)) {
0431 if (mh->ack_rcv)
0432 mh->ack_rcv(node);
0433 } else {
0434
0435 break;
0436 }
0437 }
0438
0439 spin_lock(&node->send_queue_lock);
0440 list_for_each_entry_rcu(mh, &node->send_queue, list) {
0441 if (before(mh->seq, seq)) {
0442 dlm_mhandle_delete(node, mh);
0443 } else {
0444
0445 break;
0446 }
0447 }
0448 spin_unlock(&node->send_queue_lock);
0449 rcu_read_unlock();
0450 }
0451
0452 static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
0453 {
0454 spin_lock(&node->state_lock);
0455 pr_debug("receive passive fin ack from node %d with state %s\n",
0456 node->nodeid, dlm_state_str(node->state));
0457
0458 switch (node->state) {
0459 case DLM_LAST_ACK:
0460
0461 midcomms_node_reset(node);
0462 break;
0463 case DLM_CLOSED:
0464
0465 wake_up(&node->shutdown_wait);
0466 break;
0467 default:
0468 spin_unlock(&node->state_lock);
0469 log_print("%s: unexpected state: %d\n",
0470 __func__, node->state);
0471 WARN_ON(1);
0472 return;
0473 }
0474 spin_unlock(&node->state_lock);
0475 }
0476
0477 static void dlm_midcomms_receive_buffer(union dlm_packet *p,
0478 struct midcomms_node *node,
0479 uint32_t seq)
0480 {
0481 if (seq == node->seq_next) {
0482 node->seq_next++;
0483
0484 switch (p->header.h_cmd) {
0485 case DLM_FIN:
0486
0487 dlm_send_ack(node->nodeid, node->seq_next);
0488
0489 spin_lock(&node->state_lock);
0490 pr_debug("receive fin msg from node %d with state %s\n",
0491 node->nodeid, dlm_state_str(node->state));
0492
0493 switch (node->state) {
0494 case DLM_ESTABLISHED:
0495 node->state = DLM_CLOSE_WAIT;
0496 pr_debug("switch node %d to state %s\n",
0497 node->nodeid, dlm_state_str(node->state));
0498
0499
0500
0501
0502 if (node->users == 0) {
0503 node->state = DLM_LAST_ACK;
0504 pr_debug("switch node %d to state %s case 1\n",
0505 node->nodeid, dlm_state_str(node->state));
0506 spin_unlock(&node->state_lock);
0507 goto send_fin;
0508 }
0509 break;
0510 case DLM_FIN_WAIT1:
0511 node->state = DLM_CLOSING;
0512 pr_debug("switch node %d to state %s\n",
0513 node->nodeid, dlm_state_str(node->state));
0514 break;
0515 case DLM_FIN_WAIT2:
0516 midcomms_node_reset(node);
0517 pr_debug("switch node %d to state %s\n",
0518 node->nodeid, dlm_state_str(node->state));
0519 wake_up(&node->shutdown_wait);
0520 break;
0521 case DLM_LAST_ACK:
0522
0523 break;
0524 default:
0525 spin_unlock(&node->state_lock);
0526 log_print("%s: unexpected state: %d\n",
0527 __func__, node->state);
0528 WARN_ON(1);
0529 return;
0530 }
0531 spin_unlock(&node->state_lock);
0532
0533 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
0534 break;
0535 default:
0536 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
0537 dlm_receive_buffer(p, node->nodeid);
0538 set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
0539 break;
0540 }
0541 } else {
0542
0543
0544
0545 if (seq < node->seq_next)
0546 dlm_send_ack(node->nodeid, node->seq_next);
0547
0548 log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
0549 seq, node->seq_next, node->nodeid);
0550 }
0551
0552 return;
0553
0554 send_fin:
0555 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
0556 dlm_send_fin(node, dlm_pas_fin_ack_rcv);
0557 }
0558
0559 static struct midcomms_node *
0560 dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
0561 uint16_t msglen, int (*cb)(struct midcomms_node *node))
0562 {
0563 struct midcomms_node *node = NULL;
0564 gfp_t allocation = 0;
0565 int ret;
0566
0567 switch (p->header.h_cmd) {
0568 case DLM_RCOM:
0569 if (msglen < sizeof(struct dlm_rcom)) {
0570 log_print("rcom msg too small: %u, will skip this message from node %d",
0571 msglen, nodeid);
0572 return NULL;
0573 }
0574
0575 switch (p->rcom.rc_type) {
0576 case cpu_to_le32(DLM_RCOM_NAMES):
0577 fallthrough;
0578 case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
0579 fallthrough;
0580 case cpu_to_le32(DLM_RCOM_STATUS):
0581 fallthrough;
0582 case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
0583 node = nodeid2node(nodeid, 0);
0584 if (node) {
0585 spin_lock(&node->state_lock);
0586 if (node->state != DLM_ESTABLISHED)
0587 pr_debug("receive begin RCOM msg from node %d with state %s\n",
0588 node->nodeid, dlm_state_str(node->state));
0589
0590 switch (node->state) {
0591 case DLM_CLOSED:
0592 node->state = DLM_ESTABLISHED;
0593 pr_debug("switch node %d to state %s\n",
0594 node->nodeid, dlm_state_str(node->state));
0595 break;
0596 case DLM_ESTABLISHED:
0597 break;
0598 default:
0599
0600
0601
0602
0603 log_print("reset node %d because shutdown stuck",
0604 node->nodeid);
0605
0606 midcomms_node_reset(node);
0607 node->state = DLM_ESTABLISHED;
0608 break;
0609 }
0610 spin_unlock(&node->state_lock);
0611 }
0612
0613 allocation = GFP_NOFS;
0614 break;
0615 default:
0616 break;
0617 }
0618
0619 break;
0620 default:
0621 break;
0622 }
0623
0624 node = nodeid2node(nodeid, allocation);
0625 if (!node) {
0626 switch (p->header.h_cmd) {
0627 case DLM_OPTS:
0628 if (msglen < sizeof(struct dlm_opts)) {
0629 log_print("opts msg too small: %u, will skip this message from node %d",
0630 msglen, nodeid);
0631 return NULL;
0632 }
0633
0634 log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence",
0635 p->opts.o_nextcmd, nodeid);
0636 break;
0637 default:
0638 log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence",
0639 p->header.h_cmd, nodeid);
0640 break;
0641 }
0642
0643 return NULL;
0644 }
0645
0646 ret = cb(node);
0647 if (ret < 0)
0648 return NULL;
0649
0650 return node;
0651 }
0652
0653 static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
0654 {
0655 switch (node->version) {
0656 case DLM_VERSION_NOT_SET:
0657 node->version = DLM_VERSION_3_2;
0658 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
0659 node->nodeid);
0660 break;
0661 case DLM_VERSION_3_2:
0662 break;
0663 default:
0664 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
0665 DLM_VERSION_3_2, node->nodeid, node->version);
0666 return -1;
0667 }
0668
0669 return 0;
0670 }
0671
0672 static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid)
0673 {
0674 int len = msglen;
0675
0676
0677
0678
0679 if (len < sizeof(struct dlm_opts))
0680 return -1;
0681 len -= sizeof(struct dlm_opts);
0682
0683 if (len < le16_to_cpu(p->opts.o_optlen))
0684 return -1;
0685 len -= le16_to_cpu(p->opts.o_optlen);
0686
0687 switch (p->opts.o_nextcmd) {
0688 case DLM_FIN:
0689 if (len < sizeof(struct dlm_header)) {
0690 log_print("fin too small: %d, will skip this message from node %d",
0691 len, nodeid);
0692 return -1;
0693 }
0694
0695 break;
0696 case DLM_MSG:
0697 if (len < sizeof(struct dlm_message)) {
0698 log_print("msg too small: %d, will skip this message from node %d",
0699 msglen, nodeid);
0700 return -1;
0701 }
0702
0703 break;
0704 case DLM_RCOM:
0705 if (len < sizeof(struct dlm_rcom)) {
0706 log_print("rcom msg too small: %d, will skip this message from node %d",
0707 len, nodeid);
0708 return -1;
0709 }
0710
0711 break;
0712 default:
0713 log_print("unsupported o_nextcmd received: %u, will skip this message from node %d",
0714 p->opts.o_nextcmd, nodeid);
0715 return -1;
0716 }
0717
0718 return 0;
0719 }
0720
0721 static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid)
0722 {
0723 uint16_t msglen = le16_to_cpu(p->header.h_length);
0724 struct midcomms_node *node;
0725 uint32_t seq;
0726 int ret, idx;
0727
0728 idx = srcu_read_lock(&nodes_srcu);
0729 node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
0730 dlm_midcomms_version_check_3_2);
0731 if (!node)
0732 goto out;
0733
0734 switch (p->header.h_cmd) {
0735 case DLM_RCOM:
0736
0737
0738
0739
0740
0741
0742 switch (p->rcom.rc_type) {
0743 case cpu_to_le32(DLM_RCOM_NAMES):
0744 fallthrough;
0745 case cpu_to_le32(DLM_RCOM_NAMES_REPLY):
0746 fallthrough;
0747 case cpu_to_le32(DLM_RCOM_STATUS):
0748 fallthrough;
0749 case cpu_to_le32(DLM_RCOM_STATUS_REPLY):
0750 break;
0751 default:
0752 log_print("unsupported rcom type received: %u, will skip this message from node %d",
0753 le32_to_cpu(p->rcom.rc_type), nodeid);
0754 goto out;
0755 }
0756
0757 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
0758 dlm_receive_buffer(p, nodeid);
0759 break;
0760 case DLM_OPTS:
0761 seq = le32_to_cpu(p->header.u.h_seq);
0762
0763 ret = dlm_opts_check_msglen(p, msglen, nodeid);
0764 if (ret < 0) {
0765 log_print("opts msg too small: %u, will skip this message from node %d",
0766 msglen, nodeid);
0767 goto out;
0768 }
0769
0770 p = (union dlm_packet *)((unsigned char *)p->opts.o_opts +
0771 le16_to_cpu(p->opts.o_optlen));
0772
0773
0774 msglen = le16_to_cpu(p->header.h_length);
0775 switch (p->header.h_cmd) {
0776 case DLM_RCOM:
0777 if (msglen < sizeof(struct dlm_rcom)) {
0778 log_print("inner rcom msg too small: %u, will skip this message from node %d",
0779 msglen, nodeid);
0780 goto out;
0781 }
0782
0783 break;
0784 case DLM_MSG:
0785 if (msglen < sizeof(struct dlm_message)) {
0786 log_print("inner msg too small: %u, will skip this message from node %d",
0787 msglen, nodeid);
0788 goto out;
0789 }
0790
0791 break;
0792 case DLM_FIN:
0793 if (msglen < sizeof(struct dlm_header)) {
0794 log_print("inner fin too small: %u, will skip this message from node %d",
0795 msglen, nodeid);
0796 goto out;
0797 }
0798
0799 break;
0800 default:
0801 log_print("unsupported inner h_cmd received: %u, will skip this message from node %d",
0802 msglen, nodeid);
0803 goto out;
0804 }
0805
0806 dlm_midcomms_receive_buffer(p, node, seq);
0807 break;
0808 case DLM_ACK:
0809 seq = le32_to_cpu(p->header.u.h_seq);
0810 dlm_receive_ack(node, seq);
0811 break;
0812 default:
0813 log_print("unsupported h_cmd received: %u, will skip this message from node %d",
0814 p->header.h_cmd, nodeid);
0815 break;
0816 }
0817
0818 out:
0819 srcu_read_unlock(&nodes_srcu, idx);
0820 }
0821
0822 static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
0823 {
0824 switch (node->version) {
0825 case DLM_VERSION_NOT_SET:
0826 node->version = DLM_VERSION_3_1;
0827 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_1,
0828 node->nodeid);
0829 break;
0830 case DLM_VERSION_3_1:
0831 break;
0832 default:
0833 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
0834 DLM_VERSION_3_1, node->nodeid, node->version);
0835 return -1;
0836 }
0837
0838 return 0;
0839 }
0840
0841 static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
0842 {
0843 uint16_t msglen = le16_to_cpu(p->header.h_length);
0844 struct midcomms_node *node;
0845 int idx;
0846
0847 idx = srcu_read_lock(&nodes_srcu);
0848 node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
0849 dlm_midcomms_version_check_3_1);
0850 if (!node) {
0851 srcu_read_unlock(&nodes_srcu, idx);
0852 return;
0853 }
0854 srcu_read_unlock(&nodes_srcu, idx);
0855
0856 switch (p->header.h_cmd) {
0857 case DLM_RCOM:
0858
0859 break;
0860 case DLM_MSG:
0861 if (msglen < sizeof(struct dlm_message)) {
0862 log_print("msg too small: %u, will skip this message from node %d",
0863 msglen, nodeid);
0864 return;
0865 }
0866
0867 break;
0868 default:
0869 log_print("unsupported h_cmd received: %u, will skip this message from node %d",
0870 p->header.h_cmd, nodeid);
0871 return;
0872 }
0873
0874 dlm_receive_buffer(p, nodeid);
0875 }
0876
0877
0878
0879
0880
0881
0882 int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
0883 {
0884 const unsigned char *ptr = buf;
0885 const struct dlm_header *hd;
0886 uint16_t msglen;
0887 int ret = 0;
0888
0889 while (len >= sizeof(struct dlm_header)) {
0890 hd = (struct dlm_header *)ptr;
0891
0892
0893
0894
0895
0896
0897
0898
0899
0900
0901
0902
0903 msglen = le16_to_cpu(hd->h_length);
0904 if (msglen > DLM_MAX_SOCKET_BUFSIZE ||
0905 msglen < sizeof(struct dlm_header)) {
0906 log_print("received invalid length header: %u from node %d, will abort message parsing",
0907 msglen, nodeid);
0908 return -EBADMSG;
0909 }
0910
0911
0912
0913
0914 if (msglen > len)
0915 break;
0916
0917 switch (hd->h_version) {
0918 case cpu_to_le32(DLM_VERSION_3_1):
0919 dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
0920 break;
0921 case cpu_to_le32(DLM_VERSION_3_2):
0922 dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid);
0923 break;
0924 default:
0925 log_print("received invalid version header: %u from node %d, will skip this message",
0926 le32_to_cpu(hd->h_version), nodeid);
0927 break;
0928 }
0929
0930 ret += msglen;
0931 len -= msglen;
0932 ptr += msglen;
0933 }
0934
0935 return ret;
0936 }
0937
0938 void dlm_midcomms_receive_done(int nodeid)
0939 {
0940 struct midcomms_node *node;
0941 int idx;
0942
0943 idx = srcu_read_lock(&nodes_srcu);
0944 node = nodeid2node(nodeid, 0);
0945 if (!node) {
0946 srcu_read_unlock(&nodes_srcu, idx);
0947 return;
0948 }
0949
0950
0951 switch (node->version) {
0952 case DLM_VERSION_3_2:
0953 break;
0954 default:
0955 srcu_read_unlock(&nodes_srcu, idx);
0956 return;
0957 }
0958
0959
0960 if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
0961 &node->flags)) {
0962 srcu_read_unlock(&nodes_srcu, idx);
0963 return;
0964 }
0965
0966 spin_lock(&node->state_lock);
0967
0968 switch (node->state) {
0969 case DLM_ESTABLISHED:
0970 spin_unlock(&node->state_lock);
0971 dlm_send_ack(node->nodeid, node->seq_next);
0972 break;
0973 default:
0974 spin_unlock(&node->state_lock);
0975
0976 break;
0977 }
0978 srcu_read_unlock(&nodes_srcu, idx);
0979 }
0980
0981 void dlm_midcomms_unack_msg_resend(int nodeid)
0982 {
0983 struct midcomms_node *node;
0984 struct dlm_mhandle *mh;
0985 int idx, ret;
0986
0987 idx = srcu_read_lock(&nodes_srcu);
0988 node = nodeid2node(nodeid, 0);
0989 if (!node) {
0990 srcu_read_unlock(&nodes_srcu, idx);
0991 return;
0992 }
0993
0994
0995 switch (node->version) {
0996 case DLM_VERSION_3_2:
0997 break;
0998 default:
0999 srcu_read_unlock(&nodes_srcu, idx);
1000 return;
1001 }
1002
1003 rcu_read_lock();
1004 list_for_each_entry_rcu(mh, &node->send_queue, list) {
1005 if (!mh->committed)
1006 continue;
1007
1008 ret = dlm_lowcomms_resend_msg(mh->msg);
1009 if (!ret)
1010 log_print_ratelimited("retransmit dlm msg, seq %u, nodeid %d",
1011 mh->seq, node->nodeid);
1012 }
1013 rcu_read_unlock();
1014 srcu_read_unlock(&nodes_srcu, idx);
1015 }
1016
1017 static void dlm_fill_opts_header(struct dlm_opts *opts, uint16_t inner_len,
1018 uint32_t seq)
1019 {
1020 opts->o_header.h_cmd = DLM_OPTS;
1021 opts->o_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
1022 opts->o_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
1023 opts->o_header.h_length = cpu_to_le16(DLM_MIDCOMMS_OPT_LEN + inner_len);
1024 opts->o_header.u.h_seq = cpu_to_le32(seq);
1025 }
1026
1027 static void midcomms_new_msg_cb(void *data)
1028 {
1029 struct dlm_mhandle *mh = data;
1030
1031 atomic_inc(&mh->node->send_queue_cnt);
1032
1033 spin_lock(&mh->node->send_queue_lock);
1034 list_add_tail_rcu(&mh->list, &mh->node->send_queue);
1035 spin_unlock(&mh->node->send_queue_lock);
1036
1037 mh->seq = mh->node->seq_send++;
1038 }
1039
1040 static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
1041 int len, gfp_t allocation, char **ppc)
1042 {
1043 struct dlm_opts *opts;
1044 struct dlm_msg *msg;
1045
1046 msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN,
1047 allocation, ppc, midcomms_new_msg_cb, mh);
1048 if (!msg)
1049 return NULL;
1050
1051 opts = (struct dlm_opts *)*ppc;
1052 mh->opts = opts;
1053
1054
1055 dlm_fill_opts_header(opts, len, mh->seq);
1056
1057 *ppc += sizeof(*opts);
1058 mh->inner_hd = (const struct dlm_header *)*ppc;
1059 return msg;
1060 }
1061
1062
1063
1064
1065 #ifndef __CHECKER__
1066 struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
1067 gfp_t allocation, char **ppc)
1068 {
1069 struct midcomms_node *node;
1070 struct dlm_mhandle *mh;
1071 struct dlm_msg *msg;
1072 int idx;
1073
1074 idx = srcu_read_lock(&nodes_srcu);
1075 node = nodeid2node(nodeid, 0);
1076 if (!node) {
1077 WARN_ON_ONCE(1);
1078 goto err;
1079 }
1080
1081
1082 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
1083
1084 mh = dlm_allocate_mhandle();
1085 if (!mh)
1086 goto err;
1087
1088 mh->committed = false;
1089 mh->ack_rcv = NULL;
1090 mh->idx = idx;
1091 mh->node = node;
1092
1093 switch (node->version) {
1094 case DLM_VERSION_3_1:
1095 msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
1096 NULL, NULL);
1097 if (!msg) {
1098 dlm_free_mhandle(mh);
1099 goto err;
1100 }
1101
1102 break;
1103 case DLM_VERSION_3_2:
1104 msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
1105 ppc);
1106 if (!msg) {
1107 dlm_free_mhandle(mh);
1108 goto err;
1109 }
1110
1111 break;
1112 default:
1113 dlm_free_mhandle(mh);
1114 WARN_ON(1);
1115 goto err;
1116 }
1117
1118 mh->msg = msg;
1119
1120
1121
1122
1123
1124
1125 return mh;
1126
1127 err:
1128 srcu_read_unlock(&nodes_srcu, idx);
1129 return NULL;
1130 }
1131 #endif
1132
1133 static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh)
1134 {
1135
1136 mh->opts->o_nextcmd = mh->inner_hd->h_cmd;
1137 mh->committed = true;
1138 dlm_lowcomms_commit_msg(mh->msg);
1139 }
1140
1141
1142
1143
1144 #ifndef __CHECKER__
1145 void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh)
1146 {
1147 switch (mh->node->version) {
1148 case DLM_VERSION_3_1:
1149 srcu_read_unlock(&nodes_srcu, mh->idx);
1150
1151 dlm_lowcomms_commit_msg(mh->msg);
1152 dlm_lowcomms_put_msg(mh->msg);
1153
1154 dlm_free_mhandle(mh);
1155 break;
1156 case DLM_VERSION_3_2:
1157 dlm_midcomms_commit_msg_3_2(mh);
1158 srcu_read_unlock(&nodes_srcu, mh->idx);
1159 break;
1160 default:
1161 srcu_read_unlock(&nodes_srcu, mh->idx);
1162 WARN_ON(1);
1163 break;
1164 }
1165 }
1166 #endif
1167
1168 int dlm_midcomms_start(void)
1169 {
1170 int i;
1171
1172 for (i = 0; i < CONN_HASH_SIZE; i++)
1173 INIT_HLIST_HEAD(&node_hash[i]);
1174
1175 return dlm_lowcomms_start();
1176 }
1177
1178 static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
1179 {
1180 spin_lock(&node->state_lock);
1181 pr_debug("receive active fin ack from node %d with state %s\n",
1182 node->nodeid, dlm_state_str(node->state));
1183
1184 switch (node->state) {
1185 case DLM_FIN_WAIT1:
1186 node->state = DLM_FIN_WAIT2;
1187 pr_debug("switch node %d to state %s\n",
1188 node->nodeid, dlm_state_str(node->state));
1189 break;
1190 case DLM_CLOSING:
1191 midcomms_node_reset(node);
1192 pr_debug("switch node %d to state %s\n",
1193 node->nodeid, dlm_state_str(node->state));
1194 wake_up(&node->shutdown_wait);
1195 break;
1196 case DLM_CLOSED:
1197
1198 wake_up(&node->shutdown_wait);
1199 break;
1200 default:
1201 spin_unlock(&node->state_lock);
1202 log_print("%s: unexpected state: %d\n",
1203 __func__, node->state);
1204 WARN_ON(1);
1205 return;
1206 }
1207 spin_unlock(&node->state_lock);
1208 }
1209
1210 void dlm_midcomms_add_member(int nodeid)
1211 {
1212 struct midcomms_node *node;
1213 int idx;
1214
1215 if (nodeid == dlm_our_nodeid())
1216 return;
1217
1218 idx = srcu_read_lock(&nodes_srcu);
1219 node = nodeid2node(nodeid, GFP_NOFS);
1220 if (!node) {
1221 srcu_read_unlock(&nodes_srcu, idx);
1222 return;
1223 }
1224
1225 spin_lock(&node->state_lock);
1226 if (!node->users) {
1227 pr_debug("receive add member from node %d with state %s\n",
1228 node->nodeid, dlm_state_str(node->state));
1229 switch (node->state) {
1230 case DLM_ESTABLISHED:
1231 break;
1232 case DLM_CLOSED:
1233 node->state = DLM_ESTABLISHED;
1234 pr_debug("switch node %d to state %s\n",
1235 node->nodeid, dlm_state_str(node->state));
1236 break;
1237 default:
1238
1239
1240
1241
1242 log_print("reset node %d because shutdown stuck",
1243 node->nodeid);
1244
1245 midcomms_node_reset(node);
1246 node->state = DLM_ESTABLISHED;
1247 break;
1248 }
1249 }
1250
1251 node->users++;
1252 pr_debug("node %d users inc count %d\n", nodeid, node->users);
1253 spin_unlock(&node->state_lock);
1254
1255 srcu_read_unlock(&nodes_srcu, idx);
1256 }
1257
1258 void dlm_midcomms_remove_member(int nodeid)
1259 {
1260 struct midcomms_node *node;
1261 int idx;
1262
1263 if (nodeid == dlm_our_nodeid())
1264 return;
1265
1266 idx = srcu_read_lock(&nodes_srcu);
1267 node = nodeid2node(nodeid, 0);
1268 if (!node) {
1269 srcu_read_unlock(&nodes_srcu, idx);
1270 return;
1271 }
1272
1273 spin_lock(&node->state_lock);
1274 node->users--;
1275 pr_debug("node %d users dec count %d\n", nodeid, node->users);
1276
1277
1278
1279
1280
1281 if (node->users == 0) {
1282 pr_debug("receive remove member from node %d with state %s\n",
1283 node->nodeid, dlm_state_str(node->state));
1284 switch (node->state) {
1285 case DLM_ESTABLISHED:
1286 break;
1287 case DLM_CLOSE_WAIT:
1288
1289 node->state = DLM_LAST_ACK;
1290 spin_unlock(&node->state_lock);
1291
1292 pr_debug("switch node %d to state %s case 2\n",
1293 node->nodeid, dlm_state_str(node->state));
1294 goto send_fin;
1295 case DLM_LAST_ACK:
1296
1297 break;
1298 case DLM_CLOSED:
1299
1300 break;
1301 default:
1302 log_print("%s: unexpected state: %d\n",
1303 __func__, node->state);
1304 break;
1305 }
1306 }
1307 spin_unlock(&node->state_lock);
1308
1309 srcu_read_unlock(&nodes_srcu, idx);
1310 return;
1311
1312 send_fin:
1313 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
1314 dlm_send_fin(node, dlm_pas_fin_ack_rcv);
1315 srcu_read_unlock(&nodes_srcu, idx);
1316 }
1317
1318 static void midcomms_node_release(struct rcu_head *rcu)
1319 {
1320 struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
1321
1322 WARN_ON(atomic_read(&node->send_queue_cnt));
1323 kfree(node);
1324 }
1325
1326 static void midcomms_shutdown(struct midcomms_node *node)
1327 {
1328 int ret;
1329
1330
1331 switch (node->version) {
1332 case DLM_VERSION_3_2:
1333 break;
1334 default:
1335 return;
1336 }
1337
1338 spin_lock(&node->state_lock);
1339 pr_debug("receive active shutdown for node %d with state %s\n",
1340 node->nodeid, dlm_state_str(node->state));
1341 switch (node->state) {
1342 case DLM_ESTABLISHED:
1343 node->state = DLM_FIN_WAIT1;
1344 pr_debug("switch node %d to state %s case 2\n",
1345 node->nodeid, dlm_state_str(node->state));
1346 break;
1347 case DLM_CLOSED:
1348
1349 spin_unlock(&node->state_lock);
1350 return;
1351 default:
1352
1353
1354
1355 break;
1356 }
1357 spin_unlock(&node->state_lock);
1358
1359 if (node->state == DLM_FIN_WAIT1) {
1360 dlm_send_fin(node, dlm_act_fin_ack_rcv);
1361
1362 if (DLM_DEBUG_FENCE_TERMINATION)
1363 msleep(5000);
1364 }
1365
1366
1367 ret = wait_event_timeout(node->shutdown_wait,
1368 node->state == DLM_CLOSED ||
1369 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
1370 DLM_SHUTDOWN_TIMEOUT);
1371 if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) {
1372 pr_debug("active shutdown timed out for node %d with state %s\n",
1373 node->nodeid, dlm_state_str(node->state));
1374 midcomms_node_reset(node);
1375 return;
1376 }
1377
1378 pr_debug("active shutdown done for node %d with state %s\n",
1379 node->nodeid, dlm_state_str(node->state));
1380 }
1381
1382 void dlm_midcomms_shutdown(void)
1383 {
1384 struct midcomms_node *node;
1385 int i, idx;
1386
1387 mutex_lock(&close_lock);
1388 idx = srcu_read_lock(&nodes_srcu);
1389 for (i = 0; i < CONN_HASH_SIZE; i++) {
1390 hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
1391 midcomms_shutdown(node);
1392
1393 dlm_delete_debug_comms_file(node->debugfs);
1394
1395 spin_lock(&nodes_lock);
1396 hlist_del_rcu(&node->hlist);
1397 spin_unlock(&nodes_lock);
1398
1399 call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
1400 }
1401 }
1402 srcu_read_unlock(&nodes_srcu, idx);
1403 mutex_unlock(&close_lock);
1404
1405 dlm_lowcomms_shutdown();
1406 }
1407
1408 int dlm_midcomms_close(int nodeid)
1409 {
1410 struct midcomms_node *node;
1411 int idx, ret;
1412
1413 if (nodeid == dlm_our_nodeid())
1414 return 0;
1415
1416 dlm_stop_lockspaces_check();
1417
1418 idx = srcu_read_lock(&nodes_srcu);
1419
1420 node = nodeid2node(nodeid, 0);
1421 if (node) {
1422
1423 set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
1424 wake_up(&node->shutdown_wait);
1425 }
1426 srcu_read_unlock(&nodes_srcu, idx);
1427
1428 synchronize_srcu(&nodes_srcu);
1429
1430 idx = srcu_read_lock(&nodes_srcu);
1431 mutex_lock(&close_lock);
1432 node = nodeid2node(nodeid, 0);
1433 if (!node) {
1434 mutex_unlock(&close_lock);
1435 srcu_read_unlock(&nodes_srcu, idx);
1436 return dlm_lowcomms_close(nodeid);
1437 }
1438
1439 ret = dlm_lowcomms_close(nodeid);
1440 spin_lock(&node->state_lock);
1441 midcomms_node_reset(node);
1442 spin_unlock(&node->state_lock);
1443 srcu_read_unlock(&nodes_srcu, idx);
1444 mutex_unlock(&close_lock);
1445
1446 return ret;
1447 }
1448
1449
/* Argument handed to midcomms_new_rawmsg_cb(): the destination node and
 * the caller's raw message buffer.
 */
struct dlm_rawmsg_data {
	struct midcomms_node *node;	/* node the raw message is sent to */
	void *buf;			/* starts with a struct dlm_header */
};
1454
1455 static void midcomms_new_rawmsg_cb(void *data)
1456 {
1457 struct dlm_rawmsg_data *rd = data;
1458 struct dlm_header *h = rd->buf;
1459
1460 switch (h->h_version) {
1461 case cpu_to_le32(DLM_VERSION_3_1):
1462 break;
1463 default:
1464 switch (h->h_cmd) {
1465 case DLM_OPTS:
1466 if (!h->u.h_seq)
1467 h->u.h_seq = cpu_to_le32(rd->node->seq_send++);
1468 break;
1469 default:
1470 break;
1471 }
1472 break;
1473 }
1474 }
1475
1476 int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
1477 int buflen)
1478 {
1479 struct dlm_rawmsg_data rd;
1480 struct dlm_msg *msg;
1481 char *msgbuf;
1482
1483 rd.node = node;
1484 rd.buf = buf;
1485
1486 msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS,
1487 &msgbuf, midcomms_new_rawmsg_cb, &rd);
1488 if (!msg)
1489 return -ENOMEM;
1490
1491 memcpy(msgbuf, buf, buflen);
1492 dlm_lowcomms_commit_msg(msg);
1493 return 0;
1494 }
1495