Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * net/tipc/name_table.c: TIPC name table code
0003  *
0004  * Copyright (c) 2000-2006, 2014-2018, Ericsson AB
0005  * Copyright (c) 2004-2008, 2010-2014, Wind River Systems
0006  * Copyright (c) 2020-2021, Red Hat Inc
0007  * All rights reserved.
0008  *
0009  * Redistribution and use in source and binary forms, with or without
0010  * modification, are permitted provided that the following conditions are met:
0011  *
0012  * 1. Redistributions of source code must retain the above copyright
0013  *    notice, this list of conditions and the following disclaimer.
0014  * 2. Redistributions in binary form must reproduce the above copyright
0015  *    notice, this list of conditions and the following disclaimer in the
0016  *    documentation and/or other materials provided with the distribution.
0017  * 3. Neither the names of the copyright holders nor the names of its
0018  *    contributors may be used to endorse or promote products derived from
0019  *    this software without specific prior written permission.
0020  *
0021  * Alternatively, this software may be distributed under the terms of the
0022  * GNU General Public License ("GPL") version 2 as published by the Free
0023  * Software Foundation.
0024  *
0025  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
0026  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
0027  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
0028  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
0029  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
0030  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
0031  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
0032  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
0033  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
0034  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
0035  * POSSIBILITY OF SUCH DAMAGE.
0036  */
0037 
0038 #include <net/sock.h>
0039 #include <linux/list_sort.h>
0040 #include <linux/rbtree_augmented.h>
0041 #include "core.h"
0042 #include "netlink.h"
0043 #include "name_table.h"
0044 #include "name_distr.h"
0045 #include "subscr.h"
0046 #include "bcast.h"
0047 #include "addr.h"
0048 #include "node.h"
0049 #include "group.h"
0050 
/**
 * struct service_range - container for all bindings of a service range
 * @lower: service range lower bound
 * @upper: service range upper bound
 * @tree_node: member of service range RB tree
 * @max: largest 'upper' in this node subtree; consulted by the range
 *   matching helpers to skip subtrees that cannot overlap a search range
 * @local_publ: list of identical publications made from this node
 *   Used by closest_first lookup and multicast lookup algorithm
 * @all_publ: all publications identical to this one, whatever node and scope
 *   Used by round-robin lookup algorithm
 */
struct service_range {
    u32 lower;
    u32 upper;
    struct rb_node tree_node;
    u32 max; /* augmented rbtree value, see sr_callbacks */
    struct list_head local_publ;
    struct list_head all_publ;
};
0070 
/**
 * struct tipc_service - container for all published instances of a service type
 * @type: 32 bit 'type' value for service
 * @publ_cnt: increasing counter for publications in this service; its value
 *   at insertion time becomes the publication's 'id'
 * @ranges: rb tree containing all service ranges for this service
 * @service_list: links to adjacent services in the name table hash chain
 * @subscriptions: list of subscriptions for this service type
 * @lock: spinlock controlling access to pertaining service ranges/publications
 * @rcu: RCU callback head used for deferred freeing
 */
struct tipc_service {
    u32 type;
    u32 publ_cnt;
    struct rb_root ranges;
    struct hlist_node service_list;
    struct list_head subscriptions;
    spinlock_t lock; /* Covers ranges, their publications and subscriptions */
    struct rcu_head rcu;
};
0090 
/* Value propagated upwards as 'max' by the augmented rbtree callbacks */
#define service_range_upper(sr) ((sr)->upper)
RB_DECLARE_CALLBACKS_MAX(static, sr_callbacks,
             struct service_range, tree_node, u32, max,
             service_range_upper)

/* Convert an rb_node pointer back into its enclosing service_range */
#define service_range_entry(rbtree_node)                \
    (container_of(rbtree_node, struct service_range, tree_node))

/* True if [sr->lower, sr->upper] intersects [start, end] (bounds inclusive) */
#define service_range_overlap(sr, start, end)               \
    ((sr)->lower <= (end) && (sr)->upper >= (start))

/**
 * service_range_foreach_match - iterate over tipc service rbtree for each
 *                               range match
 * @sr: the service range pointer as a loop cursor
 * @sc: the pointer to tipc service which holds the service range rbtree
 * @start: beginning of the search range (end >= start) for matching
 * @end: end of the search range (end >= start) for matching
 *
 * Expands to a plain for-loop, so 'break' and 'continue' work as usual.
 * Note that @start and @end are evaluated on every iteration.
 */
#define service_range_foreach_match(sr, sc, start, end)         \
    for (sr = service_range_match_first((sc)->ranges.rb_node,   \
                        start,          \
                        end);           \
         sr;                            \
         sr = service_range_match_next(&(sr)->tree_node,        \
                       start,           \
                       end))
0118 
0119 /**
0120  * service_range_match_first - find first service range matching a range
0121  * @n: the root node of service range rbtree for searching
0122  * @start: beginning of the search range (end >= start) for matching
0123  * @end: end of the search range (end >= start) for matching
0124  *
0125  * Return: the leftmost service range node in the rbtree that overlaps the
0126  * specific range if any. Otherwise, returns NULL.
0127  */
0128 static struct service_range *service_range_match_first(struct rb_node *n,
0129                                u32 start, u32 end)
0130 {
0131     struct service_range *sr;
0132     struct rb_node *l, *r;
0133 
0134     /* Non overlaps in tree at all? */
0135     if (!n || service_range_entry(n)->max < start)
0136         return NULL;
0137 
0138     while (n) {
0139         l = n->rb_left;
0140         if (l && service_range_entry(l)->max >= start) {
0141             /* A leftmost overlap range node must be one in the left
0142              * subtree. If not, it has lower > end, then nodes on
0143              * the right side cannot satisfy the condition either.
0144              */
0145             n = l;
0146             continue;
0147         }
0148 
0149         /* No one in the left subtree can match, return if this node is
0150          * an overlap i.e. leftmost.
0151          */
0152         sr = service_range_entry(n);
0153         if (service_range_overlap(sr, start, end))
0154             return sr;
0155 
0156         /* Ok, try to lookup on the right side */
0157         r = n->rb_right;
0158         if (sr->lower <= end &&
0159             r && service_range_entry(r)->max >= start) {
0160             n = r;
0161             continue;
0162         }
0163         break;
0164     }
0165 
0166     return NULL;
0167 }
0168 
0169 /**
0170  * service_range_match_next - find next service range matching a range
0171  * @n: a node in service range rbtree from which the searching starts
0172  * @start: beginning of the search range (end >= start) for matching
0173  * @end: end of the search range (end >= start) for matching
0174  *
0175  * Return: the next service range node to the given node in the rbtree that
0176  * overlaps the specific range if any. Otherwise, returns NULL.
0177  */
0178 static struct service_range *service_range_match_next(struct rb_node *n,
0179                               u32 start, u32 end)
0180 {
0181     struct service_range *sr;
0182     struct rb_node *p, *r;
0183 
0184     while (n) {
0185         r = n->rb_right;
0186         if (r && service_range_entry(r)->max >= start)
0187             /* A next overlap range node must be one in the right
0188              * subtree. If not, it has lower > end, then any next
0189              * successor (- an ancestor) of this node cannot
0190              * satisfy the condition either.
0191              */
0192             return service_range_match_first(r, start, end);
0193 
0194         /* No one in the right subtree can match, go up to find an
0195          * ancestor of this node which is parent of a left-hand child.
0196          */
0197         while ((p = rb_parent(n)) && n == p->rb_right)
0198             n = p;
0199         if (!p)
0200             break;
0201 
0202         /* Return if this ancestor is an overlap */
0203         sr = service_range_entry(p);
0204         if (service_range_overlap(sr, start, end))
0205             return sr;
0206 
0207         /* Ok, try to lookup more from this ancestor */
0208         if (sr->lower <= end) {
0209             n = p;
0210             continue;
0211         }
0212         break;
0213     }
0214 
0215     return NULL;
0216 }
0217 
0218 static int hash(int x)
0219 {
0220     return x & (TIPC_NAMETBL_SIZE - 1);
0221 }
0222 
0223 /**
0224  * tipc_publ_create - create a publication structure
0225  * @ua: the service range the user is binding to
0226  * @sk: the address of the socket that is bound
0227  * @key: publication key
0228  */
0229 static struct publication *tipc_publ_create(struct tipc_uaddr *ua,
0230                         struct tipc_socket_addr *sk,
0231                         u32 key)
0232 {
0233     struct publication *p = kzalloc(sizeof(*p), GFP_ATOMIC);
0234 
0235     if (!p)
0236         return NULL;
0237 
0238     p->sr = ua->sr;
0239     p->sk = *sk;
0240     p->scope = ua->scope;
0241     p->key = key;
0242     INIT_LIST_HEAD(&p->binding_sock);
0243     INIT_LIST_HEAD(&p->binding_node);
0244     INIT_LIST_HEAD(&p->local_publ);
0245     INIT_LIST_HEAD(&p->all_publ);
0246     INIT_LIST_HEAD(&p->list);
0247     return p;
0248 }
0249 
0250 /**
0251  * tipc_service_create - create a service structure for the specified 'type'
0252  * @net: network namespace
0253  * @ua: address representing the service to be bound
0254  *
0255  * Allocates a single range structure and sets it to all 0's.
0256  */
0257 static struct tipc_service *tipc_service_create(struct net *net,
0258                         struct tipc_uaddr *ua)
0259 {
0260     struct name_table *nt = tipc_name_table(net);
0261     struct tipc_service *service;
0262     struct hlist_head *hd;
0263 
0264     service = kzalloc(sizeof(*service), GFP_ATOMIC);
0265     if (!service) {
0266         pr_warn("Service creation failed, no memory\n");
0267         return NULL;
0268     }
0269 
0270     spin_lock_init(&service->lock);
0271     service->type = ua->sr.type;
0272     service->ranges = RB_ROOT;
0273     INIT_HLIST_NODE(&service->service_list);
0274     INIT_LIST_HEAD(&service->subscriptions);
0275     hd = &nt->services[hash(ua->sr.type)];
0276     hlist_add_head_rcu(&service->service_list, hd);
0277     return service;
0278 }
0279 
0280 /*  tipc_service_find_range - find service range matching publication parameters
0281  */
0282 static struct service_range *tipc_service_find_range(struct tipc_service *sc,
0283                              struct tipc_uaddr *ua)
0284 {
0285     struct service_range *sr;
0286 
0287     service_range_foreach_match(sr, sc, ua->sr.lower, ua->sr.upper) {
0288         /* Look for exact match */
0289         if (sr->lower == ua->sr.lower && sr->upper == ua->sr.upper)
0290             return sr;
0291     }
0292 
0293     return NULL;
0294 }
0295 
0296 static struct service_range *tipc_service_create_range(struct tipc_service *sc,
0297                                struct publication *p)
0298 {
0299     struct rb_node **n, *parent = NULL;
0300     struct service_range *sr;
0301     u32 lower = p->sr.lower;
0302     u32 upper = p->sr.upper;
0303 
0304     n = &sc->ranges.rb_node;
0305     while (*n) {
0306         parent = *n;
0307         sr = service_range_entry(parent);
0308         if (lower == sr->lower && upper == sr->upper)
0309             return sr;
0310         if (sr->max < upper)
0311             sr->max = upper;
0312         if (lower <= sr->lower)
0313             n = &parent->rb_left;
0314         else
0315             n = &parent->rb_right;
0316     }
0317     sr = kzalloc(sizeof(*sr), GFP_ATOMIC);
0318     if (!sr)
0319         return NULL;
0320     sr->lower = lower;
0321     sr->upper = upper;
0322     sr->max = upper;
0323     INIT_LIST_HEAD(&sr->local_publ);
0324     INIT_LIST_HEAD(&sr->all_publ);
0325     rb_link_node(&sr->tree_node, parent, n);
0326     rb_insert_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
0327     return sr;
0328 }
0329 
0330 static bool tipc_service_insert_publ(struct net *net,
0331                      struct tipc_service *sc,
0332                      struct publication *p)
0333 {
0334     struct tipc_subscription *sub, *tmp;
0335     struct service_range *sr;
0336     struct publication *_p;
0337     u32 node = p->sk.node;
0338     bool first = false;
0339     bool res = false;
0340     u32 key = p->key;
0341 
0342     spin_lock_bh(&sc->lock);
0343     sr = tipc_service_create_range(sc, p);
0344     if (!sr)
0345         goto  exit;
0346 
0347     first = list_empty(&sr->all_publ);
0348 
0349     /* Return if the publication already exists */
0350     list_for_each_entry(_p, &sr->all_publ, all_publ) {
0351         if (_p->key == key && (!_p->sk.node || _p->sk.node == node)) {
0352             pr_debug("Failed to bind duplicate %u,%u,%u/%u:%u/%u\n",
0353                  p->sr.type, p->sr.lower, p->sr.upper,
0354                  node, p->sk.ref, key);
0355             goto exit;
0356         }
0357     }
0358 
0359     if (in_own_node(net, p->sk.node))
0360         list_add(&p->local_publ, &sr->local_publ);
0361     list_add(&p->all_publ, &sr->all_publ);
0362     p->id = sc->publ_cnt++;
0363 
0364     /* Any subscriptions waiting for notification?  */
0365     list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) {
0366         tipc_sub_report_overlap(sub, p, TIPC_PUBLISHED, first);
0367     }
0368     res = true;
0369 exit:
0370     if (!res)
0371         pr_warn("Failed to bind to %u,%u,%u\n",
0372             p->sr.type, p->sr.lower, p->sr.upper);
0373     spin_unlock_bh(&sc->lock);
0374     return res;
0375 }
0376 
0377 /**
0378  * tipc_service_remove_publ - remove a publication from a service
0379  * @r: service_range to remove publication from
0380  * @sk: address publishing socket
0381  * @key: target publication key
0382  */
0383 static struct publication *tipc_service_remove_publ(struct service_range *r,
0384                             struct tipc_socket_addr *sk,
0385                             u32 key)
0386 {
0387     struct publication *p;
0388     u32 node = sk->node;
0389 
0390     list_for_each_entry(p, &r->all_publ, all_publ) {
0391         if (p->key != key || (node && node != p->sk.node))
0392             continue;
0393         list_del(&p->all_publ);
0394         list_del(&p->local_publ);
0395         return p;
0396     }
0397     return NULL;
0398 }
0399 
0400 /*
0401  * Code reused: time_after32() for the same purpose
0402  */
0403 #define publication_after(pa, pb) time_after32((pa)->id, (pb)->id)
0404 static int tipc_publ_sort(void *priv, const struct list_head *a,
0405               const struct list_head *b)
0406 {
0407     struct publication *pa, *pb;
0408 
0409     pa = container_of(a, struct publication, list);
0410     pb = container_of(b, struct publication, list);
0411     return publication_after(pa, pb);
0412 }
0413 
0414 /**
0415  * tipc_service_subscribe - attach a subscription, and optionally
0416  * issue the prescribed number of events if there is any service
0417  * range overlapping with the requested range
0418  * @service: the tipc_service to attach the @sub to
0419  * @sub: the subscription to attach
0420  */
0421 static void tipc_service_subscribe(struct tipc_service *service,
0422                    struct tipc_subscription *sub)
0423 {
0424     struct publication *p, *first, *tmp;
0425     struct list_head publ_list;
0426     struct service_range *sr;
0427     u32 filter, lower, upper;
0428 
0429     filter = sub->s.filter;
0430     lower = sub->s.seq.lower;
0431     upper = sub->s.seq.upper;
0432 
0433     tipc_sub_get(sub);
0434     list_add(&sub->service_list, &service->subscriptions);
0435 
0436     if (filter & TIPC_SUB_NO_STATUS)
0437         return;
0438 
0439     INIT_LIST_HEAD(&publ_list);
0440     service_range_foreach_match(sr, service, lower, upper) {
0441         first = NULL;
0442         list_for_each_entry(p, &sr->all_publ, all_publ) {
0443             if (filter & TIPC_SUB_PORTS)
0444                 list_add_tail(&p->list, &publ_list);
0445             else if (!first || publication_after(first, p))
0446                 /* Pick this range's *first* publication */
0447                 first = p;
0448         }
0449         if (first)
0450             list_add_tail(&first->list, &publ_list);
0451     }
0452 
0453     /* Sort the publications before reporting */
0454     list_sort(NULL, &publ_list, tipc_publ_sort);
0455     list_for_each_entry_safe(p, tmp, &publ_list, list) {
0456         tipc_sub_report_overlap(sub, p, TIPC_PUBLISHED, true);
0457         list_del_init(&p->list);
0458     }
0459 }
0460 
0461 static struct tipc_service *tipc_service_find(struct net *net,
0462                           struct tipc_uaddr *ua)
0463 {
0464     struct name_table *nt = tipc_name_table(net);
0465     struct hlist_head *service_head;
0466     struct tipc_service *service;
0467 
0468     service_head = &nt->services[hash(ua->sr.type)];
0469     hlist_for_each_entry_rcu(service, service_head, service_list) {
0470         if (service->type == ua->sr.type)
0471             return service;
0472     }
0473     return NULL;
0474 };
0475 
0476 struct publication *tipc_nametbl_insert_publ(struct net *net,
0477                          struct tipc_uaddr *ua,
0478                          struct tipc_socket_addr *sk,
0479                          u32 key)
0480 {
0481     struct tipc_service *sc;
0482     struct publication *p;
0483 
0484     p = tipc_publ_create(ua, sk, key);
0485     if (!p)
0486         return NULL;
0487 
0488     sc = tipc_service_find(net, ua);
0489     if (!sc)
0490         sc = tipc_service_create(net, ua);
0491     if (sc && tipc_service_insert_publ(net, sc, p))
0492         return p;
0493     kfree(p);
0494     return NULL;
0495 }
0496 
0497 struct publication *tipc_nametbl_remove_publ(struct net *net,
0498                          struct tipc_uaddr *ua,
0499                          struct tipc_socket_addr *sk,
0500                          u32 key)
0501 {
0502     struct tipc_subscription *sub, *tmp;
0503     struct publication *p = NULL;
0504     struct service_range *sr;
0505     struct tipc_service *sc;
0506     bool last;
0507 
0508     sc = tipc_service_find(net, ua);
0509     if (!sc)
0510         goto exit;
0511 
0512     spin_lock_bh(&sc->lock);
0513     sr = tipc_service_find_range(sc, ua);
0514     if (!sr)
0515         goto unlock;
0516     p = tipc_service_remove_publ(sr, sk, key);
0517     if (!p)
0518         goto unlock;
0519 
0520     /* Notify any waiting subscriptions */
0521     last = list_empty(&sr->all_publ);
0522     list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) {
0523         tipc_sub_report_overlap(sub, p, TIPC_WITHDRAWN, last);
0524     }
0525 
0526     /* Remove service range item if this was its last publication */
0527     if (list_empty(&sr->all_publ)) {
0528         rb_erase_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
0529         kfree(sr);
0530     }
0531 
0532     /* Delete service item if no more publications and subscriptions */
0533     if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
0534         hlist_del_init_rcu(&sc->service_list);
0535         kfree_rcu(sc, rcu);
0536     }
0537 unlock:
0538     spin_unlock_bh(&sc->lock);
0539 exit:
0540     if (!p) {
0541         pr_err("Failed to remove unknown binding: %u,%u,%u/%u:%u/%u\n",
0542                ua->sr.type, ua->sr.lower, ua->sr.upper,
0543                sk->node, sk->ref, key);
0544     }
0545     return p;
0546 }
0547 
0548 /**
0549  * tipc_nametbl_lookup_anycast - perform service instance to socket translation
0550  * @net: network namespace
0551  * @ua: service address to look up
0552  * @sk: address to socket we want to find
0553  *
0554  * On entry, a non-zero 'sk->node' indicates the node where we want lookup to be
0555  * performed, which may not be this one.
0556  *
0557  * On exit:
0558  *
0559  * - If lookup is deferred to another node, leave 'sk->node' unchanged and
0560  *   return 'true'.
0561  * - If lookup is successful, set the 'sk->node' and 'sk->ref' (== portid) which
0562  *   represent the bound socket and return 'true'.
0563  * - If lookup fails, return 'false'
0564  *
0565  * Note that for legacy users (node configured with Z.C.N address format) the
0566  * 'closest-first' lookup algorithm must be maintained, i.e., if sk.node is 0
0567  * we must look in the local binding list first
0568  */
0569 bool tipc_nametbl_lookup_anycast(struct net *net,
0570                  struct tipc_uaddr *ua,
0571                  struct tipc_socket_addr *sk)
0572 {
0573     struct tipc_net *tn = tipc_net(net);
0574     bool legacy = tn->legacy_addr_format;
0575     u32 self = tipc_own_addr(net);
0576     u32 inst = ua->sa.instance;
0577     struct service_range *r;
0578     struct tipc_service *sc;
0579     struct publication *p;
0580     struct list_head *l;
0581     bool res = false;
0582 
0583     if (!tipc_in_scope(legacy, sk->node, self))
0584         return true;
0585 
0586     rcu_read_lock();
0587     sc = tipc_service_find(net, ua);
0588     if (unlikely(!sc))
0589         goto exit;
0590 
0591     spin_lock_bh(&sc->lock);
0592     service_range_foreach_match(r, sc, inst, inst) {
0593         /* Select lookup algo: local, closest-first or round-robin */
0594         if (sk->node == self) {
0595             l = &r->local_publ;
0596             if (list_empty(l))
0597                 continue;
0598             p = list_first_entry(l, struct publication, local_publ);
0599             list_move_tail(&p->local_publ, &r->local_publ);
0600         } else if (legacy && !sk->node && !list_empty(&r->local_publ)) {
0601             l = &r->local_publ;
0602             p = list_first_entry(l, struct publication, local_publ);
0603             list_move_tail(&p->local_publ, &r->local_publ);
0604         } else {
0605             l = &r->all_publ;
0606             p = list_first_entry(l, struct publication, all_publ);
0607             list_move_tail(&p->all_publ, &r->all_publ);
0608         }
0609         *sk = p->sk;
0610         res = true;
0611         /* Todo: as for legacy, pick the first matching range only, a
0612          * "true" round-robin will be performed as needed.
0613          */
0614         break;
0615     }
0616     spin_unlock_bh(&sc->lock);
0617 
0618 exit:
0619     rcu_read_unlock();
0620     return res;
0621 }
0622 
0623 /* tipc_nametbl_lookup_group(): lookup destinaton(s) in a communication group
0624  * Returns a list of one (== group anycast) or more (== group multicast)
0625  * destination socket/node pairs matching the given address.
0626  * The requester may or may not want to exclude himself from the list.
0627  */
0628 bool tipc_nametbl_lookup_group(struct net *net, struct tipc_uaddr *ua,
0629                    struct list_head *dsts, int *dstcnt,
0630                    u32 exclude, bool mcast)
0631 {
0632     u32 self = tipc_own_addr(net);
0633     u32 inst = ua->sa.instance;
0634     struct service_range *sr;
0635     struct tipc_service *sc;
0636     struct publication *p;
0637 
0638     *dstcnt = 0;
0639     rcu_read_lock();
0640     sc = tipc_service_find(net, ua);
0641     if (unlikely(!sc))
0642         goto exit;
0643 
0644     spin_lock_bh(&sc->lock);
0645 
0646     /* Todo: a full search i.e. service_range_foreach_match() instead? */
0647     sr = service_range_match_first(sc->ranges.rb_node, inst, inst);
0648     if (!sr)
0649         goto no_match;
0650 
0651     list_for_each_entry(p, &sr->all_publ, all_publ) {
0652         if (p->scope != ua->scope)
0653             continue;
0654         if (p->sk.ref == exclude && p->sk.node == self)
0655             continue;
0656         tipc_dest_push(dsts, p->sk.node, p->sk.ref);
0657         (*dstcnt)++;
0658         if (mcast)
0659             continue;
0660         list_move_tail(&p->all_publ, &sr->all_publ);
0661         break;
0662     }
0663 no_match:
0664     spin_unlock_bh(&sc->lock);
0665 exit:
0666     rcu_read_unlock();
0667     return !list_empty(dsts);
0668 }
0669 
0670 /* tipc_nametbl_lookup_mcast_sockets(): look up node local destinaton sockets
0671  *                                      matching the given address
0672  * Used on nodes which have received a multicast/broadcast message
0673  * Returns a list of local sockets
0674  */
0675 void tipc_nametbl_lookup_mcast_sockets(struct net *net, struct tipc_uaddr *ua,
0676                        struct list_head *dports)
0677 {
0678     struct service_range *sr;
0679     struct tipc_service *sc;
0680     struct publication *p;
0681     u8 scope = ua->scope;
0682 
0683     rcu_read_lock();
0684     sc = tipc_service_find(net, ua);
0685     if (!sc)
0686         goto exit;
0687 
0688     spin_lock_bh(&sc->lock);
0689     service_range_foreach_match(sr, sc, ua->sr.lower, ua->sr.upper) {
0690         list_for_each_entry(p, &sr->local_publ, local_publ) {
0691             if (scope == p->scope || scope == TIPC_ANY_SCOPE)
0692                 tipc_dest_push(dports, 0, p->sk.ref);
0693         }
0694     }
0695     spin_unlock_bh(&sc->lock);
0696 exit:
0697     rcu_read_unlock();
0698 }
0699 
0700 /* tipc_nametbl_lookup_mcast_nodes(): look up all destination nodes matching
0701  *                                    the given address. Used in sending node.
0702  * Used on nodes which are sending out a multicast/broadcast message
0703  * Returns a list of nodes, including own node if applicable
0704  */
0705 void tipc_nametbl_lookup_mcast_nodes(struct net *net, struct tipc_uaddr *ua,
0706                      struct tipc_nlist *nodes)
0707 {
0708     struct service_range *sr;
0709     struct tipc_service *sc;
0710     struct publication *p;
0711 
0712     rcu_read_lock();
0713     sc = tipc_service_find(net, ua);
0714     if (!sc)
0715         goto exit;
0716 
0717     spin_lock_bh(&sc->lock);
0718     service_range_foreach_match(sr, sc, ua->sr.lower, ua->sr.upper) {
0719         list_for_each_entry(p, &sr->all_publ, all_publ) {
0720             tipc_nlist_add(nodes, p->sk.node);
0721         }
0722     }
0723     spin_unlock_bh(&sc->lock);
0724 exit:
0725     rcu_read_unlock();
0726 }
0727 
0728 /* tipc_nametbl_build_group - build list of communication group members
0729  */
0730 void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
0731                   struct tipc_uaddr *ua)
0732 {
0733     struct service_range *sr;
0734     struct tipc_service *sc;
0735     struct publication *p;
0736     struct rb_node *n;
0737 
0738     rcu_read_lock();
0739     sc = tipc_service_find(net, ua);
0740     if (!sc)
0741         goto exit;
0742 
0743     spin_lock_bh(&sc->lock);
0744     for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
0745         sr = container_of(n, struct service_range, tree_node);
0746         list_for_each_entry(p, &sr->all_publ, all_publ) {
0747             if (p->scope != ua->scope)
0748                 continue;
0749             tipc_group_add_member(grp, p->sk.node, p->sk.ref,
0750                           p->sr.lower);
0751         }
0752     }
0753     spin_unlock_bh(&sc->lock);
0754 exit:
0755     rcu_read_unlock();
0756 }
0757 
0758 /* tipc_nametbl_publish - add service binding to name table
0759  */
0760 struct publication *tipc_nametbl_publish(struct net *net, struct tipc_uaddr *ua,
0761                      struct tipc_socket_addr *sk, u32 key)
0762 {
0763     struct name_table *nt = tipc_name_table(net);
0764     struct tipc_net *tn = tipc_net(net);
0765     struct publication *p = NULL;
0766     struct sk_buff *skb = NULL;
0767     u32 rc_dests;
0768 
0769     spin_lock_bh(&tn->nametbl_lock);
0770 
0771     if (nt->local_publ_count >= TIPC_MAX_PUBL) {
0772         pr_warn("Bind failed, max limit %u reached\n", TIPC_MAX_PUBL);
0773         goto exit;
0774     }
0775 
0776     p = tipc_nametbl_insert_publ(net, ua, sk, key);
0777     if (p) {
0778         nt->local_publ_count++;
0779         skb = tipc_named_publish(net, p);
0780     }
0781     rc_dests = nt->rc_dests;
0782 exit:
0783     spin_unlock_bh(&tn->nametbl_lock);
0784 
0785     if (skb)
0786         tipc_node_broadcast(net, skb, rc_dests);
0787     return p;
0788 
0789 }
0790 
0791 /**
0792  * tipc_nametbl_withdraw - withdraw a service binding
0793  * @net: network namespace
0794  * @ua: service address/range being unbound
0795  * @sk: address of the socket being unbound from
0796  * @key: target publication key
0797  */
0798 void tipc_nametbl_withdraw(struct net *net, struct tipc_uaddr *ua,
0799                struct tipc_socket_addr *sk, u32 key)
0800 {
0801     struct name_table *nt = tipc_name_table(net);
0802     struct tipc_net *tn = tipc_net(net);
0803     struct sk_buff *skb = NULL;
0804     struct publication *p;
0805     u32 rc_dests;
0806 
0807     spin_lock_bh(&tn->nametbl_lock);
0808 
0809     p = tipc_nametbl_remove_publ(net, ua, sk, key);
0810     if (p) {
0811         nt->local_publ_count--;
0812         skb = tipc_named_withdraw(net, p);
0813         list_del_init(&p->binding_sock);
0814         kfree_rcu(p, rcu);
0815     }
0816     rc_dests = nt->rc_dests;
0817     spin_unlock_bh(&tn->nametbl_lock);
0818 
0819     if (skb)
0820         tipc_node_broadcast(net, skb, rc_dests);
0821 }
0822 
0823 /**
0824  * tipc_nametbl_subscribe - add a subscription object to the name table
0825  * @sub: subscription to add
0826  */
0827 bool tipc_nametbl_subscribe(struct tipc_subscription *sub)
0828 {
0829     struct tipc_net *tn = tipc_net(sub->net);
0830     u32 type = sub->s.seq.type;
0831     struct tipc_service *sc;
0832     struct tipc_uaddr ua;
0833     bool res = true;
0834 
0835     tipc_uaddr(&ua, TIPC_SERVICE_RANGE, TIPC_NODE_SCOPE, type,
0836            sub->s.seq.lower, sub->s.seq.upper);
0837     spin_lock_bh(&tn->nametbl_lock);
0838     sc = tipc_service_find(sub->net, &ua);
0839     if (!sc)
0840         sc = tipc_service_create(sub->net, &ua);
0841     if (sc) {
0842         spin_lock_bh(&sc->lock);
0843         tipc_service_subscribe(sc, sub);
0844         spin_unlock_bh(&sc->lock);
0845     } else {
0846         pr_warn("Failed to subscribe for {%u,%u,%u}\n",
0847             type, sub->s.seq.lower, sub->s.seq.upper);
0848         res = false;
0849     }
0850     spin_unlock_bh(&tn->nametbl_lock);
0851     return res;
0852 }
0853 
0854 /**
0855  * tipc_nametbl_unsubscribe - remove a subscription object from name table
0856  * @sub: subscription to remove
0857  */
0858 void tipc_nametbl_unsubscribe(struct tipc_subscription *sub)
0859 {
0860     struct tipc_net *tn = tipc_net(sub->net);
0861     struct tipc_service *sc;
0862     struct tipc_uaddr ua;
0863 
0864     tipc_uaddr(&ua, TIPC_SERVICE_RANGE, TIPC_NODE_SCOPE,
0865            sub->s.seq.type, sub->s.seq.lower, sub->s.seq.upper);
0866     spin_lock_bh(&tn->nametbl_lock);
0867     sc = tipc_service_find(sub->net, &ua);
0868     if (!sc)
0869         goto exit;
0870 
0871     spin_lock_bh(&sc->lock);
0872     list_del_init(&sub->service_list);
0873     tipc_sub_put(sub);
0874 
0875     /* Delete service item if no more publications and subscriptions */
0876     if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
0877         hlist_del_init_rcu(&sc->service_list);
0878         kfree_rcu(sc, rcu);
0879     }
0880     spin_unlock_bh(&sc->lock);
0881 exit:
0882     spin_unlock_bh(&tn->nametbl_lock);
0883 }
0884 
0885 int tipc_nametbl_init(struct net *net)
0886 {
0887     struct tipc_net *tn = tipc_net(net);
0888     struct name_table *nt;
0889     int i;
0890 
0891     nt = kzalloc(sizeof(*nt), GFP_KERNEL);
0892     if (!nt)
0893         return -ENOMEM;
0894 
0895     for (i = 0; i < TIPC_NAMETBL_SIZE; i++)
0896         INIT_HLIST_HEAD(&nt->services[i]);
0897 
0898     INIT_LIST_HEAD(&nt->node_scope);
0899     INIT_LIST_HEAD(&nt->cluster_scope);
0900     rwlock_init(&nt->cluster_scope_lock);
0901     tn->nametbl = nt;
0902     spin_lock_init(&tn->nametbl_lock);
0903     return 0;
0904 }
0905 
0906 /**
0907  * tipc_service_delete - purge all publications for a service and delete it
0908  * @net: the associated network namespace
0909  * @sc: tipc_service to delete
0910  */
0911 static void tipc_service_delete(struct net *net, struct tipc_service *sc)
0912 {
0913     struct service_range *sr, *tmpr;
0914     struct publication *p, *tmp;
0915 
0916     spin_lock_bh(&sc->lock);
0917     rbtree_postorder_for_each_entry_safe(sr, tmpr, &sc->ranges, tree_node) {
0918         list_for_each_entry_safe(p, tmp, &sr->all_publ, all_publ) {
0919             tipc_service_remove_publ(sr, &p->sk, p->key);
0920             kfree_rcu(p, rcu);
0921         }
0922         rb_erase_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
0923         kfree(sr);
0924     }
0925     hlist_del_init_rcu(&sc->service_list);
0926     spin_unlock_bh(&sc->lock);
0927     kfree_rcu(sc, rcu);
0928 }
0929 
0930 void tipc_nametbl_stop(struct net *net)
0931 {
0932     struct name_table *nt = tipc_name_table(net);
0933     struct tipc_net *tn = tipc_net(net);
0934     struct hlist_head *service_head;
0935     struct tipc_service *service;
0936     u32 i;
0937 
0938     /* Verify name table is empty and purge any lingering
0939      * publications, then release the name table
0940      */
0941     spin_lock_bh(&tn->nametbl_lock);
0942     for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
0943         if (hlist_empty(&nt->services[i]))
0944             continue;
0945         service_head = &nt->services[i];
0946         hlist_for_each_entry_rcu(service, service_head, service_list) {
0947             tipc_service_delete(net, service);
0948         }
0949     }
0950     spin_unlock_bh(&tn->nametbl_lock);
0951 
0952     synchronize_net();
0953     kfree(nt);
0954 }
0955 
0956 static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
0957                     struct tipc_service *service,
0958                     struct service_range *sr,
0959                     u32 *last_key)
0960 {
0961     struct publication *p;
0962     struct nlattr *attrs;
0963     struct nlattr *b;
0964     void *hdr;
0965 
0966     if (*last_key) {
0967         list_for_each_entry(p, &sr->all_publ, all_publ)
0968             if (p->key == *last_key)
0969                 break;
0970         if (list_entry_is_head(p, &sr->all_publ, all_publ))
0971             return -EPIPE;
0972     } else {
0973         p = list_first_entry(&sr->all_publ,
0974                      struct publication,
0975                      all_publ);
0976     }
0977 
0978     list_for_each_entry_from(p, &sr->all_publ, all_publ) {
0979         *last_key = p->key;
0980 
0981         hdr = genlmsg_put(msg->skb, msg->portid, msg->seq,
0982                   &tipc_genl_family, NLM_F_MULTI,
0983                   TIPC_NL_NAME_TABLE_GET);
0984         if (!hdr)
0985             return -EMSGSIZE;
0986 
0987         attrs = nla_nest_start_noflag(msg->skb, TIPC_NLA_NAME_TABLE);
0988         if (!attrs)
0989             goto msg_full;
0990 
0991         b = nla_nest_start_noflag(msg->skb, TIPC_NLA_NAME_TABLE_PUBL);
0992         if (!b)
0993             goto attr_msg_full;
0994 
0995         if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, service->type))
0996             goto publ_msg_full;
0997         if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sr->lower))
0998             goto publ_msg_full;
0999         if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sr->upper))
1000             goto publ_msg_full;
1001         if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_SCOPE, p->scope))
1002             goto publ_msg_full;
1003         if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_NODE, p->sk.node))
1004             goto publ_msg_full;
1005         if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_REF, p->sk.ref))
1006             goto publ_msg_full;
1007         if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_KEY, p->key))
1008             goto publ_msg_full;
1009 
1010         nla_nest_end(msg->skb, b);
1011         nla_nest_end(msg->skb, attrs);
1012         genlmsg_end(msg->skb, hdr);
1013     }
1014     *last_key = 0;
1015 
1016     return 0;
1017 
1018 publ_msg_full:
1019     nla_nest_cancel(msg->skb, b);
1020 attr_msg_full:
1021     nla_nest_cancel(msg->skb, attrs);
1022 msg_full:
1023     genlmsg_cancel(msg->skb, hdr);
1024 
1025     return -EMSGSIZE;
1026 }
1027 
1028 static int __tipc_nl_service_range_list(struct tipc_nl_msg *msg,
1029                     struct tipc_service *sc,
1030                     u32 *last_lower, u32 *last_key)
1031 {
1032     struct service_range *sr;
1033     struct rb_node *n;
1034     int err;
1035 
1036     for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
1037         sr = container_of(n, struct service_range, tree_node);
1038         if (sr->lower < *last_lower)
1039             continue;
1040         err = __tipc_nl_add_nametable_publ(msg, sc, sr, last_key);
1041         if (err) {
1042             *last_lower = sr->lower;
1043             return err;
1044         }
1045     }
1046     *last_lower = 0;
1047     return 0;
1048 }
1049 
1050 static int tipc_nl_service_list(struct net *net, struct tipc_nl_msg *msg,
1051                 u32 *last_type, u32 *last_lower, u32 *last_key)
1052 {
1053     struct tipc_net *tn = tipc_net(net);
1054     struct tipc_service *service = NULL;
1055     struct hlist_head *head;
1056     struct tipc_uaddr ua;
1057     int err;
1058     int i;
1059 
1060     if (*last_type)
1061         i = hash(*last_type);
1062     else
1063         i = 0;
1064 
1065     for (; i < TIPC_NAMETBL_SIZE; i++) {
1066         head = &tn->nametbl->services[i];
1067 
1068         if (*last_type ||
1069             (!i && *last_key && (*last_lower == *last_key))) {
1070             tipc_uaddr(&ua, TIPC_SERVICE_RANGE, TIPC_NODE_SCOPE,
1071                    *last_type, *last_lower, *last_lower);
1072             service = tipc_service_find(net, &ua);
1073             if (!service)
1074                 return -EPIPE;
1075         } else {
1076             hlist_for_each_entry_rcu(service, head, service_list)
1077                 break;
1078             if (!service)
1079                 continue;
1080         }
1081 
1082         hlist_for_each_entry_from_rcu(service, service_list) {
1083             spin_lock_bh(&service->lock);
1084             err = __tipc_nl_service_range_list(msg, service,
1085                                last_lower,
1086                                last_key);
1087 
1088             if (err) {
1089                 *last_type = service->type;
1090                 spin_unlock_bh(&service->lock);
1091                 return err;
1092             }
1093             spin_unlock_bh(&service->lock);
1094         }
1095         *last_type = 0;
1096     }
1097     return 0;
1098 }
1099 
1100 int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
1101 {
1102     struct net *net = sock_net(skb->sk);
1103     u32 last_type = cb->args[0];
1104     u32 last_lower = cb->args[1];
1105     u32 last_key = cb->args[2];
1106     int done = cb->args[3];
1107     struct tipc_nl_msg msg;
1108     int err;
1109 
1110     if (done)
1111         return 0;
1112 
1113     msg.skb = skb;
1114     msg.portid = NETLINK_CB(cb->skb).portid;
1115     msg.seq = cb->nlh->nlmsg_seq;
1116 
1117     rcu_read_lock();
1118     err = tipc_nl_service_list(net, &msg, &last_type,
1119                    &last_lower, &last_key);
1120     if (!err) {
1121         done = 1;
1122     } else if (err != -EMSGSIZE) {
1123         /* We never set seq or call nl_dump_check_consistent() this
1124          * means that setting prev_seq here will cause the consistence
1125          * check to fail in the netlink callback handler. Resulting in
1126          * the NLMSG_DONE message having the NLM_F_DUMP_INTR flag set if
1127          * we got an error.
1128          */
1129         cb->prev_seq = 1;
1130     }
1131     rcu_read_unlock();
1132 
1133     cb->args[0] = last_type;
1134     cb->args[1] = last_lower;
1135     cb->args[2] = last_key;
1136     cb->args[3] = done;
1137 
1138     return skb->len;
1139 }
1140 
1141 struct tipc_dest *tipc_dest_find(struct list_head *l, u32 node, u32 port)
1142 {
1143     struct tipc_dest *dst;
1144 
1145     list_for_each_entry(dst, l, list) {
1146         if (dst->node == node && dst->port == port)
1147             return dst;
1148     }
1149     return NULL;
1150 }
1151 
1152 bool tipc_dest_push(struct list_head *l, u32 node, u32 port)
1153 {
1154     struct tipc_dest *dst;
1155 
1156     if (tipc_dest_find(l, node, port))
1157         return false;
1158 
1159     dst = kmalloc(sizeof(*dst), GFP_ATOMIC);
1160     if (unlikely(!dst))
1161         return false;
1162     dst->node = node;
1163     dst->port = port;
1164     list_add(&dst->list, l);
1165     return true;
1166 }
1167 
1168 bool tipc_dest_pop(struct list_head *l, u32 *node, u32 *port)
1169 {
1170     struct tipc_dest *dst;
1171 
1172     if (list_empty(l))
1173         return false;
1174     dst = list_first_entry(l, typeof(*dst), list);
1175     if (port)
1176         *port = dst->port;
1177     if (node)
1178         *node = dst->node;
1179     list_del(&dst->list);
1180     kfree(dst);
1181     return true;
1182 }
1183 
1184 bool tipc_dest_del(struct list_head *l, u32 node, u32 port)
1185 {
1186     struct tipc_dest *dst;
1187 
1188     dst = tipc_dest_find(l, node, port);
1189     if (!dst)
1190         return false;
1191     list_del(&dst->list);
1192     kfree(dst);
1193     return true;
1194 }
1195 
1196 void tipc_dest_list_purge(struct list_head *l)
1197 {
1198     struct tipc_dest *dst, *tmp;
1199 
1200     list_for_each_entry_safe(dst, tmp, l, list) {
1201         list_del(&dst->list);
1202         kfree(dst);
1203     }
1204 }