#include <linux/kernel.h>
#include <linux/moduleparam.h>
#include <linux/gfp.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/list.h>
#include <linux/ratelimit.h>
#include <linux/export.h>
#include <linux/sizes.h>

#include "rds.h"

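/* rds_send_xmit() works the send queue in batches: after send_batch_count
 * messages it drops RDS_IN_XMIT and defers the rest to the workqueue, so a
 * single busy connection path does not monopolize the CPU or starve other
 * paths. A larger value means fewer restarts at the cost of fairness.
 */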
static int send_batch_count = SZ_1K;
module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");

static void rds_send_remove_from_sock(struct list_head *messages, int status);

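/* Reset a connection path's send state: drop the reference on any message
 * that was being transmitted, rewind the cp_xmit_* cursors, and move the
 * retransmit queue back onto the send queue with RDS_MSG_ACK_REQUIRED and
 * RDS_MSG_RETRANSMITTED set. Callers are expected to keep this from racing
 * with rds_send_xmit() on the same path.
 */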
void rds_send_path_reset(struct rds_conn_path *cp)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;

	if (cp->cp_xmit_rm) {
		rm = cp->cp_xmit_rm;
		cp->cp_xmit_rm = NULL;

		rds_message_unmapped(rm);
		rds_message_put(rm);
	}

	cp->cp_xmit_sg = 0;
	cp->cp_xmit_hdr_off = 0;
	cp->cp_xmit_data_off = 0;
	cp->cp_xmit_atomic_sent = 0;
	cp->cp_xmit_rdma_sent = 0;
	cp->cp_xmit_data_sent = 0;

	cp->cp_conn->c_map_queued = 0;

	cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
	cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;

	spin_lock_irqsave(&cp->cp_lock, flags);
	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
		set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
		set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
	}
	list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
	spin_unlock_irqrestore(&cp->cp_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_send_path_reset);

static int acquire_in_xmit(struct rds_conn_path *cp)
{
	return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
}

static void release_in_xmit(struct rds_conn_path *cp)
{
	clear_bit(RDS_IN_XMIT, &cp->cp_flags);
	smp_mb__after_atomic();

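	/* The barrier above orders clearing RDS_IN_XMIT against the waiter
	 * check below. Waiters are rare, so only take the waitqueue lock
	 * when somebody is actually sleeping on cp_waitq.
	 */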
	if (waitqueue_active(&cp->cp_waitq))
		wake_up_all(&cp->cp_waitq);
}

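/* Push messages from a connection path's send queue to the transport.
 *
 * Only one caller may be in rds_send_xmit() per path at a time; the
 * RDS_IN_XMIT bit serializes them. Each loop iteration hands a message's
 * rdma, atomic and data ops to the transport and parks the message on
 * cp_retrans until it is acked. Returns 0 once the queue is drained or the
 * remaining work has been requeued, and a negative errno if the path is
 * contended, being torn down, or the transport cannot make progress.
 */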
int rds_send_xmit(struct rds_conn_path *cp)
{
	struct rds_connection *conn = cp->cp_conn;
	struct rds_message *rm;
	unsigned long flags;
	unsigned int tmp;
	struct scatterlist *sg;
	int ret = 0;
	LIST_HEAD(to_be_dropped);
	int batch_count;
	unsigned long send_gen = 0;
	int same_rm = 0;

restart:
	batch_count = 0;

	if (!acquire_in_xmit(cp)) {
		rds_stats_inc(s_send_lock_contention);
		ret = -ENOMEM;
		goto out;
	}

	if (rds_destroy_pending(cp->cp_conn)) {
		release_in_xmit(cp);
		ret = -ENETUNREACH;
		goto out;
	}

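	/* Record a new send generation while holding RDS_IN_XMIT. The check
	 * at the bottom of this function compares against it to spot senders
	 * that queued a message after we last looked at the send queue.
	 */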
	send_gen = READ_ONCE(cp->cp_send_gen) + 1;
	WRITE_ONCE(cp->cp_send_gen, send_gen);

	if (!rds_conn_path_up(cp)) {
		release_in_xmit(cp);
		ret = 0;
		goto out;
	}

	if (conn->c_trans->xmit_path_prepare)
		conn->c_trans->xmit_path_prepare(cp);

	while (1) {
		rm = cp->cp_xmit_rm;

		if (!rm) {
			same_rm = 0;
		} else {
			same_rm++;
			if (same_rm >= 4096) {
				rds_stats_inc(s_send_stuck_rm);
				ret = -EAGAIN;
				break;
			}
		}

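		/* If nothing is in flight, a queued congestion map update
		 * gets sent ahead of the data messages.
		 */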
		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
			rm = rds_cong_update_alloc(conn);
			if (IS_ERR(rm)) {
				ret = PTR_ERR(rm);
				break;
			}
			rm->data.op_active = 1;
			rm->m_inc.i_conn_path = cp;
			rm->m_inc.i_conn = cp->cp_conn;

			cp->cp_xmit_rm = rm;
		}

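		/* Not working on a message yet: take the next one off the
		 * send queue, hold a reference on it, and park it on the
		 * retransmit queue until it is acked or dropped.
		 */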
		if (!rm) {
			unsigned int len;

			batch_count++;

			if (batch_count >= send_batch_count)
				goto over_batch;

			spin_lock_irqsave(&cp->cp_lock, flags);

			if (!list_empty(&cp->cp_send_queue)) {
				rm = list_entry(cp->cp_send_queue.next,
						struct rds_message,
						m_conn_item);
				rds_message_addref(rm);

				list_move_tail(&rm->m_conn_item,
					       &cp->cp_retrans);
			}

			spin_unlock_irqrestore(&cp->cp_lock, flags);

			if (!rm)
				break;

			if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
			    (rm->rdma.op_active &&
			     test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
				spin_lock_irqsave(&cp->cp_lock, flags);
				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
					list_move(&rm->m_conn_item, &to_be_dropped);
				spin_unlock_irqrestore(&cp->cp_lock, flags);
				continue;
			}

			len = ntohl(rm->m_inc.i_hdr.h_len);
			if (cp->cp_unacked_packets == 0 ||
			    cp->cp_unacked_bytes < len) {
				set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

				cp->cp_unacked_packets =
					rds_sysctl_max_unacked_packets;
				cp->cp_unacked_bytes =
					rds_sysctl_max_unacked_bytes;
				rds_stats_inc(s_send_ack_required);
			} else {
				cp->cp_unacked_bytes -= len;
				cp->cp_unacked_packets--;
			}

			cp->cp_xmit_rm = rm;
		}

		if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
			rm->m_final_op = &rm->rdma;

			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			cp->cp_xmit_rdma_sent = 1;
		}

		if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
			rm->m_final_op = &rm->atomic;

			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
			if (ret) {
				clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
				wake_up_interruptible(&rm->m_flush_wait);
				break;
			}
			cp->cp_xmit_atomic_sent = 1;
		}

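		/* A message with no data payload whose only active ops are
		 * silent rdma/atomic ops, and which does not have to return
		 * an rdma cookie, does not need its header or data pushed
		 * at all.
		 */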
		if (rm->data.op_nents == 0) {
			int ops_present;
			int all_ops_are_silent = 1;

			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
			if (rm->atomic.op_active && !rm->atomic.op_silent)
				all_ops_are_silent = 0;
			if (rm->rdma.op_active && !rm->rdma.op_silent)
				all_ops_are_silent = 0;

			if (ops_present && all_ops_are_silent
			    && !rm->m_rdma_cookie)
				rm->data.op_active = 0;
		}

		if (rm->data.op_active && !cp->cp_xmit_data_sent) {
			rm->m_final_op = &rm->data;

			ret = conn->c_trans->xmit(conn, rm,
						  cp->cp_xmit_hdr_off,
						  cp->cp_xmit_sg,
						  cp->cp_xmit_data_off);
			if (ret <= 0)
				break;

			if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
				tmp = min_t(int, ret,
					    sizeof(struct rds_header) -
					    cp->cp_xmit_hdr_off);
				cp->cp_xmit_hdr_off += tmp;
				ret -= tmp;
			}

			sg = &rm->data.op_sg[cp->cp_xmit_sg];
			while (ret) {
				tmp = min_t(int, ret, sg->length -
						      cp->cp_xmit_data_off);
				cp->cp_xmit_data_off += tmp;
				ret -= tmp;
				if (cp->cp_xmit_data_off == sg->length) {
					cp->cp_xmit_data_off = 0;
					sg++;
					cp->cp_xmit_sg++;
					BUG_ON(ret != 0 && cp->cp_xmit_sg ==
					       rm->data.op_nents);
				}
			}

			if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
			    (cp->cp_xmit_sg == rm->data.op_nents))
				cp->cp_xmit_data_sent = 1;
		}

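		/* A message only stays in cp_xmit_rm across iterations while
		 * its data op is still being pushed. Once every active op has
		 * been handed to the transport, drop our reference and rewind
		 * the cursors for the next message.
		 */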
		if (!rm->data.op_active || cp->cp_xmit_data_sent) {
			cp->cp_xmit_rm = NULL;
			cp->cp_xmit_sg = 0;
			cp->cp_xmit_hdr_off = 0;
			cp->cp_xmit_data_off = 0;
			cp->cp_xmit_rdma_sent = 0;
			cp->cp_xmit_atomic_sent = 0;
			cp->cp_xmit_data_sent = 0;

			rds_message_put(rm);
		}
	}

over_batch:
	if (conn->c_trans->xmit_path_complete)
		conn->c_trans->xmit_path_complete(cp);
	release_in_xmit(cp);

	if (!list_empty(&to_be_dropped)) {
		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
			rds_message_put(rm);
		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
	}

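	/* Other senders can queue a message after we last tested the send
	 * queue but before we cleared RDS_IN_XMIT; they lose the contended
	 * acquire_in_xmit() and it is up to us to finish the job. The send
	 * generation recorded above tells us whether such a racing sender
	 * has since bumped cp_send_gen (and will retry the transmit itself);
	 * if not, either keep going here or kick the worker to do it.
	 */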
	if (ret == 0) {
		bool raced;

		smp_mb();
		raced = send_gen != READ_ONCE(cp->cp_send_gen);

		if ((test_bit(0, &conn->c_map_queued) ||
		    !list_empty(&cp->cp_send_queue)) && !raced) {
			if (batch_count < send_batch_count)
				goto restart;
			rcu_read_lock();
			if (rds_destroy_pending(cp->cp_conn))
				ret = -ENETUNREACH;
			else
				queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
			rcu_read_unlock();
		} else if (raced) {
			rds_stats_inc(s_send_lock_queue_raced);
		}
	}
out:
	return ret;
}
EXPORT_SYMBOL_GPL(rds_send_xmit);

static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{
	u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	assert_spin_locked(&rs->rs_lock);

	BUG_ON(rs->rs_snd_bytes < len);
	rs->rs_snd_bytes -= len;

	if (rs->rs_snd_bytes == 0)
		rds_stats_inc(s_send_queue_empty);
}

static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
				    is_acked_func is_acked)
{
	if (is_acked)
		return is_acked(rm, ack);
	return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
}

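/* Transports call this when an RDMA op has completed (or failed). If the
 * message is still on its socket and the op asked for notification, queue
 * the notifier on the socket's notify queue and wake the socket up.
 */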
void rds_rdma_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_rdma_op *ro;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ro = &rm->rdma;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
	    ro->op_active && ro->op_notify && ro->op_notifier) {
		notifier = ro->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ro->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_rdma_send_complete);

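/* Just like above, but for atomic op notifiers. */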
void rds_atomic_send_complete(struct rds_message *rm, int status)
{
	struct rds_sock *rs = NULL;
	struct rm_atomic_op *ao;
	struct rds_notifier *notifier;
	unsigned long flags;

	spin_lock_irqsave(&rm->m_rs_lock, flags);

	ao = &rm->atomic;
	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
	    && ao->op_active && ao->op_notify && ao->op_notifier) {
		notifier = ao->op_notifier;
		rs = rm->m_rs;
		sock_hold(rds_rs_to_sk(rs));

		notifier->n_status = status;
		spin_lock(&rs->rs_lock);
		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
		spin_unlock(&rs->rs_lock);

		ao->op_notifier = NULL;
	}

	spin_unlock_irqrestore(&rm->m_rs_lock, flags);

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}
EXPORT_SYMBOL_GPL(rds_atomic_send_complete);

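/* Same idea as the completion handlers above, but the caller already holds
 * both m_rs_lock and the socket's rs_lock, so the notifiers can be moved
 * straight onto rs_notify_queue. Waking the socket is left to the caller.
 */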
static inline void
__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
{
	struct rm_rdma_op *ro;
	struct rm_atomic_op *ao;

	ro = &rm->rdma;
	if (ro->op_active && ro->op_notify && ro->op_notifier) {
		ro->op_notifier->n_status = status;
		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
		ro->op_notifier = NULL;
	}

	ao = &rm->atomic;
	if (ao->op_active && ao->op_notify && ao->op_notifier) {
		ao->op_notifier->n_status = status;
		list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
		ao->op_notifier = NULL;
	}
}

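/* Remove messages from their sockets once the transport is done with them.
 * Each message on the list is taken off the socket's send queue and its
 * send-buffer accounting; if notification was requested (or an error status
 * is being reported), its notifier is queued on the socket. The reference
 * the socket held on the message is then dropped.
 */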
static void rds_send_remove_from_sock(struct list_head *messages, int status)
{
	unsigned long flags;
	struct rds_sock *rs = NULL;
	struct rds_message *rm;

	while (!list_empty(messages)) {
		int was_on_sock = 0;

		rm = list_entry(messages->next, struct rds_message,
				m_conn_item);
		list_del_init(&rm->m_conn_item);

		spin_lock_irqsave(&rm->m_rs_lock, flags);
		if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
			goto unlock_and_drop;

		if (rs != rm->m_rs) {
			if (rs) {
				rds_wake_sk_sleep(rs);
				sock_put(rds_rs_to_sk(rs));
			}
			rs = rm->m_rs;
			if (rs)
				sock_hold(rds_rs_to_sk(rs));
		}
		if (!rs)
			goto unlock_and_drop;
		spin_lock(&rs->rs_lock);

		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
			struct rm_rdma_op *ro = &rm->rdma;
			struct rds_notifier *notifier;

			list_del_init(&rm->m_sock_item);
			rds_send_sndbuf_remove(rs, rm);

			if (ro->op_active && ro->op_notifier &&
			    (ro->op_notify || (ro->op_recverr && status))) {
				notifier = ro->op_notifier;
				list_add_tail(&notifier->n_list,
					      &rs->rs_notify_queue);
				if (!notifier->n_status)
					notifier->n_status = status;
				rm->rdma.op_notifier = NULL;
			}
			was_on_sock = 1;
		}
		spin_unlock(&rs->rs_lock);

unlock_and_drop:
		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
		rds_message_put(rm);
		if (was_on_sock)
			rds_message_put(rm);
	}

	if (rs) {
		rds_wake_sk_sleep(rs);
		sock_put(rds_rs_to_sk(rs));
	}
}

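/* Transports call here when they have determined that the receiver got
 * messages up to, and including, the given sequence number (or whatever
 * is_acked() decides). Acked messages are taken off the retransmit queue
 * and then removed from their sockets.
 */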
void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
			      is_acked_func is_acked)
{
	struct rds_message *rm, *tmp;
	unsigned long flags;
	LIST_HEAD(list);

	spin_lock_irqsave(&cp->cp_lock, flags);

	list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
		if (!rds_send_is_acked(rm, ack, is_acked))
			break;

		list_move(&rm->m_conn_item, &list);
		clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	}

	if (!list_empty(&list))
		smp_mb__after_atomic();

	spin_unlock_irqrestore(&cp->cp_lock, flags);

	rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
}
EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);

void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
			 is_acked_func is_acked)
{
	WARN_ON(conn->c_trans->t_mp_capable);
	rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
}
EXPORT_SYMBOL_GPL(rds_send_drop_acked);

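/* Drop every message this socket has queued to @dest (or to anywhere, when
 * @dest is NULL): pull them off the socket and their connection paths,
 * complete their notifiers with RDS_RDMA_CANCELED, and drop the references
 * the socket and connection held.
 */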
void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
{
	struct rds_message *rm, *tmp;
	struct rds_connection *conn;
	struct rds_conn_path *cp;
	unsigned long flags;
	LIST_HEAD(list);

	spin_lock_irqsave(&rs->rs_lock, flags);

	list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
		if (dest &&
		    (!ipv6_addr_equal(&dest->sin6_addr, &rm->m_daddr) ||
		     dest->sin6_port != rm->m_inc.i_hdr.h_dport))
			continue;

		list_move(&rm->m_sock_item, &list);
		rds_send_sndbuf_remove(rs, rm);
		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
	}

	smp_mb__after_atomic();

	spin_unlock_irqrestore(&rs->rs_lock, flags);

	if (list_empty(&list))
		return;

	list_for_each_entry(rm, &list, m_sock_item) {
		conn = rm->m_inc.i_conn;
		if (conn->c_trans->t_mp_capable)
			cp = rm->m_inc.i_conn_path;
		else
			cp = &conn->c_path[0];

		spin_lock_irqsave(&cp->cp_lock, flags);

		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
			spin_unlock_irqrestore(&cp->cp_lock, flags);
			continue;
		}
		list_del_init(&rm->m_conn_item);
		spin_unlock_irqrestore(&cp->cp_lock, flags);

		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}

	rds_wake_sk_sleep(rs);

	while (!list_empty(&list)) {
		rm = list_entry(list.next, struct rds_message, m_sock_item);
		list_del_init(&rm->m_sock_item);
		rds_message_wait(rm);

		spin_lock_irqsave(&rm->m_rs_lock, flags);

		spin_lock(&rs->rs_lock);
		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
		spin_unlock(&rs->rs_lock);

		spin_unlock_irqrestore(&rm->m_rs_lock, flags);

		rds_message_put(rm);
	}
}

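/* Queue a message on both the socket and the connection path. Uses the
 * caller's *queued flag so the (possibly repeated) wait-loop condition in
 * rds_sendmsg() only queues the message once.
 */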
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
			     struct rds_conn_path *cp,
			     struct rds_message *rm, __be16 sport,
			     __be16 dport, int *queued)
{
	unsigned long flags;
	u32 len;

	if (*queued)
		goto out;

	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

	spin_lock_irqsave(&rs->rs_lock, flags);

	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
		rs->rs_snd_bytes += len;

		if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
			set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

		list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
		set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
		rds_message_addref(rm);
		sock_hold(rds_rs_to_sk(rs));
		rm->m_rs = rs;

		rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
		rm->m_inc.i_conn = conn;
		rm->m_inc.i_conn_path = cp;
		rds_message_addref(rm);

		spin_lock(&cp->cp_lock);
		rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
		list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
		set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
		spin_unlock(&cp->cp_lock);

		rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
			 rm, len, rs, rs->rs_snd_bytes,
			 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));

		*queued = 1;
	}

	spin_unlock_irqrestore(&rs->rs_lock, flags);
out:
	return *queued;
}

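/* Walk the cmsg headers and work out how much memory rds_message_alloc()
 * needs for this send: scatterlist space for the payload plus whatever the
 * rdma and atomic cmsgs require. Also rejects invalid cmsg combinations.
 */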
static int rds_rm_size(struct msghdr *msg, int num_sgs,
		       struct rds_iov_vector_arr *vct)
{
	struct cmsghdr *cmsg;
	int size = 0;
	int cmsg_groups = 0;
	int retval;
	bool zcopy_cookie = false;
	struct rds_iov_vector *iov, *tmp_iov;

	if (num_sgs < 0)
		return -EINVAL;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			if (vct->indx >= vct->len) {
				vct->len += vct->incr;
				tmp_iov =
					krealloc(vct->vec,
						 vct->len *
						 sizeof(struct rds_iov_vector),
						 GFP_KERNEL);
				if (!tmp_iov) {
					vct->len -= vct->incr;
					return -ENOMEM;
				}
				vct->vec = tmp_iov;
			}
			iov = &vct->vec[vct->indx];
			memset(iov, 0, sizeof(struct rds_iov_vector));
			vct->indx++;
			cmsg_groups |= 1;
			retval = rds_rdma_extra_size(CMSG_DATA(cmsg), iov);
			if (retval < 0)
				return retval;
			size += retval;

			break;

		case RDS_CMSG_ZCOPY_COOKIE:
			zcopy_cookie = true;
			fallthrough;

		case RDS_CMSG_RDMA_DEST:
		case RDS_CMSG_RDMA_MAP:
			cmsg_groups |= 2;

			break;

		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			cmsg_groups |= 1;
			size += sizeof(struct scatterlist);
			break;

		default:
			return -EINVAL;
		}
	}

	if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
		return -EINVAL;

	size += num_sgs * sizeof(struct scatterlist);

	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
	if (cmsg_groups == 3)
		return -EINVAL;

	return size;
}

static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
			  struct cmsghdr *cmsg)
{
	u32 *cookie;

	if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) ||
	    !rm->data.op_mmp_znotifier)
		return -EINVAL;
	cookie = CMSG_DATA(cmsg);
	rm->data.op_mmp_znotifier->z_cookie = *cookie;
	return 0;
}

static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
			 struct msghdr *msg, int *allocated_mr,
			 struct rds_iov_vector_arr *vct)
{
	struct cmsghdr *cmsg;
	int ret = 0, ind = 0;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		switch (cmsg->cmsg_type) {
		case RDS_CMSG_RDMA_ARGS:
			if (ind >= vct->indx)
				return -ENOMEM;
			ret = rds_cmsg_rdma_args(rs, rm, cmsg, &vct->vec[ind]);
			ind++;
			break;

		case RDS_CMSG_RDMA_DEST:
			ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
			break;

		case RDS_CMSG_RDMA_MAP:
			ret = rds_cmsg_rdma_map(rs, rm, cmsg);
			if (!ret)
				*allocated_mr = 1;
			else if (ret == -ENODEV)
				/* The MR mapping can fail if the connection
				 * isn't established yet; turn it into -EAGAIN
				 * so the caller retries.
				 */
				ret = -EAGAIN;
			break;
		case RDS_CMSG_ATOMIC_CSWP:
		case RDS_CMSG_ATOMIC_FADD:
		case RDS_CMSG_MASKED_ATOMIC_CSWP:
		case RDS_CMSG_MASKED_ATOMIC_FADD:
			ret = rds_cmsg_atomic(rs, rm, cmsg);
			break;

		case RDS_CMSG_ZCOPY_COOKIE:
			ret = rds_cmsg_zcopy(rs, rm, cmsg);
			break;

		default:
			return -EINVAL;
		}

		if (ret)
			break;
	}

	return ret;
}

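/* Pick the connection path to send on for an MP-capable transport. Until
 * the handshake has told us how many paths the peer supports (c_npaths),
 * send a ping to trigger it and, unless the caller is nonblocking, wait for
 * the answer; otherwise fall back to path 0.
 */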
static int rds_send_mprds_hash(struct rds_sock *rs,
			       struct rds_connection *conn, int nonblock)
{
	int hash;

	if (conn->c_npaths == 0)
		hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
	else
		hash = RDS_MPATH_HASH(rs, conn->c_npaths);
	if (conn->c_npaths == 0 && hash != 0) {
		rds_send_ping(conn, 0);

		if (conn->c_npaths == 0) {
			if (nonblock)
				return 0;
			if (wait_event_interruptible(conn->c_hs_waitq,
						     conn->c_npaths != 0))
				hash = 0;
		}
		if (conn->c_npaths == 1)
			hash = 0;
	}
	return hash;
}

static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
{
	struct rds_rdma_args *args;
	struct cmsghdr *cmsg;

	for_each_cmsghdr(cmsg, msg) {
		if (!CMSG_OK(msg, cmsg))
			return -EINVAL;

		if (cmsg->cmsg_level != SOL_RDS)
			continue;

		if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
			if (cmsg->cmsg_len <
			    CMSG_LEN(sizeof(struct rds_rdma_args)))
				return -EINVAL;
			args = CMSG_DATA(cmsg);
			*rdma_bytes += args->remote_vec.bytes;
		}
	}
	return 0;
}

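/* sendmsg() entry point for RDS sockets. Validates the destination and the
 * control messages, sizes and allocates the rds_message, copies in the
 * payload, queues the message on the socket and connection path, and then
 * kicks rds_send_xmit() directly or via the workqueue. Returns the payload
 * length on success.
 */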
int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
{
	struct sock *sk = sock->sk;
	struct rds_sock *rs = rds_sk_to_rs(sk);
	DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
	DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
	__be16 dport;
	struct rds_message *rm = NULL;
	struct rds_connection *conn;
	int ret = 0;
	int queued = 0, allocated_mr = 0;
	int nonblock = msg->msg_flags & MSG_DONTWAIT;
	long timeo = sock_sndtimeo(sk, nonblock);
	struct rds_conn_path *cpath;
	struct in6_addr daddr;
	__u32 scope_id = 0;
	size_t total_payload_len = payload_len, rdma_payload_len = 0;
	bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
		      sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
	int num_sgs = DIV_ROUND_UP(payload_len, PAGE_SIZE);
	int namelen;
	struct rds_iov_vector_arr vct;
	int ind;

	memset(&vct, 0, sizeof(vct));

	vct.incr = 1;

	if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
		ret = -EOPNOTSUPP;
		goto out;
	}

	namelen = msg->msg_namelen;
	if (namelen != 0) {
		if (namelen < sizeof(*usin)) {
			ret = -EINVAL;
			goto out;
		}
		switch (usin->sin_family) {
		case AF_INET:
			if (usin->sin_addr.s_addr == htonl(INADDR_ANY) ||
			    usin->sin_addr.s_addr == htonl(INADDR_BROADCAST) ||
			    ipv4_is_multicast(usin->sin_addr.s_addr)) {
				ret = -EINVAL;
				goto out;
			}
			ipv6_addr_set_v4mapped(usin->sin_addr.s_addr, &daddr);
			dport = usin->sin_port;
			break;

#if IS_ENABLED(CONFIG_IPV6)
		case AF_INET6: {
			int addr_type;

			if (namelen < sizeof(*sin6)) {
				ret = -EINVAL;
				goto out;
			}
			addr_type = ipv6_addr_type(&sin6->sin6_addr);
			if (!(addr_type & IPV6_ADDR_UNICAST)) {
				__be32 addr4;

				if (!(addr_type & IPV6_ADDR_MAPPED)) {
					ret = -EINVAL;
					goto out;
				}

				addr4 = sin6->sin6_addr.s6_addr32[3];
				if (addr4 == htonl(INADDR_ANY) ||
				    addr4 == htonl(INADDR_BROADCAST) ||
				    ipv4_is_multicast(addr4)) {
					ret = -EINVAL;
					goto out;
				}
			}
			if (addr_type & IPV6_ADDR_LINKLOCAL) {
				if (sin6->sin6_scope_id == 0) {
					ret = -EINVAL;
					goto out;
				}
				scope_id = sin6->sin6_scope_id;
			}

			daddr = sin6->sin6_addr;
			dport = sin6->sin6_port;
			break;
		}
#endif

		default:
			ret = -EINVAL;
			goto out;
		}
	} else {
		lock_sock(sk);
		daddr = rs->rs_conn_addr;
		dport = rs->rs_conn_port;
		scope_id = rs->rs_bound_scope_id;
		release_sock(sk);
	}

	lock_sock(sk);
	if (ipv6_addr_any(&rs->rs_bound_addr) || ipv6_addr_any(&daddr)) {
		release_sock(sk);
		ret = -ENOTCONN;
		goto out;
	} else if (namelen != 0) {
		if (ipv6_addr_v4mapped(&daddr) ^
		    ipv6_addr_v4mapped(&rs->rs_bound_addr)) {
			release_sock(sk);
			ret = -EOPNOTSUPP;
			goto out;
		}

		if (scope_id != rs->rs_bound_scope_id) {
			if (!scope_id) {
				scope_id = rs->rs_bound_scope_id;
			} else if (rs->rs_bound_scope_id) {
				release_sock(sk);
				ret = -EINVAL;
				goto out;
			}
		}
	}
	release_sock(sk);

	ret = rds_rdma_bytes(msg, &rdma_payload_len);
	if (ret)
		goto out;

	total_payload_len += rdma_payload_len;
	if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
		ret = -EMSGSIZE;
		goto out;
	}

	if (payload_len > rds_sk_sndbuf(rs)) {
		ret = -EMSGSIZE;
		goto out;
	}

	if (zcopy) {
		if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
			ret = -EOPNOTSUPP;
			goto out;
		}
		num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
	}

	ret = rds_rm_size(msg, num_sgs, &vct);
	if (ret < 0)
		goto out;

	rm = rds_message_alloc(ret, GFP_KERNEL);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	/* Attach the payload to the rm */
	if (payload_len) {
		rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
		if (IS_ERR(rm->data.op_sg)) {
			ret = PTR_ERR(rm->data.op_sg);
			goto out;
		}
		ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
		if (ret)
			goto out;
	}
	rm->data.op_active = 1;

	rm->m_daddr = daddr;

	if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr) &&
	    rs->rs_tos == rs->rs_conn->c_tos) {
		conn = rs->rs_conn;
	} else {
		conn = rds_conn_create_outgoing(sock_net(sock->sk),
						&rs->rs_bound_addr, &daddr,
						rs->rs_transport, rs->rs_tos,
						sock->sk->sk_allocation,
						scope_id);
		if (IS_ERR(conn)) {
			ret = PTR_ERR(conn);
			goto out;
		}
		rs->rs_conn = conn;
	}

	if (conn->c_trans->t_mp_capable)
		cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
	else
		cpath = &conn->c_path[0];

	rm->m_conn_path = cpath;

	ret = rds_cmsg_send(rs, rm, msg, &allocated_mr, &vct);
	if (ret) {
		/* Trigger the connection so that it is ready for the next retry */
		if (ret == -EAGAIN)
			rds_conn_connect_if_down(conn);
		goto out;
	}

	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
		printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
				   &rm->rdma, conn->c_trans->xmit_rdma);
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
		printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
				   &rm->atomic, conn->c_trans->xmit_atomic);
		ret = -EOPNOTSUPP;
		goto out;
	}

	if (rds_destroy_pending(conn)) {
		ret = -EAGAIN;
		goto out;
	}

	if (rds_conn_path_down(cpath))
		rds_check_all_paths(conn);

	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
	if (ret) {
		rs->rs_seen_congestion = 1;
		goto out;
	}
	while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
				  dport, &queued)) {
		rds_stats_inc(s_send_queue_full);

		if (nonblock) {
			ret = -EAGAIN;
			goto out;
		}

		timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
					rds_send_queue_rm(rs, conn, cpath, rm,
							  rs->rs_bound_port,
							  dport,
							  &queued),
					timeo);
		rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
		if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
			continue;

		ret = timeo;
		if (ret == 0)
			ret = -ETIMEDOUT;
		goto out;
	}

	/* By now the message is queued; if the transport cannot take it right
	 * away, the send worker will retry from the workqueue.
	 */
	rds_stats_inc(s_send_queued);

	ret = rds_send_xmit(cpath);
	if (ret == -ENOMEM || ret == -EAGAIN) {
		ret = 0;
		rcu_read_lock();
		if (rds_destroy_pending(cpath->cp_conn))
			ret = -ENETUNREACH;
		else
			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
		rcu_read_unlock();
	}
	if (ret)
		goto out;
	rds_message_put(rm);

	for (ind = 0; ind < vct.indx; ind++)
		kfree(vct.vec[ind].iov);
	kfree(vct.vec);

	return payload_len;

out:
	for (ind = 0; ind < vct.indx; ind++)
		kfree(vct.vec[ind].iov);
	kfree(vct.vec);

	/* If an RDS_CMSG_RDMA_MAP cmsg made us allocate an MR on the fly,
	 * the failed send means it is no longer needed; release it.
	 */
	if (allocated_mr)
		rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);

	if (rm)
		rds_message_put(rm);
	return ret;
}

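/* Build and queue a zero-length control message (ping/pong/probe) on the
 * given path with the requested source and destination ports. MP-capable
 * handshake probes also advertise RDS_MPATH_WORKERS and the local
 * generation number via header extensions.
 */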
static int
rds_send_probe(struct rds_conn_path *cp, __be16 sport,
	       __be16 dport, u8 h_flags)
{
	struct rds_message *rm;
	unsigned long flags;
	int ret = 0;

	rm = rds_message_alloc(0, GFP_ATOMIC);
	if (!rm) {
		ret = -ENOMEM;
		goto out;
	}

	rm->m_daddr = cp->cp_conn->c_faddr;
	rm->data.op_active = 1;

	rds_conn_path_connect_if_down(cp);

	ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
	if (ret)
		goto out;

	spin_lock_irqsave(&cp->cp_lock, flags);
	list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
	set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
	rds_message_addref(rm);
	rm->m_inc.i_conn = cp->cp_conn;
	rm->m_inc.i_conn_path = cp;

	rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
				    cp->cp_next_tx_seq);
	rm->m_inc.i_hdr.h_flags |= h_flags;
	cp->cp_next_tx_seq++;

	if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
	    cp->cp_conn->c_trans->t_mp_capable) {
		u16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
		u32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);

		rds_message_add_extension(&rm->m_inc.i_hdr,
					  RDS_EXTHDR_NPATHS, &npaths,
					  sizeof(npaths));
		rds_message_add_extension(&rm->m_inc.i_hdr,
					  RDS_EXTHDR_GEN_NUM,
					  &my_gen_num,
					  sizeof(u32));
	}
	spin_unlock_irqrestore(&cp->cp_lock, flags);

	rds_stats_inc(s_send_queued);
	rds_stats_inc(s_send_pong);

	rcu_read_lock();
	if (!rds_destroy_pending(cp->cp_conn))
		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
	rcu_read_unlock();

	rds_message_put(rm);
	return 0;

out:
	if (rm)
		rds_message_put(rm);
	return ret;
}

int
rds_send_pong(struct rds_conn_path *cp, __be16 dport)
{
	return rds_send_probe(cp, 0, dport, 0);
}

void
rds_send_ping(struct rds_connection *conn, int cp_index)
{
	unsigned long flags;
	struct rds_conn_path *cp = &conn->c_path[cp_index];

	spin_lock_irqsave(&cp->cp_lock, flags);
	if (conn->c_ping_triggered) {
		spin_unlock_irqrestore(&cp->cp_lock, flags);
		return;
	}
	conn->c_ping_triggered = 1;
	spin_unlock_irqrestore(&cp->cp_lock, flags);
	rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
}
EXPORT_SYMBOL_GPL(rds_send_ping);