/*
 * Copyright (c) 2007, 2017 Oracle and/or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/slab.h>
#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/bitops.h>
#include <linux/export.h>

#include "rds.h"

/*
 * This file implements the receive side of the unconventional congestion
 * management in RDS.
 *
 * Messages waiting in the receive queue on the receiving socket are accounted
 * against the socket's SO_RCVBUF option value.  Only the payload bytes in the
 * message are accounted for.  If the number of bytes queued equals or exceeds
 * rcvbuf then the socket is congested.  All sends attempted to this socket's
 * address should block or return -EWOULDBLOCK.
 *
 * Applications are expected to be reasonably tuned such that this situation
 * very rarely occurs.  An application encountering this "back-pressure" is
 * considered buggy.
 *
 * This is implemented by having each node maintain bitmaps which indicate
 * which ports on bound addresses are congested.  As the bitmap changes it is
 * sent through all the connections which terminate in the local address of the
 * bitmap which changed.
 *
 * The bitmaps are allocated as connections are brought up.  This avoids
 * allocation in the interrupt handling path which queues messages on sockets.
 * The dense bitmaps let transports send the entire bitmap on any bitmap change
 * reasonably efficiently.  This is much easier to implement than some
 * finer-grained communication of per-port congestion.  The sender does a very
 * inexpensive bit test to see whether the port it's about to send to is
 * congested.
 */

/*
 * Interaction with poll is a tad tricky. We want all processes stuck in
 * poll to wake up and check whether a congested destination became uncongested.
 * The really sad thing is we have no idea which destinations the application
 * wants to send to - we don't even know which rds_connections are involved.
 * So until we implement a more flexible rds poll interface, we have to make
 * do with this:
 * We maintain a global counter that is incremented each time a congestion map
 * update is received. Each rds socket tracks this value, and if rds_poll
 * finds that the saved generation number is smaller than the global generation
 * number, it wakes up the process.
 */
static atomic_t     rds_cong_generation = ATOMIC_INIT(0);

/*
 * Congestion monitoring
 */
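/*
 * Sockets that have enabled congestion monitoring (rs->rs_cong_monitor, set
 * from userspace, typically via the RDS_CONG_MONITOR socket option) sit on
 * this list; rds_cong_map_updated() walks it to flag sockets whose monitored
 * ports have changed state.
 */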
static LIST_HEAD(rds_cong_monitor);
static DEFINE_RWLOCK(rds_cong_monitor_lock);

/*
 * Yes, a global lock.  It's used so infrequently that it's worth keeping it
 * global to simplify the locking.  It's only used in the following
 * circumstances:
 *
 *  - on connection buildup to associate a conn with its maps
 *  - on map changes to inform conns of a new map to send
 *
 *  It's sadly ordered under the socket callback lock and the connection lock.
 *  Receive paths can mark ports congested from interrupt context so the
 *  lock masks interrupts.
 */
static DEFINE_SPINLOCK(rds_cong_lock);
static struct rb_root rds_cong_tree = RB_ROOT;

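/*
 * Look up the congestion map for @addr in the global rb-tree, which is keyed
 * by rds_addr_cmp() on the bound address.  If a map is found it is returned.
 * Otherwise NULL is returned and, if @insert is non-NULL, it is linked into
 * the tree at the point where the search ended.  Callers serialize on
 * rds_cong_lock.
 */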
static struct rds_cong_map *rds_cong_tree_walk(const struct in6_addr *addr,
                           struct rds_cong_map *insert)
{
    struct rb_node **p = &rds_cong_tree.rb_node;
    struct rb_node *parent = NULL;
    struct rds_cong_map *map;

    while (*p) {
        int diff;

        parent = *p;
        map = rb_entry(parent, struct rds_cong_map, m_rb_node);

        diff = rds_addr_cmp(addr, &map->m_addr);
        if (diff < 0)
            p = &(*p)->rb_left;
        else if (diff > 0)
            p = &(*p)->rb_right;
        else
            return map;
    }

    if (insert) {
        rb_link_node(&insert->m_rb_node, parent, p);
        rb_insert_color(&insert->m_rb_node, &rds_cong_tree);
    }
    return NULL;
}

/*
 * There is only ever one bitmap for any address.  Connections try to allocate
 * these bitmaps in the process of getting pointers to them.  The bitmaps are
 * only ever freed as the module is removed after all connections have been
 * freed.
 */
static struct rds_cong_map *rds_cong_from_addr(const struct in6_addr *addr)
{
    struct rds_cong_map *map;
    struct rds_cong_map *ret = NULL;
    unsigned long zp;
    unsigned long i;
    unsigned long flags;

    map = kzalloc(sizeof(struct rds_cong_map), GFP_KERNEL);
    if (!map)
        return NULL;

    map->m_addr = *addr;
    init_waitqueue_head(&map->m_waitq);
    INIT_LIST_HEAD(&map->m_conn_list);

    for (i = 0; i < RDS_CONG_MAP_PAGES; i++) {
        zp = get_zeroed_page(GFP_KERNEL);
        if (zp == 0)
            goto out;
        map->m_page_addrs[i] = zp;
    }

    spin_lock_irqsave(&rds_cong_lock, flags);
    ret = rds_cong_tree_walk(addr, map);
    spin_unlock_irqrestore(&rds_cong_lock, flags);

    if (!ret) {
        ret = map;
        map = NULL;
    }

out:
    if (map) {
        for (i = 0; i < RDS_CONG_MAP_PAGES && map->m_page_addrs[i]; i++)
            free_page(map->m_page_addrs[i]);
        kfree(map);
    }

    rdsdebug("map %p for addr %pI6c\n", ret, addr);

    return ret;
}

/*
 * Put the conn on its local map's list.  This is called when the conn is
 * really added to the hash.  It's nested under the rds_conn_lock, sadly.
 */
void rds_cong_add_conn(struct rds_connection *conn)
{
    unsigned long flags;

    rdsdebug("conn %p now on map %p\n", conn, conn->c_lcong);
    spin_lock_irqsave(&rds_cong_lock, flags);
    list_add_tail(&conn->c_map_item, &conn->c_lcong->m_conn_list);
    spin_unlock_irqrestore(&rds_cong_lock, flags);
}

void rds_cong_remove_conn(struct rds_connection *conn)
{
    unsigned long flags;

    rdsdebug("removing conn %p from map %p\n", conn, conn->c_lcong);
    spin_lock_irqsave(&rds_cong_lock, flags);
    list_del_init(&conn->c_map_item);
    spin_unlock_irqrestore(&rds_cong_lock, flags);
}

int rds_cong_get_maps(struct rds_connection *conn)
{
    conn->c_lcong = rds_cong_from_addr(&conn->c_laddr);
    conn->c_fcong = rds_cong_from_addr(&conn->c_faddr);

    if (!(conn->c_lcong && conn->c_fcong))
        return -ENOMEM;

    return 0;
}

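/*
 * A local congestion map changed; ask every connection that shares it to
 * retransmit the whole bitmap.  The actual send is deferred to the send
 * worker, for the reasons spelled out in the comment below.
 */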
void rds_cong_queue_updates(struct rds_cong_map *map)
{
    struct rds_connection *conn;
    unsigned long flags;

    spin_lock_irqsave(&rds_cong_lock, flags);

    list_for_each_entry(conn, &map->m_conn_list, c_map_item) {
        struct rds_conn_path *cp = &conn->c_path[0];

        rcu_read_lock();
        if (!test_and_set_bit(0, &conn->c_map_queued) &&
            !rds_destroy_pending(cp->cp_conn)) {
            rds_stats_inc(s_cong_update_queued);
            /* We cannot inline the call to rds_send_xmit() here
             * for two reasons (both pertaining to a TCP transport):
             * 1. When we get here from the receive path, we
             *    are already holding the sock_lock (held by
             *    tcp_v4_rcv()). So inlining calls to
             *    tcp_setsockopt and/or tcp_sendmsg would deadlock
             *    when they try to take the sock_lock().
             * 2. Interrupts are masked so that we can mark the
             *    port congested from both send and recv paths.
             *    (See comment around declaration of rds_cong_lock).
             *    An attempt to get the sock_lock() here will
             *    therefore trigger warnings.
             * Defer the xmit to rds_send_worker() instead.
             */
            queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
        }
        rcu_read_unlock();
    }

    spin_unlock_irqrestore(&rds_cong_lock, flags);
}

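/*
 * Called after an incoming congestion map update has been applied to @map.
 * Bumps the global generation so pollers re-check, wakes anyone blocked in
 * rds_cong_wait(), and, for sockets doing congestion monitoring, latches a
 * notification for any monitored ports covered by @portmask.
 */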
void rds_cong_map_updated(struct rds_cong_map *map, uint64_t portmask)
{
    rdsdebug("waking map %p for %pI6c\n",
      map, &map->m_addr);
    rds_stats_inc(s_cong_update_received);
    atomic_inc(&rds_cong_generation);
    if (waitqueue_active(&map->m_waitq))
        wake_up(&map->m_waitq);
    if (waitqueue_active(&rds_poll_waitq))
        wake_up_all(&rds_poll_waitq);

    if (portmask && !list_empty(&rds_cong_monitor)) {
        unsigned long flags;
        struct rds_sock *rs;

        read_lock_irqsave(&rds_cong_monitor_lock, flags);
        list_for_each_entry(rs, &rds_cong_monitor, rs_cong_list) {
            spin_lock(&rs->rs_lock);
            rs->rs_cong_notify |= (rs->rs_cong_mask & portmask);
            rs->rs_cong_mask &= ~portmask;
            spin_unlock(&rs->rs_lock);
            if (rs->rs_cong_notify)
                rds_wake_sk_sleep(rs);
        }
        read_unlock_irqrestore(&rds_cong_monitor_lock, flags);
    }
}
EXPORT_SYMBOL_GPL(rds_cong_map_updated);

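/*
 * Used by the poll path described at the top of this file: returns nonzero
 * (and refreshes the caller's saved value) if the global congestion
 * generation has moved past *recent since the caller last looked.
 */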
int rds_cong_updated_since(unsigned long *recent)
{
    unsigned long gen = atomic_read(&rds_cong_generation);

    if (likely(*recent == gen))
        return 0;
    *recent = gen;
    return 1;
}

/*
 * We're called under the locking that protects the socket's receive buffer
 * consumption.  This makes it a lot easier for the caller to only call us
 * when it knows that an existing set bit needs to be cleared, and vice versa.
 * We can't block and we need to deal with concurrent sockets working against
 * the same per-address map.
 */
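/*
 * The map is a flat bitmap with one bit per 16-bit port number, stored as
 * RDS_CONG_MAP_PAGES separately allocated pages of RDS_CONG_MAP_PAGE_BITS
 * bits each.  The page index and the bit offset within that page are both
 * derived from the port number below.
 */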
void rds_cong_set_bit(struct rds_cong_map *map, __be16 port)
{
    unsigned long i;
    unsigned long off;

    rdsdebug("setting congestion for %pI6c:%u in map %p\n",
      &map->m_addr, ntohs(port), map);

    i = be16_to_cpu(port) / RDS_CONG_MAP_PAGE_BITS;
    off = be16_to_cpu(port) % RDS_CONG_MAP_PAGE_BITS;

    set_bit_le(off, (void *)map->m_page_addrs[i]);
}

void rds_cong_clear_bit(struct rds_cong_map *map, __be16 port)
{
    unsigned long i;
    unsigned long off;

    rdsdebug("clearing congestion for %pI6c:%u in map %p\n",
      &map->m_addr, ntohs(port), map);

    i = be16_to_cpu(port) / RDS_CONG_MAP_PAGE_BITS;
    off = be16_to_cpu(port) % RDS_CONG_MAP_PAGE_BITS;

    clear_bit_le(off, (void *)map->m_page_addrs[i]);
}

static int rds_cong_test_bit(struct rds_cong_map *map, __be16 port)
{
    unsigned long i;
    unsigned long off;

    i = be16_to_cpu(port) / RDS_CONG_MAP_PAGE_BITS;
    off = be16_to_cpu(port) % RDS_CONG_MAP_PAGE_BITS;

    return test_bit_le(off, (void *)map->m_page_addrs[i]);
}

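/*
 * rds_cong_add_socket() puts a socket on the global congestion-monitor list;
 * rds_cong_remove_socket() takes it back off and, since the socket's bound
 * port is now closed, clears that port's bit in the local map and queues a
 * map update if the bit was set.
 */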
void rds_cong_add_socket(struct rds_sock *rs)
{
    unsigned long flags;

    write_lock_irqsave(&rds_cong_monitor_lock, flags);
    if (list_empty(&rs->rs_cong_list))
        list_add(&rs->rs_cong_list, &rds_cong_monitor);
    write_unlock_irqrestore(&rds_cong_monitor_lock, flags);
}

void rds_cong_remove_socket(struct rds_sock *rs)
{
    unsigned long flags;
    struct rds_cong_map *map;

    write_lock_irqsave(&rds_cong_monitor_lock, flags);
    list_del_init(&rs->rs_cong_list);
    write_unlock_irqrestore(&rds_cong_monitor_lock, flags);

    /* update congestion map for now-closed port */
    spin_lock_irqsave(&rds_cong_lock, flags);
    map = rds_cong_tree_walk(&rs->rs_bound_addr, NULL);
    spin_unlock_irqrestore(&rds_cong_lock, flags);

    if (map && rds_cong_test_bit(map, rs->rs_bound_port)) {
        rds_cong_clear_bit(map, rs->rs_bound_port);
        rds_cong_queue_updates(map);
    }
}

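/*
 * Test whether @port in @map is congested before a send is queued.  Returns
 * 0 if it is not.  A nonblocking sender with congestion monitoring enabled
 * first arms its monitor mask so a later update will notify it; nonblocking
 * sends that remain congested fail with -ENOBUFS, while blocking senders
 * sleep on the map's waitqueue until the bit clears.
 */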
int rds_cong_wait(struct rds_cong_map *map, __be16 port, int nonblock,
          struct rds_sock *rs)
{
    if (!rds_cong_test_bit(map, port))
        return 0;
    if (nonblock) {
        if (rs && rs->rs_cong_monitor) {
            unsigned long flags;

            /* It would have been nice to have an atomic set_bit on
             * a uint64_t. */
            spin_lock_irqsave(&rs->rs_lock, flags);
            rs->rs_cong_mask |= RDS_CONG_MONITOR_MASK(ntohs(port));
            spin_unlock_irqrestore(&rs->rs_lock, flags);

            /* Test again - a congestion update may have arrived in
             * the meantime. */
            if (!rds_cong_test_bit(map, port))
                return 0;
        }
        rds_stats_inc(s_cong_send_error);
        return -ENOBUFS;
    }

    rds_stats_inc(s_cong_send_blocked);
    rdsdebug("waiting on map %p for port %u\n", map, be16_to_cpu(port));

    return wait_event_interruptible(map->m_waitq,
                    !rds_cong_test_bit(map, port));
}

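/*
 * Module unload: every connection (and so every reference to a map) is gone
 * by now, so tear down the tree and free the bitmaps.
 */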
void rds_cong_exit(void)
{
    struct rb_node *node;
    struct rds_cong_map *map;
    unsigned long i;

    while ((node = rb_first(&rds_cong_tree))) {
        map = rb_entry(node, struct rds_cong_map, m_rb_node);
        rdsdebug("freeing map %p\n", map);
        rb_erase(&map->m_rb_node, &rds_cong_tree);
        for (i = 0; i < RDS_CONG_MAP_PAGES && map->m_page_addrs[i]; i++)
            free_page(map->m_page_addrs[i]);
        kfree(map);
    }
}

/*
 * Allocate an RDS message containing a congestion update.
 */
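/*
 * The message's sg list maps the bitmap pages in place rather than copying
 * them, so whatever the local map holds at transmit time is what goes out
 * on the wire.
 */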
struct rds_message *rds_cong_update_alloc(struct rds_connection *conn)
{
    struct rds_cong_map *map = conn->c_lcong;
    struct rds_message *rm;

    rm = rds_message_map_pages(map->m_page_addrs, RDS_CONG_MAP_BYTES);
    if (!IS_ERR(rm))
        rm->m_inc.i_hdr.h_flags = RDS_FLAG_CONG_BITMAP;

    return rm;
}