Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
0003  *
0004  * This software is available to you under a choice of one of two
0005  * licenses.  You may choose to be licensed under the terms of the GNU
0006  * General Public License (GPL) Version 2, available from the file
0007  * COPYING in the main directory of this source tree, or the
0008  * OpenIB.org BSD license below:
0009  *
0010  *     Redistribution and use in source and binary forms, with or
0011  *     without modification, are permitted provided that the following
0012  *     conditions are met:
0013  *
0014  *      - Redistributions of source code must retain the above
0015  *        copyright notice, this list of conditions and the following
0016  *        disclaimer.
0017  *
0018  *      - Redistributions in binary form must reproduce the above
0019  *        copyright notice, this list of conditions and the following
0020  *        disclaimer in the documentation and/or other materials
0021  *        provided with the distribution.
0022  *
0023  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
0024  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
0025  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
0026  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
0027  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
0028  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
0029  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
0030  * SOFTWARE.
0031  *
0032  */
0033 #include <linux/kernel.h>
0034 #include <linux/list.h>
0035 #include <linux/slab.h>
0036 #include <linux/export.h>
0037 #include <net/ipv6.h>
0038 #include <net/inet6_hashtables.h>
0039 #include <net/addrconf.h>
0040 
0041 #include "rds.h"
0042 #include "loop.h"
0043 
/* Global connection hash: 2^12 buckets, keyed by (laddr, faddr). */
#define RDS_CONNECTION_HASH_BITS 12
#define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS)
#define RDS_CONNECTION_HASH_MASK (RDS_CONNECTION_HASH_ENTRIES - 1)

/* converting this to RCU is a chore for another day.. */
static DEFINE_SPINLOCK(rds_conn_lock);	/* guards hash inserts/removals and rds_conn_count */
static unsigned long rds_conn_count;	/* number of live connections */
static struct hlist_head rds_conn_hash[RDS_CONNECTION_HASH_ENTRIES];
static struct kmem_cache *rds_conn_slab;	/* allocator for struct rds_connection */
0053 
0054 static struct hlist_head *rds_conn_bucket(const struct in6_addr *laddr,
0055                       const struct in6_addr *faddr)
0056 {
0057     static u32 rds6_hash_secret __read_mostly;
0058     static u32 rds_hash_secret __read_mostly;
0059 
0060     u32 lhash, fhash, hash;
0061 
0062     net_get_random_once(&rds_hash_secret, sizeof(rds_hash_secret));
0063     net_get_random_once(&rds6_hash_secret, sizeof(rds6_hash_secret));
0064 
0065     lhash = (__force u32)laddr->s6_addr32[3];
0066 #if IS_ENABLED(CONFIG_IPV6)
0067     fhash = __ipv6_addr_jhash(faddr, rds6_hash_secret);
0068 #else
0069     fhash = (__force u32)faddr->s6_addr32[3];
0070 #endif
0071     hash = __inet_ehashfn(lhash, 0, fhash, 0, rds_hash_secret);
0072 
0073     return &rds_conn_hash[hash & RDS_CONNECTION_HASH_MASK];
0074 }
0075 
/* Set RDS_INFO_CONNECTION_FLAG_<suffix> in @var when @test is true. */
#define rds_conn_info_set(var, test, suffix) do {       \
    if (test)                       \
        var |= RDS_INFO_CONNECTION_FLAG_##suffix;   \
} while (0)
0080 
0081 /* rcu read lock must be held or the connection spinlock */
0082 static struct rds_connection *rds_conn_lookup(struct net *net,
0083                           struct hlist_head *head,
0084                           const struct in6_addr *laddr,
0085                           const struct in6_addr *faddr,
0086                           struct rds_transport *trans,
0087                           u8 tos, int dev_if)
0088 {
0089     struct rds_connection *conn, *ret = NULL;
0090 
0091     hlist_for_each_entry_rcu(conn, head, c_hash_node) {
0092         if (ipv6_addr_equal(&conn->c_faddr, faddr) &&
0093             ipv6_addr_equal(&conn->c_laddr, laddr) &&
0094             conn->c_trans == trans &&
0095             conn->c_tos == tos &&
0096             net == rds_conn_net(conn) &&
0097             conn->c_dev_if == dev_if) {
0098             ret = conn;
0099             break;
0100         }
0101     }
0102     rdsdebug("returning conn %p for %pI6c -> %pI6c\n", ret,
0103          laddr, faddr);
0104     return ret;
0105 }
0106 
/*
 * This is called by transports as they're bringing down a connection.
 * It clears partial message state so that the transport can start sending
 * and receiving over this connection again in the future.  It is up to
 * the transport to have serialized this call with its send and recv.
 */
static void rds_conn_path_reset(struct rds_conn_path *cp)
{
    struct rds_connection *conn = cp->cp_conn;

    rdsdebug("connection %pI6c to %pI6c reset\n",
         &conn->c_laddr, &conn->c_faddr);

    rds_stats_inc(s_conn_reset);
    /* drop partially-sent state queued on this path */
    rds_send_path_reset(cp);
    cp->cp_flags = 0;

    /* Do not clear next_rx_seq here, else we cannot distinguish
     * retransmitted packets from new packets, and will hand all
     * of them to the application. That is not consistent with the
     * reliability guarantees of RDS. */
}
0129 
/* Initialize one connection path: its lock, queues, work items and
 * state.  Called once per path from __rds_conn_create().
 * NOTE(review): @is_outgoing is currently unreferenced in this body.
 */
static void __rds_conn_path_init(struct rds_connection *conn,
                 struct rds_conn_path *cp, bool is_outgoing)
{
    spin_lock_init(&cp->cp_lock);
    cp->cp_next_tx_seq = 1;	/* sequence numbers start at 1 */
    init_waitqueue_head(&cp->cp_waitq);
    INIT_LIST_HEAD(&cp->cp_send_queue);
    INIT_LIST_HEAD(&cp->cp_retrans);

    cp->cp_conn = conn;
    atomic_set(&cp->cp_state, RDS_CONN_DOWN);
    cp->cp_send_gen = 0;
    cp->cp_reconnect_jiffies = 0;
    cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
    INIT_DELAYED_WORK(&cp->cp_send_w, rds_send_worker);
    INIT_DELAYED_WORK(&cp->cp_recv_w, rds_recv_worker);
    INIT_DELAYED_WORK(&cp->cp_conn_w, rds_connect_worker);
    INIT_WORK(&cp->cp_down_w, rds_shutdown_worker);
    mutex_init(&cp->cp_cm_lock);
    cp->cp_flags = 0;
}
0151 
/*
 * There is only every one 'conn' for a given pair of addresses in the
 * system at a time.  They contain messages to be retransmitted and so
 * span the lifetime of the actual underlying transport connections.
 *
 * For now they are not garbage collected once they're created.  They
 * are torn down as the module is removed, if ever.
 */
static struct rds_connection *__rds_conn_create(struct net *net,
                        const struct in6_addr *laddr,
                        const struct in6_addr *faddr,
                        struct rds_transport *trans,
                        gfp_t gfp, u8 tos,
                        int is_outgoing,
                        int dev_if)
{
    struct rds_connection *conn, *parent = NULL;
    struct hlist_head *head = rds_conn_bucket(laddr, faddr);
    struct rds_transport *loop_trans;
    unsigned long flags;
    int ret, i;
    int npaths = (trans->t_mp_capable ? RDS_MPATH_WORKERS : 1);

    /* Fast path: reuse an existing conn for this key if one exists. */
    rcu_read_lock();
    conn = rds_conn_lookup(net, head, laddr, faddr, trans, tos, dev_if);
    if (conn &&
        conn->c_loopback &&
        conn->c_trans != &rds_loop_transport &&
        ipv6_addr_equal(laddr, faddr) &&
        !is_outgoing) {
        /* This is a looped back IB connection, and we're
         * called by the code handling the incoming connect.
         * We need a second connection object into which we
         * can stick the other QP. */
        parent = conn;
        conn = parent->c_passive;
    }
    rcu_read_unlock();
    if (conn)
        goto out;

    /* No usable conn found: allocate and initialize a new one. */
    conn = kmem_cache_zalloc(rds_conn_slab, gfp);
    if (!conn) {
        conn = ERR_PTR(-ENOMEM);
        goto out;
    }
    conn->c_path = kcalloc(npaths, sizeof(struct rds_conn_path), gfp);
    if (!conn->c_path) {
        kmem_cache_free(rds_conn_slab, conn);
        conn = ERR_PTR(-ENOMEM);
        goto out;
    }

    INIT_HLIST_NODE(&conn->c_hash_node);
    conn->c_laddr = *laddr;
    conn->c_isv6 = !ipv6_addr_v4mapped(laddr);
    conn->c_faddr = *faddr;
    conn->c_dev_if = dev_if;
    conn->c_tos = tos;

#if IS_ENABLED(CONFIG_IPV6)
    /* If the local address is link local, set c_bound_if to be the
     * index used for this connection.  Otherwise, set it to 0 as
     * the socket is not bound to an interface.  c_bound_if is used
     * to look up a socket when a packet is received
     */
    if (ipv6_addr_type(laddr) & IPV6_ADDR_LINKLOCAL)
        conn->c_bound_if = dev_if;
    else
#endif
        conn->c_bound_if = 0;

    rds_conn_net_set(conn, net);

    ret = rds_cong_get_maps(conn);
    if (ret) {
        /* roll back the allocations done above */
        kfree(conn->c_path);
        kmem_cache_free(rds_conn_slab, conn);
        conn = ERR_PTR(ret);
        goto out;
    }

    /*
     * This is where a connection becomes loopback.  If *any* RDS sockets
     * can bind to the destination address then we'd rather the messages
     * flow through loopback rather than either transport.
     */
    loop_trans = rds_trans_get_preferred(net, faddr, conn->c_dev_if);
    if (loop_trans) {
        rds_trans_put(loop_trans);
        conn->c_loopback = 1;
        if (trans->t_prefer_loopback) {
            if (likely(is_outgoing)) {
                /* "outgoing" connection to local address.
                 * Protocol says it wants the connection
                 * handled by the loopback transport.
                 * This is what TCP does.
                 */
                trans = &rds_loop_transport;
            } else {
                /* No transport currently in use
                 * should end up here, but if it
                 * does, reset/destroy the connection.
                 */
                kfree(conn->c_path);
                kmem_cache_free(rds_conn_slab, conn);
                conn = ERR_PTR(-EOPNOTSUPP);
                goto out;
            }
        }
    }

    conn->c_trans = trans;

    init_waitqueue_head(&conn->c_hs_waitq);
    for (i = 0; i < npaths; i++) {
        __rds_conn_path_init(conn, &conn->c_path[i],
                     is_outgoing);
        conn->c_path[i].cp_index = i;
    }
    /* rcu covers the rds_destroy_pending() check vs netns teardown */
    rcu_read_lock();
    if (rds_destroy_pending(conn))
        ret = -ENETDOWN;
    else
        ret = trans->conn_alloc(conn, GFP_ATOMIC);
    if (ret) {
        rcu_read_unlock();
        kfree(conn->c_path);
        kmem_cache_free(rds_conn_slab, conn);
        conn = ERR_PTR(ret);
        goto out;
    }

    rdsdebug("allocated conn %p for %pI6c -> %pI6c over %s %s\n",
         conn, laddr, faddr,
         strnlen(trans->t_name, sizeof(trans->t_name)) ?
         trans->t_name : "[unknown]", is_outgoing ? "(outgoing)" : "");

    /*
     * Since we ran without holding the conn lock, someone could
     * have created the same conn (either normal or passive) in the
     * interim. We check while holding the lock. If we won, we complete
     * init and return our conn. If we lost, we rollback and return the
     * other one.
     */
    spin_lock_irqsave(&rds_conn_lock, flags);
    if (parent) {
        /* Creating passive conn */
        if (parent->c_passive) {
            /* lost the race: free ours, adopt the winner's */
            trans->conn_free(conn->c_path[0].cp_transport_data);
            kfree(conn->c_path);
            kmem_cache_free(rds_conn_slab, conn);
            conn = parent->c_passive;
        } else {
            parent->c_passive = conn;
            rds_cong_add_conn(conn);
            rds_conn_count++;
        }
    } else {
        /* Creating normal conn */
        struct rds_connection *found;

        found = rds_conn_lookup(net, head, laddr, faddr, trans,
                    tos, dev_if);
        if (found) {
            struct rds_conn_path *cp;
            int i;

            for (i = 0; i < npaths; i++) {
                cp = &conn->c_path[i];
                /* The ->conn_alloc invocation may have
                 * allocated resource for all paths, so all
                 * of them may have to be freed here.
                 */
                if (cp->cp_transport_data)
                    trans->conn_free(cp->cp_transport_data);
            }
            kfree(conn->c_path);
            kmem_cache_free(rds_conn_slab, conn);
            conn = found;
        } else {
            conn->c_my_gen_num = rds_gen_num;
            conn->c_peer_gen_num = 0;
            hlist_add_head_rcu(&conn->c_hash_node, head);
            rds_cong_add_conn(conn);
            rds_conn_count++;
        }
    }
    spin_unlock_irqrestore(&rds_conn_lock, flags);
    rcu_read_unlock();

out:
    return conn;
}
0346 
/* Find or create the (non-outgoing) connection for this address pair;
 * thin wrapper around __rds_conn_create() with is_outgoing == 0.
 * May return an ERR_PTR on allocation or transport failure.
 */
struct rds_connection *rds_conn_create(struct net *net,
                       const struct in6_addr *laddr,
                       const struct in6_addr *faddr,
                       struct rds_transport *trans, u8 tos,
                       gfp_t gfp, int dev_if)
{
    return __rds_conn_create(net, laddr, faddr, trans, gfp, tos, 0, dev_if);
}
EXPORT_SYMBOL_GPL(rds_conn_create);
0356 
/* Same as rds_conn_create() but for the actively-connecting side
 * (is_outgoing == 1); affects loopback transport selection in
 * __rds_conn_create().
 */
struct rds_connection *rds_conn_create_outgoing(struct net *net,
                        const struct in6_addr *laddr,
                        const struct in6_addr *faddr,
                        struct rds_transport *trans,
                        u8 tos, gfp_t gfp, int dev_if)
{
    return __rds_conn_create(net, laddr, faddr, trans, gfp, tos, 1, dev_if);
}
EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
0366 
/* Bring one connection path down: quiesce the CM handlers, wait for
 * in-flight transmit/receive-refill to drain, let the transport shut
 * the path down, reset path state, and finally queue a reconnect if
 * the conn is still hashed (i.e. not being destroyed).
 */
void rds_conn_shutdown(struct rds_conn_path *cp)
{
    struct rds_connection *conn = cp->cp_conn;

    /* shut it down unless it's down already */
    if (!rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
        /*
         * Quiesce the connection mgmt handlers before we start tearing
         * things down. We don't hold the mutex for the entire
         * duration of the shutdown operation, else we may be
         * deadlocking with the CM handler. Instead, the CM event
         * handler is supposed to check for state DISCONNECTING
         */
        mutex_lock(&cp->cp_cm_lock);
        if (!rds_conn_path_transition(cp, RDS_CONN_UP,
                          RDS_CONN_DISCONNECTING) &&
            !rds_conn_path_transition(cp, RDS_CONN_ERROR,
                          RDS_CONN_DISCONNECTING)) {
            rds_conn_path_error(cp,
                        "shutdown called in state %d\n",
                        atomic_read(&cp->cp_state));
            mutex_unlock(&cp->cp_cm_lock);
            return;
        }
        mutex_unlock(&cp->cp_cm_lock);

        /* wait until any transmitter / receive-refiller is done */
        wait_event(cp->cp_waitq,
               !test_bit(RDS_IN_XMIT, &cp->cp_flags));
        wait_event(cp->cp_waitq,
               !test_bit(RDS_RECV_REFILL, &cp->cp_flags));

        conn->c_trans->conn_path_shutdown(cp);
        rds_conn_path_reset(cp);

        if (!rds_conn_path_transition(cp, RDS_CONN_DISCONNECTING,
                          RDS_CONN_DOWN) &&
            !rds_conn_path_transition(cp, RDS_CONN_ERROR,
                          RDS_CONN_DOWN)) {
            /* This can happen - eg when we're in the middle of tearing
             * down the connection, and someone unloads the rds module.
             * Quite reproducible with loopback connections.
             * Mostly harmless.
             *
             * Note that this also happens with rds-tcp because
             * we could have triggered rds_conn_path_drop in irq
             * mode from rds_tcp_state change on the receipt of
             * a FIN, thus we need to recheck for RDS_CONN_ERROR
             * here.
             */
            rds_conn_path_error(cp, "%s: failed to transition "
                        "to state DOWN, current state "
                        "is %d\n", __func__,
                        atomic_read(&cp->cp_state));
            return;
        }
    }

    /* Then reconnect if it's still live.
     * The passive side of an IB loopback connection is never added
     * to the conn hash, so we never trigger a reconnect on this
     * conn - the reconnect is always triggered by the active peer. */
    cancel_delayed_work_sync(&cp->cp_conn_w);
    rcu_read_lock();
    if (!hlist_unhashed(&conn->c_hash_node)) {
        rcu_read_unlock();
        rds_queue_reconnect(cp);
    } else {
        rcu_read_unlock();
    }
}
0437 
/* destroy a single rds_conn_path. rds_conn_destroy() iterates over
 * all paths using rds_conn_path_destroy()
 */
static void rds_conn_path_destroy(struct rds_conn_path *cp)
{
    struct rds_message *rm, *rtmp;

    /* paths the transport never allocated have nothing to tear down */
    if (!cp->cp_transport_data)
        return;

    /* make sure lingering queued work won't try to ref the conn */
    cancel_delayed_work_sync(&cp->cp_send_w);
    cancel_delayed_work_sync(&cp->cp_recv_w);

    /* destroy == true queues the shutdown worker even when the conn's
     * netns teardown is pending (see rds_conn_path_drop); wait for it */
    rds_conn_path_drop(cp, true);
    flush_work(&cp->cp_down_w);

    /* tear down queued messages */
    list_for_each_entry_safe(rm, rtmp,
                 &cp->cp_send_queue,
                 m_conn_item) {
        list_del_init(&rm->m_conn_item);
        BUG_ON(!list_empty(&rm->m_sock_item));
        rds_message_put(rm);
    }
    if (cp->cp_xmit_rm)
        rds_message_put(cp->cp_xmit_rm);

    /* no work of any kind may be pending by this point */
    WARN_ON(delayed_work_pending(&cp->cp_send_w));
    WARN_ON(delayed_work_pending(&cp->cp_recv_w));
    WARN_ON(delayed_work_pending(&cp->cp_conn_w));
    WARN_ON(work_pending(&cp->cp_down_w));

    /* finally release the transport's per-path private state */
    cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
}
0473 
0474 /*
0475  * Stop and free a connection.
0476  *
0477  * This can only be used in very limited circumstances.  It assumes that once
0478  * the conn has been shutdown that no one else is referencing the connection.
0479  * We can only ensure this in the rmmod path in the current code.
0480  */
0481 void rds_conn_destroy(struct rds_connection *conn)
0482 {
0483     unsigned long flags;
0484     int i;
0485     struct rds_conn_path *cp;
0486     int npaths = (conn->c_trans->t_mp_capable ? RDS_MPATH_WORKERS : 1);
0487 
0488     rdsdebug("freeing conn %p for %pI4 -> "
0489          "%pI4\n", conn, &conn->c_laddr,
0490          &conn->c_faddr);
0491 
0492     /* Ensure conn will not be scheduled for reconnect */
0493     spin_lock_irq(&rds_conn_lock);
0494     hlist_del_init_rcu(&conn->c_hash_node);
0495     spin_unlock_irq(&rds_conn_lock);
0496     synchronize_rcu();
0497 
0498     /* shut the connection down */
0499     for (i = 0; i < npaths; i++) {
0500         cp = &conn->c_path[i];
0501         rds_conn_path_destroy(cp);
0502         BUG_ON(!list_empty(&cp->cp_retrans));
0503     }
0504 
0505     /*
0506      * The congestion maps aren't freed up here.  They're
0507      * freed by rds_cong_exit() after all the connections
0508      * have been freed.
0509      */
0510     rds_cong_remove_conn(conn);
0511 
0512     kfree(conn->c_path);
0513     kmem_cache_free(rds_conn_slab, conn);
0514 
0515     spin_lock_irqsave(&rds_conn_lock, flags);
0516     rds_conn_count--;
0517     spin_unlock_irqrestore(&rds_conn_lock, flags);
0518 }
0519 EXPORT_SYMBOL_GPL(rds_conn_destroy);
0520 
0521 static void __rds_inc_msg_cp(struct rds_incoming *inc,
0522                  struct rds_info_iterator *iter,
0523                  void *saddr, void *daddr, int flip, bool isv6)
0524 {
0525 #if IS_ENABLED(CONFIG_IPV6)
0526     if (isv6)
0527         rds6_inc_info_copy(inc, iter, saddr, daddr, flip);
0528     else
0529 #endif
0530         rds_inc_info_copy(inc, iter, *(__be32 *)saddr,
0531                   *(__be32 *)daddr, flip);
0532 }
0533 
/* Shared implementation for the SEND_MESSAGES / RETRANS_MESSAGES info
 * handlers.  Walks every connection path under RCU and copies one
 * record per queued message; @isv6 selects the record layout and
 * whether IPv6-only connections are visible.
 */
static void rds_conn_message_info_cmn(struct socket *sock, unsigned int len,
                      struct rds_info_iterator *iter,
                      struct rds_info_lengths *lens,
                      int want_send, bool isv6)
{
    struct hlist_head *head;
    struct list_head *list;
    struct rds_connection *conn;
    struct rds_message *rm;
    unsigned int total = 0;
    unsigned long flags;
    size_t i;
    int j;

    /* convert the caller's byte budget into a record budget */
    if (isv6)
        len /= sizeof(struct rds6_info_message);
    else
        len /= sizeof(struct rds_info_message);

    rcu_read_lock();

    for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
         i++, head++) {
        hlist_for_each_entry_rcu(conn, head, c_hash_node) {
            struct rds_conn_path *cp;
            int npaths;

            /* IPv6 conns are invisible to the IPv4 info ABI */
            if (!isv6 && conn->c_isv6)
                continue;

            npaths = (conn->c_trans->t_mp_capable ?
                 RDS_MPATH_WORKERS : 1);

            for (j = 0; j < npaths; j++) {
                cp = &conn->c_path[j];
                if (want_send)
                    list = &cp->cp_send_queue;
                else
                    list = &cp->cp_retrans;

                spin_lock_irqsave(&cp->cp_lock, flags);

                /* XXX too lazy to maintain counts.. */
                list_for_each_entry(rm, list, m_conn_item) {
                    total++;
                    if (total <= len)
                        __rds_inc_msg_cp(&rm->m_inc,
                                 iter,
                                 &conn->c_laddr,
                                 &conn->c_faddr,
                                 0, isv6);
                }

                spin_unlock_irqrestore(&cp->cp_lock, flags);
            }
        }
    }
    rcu_read_unlock();

    /* report the full count so callers can size a retry buffer */
    lens->nr = total;
    if (isv6)
        lens->each = sizeof(struct rds6_info_message);
    else
        lens->each = sizeof(struct rds_info_message);
}
0599 
/* IPv4-format view of the queued/retransmit message info. */
static void rds_conn_message_info(struct socket *sock, unsigned int len,
                  struct rds_info_iterator *iter,
                  struct rds_info_lengths *lens,
                  int want_send)
{
    rds_conn_message_info_cmn(sock, len, iter, lens, want_send, false);
}
0607 
#if IS_ENABLED(CONFIG_IPV6)
/* IPv6-format view of the queued/retransmit message info. */
static void rds6_conn_message_info(struct socket *sock, unsigned int len,
                   struct rds_info_iterator *iter,
                   struct rds_info_lengths *lens,
                   int want_send)
{
    rds_conn_message_info_cmn(sock, len, iter, lens, want_send, true);
}
#endif
0617 
/* rds-info handler for RDS_INFO_SEND_MESSAGES (want_send == 1). */
static void rds_conn_message_info_send(struct socket *sock, unsigned int len,
                       struct rds_info_iterator *iter,
                       struct rds_info_lengths *lens)
{
    rds_conn_message_info(sock, len, iter, lens, 1);
}
0624 
#if IS_ENABLED(CONFIG_IPV6)
/* rds-info handler for RDS6_INFO_SEND_MESSAGES (want_send == 1). */
static void rds6_conn_message_info_send(struct socket *sock, unsigned int len,
                    struct rds_info_iterator *iter,
                    struct rds_info_lengths *lens)
{
    rds6_conn_message_info(sock, len, iter, lens, 1);
}
#endif
0633 
/* rds-info handler for RDS_INFO_RETRANS_MESSAGES (want_send == 0). */
static void rds_conn_message_info_retrans(struct socket *sock,
                      unsigned int len,
                      struct rds_info_iterator *iter,
                      struct rds_info_lengths *lens)
{
    rds_conn_message_info(sock, len, iter, lens, 0);
}
0641 
#if IS_ENABLED(CONFIG_IPV6)
/* rds-info handler for RDS6_INFO_RETRANS_MESSAGES (want_send == 0). */
static void rds6_conn_message_info_retrans(struct socket *sock,
                       unsigned int len,
                       struct rds_info_iterator *iter,
                       struct rds_info_lengths *lens)
{
    rds6_conn_message_info(sock, len, iter, lens, 0);
}
#endif
0651 
0652 void rds_for_each_conn_info(struct socket *sock, unsigned int len,
0653               struct rds_info_iterator *iter,
0654               struct rds_info_lengths *lens,
0655               int (*visitor)(struct rds_connection *, void *),
0656               u64 *buffer,
0657               size_t item_len)
0658 {
0659     struct hlist_head *head;
0660     struct rds_connection *conn;
0661     size_t i;
0662 
0663     rcu_read_lock();
0664 
0665     lens->nr = 0;
0666     lens->each = item_len;
0667 
0668     for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
0669          i++, head++) {
0670         hlist_for_each_entry_rcu(conn, head, c_hash_node) {
0671 
0672             /* XXX no c_lock usage.. */
0673             if (!visitor(conn, buffer))
0674                 continue;
0675 
0676             /* We copy as much as we can fit in the buffer,
0677              * but we count all items so that the caller
0678              * can resize the buffer. */
0679             if (len >= item_len) {
0680                 rds_info_copy(iter, buffer, item_len);
0681                 len -= item_len;
0682             }
0683             lens->nr++;
0684         }
0685     }
0686     rcu_read_unlock();
0687 }
0688 EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
0689 
/* Like rds_for_each_conn_info(), but the visitor sees a connection
 * *path* rather than the connection itself.  Only the first path of
 * each conn is visited (see the XXX note below).
 */
static void rds_walk_conn_path_info(struct socket *sock, unsigned int len,
                    struct rds_info_iterator *iter,
                    struct rds_info_lengths *lens,
                    int (*visitor)(struct rds_conn_path *, void *),
                    u64 *buffer,
                    size_t item_len)
{
    struct hlist_head *head;
    struct rds_connection *conn;
    size_t i;

    rcu_read_lock();

    lens->nr = 0;
    lens->each = item_len;

    for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
         i++, head++) {
        hlist_for_each_entry_rcu(conn, head, c_hash_node) {
            struct rds_conn_path *cp;

            /* XXX We only copy the information from the first
             * path for now.  The problem is that if there are
             * more than one underlying paths, we cannot report
             * information of all of them using the existing
             * API.  For example, there is only one next_tx_seq,
             * which path's next_tx_seq should we report?  It is
             * a bug in the design of MPRDS.
             */
            cp = conn->c_path;

            /* XXX no cp_lock usage.. */
            if (!visitor(cp, buffer))
                continue;

            /* We copy as much as we can fit in the buffer,
             * but we count all items so that the caller
             * can resize the buffer.
             */
            if (len >= item_len) {
                rds_info_copy(iter, buffer, item_len);
                len -= item_len;
            }
            lens->nr++;
        }
    }
    rcu_read_unlock();
}
0738 
/* Fill one rds_info_connection record from path @cp.  Returns 1 when a
 * record was produced, 0 to skip (IPv6 conns are excluded from the
 * IPv4 info ABI).
 */
static int rds_conn_info_visitor(struct rds_conn_path *cp, void *buffer)
{
    struct rds_info_connection *cinfo = buffer;
    struct rds_connection *conn = cp->cp_conn;

    if (conn->c_isv6)
        return 0;

    cinfo->next_tx_seq = cp->cp_next_tx_seq;
    cinfo->next_rx_seq = cp->cp_next_rx_seq;
    /* IPv4 address lives in the low word of the v4-mapped in6_addr */
    cinfo->laddr = conn->c_laddr.s6_addr32[3];
    cinfo->faddr = conn->c_faddr.s6_addr32[3];
    cinfo->tos = conn->c_tos;
    /* NOTE(review): strncpy zero-pads but leaves 'transport' without a
     * NUL if t_name exactly fills the field — presumably the info ABI
     * treats it as fixed-width; confirm before switching to strscpy.
     */
    strncpy(cinfo->transport, conn->c_trans->t_name,
        sizeof(cinfo->transport));
    cinfo->flags = 0;

    rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &cp->cp_flags),
              SENDING);
    /* XXX Future: return the state rather than these funky bits */
    rds_conn_info_set(cinfo->flags,
              atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING,
              CONNECTING);
    rds_conn_info_set(cinfo->flags,
              atomic_read(&cp->cp_state) == RDS_CONN_UP,
              CONNECTED);
    return 1;
}
0767 
#if IS_ENABLED(CONFIG_IPV6)
/* Fill one rds6_info_connection record from path @cp; unlike the IPv4
 * visitor this accepts every connection. */
static int rds6_conn_info_visitor(struct rds_conn_path *cp, void *buffer)
{
    struct rds6_info_connection *cinfo6 = buffer;
    struct rds_connection *conn = cp->cp_conn;

    cinfo6->next_tx_seq = cp->cp_next_tx_seq;
    cinfo6->next_rx_seq = cp->cp_next_rx_seq;
    cinfo6->laddr = conn->c_laddr;
    cinfo6->faddr = conn->c_faddr;
    /* NOTE(review): as in the IPv4 visitor, strncpy may leave the
     * transport field unterminated when t_name fills it — presumably
     * intentional fixed-width ABI; confirm before changing. */
    strncpy(cinfo6->transport, conn->c_trans->t_name,
        sizeof(cinfo6->transport));
    cinfo6->flags = 0;

    rds_conn_info_set(cinfo6->flags, test_bit(RDS_IN_XMIT, &cp->cp_flags),
              SENDING);
    /* XXX Future: return the state rather than these funky bits */
    rds_conn_info_set(cinfo6->flags,
              atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING,
              CONNECTING);
    rds_conn_info_set(cinfo6->flags,
              atomic_read(&cp->cp_state) == RDS_CONN_UP,
              CONNECTED);
    /* Just return 1 as there is no error case. This is a helper function
     * for rds_walk_conn_path_info() and it wants a return value.
     */
    return 1;
}
#endif
0797 
0798 static void rds_conn_info(struct socket *sock, unsigned int len,
0799               struct rds_info_iterator *iter,
0800               struct rds_info_lengths *lens)
0801 {
0802     u64 buffer[(sizeof(struct rds_info_connection) + 7) / 8];
0803 
0804     rds_walk_conn_path_info(sock, len, iter, lens,
0805                 rds_conn_info_visitor,
0806                 buffer,
0807                 sizeof(struct rds_info_connection));
0808 }
0809 
#if IS_ENABLED(CONFIG_IPV6)
/* rds-info handler for RDS6_INFO_CONNECTIONS: emit one
 * rds6_info_connection record per connection path. */
static void rds6_conn_info(struct socket *sock, unsigned int len,
               struct rds_info_iterator *iter,
               struct rds_info_lengths *lens)
{
    /* u64 array so the record buffer is 8-byte aligned; rounded up */
    u64 buffer[(sizeof(struct rds6_info_connection) + 7) / 8];

    rds_walk_conn_path_info(sock, len, iter, lens,
                rds6_conn_info_visitor,
                buffer,
                sizeof(struct rds6_info_connection));
}
#endif
0823 
/* Module init: register the loop pernet ops, create the connection
 * slab, and register all rds-info handlers.  Undone by rds_conn_exit().
 * Returns 0 or a negative errno.
 */
int rds_conn_init(void)
{
    int ret;

    ret = rds_loop_net_init(); /* register pernet callback */
    if (ret)
        return ret;

    rds_conn_slab = kmem_cache_create("rds_connection",
                      sizeof(struct rds_connection),
                      0, 0, NULL);
    if (!rds_conn_slab) {
        /* roll back the pernet registration on failure */
        rds_loop_net_exit();
        return -ENOMEM;
    }

    rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info);
    rds_info_register_func(RDS_INFO_SEND_MESSAGES,
                   rds_conn_message_info_send);
    rds_info_register_func(RDS_INFO_RETRANS_MESSAGES,
                   rds_conn_message_info_retrans);
#if IS_ENABLED(CONFIG_IPV6)
    rds_info_register_func(RDS6_INFO_CONNECTIONS, rds6_conn_info);
    rds_info_register_func(RDS6_INFO_SEND_MESSAGES,
                   rds6_conn_message_info_send);
    rds_info_register_func(RDS6_INFO_RETRANS_MESSAGES,
                   rds6_conn_message_info_retrans);
#endif
    return 0;
}
0854 
/* Module exit: tear down loopback state, free the connection slab, and
 * deregister all rds-info handlers registered in rds_conn_init().
 * NOTE(review): the slab is destroyed before the info handlers are
 * deregistered; this assumes no handler can still run at module unload
 * — confirm against the rds_info deregistration semantics.
 */
void rds_conn_exit(void)
{
    rds_loop_net_exit(); /* unregister pernet callback */
    rds_loop_exit();

    /* all connections must have been destroyed by now */
    WARN_ON(!hlist_empty(rds_conn_hash));

    kmem_cache_destroy(rds_conn_slab);

    rds_info_deregister_func(RDS_INFO_CONNECTIONS, rds_conn_info);
    rds_info_deregister_func(RDS_INFO_SEND_MESSAGES,
                 rds_conn_message_info_send);
    rds_info_deregister_func(RDS_INFO_RETRANS_MESSAGES,
                 rds_conn_message_info_retrans);
#if IS_ENABLED(CONFIG_IPV6)
    rds_info_deregister_func(RDS6_INFO_CONNECTIONS, rds6_conn_info);
    rds_info_deregister_func(RDS6_INFO_SEND_MESSAGES,
                 rds6_conn_message_info_send);
    rds_info_deregister_func(RDS6_INFO_RETRANS_MESSAGES,
                 rds6_conn_message_info_retrans);
#endif
}
0877 
/*
 * Force a disconnect
 */
void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
{
    atomic_set(&cp->cp_state, RDS_CONN_ERROR);

    rcu_read_lock();
    /* Unless this drop is part of an explicit destroy, don't queue the
     * shutdown worker while the conn is already pending destruction. */
    if (!destroy && rds_destroy_pending(cp->cp_conn)) {
        rcu_read_unlock();
        return;
    }
    queue_work(rds_wq, &cp->cp_down_w);
    rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rds_conn_path_drop);
0894 
/* Single-path variant of rds_conn_path_drop(); only valid for
 * transports that are not MP-capable (hence the WARN_ON). */
void rds_conn_drop(struct rds_connection *conn)
{
    WARN_ON(conn->c_trans->t_mp_capable);
    rds_conn_path_drop(&conn->c_path[0], false);
}
EXPORT_SYMBOL_GPL(rds_conn_drop);
0901 
/*
 * If the connection is down, trigger a connect. We may have scheduled a
 * delayed reconnect however - in this case we should not interfere.
 */
void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
{
    rcu_read_lock();
    /* don't kick a connect while the conn is being torn down */
    if (rds_destroy_pending(cp->cp_conn)) {
        rcu_read_unlock();
        return;
    }
    /* the RECONNECT_PENDING bit prevents double-queueing the worker */
    if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
        !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
        queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
    rcu_read_unlock();
}
EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
0919 
0920 /* Check connectivity of all paths
0921  */
0922 void rds_check_all_paths(struct rds_connection *conn)
0923 {
0924     int i = 0;
0925 
0926     do {
0927         rds_conn_path_connect_if_down(&conn->c_path[i]);
0928     } while (++i < conn->c_npaths);
0929 }
0930 
/* Single-path variant of rds_conn_path_connect_if_down(); only valid
 * for transports that are not MP-capable (hence the WARN_ON). */
void rds_conn_connect_if_down(struct rds_connection *conn)
{
    WARN_ON(conn->c_trans->t_mp_capable);
    rds_conn_path_connect_if_down(&conn->c_path[0]);
}
EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);
0937 
0938 void
0939 __rds_conn_path_error(struct rds_conn_path *cp, const char *fmt, ...)
0940 {
0941     va_list ap;
0942 
0943     va_start(ap, fmt);
0944     vprintk(fmt, ap);
0945     va_end(ap);
0946 
0947     rds_conn_path_drop(cp, false);
0948 }