Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
0003  *
0004  * This software is available to you under a choice of one of two
0005  * licenses.  You may choose to be licensed under the terms of the GNU
0006  * General Public License (GPL) Version 2, available from the file
0007  * COPYING in the main directory of this source tree, or the
0008  * OpenIB.org BSD license below:
0009  *
0010  *     Redistribution and use in source and binary forms, with or
0011  *     without modification, are permitted provided that the following
0012  *     conditions are met:
0013  *
0014  *      - Redistributions of source code must retain the above
0015  *        copyright notice, this list of conditions and the following
0016  *        disclaimer.
0017  *
0018  *      - Redistributions in binary form must reproduce the above
0019  *        copyright notice, this list of conditions and the following
0020  *        disclaimer in the documentation and/or other materials
0021  *        provided with the distribution.
0022  *
0023  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
0024  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
0025  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
0026  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
0027  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
0028  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
0029  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
0030  * SOFTWARE.
0031  *
0032  */
0033 #include <linux/kernel.h>
0034 #include <linux/moduleparam.h>
0035 #include <linux/gfp.h>
0036 #include <net/sock.h>
0037 #include <linux/in.h>
0038 #include <linux/list.h>
0039 #include <linux/ratelimit.h>
0040 #include <linux/export.h>
0041 #include <linux/sizes.h>
0042 
0043 #include "rds.h"
0044 
0045 /* When transmitting messages in rds_send_xmit(), we need to emerge from
0046  * time to time and briefly release the CPU. Otherwise the soft-lockup
0047  * watchdog will kick our shin.
0048  * Also, it seems fairer to not let one busy connection stall all the
0049  * others.
0050  *
0051  * send_batch_count caps how many messages one rds_send_xmit() pass takes
0052  * off the send queue.  NOTE(review): 0 does not look like "no limit" - the
0053  * batch_count >= send_batch_count test would fire at once; confirm.
0054  */
0055 static int send_batch_count = SZ_1K;
0056 module_param(send_batch_count, int, 0444); /* mode 0444: no write permission bits */
0057 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
0058 
0059 static void rds_send_remove_from_sock(struct list_head *messages, int status); /* defined below; rds_send_xmit() needs it earlier */
0060 
0061 /*
0062  * Reset the send state.  Callers must ensure that this doesn't race with
0063  * rds_send_xmit().
0064  */
0065 void rds_send_path_reset(struct rds_conn_path *cp)
0066 {
0067     struct rds_message *rm, *tmp;
0068     unsigned long flags;
0069 
0070     if (cp->cp_xmit_rm) {
0071         rm = cp->cp_xmit_rm;
0072         cp->cp_xmit_rm = NULL;
0073         /* Tell the user the RDMA op is no longer mapped by the
0074          * transport. This isn't entirely true (it's flushed out
0075          * independently) but as the connection is down, there's
0076          * no ongoing RDMA to/from that memory */
0077         rds_message_unmapped(rm);
0078         rds_message_put(rm);
0079     }
0080 
0081     cp->cp_xmit_sg = 0;
0082     cp->cp_xmit_hdr_off = 0;
0083     cp->cp_xmit_data_off = 0;
0084     cp->cp_xmit_atomic_sent = 0;
0085     cp->cp_xmit_rdma_sent = 0;
0086     cp->cp_xmit_data_sent = 0;
0087 
0088     cp->cp_conn->c_map_queued = 0;
0089 
0090     cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
0091     cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;
0092 
0093     /* Mark messages as retransmissions, and move them to the send q */
0094     spin_lock_irqsave(&cp->cp_lock, flags);
0095     list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
0096         set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
0097         set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
0098     }
0099     list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
0100     spin_unlock_irqrestore(&cp->cp_lock, flags);
0101 }
0102 EXPORT_SYMBOL_GPL(rds_send_path_reset);
0103 
0104 static int acquire_in_xmit(struct rds_conn_path *cp)
0105 {
0106     return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
0107 }
0108 
0109 static void release_in_xmit(struct rds_conn_path *cp)
0110 {
0111     clear_bit(RDS_IN_XMIT, &cp->cp_flags);
0112     smp_mb__after_atomic();
0113     /*
0114      * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
0115      * hot path and finding waiters is very rare.  We don't want to walk
0116      * the system-wide hashed waitqueue buckets in the fast path only to
0117      * almost never find waiters.
0118      */
0119     if (waitqueue_active(&cp->cp_waitq))
0120         wake_up_all(&cp->cp_waitq);
0121 }
0122 
0123 /* Push messages queued on conn path @cp down to the transport.
0124  * We're making the conscious trade-off here to only send one message
0125  * down the connection at a time.
0126  *   Pro:
0127  *      - tx queueing is a simple fifo list
0128  *      - reassembly is optional and easily done by transports per conn
0129  *      - no per flow rx lookup at all, straight to the socket
0130  *      - less per-frag memory and wire overhead
0131  *   Con:
0132  *      - queued acks can be delayed behind large messages
0133  *   Depends:
0134  *      - small message latency is higher behind queued large messages
0135  *      - large message latency isn't starved by intervening small sends
0136  */
0137 int rds_send_xmit(struct rds_conn_path *cp)
0138 {
0139     struct rds_connection *conn = cp->cp_conn;
0140     struct rds_message *rm;
0141     unsigned long flags;
0142     unsigned int tmp;
0143     struct scatterlist *sg;
0144     int ret = 0;
0145     LIST_HEAD(to_be_dropped); /* messages we decided not to (re)transmit */
0146     int batch_count; /* attempts to take a message off the send queue since restart */
0147     unsigned long send_gen = 0; /* cp_send_gen value published by this pass */
0148     int same_rm = 0; /* consecutive loop passes that began with an rm in flight */
0149 
0150 restart:
0151     batch_count = 0;
0152 
0153     /*
0154      * sendmsg calls here after having queued its message on the send
0155      * queue.  We only have one task feeding the connection at a time.  If
0156      * another thread is already feeding the queue then we back off.  This
0157      * avoids blocking the caller and trading per-connection data between
0158      * caches per message.
0159      */
0160     if (!acquire_in_xmit(cp)) {
0161         rds_stats_inc(s_send_lock_contention);
0162         ret = -ENOMEM; /* another caller owns RDS_IN_XMIT; back off */
0163         goto out;
0164     }
0165 
0166     if (rds_destroy_pending(cp->cp_conn)) {
0167         release_in_xmit(cp);
0168         ret = -ENETUNREACH; /* dont requeue send work */
0169         goto out;
0170     }
0171 
0172     /*
0173      * we record the send generation after doing the xmit acquire.
0174      * if someone else manages to jump in and do some work, we'll use
0175      * this to avoid a goto restart farther down.
0176      *
0177      * The acquire_in_xmit() check above ensures that only one
0178      * caller can increment cp_send_gen at any time.
0179      */
0180     send_gen = READ_ONCE(cp->cp_send_gen) + 1;
0181     WRITE_ONCE(cp->cp_send_gen, send_gen);
0182 
0183     /*
0184      * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
0185      * we do the opposite to avoid races.
0186      */
0187     if (!rds_conn_path_up(cp)) {
0188         release_in_xmit(cp);
0189         ret = 0; /* path is not up: nothing to do, not an error */
0190         goto out;
0191     }
0192 
0193     if (conn->c_trans->xmit_path_prepare)
0194         conn->c_trans->xmit_path_prepare(cp);
0195 
0196     /*
0197      * spin trying to push headers and data down the connection until
0198      * the connection doesn't make forward progress.
0199      */
0200     while (1) {
0201 
0202         rm = cp->cp_xmit_rm;
0203 
0204         if (!rm) {
0205             same_rm = 0;
0206         } else {
0207             same_rm++;
0208             if (same_rm >= 4096) { /* same rm still in flight after 4096 passes: give up */
0209                 rds_stats_inc(s_send_stuck_rm);
0210                 ret = -EAGAIN;
0211                 break;
0212             }
0213         }
0214 
0215         /*
0216          * If between sending messages, we can send a pending congestion
0217          * map update.
0218          */
0219         if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
0220             rm = rds_cong_update_alloc(conn);
0221             if (IS_ERR(rm)) {
0222                 ret = PTR_ERR(rm);
0223                 break;
0224             }
0225             rm->data.op_active = 1;
0226             rm->m_inc.i_conn_path = cp;
0227             rm->m_inc.i_conn = cp->cp_conn;
0228 
0229             cp->cp_xmit_rm = rm;
0230         }
0231 
0232         /*
0233          * If not already working on one, grab the next message.
0234          *
0235          * cp_xmit_rm holds a ref while we're sending this message down
0236          * the connection.  We can use this ref while we own RDS_IN_XMIT;
0237          * rds_send_path_reset() callers must not race with us.
0238          */
0239         if (!rm) {
0240             unsigned int len;
0241 
0242             batch_count++;
0243 
0244             /* we want to process as big a batch as we can, but
0245              * we also want to avoid softlockups.  If we've been
0246              * through a lot of messages, lets back off and see
0247              * if anyone else jumps in
0248              */
0249             if (batch_count >= send_batch_count) /* NOTE(review): always true when send_batch_count is 0 */
0250                 goto over_batch;
0251 
0252             spin_lock_irqsave(&cp->cp_lock, flags);
0253 
0254             if (!list_empty(&cp->cp_send_queue)) {
0255                 rm = list_entry(cp->cp_send_queue.next,
0256                         struct rds_message,
0257                         m_conn_item);
0258                 rds_message_addref(rm);
0259 
0260                 /*
0261                  * Move the message from the send queue to the retransmit
0262                  * list right away.
0263                  */
0264                 list_move_tail(&rm->m_conn_item,
0265                            &cp->cp_retrans);
0266             }
0267 
0268             spin_unlock_irqrestore(&cp->cp_lock, flags);
0269 
0270             if (!rm)
0271                 break;
0272 
0273             /* Unfortunately, the way Infiniband deals with
0274              * RDMA to a bad MR key is by moving the entire
0275              * queue pair to error state. We could possibly
0276              * recover from that, but right now we drop the
0277              * connection.
0278              * Therefore, we never retransmit messages with RDMA ops.
0279              */
0280             if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
0281                 (rm->rdma.op_active &&
0282                 test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
0283                 spin_lock_irqsave(&cp->cp_lock, flags);
0284                 if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
0285                     list_move(&rm->m_conn_item, &to_be_dropped);
0286                 spin_unlock_irqrestore(&cp->cp_lock, flags);
0287                 continue; /* NOTE(review): if ON_CONN was already clear, the ref taken above is not dropped in this function - confirm */
0288             }
0289 
0290             /* Require an ACK every once in a while */
0291             len = ntohl(rm->m_inc.i_hdr.h_len);
0292             if (cp->cp_unacked_packets == 0 ||
0293                 cp->cp_unacked_bytes < len) {
0294                 set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
0295 
0296                 cp->cp_unacked_packets =
0297                     rds_sysctl_max_unacked_packets;
0298                 cp->cp_unacked_bytes =
0299                     rds_sysctl_max_unacked_bytes;
0300                 rds_stats_inc(s_send_ack_required);
0301             } else {
0302                 cp->cp_unacked_bytes -= len;
0303                 cp->cp_unacked_packets--;
0304             }
0305 
0306             cp->cp_xmit_rm = rm;
0307         }
0308 
0309         /* The transport either sends the whole rdma or none of it */
0310         if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
0311             rm->m_final_op = &rm->rdma;
0312             /* The transport owns the mapped memory for now.
0313              * You can't unmap it while it's on the send queue
0314              */
0315             set_bit(RDS_MSG_MAPPED, &rm->m_flags);
0316             ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
0317             if (ret) {
0318                 clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
0319                 wake_up_interruptible(&rm->m_flush_wait);
0320                 break;
0321             }
0322             cp->cp_xmit_rdma_sent = 1;
0323 
0324         }
0325 
0326         if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
0327             rm->m_final_op = &rm->atomic;
0328             /* The transport owns the mapped memory for now.
0329              * You can't unmap it while it's on the send queue
0330              */
0331             set_bit(RDS_MSG_MAPPED, &rm->m_flags);
0332             ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
0333             if (ret) {
0334                 clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
0335                 wake_up_interruptible(&rm->m_flush_wait);
0336                 break;
0337             }
0338             cp->cp_xmit_atomic_sent = 1;
0339 
0340         }
0341 
0342         /*
0343          * A number of cases require an RDS header to be sent
0344          * even if there is no data.
0345          * We permit 0-byte sends; rds-ping depends on this.
0346          * However, if there are exclusively attached silent ops,
0347          * we skip the hdr/data send, to enable silent operation.
0348          */
0349         if (rm->data.op_nents == 0) {
0350             int ops_present;
0351             int all_ops_are_silent = 1;
0352 
0353             ops_present = (rm->atomic.op_active || rm->rdma.op_active);
0354             if (rm->atomic.op_active && !rm->atomic.op_silent)
0355                 all_ops_are_silent = 0;
0356             if (rm->rdma.op_active && !rm->rdma.op_silent)
0357                 all_ops_are_silent = 0;
0358 
0359             if (ops_present && all_ops_are_silent
0360                 && !rm->m_rdma_cookie)
0361                 rm->data.op_active = 0;
0362         }
0363 
0364         if (rm->data.op_active && !cp->cp_xmit_data_sent) {
0365             rm->m_final_op = &rm->data;
0366 
0367             ret = conn->c_trans->xmit(conn, rm,
0368                           cp->cp_xmit_hdr_off,
0369                           cp->cp_xmit_sg,
0370                           cp->cp_xmit_data_off);
0371             if (ret <= 0) /* a positive ret is consumed below as a byte count */
0372                 break;
0373 
0374             if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
0375                 tmp = min_t(int, ret,
0376                         sizeof(struct rds_header) -
0377                         cp->cp_xmit_hdr_off);
0378                 cp->cp_xmit_hdr_off += tmp;
0379                 ret -= tmp;
0380             }
0381 
0382             sg = &rm->data.op_sg[cp->cp_xmit_sg];
0383             while (ret) { /* spread the remaining bytes over the scatterlist entries */
0384                 tmp = min_t(int, ret, sg->length -
0385                               cp->cp_xmit_data_off);
0386                 cp->cp_xmit_data_off += tmp;
0387                 ret -= tmp;
0388                 if (cp->cp_xmit_data_off == sg->length) {
0389                     cp->cp_xmit_data_off = 0;
0390                     sg++;
0391                     cp->cp_xmit_sg++;
0392                     BUG_ON(ret != 0 && cp->cp_xmit_sg ==
0393                            rm->data.op_nents);
0394                 }
0395             }
0396 
0397             if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
0398                 (cp->cp_xmit_sg == rm->data.op_nents))
0399                 cp->cp_xmit_data_sent = 1;
0400         }
0401 
0402         /*
0403          * An rm only takes multiple passes through this loop
0404          * if there is a data op. Thus, if the data is sent (or there was
0405          * none), then we're done with the rm.
0406          */
0407         if (!rm->data.op_active || cp->cp_xmit_data_sent) {
0408             cp->cp_xmit_rm = NULL;
0409             cp->cp_xmit_sg = 0;
0410             cp->cp_xmit_hdr_off = 0;
0411             cp->cp_xmit_data_off = 0;
0412             cp->cp_xmit_rdma_sent = 0;
0413             cp->cp_xmit_atomic_sent = 0;
0414             cp->cp_xmit_data_sent = 0;
0415 
0416             rds_message_put(rm); /* drop the ref that cp_xmit_rm was holding */
0417         }
0418     }
0419 
0420 over_batch:
0421     if (conn->c_trans->xmit_path_complete)
0422         conn->c_trans->xmit_path_complete(cp);
0423     release_in_xmit(cp);
0424 
0425     /* Nuke any messages we decided not to retransmit. */
0426     if (!list_empty(&to_be_dropped)) {
0427         /* irqs on here, so we can put(), unlike above */
0428         list_for_each_entry(rm, &to_be_dropped, m_conn_item)
0429             rds_message_put(rm);
0430         rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
0431     }
0432 
0433     /*
0434      * Other senders can queue a message after we last test the send queue
0435      * but before we clear RDS_IN_XMIT.  In that case they'd back off and
0436      * not try and send their newly queued message.  We need to check the
0437      * send queue after having cleared RDS_IN_XMIT so that their message
0438      * doesn't get stuck on the send queue.
0439      *
0440      * If the transport cannot continue (i.e ret != 0), then it must
0441      * call us when more room is available, such as from the tx
0442      * completion handler.
0443      *
0444      * We have an extra generation check here so that if someone manages
0445      * to jump in after our release_in_xmit, we'll see that they have done
0446      * some work and we will skip our goto
0447      */
0448     if (ret == 0) {
0449         bool raced;
0450 
0451         smp_mb();
0452         raced = send_gen != READ_ONCE(cp->cp_send_gen);
0453 
0454         if ((test_bit(0, &conn->c_map_queued) ||
0455             !list_empty(&cp->cp_send_queue)) && !raced) {
0456             if (batch_count < send_batch_count)
0457                 goto restart;
0458             rcu_read_lock();
0459             if (rds_destroy_pending(cp->cp_conn))
0460                 ret = -ENETUNREACH;
0461             else
0462                 queue_delayed_work(rds_wq, &cp->cp_send_w, 1); /* batch limit hit: continue from the workqueue */
0463             rcu_read_unlock();
0464         } else if (raced) {
0465             rds_stats_inc(s_send_lock_queue_raced);
0466         }
0467     }
0468 out:
0469     return ret; /* 0, -ENOMEM (xmit bit busy), -ENETUNREACH, -EAGAIN (stuck rm), or what the transport/alloc returned */
0470 }
0471 EXPORT_SYMBOL_GPL(rds_send_xmit);
0472 
0473 static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
0474 {
0475     u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
0476 
0477     assert_spin_locked(&rs->rs_lock);
0478 
0479     BUG_ON(rs->rs_snd_bytes < len);
0480     rs->rs_snd_bytes -= len;
0481 
0482     if (rs->rs_snd_bytes == 0)
0483         rds_stats_inc(s_send_queue_empty);
0484 }
0485 
0486 static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
0487                     is_acked_func is_acked)
0488 {
0489     if (is_acked)
0490         return is_acked(rm, ack);
0491     return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
0492 }
0493 
0494 /*
0495  * This is pretty similar to what happens below in the ACK
0496  * handling code - except that we call here as soon as we get
0497  * the IB send completion on the RDMA op and the accompanying
0498  * message.
0499  */
0500 void rds_rdma_send_complete(struct rds_message *rm, int status)
0501 {
0502     struct rds_sock *rs = NULL;
0503     struct rm_rdma_op *ro;
0504     struct rds_notifier *notifier;
0505     unsigned long flags;
0506 
0507     spin_lock_irqsave(&rm->m_rs_lock, flags);
0508 
0509     ro = &rm->rdma;
0510     if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
0511         ro->op_active && ro->op_notify && ro->op_notifier) {
0512         notifier = ro->op_notifier;
0513         rs = rm->m_rs;
0514         sock_hold(rds_rs_to_sk(rs));
0515 
0516         notifier->n_status = status;
0517         spin_lock(&rs->rs_lock);
0518         list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
0519         spin_unlock(&rs->rs_lock);
0520 
0521         ro->op_notifier = NULL;
0522     }
0523 
0524     spin_unlock_irqrestore(&rm->m_rs_lock, flags);
0525 
0526     if (rs) {
0527         rds_wake_sk_sleep(rs);
0528         sock_put(rds_rs_to_sk(rs));
0529     }
0530 }
0531 EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
0532 
0533 /*
0534  * Just like above, except looks at atomic op
0535  */
0536 void rds_atomic_send_complete(struct rds_message *rm, int status)
0537 {
0538     struct rds_sock *rs = NULL;
0539     struct rm_atomic_op *ao;
0540     struct rds_notifier *notifier;
0541     unsigned long flags;
0542 
0543     spin_lock_irqsave(&rm->m_rs_lock, flags);
0544 
0545     ao = &rm->atomic;
0546     if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
0547         && ao->op_active && ao->op_notify && ao->op_notifier) {
0548         notifier = ao->op_notifier;
0549         rs = rm->m_rs;
0550         sock_hold(rds_rs_to_sk(rs));
0551 
0552         notifier->n_status = status;
0553         spin_lock(&rs->rs_lock);
0554         list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
0555         spin_unlock(&rs->rs_lock);
0556 
0557         ao->op_notifier = NULL;
0558     }
0559 
0560     spin_unlock_irqrestore(&rm->m_rs_lock, flags);
0561 
0562     if (rs) {
0563         rds_wake_sk_sleep(rs);
0564         sock_put(rds_rs_to_sk(rs));
0565     }
0566 }
0567 EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
0568 
0569 /*
0570  * This is the same as rds_rdma_send_complete except we
0571  * don't do any locking - we have all the ingredients (message,
0572  * socket, socket lock) and can just move the notifier.
0573  */
0574 static inline void
0575 __rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
0576 {
0577     struct rm_rdma_op *ro;
0578     struct rm_atomic_op *ao;
0579 
0580     ro = &rm->rdma;
0581     if (ro->op_active && ro->op_notify && ro->op_notifier) {
0582         ro->op_notifier->n_status = status;
0583         list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
0584         ro->op_notifier = NULL;
0585     }
0586 
0587     ao = &rm->atomic;
0588     if (ao->op_active && ao->op_notify && ao->op_notifier) {
0589         ao->op_notifier->n_status = status;
0590         list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
0591         ao->op_notifier = NULL;
0592     }
0593 
0594     /* No need to wake the app - caller does this */
0595 }
0596 
0597 /*
0598  * This removes messages from the socket's list if they're on it.  The list
0599  * argument must be private to the caller, we must be able to modify it
0600  * without locks.  The messages must have a reference held for their
0601  * position on the list.  This function will drop that reference after
0602  * removing the messages from the 'messages' list regardless of if it found
0603  * the messages on the socket list or not.
 *
 * 'status' is recorded in a message's RDMA notifier (unless the notifier
 * already carries a non-zero status) when that notifier is handed to the
 * socket's notify queue.
0604  */
0605 static void rds_send_remove_from_sock(struct list_head *messages, int status)
0606 {
0607     unsigned long flags;
0608     struct rds_sock *rs = NULL; /* socket currently held via sock_hold(), if any */
0609     struct rds_message *rm;
0610 
0611     while (!list_empty(messages)) {
0612         int was_on_sock = 0;
0613 
0614         rm = list_entry(messages->next, struct rds_message,
0615                 m_conn_item);
0616         list_del_init(&rm->m_conn_item);
0617 
0618         /*
0619          * If we see this flag cleared then we're *sure* that someone
0620          * else beat us to removing it from the sock.  If we race
0621          * with their flag update we'll get the lock and then really
0622          * see that the flag has been cleared.
0623          *
0624          * The message spinlock makes sure nobody clears rm->m_rs
0625          * while we're messing with it. It does not prevent the
0626          * message from being removed from the socket, though.
0627          */
0628         spin_lock_irqsave(&rm->m_rs_lock, flags);
0629         if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
0630             goto unlock_and_drop;
0631 
0632         if (rs != rm->m_rs) { /* socket changed: wake and release the previous one, hold the new one */
0633             if (rs) {
0634                 rds_wake_sk_sleep(rs);
0635                 sock_put(rds_rs_to_sk(rs));
0636             }
0637             rs = rm->m_rs;
0638             if (rs)
0639                 sock_hold(rds_rs_to_sk(rs));
0640         }
0641         if (!rs)
0642             goto unlock_and_drop;
0643         spin_lock(&rs->rs_lock);
0644 
0645         if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
0646             struct rm_rdma_op *ro = &rm->rdma;
0647             struct rds_notifier *notifier;
0648 
0649             list_del_init(&rm->m_sock_item);
0650             rds_send_sndbuf_remove(rs, rm);
0651 
0652             if (ro->op_active && ro->op_notifier &&
0653                    (ro->op_notify || (ro->op_recverr && status))) {
0654                 notifier = ro->op_notifier;
0655                 list_add_tail(&notifier->n_list,
0656                         &rs->rs_notify_queue);
0657                 if (!notifier->n_status) /* keep a non-zero status that was set earlier */
0658                     notifier->n_status = status;
0659                 rm->rdma.op_notifier = NULL;
0660             }
0661             was_on_sock = 1;
0662         }
0663         spin_unlock(&rs->rs_lock);
0664 
0665 unlock_and_drop:
0666         spin_unlock_irqrestore(&rm->m_rs_lock, flags);
0667         rds_message_put(rm); /* the reference held for the 'messages' list */
0668         if (was_on_sock)
0669             rds_message_put(rm); /* a second put, only when we took rm off the socket queue */
0670     }
0671 
0672     if (rs) { /* wake and release the last socket we were holding */
0673         rds_wake_sk_sleep(rs);
0674         sock_put(rds_rs_to_sk(rs));
0675     }
0676 }
0677 
0678 /*
0679  * Transports call here when they've determined that the receiver queued
0680  * messages up to, and including, the given sequence number.  Messages are
0681  * moved to the retrans queue when rds_send_xmit picks them off the send
0682  * queue. This means that in the TCP case, the message may not have been
0683  * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
0684  * checks the RDS_MSG_HAS_ACK_SEQ bit.
0685  */
0686 void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
0687                   is_acked_func is_acked)
0688 {
0689     struct rds_message *rm, *tmp;
0690     unsigned long flags;
0691     LIST_HEAD(list);
0692 
0693     spin_lock_irqsave(&cp->cp_lock, flags);
0694 
0695     list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
0696         if (!rds_send_is_acked(rm, ack, is_acked))
0697             break;
0698 
0699         list_move(&rm->m_conn_item, &list);
0700         clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
0701     }
0702 
0703     /* order flag updates with spin locks */
0704     if (!list_empty(&list))
0705         smp_mb__after_atomic();
0706 
0707     spin_unlock_irqrestore(&cp->cp_lock, flags);
0708 
0709     /* now remove the messages from the sock list as needed */
0710     rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
0711 }
0712 EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);
0713 
0714 void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
0715              is_acked_func is_acked)
0716 {
0717     WARN_ON(conn->c_trans->t_mp_capable);
0718     rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
0719 }
0720 EXPORT_SYMBOL_GPL(rds_send_drop_acked);
0721 
/*
 * Drop messages queued on socket 'rs': all of them when 'dest' is NULL,
 * otherwise only those whose destination address and port match 'dest'.
 * Each selected message is taken off the socket's send queue and, if it is
 * still flagged RDS_MSG_ON_CONN, off its connection path list as well; any
 * pending RDMA/atomic notifier is completed with RDS_RDMA_CANCELED.
 */
0722 void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
0723 {
0724     struct rds_message *rm, *tmp;
0725     struct rds_connection *conn;
0726     struct rds_conn_path *cp;
0727     unsigned long flags;
0728     LIST_HEAD(list); /* private list of messages being dropped, linked via m_sock_item */
0729 
0730     /* get all the messages we're dropping under the rs lock */
0731     spin_lock_irqsave(&rs->rs_lock, flags);
0732 
0733     list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
0734         if (dest &&
0735             (!ipv6_addr_equal(&dest->sin6_addr, &rm->m_daddr) ||
0736              dest->sin6_port != rm->m_inc.i_hdr.h_dport))
0737             continue;
0738 
0739         list_move(&rm->m_sock_item, &list);
0740         rds_send_sndbuf_remove(rs, rm);
0741         clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
0742     }
0743 
0744     /* order flag updates with the rs lock */
0745     smp_mb__after_atomic();
0746 
0747     spin_unlock_irqrestore(&rs->rs_lock, flags);
0748 
0749     if (list_empty(&list))
0750         return;
0751 
0752     /* Remove the messages from the conn */
0753     list_for_each_entry(rm, &list, m_sock_item) {
0754 
0755         conn = rm->m_inc.i_conn;
0756         if (conn->c_trans->t_mp_capable)
0757             cp = rm->m_inc.i_conn_path;
0758         else
0759             cp = &conn->c_path[0];
0760 
0761         spin_lock_irqsave(&cp->cp_lock, flags);
0762         /*
0763          * Maybe someone else beat us to removing rm from the conn.
0764          * If we race with their flag update we'll get the lock and
0765          * then really see that the flag has been cleared.
0766          */
0767         if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
0768             spin_unlock_irqrestore(&cp->cp_lock, flags);
0769             continue;
0770         }
0771         list_del_init(&rm->m_conn_item);
0772         spin_unlock_irqrestore(&cp->cp_lock, flags);
0773 
0774         /*
0775          * Couldn't grab m_rs_lock in top loop (lock ordering),
0776          * but we can now.
0777          */
0778         spin_lock_irqsave(&rm->m_rs_lock, flags);
0779 
0780         spin_lock(&rs->rs_lock);
0781         __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
0782         spin_unlock(&rs->rs_lock);
0783 
0784         spin_unlock_irqrestore(&rm->m_rs_lock, flags);
0785 
0786         rds_message_put(rm); /* presumably the reference the conn list held (taken in rds_send_queue_rm()) */
0787     }
0788 
0789     rds_wake_sk_sleep(rs);
0790 
0791     while (!list_empty(&list)) {
0792         rm = list_entry(list.next, struct rds_message, m_sock_item);
0793         list_del_init(&rm->m_sock_item);
0794         rds_message_wait(rm);
0795 
0796         /* just in case the code above skipped this message
0797          * because RDS_MSG_ON_CONN wasn't set, run it again here
0798          * taking m_rs_lock is the only thing that keeps us
0799          * from racing with ack processing.
0800          */
0801         spin_lock_irqsave(&rm->m_rs_lock, flags);
0802 
0803         spin_lock(&rs->rs_lock);
0804         __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
0805         spin_unlock(&rs->rs_lock);
0806 
0807         spin_unlock_irqrestore(&rm->m_rs_lock, flags);
0808 
0809         rds_message_put(rm); /* presumably the reference the socket queue held */
0810     }
0811 }
0812 
0813 /*
0814  * we only want this to fire once so we use the caller's 'queued'.  It's
0815  * possible that another thread can race with us and remove the
0816  * message from the flow with RDS_CANCEL_SENT_TO.
      *
      * Try to place @rm on both the socket's send queue and the send queue
      * of connection path @cp.  Nothing is queued unless the socket's
      * rs_snd_bytes is still below rds_sk_sndbuf(rs).  On success the header
      * is populated with @sport/@dport, a tx sequence number is assigned
      * under cp_lock, and *queued is set to 1.
      *
      * Returns the (possibly updated) value of *queued: non-zero when the
      * message is, or already was, queued; zero when there was no room.
0817  */
0818 static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
0819                  struct rds_conn_path *cp,
0820                  struct rds_message *rm, __be16 sport,
0821                  __be16 dport, int *queued)
0822 {
0823     unsigned long flags;
0824     u32 len;
0825 
         /* Already queued by an earlier call: report that and do nothing. */
0826     if (*queued)
0827         goto out;
0828 
         /* Payload length as recorded in the message header (host order). */
0829     len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
0830 
0831     /* this is the only place which holds both the socket's rs_lock
0832      * and the connection's c_lock */
0833     spin_lock_irqsave(&rs->rs_lock, flags);
0834 
0835     /*
0836      * If there is a little space in sndbuf, we don't queue anything,
0837      * and userspace gets -EAGAIN. But poll() indicates there's send
0838      * room. This can lead to bad behavior (spinning) if snd_bytes isn't
0839      * freed up by incoming acks. So we check the *old* value of
0840      * rs_snd_bytes here to allow the last msg to exceed the buffer,
0841      * and poll() now knows no more data can be sent.
0842      */
0843     if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
0844         rs->rs_snd_bytes += len;
0845 
0846         /* let recv side know we are close to send space exhaustion.
0847          * This is probably not the optimal way to do it, as this
0848          * means we set the flag on *all* messages as soon as our
0849          * throughput hits a certain threshold.
0850          */
0851         if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
0852             set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
0853 
0854         list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
0855         set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
0856         rds_message_addref(rm); /* NOTE(review): presumably the ref owned by rs_send_queue */
0857         sock_hold(rds_rs_to_sk(rs)); /* socket is pinned before rm->m_rs points at it */
0858         rm->m_rs = rs;
0859 
0860         /* The code ordering is a little weird, but we're
0861            trying to minimize the time we hold c_lock */
             /* Header is filled with sequence 0 here; the real sequence
              * number is assigned below while cp_lock is held.
              */
0862         rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
0863         rm->m_inc.i_conn = conn;
0864         rm->m_inc.i_conn_path = cp;
0865         rds_message_addref(rm); /* NOTE(review): presumably the ref owned by cp_send_queue */
0866 
0867         spin_lock(&cp->cp_lock);
0868         rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
0869         list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
0870         set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
0871         spin_unlock(&cp->cp_lock);
0872 
0873         rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
0874              rm, len, rs, rs->rs_snd_bytes,
0875              (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));
0876 
0877         *queued = 1;
0878     }
0879 
0880     spin_unlock_irqrestore(&rs->rs_lock, flags);
0881 out:
0882     return *queued;
0883 }
0884 
0885 /*
0886  * rds_message is getting to be quite complicated, and we'd like to allocate
0887  * it all in one go. This figures out how big it needs to be up front.
      *
      * Walks the SOL_RDS control messages in @msg and adds up the extra space
      * they need, plus @num_sgs scatterlist entries for the data payload.
      * For every RDS_CMSG_RDMA_ARGS a zeroed entry of @vct is handed to
      * rds_rdma_extra_size(); @vct->vec is grown by @vct->incr entries
      * whenever it is full.  This function never frees @vct->vec; that is
      * left to the caller.
      *
      * Returns the size in bytes, or a negative errno: -EINVAL for a negative
      * @num_sgs, a malformed or unknown cmsg, MSG_ZEROCOPY without a cookie
      * cmsg, or a mix of (DEST, MAP, ZCOPY_COOKIE) with (ARGS, ATOMIC);
      * -ENOMEM when @vct cannot be grown; or the error returned by
      * rds_rdma_extra_size().
0888  */
0889 static int rds_rm_size(struct msghdr *msg, int num_sgs,
0890                struct rds_iov_vector_arr *vct)
0891 {
0892     struct cmsghdr *cmsg;
0893     int size = 0;
0894     int cmsg_groups = 0; /* bit 0: ARGS/ATOMIC seen, bit 1: DEST/MAP/ZCOPY_COOKIE seen */
0895     int retval;
0896     bool zcopy_cookie = false;
0897     struct rds_iov_vector *iov, *tmp_iov;
0898 
0899     if (num_sgs < 0)
0900         return -EINVAL;
0901 
0902     for_each_cmsghdr(cmsg, msg) {
0903         if (!CMSG_OK(msg, cmsg))
0904             return -EINVAL;
0905 
             /* Control messages for other levels are ignored. */
0906         if (cmsg->cmsg_level != SOL_RDS)
0907             continue;
0908 
0909         switch (cmsg->cmsg_type) {
0910         case RDS_CMSG_RDMA_ARGS:
                 /* Grow the vector array when all entries are in use.  The
                  * result goes through tmp_iov so the old array is still
                  * referenced by vct->vec if krealloc() fails.
                  */
0911             if (vct->indx >= vct->len) {
0912                 vct->len += vct->incr;
0913                 tmp_iov =
0914                     krealloc(vct->vec,
0915                          vct->len *
0916                          sizeof(struct rds_iov_vector),
0917                          GFP_KERNEL);
0918                 if (!tmp_iov) {
0919                     vct->len -= vct->incr; /* undo the length bump */
0920                     return -ENOMEM;
0921                 }
0922                 vct->vec = tmp_iov;
0923             }
0924             iov = &vct->vec[vct->indx];
0925             memset(iov, 0, sizeof(struct rds_iov_vector));
0926             vct->indx++;
0927             cmsg_groups |= 1;
0928             retval = rds_rdma_extra_size(CMSG_DATA(cmsg), iov);
0929             if (retval < 0)
0930                 return retval;
0931             size += retval;
0932 
0933             break;
0934 
0935         case RDS_CMSG_ZCOPY_COOKIE:
0936             zcopy_cookie = true;
0937             fallthrough;
0938 
0939         case RDS_CMSG_RDMA_DEST:
0940         case RDS_CMSG_RDMA_MAP:
0941             cmsg_groups |= 2;
0942             /* these are valid but do not add any size */
0943             break;
0944 
0945         case RDS_CMSG_ATOMIC_CSWP:
0946         case RDS_CMSG_ATOMIC_FADD:
0947         case RDS_CMSG_MASKED_ATOMIC_CSWP:
0948         case RDS_CMSG_MASKED_ATOMIC_FADD:
0949             cmsg_groups |= 1;
0950             size += sizeof(struct scatterlist); /* one sg entry per atomic op */
0951             break;
0952 
0953         default:
0954             return -EINVAL;
0955         }
0956 
0957     }
0958 
         /* A zerocopy send must carry a cookie control message. */
0959     if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
0960         return -EINVAL;
0961 
0962     size += num_sgs * sizeof(struct scatterlist);
0963 
0964     /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
0965     if (cmsg_groups == 3)
0966         return -EINVAL;
0967 
0968     return size;
0969 }
0970 
/*
 * Handle an RDS_CMSG_ZCOPY_COOKIE control message: copy the 32-bit cookie
 * carried in @cmsg into the zerocopy notifier attached to @rm.
 *
 * Returns 0 on success, or -EINVAL when the cmsg is too short to hold the
 * cookie or @rm has no zerocopy notifier (rm->data.op_mmp_znotifier is NULL).
 * @rs is not used by this function.
 */
0971 static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
0972               struct cmsghdr *cmsg)
0973 {
0974     u32 *cookie;
0975 
0976     if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) ||
0977         !rm->data.op_mmp_znotifier)
0978         return -EINVAL;
0979     cookie = CMSG_DATA(cmsg);
0980     rm->data.op_mmp_znotifier->z_cookie = *cookie;
0981     return 0;
0982 }
0983 
/*
 * Apply every SOL_RDS control message in @msg to @rm by dispatching to the
 * matching rds_cmsg_*() handler.  Processing stops at the first handler that
 * returns non-zero.
 *
 * RDS_CMSG_RDMA_ARGS messages consume the entries of @vct in order, one per
 * message; -ENOMEM is returned if there are more such messages than
 * @vct->indx entries.  *@allocated_mr is set to 1 when an RDS_CMSG_RDMA_MAP
 * message was handled successfully.
 *
 * Returns 0 on success, -EINVAL for a malformed or unknown cmsg, or the
 * handler's error (with -ENODEV from the RDMA_MAP handler turned into
 * -EAGAIN).
 */
0984 static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
0985              struct msghdr *msg, int *allocated_mr,
0986              struct rds_iov_vector_arr *vct)
0987 {
0988     struct cmsghdr *cmsg;
0989     int ret = 0, ind = 0; /* ind: next @vct entry for an RDMA_ARGS cmsg */
0990 
0991     for_each_cmsghdr(cmsg, msg) {
0992         if (!CMSG_OK(msg, cmsg))
0993             return -EINVAL;
0994 
0995         if (cmsg->cmsg_level != SOL_RDS)
0996             continue;
0997 
0998         /* As a side effect, RDMA_DEST and RDMA_MAP will set
0999          * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
1000          */
1001         switch (cmsg->cmsg_type) {
1002         case RDS_CMSG_RDMA_ARGS:
1003             if (ind >= vct->indx)
1004                 return -ENOMEM;
1005             ret = rds_cmsg_rdma_args(rs, rm, cmsg, &vct->vec[ind]);
1006             ind++;
1007             break;
1008 
1009         case RDS_CMSG_RDMA_DEST:
1010             ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
1011             break;
1012 
1013         case RDS_CMSG_RDMA_MAP:
1014             ret = rds_cmsg_rdma_map(rs, rm, cmsg);
1015             if (!ret)
1016                 *allocated_mr = 1;
1017             else if (ret == -ENODEV)
1018                 /* Accommodate the get_mr() case which can fail
1019                  * if connection isn't established yet.
1020                  */
1021                 ret = -EAGAIN;
1022             break;
1023         case RDS_CMSG_ATOMIC_CSWP:
1024         case RDS_CMSG_ATOMIC_FADD:
1025         case RDS_CMSG_MASKED_ATOMIC_CSWP:
1026         case RDS_CMSG_MASKED_ATOMIC_FADD:
1027             ret = rds_cmsg_atomic(rs, rm, cmsg);
1028             break;
1029 
1030         case RDS_CMSG_ZCOPY_COOKIE:
1031             ret = rds_cmsg_zcopy(rs, rm, cmsg);
1032             break;
1033 
1034         default:
1035             return -EINVAL;
1036         }
1037 
             /* Stop at the first handler failure and report it. */
1038         if (ret)
1039             break;
1040     }
1041 
1042     return ret;
1043 }
1044 
/*
 * Pick the index into conn->c_path[] that socket @rs should send on.
 *
 * The index is RDS_MPATH_HASH() of the socket over conn->c_npaths, or over
 * RDS_MPATH_WORKERS while c_npaths is still 0.  When c_npaths is 0 and the
 * hash is non-zero, a ping is sent on path 0 and, unless @nonblock is set,
 * the caller sleeps on conn->c_hs_waitq until c_npaths becomes non-zero.
 *
 * Returns 0 for a non-blocking caller while c_npaths is still 0, when the
 * wait is interrupted, or when c_npaths turns out to be 1; otherwise the
 * computed hash.
 */
1045 static int rds_send_mprds_hash(struct rds_sock *rs,
1046                    struct rds_connection *conn, int nonblock)
1047 {
1048     int hash;
1049 
1050     if (conn->c_npaths == 0)
1051         hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
1052     else
1053         hash = RDS_MPATH_HASH(rs, conn->c_npaths);
1054     if (conn->c_npaths == 0 && hash != 0) {
1055         rds_send_ping(conn, 0);
1056 
1057         /* The underlying connection is not up yet.  Need to wait
1058          * until it is up to be sure that the non-zero c_path can be
1059          * used.  But if we are interrupted, we have to use the zero
1060          * c_path in case the connection ends up being non-MP capable.
1061          */
1062         if (conn->c_npaths == 0) {
1063             /* Cannot wait for the connection to be made, so just use
1064              * the base c_path.
1065              */
1066             if (nonblock)
1067                 return 0;
1068             if (wait_event_interruptible(conn->c_hs_waitq,
1069                              conn->c_npaths != 0))
1070                 hash = 0;
1071         }
             /* Only one path available: everything goes over path 0. */
1072         if (conn->c_npaths == 1)
1073             hash = 0;
1074     }
1075     return hash;
1076 }
1077 
/*
 * Add up the remote_vec.bytes field of every RDS_CMSG_RDMA_ARGS control
 * message in @msg.
 *
 * The total is accumulated into *@rdma_bytes, which is not reset here, so
 * the caller must initialise it.
 *
 * Returns 0 on success, or -EINVAL when a cmsg is malformed or an RDMA_ARGS
 * cmsg is too short to hold a struct rds_rdma_args.
 *
 * NOTE(review): the addition is not checked for wrap-around; with several
 * RDMA_ARGS messages the sum could overflow size_t - confirm whether callers
 * rely on the total being exact.
 */
1078 static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
1079 {
1080     struct rds_rdma_args *args;
1081     struct cmsghdr *cmsg;
1082 
1083     for_each_cmsghdr(cmsg, msg) {
1084         if (!CMSG_OK(msg, cmsg))
1085             return -EINVAL;
1086 
1087         if (cmsg->cmsg_level != SOL_RDS)
1088             continue;
1089 
1090         if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
                 /* Length check must precede reading the args payload. */
1091             if (cmsg->cmsg_len <
1092                 CMSG_LEN(sizeof(struct rds_rdma_args)))
1093                 return -EINVAL;
1094             args = CMSG_DATA(cmsg);
1095             *rdma_bytes += args->remote_vec.bytes;
1096         }
1097     }
1098     return 0;
1099 }
1100 
/*
 * sendmsg() entry point for an RDS socket.
 *
 * Steps, in order: validate the flags and the destination (taken from
 * msg->msg_name, or from the socket's connected address when no name is
 * given); check the sizes; allocate an rds_message and copy or pin the
 * payload; find or create the outgoing connection and choose a path; apply
 * the control messages; queue the message on the socket and the path,
 * sleeping for send-buffer room unless MSG_DONTWAIT is set; finally kick
 * transmission.
 *
 * Returns @payload_len on success or a negative errno.  Errors set in this
 * function are -EOPNOTSUPP, -EINVAL, -ENOTCONN, -EMSGSIZE, -ENOMEM, -EAGAIN,
 * -ETIMEDOUT and -ENETUNREACH; errors from the helpers it calls are passed
 * through.
 */
1101 int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
1102 {
1103     struct sock *sk = sock->sk;
1104     struct rds_sock *rs = rds_sk_to_rs(sk);
1105     DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
1106     DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
1107     __be16 dport;
1108     struct rds_message *rm = NULL;
1109     struct rds_connection *conn;
1110     int ret = 0;
1111     int queued = 0, allocated_mr = 0;
1112     int nonblock = msg->msg_flags & MSG_DONTWAIT;
1113     long timeo = sock_sndtimeo(sk, nonblock);
1114     struct rds_conn_path *cpath;
1115     struct in6_addr daddr;
1116     __u32 scope_id = 0;
1117     size_t total_payload_len = payload_len, rdma_payload_len = 0;
         /* Zerocopy needs both the per-call flag and the socket option. */
1118     bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
1119               sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
         /* One scatterlist entry per page of payload (recomputed for zcopy). */
1120     int num_sgs = DIV_ROUND_UP(payload_len, PAGE_SIZE);
1121     int namelen;
1122     struct rds_iov_vector_arr vct;
1123     int ind;
1124 
1125     memset(&vct, 0, sizeof(vct));
1126 
1127     /* expect 1 RDMA CMSG per rds_sendmsg. can still grow if more needed. */
1128     vct.incr = 1;
1129 
1130     /* Mirror Linux UDP mirror of BSD error message compatibility */
1131     /* XXX: Perhaps MSG_MORE someday */
1132     if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
1133         ret = -EOPNOTSUPP;
1134         goto out;
1135     }
1136 
         /* Destination: explicit msg_name if given, else the connected peer. */
1137     namelen = msg->msg_namelen;
1138     if (namelen != 0) {
1139         if (namelen < sizeof(*usin)) {
1140             ret = -EINVAL;
1141             goto out;
1142         }
1143         switch (usin->sin_family) {
1144         case AF_INET:
1145             if (usin->sin_addr.s_addr == htonl(INADDR_ANY) ||
1146                 usin->sin_addr.s_addr == htonl(INADDR_BROADCAST) ||
1147                 ipv4_is_multicast(usin->sin_addr.s_addr)) {
1148                 ret = -EINVAL;
1149                 goto out;
1150             }
                 /* IPv4 destinations are kept as v4-mapped IPv6 addresses. */
1151             ipv6_addr_set_v4mapped(usin->sin_addr.s_addr, &daddr);
1152             dport = usin->sin_port;
1153             break;
1154 
1155 #if IS_ENABLED(CONFIG_IPV6)
1156         case AF_INET6: {
1157             int addr_type;
1158 
1159             if (namelen < sizeof(*sin6)) {
1160                 ret = -EINVAL;
1161                 goto out;
1162             }
1163             addr_type = ipv6_addr_type(&sin6->sin6_addr);
1164             if (!(addr_type & IPV6_ADDR_UNICAST)) {
1165                 __be32 addr4;
1166 
1167                 if (!(addr_type & IPV6_ADDR_MAPPED)) {
1168                     ret = -EINVAL;
1169                     goto out;
1170                 }
1171 
1172                 /* It is a mapped address.  Need to do some
1173                  * sanity checks.
1174                  */
1175                 addr4 = sin6->sin6_addr.s6_addr32[3];
1176                 if (addr4 == htonl(INADDR_ANY) ||
1177                     addr4 == htonl(INADDR_BROADCAST) ||
1178                     ipv4_is_multicast(addr4)) {
1179                     ret = -EINVAL;
1180                     goto out;
1181                 }
1182             }
                 /* A link-local destination must name its interface scope. */
1183             if (addr_type & IPV6_ADDR_LINKLOCAL) {
1184                 if (sin6->sin6_scope_id == 0) {
1185                     ret = -EINVAL;
1186                     goto out;
1187                 }
1188                 scope_id = sin6->sin6_scope_id;
1189             }
1190 
1191             daddr = sin6->sin6_addr;
1192             dport = sin6->sin6_port;
1193             break;
1194         }
1195 #endif
1196 
1197         default:
1198             ret = -EINVAL;
1199             goto out;
1200         }
1201     } else {
1202         /* We only care about consistency with ->connect() */
1203         lock_sock(sk);
1204         daddr = rs->rs_conn_addr;
1205         dport = rs->rs_conn_port;
1206         scope_id = rs->rs_bound_scope_id;
1207         release_sock(sk);
1208     }
1209 
         /* Check the destination against the bound address under the socket
          * lock; an unbound socket or an unspecified destination cannot send.
          */
1210     lock_sock(sk);
1211     if (ipv6_addr_any(&rs->rs_bound_addr) || ipv6_addr_any(&daddr)) {
1212         release_sock(sk);
1213         ret = -ENOTCONN;
1214         goto out;
1215     } else if (namelen != 0) {
1216         /* Cannot send to an IPv4 address using an IPv6 source
1217          * address and cannot send to an IPv6 address using an
1218          * IPv4 source address.
1219          */
1220         if (ipv6_addr_v4mapped(&daddr) ^
1221             ipv6_addr_v4mapped(&rs->rs_bound_addr)) {
1222             release_sock(sk);
1223             ret = -EOPNOTSUPP;
1224             goto out;
1225         }
1226         /* If the socket is already bound to a link local address,
1227          * it can only send to peers on the same link.  But allow
1228          * communicating between link local and non-link local address.
1229          */
1230         if (scope_id != rs->rs_bound_scope_id) {
1231             if (!scope_id) {
1232                 scope_id = rs->rs_bound_scope_id;
1233             } else if (rs->rs_bound_scope_id) {
1234                 release_sock(sk);
1235                 ret = -EINVAL;
1236                 goto out;
1237             }
1238         }
1239     }
1240     release_sock(sk);
1241 
1242     ret = rds_rdma_bytes(msg, &rdma_payload_len);
1243     if (ret)
1244         goto out;
1245 
         /* NOTE(review): total_payload_len is not read again in this
          * function; the limit below is applied to the larger of the two
          * lengths, not to their sum.
          */
1246     total_payload_len += rdma_payload_len;
1247     if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
1248         ret = -EMSGSIZE;
1249         goto out;
1250     }
1251 
         /* Reject a payload larger than the socket's send buffer size. */
1252     if (payload_len > rds_sk_sndbuf(rs)) {
1253         ret = -EMSGSIZE;
1254         goto out;
1255     }
1256 
1257     if (zcopy) {
             /* Zerocopy is refused on any transport other than RDS_TRANS_TCP. */
1258         if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
1259             ret = -EOPNOTSUPP;
1260             goto out;
1261         }
1262         num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
1263     }
1264     /* size of rm including all sgs */
1265     ret = rds_rm_size(msg, num_sgs, &vct);
1266     if (ret < 0)
1267         goto out;
1268 
1269     rm = rds_message_alloc(ret, GFP_KERNEL);
1270     if (!rm) {
1271         ret = -ENOMEM;
1272         goto out;
1273     }
1274 
1275     /* Attach data to the rm */
1276     if (payload_len) {
1277         rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
1278         if (IS_ERR(rm->data.op_sg)) {
1279             ret = PTR_ERR(rm->data.op_sg);
1280             goto out;
1281         }
1282         ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
1283         if (ret)
1284             goto out;
1285     }
1286     rm->data.op_active = 1;
1287 
1288     rm->m_daddr = daddr;
1289 
1290     /* rds_conn_create has a spinlock that runs with IRQ off.
1291      * Caching the conn in the socket helps a lot. */
1292     if (rs->rs_conn && ipv6_addr_equal(&rs->rs_conn->c_faddr, &daddr) &&
1293         rs->rs_tos == rs->rs_conn->c_tos) {
1294         conn = rs->rs_conn;
1295     } else {
1296         conn = rds_conn_create_outgoing(sock_net(sock->sk),
1297                         &rs->rs_bound_addr, &daddr,
1298                         rs->rs_transport, rs->rs_tos,
1299                         sock->sk->sk_allocation,
1300                         scope_id);
1301         if (IS_ERR(conn)) {
1302             ret = PTR_ERR(conn);
1303             goto out;
1304         }
1305         rs->rs_conn = conn;
1306     }
1307 
         /* Multipath-capable transports hash the socket onto one of the
          * connection's paths; all others use path 0.
          */
1308     if (conn->c_trans->t_mp_capable)
1309         cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
1310     else
1311         cpath = &conn->c_path[0];
1312 
1313     rm->m_conn_path = cpath;
1314 
1315     /* Parse any control messages the user may have included. */
1316     ret = rds_cmsg_send(rs, rm, msg, &allocated_mr, &vct);
1317     if (ret) {
1318         /* Trigger connection so that it's ready for the next retry */
1319         if (ret ==  -EAGAIN)
1320             rds_conn_connect_if_down(conn);
1321         goto out;
1322     }
1323 
         /* RDMA and atomic ops need a transport that implements them. */
1324     if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
1325         printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
1326                    &rm->rdma, conn->c_trans->xmit_rdma);
1327         ret = -EOPNOTSUPP;
1328         goto out;
1329     }
1330 
1331     if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
1332         printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
1333                    &rm->atomic, conn->c_trans->xmit_atomic);
1334         ret = -EOPNOTSUPP;
1335         goto out;
1336     }
1337 
1338     if (rds_destroy_pending(conn)) {
1339         ret = -EAGAIN;
1340         goto out;
1341     }
1342 
1343     if (rds_conn_path_down(cpath))
1344         rds_check_all_paths(conn);
1345 
1346     ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
1347     if (ret) {
1348         rs->rs_seen_congestion = 1;
1349         goto out;
1350     }
         /* rds_send_queue_rm() returns non-zero once rm is queued.  Until
          * then either fail with -EAGAIN (non-blocking) or sleep on the
          * socket's wait queue, retrying the queue attempt as the wait
          * condition.
          */
1351     while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
1352                   dport, &queued)) {
1353         rds_stats_inc(s_send_queue_full);
1354 
1355         if (nonblock) {
1356             ret = -EAGAIN;
1357             goto out;
1358         }
1359 
1360         timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
1361                     rds_send_queue_rm(rs, conn, cpath, rm,
1362                               rs->rs_bound_port,
1363                               dport,
1364                               &queued),
1365                     timeo);
1366         rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
             /* Go back to the loop test, which re-checks whether rm is queued. */
1367         if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
1368             continue;
1369 
             /* timeo == 0 is reported as -ETIMEDOUT; a negative timeo is
              * returned to the caller unchanged.
              */
1370         ret = timeo;
1371         if (ret == 0)
1372             ret = -ETIMEDOUT;
1373         goto out;
1374     }
1375 
1376     /*
1377      * By now we've committed to the send.  We reuse rds_send_worker()
1378      * to retry sends in the rds thread if the transport asks us to.
1379      */
1380     rds_stats_inc(s_send_queued);
1381 
1382     ret = rds_send_xmit(cpath);
         /* -ENOMEM/-EAGAIN from the transmit path are not reported to the
          * caller: the send is handed to the path's delayed send work,
          * unless the connection is being destroyed.
          */
1383     if (ret == -ENOMEM || ret == -EAGAIN) {
1384         ret = 0;
1385         rcu_read_lock();
1386         if (rds_destroy_pending(cpath->cp_conn))
1387             ret = -ENETUNREACH;
1388         else
1389             queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
1390         rcu_read_unlock();
1391     }
1392     if (ret)
1393         goto out;
         /* NOTE(review): presumably drops the reference from
          * rds_message_alloc(); rds_send_queue_rm() took its own for the
          * queues.
          */
1394     rds_message_put(rm);
1395 
1396     for (ind = 0; ind < vct.indx; ind++)
1397         kfree(vct.vec[ind].iov);
1398     kfree(vct.vec);
1399 
1400     return payload_len;
1401 
1402 out:
         /* Error path: free the per-cmsg iov vectors, then the array. */
1403     for (ind = 0; ind < vct.indx; ind++)
1404         kfree(vct.vec[ind].iov);
1405     kfree(vct.vec);
1406 
1407     /* If the user included a RDMA_MAP cmsg, we allocated a MR on the fly.
1408      * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
1409      * or in any other way, we need to destroy the MR again */
         /* allocated_mr is only set by rds_cmsg_send(), which runs after rm
          * has been allocated, so rm is non-NULL whenever this branch runs.
          */
1410     if (allocated_mr)
1411         rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
1412 
1413     if (rm)
1414         rds_message_put(rm);
1415     return ret;
1416 }
1417 
1418 /*
1419  * send out a probe. Can be shared by rds_send_ping,
1420  * rds_send_pong, rds_send_hb.
1421  * rds_send_hb should use h_flags
1422  *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
1423  * or
1424  *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
      *
      * Allocates a zero-length message with GFP_ATOMIC, queues it on @cp's
      * send queue under cp_lock with the next tx sequence number, and
      * schedules the path's delayed send work unless the connection is being
      * destroyed.  For a handshake probe (as decided by RDS_HS_PROBE() on the
      * two ports) on a multipath-capable transport, the NPATHS and GEN_NUM
      * extension headers are added.
      *
      * Returns 0 on success, -ENOMEM if the message cannot be allocated, or
      * the error returned by the non-blocking rds_cong_wait() call.
1425  */
1426 static int
1427 rds_send_probe(struct rds_conn_path *cp, __be16 sport,
1428            __be16 dport, u8 h_flags)
1429 {
1430     struct rds_message *rm;
1431     unsigned long flags;
1432     int ret = 0;
1433 
1434     rm = rds_message_alloc(0, GFP_ATOMIC);
1435     if (!rm) {
1436         ret = -ENOMEM;
1437         goto out;
1438     }
1439 
1440     rm->m_daddr = cp->cp_conn->c_faddr;
1441     rm->data.op_active = 1;
1442 
1443     rds_conn_path_connect_if_down(cp);
1444 
         /* Called with nonblock = 1 and no socket. */
1445     ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
1446     if (ret)
1447         goto out;
1448 
1449     spin_lock_irqsave(&cp->cp_lock, flags);
1450     list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
1451     set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
1452     rds_message_addref(rm); /* NOTE(review): presumably the ref owned by cp_send_queue */
1453     rm->m_inc.i_conn = cp->cp_conn;
1454     rm->m_inc.i_conn_path = cp;
1455 
1456     rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
1457                     cp->cp_next_tx_seq);
1458     rm->m_inc.i_hdr.h_flags |= h_flags;
1459     cp->cp_next_tx_seq++;
1460 
1461     if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
1462         cp->cp_conn->c_trans->t_mp_capable) {
             /* Both values are converted to big-endian before being stored
              * in the extension headers, although the locals are declared
              * with the plain u16/u32 types.
              */
1463         u16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
1464         u32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);
1465 
1466         rds_message_add_extension(&rm->m_inc.i_hdr,
1467                       RDS_EXTHDR_NPATHS, &npaths,
1468                       sizeof(npaths));
1469         rds_message_add_extension(&rm->m_inc.i_hdr,
1470                       RDS_EXTHDR_GEN_NUM,
1471                       &my_gen_num,
1472                       sizeof(u32));
1473     }
1474     spin_unlock_irqrestore(&cp->cp_lock, flags);
1475 
         /* Both counters are bumped for every probe, whichever helper
          * (ping or pong) requested it.
          */
1476     rds_stats_inc(s_send_queued);
1477     rds_stats_inc(s_send_pong);
1478 
1479     /* schedule the send work on rds_wq */
1480     rcu_read_lock();
1481     if (!rds_destroy_pending(cp->cp_conn))
1482         queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
1483     rcu_read_unlock();
1484 
         /* NOTE(review): presumably drops the rds_message_alloc() ref; the
          * ref taken by rds_message_addref() above is kept.
          */
1485     rds_message_put(rm);
1486     return 0;
1487 
1488 out:
1489     if (rm)
1490         rds_message_put(rm);
1491     return ret;
1492 }
1493 
/*
 * Queue a pong on connection path @cp addressed to port @dport.
 *
 * Thin wrapper around rds_send_probe() with a source port of 0 and no extra
 * header flags; returns whatever rds_send_probe() returns.
 */
1494 int
1495 rds_send_pong(struct rds_conn_path *cp, __be16 dport)
1496 {
1497     return rds_send_probe(cp, 0, dport, 0);
1498 }
1499 
/*
 * Send a single probe for @conn over path @cp_index.
 *
 * conn->c_ping_triggered is tested and set under that path's cp_lock, so
 * once the flag is set later calls return without sending anything.  The
 * probe uses source port RDS_FLAG_PROBE_PORT and destination port 0; the
 * result of rds_send_probe() is ignored.
 */
1500 void
1501 rds_send_ping(struct rds_connection *conn, int cp_index)
1502 {
1503     unsigned long flags;
1504     struct rds_conn_path *cp = &conn->c_path[cp_index];
1505 
1506     spin_lock_irqsave(&cp->cp_lock, flags);
1507     if (conn->c_ping_triggered) {
1508         spin_unlock_irqrestore(&cp->cp_lock, flags);
1509         return;
1510     }
1511     conn->c_ping_triggered = 1;
1512     spin_unlock_irqrestore(&cp->cp_lock, flags);
         /* The probe is sent after cp_lock has been released. */
1513     rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
1514 }
1515 EXPORT_SYMBOL_GPL(rds_send_ping);