#include <linux/kernel.h>
#include <linux/slab.h>
#include <net/tcp.h>

#include "rds.h"
#include "tcp.h"

static struct kmem_cache *rds_tcp_incoming_slab;

static void rds_tcp_inc_purge(struct rds_incoming *inc)
{
	struct rds_tcp_incoming *tinc;
	tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);
	rdsdebug("purging tinc %p inc %p\n", tinc, inc);
	skb_queue_purge(&tinc->ti_skb_list);
}

void rds_tcp_inc_free(struct rds_incoming *inc)
{
	struct rds_tcp_incoming *tinc;
	tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);
	rds_tcp_inc_purge(inc);
	rdsdebug("freeing tinc %p inc %p\n", tinc, inc);
	kmem_cache_free(rds_tcp_incoming_slab, tinc);
}

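/*
 * Copy the skb fragments queued on an incoming message into the caller's
 * iov_iter, stopping when either the iterator fills up or the message is
 * exhausted.  Returns the number of bytes copied.
 */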
int rds_tcp_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to)
{
	struct rds_tcp_incoming *tinc;
	struct sk_buff *skb;
	int ret = 0;

	if (!iov_iter_count(to))
		goto out;

	tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);

	skb_queue_walk(&tinc->ti_skb_list, skb) {
		unsigned long to_copy, skb_off;
		for (skb_off = 0; skb_off < skb->len; skb_off += to_copy) {
			to_copy = iov_iter_count(to);
			to_copy = min(to_copy, skb->len - skb_off);

			if (skb_copy_datagram_iter(skb, skb_off, to, to_copy))
				return -EFAULT;

			rds_stats_add(s_copy_to_user, to_copy);
			ret += to_copy;

			if (!iov_iter_count(to))
				goto out;
		}
	}
out:
	return ret;
}

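/*
 * We have a series of skbs that hold fragmented pieces of the congestion
 * bitmap.  They must add up to the exact size of the congestion bitmap.  We
 * use the skb helpers to copy those into the pages that make up the in-memory
 * congestion bitmap for the remote address of this connection, and then tell
 * the congestion core that the bitmap has been changed so that it can wake up
 * sleepers.
 *
 * This races with sending paths that use test_bit to see whether the bitmap
 * indicates that their recipient is congested.
 */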
static void rds_tcp_cong_recv(struct rds_connection *conn,
			      struct rds_tcp_incoming *tinc)
{
	struct sk_buff *skb;
	unsigned int to_copy, skb_off;
	unsigned int map_off;
	unsigned int map_page;
	struct rds_cong_map *map;
	int ret;

	/* catch completely corrupt packets */
	if (be32_to_cpu(tinc->ti_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
		return;

	map_page = 0;
	map_off = 0;
	map = conn->c_fcong;

	skb_queue_walk(&tinc->ti_skb_list, skb) {
		skb_off = 0;
		while (skb_off < skb->len) {
			to_copy = min_t(unsigned int, PAGE_SIZE - map_off,
					skb->len - skb_off);

			BUG_ON(map_page >= RDS_CONG_MAP_PAGES);

			/* copy whatever fits into the current bitmap page */
			ret = skb_copy_bits(skb, skb_off,
					(void *)map->m_page_addrs[map_page] + map_off,
					to_copy);
			BUG_ON(ret != 0);

			skb_off += to_copy;
			map_off += to_copy;
			if (map_off == PAGE_SIZE) {
				map_off = 0;
				map_page++;
			}
		}
	}

	rds_cong_map_updated(map, ~(u64) 0);
}

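/* passed to rds_tcp_data_recv() through tcp_read_sock()'s read descriptor */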
struct rds_tcp_desc_arg {
	struct rds_conn_path *conn_path;
	gfp_t gfp;
};

static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb,
			     unsigned int offset, size_t len)
{
	struct rds_tcp_desc_arg *arg = desc->arg.data;
	struct rds_conn_path *cp = arg->conn_path;
	struct rds_tcp_connection *tc = cp->cp_transport_data;
	struct rds_tcp_incoming *tinc = tc->t_tinc;
	struct sk_buff *clone;
	size_t left = len, to_copy;

	rdsdebug("tcp data tc %p skb %p offset %u len %zu\n", tc, skb, offset,
		 len);

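	/*
	 * tcp_read_sock() interprets partial progress as an indication to
	 * stop processing.
	 */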
	while (left) {
		if (!tinc) {
			tinc = kmem_cache_alloc(rds_tcp_incoming_slab,
						arg->gfp);
			if (!tinc) {
				desc->error = -ENOMEM;
				goto out;
			}
			tc->t_tinc = tinc;
			rdsdebug("allocated tinc %p\n", tinc);
			rds_inc_path_init(&tinc->ti_inc, cp,
					  &cp->cp_conn->c_faddr);
			tinc->ti_inc.i_rx_lat_trace[RDS_MSG_RX_HDR] =
					local_clock();

			/* XXX we might be able to use the __ variants when
			 * we've already serialized at a higher level.
			 */
			skb_queue_head_init(&tinc->ti_skb_list);
		}

		if (left && tc->t_tinc_hdr_rem) {
			to_copy = min(tc->t_tinc_hdr_rem, left);
			rdsdebug("copying %zu header from skb %p\n", to_copy,
				 skb);
			skb_copy_bits(skb, offset,
				      (char *)&tinc->ti_inc.i_hdr +
						sizeof(struct rds_header) -
						tc->t_tinc_hdr_rem,
				      to_copy);
			tc->t_tinc_hdr_rem -= to_copy;
			left -= to_copy;
			offset += to_copy;

			if (tc->t_tinc_hdr_rem == 0) {
				/* could be 0 for a 0 len message */
				tc->t_tinc_data_rem =
					be32_to_cpu(tinc->ti_inc.i_hdr.h_len);
				tinc->ti_inc.i_rx_lat_trace[RDS_MSG_RX_START] =
					local_clock();
			}
		}

		if (left && tc->t_tinc_data_rem) {
			to_copy = min(tc->t_tinc_data_rem, left);

			/* extract the payload into a clone we can queue;
			 * the caller's skb stays with the TCP stack.
			 */
			clone = pskb_extract(skb, offset, to_copy, arg->gfp);
			if (!clone) {
				desc->error = -ENOMEM;
				goto out;
			}

			skb_queue_tail(&tinc->ti_skb_list, clone);

			rdsdebug("skb %p data %p len %d off %u to_copy %zu -> "
				 "clone %p data %p len %d\n",
				 skb, skb->data, skb->len, offset, to_copy,
				 clone, clone->data, clone->len);

			tc->t_tinc_data_rem -= to_copy;
			left -= to_copy;
			offset += to_copy;
		}

		if (tc->t_tinc_hdr_rem == 0 && tc->t_tinc_data_rem == 0) {
			struct rds_connection *conn = cp->cp_conn;

			/* we have a complete message: apply a congestion
			 * bitmap update directly, or hand the message up.
			 */
			if (tinc->ti_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
				rds_tcp_cong_recv(conn, tinc);
			else
				rds_recv_incoming(conn, &conn->c_faddr,
						  &conn->c_laddr,
						  &tinc->ti_inc,
						  arg->gfp);

			tc->t_tinc_hdr_rem = sizeof(struct rds_header);
			tc->t_tinc_data_rem = 0;
			tc->t_tinc = NULL;
			rds_inc_put(&tinc->ti_inc);
			tinc = NULL;
		}
	}
out:
	rdsdebug("returning len %zu left %zu skb len %d rx queue depth %d\n",
		 len, left, skb->len,
		 skb_queue_len(&tc->t_sock->sk->sk_receive_queue));
	return len - left;
}

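/* the caller has to hold the sock lock */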
static int rds_tcp_read_sock(struct rds_conn_path *cp, gfp_t gfp)
{
	struct rds_tcp_connection *tc = cp->cp_transport_data;
	struct socket *sock = tc->t_sock;
	read_descriptor_t desc;
	struct rds_tcp_desc_arg arg;

	/* pass our state to rds_tcp_data_recv() through the descriptor */
	arg.conn_path = cp;
	arg.gfp = gfp;
	desc.arg.data = &arg;
	desc.error = 0;
	desc.count = 1;

	tcp_read_sock(sock->sk, &desc, rds_tcp_data_recv);
	rdsdebug("tcp_read_sock for tc %p gfp 0x%x returned %d\n", tc, gfp,
		 desc.error);

	return desc.error;
}

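/*
 * Called from the receive worker.  We can block here, so drain the socket
 * with GFP_KERNEL allocations, holding the sock lock across
 * rds_tcp_read_sock().
 */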
int rds_tcp_recv_path(struct rds_conn_path *cp)
{
	struct rds_tcp_connection *tc = cp->cp_transport_data;
	struct socket *sock = tc->t_sock;
	int ret = 0;

	rdsdebug("recv worker path [%d] tc %p sock %p\n",
		 cp->cp_index, tc, sock);

	lock_sock(sock->sk);
	ret = rds_tcp_read_sock(cp, GFP_KERNEL);
	release_sock(sock->sk);

	return ret;
}

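/*
 * Installed as the socket's sk_data_ready callback.  This runs in softirq
 * context, so allocations must be GFP_ATOMIC; if they fail we punt to the
 * recv worker, which retries with GFP_KERNEL.
 */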
void rds_tcp_data_ready(struct sock *sk)
{
	void (*ready)(struct sock *sk);
	struct rds_conn_path *cp;
	struct rds_tcp_connection *tc;

	rdsdebug("data ready sk %p\n", sk);

	read_lock_bh(&sk->sk_callback_lock);
	cp = sk->sk_user_data;
	if (!cp) { /* check for teardown race */
		ready = sk->sk_data_ready;
		goto out;
	}

	tc = cp->cp_transport_data;
	ready = tc->t_orig_data_ready;
	rds_tcp_stats_inc(s_tcp_data_ready_calls);

	/* couldn't make progress atomically; defer to the worker */
	if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
		rcu_read_lock();
		if (!rds_destroy_pending(cp->cp_conn))
			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
		rcu_read_unlock();
	}
out:
	read_unlock_bh(&sk->sk_callback_lock);
	ready(sk);
}

int rds_tcp_recv_init(void)
{
	rds_tcp_incoming_slab = kmem_cache_create("rds_tcp_incoming",
					sizeof(struct rds_tcp_incoming),
					0, 0, NULL);
	if (!rds_tcp_incoming_slab)
		return -ENOMEM;
	return 0;
}

void rds_tcp_recv_exit(void)
{
	kmem_cache_destroy(rds_tcp_incoming_slab);
}