0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033 #include <linux/kernel.h>
0034 #include <linux/random.h>
0035 #include <linux/export.h>
0036
0037 #include "rds.h"
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071 struct workqueue_struct *rds_wq;
0072 EXPORT_SYMBOL_GPL(rds_wq);
0073
0074 void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
0075 {
0076 if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) {
0077 printk(KERN_WARNING "%s: Cannot transition to state UP, "
0078 "current state is %d\n",
0079 __func__,
0080 atomic_read(&cp->cp_state));
0081 rds_conn_path_drop(cp, false);
0082 return;
0083 }
0084
0085 rdsdebug("conn %p for %pI6c to %pI6c complete\n",
0086 cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr);
0087
0088 cp->cp_reconnect_jiffies = 0;
0089 set_bit(0, &cp->cp_conn->c_map_queued);
0090 rcu_read_lock();
0091 if (!rds_destroy_pending(cp->cp_conn)) {
0092 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
0093 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
0094 }
0095 rcu_read_unlock();
0096 cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
0097 }
0098 EXPORT_SYMBOL_GPL(rds_connect_path_complete);
0099
0100 void rds_connect_complete(struct rds_connection *conn)
0101 {
0102 rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING);
0103 }
0104 EXPORT_SYMBOL_GPL(rds_connect_complete);
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123
0124 void rds_queue_reconnect(struct rds_conn_path *cp)
0125 {
0126 unsigned long rand;
0127 struct rds_connection *conn = cp->cp_conn;
0128
0129 rdsdebug("conn %p for %pI6c to %pI6c reconnect jiffies %lu\n",
0130 conn, &conn->c_laddr, &conn->c_faddr,
0131 cp->cp_reconnect_jiffies);
0132
0133
0134 if (conn->c_trans->t_type == RDS_TRANS_TCP &&
0135 rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) >= 0)
0136 return;
0137
0138 set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
0139 if (cp->cp_reconnect_jiffies == 0) {
0140 cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
0141 rcu_read_lock();
0142 if (!rds_destroy_pending(cp->cp_conn))
0143 queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
0144 rcu_read_unlock();
0145 return;
0146 }
0147
0148 get_random_bytes(&rand, sizeof(rand));
0149 rdsdebug("%lu delay %lu ceil conn %p for %pI6c -> %pI6c\n",
0150 rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
0151 conn, &conn->c_laddr, &conn->c_faddr);
0152 rcu_read_lock();
0153 if (!rds_destroy_pending(cp->cp_conn))
0154 queue_delayed_work(rds_wq, &cp->cp_conn_w,
0155 rand % cp->cp_reconnect_jiffies);
0156 rcu_read_unlock();
0157
0158 cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
0159 rds_sysctl_reconnect_max_jiffies);
0160 }
0161
0162 void rds_connect_worker(struct work_struct *work)
0163 {
0164 struct rds_conn_path *cp = container_of(work,
0165 struct rds_conn_path,
0166 cp_conn_w.work);
0167 struct rds_connection *conn = cp->cp_conn;
0168 int ret;
0169
0170 if (cp->cp_index > 0 &&
0171 rds_addr_cmp(&cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr) >= 0)
0172 return;
0173 clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
0174 ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
0175 if (ret) {
0176 ret = conn->c_trans->conn_path_connect(cp);
0177 rdsdebug("conn %p for %pI6c to %pI6c dispatched, ret %d\n",
0178 conn, &conn->c_laddr, &conn->c_faddr, ret);
0179
0180 if (ret) {
0181 if (rds_conn_path_transition(cp,
0182 RDS_CONN_CONNECTING,
0183 RDS_CONN_DOWN))
0184 rds_queue_reconnect(cp);
0185 else
0186 rds_conn_path_error(cp, "connect failed\n");
0187 }
0188 }
0189 }
0190
0191 void rds_send_worker(struct work_struct *work)
0192 {
0193 struct rds_conn_path *cp = container_of(work,
0194 struct rds_conn_path,
0195 cp_send_w.work);
0196 int ret;
0197
0198 if (rds_conn_path_state(cp) == RDS_CONN_UP) {
0199 clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
0200 ret = rds_send_xmit(cp);
0201 cond_resched();
0202 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
0203 switch (ret) {
0204 case -EAGAIN:
0205 rds_stats_inc(s_send_immediate_retry);
0206 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
0207 break;
0208 case -ENOMEM:
0209 rds_stats_inc(s_send_delayed_retry);
0210 queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
0211 break;
0212 default:
0213 break;
0214 }
0215 }
0216 }
0217
0218 void rds_recv_worker(struct work_struct *work)
0219 {
0220 struct rds_conn_path *cp = container_of(work,
0221 struct rds_conn_path,
0222 cp_recv_w.work);
0223 int ret;
0224
0225 if (rds_conn_path_state(cp) == RDS_CONN_UP) {
0226 ret = cp->cp_conn->c_trans->recv_path(cp);
0227 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
0228 switch (ret) {
0229 case -EAGAIN:
0230 rds_stats_inc(s_recv_immediate_retry);
0231 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
0232 break;
0233 case -ENOMEM:
0234 rds_stats_inc(s_recv_delayed_retry);
0235 queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
0236 break;
0237 default:
0238 break;
0239 }
0240 }
0241 }
0242
0243 void rds_shutdown_worker(struct work_struct *work)
0244 {
0245 struct rds_conn_path *cp = container_of(work,
0246 struct rds_conn_path,
0247 cp_down_w);
0248
0249 rds_conn_shutdown(cp);
0250 }
0251
0252 void rds_threads_exit(void)
0253 {
0254 destroy_workqueue(rds_wq);
0255 }
0256
0257 int rds_threads_init(void)
0258 {
0259 rds_wq = create_singlethread_workqueue("krdsd");
0260 if (!rds_wq)
0261 return -ENOMEM;
0262
0263 return 0;
0264 }
0265
0266
0267
0268
0269 int rds_addr_cmp(const struct in6_addr *addr1,
0270 const struct in6_addr *addr2)
0271 {
0272 #if defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS) && BITS_PER_LONG == 64
0273 const __be64 *a1, *a2;
0274 u64 x, y;
0275
0276 a1 = (__be64 *)addr1;
0277 a2 = (__be64 *)addr2;
0278
0279 if (*a1 != *a2) {
0280 if (be64_to_cpu(*a1) < be64_to_cpu(*a2))
0281 return -1;
0282 else
0283 return 1;
0284 } else {
0285 x = be64_to_cpu(*++a1);
0286 y = be64_to_cpu(*++a2);
0287 if (x < y)
0288 return -1;
0289 else if (x > y)
0290 return 1;
0291 else
0292 return 0;
0293 }
0294 #else
0295 u32 a, b;
0296 int i;
0297
0298 for (i = 0; i < 4; i++) {
0299 if (addr1->s6_addr32[i] != addr2->s6_addr32[i]) {
0300 a = ntohl(addr1->s6_addr32[i]);
0301 b = ntohl(addr2->s6_addr32[i]);
0302 if (a < b)
0303 return -1;
0304 else if (a > b)
0305 return 1;
0306 }
0307 }
0308 return 0;
0309 #endif
0310 }
0311 EXPORT_SYMBOL_GPL(rds_addr_cmp);