/*
 * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/export.h>
#include <linux/time.h>
#include <linux/rds.h>

#include "rds.h"

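/* Initialize a newly arrived incoming message: take the initial
 * reference, record the connection and source address, and clear the
 * per-message user-visible state (RDMA cookie, receive timestamp and
 * the rx latency trace).
 */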
void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
         struct in6_addr *saddr)
{
    refcount_set(&inc->i_refcount, 1);
    INIT_LIST_HEAD(&inc->i_item);
    inc->i_conn = conn;
    inc->i_saddr = *saddr;
    inc->i_usercopy.rdma_cookie = 0;
    inc->i_usercopy.rx_tstamp = ktime_set(0, 0);

    memset(inc->i_rx_lat_trace, 0, sizeof(inc->i_rx_lat_trace));
}
EXPORT_SYMBOL_GPL(rds_inc_init);

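/* Like rds_inc_init(), but for multipath-capable transports, which
 * attribute the incoming message to a specific connection path.
 */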
void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp,
               struct in6_addr *saddr)
{
    refcount_set(&inc->i_refcount, 1);
    INIT_LIST_HEAD(&inc->i_item);
    inc->i_conn = cp->cp_conn;
    inc->i_conn_path = cp;
    inc->i_saddr = *saddr;
    inc->i_usercopy.rdma_cookie = 0;
    inc->i_usercopy.rx_tstamp = ktime_set(0, 0);
}
EXPORT_SYMBOL_GPL(rds_inc_path_init);

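/* Incoming messages are refcounted: the socket's receive queue holds a
 * reference while a message is queued, and readers take their own
 * reference while copying.  Dropping the last reference hands the
 * message back to the owning transport to be freed.
 */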
static void rds_inc_addref(struct rds_incoming *inc)
{
    rdsdebug("addref inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
    refcount_inc(&inc->i_refcount);
}

void rds_inc_put(struct rds_incoming *inc)
{
    rdsdebug("put inc %p ref %d\n", inc, refcount_read(&inc->i_refcount));
    if (refcount_dec_and_test(&inc->i_refcount)) {
        BUG_ON(!list_empty(&inc->i_item));

        inc->i_conn->c_trans->inc_free(inc);
    }
}
EXPORT_SYMBOL_GPL(rds_inc_put);

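/* Account for bytes added to or removed from the socket's receive queue
 * and set the congestion bit for the bound port once the queue exceeds
 * the socket's receive buffer limit.  The bit is only cleared again once
 * the queue drains below half the limit, to avoid flapping.
 */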
static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
                  struct rds_cong_map *map,
                  int delta, __be16 port)
{
    int now_congested;

    if (delta == 0)
        return;

    rs->rs_rcv_bytes += delta;
    if (delta > 0)
        rds_stats_add(s_recv_bytes_added_to_socket, delta);
    else
        rds_stats_add(s_recv_bytes_removed_from_socket, -delta);

    /* loop transport doesn't send/recv congestion updates */
    if (rs->rs_transport->t_type == RDS_TRANS_LOOP)
        return;

    now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);

    rdsdebug("rs %p (%pI6c:%u) recv bytes %d buf %d "
      "now_cong %d delta %d\n",
      rs, &rs->rs_bound_addr,
      ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
      rds_sk_rcvbuf(rs), now_congested, delta);

    /* wasn't -> am congested */
    if (!rs->rs_congested && now_congested) {
        rs->rs_congested = 1;
        rds_cong_set_bit(map, port);
        rds_cong_queue_updates(map);
    }
    /* was -> aren't congested */
    /* Require more free space before reporting uncongested to prevent
       bouncing cong/uncong state too often */
    else if (rs->rs_congested && (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2))) {
        rs->rs_congested = 0;
        rds_cong_clear_bit(map, port);
        rds_cong_queue_updates(map);
    }

    /* do nothing if no change in cong state */
}

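/* The peer advertises a generation number in its handshake probe; a
 * changed value indicates the peer's RDS instance restarted.  When that
 * happens on a TCP connection, reset the tx/rx sequence counters on
 * every path and flag anything still on the retransmit queues with
 * RDS_MSG_FLUSH so it is dropped rather than resent to the new instance.
 */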
static void rds_conn_peer_gen_update(struct rds_connection *conn,
                     u32 peer_gen_num)
{
    int i;
    struct rds_message *rm, *tmp;
    unsigned long flags;

    WARN_ON(conn->c_trans->t_type != RDS_TRANS_TCP);
    if (peer_gen_num != 0) {
        if (conn->c_peer_gen_num != 0 &&
            peer_gen_num != conn->c_peer_gen_num) {
            for (i = 0; i < RDS_MPATH_WORKERS; i++) {
                struct rds_conn_path *cp;

                cp = &conn->c_path[i];
                spin_lock_irqsave(&cp->cp_lock, flags);
                cp->cp_next_tx_seq = 1;
                cp->cp_next_rx_seq = 0;
                list_for_each_entry_safe(rm, tmp,
                             &cp->cp_retrans,
                             m_conn_item) {
                    set_bit(RDS_MSG_FLUSH, &rm->m_flags);
                }
                spin_unlock_irqrestore(&cp->cp_lock, flags);
            }
        }
        conn->c_peer_gen_num = peer_gen_num;
    }
}

/*
 * Process all extension headers that come with this message.
 */
static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
{
    struct rds_header *hdr = &inc->i_hdr;
    unsigned int pos = 0, type, len;
    union {
        struct rds_ext_header_version version;
        struct rds_ext_header_rdma rdma;
        struct rds_ext_header_rdma_dest rdma_dest;
    } buffer;

    while (1) {
        len = sizeof(buffer);
        type = rds_message_next_extension(hdr, &pos, &buffer, &len);
        if (type == RDS_EXTHDR_NONE)
            break;
        /* Process extension header here */
        switch (type) {
        case RDS_EXTHDR_RDMA:
            rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
            break;

        case RDS_EXTHDR_RDMA_DEST:
            /* We ignore the size for now. We could stash it
             * somewhere and use it for error checking. */
            inc->i_usercopy.rdma_cookie = rds_rdma_make_cookie(
                    be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
                    be32_to_cpu(buffer.rdma_dest.h_rdma_offset));

            break;
        }
    }
}

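/* Parse the extension headers carried by a handshake probe: the peer's
 * path count (RDS_EXTHDR_NPATHS) and its generation number
 * (RDS_EXTHDR_GEN_NUM).
 */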
static void rds_recv_hs_exthdrs(struct rds_header *hdr,
                struct rds_connection *conn)
{
    unsigned int pos = 0, type, len;
    union {
        struct rds_ext_header_version version;
        u16 rds_npaths;
        u32 rds_gen_num;
    } buffer;
    u32 new_peer_gen_num = 0;

    while (1) {
        len = sizeof(buffer);
        type = rds_message_next_extension(hdr, &pos, &buffer, &len);
        if (type == RDS_EXTHDR_NONE)
            break;
        /* Process extension header here */
        switch (type) {
        case RDS_EXTHDR_NPATHS:
            conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
                           be16_to_cpu(buffer.rds_npaths));
            break;
        case RDS_EXTHDR_GEN_NUM:
            new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num);
            break;
        default:
            pr_warn_ratelimited("ignoring unknown exthdr type "
                         "0x%x\n", type);
        }
    }
    /* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
    conn->c_npaths = max_t(int, conn->c_npaths, 1);
    conn->c_ping_triggered = 0;
    rds_conn_peer_gen_update(conn, new_peer_gen_num);
}

/* rds_start_mprds() will synchronously start multiple paths when appropriate.
 * The scheme is based on the following rules:
 *
 * 1. rds_sendmsg on first connect attempt sends the probe ping, with the
 *    sender's npaths (s_npaths)
 * 2. rcvr of probe-ping knows the mprds_paths = min(s_npaths, r_npaths). It
 *    sends back a probe-pong with r_npaths. After that, if rcvr is the
 *    smaller ip addr, it starts rds_conn_path_connect_if_down on all
 *    mprds_paths.
 * 3. sender gets woken up, and can move to rds_conn_path_connect_if_down.
 *    If it is the smaller ipaddr, rds_conn_path_connect_if_down can be
 *    called after reception of the probe-pong on all mprds_paths.
 *    Otherwise (sender of probe-ping is not the smaller ip addr): just call
 *    rds_conn_path_connect_if_down on the hashed path. (see rule 4)
 * 4. rds_connect_worker must only trigger a connection if laddr < faddr.
 * 5. sender may end up queuing the packet on the cp; it will get sent out
 *    later, when the connection is completed.
 */
static void rds_start_mprds(struct rds_connection *conn)
{
    int i;
    struct rds_conn_path *cp;

    if (conn->c_npaths > 1 &&
        rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) < 0) {
        for (i = 0; i < conn->c_npaths; i++) {
            cp = &conn->c_path[i];
            rds_conn_path_connect_if_down(cp);
        }
    }
}

/*
 * The transport must make sure that this is serialized against other
 * rx and conn reset on this specific conn.
 *
 * We currently assert that only one fragmented message will be sent
 * down a connection at a time.  This lets us reassemble in the conn
 * instead of per-flow which means that we don't have to go digging through
 * flows to tear down partial reassembly progress on conn failure and
 * we save flow lookup and locking for each frag arrival.  It does mean
 * that small messages will wait behind large ones.  Fragmenting at all
 * is only to reduce the memory consumption of pre-posted buffers.
 *
 * The caller passes in saddr and daddr instead of us getting it from the
 * conn.  This lets loopback, who only has one conn for both directions,
 * tell us which roles the addrs in the conn are playing for this message.
 */
void rds_recv_incoming(struct rds_connection *conn, struct in6_addr *saddr,
               struct in6_addr *daddr,
               struct rds_incoming *inc, gfp_t gfp)
{
    struct rds_sock *rs = NULL;
    struct sock *sk;
    unsigned long flags;
    struct rds_conn_path *cp;

    inc->i_conn = conn;
    inc->i_rx_jiffies = jiffies;
    if (conn->c_trans->t_mp_capable)
        cp = inc->i_conn_path;
    else
        cp = &conn->c_path[0];

    rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
         "flags 0x%x rx_jiffies %lu\n", conn,
         (unsigned long long)cp->cp_next_rx_seq,
         inc,
         (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
         be32_to_cpu(inc->i_hdr.h_len),
         be16_to_cpu(inc->i_hdr.h_sport),
         be16_to_cpu(inc->i_hdr.h_dport),
         inc->i_hdr.h_flags,
         inc->i_rx_jiffies);

    /*
     * Sequence numbers should only increase.  Messages get their
     * sequence number as they're queued in a sending conn.  They
     * can be dropped, though, if the sending socket is closed before
     * they hit the wire.  So sequence numbers can skip forward
     * under normal operation.  They can also drop back in the conn
     * failover case as previously sent messages are resent down the
     * new instance of a conn.  We drop those, otherwise we have
     * to assume that the next valid seq does not come after a
     * hole in the fragment stream.
     *
     * The headers don't give us a way to realize if fragments of
     * a message have been dropped.  We assume that frags that arrive
     * to a flow are part of the current message on the flow that is
     * being reassembled.  This means that senders can't drop messages
     * from the sending conn until all their frags are sent.
     *
     * XXX we could spend more on the wire to get more robust failure
     * detection, arguably worth it to avoid data corruption.
     */
    if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq &&
        (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
        rds_stats_inc(s_recv_drop_old_seq);
        goto out;
    }
    cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;

    if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
        if (inc->i_hdr.h_sport == 0) {
            rdsdebug("ignore ping with 0 sport from %pI6c\n",
                 saddr);
            goto out;
        }
        rds_stats_inc(s_recv_ping);
        rds_send_pong(cp, inc->i_hdr.h_sport);
        /* if this is a handshake ping, start multipath if necessary */
        if (RDS_HS_PROBE(be16_to_cpu(inc->i_hdr.h_sport),
                 be16_to_cpu(inc->i_hdr.h_dport))) {
            rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
            rds_start_mprds(cp->cp_conn);
        }
        goto out;
    }

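    /* A zero-sport message to RDS_FLAG_PROBE_PORT is the handshake
     * pong; waking c_hs_waitq lets a sender blocked on the multipath
     * handshake proceed.
     */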
    if (be16_to_cpu(inc->i_hdr.h_dport) == RDS_FLAG_PROBE_PORT &&
        inc->i_hdr.h_sport == 0) {
        rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
        /* if this is a handshake pong, start multipath if necessary */
        rds_start_mprds(cp->cp_conn);
        wake_up(&cp->cp_conn->c_hs_waitq);
        goto out;
    }

    rs = rds_find_bound(daddr, inc->i_hdr.h_dport, conn->c_bound_if);
    if (!rs) {
        rds_stats_inc(s_recv_drop_no_sock);
        goto out;
    }

    /* Process extension headers */
    rds_recv_incoming_exthdrs(inc, rs);

    /* We can be racing with rds_release() which marks the socket dead. */
    sk = rds_rs_to_sk(rs);

    /* serialize with rds_release -> sock_orphan */
    write_lock_irqsave(&rs->rs_recv_lock, flags);
    if (!sock_flag(sk, SOCK_DEAD)) {
        rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
        rds_stats_inc(s_recv_queued);
        rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                      be32_to_cpu(inc->i_hdr.h_len),
                      inc->i_hdr.h_dport);
        if (sock_flag(sk, SOCK_RCVTSTAMP))
            inc->i_usercopy.rx_tstamp = ktime_get_real();
        rds_inc_addref(inc);
        inc->i_rx_lat_trace[RDS_MSG_RX_END] = local_clock();
        list_add_tail(&inc->i_item, &rs->rs_recv_queue);
        __rds_wake_sk_sleep(sk);
    } else {
        rds_stats_inc(s_recv_drop_dead_sock);
    }
    write_unlock_irqrestore(&rs->rs_recv_lock, flags);

out:
    if (rs)
        rds_sock_put(rs);
}
EXPORT_SYMBOL_GPL(rds_recv_incoming);

/*
 * Be very careful here.  This is called as the condition in
 * wait_event_*() and so needs to cope with being called many times.
 */
static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
{
    unsigned long flags;

    if (!*inc) {
        read_lock_irqsave(&rs->rs_recv_lock, flags);
        if (!list_empty(&rs->rs_recv_queue)) {
            *inc = list_entry(rs->rs_recv_queue.next,
                      struct rds_incoming,
                      i_item);
            rds_inc_addref(*inc);
        }
        read_unlock_irqrestore(&rs->rs_recv_lock, flags);
    }

    return *inc != NULL;
}

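/* Check whether the message is still on the socket's receive queue,
 * i.e. that no other reader returned it to userspace first.  If @drop
 * is set (a non-peek read), also remove it from the queue, reverse the
 * rcvbuf accounting and drop the queue's reference.
 */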
static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
                int drop)
{
    struct sock *sk = rds_rs_to_sk(rs);
    int ret = 0;
    unsigned long flags;

    write_lock_irqsave(&rs->rs_recv_lock, flags);
    if (!list_empty(&inc->i_item)) {
        ret = 1;
        if (drop) {
            /* XXX make sure this i_conn is reliable */
            rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                          -be32_to_cpu(inc->i_hdr.h_len),
                          inc->i_hdr.h_dport);
            list_del_init(&inc->i_item);
            rds_inc_put(inc);
        }
    }
    write_unlock_irqrestore(&rs->rs_recv_lock, flags);

    rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
    return ret;
}

/*
 * Pull errors off the error queue.
 * If msghdr is NULL, we will just purge the error queue.
 */
int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
{
    struct rds_notifier *notifier;
    struct rds_rdma_notify cmsg;
    unsigned int count = 0, max_messages = ~0U;
    unsigned long flags;
    LIST_HEAD(copy);
    int err = 0;

    memset(&cmsg, 0, sizeof(cmsg)); /* fill holes with zero */

    /* put_cmsg copies to user space and thus may sleep. We can't do this
     * with rs_lock held, so first grab as many notifications as will fit
     * in the user-provided cmsg buffer. We don't try to copy more, to
     * avoid losing notifications - except when the buffer is so small
     * that it wouldn't even hold a single notification. Then we return as
     * much of that single message as we can squeeze in, and set
     * MSG_CTRUNC.
     */
    if (msghdr) {
        max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
        if (!max_messages)
            max_messages = 1;
    }

    spin_lock_irqsave(&rs->rs_lock, flags);
    while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
        notifier = list_entry(rs->rs_notify_queue.next,
                struct rds_notifier, n_list);
        list_move(&notifier->n_list, &copy);
        count++;
    }
    spin_unlock_irqrestore(&rs->rs_lock, flags);

    if (!count)
        return 0;

    while (!list_empty(&copy)) {
        notifier = list_entry(copy.next, struct rds_notifier, n_list);

        if (msghdr) {
            cmsg.user_token = notifier->n_user_token;
            cmsg.status = notifier->n_status;

            err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
                       sizeof(cmsg), &cmsg);
            if (err)
                break;
        }

        list_del_init(&notifier->n_list);
        kfree(notifier);
    }

    /* If we bailed out because of an error in put_cmsg,
     * we may be left with one or more notifications that we
     * didn't process. Return them to the head of the list. */
    if (!list_empty(&copy)) {
        spin_lock_irqsave(&rs->rs_lock, flags);
        list_splice(&copy, &rs->rs_notify_queue);
        spin_unlock_irqrestore(&rs->rs_lock, flags);
    }

    return err;
}

/*
 * Queue a congestion notification
 */
static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
{
    uint64_t notify = rs->rs_cong_notify;
    unsigned long flags;
    int err;

    err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
            sizeof(notify), &notify);
    if (err)
        return err;

    spin_lock_irqsave(&rs->rs_lock, flags);
    rs->rs_cong_notify &= ~notify;
    spin_unlock_irqrestore(&rs->rs_lock, flags);

    return 0;
}

/*
 * Receive any control messages.
 */
static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
             struct rds_sock *rs)
{
    int ret = 0;

    if (inc->i_usercopy.rdma_cookie) {
        ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
                sizeof(inc->i_usercopy.rdma_cookie),
                &inc->i_usercopy.rdma_cookie);
        if (ret)
            goto out;
    }

    if ((inc->i_usercopy.rx_tstamp != 0) &&
        sock_flag(rds_rs_to_sk(rs), SOCK_RCVTSTAMP)) {
        struct __kernel_old_timeval tv =
            ns_to_kernel_old_timeval(inc->i_usercopy.rx_tstamp);

        if (!sock_flag(rds_rs_to_sk(rs), SOCK_TSTAMP_NEW)) {
            ret = put_cmsg(msg, SOL_SOCKET, SO_TIMESTAMP_OLD,
                       sizeof(tv), &tv);
        } else {
            struct __kernel_sock_timeval sk_tv;

            sk_tv.tv_sec = tv.tv_sec;
            sk_tv.tv_usec = tv.tv_usec;

            ret = put_cmsg(msg, SOL_SOCKET, SO_TIMESTAMP_NEW,
                       sizeof(sk_tv), &sk_tv);
        }

        if (ret)
            goto out;
    }

    if (rs->rs_rx_traces) {
        struct rds_cmsg_rx_trace t;
        int i, j;

        memset(&t, 0, sizeof(t));
        inc->i_rx_lat_trace[RDS_MSG_RX_CMSG] = local_clock();
        t.rx_traces = rs->rs_rx_traces;
        for (i = 0; i < rs->rs_rx_traces; i++) {
            j = rs->rs_rx_trace[i];
            t.rx_trace_pos[i] = j;
            t.rx_trace[i] = inc->i_rx_lat_trace[j + 1] -
                      inc->i_rx_lat_trace[j];
        }

        ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RXPATH_LATENCY,
                   sizeof(t), &t);
        if (ret)
            goto out;
    }

out:
    return ret;
}

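/* Hand one batch of completed zerocopy cookies to userspace as an
 * RDS_CMSG_ZCOPY_COMPLETION control message.  Returns true if a batch
 * was delivered; if put_cmsg() fails, the batch is put back at the head
 * of the queue for a later call.
 */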
static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
{
    struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue;
    struct rds_msg_zcopy_info *info = NULL;
    struct rds_zcopy_cookies *done;
    unsigned long flags;

    if (!msg->msg_control)
        return false;

    if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) ||
        msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
        return false;

    spin_lock_irqsave(&q->lock, flags);
    if (!list_empty(&q->zcookie_head)) {
        info = list_entry(q->zcookie_head.next,
                  struct rds_msg_zcopy_info, rs_zcookie_next);
        list_del(&info->rs_zcookie_next);
    }
    spin_unlock_irqrestore(&q->lock, flags);
    if (!info)
        return false;
    done = &info->zcookies;
    if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
             done)) {
        spin_lock_irqsave(&q->lock, flags);
        list_add(&info->rs_zcookie_next, &q->zcookie_head);
        spin_unlock_irqrestore(&q->lock, flags);
        return false;
    }
    kfree(info);
    return true;
}

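/* recvmsg() entry point for RDS sockets.  Pending RDMA and congestion
 * notifications are always delivered before data.  The loop copies one
 * message to userspace, then re-checks that a concurrent reader did not
 * already return it; if it did, the copy is reverted and the loop
 * retries with the next message.
 */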
int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
        int msg_flags)
{
    struct sock *sk = sock->sk;
    struct rds_sock *rs = rds_sk_to_rs(sk);
    long timeo;
    int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
    DECLARE_SOCKADDR(struct sockaddr_in6 *, sin6, msg->msg_name);
    DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
    struct rds_incoming *inc = NULL;

    /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
    timeo = sock_rcvtimeo(sk, nonblock);

    rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);

    if (msg_flags & MSG_OOB)
        goto out;
    if (msg_flags & MSG_ERRQUEUE)
        return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);

    while (1) {
        /* If there are pending notifications, do those - and nothing else */
        if (!list_empty(&rs->rs_notify_queue)) {
            ret = rds_notify_queue_get(rs, msg);
            break;
        }

        if (rs->rs_cong_notify) {
            ret = rds_notify_cong(rs, msg);
            break;
        }

        if (!rds_next_incoming(rs, &inc)) {
            if (nonblock) {
                bool reaped = rds_recvmsg_zcookie(rs, msg);

                ret = reaped ? 0 : -EAGAIN;
                break;
            }

            timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
                    (!list_empty(&rs->rs_notify_queue) ||
                     rs->rs_cong_notify ||
                     rds_next_incoming(rs, &inc)), timeo);
            rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
                 timeo);
            if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
                continue;

            ret = timeo;
            if (ret == 0)
                ret = -ETIMEDOUT;
            break;
        }

        rdsdebug("copying inc %p from %pI6c:%u to user\n", inc,
             &inc->i_conn->c_faddr,
             ntohs(inc->i_hdr.h_sport));
        ret = inc->i_conn->c_trans->inc_copy_to_user(inc, &msg->msg_iter);
        if (ret < 0)
            break;

        /*
         * if the message we just copied isn't at the head of the
         * recv queue then someone else raced us to return it, try
         * to get the next message.
         */
        if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
            rds_inc_put(inc);
            inc = NULL;
            rds_stats_inc(s_recv_deliver_raced);
            iov_iter_revert(&msg->msg_iter, ret);
            continue;
        }

        if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
            if (msg_flags & MSG_TRUNC)
                ret = be32_to_cpu(inc->i_hdr.h_len);
            msg->msg_flags |= MSG_TRUNC;
        }

        if (rds_cmsg_recv(inc, msg, rs)) {
            ret = -EFAULT;
            break;
        }
        rds_recvmsg_zcookie(rs, msg);

        rds_stats_inc(s_recv_delivered);

        if (msg->msg_name) {
            if (ipv6_addr_v4mapped(&inc->i_saddr)) {
                sin->sin_family = AF_INET;
                sin->sin_port = inc->i_hdr.h_sport;
                sin->sin_addr.s_addr =
                    inc->i_saddr.s6_addr32[3];
                memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
                msg->msg_namelen = sizeof(*sin);
            } else {
                sin6->sin6_family = AF_INET6;
                sin6->sin6_port = inc->i_hdr.h_sport;
                sin6->sin6_addr = inc->i_saddr;
                sin6->sin6_flowinfo = 0;
                sin6->sin6_scope_id = rs->rs_bound_scope_id;
                msg->msg_namelen = sizeof(*sin6);
            }
        }
        break;
    }

    if (inc)
        rds_inc_put(inc);

out:
    return ret;
}

/*
 * The socket is being shut down and we're asked to drop messages that were
 * queued for recvmsg.  The caller has unbound the socket so the receive path
 * won't queue any more incoming fragments or messages on the socket.
 */
void rds_clear_recv_queue(struct rds_sock *rs)
{
    struct sock *sk = rds_rs_to_sk(rs);
    struct rds_incoming *inc, *tmp;
    unsigned long flags;

    write_lock_irqsave(&rs->rs_recv_lock, flags);
    list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
        rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
                      -be32_to_cpu(inc->i_hdr.h_len),
                      inc->i_hdr.h_dport);
        list_del_init(&inc->i_item);
        rds_inc_put(inc);
    }
    write_unlock_irqrestore(&rs->rs_recv_lock, flags);
}

/*
 * inc->i_saddr isn't used here because it is only set in the receive
 * path.
 */
void rds_inc_info_copy(struct rds_incoming *inc,
               struct rds_info_iterator *iter,
               __be32 saddr, __be32 daddr, int flip)
{
    struct rds_info_message minfo;

    minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
    minfo.len = be32_to_cpu(inc->i_hdr.h_len);
    minfo.tos = inc->i_conn->c_tos;

    if (flip) {
        minfo.laddr = daddr;
        minfo.faddr = saddr;
        minfo.lport = inc->i_hdr.h_dport;
        minfo.fport = inc->i_hdr.h_sport;
    } else {
        minfo.laddr = saddr;
        minfo.faddr = daddr;
        minfo.lport = inc->i_hdr.h_sport;
        minfo.fport = inc->i_hdr.h_dport;
    }

    minfo.flags = 0;

    rds_info_copy(iter, &minfo, sizeof(minfo));
}

#if IS_ENABLED(CONFIG_IPV6)
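/* IPv6 counterpart of rds_inc_info_copy(). */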
void rds6_inc_info_copy(struct rds_incoming *inc,
            struct rds_info_iterator *iter,
            struct in6_addr *saddr, struct in6_addr *daddr,
            int flip)
{
    struct rds6_info_message minfo6;

    minfo6.seq = be64_to_cpu(inc->i_hdr.h_sequence);
    minfo6.len = be32_to_cpu(inc->i_hdr.h_len);
    minfo6.tos = inc->i_conn->c_tos;

    if (flip) {
        minfo6.laddr = *daddr;
        minfo6.faddr = *saddr;
        minfo6.lport = inc->i_hdr.h_dport;
        minfo6.fport = inc->i_hdr.h_sport;
    } else {
        minfo6.laddr = *saddr;
        minfo6.faddr = *daddr;
        minfo6.lport = inc->i_hdr.h_sport;
        minfo6.fport = inc->i_hdr.h_dport;
    }

    minfo6.flags = 0;

    rds_info_copy(iter, &minfo6, sizeof(minfo6));
}
#endif