Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * net/tipc/name_distr.c: TIPC name distribution code
0003  *
0004  * Copyright (c) 2000-2006, 2014-2019, Ericsson AB
0005  * Copyright (c) 2005, 2010-2011, Wind River Systems
0006  * Copyright (c) 2020-2021, Red Hat Inc
0007  * All rights reserved.
0008  *
0009  * Redistribution and use in source and binary forms, with or without
0010  * modification, are permitted provided that the following conditions are met:
0011  *
0012  * 1. Redistributions of source code must retain the above copyright
0013  *    notice, this list of conditions and the following disclaimer.
0014  * 2. Redistributions in binary form must reproduce the above copyright
0015  *    notice, this list of conditions and the following disclaimer in the
0016  *    documentation and/or other materials provided with the distribution.
0017  * 3. Neither the names of the copyright holders nor the names of its
0018  *    contributors may be used to endorse or promote products derived from
0019  *    this software without specific prior written permission.
0020  *
0021  * Alternatively, this software may be distributed under the terms of the
0022  * GNU General Public License ("GPL") version 2 as published by the Free
0023  * Software Foundation.
0024  *
0025  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
0026  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
0027  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
0028  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
0029  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
0030  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
0031  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
0032  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
0033  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
0034  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
0035  * POSSIBILITY OF SUCH DAMAGE.
0036  */
0037 
0038 #include "core.h"
0039 #include "link.h"
0040 #include "name_distr.h"
0041 
/*
 * Global tunable, initialised to 2000 and marked __read_mostly.
 * NOTE(review): presumably a name-table timeout in milliseconds that is
 * exposed through sysctl - confirm against the sysctl table. It is not read
 * anywhere in the visible part of this file.
 */
int sysctl_tipc_named_timeout __read_mostly = 2000;
0043 
/*
 * struct distr_queue_item - a name distribution item queued with metadata
 *
 * NOTE(review): no code in the visible part of this file references this
 * structure; the per-field descriptions below are inferred from the field
 * names and should be confirmed before relying on them.
 */
struct distr_queue_item {
    struct distr_item i;        /* the distribution item itself */
    u32 dtype;                  /* presumably PUBLICATION or WITHDRAWAL */
    u32 node;                   /* presumably the originating node address */
    unsigned long expires;      /* presumably an expiry time - units unverified */
    struct list_head next;      /* list linkage */
};
0051 
0052 /**
0053  * publ_to_item - add publication info to a publication message
0054  * @p: publication info
0055  * @i: location of item in the message
0056  */
0057 static void publ_to_item(struct distr_item *i, struct publication *p)
0058 {
0059     i->type = htonl(p->sr.type);
0060     i->lower = htonl(p->sr.lower);
0061     i->upper = htonl(p->sr.upper);
0062     i->port = htonl(p->sk.ref);
0063     i->key = htonl(p->key);
0064 }
0065 
0066 /**
0067  * named_prepare_buf - allocate & initialize a publication message
0068  * @net: the associated network namespace
0069  * @type: message type
0070  * @size: payload size
0071  * @dest: destination node
0072  *
0073  * The buffer returned is of size INT_H_SIZE + payload size
0074  */
0075 static struct sk_buff *named_prepare_buf(struct net *net, u32 type, u32 size,
0076                      u32 dest)
0077 {
0078     struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE + size, GFP_ATOMIC);
0079     u32 self = tipc_own_addr(net);
0080     struct tipc_msg *msg;
0081 
0082     if (buf != NULL) {
0083         msg = buf_msg(buf);
0084         tipc_msg_init(self, msg, NAME_DISTRIBUTOR,
0085                   type, INT_H_SIZE, dest);
0086         msg_set_size(msg, INT_H_SIZE + size);
0087     }
0088     return buf;
0089 }
0090 
0091 /**
0092  * tipc_named_publish - tell other nodes about a new publication by this node
0093  * @net: the associated network namespace
0094  * @p: the new publication
0095  */
0096 struct sk_buff *tipc_named_publish(struct net *net, struct publication *p)
0097 {
0098     struct name_table *nt = tipc_name_table(net);
0099     struct distr_item *item;
0100     struct sk_buff *skb;
0101 
0102     if (p->scope == TIPC_NODE_SCOPE) {
0103         list_add_tail_rcu(&p->binding_node, &nt->node_scope);
0104         return NULL;
0105     }
0106     write_lock_bh(&nt->cluster_scope_lock);
0107     list_add_tail(&p->binding_node, &nt->cluster_scope);
0108     write_unlock_bh(&nt->cluster_scope_lock);
0109     skb = named_prepare_buf(net, PUBLICATION, ITEM_SIZE, 0);
0110     if (!skb) {
0111         pr_warn("Publication distribution failure\n");
0112         return NULL;
0113     }
0114     msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
0115     msg_set_non_legacy(buf_msg(skb));
0116     item = (struct distr_item *)msg_data(buf_msg(skb));
0117     publ_to_item(item, p);
0118     return skb;
0119 }
0120 
0121 /**
0122  * tipc_named_withdraw - tell other nodes about a withdrawn publication by this node
0123  * @net: the associated network namespace
0124  * @p: the withdrawn publication
0125  */
0126 struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *p)
0127 {
0128     struct name_table *nt = tipc_name_table(net);
0129     struct distr_item *item;
0130     struct sk_buff *skb;
0131 
0132     write_lock_bh(&nt->cluster_scope_lock);
0133     list_del(&p->binding_node);
0134     write_unlock_bh(&nt->cluster_scope_lock);
0135     if (p->scope == TIPC_NODE_SCOPE)
0136         return NULL;
0137 
0138     skb = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0);
0139     if (!skb) {
0140         pr_warn("Withdrawal distribution failure\n");
0141         return NULL;
0142     }
0143     msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++);
0144     msg_set_non_legacy(buf_msg(skb));
0145     item = (struct distr_item *)msg_data(buf_msg(skb));
0146     publ_to_item(item, p);
0147     return skb;
0148 }
0149 
0150 /**
0151  * named_distribute - prepare name info for bulk distribution to another node
0152  * @net: the associated network namespace
0153  * @list: list of messages (buffers) to be returned from this function
0154  * @dnode: node to be updated
0155  * @pls: linked list of publication items to be packed into buffer chain
0156  * @seqno: sequence number for this message
0157  */
0158 static void named_distribute(struct net *net, struct sk_buff_head *list,
0159                  u32 dnode, struct list_head *pls, u16 seqno)
0160 {
0161     struct publication *publ;
0162     struct sk_buff *skb = NULL;
0163     struct distr_item *item = NULL;
0164     u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0, false) - INT_H_SIZE) /
0165             ITEM_SIZE) * ITEM_SIZE;
0166     u32 msg_rem = msg_dsz;
0167     struct tipc_msg *hdr;
0168 
0169     list_for_each_entry(publ, pls, binding_node) {
0170         /* Prepare next buffer: */
0171         if (!skb) {
0172             skb = named_prepare_buf(net, PUBLICATION, msg_rem,
0173                         dnode);
0174             if (!skb) {
0175                 pr_warn("Bulk publication failure\n");
0176                 return;
0177             }
0178             hdr = buf_msg(skb);
0179             msg_set_bc_ack_invalid(hdr, true);
0180             msg_set_bulk(hdr);
0181             msg_set_non_legacy(hdr);
0182             item = (struct distr_item *)msg_data(hdr);
0183         }
0184 
0185         /* Pack publication into message: */
0186         publ_to_item(item, publ);
0187         item++;
0188         msg_rem -= ITEM_SIZE;
0189 
0190         /* Append full buffer to list: */
0191         if (!msg_rem) {
0192             __skb_queue_tail(list, skb);
0193             skb = NULL;
0194             msg_rem = msg_dsz;
0195         }
0196     }
0197     if (skb) {
0198         hdr = buf_msg(skb);
0199         msg_set_size(hdr, INT_H_SIZE + (msg_dsz - msg_rem));
0200         skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem));
0201         __skb_queue_tail(list, skb);
0202     }
0203     hdr = buf_msg(skb_peek_tail(list));
0204     msg_set_last_bulk(hdr);
0205     msg_set_named_seqno(hdr, seqno);
0206 }
0207 
0208 /**
0209  * tipc_named_node_up - tell specified node about all publications by this node
0210  * @net: the associated network namespace
0211  * @dnode: destination node
0212  * @capabilities: peer node's capabilities
0213  */
0214 void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities)
0215 {
0216     struct name_table *nt = tipc_name_table(net);
0217     struct tipc_net *tn = tipc_net(net);
0218     struct sk_buff_head head;
0219     u16 seqno;
0220 
0221     __skb_queue_head_init(&head);
0222     spin_lock_bh(&tn->nametbl_lock);
0223     if (!(capabilities & TIPC_NAMED_BCAST))
0224         nt->rc_dests++;
0225     seqno = nt->snd_nxt;
0226     spin_unlock_bh(&tn->nametbl_lock);
0227 
0228     read_lock_bh(&nt->cluster_scope_lock);
0229     named_distribute(net, &head, dnode, &nt->cluster_scope, seqno);
0230     tipc_node_xmit(net, &head, dnode, 0);
0231     read_unlock_bh(&nt->cluster_scope_lock);
0232 }
0233 
0234 /**
0235  * tipc_publ_purge - remove publication associated with a failed node
0236  * @net: the associated network namespace
0237  * @p: the publication to remove
0238  * @addr: failed node's address
0239  *
0240  * Invoked for each publication issued by a newly failed node.
0241  * Removes publication structure from name table & deletes it.
0242  */
0243 static void tipc_publ_purge(struct net *net, struct publication *p, u32 addr)
0244 {
0245     struct tipc_net *tn = tipc_net(net);
0246     struct publication *_p;
0247     struct tipc_uaddr ua;
0248 
0249     tipc_uaddr(&ua, TIPC_SERVICE_RANGE, p->scope, p->sr.type,
0250            p->sr.lower, p->sr.upper);
0251     spin_lock_bh(&tn->nametbl_lock);
0252     _p = tipc_nametbl_remove_publ(net, &ua, &p->sk, p->key);
0253     if (_p)
0254         tipc_node_unsubscribe(net, &_p->binding_node, addr);
0255     spin_unlock_bh(&tn->nametbl_lock);
0256     if (_p)
0257         kfree_rcu(_p, rcu);
0258 }
0259 
0260 void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
0261               u32 addr, u16 capabilities)
0262 {
0263     struct name_table *nt = tipc_name_table(net);
0264     struct tipc_net *tn = tipc_net(net);
0265 
0266     struct publication *publ, *tmp;
0267 
0268     list_for_each_entry_safe(publ, tmp, nsub_list, binding_node)
0269         tipc_publ_purge(net, publ, addr);
0270     spin_lock_bh(&tn->nametbl_lock);
0271     if (!(capabilities & TIPC_NAMED_BCAST))
0272         nt->rc_dests--;
0273     spin_unlock_bh(&tn->nametbl_lock);
0274 }
0275 
0276 /**
0277  * tipc_update_nametbl - try to process a nametable update and notify
0278  *           subscribers
0279  * @net: the associated network namespace
0280  * @i: location of item in the message
0281  * @node: node address
0282  * @dtype: name distributor message type
0283  *
0284  * tipc_nametbl_lock must be held.
0285  * Return: the publication item if successful, otherwise NULL.
0286  */
0287 static bool tipc_update_nametbl(struct net *net, struct distr_item *i,
0288                 u32 node, u32 dtype)
0289 {
0290     struct publication *p = NULL;
0291     struct tipc_socket_addr sk;
0292     struct tipc_uaddr ua;
0293     u32 key = ntohl(i->key);
0294 
0295     tipc_uaddr(&ua, TIPC_SERVICE_RANGE, TIPC_CLUSTER_SCOPE,
0296            ntohl(i->type), ntohl(i->lower), ntohl(i->upper));
0297     sk.ref = ntohl(i->port);
0298     sk.node = node;
0299 
0300     if (dtype == PUBLICATION) {
0301         p = tipc_nametbl_insert_publ(net, &ua, &sk, key);
0302         if (p) {
0303             tipc_node_subscribe(net, &p->binding_node, node);
0304             return true;
0305         }
0306     } else if (dtype == WITHDRAWAL) {
0307         p = tipc_nametbl_remove_publ(net, &ua, &sk, key);
0308         if (p) {
0309             tipc_node_unsubscribe(net, &p->binding_node, node);
0310             kfree_rcu(p, rcu);
0311             return true;
0312         }
0313         pr_warn_ratelimited("Failed to remove binding %u,%u from %u\n",
0314                     ua.sr.type, ua.sr.lower, node);
0315     } else {
0316         pr_warn_ratelimited("Unknown name table message received\n");
0317     }
0318     return false;
0319 }
0320 
0321 static struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq,
0322                       u16 *rcv_nxt, bool *open)
0323 {
0324     struct sk_buff *skb, *tmp;
0325     struct tipc_msg *hdr;
0326     u16 seqno;
0327 
0328     spin_lock_bh(&namedq->lock);
0329     skb_queue_walk_safe(namedq, skb, tmp) {
0330         if (unlikely(skb_linearize(skb))) {
0331             __skb_unlink(skb, namedq);
0332             kfree_skb(skb);
0333             continue;
0334         }
0335         hdr = buf_msg(skb);
0336         seqno = msg_named_seqno(hdr);
0337         if (msg_is_last_bulk(hdr)) {
0338             *rcv_nxt = seqno;
0339             *open = true;
0340         }
0341 
0342         if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) {
0343             __skb_unlink(skb, namedq);
0344             spin_unlock_bh(&namedq->lock);
0345             return skb;
0346         }
0347 
0348         if (*open && (*rcv_nxt == seqno)) {
0349             (*rcv_nxt)++;
0350             __skb_unlink(skb, namedq);
0351             spin_unlock_bh(&namedq->lock);
0352             return skb;
0353         }
0354 
0355         if (less(seqno, *rcv_nxt)) {
0356             __skb_unlink(skb, namedq);
0357             kfree_skb(skb);
0358             continue;
0359         }
0360     }
0361     spin_unlock_bh(&namedq->lock);
0362     return NULL;
0363 }
0364 
0365 /**
0366  * tipc_named_rcv - process name table update messages sent by another node
0367  * @net: the associated network namespace
0368  * @namedq: queue to receive from
0369  * @rcv_nxt: store last received seqno here
0370  * @open: last bulk msg was received (FIXME)
0371  */
0372 void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
0373             u16 *rcv_nxt, bool *open)
0374 {
0375     struct tipc_net *tn = tipc_net(net);
0376     struct distr_item *item;
0377     struct tipc_msg *hdr;
0378     struct sk_buff *skb;
0379     u32 count, node;
0380 
0381     spin_lock_bh(&tn->nametbl_lock);
0382     while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
0383         hdr = buf_msg(skb);
0384         node = msg_orignode(hdr);
0385         item = (struct distr_item *)msg_data(hdr);
0386         count = msg_data_sz(hdr) / ITEM_SIZE;
0387         while (count--) {
0388             tipc_update_nametbl(net, item, node, msg_type(hdr));
0389             item++;
0390         }
0391         kfree_skb(skb);
0392     }
0393     spin_unlock_bh(&tn->nametbl_lock);
0394 }
0395 
0396 /**
0397  * tipc_named_reinit - re-initialize local publications
0398  * @net: the associated network namespace
0399  *
0400  * This routine is called whenever TIPC networking is enabled.
0401  * All name table entries published by this node are updated to reflect
0402  * the node's new network address.
0403  */
0404 void tipc_named_reinit(struct net *net)
0405 {
0406     struct name_table *nt = tipc_name_table(net);
0407     struct tipc_net *tn = tipc_net(net);
0408     struct publication *p;
0409     u32 self = tipc_own_addr(net);
0410 
0411     spin_lock_bh(&tn->nametbl_lock);
0412 
0413     list_for_each_entry_rcu(p, &nt->node_scope, binding_node)
0414         p->sk.node = self;
0415     list_for_each_entry_rcu(p, &nt->cluster_scope, binding_node)
0416         p->sk.node = self;
0417     nt->rc_dests = 0;
0418     spin_unlock_bh(&tn->nametbl_lock);
0419 }