/*
 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/random.h>
#include <linux/export.h>

#include "rds.h"

/*
 * All of connection management is simplified by serializing it through
 * work queues that execute in a connection managing thread.
 *
 * TCP wants to send acks through sendpage() in response to data_ready(),
 * but it needs a process context to do so.
 *
 * The receive paths need to allocate but can't drop packets (!) so we have
 * a thread around to block allocating if the receive fast path sees an
 * allocation failure.
 */

/* Grand Unified Theory of connection life cycle:
 * At any point in time, the connection can be in one of these states:
 * DOWN, CONNECTING, UP, DISCONNECTING, ERROR
 *
 * The following transitions are possible:
 *  ANY           -> ERROR
 *  UP            -> DISCONNECTING
 *  ERROR         -> DISCONNECTING
 *  DISCONNECTING -> DOWN
 *  DOWN          -> CONNECTING
 *  CONNECTING    -> UP
 *
 * Transition to state DISCONNECTING/DOWN:
 *  -   Inside the shutdown worker; synchronizes with xmit path
 *      through RDS_IN_XMIT, and with connection management callbacks
 *      via c_cm_lock.
 *
 *      For receive callbacks, we rely on the underlying transport
 *      (TCP, IB/RDMA) to provide the necessary synchronisation.
 */
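/*
 * For illustration, a transition guard of this shape can be built on an
 * atomic compare-and-swap, so that exactly one thread wins any given
 * transition.  This is a sketch only; the code below uses the real
 * helper, rds_conn_path_transition(), and example_transition is a name
 * made up here:
 *
 *      static int example_transition(atomic_t *state, int curr, int new)
 *      {
 *          return atomic_cmpxchg(state, curr, new) == curr;
 *      }
 */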
struct workqueue_struct *rds_wq;
EXPORT_SYMBOL_GPL(rds_wq);

void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
{
    if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) {
        printk(KERN_WARNING "%s: Cannot transition to state UP, current state is %d\n",
                __func__,
                atomic_read(&cp->cp_state));
        rds_conn_path_drop(cp, false);
        return;
    }

    rdsdebug("conn %p for %pI6c to %pI6c complete\n",
         cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr);

    cp->cp_reconnect_jiffies = 0;
    set_bit(0, &cp->cp_conn->c_map_queued);
    rcu_read_lock();
    if (!rds_destroy_pending(cp->cp_conn)) {
        queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
        queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
    }
    rcu_read_unlock();
    cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
}
EXPORT_SYMBOL_GPL(rds_connect_path_complete);

void rds_connect_complete(struct rds_connection *conn)
{
    rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING);
}
EXPORT_SYMBOL_GPL(rds_connect_complete);

/*
 * This random exponential backoff is relied on to eventually resolve racing
 * connects.
 *
 * If connect attempts race then both parties drop both connections and come
 * here to wait for a random amount of time before trying again.  Eventually
 * the backoff range will be so much greater than the time it takes to
 * establish a connection that one of the pair will establish the connection
 * before the other's random delay fires.
 *
 * Connection attempts that arrive while a connection is already established
 * are also considered to be racing connects.  This lets a connection from
 * a rebooted machine replace an existing stale connection before the transport
 * notices that the connection has failed.
 *
 * We should *always* start with a random backoff; otherwise a broken connection
 * will always take several iterations to be re-established.
 */
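/*
 * Concretely, assuming a one second minimum and a thirty second maximum
 * (the real bounds are rds_sysctl_reconnect_min_jiffies and
 * rds_sysctl_reconnect_max_jiffies): the first reconnect is queued
 * immediately, the next is delayed by a random value in [0, 1s), then
 * [0, 2s), [0, 4s), and so on, with the window capped at [0, 30s).
 */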
void rds_queue_reconnect(struct rds_conn_path *cp)
{
    unsigned long rand;
    struct rds_connection *conn = cp->cp_conn;

    rdsdebug("conn %p for %pI6c to %pI6c reconnect jiffies %lu\n",
         conn, &conn->c_laddr, &conn->c_faddr,
         cp->cp_reconnect_jiffies);

    /* let peer with smaller addr initiate reconnect, to avoid duels */
    if (conn->c_trans->t_type == RDS_TRANS_TCP &&
        rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) >= 0)
        return;

    set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
    if (cp->cp_reconnect_jiffies == 0) {
        cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
        rcu_read_lock();
        if (!rds_destroy_pending(cp->cp_conn))
            queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
        rcu_read_unlock();
        return;
    }

    get_random_bytes(&rand, sizeof(rand));
    rdsdebug("%lu delay %lu ceil conn %p for %pI6c -> %pI6c\n",
         rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
         conn, &conn->c_laddr, &conn->c_faddr);
    rcu_read_lock();
    if (!rds_destroy_pending(cp->cp_conn))
        queue_delayed_work(rds_wq, &cp->cp_conn_w,
                   rand % cp->cp_reconnect_jiffies);
    rcu_read_unlock();

    cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
                    rds_sysctl_reconnect_max_jiffies);
}
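
/*
 * Bring a connection path up.  For the additional paths of a multipath
 * connection (cp_index > 0), only the peer with the smaller address
 * initiates the connect, mirroring the duel avoidance in
 * rds_queue_reconnect() above.
 */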
void rds_connect_worker(struct work_struct *work)
{
    struct rds_conn_path *cp = container_of(work,
                        struct rds_conn_path,
                        cp_conn_w.work);
    struct rds_connection *conn = cp->cp_conn;
    int ret;

    if (cp->cp_index > 0 &&
        rds_addr_cmp(&cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr) >= 0)
        return;
    clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
    ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
    if (ret) {
        ret = conn->c_trans->conn_path_connect(cp);
        rdsdebug("conn %p for %pI6c to %pI6c dispatched, ret %d\n",
             conn, &conn->c_laddr, &conn->c_faddr, ret);

        if (ret) {
            if (rds_conn_path_transition(cp,
                             RDS_CONN_CONNECTING,
                             RDS_CONN_DOWN))
                rds_queue_reconnect(cp);
            else
                rds_conn_path_error(cp, "connect failed\n");
        }
    }
}
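
/*
 * Drive the transmit path for one connection path.  A return of -EAGAIN
 * from rds_send_xmit() means there is more work to do, so the worker
 * requeues itself immediately; -ENOMEM means allocations are failing,
 * so it backs off for a couple of jiffies before retrying.
 */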
void rds_send_worker(struct work_struct *work)
{
    struct rds_conn_path *cp = container_of(work,
                        struct rds_conn_path,
                        cp_send_w.work);
    int ret;

    if (rds_conn_path_state(cp) == RDS_CONN_UP) {
        clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
        ret = rds_send_xmit(cp);
        cond_resched();
        rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
        switch (ret) {
        case -EAGAIN:
            rds_stats_inc(s_send_immediate_retry);
            queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
            break;
        case -ENOMEM:
            rds_stats_inc(s_send_delayed_retry);
            queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
            break;
        default:
            break;
        }
    }
}
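
/*
 * Drive the receive path for one connection path; the retry semantics
 * mirror rds_send_worker() above.
 */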
void rds_recv_worker(struct work_struct *work)
{
    struct rds_conn_path *cp = container_of(work,
                        struct rds_conn_path,
                        cp_recv_w.work);
    int ret;

    if (rds_conn_path_state(cp) == RDS_CONN_UP) {
        ret = cp->cp_conn->c_trans->recv_path(cp);
        rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
        switch (ret) {
        case -EAGAIN:
            rds_stats_inc(s_recv_immediate_retry);
            queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
            break;
        case -ENOMEM:
            rds_stats_inc(s_recv_delayed_retry);
            queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
            break;
        default:
            break;
        }
    }
}
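
/*
 * Tear a connection path down from process context.  Running this on
 * the same workqueue as the other workers is what serializes shutdown
 * with the rest of connection management.
 */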
void rds_shutdown_worker(struct work_struct *work)
{
    struct rds_conn_path *cp = container_of(work,
                        struct rds_conn_path,
                        cp_down_w);

    rds_conn_shutdown(cp);
}

void rds_threads_exit(void)
{
    destroy_workqueue(rds_wq);
}
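
/*
 * A single-threaded workqueue provides the serialization described at
 * the top of this file: every work item above runs one at a time on the
 * "krdsd" kernel thread.
 */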
int rds_threads_init(void)
{
    rds_wq = create_singlethread_workqueue("krdsd");
    if (!rds_wq)
        return -ENOMEM;

    return 0;
}

/* Compare two IPv6 addresses.  Return 0 if the two addresses are equal.
 * Return 1 if the first is greater.  Return -1 if the second is greater.
 */
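/* For example, rds_addr_cmp(&in6addr_loopback, &in6addr_any) returns 1,
 * since the addresses compare as 128-bit big-endian integers and ::1 is
 * greater than ::.
 */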
int rds_addr_cmp(const struct in6_addr *addr1,
         const struct in6_addr *addr2)
{
#if defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS) && BITS_PER_LONG == 64
    const __be64 *a1, *a2;
    u64 x, y;

    a1 = (__be64 *)addr1;
    a2 = (__be64 *)addr2;

    if (*a1 != *a2) {
        if (be64_to_cpu(*a1) < be64_to_cpu(*a2))
            return -1;
        else
            return 1;
    } else {
        x = be64_to_cpu(*++a1);
        y = be64_to_cpu(*++a2);
        if (x < y)
            return -1;
        else if (x > y)
            return 1;
        else
            return 0;
    }
#else
    u32 a, b;
    int i;

    for (i = 0; i < 4; i++) {
        if (addr1->s6_addr32[i] != addr2->s6_addr32[i]) {
            a = ntohl(addr1->s6_addr32[i]);
            b = ntohl(addr2->s6_addr32[i]);
            if (a < b)
                return -1;
            else if (a > b)
                return 1;
        }
    }
    return 0;
#endif
}
EXPORT_SYMBOL_GPL(rds_addr_cmp);