Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Copyright (c) 2006, 2017 Oracle and/or its affiliates. All rights reserved.
0003  *
0004  * This software is available to you under a choice of one of two
0005  * licenses.  You may choose to be licensed under the terms of the GNU
0006  * General Public License (GPL) Version 2, available from the file
0007  * COPYING in the main directory of this source tree, or the
0008  * OpenIB.org BSD license below:
0009  *
0010  *     Redistribution and use in source and binary forms, with or
0011  *     without modification, are permitted provided that the following
0012  *     conditions are met:
0013  *
0014  *      - Redistributions of source code must retain the above
0015  *        copyright notice, this list of conditions and the following
0016  *        disclaimer.
0017  *
0018  *      - Redistributions in binary form must reproduce the above
0019  *        copyright notice, this list of conditions and the following
0020  *        disclaimer in the documentation and/or other materials
0021  *        provided with the distribution.
0022  *
0023  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
0024  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
0025  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
0026  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
0027  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
0028  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
0029  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
0030  * SOFTWARE.
0031  *
0032  */
0033 #include <linux/kernel.h>
0034 #include <linux/slab.h>
0035 #include <net/tcp.h>
0036 
0037 #include "rds.h"
0038 #include "tcp.h"
0039 
/* Slab cache backing struct rds_tcp_incoming; created in rds_tcp_recv_init(). */
static struct kmem_cache *rds_tcp_incoming_slab;
0041 
0042 static void rds_tcp_inc_purge(struct rds_incoming *inc)
0043 {
0044     struct rds_tcp_incoming *tinc;
0045     tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);
0046     rdsdebug("purging tinc %p inc %p\n", tinc, inc);
0047     skb_queue_purge(&tinc->ti_skb_list);
0048 }
0049 
0050 void rds_tcp_inc_free(struct rds_incoming *inc)
0051 {
0052     struct rds_tcp_incoming *tinc;
0053     tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);
0054     rds_tcp_inc_purge(inc);
0055     rdsdebug("freeing tinc %p inc %p\n", tinc, inc);
0056     kmem_cache_free(rds_tcp_incoming_slab, tinc);
0057 }
0058 
0059 /*
0060  * this is pretty lame, but, whatever.
0061  */
0062 int rds_tcp_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to)
0063 {
0064     struct rds_tcp_incoming *tinc;
0065     struct sk_buff *skb;
0066     int ret = 0;
0067 
0068     if (!iov_iter_count(to))
0069         goto out;
0070 
0071     tinc = container_of(inc, struct rds_tcp_incoming, ti_inc);
0072 
0073     skb_queue_walk(&tinc->ti_skb_list, skb) {
0074         unsigned long to_copy, skb_off;
0075         for (skb_off = 0; skb_off < skb->len; skb_off += to_copy) {
0076             to_copy = iov_iter_count(to);
0077             to_copy = min(to_copy, skb->len - skb_off);
0078 
0079             if (skb_copy_datagram_iter(skb, skb_off, to, to_copy))
0080                 return -EFAULT;
0081 
0082             rds_stats_add(s_copy_to_user, to_copy);
0083             ret += to_copy;
0084 
0085             if (!iov_iter_count(to))
0086                 goto out;
0087         }
0088     }
0089 out:
0090     return ret;
0091 }
0092 
/*
 * We have a series of skbs that have fragmented pieces of the congestion
 * bitmap.  They must add up to the exact size of the congestion bitmap.  We
 * use the skb helpers to copy those into the pages that make up the in-memory
 * congestion bitmap for the remote address of this connection.  We then tell
 * the congestion core that the bitmap has been changed so that it can wake up
 * sleepers.
 *
 * This is racing with sending paths which are using test_bit to see if the
 * bitmap indicates that their recipient is congested.
 */

static void rds_tcp_cong_recv(struct rds_connection *conn,
                  struct rds_tcp_incoming *tinc)
{
    struct sk_buff *skb;
    unsigned int to_copy, skb_off;
    unsigned int map_off;	/* byte offset into the current map page */
    unsigned int map_page;	/* index into map->m_page_addrs[] */
    struct rds_cong_map *map;
    int ret;

    /* catch completely corrupt packets */
    if (be32_to_cpu(tinc->ti_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
        return;

    map_page = 0;
    map_off = 0;
    map = conn->c_fcong;

    /*
     * Stream the fragment bytes into the bitmap pages; each copy is
     * bounded by both the space left in the current page and the bytes
     * left in the current skb, crossing page boundaries as needed.
     */
    skb_queue_walk(&tinc->ti_skb_list, skb) {
        skb_off = 0;
        while (skb_off < skb->len) {
            to_copy = min_t(unsigned int, PAGE_SIZE - map_off,
                    skb->len - skb_off);

            BUG_ON(map_page >= RDS_CONG_MAP_PAGES);

            /* only returns 0 or -error */
            ret = skb_copy_bits(skb, skb_off,
                (void *)map->m_page_addrs[map_page] + map_off,
                to_copy);
            BUG_ON(ret != 0);

            skb_off += to_copy;
            map_off += to_copy;
            if (map_off == PAGE_SIZE) {
                map_off = 0;
                map_page++;
            }
        }
    }

    /* all-ones mask: wake waiters on every portion of the map */
    rds_cong_map_updated(map, ~(u64) 0);
}
0148 
/* Argument bundle threaded through tcp_read_sock() to rds_tcp_data_recv(). */
struct rds_tcp_desc_arg {
    struct rds_conn_path *conn_path;	/* path being drained */
    gfp_t gfp;				/* allocation context for this read */
};
0153 
/*
 * tcp_read_sock() callback: carve RDS messages out of the TCP byte stream.
 *
 * Reassembly state lives in the rds_tcp_connection (t_tinc_hdr_rem,
 * t_tinc_data_rem and the in-progress t_tinc) so a message may span
 * multiple invocations.  Returns the number of bytes consumed from this
 * skb; on allocation failure desc->error is set to -ENOMEM and we return
 * a partial count.
 */
static int rds_tcp_data_recv(read_descriptor_t *desc, struct sk_buff *skb,
                 unsigned int offset, size_t len)
{
    struct rds_tcp_desc_arg *arg = desc->arg.data;
    struct rds_conn_path *cp = arg->conn_path;
    struct rds_tcp_connection *tc = cp->cp_transport_data;
    struct rds_tcp_incoming *tinc = tc->t_tinc;	/* in-progress message, may be NULL */
    struct sk_buff *clone;
    size_t left = len, to_copy;

    rdsdebug("tcp data tc %p skb %p offset %u len %zu\n", tc, skb, offset,
         len);

    /*
     * tcp_read_sock() interprets partial progress as an indication to stop
     * processing.
     */
    while (left) {
        /* start a fresh incoming message if none is in flight */
        if (!tinc) {
            tinc = kmem_cache_alloc(rds_tcp_incoming_slab,
                        arg->gfp);
            if (!tinc) {
                desc->error = -ENOMEM;
                goto out;
            }
            tc->t_tinc = tinc;
            rdsdebug("allocated tinc %p\n", tinc);
            rds_inc_path_init(&tinc->ti_inc, cp,
                      &cp->cp_conn->c_faddr);
            tinc->ti_inc.i_rx_lat_trace[RDS_MSG_RX_HDR] =
                    local_clock();

            /*
             * XXX * we might be able to use the __ variants when
             * we've already serialized at a higher level.
             */
            skb_queue_head_init(&tinc->ti_skb_list);
        }

        /* copy any remaining header bytes into i_hdr */
        if (left && tc->t_tinc_hdr_rem) {
            to_copy = min(tc->t_tinc_hdr_rem, left);
            rdsdebug("copying %zu header from skb %p\n", to_copy,
                 skb);
            skb_copy_bits(skb, offset,
                      (char *)&tinc->ti_inc.i_hdr +
                        sizeof(struct rds_header) -
                        tc->t_tinc_hdr_rem,
                      to_copy);
            tc->t_tinc_hdr_rem -= to_copy;
            left -= to_copy;
            offset += to_copy;

            if (tc->t_tinc_hdr_rem == 0) {
                /* could be 0 for a 0 len message */
                tc->t_tinc_data_rem =
                    be32_to_cpu(tinc->ti_inc.i_hdr.h_len);
                tinc->ti_inc.i_rx_lat_trace[RDS_MSG_RX_START] =
                    local_clock();
            }
        }

        /* queue payload bytes as extracted skb clones */
        if (left && tc->t_tinc_data_rem) {
            to_copy = min(tc->t_tinc_data_rem, left);

            clone = pskb_extract(skb, offset, to_copy, arg->gfp);
            if (!clone) {
                desc->error = -ENOMEM;
                goto out;
            }

            skb_queue_tail(&tinc->ti_skb_list, clone);

            rdsdebug("skb %p data %p len %d off %u to_copy %zu -> "
                 "clone %p data %p len %d\n",
                 skb, skb->data, skb->len, offset, to_copy,
                 clone, clone->data, clone->len);

            tc->t_tinc_data_rem -= to_copy;
            left -= to_copy;
            offset += to_copy;
        }

        /* header and body complete: hand the message up and reset state */
        if (tc->t_tinc_hdr_rem == 0 && tc->t_tinc_data_rem == 0) {
            struct rds_connection *conn = cp->cp_conn;

            if (tinc->ti_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
                rds_tcp_cong_recv(conn, tinc);
            else
                rds_recv_incoming(conn, &conn->c_faddr,
                          &conn->c_laddr,
                          &tinc->ti_inc,
                          arg->gfp);

            tc->t_tinc_hdr_rem = sizeof(struct rds_header);
            tc->t_tinc_data_rem = 0;
            tc->t_tinc = NULL;
            rds_inc_put(&tinc->ti_inc);
            tinc = NULL;
        }
    }
out:
    rdsdebug("returning len %zu left %zu skb len %d rx queue depth %d\n",
         len, left, skb->len,
         skb_queue_len(&tc->t_sock->sk->sk_receive_queue));
    return len - left;
}
0260 
0261 /* the caller has to hold the sock lock */
0262 static int rds_tcp_read_sock(struct rds_conn_path *cp, gfp_t gfp)
0263 {
0264     struct rds_tcp_connection *tc = cp->cp_transport_data;
0265     struct socket *sock = tc->t_sock;
0266     read_descriptor_t desc;
0267     struct rds_tcp_desc_arg arg;
0268 
0269     /* It's like glib in the kernel! */
0270     arg.conn_path = cp;
0271     arg.gfp = gfp;
0272     desc.arg.data = &arg;
0273     desc.error = 0;
0274     desc.count = 1; /* give more than one skb per call */
0275 
0276     tcp_read_sock(sock->sk, &desc, rds_tcp_data_recv);
0277     rdsdebug("tcp_read_sock for tc %p gfp 0x%x returned %d\n", tc, gfp,
0278          desc.error);
0279 
0280     return desc.error;
0281 }
0282 
0283 /*
0284  * We hold the sock lock to serialize our rds_tcp_recv->tcp_read_sock from
0285  * data_ready.
0286  *
0287  * if we fail to allocate we're in trouble.. blindly wait some time before
0288  * trying again to see if the VM can free up something for us.
0289  */
0290 int rds_tcp_recv_path(struct rds_conn_path *cp)
0291 {
0292     struct rds_tcp_connection *tc = cp->cp_transport_data;
0293     struct socket *sock = tc->t_sock;
0294     int ret = 0;
0295 
0296     rdsdebug("recv worker path [%d] tc %p sock %p\n",
0297          cp->cp_index, tc, sock);
0298 
0299     lock_sock(sock->sk);
0300     ret = rds_tcp_read_sock(cp, GFP_KERNEL);
0301     release_sock(sock->sk);
0302 
0303     return ret;
0304 }
0305 
/*
 * data_ready callback installed on the underlying TCP socket.  Drains the
 * socket via rds_tcp_read_sock() (GFP_ATOMIC: may run in atomic context),
 * then chains to the previously installed data_ready handler.
 */
void rds_tcp_data_ready(struct sock *sk)
{
    void (*ready)(struct sock *sk);
    struct rds_conn_path *cp;
    struct rds_tcp_incoming *tc;

    rdsdebug("data ready sk %p\n", sk);

    read_lock_bh(&sk->sk_callback_lock);
    cp = sk->sk_user_data;
    if (!cp) { /* check for teardown race */
        ready = sk->sk_data_ready;
        goto out;
    }

    tc = cp->cp_transport_data;
    ready = tc->t_orig_data_ready;
    rds_tcp_stats_inc(s_tcp_data_ready_calls);

    /* on -ENOMEM, retry from the worker where GFP_KERNEL is allowed —
     * unless the connection is already being torn down */
    if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
        rcu_read_lock();
        if (!rds_destroy_pending(cp->cp_conn))
            queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
        rcu_read_unlock();
    }
out:
    read_unlock_bh(&sk->sk_callback_lock);
    /* chain to the saved callback outside the lock */
    ready(sk);
}
0335 
0336 int rds_tcp_recv_init(void)
0337 {
0338     rds_tcp_incoming_slab = kmem_cache_create("rds_tcp_incoming",
0339                     sizeof(struct rds_tcp_incoming),
0340                     0, 0, NULL);
0341     if (!rds_tcp_incoming_slab)
0342         return -ENOMEM;
0343     return 0;
0344 }
0345 
/* Destroy the slab cache created by rds_tcp_recv_init(). */
void rds_tcp_recv_exit(void)
{
    kmem_cache_destroy(rds_tcp_incoming_slab);
}