#include <linux/kernel.h>
#include <linux/in.h>
#include <net/tcp.h>

#include "rds_single_path.h"
#include "rds.h"
#include "tcp.h"
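
/* rds_tcp_xmit_path_prepare()/rds_tcp_xmit_path_complete() bracket one send
 * pass over a connection path: cork the TCP socket before the RDS header and
 * data are queued, then uncork it so TCP can push out full-sized segments.
 */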
void rds_tcp_xmit_path_prepare(struct rds_conn_path *cp)
{
	struct rds_tcp_connection *tc = cp->cp_transport_data;

	tcp_sock_set_cork(tc->t_sock->sk, true);
}

void rds_tcp_xmit_path_complete(struct rds_conn_path *cp)
{
	struct rds_tcp_connection *tc = cp->cp_transport_data;

	tcp_sock_set_cork(tc->t_sock->sk, false);
}
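
/* Queue @len bytes at @data on the TCP socket without blocking.  Returns the
 * number of bytes queued or a negative errno; only used for the RDS header,
 * the payload pages go out through sendpage() below.
 */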
static int rds_tcp_sendmsg(struct socket *sock, void *data, unsigned int len)
{
	struct kvec vec = {
		.iov_base = data,
		.iov_len = len,
	};
	struct msghdr msg = {
		.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL,
	};

	return kernel_sendmsg(sock, &msg, &vec, 1, vec.iov_len);
}
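
/* Transmit one RDS message on its connection path: first whatever is left of
 * the rds_header, then the data scatterlist page by page.  May make partial
 * progress; it returns the number of bytes sent so the caller can resume at
 * (hdr_off, sg, off) on the next pass, or a negative errno on failure.
 */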
int rds_tcp_xmit(struct rds_connection *conn, struct rds_message *rm,
		 unsigned int hdr_off, unsigned int sg, unsigned int off)
{
	struct rds_conn_path *cp = rm->m_inc.i_conn_path;
	struct rds_tcp_connection *tc = cp->cp_transport_data;
	int done = 0;
	int ret = 0;
	int more;

	if (hdr_off == 0) {
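		/* m_ack_seq is the TCP sequence number of the last byte of
		 * this message, header plus payload; rds_tcp_is_acked() uses
		 * it against snd_una to decide when the peer has received
		 * the whole message.
		 */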
		tc->t_last_sent_nxt = rds_tcp_write_seq(tc);
		rm->m_ack_seq = tc->t_last_sent_nxt +
				sizeof(struct rds_header) +
				be32_to_cpu(rm->m_inc.i_hdr.h_len) - 1;
		smp_mb__before_atomic();
		set_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags);
		tc->t_last_expected_una = rm->m_ack_seq + 1;

		if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
			rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;

		rdsdebug("rm %p tcp nxt %u ack_seq %llu\n",
			 rm, rds_tcp_write_seq(tc),
			 (unsigned long long)rm->m_ack_seq);
	}

	if (hdr_off < sizeof(struct rds_header)) {
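		/* Make sure rds_tcp_write_space() keeps firing: TCP only
		 * calls the write_space callback when SOCK_NOSPACE is set.
		 */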
		set_bit(SOCK_NOSPACE, &tc->t_sock->sk->sk_socket->flags);

		ret = rds_tcp_sendmsg(tc->t_sock,
				      (void *)&rm->m_inc.i_hdr + hdr_off,
				      sizeof(rm->m_inc.i_hdr) - hdr_off);
		if (ret < 0)
			goto out;
		done += ret;
		if (hdr_off + done != sizeof(struct rds_header))
			goto out;
	}

	more = rm->data.op_nents > 1 ? (MSG_MORE | MSG_SENDPAGE_NOTLAST) : 0;
	while (sg < rm->data.op_nents) {
		int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;

		ret = tc->t_sock->ops->sendpage(tc->t_sock,
						sg_page(&rm->data.op_sg[sg]),
						rm->data.op_sg[sg].offset + off,
						rm->data.op_sg[sg].length - off,
						flags);
		rdsdebug("tcp sendpage %p:%u:%u ret %d\n",
			 (void *)sg_page(&rm->data.op_sg[sg]),
			 rm->data.op_sg[sg].offset + off,
			 rm->data.op_sg[sg].length - off, ret);
		if (ret <= 0)
			break;

		off += ret;
		done += ret;
		if (off == rm->data.op_sg[sg].length) {
			off = 0;
			sg++;
		}
		if (sg == rm->data.op_nents - 1)
			more = 0;
	}

out:
	if (ret <= 0) {
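		/* A full send buffer just means write_space() will try again
		 * once TCP frees space after -EAGAIN; anything else is fatal
		 * for this path.
		 */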
		if (ret == -EAGAIN) {
			rds_tcp_stats_inc(s_tcp_sndbuf_full);
			ret = 0;
		} else {
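			/* Don't disconnect/reconnect if the path was already
			 * dropped, e.g. by an incoming RST.
			 */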
			if (rds_conn_path_up(cp)) {
				pr_warn("RDS/tcp: send to %pI6c on cp [%d] returned %d, disconnecting and reconnecting\n",
					&conn->c_faddr, cp->cp_index, ret);
				rds_conn_path_drop(cp, false);
			}
		}
	}
	if (done == 0)
		done = ret;
	return done;
}
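
/* rm->m_ack_seq is the TCP sequence number of the last byte of the message,
 * header included, so the whole message has been acked once TCP's snd_una
 * moves past it.  The comparison is done on the low 32 bits with wrap-safe
 * signed arithmetic.
 */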
static int rds_tcp_is_acked(struct rds_message *rm, uint64_t ack)
{
	if (!test_bit(RDS_MSG_HAS_ACK_SEQ, &rm->m_flags))
		return 0;
	return (__s32)((u32)rm->m_ack_seq - (u32)ack) < 0;
}

void rds_tcp_write_space(struct sock *sk)
{
	void (*write_space)(struct sock *sk);
	struct rds_conn_path *cp;
	struct rds_tcp_connection *tc;

	read_lock_bh(&sk->sk_callback_lock);
	cp = sk->sk_user_data;
	if (!cp) {
		write_space = sk->sk_write_space;
		goto out;
	}

	tc = cp->cp_transport_data;
	rdsdebug("write_space for tc %p\n", tc);
	write_space = tc->t_orig_write_space;
	rds_tcp_stats_inc(s_tcp_write_space_calls);
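
	/* Retire every RDS message whose last byte TCP has already acked,
	 * i.e. whose m_ack_seq lies below snd_una.
	 */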
	rdsdebug("tcp una %u\n", rds_tcp_snd_una(tc));
	tc->t_last_seen_una = rds_tcp_snd_una(tc);
	rds_send_path_drop_acked(cp, rds_tcp_snd_una(tc), rds_tcp_is_acked);
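
	/* Kick the send worker if at least half of the send buffer is free
	 * again, unless the connection is being torn down.
	 */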
	rcu_read_lock();
	if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
	    !rds_destroy_pending(cp->cp_conn))
		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
	rcu_read_unlock();

out:
	read_unlock_bh(&sk->sk_callback_lock);
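
	/* TCP only invokes the write_space callback when SOCK_NOSPACE is
	 * set, and the original callback may clear that flag, so call it
	 * here and then re-arm SOCK_NOSPACE: rds-tcp relies on this hook
	 * firing to notice acked data and queue more work, not just to
	 * recover from a full send buffer.
	 */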
	write_space(sk);

	if (sk->sk_socket)
		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
}