Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * net/tipc/bcast.c: TIPC broadcast code
0003  *
0004  * Copyright (c) 2004-2006, 2014-2017, Ericsson AB
0005  * Copyright (c) 2004, Intel Corporation.
0006  * Copyright (c) 2005, 2010-2011, Wind River Systems
0007  * All rights reserved.
0008  *
0009  * Redistribution and use in source and binary forms, with or without
0010  * modification, are permitted provided that the following conditions are met:
0011  *
0012  * 1. Redistributions of source code must retain the above copyright
0013  *    notice, this list of conditions and the following disclaimer.
0014  * 2. Redistributions in binary form must reproduce the above copyright
0015  *    notice, this list of conditions and the following disclaimer in the
0016  *    documentation and/or other materials provided with the distribution.
0017  * 3. Neither the names of the copyright holders nor the names of its
0018  *    contributors may be used to endorse or promote products derived from
0019  *    this software without specific prior written permission.
0020  *
0021  * Alternatively, this software may be distributed under the terms of the
0022  * GNU General Public License ("GPL") version 2 as published by the Free
0023  * Software Foundation.
0024  *
0025  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
0026  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
0027  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
0028  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
0029  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
0030  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
0031  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
0032  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
0033  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
0034  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
0035  * POSSIBILITY OF SUCH DAMAGE.
0036  */
0037 
0038 #include <linux/tipc_config.h>
0039 #include "socket.h"
0040 #include "msg.h"
0041 #include "bcast.h"
0042 #include "link.h"
0043 #include "name_table.h"
0044 
0045 #define BCLINK_WIN_DEFAULT  50  /* bcast link window size (default) */
0046 #define BCLINK_WIN_MIN      32  /* bcast minimum link window size */
0047 
0048 const char tipc_bclink_name[] = "broadcast-link";
0049 unsigned long sysctl_tipc_bc_retruni __read_mostly;
0050 
/**
 * struct tipc_bc_base - base structure for keeping broadcast send state
 * @link: broadcast send link structure
 * @inputq: data input queue; will only carry SOCK_WAKEUP messages
 * @dests: array keeping number of reachable destinations per bearer
 * @primary_bearer: a bearer having links to all broadcast destinations, if any
 * @bcast_support: indicates if primary bearer, if any, supports broadcast
 * @force_bcast: forces broadcast for multicast traffic
 * @rcast_support: indicates if all peer nodes support replicast
 * @force_rcast: forces replicast for multicast traffic
 * @rc_ratio: dest count as percentage of cluster size where send method changes
 * @bc_threshold: calculated from rc_ratio; if dests > threshold use broadcast
 */
struct tipc_bc_base {
    struct tipc_link *link;
    struct sk_buff_head inputq;
    /* dests[] is updated under tipc_bcast_lock() (see the bearer dst
     * count helpers below); primary_bearer/bcast_support are re-derived
     * from it by tipc_bcbase_select_primary()
     */
    int dests[MAX_BEARERS];
    int primary_bearer;
    bool bcast_support;
    bool force_bcast;
    bool rcast_support;
    bool force_rcast;
    int rc_ratio;
    int bc_threshold;
};
0076 
0077 static struct tipc_bc_base *tipc_bc_base(struct net *net)
0078 {
0079     return tipc_net(net)->bcbase;
0080 }
0081 
/* tipc_bcast_get_mtu(): get the MSS currently used by the broadcast link.
 * NOTE(review): an older comment here claimed the value is decremented to
 * leave room for a tunnel header; the code simply returns the link MSS -
 * confirm against the replicast path before relying on extra headroom.
 */
int tipc_bcast_get_mtu(struct net *net)
{
    struct tipc_link *bcl = tipc_bc_sndlink(net);

    return tipc_link_mss(bcl);
}
0090 
0091 void tipc_bcast_toggle_rcast(struct net *net, bool supp)
0092 {
0093     tipc_bc_base(net)->rcast_support = supp;
0094 }
0095 
0096 static void tipc_bcbase_calc_bc_threshold(struct net *net)
0097 {
0098     struct tipc_bc_base *bb = tipc_bc_base(net);
0099     int cluster_size = tipc_link_bc_peers(tipc_bc_sndlink(net));
0100 
0101     bb->bc_threshold = 1 + (cluster_size * bb->rc_ratio / 100);
0102 }
0103 
/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
 * if any, and make it primary bearer. Also shrinks the broadcast link MTU
 * to the smallest bearer MTU in use, and recomputes bcast_support.
 */
static void tipc_bcbase_select_primary(struct net *net)
{
    struct tipc_bc_base *bb = tipc_bc_base(net);
    int all_dests =  tipc_link_bc_peers(bb->link);
    int max_win = tipc_link_max_win(bb->link);
    int min_win = tipc_link_min_win(bb->link);
    int i, mtu, prim;

    /* Start from a clean slate; both values are re-derived below */
    bb->primary_bearer = INVALID_BEARER_ID;
    bb->bcast_support = true;

    /* No reachable destinations: nothing to select */
    if (!all_dests)
        return;

    for (i = 0; i < MAX_BEARERS; i++) {
        if (!bb->dests[i])
            continue;

        /* Adapt link MTU (and re-apply window limits) to the
         * smallest MTU among the bearers in use
         */
        mtu = tipc_bearer_mtu(net, i);
        if (mtu < tipc_link_mtu(bb->link)) {
            tipc_link_set_mtu(bb->link, mtu);
            tipc_link_set_queue_limits(bb->link,
                           min_win,
                           max_win);
        }
        bb->bcast_support &= tipc_bearer_bcast_support(net, i);
        /* Only a bearer reaching all destinations can be primary */
        if (bb->dests[i] < all_dests)
            continue;

        bb->primary_bearer = i;

        /* Reduce risk that all nodes select same primary */
        if ((i ^ tipc_own_addr(net)) & 1)
            break;
    }
    /* If a primary was found, bcast_support reflects that bearer only */
    prim = bb->primary_bearer;
    if (prim != INVALID_BEARER_ID)
        bb->bcast_support = tipc_bearer_bcast_support(net, prim);
}
0146 
0147 void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
0148 {
0149     struct tipc_bc_base *bb = tipc_bc_base(net);
0150 
0151     tipc_bcast_lock(net);
0152     bb->dests[bearer_id]++;
0153     tipc_bcbase_select_primary(net);
0154     tipc_bcast_unlock(net);
0155 }
0156 
0157 void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id)
0158 {
0159     struct tipc_bc_base *bb = tipc_bc_base(net);
0160 
0161     tipc_bcast_lock(net);
0162     bb->dests[bearer_id]--;
0163     tipc_bcbase_select_primary(net);
0164     tipc_bcast_unlock(net);
0165 }
0166 
/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers
 *
 * Note that number of reachable destinations, as indicated in the dests[]
 * array, may transitionally differ from the number of destinations indicated
 * in each sent buffer. We can sustain this. Excess destination nodes will
 * drop and never acknowledge the unexpected packets, and missing destinations
 * will either require retransmission (if they are just about to be added to
 * the bearer), or be removed from the buffer's 'ackers' counter (if they
 * just went down)
 */
static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
{
    int bearer_id;
    struct tipc_bc_base *bb = tipc_bc_base(net);
    struct sk_buff *skb, *_skb;
    struct sk_buff_head _xmitq;

    if (skb_queue_empty(xmitq))
        return;

    /* The typical case: at least one bearer has links to all nodes */
    bearer_id = bb->primary_bearer;
    if (bearer_id >= 0) {
        tipc_bearer_bc_xmit(net, bearer_id, xmitq);
        return;
    }

    /* We have to transmit across all bearers */
    __skb_queue_head_init(&_xmitq);
    for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
        if (!bb->dests[bearer_id])
            continue;

        /* Copy the queue for this bearer; on allocation failure
         * we transmit whatever was copied so far
         */
        skb_queue_walk(xmitq, skb) {
            _skb = pskb_copy_for_clone(skb, GFP_ATOMIC);
            if (!_skb)
                break;
            __skb_queue_tail(&_xmitq, _skb);
        }
        tipc_bearer_bc_xmit(net, bearer_id, &_xmitq);
    }
    /* Consume the original queue; drop any copies left behind */
    __skb_queue_purge(xmitq);
    __skb_queue_purge(&_xmitq);
}
0211 
/* tipc_bcast_select_xmit_method - choose broadcast vs replicast for the next
 * multicast send, based on bearer/peer capabilities, forced configuration
 * and destination count
 * @net: the applicable net namespace
 * @dests: number of remote destinations
 * @method: method state; method->rcast carries the outcome
 */
static void tipc_bcast_select_xmit_method(struct net *net, int dests,
                      struct tipc_mc_method *method)
{
    struct tipc_bc_base *bb = tipc_bc_base(net);
    unsigned long exp = method->expires;

    /* Broadcast supported by used bearer/bearers? */
    if (!bb->bcast_support) {
        method->rcast = true;
        return;
    }
    /* Any destinations which don't support replicast ? */
    if (!bb->rcast_support) {
        method->rcast = false;
        return;
    }
    /* Can current method be changed ? */
    method->expires = jiffies + TIPC_METHOD_EXPIRE;
    if (method->mandatory)
        return;

    /* Without the RBCTL capability in the cluster, stick with the
     * current method until the previous choice has expired
     */
    if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
        time_before(jiffies, exp))
        return;

    /* Configuration as force 'broadcast' method */
    if (bb->force_bcast) {
        method->rcast = false;
        return;
    }
    /* Configuration as force 'replicast' method */
    if (bb->force_rcast) {
        method->rcast = true;
        return;
    }
    /* Configuration as 'autoselect' or default method */
    /* Determine method to use now */
    method->rcast = dests <= bb->bc_threshold;
}
0251 
/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
 * @net: the applicable net namespace
 * @pkts: chain of buffers containing message
 * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
 * Consumes the buffer chain.
 * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
 */
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
            u16 *cong_link_cnt)
{
    struct tipc_link *l = tipc_bc_sndlink(net);
    struct sk_buff_head xmitq;
    int rc = 0;

    __skb_queue_head_init(&xmitq);
    /* Build the transmit queue under the bcast lock, but send it
     * only after the lock has been released
     */
    tipc_bcast_lock(net);
    if (tipc_link_bc_peers(l))
        rc = tipc_link_xmit(l, pkts, &xmitq);
    tipc_bcast_unlock(net);
    tipc_bcbase_xmit(net, &xmitq);
    __skb_queue_purge(pkts);
    /* Congestion is reported through *cong_link_cnt, not as an error */
    if (rc == -ELINKCONG) {
        *cong_link_cnt = 1;
        rc = 0;
    }
    return rc;
}
0279 
/* tipc_rcast_xmit - replicate and send a message to given destination nodes
 * @net: the applicable net namespace
 * @pkts: chain of buffers containing message
 * @dests: list of destination nodes
 * @cong_link_cnt: returns number of congested links
 * @cong_links: returns identities of congested links
 * Returns 0 if success, otherwise errno
 */
static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
               struct tipc_nlist *dests, u16 *cong_link_cnt)
{
    struct tipc_dest *dst, *tmp;
    struct sk_buff_head _pkts;
    u32 dnode, selector;

    /* All copies use the same link selector as the original message */
    selector = msg_link_selector(buf_msg(skb_peek(pkts)));
    __skb_queue_head_init(&_pkts);

    list_for_each_entry_safe(dst, tmp, &dests->list, list) {
        dnode = dst->node;
        /* Refill _pkts with a fresh copy of the chain per destination */
        if (!tipc_msg_pskb_copy(dnode, pkts, &_pkts))
            return -ENOMEM;

        /* Any other return value than -ELINKCONG is ignored */
        if (tipc_node_xmit(net, &_pkts, dnode, selector) == -ELINKCONG)
            (*cong_link_cnt)++;
    }
    return 0;
}
0309 
/* tipc_mcast_send_sync - deliver a dummy message with SYN bit
 * @net: the applicable net namespace
 * @skb: socket buffer to copy
 * @method: send method to be used
 * @dests: destination nodes for message.
 * Returns 0 if success, otherwise errno
 */
static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
                struct tipc_mc_method *method,
                struct tipc_nlist *dests)
{
    struct tipc_msg *hdr, *_hdr;
    struct sk_buff_head tmpq;
    struct sk_buff *_skb;
    u16 cong_link_cnt;
    int rc = 0;

    /* Is a cluster supporting with new capabilities ? */
    if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
        return 0;

    /* Only user multicast messages need a sync message */
    hdr = buf_msg(skb);
    if (msg_user(hdr) == MSG_FRAGMENTER)
        hdr = msg_inner_hdr(hdr);
    if (msg_type(hdr) != TIPC_MCAST_MSG)
        return 0;

    /* Allocate dummy message */
    _skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
    if (!_skb)
        return -ENOMEM;

    /* Preparing for 'synching' header; note this sets the SYN bit on
     * the original message header too, before it is copied below
     */
    msg_set_syn(hdr, 1);

    /* Copy skb's header into a dummy header */
    skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
    skb_orphan(_skb);

    /* Reverse method for dummy message */
    _hdr = buf_msg(_skb);
    msg_set_size(_hdr, MCAST_H_SIZE);
    msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));
    msg_set_errcode(_hdr, TIPC_ERR_NO_PORT);

    /* Send the dummy with the method opposite to the real message */
    __skb_queue_head_init(&tmpq);
    __skb_queue_tail(&tmpq, _skb);
    if (method->rcast)
        rc = tipc_bcast_xmit(net, &tmpq, &cong_link_cnt);
    else
        rc = tipc_rcast_xmit(net, &tmpq, dests, &cong_link_cnt);

    /* This queue should normally be empty by now */
    __skb_queue_purge(&tmpq);

    return rc;
}
0367 
/* tipc_mcast_xmit - deliver message to indicated destination nodes
 *                   and to identified node local sockets
 * @net: the applicable net namespace
 * @pkts: chain of buffers containing message
 * @method: send method to be used
 * @dests: destination nodes for message.
 * @cong_link_cnt: returns number of encountered congested destination links
 * Consumes buffer chain.
 * Returns 0 if success, otherwise errno
 */
int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
            struct tipc_mc_method *method, struct tipc_nlist *dests,
            u16 *cong_link_cnt)
{
    struct sk_buff_head inputq, localq;
    bool rcast = method->rcast;
    struct tipc_msg *hdr;
    struct sk_buff *skb;
    int rc = 0;

    skb_queue_head_init(&inputq);
    __skb_queue_head_init(&localq);

    /* Clone packets before they are consumed by next call */
    if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
        rc = -ENOMEM;
        goto exit;
    }
    /* Send according to determined transmit method */
    if (dests->remote) {
        tipc_bcast_select_xmit_method(net, dests->remote, method);

        /* Tag the outgoing header with the chosen method */
        skb = skb_peek(pkts);
        hdr = buf_msg(skb);
        if (msg_user(hdr) == MSG_FRAGMENTER)
            hdr = msg_inner_hdr(hdr);
        msg_set_is_rcast(hdr, method->rcast);

        /* Switch method ? Send a SYN on the old path first */
        if (rcast != method->rcast) {
            rc = tipc_mcast_send_sync(net, skb, method, dests);
            if (unlikely(rc)) {
                pr_err("Unable to send SYN: method %d, rc %d\n",
                       rcast, rc);
                goto exit;
            }
        }

        if (method->rcast)
            rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
        else
            rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
    }

    /* Deliver the pre-cloned copy to local sockets, if any */
    if (dests->local) {
        tipc_loopback_trace(net, &localq);
        tipc_sk_mcast_rcv(net, &localq, &inputq);
    }
exit:
    /* This queue should normally be empty by now */
    __skb_queue_purge(pkts);
    return rc;
}
0431 
/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
 * @net: the applicable net namespace
 * @l: the broadcast receive link for the sending peer
 * @skb: received buffer; consumed in all cases
 *
 * RCU is locked, no other locks set
 * Returns 0 on success, otherwise the errno from the link layer
 */
int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
{
    struct tipc_msg *hdr = buf_msg(skb);
    struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
    struct sk_buff_head xmitq;
    int rc;

    __skb_queue_head_init(&xmitq);

    /* Drop packets from foreign networks or when the link is down */
    if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) {
        kfree_skb(skb);
        return 0;
    }

    tipc_bcast_lock(net);
    if (msg_user(hdr) == BCAST_PROTOCOL)
        rc = tipc_link_bc_nack_rcv(l, skb, &xmitq);
    else
        rc = tipc_link_rcv(l, skb, NULL);
    tipc_bcast_unlock(net);

    /* Transmit any responses generated above, outside the lock */
    tipc_bcbase_xmit(net, &xmitq);

    /* Any socket wakeup messages ? */
    if (!skb_queue_empty(inputq))
        tipc_sk_rcv(net, inputq);

    return rc;
}
0465 
/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge
 * @net: the applicable net namespace
 * @l: the broadcast receive link for the acking peer
 * @hdr: message header carrying the acknowledged sequence number
 *
 * RCU is locked, no other locks set
 */
void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
            struct tipc_msg *hdr)
{
    struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
    u16 acked = msg_bcast_ack(hdr);
    struct sk_buff_head xmitq;

    /* Ignore bc acks sent by peer before bcast synch point was received */
    if (msg_bc_ack_invalid(hdr))
        return;

    __skb_queue_head_init(&xmitq);

    tipc_bcast_lock(net);
    tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq, NULL);
    tipc_bcast_unlock(net);

    /* Transmit any retransmissions generated above, outside the lock */
    tipc_bcbase_xmit(net, &xmitq);

    /* Any socket wakeup messages ? */
    if (!skb_queue_empty(inputq))
        tipc_sk_rcv(net, inputq);
}
0493 
/* tipc_bcast_sync_rcv - check and update rcv link with peer's send state
 * @net: the applicable net namespace
 * @l: the broadcast receive link for the sending peer
 * @hdr: STATE_MSG (or initial sync) header from the peer
 * @retrq: queue for unicast retransmissions; redirected to the local
 *         xmit queue unless the bc_retruni sysctl is set
 *
 * RCU is locked, no other locks set
 * Returns an rc bitfield as produced by the link layer
 */
int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
            struct tipc_msg *hdr,
            struct sk_buff_head *retrq)
{
    struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
    struct tipc_gap_ack_blks *ga;
    struct sk_buff_head xmitq;
    int rc = 0;

    __skb_queue_head_init(&xmitq);

    tipc_bcast_lock(net);
    if (msg_type(hdr) != STATE_MSG) {
        /* First contact: initialize rcv link from peer's state */
        tipc_link_bc_init_rcv(l, hdr);
    } else if (!msg_bc_ack_invalid(hdr)) {
        tipc_get_gap_ack_blks(&ga, l, hdr, false);
        /* Retransmit on the broadcast link unless unicast
         * retransmission was requested via sysctl
         */
        if (!sysctl_tipc_bc_retruni)
            retrq = &xmitq;
        rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr),
                      msg_bc_gap(hdr), ga, &xmitq,
                      retrq);
        rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq);
    }
    tipc_bcast_unlock(net);

    tipc_bcbase_xmit(net, &xmitq);

    /* Any socket wakeup messages ? */
    if (!skb_queue_empty(inputq))
        tipc_sk_rcv(net, inputq);
    return rc;
}
0530 
/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
 * @net: the applicable net namespace
 * @uc_l: the unicast link to the new peer
 * @xmitq: queue for messages to be transmitted by the caller
 *
 * RCU is locked, node lock is set
 */
void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
             struct sk_buff_head *xmitq)
{
    struct tipc_link *snd_l = tipc_bc_sndlink(net);

    tipc_bcast_lock(net);
    tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
    /* Peer set changed: re-derive primary bearer and bc threshold */
    tipc_bcbase_select_primary(net);
    tipc_bcbase_calc_bc_threshold(net);
    tipc_bcast_unlock(net);
}
0546 
/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
 * @net: the applicable net namespace
 * @rcv_l: the broadcast receive link of the departing peer
 *
 * RCU is locked, node lock is set
 */
void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
{
    struct tipc_link *snd_l = tipc_bc_sndlink(net);
    struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
    struct sk_buff_head xmitq;

    __skb_queue_head_init(&xmitq);

    tipc_bcast_lock(net);
    tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
    /* Peer set changed: re-derive primary bearer and bc threshold */
    tipc_bcbase_select_primary(net);
    tipc_bcbase_calc_bc_threshold(net);
    tipc_bcast_unlock(net);

    /* Transmit any messages generated above, outside the lock */
    tipc_bcbase_xmit(net, &xmitq);

    /* Any socket wakeup messages ? */
    if (!skb_queue_empty(inputq))
        tipc_sk_rcv(net, inputq);
}
0571 
/* tipc_bclink_reset_stats - clear statistics counters of a broadcast link
 * @net: the applicable net namespace
 * @l: the link whose stats are reset; may be NULL
 * Returns 0 on success, -ENOPROTOOPT if no link was given
 */
int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l)
{
    if (!l)
        return -ENOPROTOOPT;

    tipc_bcast_lock(net);
    tipc_link_reset_stats(l);
    tipc_bcast_unlock(net);
    return 0;
}
0582 
0583 static int tipc_bc_link_set_queue_limits(struct net *net, u32 max_win)
0584 {
0585     struct tipc_link *l = tipc_bc_sndlink(net);
0586 
0587     if (!l)
0588         return -ENOPROTOOPT;
0589     if (max_win < BCLINK_WIN_MIN)
0590         max_win = BCLINK_WIN_MIN;
0591     if (max_win > TIPC_MAX_LINK_WIN)
0592         return -EINVAL;
0593     tipc_bcast_lock(net);
0594     tipc_link_set_queue_limits(l, tipc_link_min_win(l), max_win);
0595     tipc_bcast_unlock(net);
0596     return 0;
0597 }
0598 
0599 static int tipc_bc_link_set_broadcast_mode(struct net *net, u32 bc_mode)
0600 {
0601     struct tipc_bc_base *bb = tipc_bc_base(net);
0602 
0603     switch (bc_mode) {
0604     case BCLINK_MODE_BCAST:
0605         if (!bb->bcast_support)
0606             return -ENOPROTOOPT;
0607 
0608         bb->force_bcast = true;
0609         bb->force_rcast = false;
0610         break;
0611     case BCLINK_MODE_RCAST:
0612         if (!bb->rcast_support)
0613             return -ENOPROTOOPT;
0614 
0615         bb->force_bcast = false;
0616         bb->force_rcast = true;
0617         break;
0618     case BCLINK_MODE_SEL:
0619         if (!bb->bcast_support || !bb->rcast_support)
0620             return -ENOPROTOOPT;
0621 
0622         bb->force_bcast = false;
0623         bb->force_rcast = false;
0624         break;
0625     default:
0626         return -EINVAL;
0627     }
0628 
0629     return 0;
0630 }
0631 
0632 static int tipc_bc_link_set_broadcast_ratio(struct net *net, u32 bc_ratio)
0633 {
0634     struct tipc_bc_base *bb = tipc_bc_base(net);
0635 
0636     if (!bb->bcast_support || !bb->rcast_support)
0637         return -ENOPROTOOPT;
0638 
0639     if (bc_ratio > 100 || bc_ratio <= 0)
0640         return -EINVAL;
0641 
0642     bb->rc_ratio = bc_ratio;
0643     tipc_bcast_lock(net);
0644     tipc_bcbase_calc_bc_threshold(net);
0645     tipc_bcast_unlock(net);
0646 
0647     return 0;
0648 }
0649 
/* tipc_nl_bc_link_set - apply netlink-configured broadcast link properties
 * @net: the applicable net namespace
 * @attrs: netlink link attributes; TIPC_NLA_LINK_PROP must be present
 *
 * Handles window size, broadcast mode and broadcast ratio properties.
 * Returns 0 on success, otherwise errno from parsing or from the
 * individual property setters
 */
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
{
    int err;
    u32 win;
    u32 bc_mode;
    u32 bc_ratio;
    struct nlattr *props[TIPC_NLA_PROP_MAX + 1];

    if (!attrs[TIPC_NLA_LINK_PROP])
        return -EINVAL;

    err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP], props);
    if (err)
        return err;

    /* At least one supported property must be present */
    if (!props[TIPC_NLA_PROP_WIN] &&
        !props[TIPC_NLA_PROP_BROADCAST] &&
        !props[TIPC_NLA_PROP_BROADCAST_RATIO]) {
        return -EOPNOTSUPP;
    }

    /* Apply properties in order, stopping at the first failure */
    if (props[TIPC_NLA_PROP_BROADCAST]) {
        bc_mode = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST]);
        err = tipc_bc_link_set_broadcast_mode(net, bc_mode);
    }

    if (!err && props[TIPC_NLA_PROP_BROADCAST_RATIO]) {
        bc_ratio = nla_get_u32(props[TIPC_NLA_PROP_BROADCAST_RATIO]);
        err = tipc_bc_link_set_broadcast_ratio(net, bc_ratio);
    }

    if (!err && props[TIPC_NLA_PROP_WIN]) {
        win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
        err = tipc_bc_link_set_queue_limits(net, win);
    }

    return err;
}
0688 
/* tipc_bcast_init - create and initialize the broadcast base and send link
 * @net: the applicable net namespace
 * Returns 0 on success, -ENOMEM on allocation failure
 */
int tipc_bcast_init(struct net *net)
{
    struct tipc_net *tn = tipc_net(net);
    struct tipc_bc_base *bb = NULL;
    struct tipc_link *l = NULL;

    bb = kzalloc(sizeof(*bb), GFP_KERNEL);
    if (!bb)
        goto enomem;
    tn->bcbase = bb;
    spin_lock_init(&tipc_net(net)->bclock);

    if (!tipc_link_bc_create(net, 0, 0, NULL,
                 one_page_mtu,
                 BCLINK_WIN_DEFAULT,
                 BCLINK_WIN_DEFAULT,
                 0,
                 &bb->inputq,
                 NULL,
                 NULL,
                 &l))
        goto enomem;
    bb->link = l;
    tn->bcl = l;
    /* Defaults: switch method at 10% of cluster size, assume replicast
     * support until a peer without it shows up
     */
    bb->rc_ratio = 10;
    bb->rcast_support = true;
    return 0;
enomem:
    /* kfree(NULL) is a no-op, so partial failures are handled uniformly */
    kfree(bb);
    kfree(l);
    return -ENOMEM;
}
0721 
/* tipc_bcast_stop - tear down the broadcast base and send link
 * @net: the applicable net namespace
 */
void tipc_bcast_stop(struct net *net)
{
    struct tipc_net *tn = net_generic(net, tipc_net_id);

    /* Wait for outstanding RCU readers before freeing the structures */
    synchronize_net();
    kfree(tn->bcbase);
    kfree(tn->bcl);
}
0730 
0731 void tipc_nlist_init(struct tipc_nlist *nl, u32 self)
0732 {
0733     memset(nl, 0, sizeof(*nl));
0734     INIT_LIST_HEAD(&nl->list);
0735     nl->self = self;
0736 }
0737 
0738 void tipc_nlist_add(struct tipc_nlist *nl, u32 node)
0739 {
0740     if (node == nl->self)
0741         nl->local = true;
0742     else if (tipc_dest_push(&nl->list, node, 0))
0743         nl->remote++;
0744 }
0745 
0746 void tipc_nlist_del(struct tipc_nlist *nl, u32 node)
0747 {
0748     if (node == nl->self)
0749         nl->local = false;
0750     else if (tipc_dest_del(&nl->list, node, 0))
0751         nl->remote--;
0752 }
0753 
0754 void tipc_nlist_purge(struct tipc_nlist *nl)
0755 {
0756     tipc_dest_list_purge(&nl->list);
0757     nl->remote = 0;
0758     nl->local = false;
0759 }
0760 
0761 u32 tipc_bcast_get_mode(struct net *net)
0762 {
0763     struct tipc_bc_base *bb = tipc_bc_base(net);
0764 
0765     if (bb->force_bcast)
0766         return BCLINK_MODE_BCAST;
0767 
0768     if (bb->force_rcast)
0769         return BCLINK_MODE_RCAST;
0770 
0771     if (bb->bcast_support && bb->rcast_support)
0772         return BCLINK_MODE_SEL;
0773 
0774     return 0;
0775 }
0776 
0777 u32 tipc_bcast_get_broadcast_ratio(struct net *net)
0778 {
0779     struct tipc_bc_base *bb = tipc_bc_base(net);
0780 
0781     return bb->rc_ratio;
0782 }
0783 
/* tipc_mcast_filter_msg - defer or deliver multicast messages so that a
 * bcast/rcast method switch does not reorder messages from the same peer
 * @net: the applicable net namespace
 * @defq: per-socket deferred queue, holding messages awaiting their twin SYN
 * @inputq: incoming message queue; the head message is inspected
 */
void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head *defq,
               struct sk_buff_head *inputq)
{
    struct sk_buff *skb, *_skb, *tmp;
    struct tipc_msg *hdr, *_hdr;
    bool match = false;
    u32 node, port;

    skb = skb_peek(inputq);
    if (!skb)
        return;

    hdr = buf_msg(skb);

    /* Fast path: no SYN involved and nothing deferred */
    if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
        return;

    /* Messages from the own node are never deferred */
    node = msg_orignode(hdr);
    if (node == tipc_own_addr(net))
        return;

    port = msg_origport(hdr);

    /* Has the twin SYN message already arrived ? */
    skb_queue_walk(defq, _skb) {
        _hdr = buf_msg(_skb);
        if (msg_orignode(_hdr) != node)
            continue;
        if (msg_origport(_hdr) != port)
            continue;
        match = true;
        break;
    }

    if (!match) {
        if (!msg_is_syn(hdr))
            return;
        /* First SYN from this peer: defer it until its twin arrives */
        __skb_dequeue(inputq);
        __skb_queue_tail(defq, skb);
        return;
    }

    /* Deliver non-SYN message from other link, otherwise queue it */
    if (!msg_is_syn(hdr)) {
        if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
            return;
        __skb_dequeue(inputq);
        __skb_queue_tail(defq, skb);
        return;
    }

    /* Queue non-SYN/SYN message from same link */
    if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) {
        __skb_dequeue(inputq);
        __skb_queue_tail(defq, skb);
        return;
    }

    /* Matching SYN messages => return the one with data, if any */
    __skb_unlink(_skb, defq);
    if (msg_data_sz(hdr)) {
        /* Keep the incoming SYN (carries data), drop the deferred one */
        kfree_skb(_skb);
    } else {
        /* Keep the deferred SYN instead of the empty incoming one */
        __skb_dequeue(inputq);
        kfree_skb(skb);
        __skb_queue_tail(inputq, _skb);
    }

    /* Deliver subsequent non-SYN messages from same peer */
    skb_queue_walk_safe(defq, _skb, tmp) {
        _hdr = buf_msg(_skb);
        if (msg_orignode(_hdr) != node)
            continue;
        if (msg_origport(_hdr) != port)
            continue;
        /* Stop at the next SYN: it starts a new sync round */
        if (msg_is_syn(_hdr))
            break;
        __skb_unlink(_skb, defq);
        __skb_queue_tail(inputq, _skb);
    }
}